In [None]:
# Version History
#print('Version 1.0.0: 08/26/2022 3:48pm - Nate Calvanese - Moving transformation functions to a shared utility notebook')
#print('Version 1.0.1: 08/30/2022 11:00am - Nate Calvanese - Moving in additional utility functions')
#print('Version 1.0.2: 08/30/2022 11:00am - Nate Calvanese - Moving in FAPI workspace functions')
#print('Version 1.0.3: 09/16/2022 10:46am - Nate Calvanese - Update to find_file_in_inventory function to deal with NaN')
#print('Version 1.0.4: 09/19/2022 5:06pm - Nate Calvanese - Added find_fileref_fields function')
print('Version 1.0.5: 09/23/2022 4:29pm - Nate Calvanese - Moved in ELT pipeline functions')


# Imports

In [9]:
import import_ipynb
import uuid
import pandas as pd
from time import sleep
from firecloud import api as fapi
import data_repo_client
import json
import re
import logging
import datetime
from time import sleep
from google.cloud import storage
from google.cloud import bigquery
import google.auth
import google.auth.transport.requests
import source_files_creation as sfc
import build_file_inventory as bfi
import process_table_data as ptd
import build_mapping_query as bmq
import output_data_validation as odv

# Configure logging format
logging.basicConfig(format="%(asctime)s - %(levelname)s: %(message)s", datefmt="%m/%d/%Y %I:%M:%S %p", level=logging.INFO)


importing Jupyter notebook from source_files_creation.ipynb
Version 1.0.3: 09/23/2022 11:53am - Nate Calvanese - Made source workspace configurable
importing Jupyter notebook from ingest_pipeline_utilities.ipynb
Version 1.0.5: 09/23/2022 11:13am - Nate Calvanese - Moved in ELT pipeline functions
importing Jupyter notebook from build_file_inventory.ipynb
Version 1.0.0: 09/08/2022 07:56pm - Nate Calvanese - Initial Version
importing Jupyter notebook from process_table_data.ipynb
Version: 1.0.1: 9/16/2022 10:57am - Nate Calvanese - Fixed bug in file_inventory table creation
importing Jupyter notebook from build_mapping_query.ipynb
Version 1.0.5: 09/21/2022 11:58am - Nate Calvanese - Made multi-column array agg return array with distinct values
importing Jupyter notebook from output_data_validation.ipynb
Version 2.0.2: 09/14/2022 10:23am -- Made output directory and validation schema more configurable
phs1234


# Transformation Functions

In [28]:
# Function to convert list represented as string to a list data type
def str_list_to_list(in_str, list_delim):
    """Split a delimited string into a list of its parts.

    Args:
        in_str: String holding the delimited values.
        list_delim: Delimiter handed to ``str.split`` as ``sep``.

    Returns:
        The list of substrings produced by the split.
    """
    return in_str.split(sep=list_delim)

# Function to concatenate a string value to each entry in a list (either 'prefix' or 'suffix')
def concat_str_to_list(in_str, in_list, delim='_', mode='prefix'):
    """Attach a string to every entry of a list, as a prefix or a suffix.

    Args:
        in_str: String to attach to each entry.
        in_list: Iterable of string entries.
        delim: Separator placed between ``in_str`` and the entry.
        mode: ``'prefix'`` yields ``in_str + delim + item``; ``'suffix'`` yields
            ``item + delim + in_str``; any other value copies the entries unchanged.

    Returns:
        A new list; ``in_list`` itself is not modified.
    """
    if mode == 'prefix':
        return [in_str + delim + item for item in in_list]
    if mode == 'suffix':
        # The suffix branch previously referenced an undefined name (`instr`)
        # and raised NameError for any non-empty list.
        return [item + delim + in_str for item in in_list]
    return list(in_list)

# Function to convert non-null values from a list of columns into a single list
def df_cols_to_list(in_list):
    """Flatten column values into one list of strings, dropping nulls.

    Entries that are lists are expanded one level; every other entry is treated
    as a single value. Values for which ``pd.notnull`` is false are skipped and
    the remaining ones are converted with ``str``.

    Args:
        in_list: Iterable of scalar values and/or lists of values.

    Returns:
        List of the non-null values as strings, in their original order.
    """
    flattened = []
    for entry in in_list:
        candidates = entry if isinstance(entry, list) else [entry]
        flattened.extend(str(candidate) for candidate in candidates if pd.notnull(candidate))
    return flattened

# Function to add value to existing list (or create new list)
def add_to_list(curr_value, new_value):
    """Merge a new value (or list of values) into an existing value or list.

    Args:
        curr_value: Existing scalar, list, or ``None``.
        new_value: Scalar, list, or ``None`` to add.

    Returns:
        A new list with every element converted to ``str``. When ``curr_value``
        is a list, new items are appended only if not already present. When it
        is a scalar, new items are appended only if they differ from it. When it
        is ``None``, the result is just the new value(s). If both arguments are
        ``None`` the result is an empty list.

    Notes:
        The inputs are never modified. The previous implementation aliased
        ``curr_value`` and appended to the caller's list in place, and returned
        ``['None']`` when both arguments were ``None``.
    """
    # Normalise the incoming value(s) to a list of candidate additions
    if new_value is None:
        additions = []
    elif isinstance(new_value, list):
        additions = new_value
    else:
        additions = [new_value]

    if curr_value is None:
        merged = list(additions)
    elif isinstance(curr_value, list):
        # Work on a copy so the caller's list is left untouched
        merged = list(curr_value)
        for item in additions:
            if item not in merged:
                merged.append(item)
    else:
        merged = [curr_value]
        for item in additions:
            if item != curr_value:
                merged.append(item)
    return [str(x) for x in merged]

# Function to apply uuid_v5 hash to input
def uuid_hash(value):
    """Return the UUIDv5 (OID namespace) of a value, or of each item in a list.

    Args:
        value: A single value or a list of values; each is passed through
            ``str`` before hashing.

    Returns:
        The UUID as a string, or a list of UUID strings for list input.
        ``None`` is returned if anything raises while hashing.
    """
    def _hash_one(raw):
        return str(uuid.uuid5(uuid.NAMESPACE_OID, str(raw)))

    try:
        if isinstance(value, list):
            return [_hash_one(item) for item in value]
        return _hash_one(value)
    except:
        # Best-effort: any failure yields None rather than propagating
        return None

# Function to retrieve and apply vocabulary mapping
def map_column_values(col, map_file_path, null_unmatched=False):
    """Apply a two-column CSV vocabulary map to the values of a pandas Series.

    The first CSV column is read as the source value and the second as the
    target value. The Series is updated in place and also returned.

    Args:
        col: pandas Series whose values should be mapped.
        map_file_path: Path of the CSV map file.
        null_unmatched: When false, values missing from the map are kept as-is.
            When true, they are replaced with ``None``.

    Returns:
        ``col``. List values are mapped element by element and falsy results
        are then filtered out of the list. Dict, set and tuple values are left
        untouched unless ``null_unmatched`` is true, in which case they are set
        to ``None``. If the map file is missing, malformed, or another error
        occurs, a message is printed and ``col`` is returned in whatever state
        it had reached.
    """
    try:
        # Read file into dataframe and convert to dictionary
        df_map_file = pd.read_csv(map_file_path, delimiter=',')
        src_col_name = df_map_file.columns[0]
        tar_col_name = df_map_file.columns[1]
        map_dict = df_map_file.set_index(src_col_name).to_dict()[tar_col_name]

        def _lookup(key):
            return map_dict.get(key, None if null_unmatched else key)

        # Series.iteritems() was removed in pandas 2.0; with it, the resulting
        # AttributeError was swallowed below and no mapping was ever applied.
        for idx, val in col.items():
            if isinstance(val, list):
                col.at[idx] = list(filter(None, (_lookup(entry) for entry in val)))
            elif not isinstance(val, (dict, set, tuple)):
                col.at[idx] = _lookup(val)
            elif null_unmatched:
                col.at[idx] = None
        return col
    except FileNotFoundError:
        print('Optional map file {path} not found. File will not be used.'.format(path = map_file_path))
        return col
    except IndexError:
        print('Optional map file {path} is malformed (two columns expected: source and target). File will not be used.'.format(path = map_file_path))
        return col
    except Exception:
        print('Unknown error occurred while applying map file {path}. Mapping may not be fully applied.'.format(path = map_file_path))
        return col

# Function to return the first value from a list that is a substring of the search term (a single-entry list returns its entry without checking)
def get_match_val_in_list(search_val, search_list):
    """Pick the first list entry that is contained in ``search_val``.

    Args:
        search_val: Value searched with the ``in`` operator.
        search_list: Candidate entries, checked in order.

    Returns:
        ``None`` for an empty list. For a single-entry list, that entry,
        without checking it against ``search_val``. Otherwise the first entry
        found in ``search_val``, or ``''`` when none is found.
    """
    candidate_count = len(search_list)
    if candidate_count == 0:
        return None
    if candidate_count == 1:
        return search_list[0]
    return next((candidate for candidate in search_list if candidate in search_val), '')

# Function to search for file in file inventory and return the associated fileref object
def find_file_in_inventory(search_string, file_inventory, return_field="file_id", match_multi=True, match_regex=""):
    """Look up file inventory entries whose URI contains a search string.

    An entry matches when ``search_string`` occurs in ``entry['uri']`` and
    ``re.search(match_regex, entry['uri'])`` succeeds. An empty or ``None``
    regex matches every URI.

    Args:
        search_string: Text to look for in each URI. ``None`` or NaN matches nothing.
        file_inventory: Iterable of dicts with ``uri``, ``file_id`` and ``file_ref`` keys.
        return_field: ``'file_id'`` collects ``entry['file_id']``; any other
            value collects ``entry['file_ref']``.
        match_multi: When true, all matches are returned as a list. When false,
            scanning stops at the first match and that single value is returned.
        match_regex: Optional additional regular expression the URI must satisfy.

    Returns:
        With ``match_multi`` true: a list of matches (empty when none).
        Otherwise the first match, or ``None`` when there is none.
    """
    pattern = "" if match_regex is None else match_regex
    value_key = 'file_id' if return_field == 'file_id' else 'file_ref'

    # Walk the inventory and collect the requested field for every matching URI
    matches = []
    searchable = not (search_string is None or pd.isna(search_string))
    if searchable:
        for record in file_inventory:
            uri = record['uri']
            if search_string in uri and re.search(pattern, uri):
                matches.append(record[value_key])
                # Equality comparisons (not truthiness) are kept on purpose so
                # non-bool flag values behave exactly as before
                if match_multi == False:
                    break

    if match_multi == True:
        return matches
    return matches[0] if matches else None

# Function to encode table/field names (removing unwanted characters, setting to lower case, etc.)
def encode_name(string):
    """Normalise a table or field name to lower-case ``[a-z0-9_]`` characters.

    Steps: trim surrounding whitespace, drop a leading ``entity:`` prefix,
    lower-case, replace ``-``, ``.`` and ``:`` with ``_``, and remove every
    remaining character outside ``[a-z0-9_]``. If the result starts with an
    underscore or a digit it is prefixed with ``t_``.

    Args:
        string: Raw name.

    Returns:
        The encoded name.

    Notes:
        The name is now actually lower-cased. Previously upper-case letters
        were not converted but deleted by the final character filter, so
        ``Sample_ID`` became ``ample_``.
    """
    out_str = string.strip()
    out_str = re.sub(r'^entity:', '', out_str) # Specific to the workspace case, should prob be configurable
    out_str = out_str.lower()
    # Raw strings avoid the invalid-escape warnings the old '\-' / '\.' literals produced
    out_str = re.sub(r'[-.:]', '_', out_str)
    out_str = re.sub(r'[^a-z0-9_]', '', out_str)
    if re.match(r'^[_0-9]+', out_str):
        out_str = 't_' + out_str
    return out_str

# Function to map data types to TDR types
def map_datatype(string):
    """Translate a data type name into the corresponding TDR type name.

    The lookup is case-insensitive.

    Args:
        string: Source data type name (for example ``'int64'`` or ``'Float32'``).

    Returns:
        ``'string'``, ``'integer'``, ``'boolean'`` or ``'float'`` for the
        recognised names, and ``'other'`` for everything else.
    """
    dtype_name = string.lower()
    if dtype_name in ('string', 'boolean'):
        return dtype_name
    if dtype_name in ('int', 'int8', 'int16', 'int32', 'int64'):
        return 'integer'
    if dtype_name in ('float', 'float32', 'float64'):
        return 'float'
    return 'other'

# Function to construct relationship object between two fields
def construct_relationship(src_table, src_column, tar_table, tar_column):
    """Build a relationship object linking a source column to a target column.

    Args:
        src_table: Name of the source ("from") table.
        src_column: Name of the source column.
        tar_table: Name of the target ("to") table.
        tar_column: Name of the target column.

    Returns:
        Dict with a ``name`` of the form ``<src_table>_<src_column>__to__<tar_table>_<tar_column>``
        plus ``from`` and ``to`` entries, each holding ``table`` and ``column``.
    """
    source_part = '_'.join((src_table, src_column))
    target_part = '_'.join((tar_table, tar_column))
    return {
        'name': '__to__'.join((source_part, target_part)),
        'from': {'table': src_table, 'column': src_column},
        'to': {'table': tar_table, 'column': tar_column},
    }

# Function to properly format a PHS id
def format_phs_id(input_str):
    """Extract a PHS id from text and return it as ``phs`` plus a zero-padded number.

    The first case-insensitive occurrence of ``phs`` followed by digits is used.
    The number is left-padded with zeros to at least six digits.

    Args:
        input_str: Text that may contain a PHS id (for example ``'phs000280.v1.p1'``).

    Returns:
        The formatted id (for example ``'phs000280'``), or ``''`` when no id
        with a non-zero number is found or ``input_str`` is not a string.

    Notes:
        The digit group previously used ``[1-9]+`` and therefore stopped at the
        first interior zero, turning ``phs000280`` into ``phs000028``.
    """
    try:
        match = re.search(r"phs([0-9]+)", input_str, re.IGNORECASE)
    except TypeError:
        # Non-string input (e.g. None) is treated as "no id present"
        return ""
    if not match:
        return ""
    number = int(match.group(1))
    if number == 0:
        return ""
    return "phs" + str(number).zfill(6)


# Firecloud Utility Functions

In [None]:
# Function to pull and format workspace attributes
def get_workspace_attributes(ws_project, ws_name):
    """Fetch a workspace through FAPI and return its formatted ``workspace`` record.

    Attribute values that are dicts carrying an ``items`` entry are replaced by
    that ``items`` value. The record is then extended with ``project`` and
    ``name`` (the arguments) and with ``attributes["phs_id"]``, taken from the
    text after the first ``:`` of a ``tag:tags`` entry containing ``dbgap``
    (case-insensitive, lower-cased and trimmed). If several tags qualify, the
    last one wins; if none does, ``phs_id`` is ``""``.

    Args:
        ws_project: Workspace billing project / namespace passed to FAPI.
        ws_name: Workspace name passed to FAPI.

    Returns:
        The ``workspace`` dict from the FAPI response, modified as described.
    """
    # Pull workspace attributes
    ws_attributes = fapi.get_workspace(ws_project, ws_name, fields="workspace.attributes, workspace.authorizationDomain, workspace.googleProject, workspace.bucketName").json()
    workspace = ws_attributes["workspace"]
    attributes = workspace["attributes"]

    # Format workspace attributes (replace nested dictionaries with the lists they represent, for example)
    for key, val in attributes.items():
        if isinstance(val, dict) and val.get("items") is not None:
            attributes[key] = val["items"]

    # Add additional attributes for PHS ID and Workspace Name
    workspace["project"] = ws_project
    workspace["name"] = ws_name
    attributes["phs_id"] = ""
    for item in attributes.get("tag:tags") or []:
        tag = item.lower()
        # A 'dbgap' tag without a ':' separator used to raise IndexError; it is now skipped
        if 'dbgap' in tag and ":" in tag:
            attributes["phs_id"] = tag.split(":", 1)[1].strip()
    return workspace

# Function to scan entity fields for potential file references
def find_and_add_fileref_fields(ws_project, ws_name, data_files_bucket, data_file_refs_dict, rows_to_scan=100):
    """Scan workspace entities for attributes that look like file references.

    For every entity type in the workspace, the first ``rows_to_scan`` entities
    are inspected. An attribute counts as a file reference when its value is a
    string containing ``gs://<data_files_bucket>``. References not already
    listed in ``data_file_refs_dict`` are added to it under the key
    ``ws_<entity type>.tsv`` with a default ``file_path_match`` configuration.

    Args:
        ws_project: Workspace billing project / namespace passed to FAPI.
        ws_name: Workspace name passed to FAPI.
        data_files_bucket: Bucket name (without ``gs://``) to look for.
        data_file_refs_dict: Existing mapping of file name to a list of column
            configuration dicts. Updated in place.
        rows_to_scan: Maximum number of entities inspected per entity type.

    Returns:
        Tuple ``(new_fields, data_file_refs_dict)`` where ``new_fields`` is the
        sorted list of newly added ``"<entity type>.<attribute>"`` names.

    Notes:
        Fixes relative to the previous implementation: adding a second column
        for a file already present in the dict called ``.append`` on the dict
        itself (AttributeError); a non-string attribute value raised inside a
        bare ``except`` that aborted scanning of the whole entity type; and the
        iteration order of the results was not deterministic.
    """
    # Pre-process data_file_refs_dict into a set of "<entity>.<column>" names
    data_file_refs_set = set()
    for key, val in data_file_refs_dict.items():
        entity_name = re.sub("^ws_", "", key.split(".")[0])
        for column_entry in val:
            data_file_refs_set.add(entity_name + "." + column_entry["column"])

    # Get list of entity types
    response_etypes = fapi.list_entity_types(ws_project, ws_name)
    etypes_list = list(json.loads(response_etypes.text).keys())

    # Loop through entity types
    file_path = "gs://" + data_files_bucket
    fileref_attr_set = set()
    for etype in etypes_list:
        entities_resp = fapi.get_entities(ws_project, ws_name, etype)
        entities = json.loads(entities_resp.text)
        if not isinstance(entities, list):
            # Anything other than a list of entities (e.g. an error payload) cannot be scanned
            continue
        for entity in entities[:max(rows_to_scan, 0)]:
            attributes = entity.get("attributes") if isinstance(entity, dict) else None
            if not isinstance(attributes, dict):
                continue
            for key, val in attributes.items():
                # Only string values can hold a bucket path; other types are skipped
                if isinstance(val, str) and file_path in val:
                    fileref_attr_set.add(etype + "." + key)

    # Determine if field is already in data_file_refs dictionary and add if not
    new_fileref_fields = sorted(fileref_attr_set - data_file_refs_set)
    for fileref in new_fileref_fields:
        entity_name, column = fileref.split(".", 1)
        file = "ws_" + entity_name + ".tsv"
        column_dict = {
            "column": column,
            "method": "file_path_match",
            "match_multiple_files": True,
            "match_regex": None,
            "create_new_field": True,
            "new_field_name": column + "_file_id"
        }
        existing_columns = data_file_refs_dict.get(file)
        if existing_columns:
            existing_columns.append(column_dict)
        else:
            data_file_refs_dict[file] = [column_dict]
    return new_fileref_fields, data_file_refs_dict


# TDR Utility Functions

In [None]:
# Class containing TDR functions
class TdrUtils:
    """Helper around a TDR jobs API client for waiting on asynchronous jobs."""

    # Initialization function
    def __init__(self, jobs_api):
        """Store the jobs API client used to poll job status and fetch results."""
        self.jobs_api = jobs_api

    # Function to wait for TDR job completion and return results
    def wait_for_job(self, job_model, poll_interval_seconds=10):
        """Poll a TDR job until it leaves the ``running`` state and return its result.

        Args:
            job_model: Job object exposing ``id`` and ``job_status``.
            poll_interval_seconds: Seconds to sleep between status polls.

        Returns:
            Tuple ``(job_result, job_id)`` where ``job_result`` comes from
            ``jobs_api.retrieve_job_result``. That call is made for both the
            ``succeeded`` and the ``failed`` state.
            NOTE(review): presumably the client raises for failed jobs, which
            is how callers detect failure; confirm against the client.

        Raises:
            RuntimeError: If the job reports a status other than ``running``,
                ``succeeded`` or ``failed``. Previously a plain string was
                raised, which is itself a TypeError in Python 3.
        """
        result = job_model
        print("TDR Job ID: " + job_model.id)
        while True:
            if result is None or result.job_status == "running":
                sleep(poll_interval_seconds)
                result = self.jobs_api.retrieve_job(job_model.id)
            elif result.job_status in ("failed", "succeeded"):
                return self.jobs_api.retrieve_job_result(job_model.id), job_model.id
            else:
                raise RuntimeError("Unrecognized job state: {}".format(result.job_status))

# "EL" Pipeline Functions

In [4]:
# Function to determine whether a specified file exists in cloud storage
def file_exists(key, params):
    """Report whether an object exists in the workspace bucket.

    Args:
        key: Object name (path inside the bucket) to check.
        params: Pipeline parameter dict; ``params["ws_bucket_name"]`` names the bucket.

    Returns:
        The result of ``Blob.exists`` for the object.
    """
    logging.info(f"Checking for file {key}")
    gcs_client = storage.Client()
    target_blob = storage.Blob(bucket=gcs_client.bucket(params["ws_bucket_name"]), name=key)
    return target_blob.exists(gcs_client)

# Function to find and execute ingests
def run_el_ingests(dataset_id, params, ingest_list):
    """Ingest each table's JSON file from the EL output directory into a TDR dataset.

    For every table name in ``ingest_list`` the file ``<table>.json`` under
    ``params["el_output_dir"]`` is checked with ``file_exists``. Missing files
    are logged and recorded as a "Warning" row. Existing files are submitted
    through the module-level ``datasets_api`` and awaited with the module-level
    ``tdr_utils``. A failed ingest is retried once before an "Error" row is
    recorded. Outcomes are appended to ``params["pipeline_results"]``; nothing
    is returned.

    Args:
        dataset_id: ID of the target TDR dataset.
        params: Pipeline parameter dict. Reads ``el_output_dir``, ``ws_bucket``,
            ``profile_id`` and ``src_ws_name``; appends to ``pipeline_results``.
        ingest_list: Iterable of target table names.
    """
    def _record(source_file_name, status, detail):
        # Append one "Dataset Ingests" row to the pipeline results
        params["pipeline_results"].append([params["src_ws_name"], datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"), "Dataset Ingests", f"File: {source_file_name}", status, detail])

    for target_table in ingest_list:
        source_file_name = target_table + ".json"
        source_rel_file_path = "{}/{}".format(params["el_output_dir"], source_file_name)
        source_full_file_path = "{}/{}/{}".format(params["ws_bucket"], params["el_output_dir"], source_file_name)

        # Nothing to ingest for this table: note it and move on
        if not file_exists(source_rel_file_path, params):
            logging.warning(f"Source table data file does not exist.  Skipping: {source_file_name}")
            _record(source_file_name, "Warning", f"Source table data file does not exist.  Skipping: {source_file_name}")
            continue

        logging.info(f"Running ingest from {source_file_name} to table {target_table}")
        ingest_request = {
            "table": target_table,
            "profile_id": params["profile_id"],
            "ignore_unknown_values": True,
            "resolve_existing_files": True,
            "updateStrategy": "replace",
            "format": "json",
            "load_tag": "Ingest for {}".format(params["src_ws_name"]),
            "path": source_full_file_path
        }
        failed_attempts = 0
        while True:
            try:
                ingest_request_result, job_id = tdr_utils.wait_for_job(datasets_api.ingest_dataset(id=dataset_id, ingest=ingest_request))
                logging.info("Ingest from file {} succeeded: {}".format(source_file_name, str(ingest_request_result)[0:1000]))
                _record(source_file_name, "Success", "Job_ID: {} - Truncated Response: {}".format(job_id, str(ingest_request_result)[0:1000]))
                break
            except Exception as e:
                logging.error("Error on Dataset Ingest: {}".format(str(e)))
                failed_attempts += 1
                if failed_attempts >= 2:
                    logging.error("Maximum number of retries exceeded. Recording error to pipeline results.")
                    _record(source_file_name, "Error", str(e))
                    break
                logging.info("Retrying Dataset Ingest (attempt #{})...".format(str(failed_attempts)))
                sleep(10)

# Function to add TDR dataset SA to the appropriate Terra groups and workspace ACLs
def set_up_dataset_ingest_sa(dataset_id, params, new_dataset):
    """Grant the dataset's ingest service account the access it needs in Terra.

    The service account is read from the dataset via the module-level
    ``datasets_api``. It is then added as a member of the ``anvil_tdr_ingest``
    group and as a READER (without share) on the workspace identified by
    ``params["ws_project"]`` / ``params["ws_name"]``. Each step appends one row
    to ``params["pipeline_results"]``. A failure is recorded as "Error" for a
    new dataset and as "Warning" otherwise. If the dataset has no ingest
    service account, both steps are recorded as "Error" and nothing else is
    attempted.

    Args:
        dataset_id: ID of the TDR dataset.
        params: Pipeline parameter dict. Reads ``src_ws_name``, ``ws_project``
            and ``ws_name``; appends to ``pipeline_results``.
        new_dataset: Truthy when the dataset was just created.
    """
    stage = "Dataset Service Account Setup"
    group_step = "Add SA to Anvil Ingest Group"
    workspace_step = "Add SA to Workspace"

    def _record(step, status, detail):
        # Append one result row for this stage
        params["pipeline_results"].append([params["src_ws_name"], datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"), stage, step, status, detail])

    logging.info("Setting up dataset ingest service account (SA)")
    # Collect dataset-specific SA from TDR
    dataset_details = datasets_api.retrieve_dataset(id=dataset_id)
    service_account = dataset_details.ingest_service_account
    if not service_account:
        error_message = "No dataset ingest service account found. Ensure the dedicatedIngestServiceAccount parameter has been set to True on dataset creation."
        logging.error(error_message)
        _record(group_step, "Error", error_message)
        _record(workspace_step, "Error", error_message)
        return

    # Add SA user to the Anvil Ingest Terra Group
    anvil_ingest_sa_group = "anvil_tdr_ingest" # full email: anvil_tdr_ingest@firecloud.org
    res = fapi.add_user_to_group(anvil_ingest_sa_group, "member", service_account)
    if res.status_code == 204:
        logging.info(f"Dataset service account {service_account} added to {anvil_ingest_sa_group}")
        _record(group_step, "Success", service_account)
    elif new_dataset:
        error_message = f"Error adding dataset ingest SA to {anvil_ingest_sa_group} group: {res.text}"
        logging.error(error_message)
        _record(group_step, "Error", error_message)
    else:
        error_message = f"Error adding dataset ingest SA to {anvil_ingest_sa_group} group (Note that it may have already been added when the dataset SA was first created): {res.text}"
        logging.warning(error_message)
        _record(group_step, "Warning", error_message)

    # Add SA user to workspace as Reader
    acl_updates = [{"email": service_account, "accessLevel": "READER", "canShare": False}]
    res = fapi.update_workspace_acl(params["ws_project"], params["ws_name"], acl_updates=acl_updates)
    if res.status_code == 200:
        logging.info(f"Dataset service account {service_account} added to workspace as READER (without SHARE) permissions successfully.")
        _record(workspace_step, "Success", service_account)
    elif new_dataset:
        error_message = f"Error adding dataset ingest SA to workspace: {res.text}"
        logging.error(error_message)
        _record(workspace_step, "Error", error_message)
    else:
        error_message = f"Error adding dataset ingest SA to workspace (Note that it may have already been added when the dataset SA was first created): {res.text}"
        logging.warning(error_message)
        _record(workspace_step, "Warning", error_message)

# Function to create TDR dataset (with retry logic)
def create_tdr_dataset(dataset_id, params) -> str:
    """Create a new TDR dataset from the schema file in cloud storage (with one retry).

    The schema JSON is downloaded from ``params["el_schema_file"]`` in the
    bucket ``params["ws_bucket_name"]``. The creation request is submitted
    through the module-level ``datasets_api`` and awaited with the module-level
    ``tdr_utils``. After a successful creation, ``anvil_tdr_ingest@firecloud.org``
    is added to the dataset's ``steward`` policy on a best-effort basis.
    Outcomes are appended to ``params["pipeline_results"]``.

    Args:
        dataset_id: Value returned unchanged if creation fails after the retry.
        params: Pipeline parameter dict. Reads ``dataset_name``, ``ws_bucket_name``,
            ``el_schema_file``, ``consent_name``, ``auth_domains``, ``src_ws_name``,
            ``profile_id`` and ``phs_id``; appends to ``pipeline_results``.

    Returns:
        The new dataset's ID on success. The incoming ``dataset_id`` if creation
        fails twice. ``None`` if the schema file cannot be retrieved.

    Notes:
        Two defects are fixed here. The description format string had no
        placeholder, so the workspace name was never included. A failure while
        adding the steward policy member referenced an unbound ``resp`` in its
        handler; the resulting error escaped to the outer handler and caused
        creation of the already-created dataset to be retried.
    """
    logging.info("Creating new dataset: {}".format(params["dataset_name"]))
    # Pull down TDR schema from cloud storage
    try:
        storage_client = storage.Client()
        bucket = storage_client.get_bucket(params["ws_bucket_name"])
        schema_blob = bucket.blob(params["el_schema_file"])
        schema = json.loads(schema_blob.download_as_string(client=None))
    except Exception as e:
        logging.error("Error retrieving TDR schema object: {}".format(str(e)))
        params["pipeline_results"].append([params["src_ws_name"], datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"), "Dataset Creation or Retrieval", "Create New Dataset", "Error", "Error retrieving TDR schema object: {}".format(str(e))])
        return

    # Build and execute dataset creation request
    properties_dict = {
        "consent_name": params["consent_name"],
        "auth_domains": params["auth_domains"],
        "source_workspaces": [params["src_ws_name"]]
    }
    dataset_request = {
        "name": params["dataset_name"],
        "description": "TDR Dataset for workspace {}".format(params["src_ws_name"]),
        "defaultProfileId": params["profile_id"],
        "region": "us-central1",
        "cloudPlatform": "gcp",
        "phsId": params["phs_id"],
        "experimentalSelfHosted": True,
        "dedicatedIngestServiceAccount": True,
        "properties": properties_dict,
        "schema": schema
    }
    attempt_counter = 0
    while True:
        try:
            create_dataset_result, job_id = tdr_utils.wait_for_job(datasets_api.create_dataset(dataset=dataset_request))
            logging.info("Dataset Creation succeeded: {}".format(create_dataset_result))
            dataset_id = create_dataset_result["id"]
            params["pipeline_results"].append([params["src_ws_name"], datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"), "Dataset Creation or Retrieval", "Create New Dataset", "Success", "Job_ID: {} - Truncated Response: {}".format(job_id, str(create_dataset_result)[0:1000])])
            # Best-effort: a policy failure must not trigger a second dataset creation
            try:
                datasets_api.add_dataset_policy_member(id=dataset_id, policy_name="steward", policy_member={"email": "anvil_tdr_ingest@firecloud.org"})
            except Exception as policy_error:
                logging.warning("Error on adding additional policy members to dataset: {}".format(str(policy_error)))
            return dataset_id
        except Exception as e:
            logging.error("Error on Dataset Creation: {}".format(str(e)))
            attempt_counter += 1
            if attempt_counter < 2:
                logging.info("Retrying Dataset Creation (attempt #{})...".format(str(attempt_counter)))
                sleep(10)
                continue
            else:
                logging.error("Maximum number of retries exceeded. Recording error to pipeline results.")
                params["pipeline_results"].append([params["src_ws_name"], datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"), "Dataset Creation or Retrieval", "Create New Dataset", "Error", str(e)])
                return dataset_id

# Function to patch TDR dataset (with retry logic)
def patch_tdr_dataset(dataset_id, params) -> str:
    """Patch the PHS id and properties of an existing TDR dataset (with one retry).

    The dataset's current ``auth_domains`` and ``source_workspaces`` properties
    are read through the module-level ``datasets_api`` and extended with the
    values from ``params``. If they cannot be read, the patch falls back to the
    values from ``params`` alone, and that fallback is now logged as a warning
    instead of happening silently. Outcomes are appended to
    ``params["pipeline_results"]``.

    Args:
        dataset_id: ID of the dataset to patch.
        params: Pipeline parameter dict. Reads ``dataset_name``, ``auth_domains``,
            ``src_ws_name``, ``consent_name`` and ``phs_id``; appends to
            ``pipeline_results``.

    Returns:
        ``dataset_id``, whether or not the patch succeeded.
    """
    logging.info("Patching properties for existing dataset: {}".format(params["dataset_name"]))
    # Retrieve existing data set properties
    try:
        dataset_info = datasets_api.retrieve_dataset(id=dataset_id, include=["PROPERTIES"]).to_dict()
        auth_domains = dataset_info["properties"]["auth_domains"]
        src_workspaces = dataset_info["properties"]["source_workspaces"]
        for ad in params["auth_domains"]:
            if ad not in auth_domains:
                auth_domains.append(ad)
        if params["src_ws_name"] not in src_workspaces:
            src_workspaces.append(params["src_ws_name"])
        properties_dict = {
            "consent_name": params["consent_name"],
            "auth_domains": auth_domains,
            "source_workspaces": src_workspaces
        }
    except Exception as e:
        # Fall back to this workspace's values only; make the overwrite visible in the log
        logging.warning("Unable to read existing dataset properties ({}). Patching with properties from the current workspace only.".format(str(e)))
        properties_dict = {
            "consent_name": params["consent_name"],
            "auth_domains": params["auth_domains"],
            "source_workspaces": [params["src_ws_name"]]
        }
    # Execute dataset patch request
    attempt_counter = 0
    while True:
        try:
            phs_id = params["phs_id"]
            resp = datasets_api.patch_dataset(id=dataset_id, dataset_patch_request_model={"phsId": phs_id, "properties": properties_dict})
            params["pipeline_results"].append([params["src_ws_name"], datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"), "Dataset Creation or Retrieval", "Patch Existing Dataset", "Success", resp])
            return dataset_id
        except Exception as e:
            logging.error("Error on Dataset Patch: {}".format(str(e)))
            attempt_counter += 1
            if attempt_counter < 2:
                logging.info("Retrying Dataset Patch (attempt #{})...".format(str(attempt_counter)))
                sleep(10)
                continue
            else:
                logging.error("Maximum number of retries exceeded. Recording error to pipeline results.")
                params["pipeline_results"].append([params["src_ws_name"], datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"), "Dataset Creation or Retrieval", "Patch Existing Dataset", "Error", str(e)])
                return dataset_id

# Function to create or retrieve specified TDR dataset
def create_or_retrieve_dataset(params):
    """Find the TDR dataset named ``params["dataset_name"]``, creating it if absent.

    Datasets are enumerated through the module-level ``datasets_api`` using the
    name as the filter, and an exact name match is required. An existing dataset
    is patched with ``patch_tdr_dataset``; otherwise ``create_tdr_dataset`` is
    called. Outcomes are appended to ``params["pipeline_results"]``.

    Args:
        params: Pipeline parameter dict. Reads ``dataset_name`` and
            ``src_ws_name``; appends to ``pipeline_results``.

    Returns:
        Tuple ``(dataset_id, new_dataset)``. ``new_dataset`` is ``True`` when
        the creation path was taken, and ``dataset_id`` is then whatever
        ``create_tdr_dataset`` returned. If enumeration fails, ``("", False)``
        is returned. Previously that path returned a bare ``None``, unlike
        every other path.
    """
    logging.info("Attempting to create or retrieve the specified TDR dataset")
    # Look for an existing dataset with exactly the requested name
    dataset_id = ""
    new_dataset = False
    try:
        dataset_list = datasets_api.enumerate_datasets(filter=params["dataset_name"])
        # `items` may be empty or None; treat both as "no datasets"
        found_datasets = dataset_list.items or []
        for dataset in found_datasets:
            if dataset.name == params["dataset_name"]:
                dataset_id = str(dataset.id)
        params["pipeline_results"].append([params["src_ws_name"], datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"), "Dataset Creation or Retrieval", "Enumerate Datasets", "Success", "{} datasets found. Matching dataset_id = {}".format(len(found_datasets), dataset_id)])
    except Exception as e:
        logging.error("Error on Dataset Enumeration: {}".format(str(e)))
        params["pipeline_results"].append([params["src_ws_name"], datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"), "Dataset Creation or Retrieval", "Enumerate Datasets", "Error", str(e)])
        return "", False
    if dataset_id == "":
        dataset_id = create_tdr_dataset(dataset_id, params)
        new_dataset = True
        return dataset_id, new_dataset
    else:
        logging.info("Dataset already exists! ID = {}".format(dataset_id))
        patch_tdr_dataset(dataset_id, params)
        return dataset_id, new_dataset

# Function to run file inventory build step, with retry logic
def run_build_file_inventory(params):
    """Build the file inventory via ``bfi.build_inventory``, retrying once on failure.

    Outcomes are appended to ``params["pipeline_results"]``.

    Args:
        params: Pipeline parameter dict passed through to ``bfi.build_inventory``.
            Reads ``src_ws_name``; appends to ``pipeline_results``.

    Returns:
        The inventory returned by ``bfi.build_inventory``. If both attempts
        fail, the last value assigned to the inventory is returned: an empty
        list when the build call itself never returned.
    """
    def _record(status, detail):
        # Append one "File Inventory Creation" row to the pipeline results
        params["pipeline_results"].append([params["src_ws_name"], datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"), "File Inventory Creation", "Build File Inventory", status, detail])

    inventory = []
    failed_attempts = 0
    while True:
        try:
            inventory = bfi.build_inventory(params)
            logging.info("File Inventory build succeeded. {} files found.".format(len(inventory)))
            _record("Success", "{} files found".format(len(inventory)))
            return inventory
        except Exception as e:
            logging.error("Error on File Inventory Creation: {}".format(str(e)))
            failed_attempts += 1
            if failed_attempts >= 2:
                logging.error("Maximum number of retries exceeded. Recording error to pipeline results.")
                _record("Error", str(e))
                return inventory
            logging.info("Retrying File Inventory Creation (attempt #{})...".format(str(failed_attempts)))
            sleep(10)

# Function to orchestrate the various components of the ingest pipeline
def run_el_pipeline(workspace, params):
    
    # Setup Google creds and establish TDR clients
    creds, project = google.auth.default()
    auth_req = google.auth.transport.requests.Request()
    creds.refresh(auth_req)
    config = data_repo_client.Configuration()
    config.host = "https://data.terra.bio"
    config.access_token = creds.token
    api_client = data_repo_client.ApiClient(configuration=config)
    api_client.client_side_validation = False
    global datasets_api
    datasets_api = data_repo_client.DatasetsApi(api_client=api_client)
    global jobs_api
    jobs_api = data_repo_client.JobsApi(api_client=api_client)
    global tdr_utils
    tdr_utils = TdrUtils(jobs_api)
    
    # Step 1: Set Variables for Pipeline
    workspace_name = workspace[0] 
    logging.info("Starting Extract and Load (EL) Pipeline for {}.".format(workspace_name))
    params["src_ws_name"] = workspace_name
    params["src_ws_project"] = workspace[1]
    params["dataset_name"] = workspace[2]
    params["input_dir"] = "ingest_pipeline/input/{}/table_data".format(workspace_name)
    params["file_inventory_dir"] = "ingest_pipeline/input/{}/data_files/file_inventory".format(workspace_name)
    params["el_output_dir"] = "ingest_pipeline/output/source/{}/table_data".format(workspace_name)
    params["el_schema_file"] = "ingest_pipeline/output/source/{}/schema/tdr_schema_object.json".format(workspace_name)
    current_datetime = datetime.datetime.now()
    current_datetime_string = current_datetime.strftime("%Y%m%d%H%M")
    params["pipeline_results"] = []

    # Step 2: Create Source Files
    if params["skip_source_files_creation"] == True:
        logging.info("Skipping source files creation.")
        params["pipeline_results"].append([params["src_ws_name"], datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"), "Source Files Creation", "Create Source Files", "Skipped", "User request"])
    else:
        logging.info("Running source files creation.")
        try:
            log_status, log_string = sfc.create_source_table_data_files(params)
            params["pipeline_results"].append([params["src_ws_name"], datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"), "Source Files Creation", "Create Source Files", log_status, log_string])
        except Exception as e:
            logging.error("Error creating source files: {}".format(str(e)))
            params["pipeline_results"].append([params["src_ws_name"], datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"), "Source Files Creation", "Create Source Files", "Error", str(e)])
    
    # Step 3: Build File Inventory
    if params["skip_file_inventory_creation"] == True:
        logging.info("Skipping file inventory creation.")
        file_inventory = {}
        params["pipeline_results"].append([params["src_ws_name"], datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"), "File Inventory Creation", "Build File Inventory", "Skipped", "User request"])
    else:
        logging.info("Building file inventory.")
        file_inventory = run_build_file_inventory(params)
    params["file_inventory"] = file_inventory
    
    # Step 4: Process Table Data
    if params["skip_table_data_processing"] == True:
        logging.info("Skipping table data processing.")
        target_tables = {}
        params["pipeline_results"].append([params["src_ws_name"], datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"), "Table Data Processing", "Ingest Pre-Processing", "Skipped", "User request"])
    else:
        logging.info("Processing table data for ingest.")
        fileref_list, params["data_file_refs"] = find_and_add_fileref_fields(params["src_ws_project"], params["src_ws_name"], params["data_files_src_bucket"], params["data_file_refs"])
        logging.info("Additional file reference fields found and marked for processing: " + ", ".join(fileref_list))
        target_tables, log_status, log_string = ptd.process_table_data(params)
        params["pipeline_results"].append([params["src_ws_name"], datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"), "Table Data Processing", "Ingest Pre-Processing", log_status, log_string])
    
    # Step 5: Create or Retrieve Dataset
    dataset_id, new_dataset = create_or_retrieve_dataset(params)
    if len(dataset_id) == 0:
        logging.error("No TDR dataset created or retrieved. Exiting pipeline.")
        return dataset_id, params["pipeline_results"]
    else:
        params["dataset_id"] = dataset_id
    if new_dataset == True:
        logging.info("Sleeping for a few minutes to let policy/permission changes propogate for new dataset...")
        sleep(60)
    
    # Step 6: Set up Dataset-specific Service Account
    set_up_dataset_ingest_sa(dataset_id, params, new_dataset)
    logging.info("Sleeping for a few minutes to let policy/permission changes propogate...")
    if new_dataset == True:
        sleep(300)
    else:
        sleep(60)
    
    # Step 7: Ingest Data to Dataset
    if params["skip_ingests"] == True:
        logging.info("Skipping dataset ingests")
        params["pipeline_results"].append([params["src_ws_name"], datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"), "Dataset Ingests", "File: All", "Skipped", "User request"])
    else:
        logging.info("Running dataset ingests")
        if len(params["ingest_list_override"]) > 0:
            target_tables = params["ingest_list_override"]
        run_el_ingests(dataset_id, params, target_tables)   
    
    # Aggregate and Write Out Pipeline Results
    df_results = pd.DataFrame(params["pipeline_results"], columns = ["Workspace", "Time", "Step", "Task", "Status", "Message"])
    output_file_path = "pipeline_results_log_" + current_datetime_string + ".tsv"
    destination_dir = params["ws_bucket"] + "/ingest_pipeline/output/source/{}/logs".format(workspace_name)
    df_results.to_csv(output_file_path, index=False, sep="\t")
    !gsutil cp $output_file_path $destination_dir/ 2> stdout
    !rm $output_file_path
    
    # Display Pipeline Results
    logging.info("The ingest pipeline has completed for {}.".format(workspace_name))
    logging.info("Pipeline Results:")
    display(df_results)
    return dataset_id, params["pipeline_results"]


# "T" Pipeline Functions

In [3]:
# Function to look up dbgap consent codes
def lookup_consent_code(params, phs_id, consent_name):
    """Map a dbGaP consent short name to its consent code.

    Reads ``dbgap_consents.csv`` from
    ``<ws_bucket>/ingest_pipeline/resources/consent_code_lookup/`` and looks
    for a row whose ``phs`` equals ``phs_id`` and whose ``consent_short_name``
    equals ``consent_name``.

    Args:
        params: Pipeline parameter dictionary. Reads ``ws_bucket``; on a read
            failure also reads ``src_dataset`` and appends a "Warning" row to
            ``pipeline_results``.
        phs_id: Value compared for equality against the ``phs`` column.
        consent_name: Consent short name to look up.

    Returns:
        ``"c"`` followed by the first matching ``consent_code`` value, or
        ``consent_name`` unchanged when the lookup file cannot be read or no
        row matches.
    """
    logging.info("Attempting to lookup consent code using PHS: {} and Consent Name: {}".format(phs_id, consent_name))
    # Attempt to read source files into data frame, checking for missing file
    src_file_path = params["ws_bucket"] + "/ingest_pipeline/resources/consent_code_lookup/dbgap_consents.csv"
    try:
        consent_lookup = pd.read_csv(src_file_path, delimiter = ',')
    except Exception as e:
        # Catch Exception only: a bare except would also swallow
        # KeyboardInterrupt/SystemExit. The recorded message is unchanged; the
        # underlying error is added to the log so non-"missing file" failures
        # (permissions, parse errors) can be diagnosed.
        err = "Consent code lookup file not found at {}".format(src_file_path)
        logging.error("{} - {}".format(err, str(e)))
        params["pipeline_results"].append([params["src_dataset"], datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"), "Snapshot Creation", "Lookup Consent Code", "Warning", err])
        return consent_name
    # Map consent name to consent code and return string
    matches = consent_lookup[(consent_lookup.phs == phs_id) & (consent_lookup.consent_short_name == consent_name)]
    if len(matches) > 0:
        return "c" + str(matches["consent_code"].values[0])
    return consent_name
            
# Function to create and share a snapshot            
def create_and_share_snapshot(params):
    """Create a full-view snapshot of the dataset and share it with readers.

    The consent code is looked up from the 4-6 digit number found in
    ``params["phs_id"]`` together with ``params["consent_name"]``; when either
    is unavailable the consent name itself is used and a "Warning" row is
    recorded. Readers are the configured snapshot readers plus one
    ``<auth_domain>@firecloud.org`` entry per workspace auth domain. The
    snapshot request is submitted through the module-level ``snapshots_api``
    and ``tdr_utils`` clients, with one retry after a ten second pause.

    Args:
        params: Pipeline parameter dictionary. Reads ``phs_id``,
            ``consent_name``, ``snapshot_readers_list``, ``auth_domains``,
            ``snapshot_name``, ``dataset_name``, ``profile_id`` and
            ``src_dataset``; appends result rows to ``pipeline_results``.
            ``snapshot_readers_list`` is not modified.

    Returns:
        None. The outcome is recorded in ``params["pipeline_results"]``.
    """
    logging.info("Creating full-view snapshot")
    # Attempt to collect and map consent code
    phs_id = params.get("phs_id")
    # A missing or non-string PHS ID, or one without a 4-6 digit run, means no lookup
    phs_match = re.search('([0-9]{4,6})', phs_id) if isinstance(phs_id, str) else None
    phs_short_id = phs_match.group(1) if phs_match else ""
    if len(params["consent_name"]) > 0 and len(phs_short_id) > 0:
        consent_code = lookup_consent_code(params, int(phs_short_id), params["consent_name"])
    else:
        logging.warning("Unable to lookup consent code. Consent name and/or PHS ID missing for lookup.")
        consent_code = params["consent_name"]
        params["pipeline_results"].append([params["src_dataset"], datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"), "Snapshot Creation", "Lookup Consent Code", "Warning", "Consent name and/or PHS ID missing for lookup"])

    # Create reader list for snapshot, including auth domain(s) on WS.
    # Build a new list: appending to params["snapshot_readers_list"] directly
    # would grow the caller's list with duplicate entries on every run.
    reader_list = list(params["snapshot_readers_list"])
    reader_list.extend(ad_entry + "@firecloud.org" for ad_entry in params["auth_domains"])

    # Create and submit snapshot creation request
    logging.info("Submitting snapshot request")
    snapshot_req = {
        "name": params["snapshot_name"],
        "description": "Full view snapshot of " + params["dataset_name"],
        "consentCode": consent_code,
        "contents": [{
            "datasetName": params["dataset_name"],
            "mode": "byFullView"
        }],
        "readers": reader_list,
        "profileId": params["profile_id"]
    }
    attempt_counter = 0
    while True:
        try:
            create_snapshot_result, job_id = tdr_utils.wait_for_job(snapshots_api.create_snapshot(snapshot=snapshot_req))
            logging.info("Snapshot Creation succeeded: {}".format(create_snapshot_result))
            params["pipeline_results"].append([params["src_dataset"], datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"), "Snapshot Creation", "Create and Share Snapshot", "Success", "Job_ID: {} - Truncated Response: {}".format(job_id, str(create_snapshot_result)[0:1000])])
            break
        except Exception as e:
            logging.error("Error on Snapshot Creation: {}".format(str(e)))
            attempt_counter += 1
            if attempt_counter < 2:
                # Only the first failure is retried
                logging.info("Retrying Snapshot Creation (attempt #{})...".format(str(attempt_counter)))
                sleep(10)
                continue
            else:
                logging.error("Maximum number of retries exceeded. Recording error to pipeline results.")
                params["pipeline_results"].append([params["src_dataset"], datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"), "Snapshot Creation", "Create and Share Snapshot", "Error", str(e)])
                break

# Function to find and execute ingests
def run_t_ingests(params):
    """Ingest each transformed table file into the TDR dataset.

    The candidate tables are the keys of ``params["query_set"]["transforms"]``,
    narrowed to those also present in ``params["ingest_list_override"]`` when
    that override is non-empty. For each table, ``<table>.json`` under
    ``params["t_output_dir"]`` is ingested if ``file_exists`` reports it is
    present; otherwise a "Warning" row is recorded and the table is skipped.
    Each ingest is attempted at most twice, ten seconds apart.

    Args:
        params: Pipeline parameter dictionary. Reads ``dataset_id``,
            ``query_set``, ``ingest_list_override``, ``t_output_dir``,
            ``ws_bucket``, ``profile_id`` and ``src_dataset``; appends result
            rows to ``pipeline_results``.

    Returns:
        None. Outcomes are recorded in ``params["pipeline_results"]``.
    """
    dataset_id = params["dataset_id"]
    transform_set = params["query_set"]
    requested_tables = params["ingest_list_override"]

    # Determine set of ingests to run
    candidate_tables = transform_set["transforms"].keys()
    if requested_tables:
        tables_to_ingest = [name for name in candidate_tables if name in requested_tables]
    else:
        tables_to_ingest = candidate_tables

    # Loop through and ingest files
    for target_table in tables_to_ingest:
        source_file_name = target_table + ".json"
        source_rel_file_path = "{}/{}".format(params["t_output_dir"], source_file_name)
        source_full_file_path = "{}/{}/{}".format(params["ws_bucket"], params["t_output_dir"], source_file_name)

        # Nothing to ingest for this table: record a warning and move on
        if not file_exists(source_rel_file_path, params):
            logging.warning(f"Metadata file does not exist.  Skipping: {source_file_name}")
            params["pipeline_results"].append([
                params["src_dataset"],
                datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
                "Dataset Ingests",
                f"File: {source_file_name}",
                "Warning",
                f"Metadata file does not exist.  Skipping: {source_file_name}",
            ])
            continue

        logging.info(f"Running ingest from {source_file_name} to table {target_table}")
        ingest_request = {
            "table": target_table,
            "profile_id": params["profile_id"],
            "ignore_unknown_values": True,
            "resolve_existing_files": True,
            "updateStrategy": "replace",
            "format": "json",
            "load_tag": "Ingest for {}".format(params["dataset_id"]),
            "path": source_full_file_path
        }
        for attempt in (1, 2):
            try:
                ingest_request_result, job_id = tdr_utils.wait_for_job(datasets_api.ingest_dataset(id=dataset_id, ingest=ingest_request))
                logging.info("Ingest from file {} succeeded: {}".format(source_file_name, str(ingest_request_result)[0:1000]))
                params["pipeline_results"].append([
                    params["src_dataset"],
                    datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
                    "Dataset Ingests",
                    f"File: {source_file_name}",
                    "Success",
                    "Job_ID: {} - Truncated Response: {}".format(job_id, str(ingest_request_result)[0:1000]),
                ])
                break
            except Exception as exc:
                logging.error("Error on Dataset Ingest: {}".format(str(exc)))
                if attempt < 2:
                    logging.info("Retrying Dataset Ingest (attempt #{})...".format(str(attempt)))
                    sleep(10)
                else:
                    logging.error("Maximum number of retries exceeded. Recording error to pipeline results.")
                    params["pipeline_results"].append([
                        params["src_dataset"],
                        datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
                        "Dataset Ingests",
                        f"File: {source_file_name}",
                        "Error",
                        str(exc),
                    ])
                
# Function to extend schema of existing TDR dataset
def run_schema_extension(params):
    """Extend the TDR dataset schema with tables/relationships it lacks.

    Ensures every table in ``params["target_schema"]`` carries a
    ``source_datarepo_row_ids`` string-array column, retrieves the current
    dataset schema through the module-level ``datasets_api``, and submits a
    schema update containing only the target tables and relationships whose
    names are not already present. If the current schema cannot be retrieved,
    all target tables and relationships are submitted. The update is attempted
    at most twice, ten seconds apart.

    Args:
        params: Pipeline parameter dictionary. Reads ``dataset_id``,
            ``target_schema``, ``mapping_target`` and ``src_dataset``; appends
            a result row to ``pipeline_results``. The tables inside
            ``target_schema`` are modified in place (column added once).

    Returns:
        None. The outcome is recorded in ``params["pipeline_results"]``.
    """
    dataset_id = params["dataset_id"]
    target_schema = params["target_schema"]
    target_tables = target_schema["tables"]
    target_relationships = target_schema.get("relationships") or []

    # Add source_datarepo_row_ids column to all tables. Skip tables that
    # already have it so repeated runs do not add duplicates, and give each
    # table its own dict rather than sharing one object across tables.
    row_ids_col_name = "source_datarepo_row_ids"
    for table_entry in target_tables:
        if not any(col.get("name") == row_ids_col_name for col in table_entry["columns"]):
            table_entry["columns"].append({"name": row_ids_col_name, "datatype": "string", "array_of": True})

    # Retrieve current TDR schema and diff with target_schema
    logging.info("Retrieving current TDR schema to determine new tables and relationships to add.")
    try:
        response = datasets_api.retrieve_dataset(id=dataset_id, include=["SCHEMA", "ACCESS_INFORMATION"]).to_dict()
        current_schema = response["schema"]
        # "or []" treats an absent/None list as empty; otherwise iterating None
        # would raise and wrongly trigger the add-everything fallback below.
        current_table_names = {table["name"] for table in current_schema.get("tables") or []}
        current_rel_names = {rel["name"] for rel in current_schema.get("relationships") or []}
        additional_tables = [table for table in target_tables if table["name"] not in current_table_names]
        additional_relationships = [rel for rel in target_relationships if rel["name"] not in current_rel_names]
    except Exception as e:
        logging.warning("Error retrieving source schema from TDR. Will attempt to add all tables and relationships to schema. Error: {}".format(e))
        additional_tables = target_tables
        additional_relationships = target_relationships

    # Update TDR dataset schema to include additional tables and relationships
    add_table_list = [table["name"] for table in additional_tables]
    add_rel_list = [rel["name"] for rel in additional_relationships]
    if add_table_list or add_rel_list:
        logging.info("Submitting TDR schema extension request.\n\tTables to add: {tabs}\n\tRelationships to add: {rels}".format(tabs=add_table_list, rels=add_rel_list))
        schema_update_request = {
            "description": "Adding tables and relationships for mapping target: {}.".format(params["mapping_target"]),
            "changes": {
                "addTables": additional_tables,
                "addRelationships": additional_relationships
            }
        }
        attempt_counter = 0
        while True:
            try:
                schema_update_result, job_id = tdr_utils.wait_for_job(datasets_api.update_schema(id=dataset_id, dataset_schema_update_model=schema_update_request))
                logging.info("TDR schema extension succeeded: {}".format(str(schema_update_result)[0:1000]))
                params["pipeline_results"].append([params["src_dataset"], datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"), "TDR Schema Extension", "Extend TDR Schema", "Success", "Job_ID: {} - Truncated Response: {}".format(job_id, str(schema_update_result)[0:1000])])
                break
            except Exception as e:
                logging.error("Error on TDR schema extension: {}".format(str(e)))
                attempt_counter += 1
                if attempt_counter < 2:
                    # Only the first failure is retried
                    logging.info("Retrying TDR schema extension (attempt #{})...".format(str(attempt_counter)))
                    sleep(10)
                    continue
                else:
                    logging.error("Maximum number of retries exceeded. Recording error to pipeline results.")
                    params["pipeline_results"].append([params["src_dataset"], datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"), "TDR Schema Extension", "Extend TDR Schema", "Error", str(e)])
                    break
    else:
        logging.info("No new tables or relationships to add to the TDR schema.")
        params["pipeline_results"].append([params["src_dataset"], datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"), "TDR Schema Extension", "Extend TDR Schema", "Success", "No new tables or relationships to add to the TDR schema."])
                
# Function to loop through and execute transform queries
def run_transform_queries(params):
    """Run each transform query and upload its result as newline-delimited JSON.

    The transforms are the entries of ``params["query_set"]["transforms"]``,
    narrowed to the keys also present in ``params["transform_list_override"]``
    when that override is non-empty. For each one, the query is run through a
    BigQuery client, the rows are written to ``<table>.json`` in the working
    directory (one JSON object per line), and the file is copied with
    ``gsutil cp`` to ``<ws_bucket>/<t_output_dir>/``. The local file is removed
    afterwards whether or not the step succeeded.

    Args:
        params: Pipeline parameter dictionary. Reads ``query_set``,
            ``transform_list_override``, ``ws_bucket``, ``t_output_dir`` and
            ``src_dataset``; appends one "Success" or "Error" row per table to
            ``pipeline_results``.

    Returns:
        None. Outcomes are recorded in ``params["pipeline_results"]``.
    """
    # Local imports: these modules are not part of the notebook's import cell
    import os
    import subprocess

    # Determine set of transforms to run
    query_set = params["query_set"]["transforms"]
    override_list = params["transform_list_override"]
    if override_list:
        filtered_query_set = {table: transform for table, transform in query_set.items() if table in override_list}
    else:
        filtered_query_set = query_set

    # Take the bucket from params (as run_t_ingests does when reading these
    # files back) rather than from a notebook-global variable.
    destination_path = "{}/{}".format(params["ws_bucket"], params["t_output_dir"])

    # Read in and execute queries
    client = bigquery.Client()
    for target_table, transform in filtered_query_set.items():
        target_file = target_table + ".json"
        logging.info("Running transform for target table: {}".format(target_table))
        query = transform["query"]
        try:
            df = client.query(query).result().to_dataframe()
            records_list = json.loads(df.to_json(orient='records'))
            with open(target_file, 'w') as outfile:
                # One JSON object per line, no trailing newline
                outfile.write("\n".join(json.dumps(record) for record in records_list))
            # Run gsutil directly so a failed copy is detected; a "!gsutil"
            # shell escape never raises and would be recorded as a success.
            upload = subprocess.run(
                ["gsutil", "cp", target_file, destination_path + "/"],
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
                universal_newlines=True,
            )
            if upload.returncode != 0:
                raise RuntimeError("gsutil cp failed for {}: {}".format(target_file, (upload.stderr or "").strip()))
            params["pipeline_results"].append([params["src_dataset"], datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"), "Transformed Files Creation", "File: {}".format(target_file), "Success", ""])
        except Exception as e:
            logging.error("Error creating transformed file {}: {}".format(target_file, str(e)))
            params["pipeline_results"].append([params["src_dataset"], datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"), "Transformed Files Creation", "File: {}".format(target_file), "Error", str(e)])
        finally:
            # Never leave the temporary local file behind
            if os.path.exists(target_file):
                os.remove(target_file)
                
# Function to orchestrate the various components of the transformation ingest pipeline
def run_t_pipeline(params):
    
    # Setup Google creds and establish TDR clients
    creds, project = google.auth.default()
    auth_req = google.auth.transport.requests.Request()
    creds.refresh(auth_req)
    config = data_repo_client.Configuration()
    config.host = "https://data.terra.bio"
    config.access_token = creds.token
    api_client = data_repo_client.ApiClient(configuration=config)
    api_client.client_side_validation = False
    global datasets_api
    datasets_api = data_repo_client.DatasetsApi(api_client=api_client)
    global snapshots_api
    snapshots_api = data_repo_client.SnapshotsApi(api_client=api_client)
    global jobs_api
    jobs_api = data_repo_client.JobsApi(api_client=api_client)
    global tdr_utils
    tdr_utils = TdrUtils(jobs_api)
    
    # Step 1: Set Variables for Pipeline
    src_dataset = params["dataset_name"] + " ({})".format(params["dataset_id"])
    logging.info("Starting Transformation (T) Pipeline for {}.".format(src_dataset))
    params["src_dataset"] = src_dataset
    params["target_schema"] = {}
    params["query_set"] = {}
    storage_client = storage.Client()
    bucket = storage_client.get_bucket(params["ws_bucket_name"])
    blob = bucket.blob("ingest_pipeline/output/transformed/{}/{}/schema/mapping_schema_object.json".format(params["mapping_target"], params["dataset_id"]))
    params["target_schema"] = json.loads(blob.download_as_string(client=None))
    blob = bucket.blob("ingest_pipeline/output/transformed/{}/{}/queries/transform_query_set.json".format(params["mapping_target"], params["dataset_id"]))
    params["query_set"] = json.loads(blob.download_as_string(client=None))
    current_datetime = datetime.datetime.now()
    current_datetime_string = current_datetime.strftime("%Y%m%d%H%M")
    params["snapshot_name"] = params["dataset_name"] + "_" + current_datetime_string  
    params["t_output_dir"] = "ingest_pipeline/output/transformed/{}/{}/table_data".format(params["mapping_target"], params["dataset_id"])
    params["t_val_output_dir"] = "ingest_pipeline/output/transformed/{}/{}/validation".format(params["mapping_target"], params["dataset_id"])
    params["validation_schema_file"] = "ingest_pipeline/mapping/{}/mapping_schema_object.json".format(params["mapping_target"])
    params["pipeline_results"] = []
    
    # Step 2: Confirm Transformation Artifacts
    logging.info("Attempting to confirm transform artifact retrieval (target schema and transform queries).")
    if params["target_schema"] and params["query_set"]:
        logging.info("Transform artifact retrieval confirmed.")
        params["pipeline_results"].append([params["src_dataset"], datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"), "Transform Artifact Retrieval", "Confirm Transform Artifact Retrieval", "Success", ""])
    else:
        if not params["target_schema"] and not params["query_set"]:
            error_message = "Target schema and transform queries both empty."
            logging.info(error_message)
            params["pipeline_results"].append([params["src_dataset"], datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"), "Transform Artifact Retrieval", "Retrieve Target Schema and Transform Queries", "Error", error_message])
            return params["pipeline_results"]
        elif not params["target_schema"]:
            error_message = "Target schema empty."
            logging.info(error_message)
            params["pipeline_results"].append([params["src_dataset"], datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"), "Transform Artifact Retrieval", "Retrieve Target Schema and Transform Queries", "Error", error_message])
            return params["pipeline_results"]
        else:
            error_message = "Transform queries empty."
            logging.info(error_message)
            params["pipeline_results"].append([params["src_dataset"], datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"), "Transform Artifact Retrieval", "Retrieve Target Schema and Transform Queries", "Error", error_message])
            return params["pipeline_results"]
                 
    # Step 3: Create Transformed Files
    if params["skip_transforms"] == True:
        logging.info("Skipping transformed files creation.")
        params["pipeline_results"].append([params["src_dataset"], datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"), "Transformed Files Creation", "File: All", "Skipped", "User request"])
    else:
        logging.info("Running transformed files creation.")
        run_transform_queries(params)
        
    # Step 4: Identify TDR Dataset and Extend Schema 
    if params["skip_schema_extension"] == True:
        logging.info("Skipping TDR schema extension.")
        params["pipeline_results"].append([params["src_dataset"], datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"), "TDR Schema Extension", "Extend TDR Schema", "Skipped", "User request"])
    else:
        if len(params["dataset_id"]) == 0:
            logging.error("No TDR dataset specified. Exiting pipeline.")
            params["pipeline_results"].append([params["src_dataset"], datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"), "TDR Schema Extension", "Extend TDR Schema", "Error", "No TDR dataset specified."])
            return params["pipeline_results"]
        else:
            logging.info("Running TDR schema extension.")
            run_schema_extension(params)
            
    # Step 5: Ingest Transformed Files
    if params["skip_ingests"] == True:
        logging.info("Skipping dataset ingests")
        params["pipeline_results"].append([params["src_dataset"], datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"), "Dataset Ingests", "File: All", "Skipped", "User request"])
    else:
        logging.info("Running dataset ingests")
        run_t_ingests(params)
    
    # Step 6: Create and Share Snapshot
    int_df_results = pd.DataFrame(params["pipeline_results"], columns = ["Dataset", "Time", "Step", "Task", "Status", "Message"])
    errors = int_df_results[int_df_results["Status"].str.contains("Error")]
    if len(errors) > 0:
        logging.info("Skipping snapshot creation due to upstream errors")
        params["pipeline_results"].append([params["src_dataset"], datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"), "Snapshot Creation", "Create and Share Snapshot", "Skipped", "Errors found upstream in ingest pipeline"])
    elif params["skip_snapshot_creation"] == True:
        logging.info("Skipping snapshot creation on user request")
        params["pipeline_results"].append([params["src_dataset"], datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"), "Snapshot Creation", "Create and Share Snapshot", "Skipped", "User request"])
    else:
        logging.info("Running snapshot creation")
        try:
            create_and_share_snapshot(params)
        except Exception as e:
            logging.error("Error running snapshot creation: {}".format(str(e)))
            params["pipeline_results"].append([params["src_dataset"], datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"), "Snapshot Creation", "Create and Share Snapshot", "Error", str(e)])
    
    # Step 7: Data Profiling and Validation
    if len(errors) > 0:
        logging.info("Skipping output data validation due to upstream errors")
        params["pipeline_results"].append([params["src_dataset"], datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"), "Output Data Validation", "Profile and Validate Data", "Skipped", "Errors found upstream in ingest pipeline"])
    elif params["skip_data_validation"] == True:
        logging.info("Skipping output data validation on user request")
        params["pipeline_results"].append([params["src_dataset"], datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"), "Output Data Validation", "Profile and Validate Data", "Skipped", "User request"])
    else:
        logging.info("Running output data validation")
        try:
            odv.profile_data(params["dataset_id"], "dataset", params["t_val_output_dir"], params["validation_schema_file"])
            params["pipeline_results"].append([params["src_dataset"], datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"), "Output Data Validation", "Profile and Validate Data", "Success", ""])
        except Exception as e:
            logging.error("Error running output data validation: {}".format(str(e)))
            params["pipeline_results"].append([params["src_dataset"], datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"), "Output Data Validation", "Profile and Validate Data", "Error", str(e)])   
    
    # Aggregate and Write Out Pipeline Results
    df_results = pd.DataFrame(params["pipeline_results"], columns = ["Dataset", "Time", "Step", "Task", "Status", "Message"])
    output_file_path = "pipeline_results_log_" + current_datetime_string + ".tsv"
    destination_dir = "ingest_pipeline/output/transformed/{}/{}/logs".format(params["mapping_target"], params["dataset_id"])
    df_results.to_csv(output_file_path, index=False, sep="\t")
    !gsutil cp $output_file_path $ws_bucket/$destination_dir/ 2> stdout
    !rm $output_file_path
    
    # Display Pipeline Results
    logging.info("The ingest pipeline has completed for {}.".format(params["src_dataset"]))
    logging.info("Pipeline Results:")
    display(df_results)
    return params["pipeline_results"]
