### Azure Snapshot Export Example
 This notebook:
 - Exports an Azure snapshot using the Python client (the same call can be made from Swagger or curl)
 - Reads the signed Parquet file URLs included in the export's output
 - Streams each file from Azure and copies it into a GCP bucket
 - Imports the Parquet file from the GCP bucket into a BigQuery dataset
 - Creates a BigQuery view that parses date/time values and array columns

# Setup

In [64]:
%%capture
import sys
!{sys.executable} -m pip install --upgrade pip
!{sys.executable} -m pip install pandas
!{sys.executable} -m pip install --upgrade data_repo_client
!{sys.executable} -m pip install pyarrow
!{sys.executable} -m pip install fastparquet
!{sys.executable} -m pip install google-cloud-bigquery
!{sys.executable} -m pip install google-cloud-storage

import pandas as pd
# import pyarrow as pa
import datetime, uuid, urllib, os, time, getpass, uuid, json
# from tdr_utils import TdrUtils
import logging
logger = logging.getLogger()
logger.setLevel(logging.CRITICAL)
from data_repo_client import ApiClient, ApiException, Configuration, DatasetsApi, SnapshotsApi, JobsApi, ResourcesApi, DataRepositoryServiceApi
from IPython.core.display import display, clear_output, HTML
from tdr_utils import TdrUtils

### Authenticate 

In [65]:
# Set up configuration
config = Configuration()
config.host="https://data.shelbee.bee.envs-terra.bio/"
token=!gcloud auth print-access-token sholden@broadinstitute.org
config.access_token = token[0]

apiClient = ApiClient(configuration=config)
apiClient.client_side_validation = False

# Create required API Clients
snapshots_api = SnapshotsApi(api_client=apiClient)
jobs_api = JobsApi(api_client=apiClient)
tdr_utils = TdrUtils(jobs_api)


# Snapshot Export

In [66]:
# Snapshot and table used throughout this example
exisitingSnapshotId = 'e3638824-9ed9-408e-b3f5-cba7585658a3'
exampleTableName = 'variant'

# Before exporting, look at two known rows through the TDR preview endpoint
preview_filter = "WHERE id IN ('1:93814411:A:G', '1:85011183:A:C')"
snapshots_api.lookup_snapshot_preview_by_id(
    exisitingSnapshotId,
    exampleTableName,
    filter=preview_filter,
)

{'filtered_row_count': 2,
 'result': [{'alt': 'C',
             'chromosome': '1',
             'datarepo_row_id': '004EF96A-F144-44FE-B86D-9F77296107EC',
             'id': '1:85011183:A:C',
             'position': 85011183,
             'reference': 'A'},
            {'alt': 'G',
             'chromosome': '1',
             'datarepo_row_id': '00728302-8D2B-40FA-986F-8197D9DC8924',
             'id': '1:93814411:A:G',
             'position': 93814411,
             'reference': 'A'}],
 'total_row_count': 1004}

In [67]:
# Export the snapshot, then block until the export job completes. The job result is
# the export manifest used below to find each table's parquet files.
export_job = snapshots_api.export_snapshot(exisitingSnapshotId, validate_primary_key_uniqueness='false')
export_snapshot_result = tdr_utils.wait_for_job(export_job)


# Copy Azure Parquet Files to GCP Bucket
- Define a function that streams a file from a signed URL into a GCS bucket

In [68]:
import requests
from google.cloud import storage

def copyFileToGCPBucket(parquet_uri, destination_bucket, destination_file_name):
    """Stream the file at an HTTP(S) URL (e.g. an Azure signed URL) into a GCS object.

    The response body is passed as a stream to the GCS upload rather than being
    downloaded in full first.

    Args:
        parquet_uri: URL of the source file.
        destination_bucket: name of an existing GCS bucket.
        destination_file_name: object name to write inside the bucket.

    Raises:
        requests.HTTPError: if the source URL answers with an error status (for
            example an expired or invalid signed URL). Without this check the
            error response body would be uploaded as if it were the file.
    """
    with requests.get(parquet_uri, stream=True) as response:
        response.raise_for_status()
        # `raw` is the undecoded socket stream; have urllib3 undo any gzip/deflate
        # Content-Encoding so the stored object is the actual file bytes.
        response.raw.decode_content = True
        bucket = storage.Client().get_bucket(destination_bucket)
        bucket.blob(destination_file_name).upload_from_file(response.raw)

# Import parquet to BigQuery

In [69]:
from google.cloud import bigquery

existing_gcp_project = "broad-jade-sh"
existing_dataset_name = "test_parquet_export"

def importToBQ(new_table_name, source_gs_path, columns,
               write_disposition=bigquery.WriteDisposition.WRITE_TRUNCATE):
    """Load a Parquet file from GCS into a BigQuery table and print its row count.

    Args:
        new_table_name: table to (re)create in `existing_gcp_project`.`existing_dataset_name`.
        source_gs_path: gs:// URI of the Parquet data to load.
        columns: TDR column definitions from the export manifest. Currently unused
            (see the schema note below); accepted so callers can pass it.
        write_disposition: what to do if the table already exists. Defaults to
            WRITE_TRUNCATE so re-running the notebook replaces the table. BigQuery
            load jobs otherwise default to WRITE_APPEND, which would add a second
            copy of every row.
    """
    client = bigquery.Client(project=existing_gcp_project)
    table_id = f"{existing_gcp_project}.{existing_dataset_name}.{new_table_name}"

    job_config = bigquery.LoadJobConfig(
        # Note: the schema could be built from the export manifest, but BigQuery requires the
        # declared types to match the Parquet types exactly, so this isn't helpful for array
        # fields, timestamps, etc. Instead, the schema is inferred from the Parquet file.
        # schema=buildSchema(columns),
        source_format=bigquery.SourceFormat.PARQUET,
        write_disposition=write_disposition,
    )

    load_job = client.load_table_from_uri(source_gs_path, table_id, job_config=job_config)
    load_job.result()  # Block until the load job finishes; raises if it failed.

    destination_table = client.get_table(table_id)
    print("Loaded {} rows.".format(destination_table.num_rows))

# Create BQ View to handle data parsing and arrays

In [72]:
from google.cloud import bigquery

withClauseName = 'with_clause'

def buildWithStatement(columns, source_id):
    """Build the WITH clause that reads the raw imported table.

    Array columns are converted with JSON_EXTRACT_STRING_ARRAY, which turns a JSON
    array value into an ARRAY<STRING>. Other columns are selected as-is.

    Args:
        columns: TDR column dicts with at least 'name' and 'array_of'.
        source_id: fully qualified table id, "project.dataset.table".

    Returns:
        "WITH with_clause AS (SELECT ... FROM `project.dataset.table`)".

    Raises:
        ValueError: if `columns` is empty (the SELECT list would be invalid).
    """
    if not columns:
        raise ValueError("columns must not be empty")

    select_items = []
    for column in columns:
        # Backticks keep reserved-word column names valid.
        name = f"`{column['name']}`"
        if column.get('array_of'):
            select_items.append(f"JSON_EXTRACT_STRING_ARRAY({name}, '$') AS {name}")
        else:
            select_items.append(name)
    # Quote the whole table path so project ids containing '-' always parse.
    return f"WITH {withClauseName} AS (SELECT {', '.join(select_items)} FROM `{source_id}`)"
def parseDatatype(datatype, arrayOf, columnName):
    """Return a BigQuery SQL expression converting `columnName` to the TDR `datatype`.

    Args:
        datatype: TDR column datatype from the export manifest (e.g. "date", "integer").
        arrayOf: truthy when the expression is applied to a single element of an array
            column. Those elements are strings produced by JSON_EXTRACT_STRING_ARRAY, and
            "time" is parsed differently for them.
        columnName: SQL identifier or expression to convert.

    Returns:
        The conversion expression. A datatype with no conversion rule returns
        `columnName` unchanged, so callers never splice a literal "None" into the SQL.
    """
    if datatype == "time":
        # Time values are trimmed to hour:minute:second; fractional seconds are dropped.
        if arrayOf:
            # Keep the leading digits/colons of the string element and parse them as TIME.
            return f'PARSE_TIME("%H:%M:%S", REGEXP_EXTRACT({columnName}, r"^[0-9:]+"))'
        # NOTE(review): FORMAT_TIME yields a STRING, while the array branch yields TIME
        # values -- confirm which type the view should expose.
        return f'FORMAT_TIME("%H:%M:%S", EXTRACT(TIME FROM {columnName}))'

    templates = {
        "date": 'DATE({})',
        "datetime": 'DATETIME({})',
        "timestamp": 'TIMESTAMP({})',
        "int": 'CAST({} AS INT64)',
        "integer": 'CAST({} AS INT64)',
        "int64": 'CAST({} AS INT64)',
        "float": 'CAST({} AS FLOAT64)',
        "float64": 'CAST({} AS FLOAT64)',
        "numeric": 'CAST({} AS NUMERIC)',
        # Array elements are strings such as "true"/"false"; CAST turns them into BOOL.
        "boolean": 'CAST({} AS BOOL)',
    }
    template = templates.get(datatype)
    return columnName if template is None else template.format(columnName)
def buildSelectStatement(columns):
    """Build the SELECT list that converts columns of the WITH clause to their TDR types.

    - Non-string array columns: each element is converted inside ARRAY(SELECT ... UNNEST).
    - Scalar date/time/numeric columns: converted via parseDatatype.
    - Everything else: selected unchanged.

    Args:
        columns: TDR column dicts with 'name', 'datatype' and 'array_of'.

    Returns:
        "SELECT <expr> AS `col`, ..." (no FROM clause).
    """
    # Accept both "int" and TDR's "integer" spelling so every integer alias is handled.
    scalar_conversions = {"date", "datetime", "time", "timestamp",
                          "int", "integer", "int64", "float", "float64", "numeric"}
    # Element alias for UNNEST. It is deliberately unlikely to collide with a real column
    # name; the previous alias `alt` is also a column in the variant table.
    element = "array_element"

    select_items = []
    for column in columns:
        name = f"`{column['name']}`"
        datatype = column['datatype']
        if column['array_of'] and datatype != "string":
            converted = parseDatatype(datatype, column['array_of'], element)
            select_items.append(f"ARRAY(SELECT {converted} FROM UNNEST({name}) AS {element}) AS {name}")
        elif datatype in scalar_conversions:
            select_items.append(f"{parseDatatype(datatype, column['array_of'], name)} AS {name}")
        else:
            select_items.append(name)
    return "SELECT " + ", ".join(select_items)
        
def buildQuery(columns, source_id):
    """Assemble the full view SQL: WITH clause, then the converting SELECT, FROM the CTE."""
    with_part = buildWithStatement(columns, source_id)
    select_part = buildSelectStatement(columns)
    return " ".join([with_part, select_part, "FROM", withClauseName])

def createView(columns, table_name, view_name, replace=True):
    """Create (or refresh) a BigQuery view over an imported table, using the SQL from buildQuery.

    Args:
        columns: TDR column dicts with 'name', 'datatype' and 'array_of'.
        table_name: source table in existing_gcp_project.existing_dataset_name.
        view_name: view to create in the same dataset.
        replace: if True (default), an existing view with this name has its SQL updated,
            so the cell can be re-run. If False, an existing view makes create_table
            raise google.api_core.exceptions.Conflict, as before.

    Raises:
        ValueError: if `view_name` already exists and is not a view. It is never overwritten.
    """
    # Use the same project as importToBQ rather than the credentials' default project.
    client = bigquery.Client(project=existing_gcp_project)

    view_id = f"{existing_gcp_project}.{existing_dataset_name}.{view_name}"
    source_id = f"{existing_gcp_project}.{existing_dataset_name}.{table_name}"
    query = buildQuery(columns, source_id)

    view = bigquery.Table(view_id)
    view.view_query = query

    # With exists_ok=True, create_table returns the existing table instead of raising Conflict.
    view = client.create_table(view, exists_ok=replace)
    if view.table_type != "VIEW":
        raise ValueError(f"{view_id} already exists and is a {view.table_type}, not a VIEW")
    if view.view_query != query:
        view.view_query = query
        view = client.update_table(view, ["view_query"])
    print(f"Created {view.table_type}: {str(view.reference)}")


# Example Copy and Import to BQ
### Variant Table

In [71]:
# Towards the bottom of the export manifest are the signed URLs for the JSON manifest and for the
# parquet files that hold the tabular data.
# Each table's `paths` list starts with the table's directory URL, followed by its parquet file URL(s).
# Large tables can be split across several parquet files.
# NOTE(review): 5 is this snapshot's position for the "variant" table in both manifest lists;
# it will differ for other snapshots. Consider looking the table up by name instead.
VARIANT_TABLE_INDEX = 5
gcs_bucket = "test-data-sholden"
variant_object_name = "test-parquet-export/variant.parquet"

variant_paths = export_snapshot_result['format']['parquet']['location']['tables'][VARIANT_TABLE_INDEX]['paths']
# Only one parquet file is copied below. Stop rather than silently importing part of a multi-file table.
assert len(variant_paths) == 2, (
    f"expected exactly 1 parquet file for variant, found {len(variant_paths) - 1}"
)
variantParquet = variant_paths[1]
variant_columns = export_snapshot_result['snapshot']["tables"][VARIANT_TABLE_INDEX]["columns"]

copyFileToGCPBucket(variantParquet, gcs_bucket, variant_object_name)
variant_uri = f"gs://{gcs_bucket}/{variant_object_name}"
importToBQ("variant_table", variant_uri, variant_columns)
createView(variant_columns, "variant_table", "variant_view")

Loaded 1004 rows.
WITH with_clause AS (SELECT id, alt, chromosome, position, reference FROM broad-jade-sh.test_parquet_export.variant_table) SELECT id, alt, chromosome, position, reference FROM with_clause
Created VIEW: broad-jade-sh.test_parquet_export.variant_view


## All Data Types Table

In [73]:
# Test with another table - "all_data_types"
# NOTE(review): 6 is this snapshot's position for "all_data_types" in both manifest lists;
# it will differ for other snapshots. Consider looking the table up by name instead.
ALL_DATA_TYPES_TABLE_INDEX = 6
gcs_bucket = "test-data-sholden"
all_data_types_object_name = "test-parquet-export/all-data-types.parquet"

all_data_types_paths = export_snapshot_result['format']['parquet']['location']['tables'][ALL_DATA_TYPES_TABLE_INDEX]['paths']
# paths[0] is the table's directory URL and the rest are parquet files. Only one file is copied
# below, so stop rather than silently importing part of a multi-file table.
assert len(all_data_types_paths) == 2, (
    f"expected exactly 1 parquet file for all_data_types, found {len(all_data_types_paths) - 1}"
)
all_data_types_parquet = all_data_types_paths[1]
all_data_types_columns = export_snapshot_result['snapshot']["tables"][ALL_DATA_TYPES_TABLE_INDEX]["columns"]

copyFileToGCPBucket(all_data_types_parquet, gcs_bucket, all_data_types_object_name)
all_uri = f"gs://{gcs_bucket}/{all_data_types_object_name}"
importToBQ("all_data_types_table", all_uri, all_data_types_columns)
createView(all_data_types_columns, "all_data_types_table", "all_data_types_view")

Loaded 5 rows.
Created VIEW: broad-jade-sh.test_parquet_export.all_data_types_view
