In [28]:
import subprocess
import re
import os
import glob
import json
import shutil
from google.cloud import bigquery

def run_dap_command(command, namespace="canvas"):
    try:
        base_url = "https://api-gateway.instructure.com"  # Replace with your actual base URL
        client_id = "us-east-1#af30f4c1-f587-4acc-a0a7-52f4b249207d"            # Replace with your actual client ID
        client_secret = "3j2eyjXK_riAvNh9TkPZTTCoMlHMmpOpz5d0rE7hzFs"    # Replace with your actual client secret

        result = subprocess.run(f"dap --base-url {base_url} --client-id {client_id} --client-secret {client_secret} {command} --namespace {namespace}", 
                                check=True, shell=True, text=True, capture_output=True)
        return result.stdout
    except subprocess.CalledProcessError as e:
        print(f"Error running command '{command}': {e.output}")
        return None

def list_tables():
    return run_dap_command("list").split()

def download_table_data(table_name):
    return run_dap_command(f"snapshot --table {table_name} --format parquet")

def download_incremental_table_data(table_name, since_datetime):
    return run_dap_command(f"incremental --table {table_name} --format parquet --since {since_datetime}")

def download_table_schema(table_name):
    return run_dap_command(f"schema --table {table_name}")

def get_job_id():
    dirs = next(os.walk('downloads'))[1]
    if dirs:
        return dirs[0]  # Assuming the latest directory is the relevant one
    return None

def load_json_schema(file_path):
    with open(file_path, 'r') as file:
        return json.load(file)

def schema_field_to_dict(field):
    # Convert a SchemaField to a dictionary
    result = {
        "name": field.name,
        "type": field.field_type,
        "mode": field.mode,
        "description": field.description,
    }
    if field.field_type == 'RECORD':
        result["fields"] = [schema_field_to_dict(f) for f in field.fields]
    return result

def update_schema_description(bq_schema_fields, json_schema):
    updated_fields = []
    max_description_length = 1024

    for field in bq_schema_fields:
        json_field = json_schema.get('properties', {}).get(field.name)
        description = field.description

        if json_field:
            if 'description' in json_field:
                # Truncate the description if it's too long
                description = json_field['description'][:max_description_length]

            if field.field_type == 'RECORD':
                nested_fields = update_schema_description(field.fields, json_field) if field.fields else []
                new_field = bigquery.SchemaField(
                    name=field.name,
                    field_type=field.field_type,
                    mode=field.mode,
                    description=description,
                    fields=nested_fields
                )
            else:
                new_field = bigquery.SchemaField(
                    name=field.name,
                    field_type=field.field_type,
                    mode=field.mode,
                    description=description
                )
        else:
            # If json_field is None, use the original field
            new_field = field

        updated_fields.append(new_field)

    return updated_fields


def update_bigquery_schema_from_json(client, table_id, json_schema_file):
    # Load JSON schema
    json_schema = load_json_schema(json_schema_file)

    # Retrieve the current schema from BigQuery
    table = client.get_table(table_id)
    bq_schema = table.schema

    # Update the schema descriptions
    updated_schema = update_schema_description(bq_schema, json_schema['schema'])

    # Update the table with the new schema
    table.schema = updated_schema
    client.update_table(table, ['schema'])

def get_latest_schema_file(table_name, directory="downloads"):
    schema_file_pattern = re.compile(rf"{table_name}_schema_version_(\d+)\.json")
    highest_version = 0
    latest_schema_file = None

    for file in os.listdir(directory):
        match = schema_file_pattern.match(file)
        if match:
            version = int(match.group(1))
            if version > highest_version:
                highest_version = version
                latest_schema_file = file

    return os.path.join(directory, latest_schema_file) if latest_schema_file else None

# Initialize a BigQuery client
client = bigquery.Client()

# List tables
tables = list_tables()
if not tables:
    print("No tables found.")

for table in tables:
    # Download table data
    download_table_data(table)

    job_id = get_job_id()

    # Find the downloaded parquet file
    parquet_files = glob.glob(f'downloads/{job_id}/*.parquet')
    if not parquet_files:
        print(f"No parquet files found for job {job_id}.")
        continue
    parquet_file = parquet_files[0]

    table_ref = client.dataset('CanvasDataPortal2').table(table)

    # Load parquet file into BigQuery
    job_config = bigquery.LoadJobConfig(source_format=bigquery.SourceFormat.PARQUET, write_disposition=bigquery.WriteDisposition.WRITE_TRUNCATE)
    with open(parquet_file, "rb") as source_file:
        job = client.load_table_from_file(source_file, table_ref, job_config=job_config)

    # Wait for the load job to complete
    job.result()

    # Download table schema
    download_table_schema(table)

    update_bigquery_schema_from_json(client, f"dtsdatastore.CanvasDataPortal2.{table}", get_latest_schema_file(table))

    print(f"Table {table} loaded to BigQuery.")

    # Clean up downloaded files
    os.remove(parquet_file)
    shutil.rmtree(os.path.dirname(parquet_file))


Error running command 'snapshot --table access_tokens --format parquet': 
No parquet files found for job None.
Error running command 'snapshot --table account_users --format parquet': 
No parquet files found for job None.
Error running command 'snapshot --table accounts --format parquet': 
No parquet files found for job None.
Error running command 'snapshot --table assessment_question_banks --format parquet': 
No parquet files found for job None.
Error running command 'snapshot --table assessment_questions --format parquet': 
No parquet files found for job None.
Error running command 'snapshot --table assignment_groups --format parquet': 
No parquet files found for job None.


KeyboardInterrupt: 