# Setup

In [0]:
# Create the schema that write_to_uc_bronze() appends into. Qualify it with the
# catalog: an unqualified "bronze" is created in whatever catalog the session
# currently uses, which is not necessarily `nightflyy`, the catalog the bronze
# tables are written to.
spark.sql("CREATE SCHEMA IF NOT EXISTS nightflyy.bronze")

## Python Modules

## Helper Functions

In [0]:
from google.cloud import bigquery
from google.oauth2 import service_account
import base64
import json

def load_bigquery_tables_to_dfs(key_base64: str, project_id: str, parent_project_id: str):
    """
    Publish every BigQuery table of ``project_id`` as a notebook-level DataFrame.

    A BigQuery client is built from the base64-encoded service-account key and
    used to enumerate datasets and their tables. Each table is then read with
    the Spark ``bigquery`` data source, which receives the same base64 key as
    its ``credentials`` option. The result is narrowed to whichever of
    ``document_id``, ``timestamp`` and ``data`` the table actually has, and is
    stored in ``globals()`` as ``df_<table_name>``.

    Args:
        key_base64: Base64-encoded service-account JSON key.
        project_id: Project whose datasets and tables are listed and read.
        parent_project_id: Value for the connector's ``parentProject`` option
            (presumably the billing project — confirm).

    Returns:
        None. Progress and per-table failures are printed, and a table that
        fails to load is skipped. Tables with the same name in different
        datasets overwrite each other's ``df_<table_name>`` global.
    """
    wanted_columns = ('document_id', 'timestamp', 'data')

    service_info = json.loads(base64.b64decode(key_base64).decode("utf-8"))
    creds = service_account.Credentials.from_service_account_info(service_info)
    bq = bigquery.Client(project=project_id, credentials=creds)

    all_datasets = list(bq.list_datasets())
    if not all_datasets:
        print(f"No datasets found in project {project_id}")
        return

    for ds in all_datasets:
        ds_name = ds.dataset_id
        ds_tables = list(bq.list_tables(ds_name))
        if not ds_tables:
            print(f"No tables found in dataset {ds_name}")
            continue

        for tbl in ds_tables:
            tbl_name = tbl.table_id
            var_name = f"df_{tbl_name}"
            # Connector options, applied to the reader in this order.
            connector_options = {
                "credentials": key_base64,
                "project": project_id,
                "parentProject": parent_project_id,
                "dataset": ds_name,
                "table": tbl_name,
                "viewsEnabled": "true",
            }
            try:
                reader = spark.read.format("bigquery")
                for opt_name, opt_value in connector_options.items():
                    reader = reader.option(opt_name, opt_value)
                raw = reader.load()

                # Keep only the wanted columns this particular table has.
                present = [name for name in wanted_columns if name in raw.columns]
                globals()[var_name] = raw.select(*present)
                print(f"✅ Loaded: {var_name} ({ds_name}.{tbl_name})")
            except Exception as err:
                print(f"❌ Failed to load {ds_name}.{tbl_name}: {err}")

In [0]:
# The DataFrame annotation is a string on purpose. This cell may be run before
# ``pyspark.sql.DataFrame`` is imported into the notebook namespace, and a bare
# ``DataFrame`` annotation would then raise NameError when the function is defined.
def write_to_uc_bronze(
    df: "DataFrame",
    table_name: str,
    catalog: str = "nightflyy",
    schema: str = "bronze",
):
    """
    Append ``df`` to the Unity Catalog Delta table ``<catalog>.<schema>.<table_name>``.

    ``saveAsTable`` in append mode creates the table on the first write and
    appends on later writes. ``mergeSchema`` lets new columns be added to an
    existing table instead of failing the write.

    Args:
        df: Spark DataFrame to write.
        table_name: Target table name (the last part of the identifier).
        catalog: Target catalog; defaults to ``nightflyy``.
        schema: Target schema; defaults to ``bronze``.

    Raises:
        ValueError: if ``catalog``, ``schema`` or ``table_name`` is empty.
    """
    parts = (catalog, schema, table_name)
    if not all(parts):
        raise ValueError("catalog, schema and table_name must be non-empty")

    def _quote(part: str) -> str:
        # Backtick-quote each identifier part so names containing characters
        # such as '-' (allowed in BigQuery table names) still parse. Embedded
        # backticks are escaped by doubling them.
        return "`" + part.replace("`", "``") + "`"

    full_table_name = ".".join(_quote(p) for p in parts)

    df.write \
        .format("delta") \
        .mode("append") \
        .option("mergeSchema", "true") \
        .saveAsTable(full_table_name)

    print(f"📥 Written to Unity Catalog table → {catalog}.{schema}.{table_name}")

In [0]:
from google.cloud import bigquery
from google.oauth2 import service_account
import base64
import json
from pyspark.sql import Row, DataFrame
from pyspark.sql.types import StructType
from pyspark.sql.functions import from_json, col

def parse_and_expand_json_data(df: DataFrame, sample_limit=20) -> DataFrame:
    """
    Infer a schema from sampled JSON in the 'data' column and expand it into columns.

    Up to ``sample_limit`` non-null ``data`` values are collected to the
    driver. Each one is parsed with ``json.loads`` and turned into a one-row
    Spark DataFrame so that Spark infers its schema, and the per-sample
    schemas are merged. The merged schema is applied to the whole column with
    ``from_json``, and its top-level fields are flattened next to
    ``document_id`` and ``timestamp``.

    Details:
      * JSON objects are inferred as structs at every depth, including inside
        arrays. Nested structs from different samples are merged field by field.
      * Nulls, empty arrays and empty objects are ignored during inference,
        because Spark cannot infer a type for them. A field that is null in
        every sample is therefore not expanded.
      * When samples disagree on a field's type, the first-seen type wins;
        ``from_json`` yields null for values that do not fit it.
      * Keys that never occur in the sample are not expanded.
      * If the JSON has a top-level ``document_id`` or ``timestamp`` key, the
        result contains duplicate column names.

    Args:
        df: DataFrame with ``document_id``, ``timestamp`` and a JSON-string ``data`` column.
        sample_limit: Maximum number of non-null ``data`` values used for inference.

    Returns:
        DataFrame with ``document_id``, ``timestamp`` and one column per
        inferred top-level JSON key.

    Raises:
        ValueError: if no sampled value yields a usable schema.
    """
    # Local import: only StructType is imported at cell level.
    from pyspark.sql.types import ArrayType, StructField

    # Sentinel marking parsed JSON values that carry no type information.
    no_type = object()

    def to_inferable(value):
        """Convert parsed JSON into Rows/lists/scalars that Spark can infer a schema from.

        Objects become Rows at every depth, so Spark infers StructType rather
        than MapType. Nulls, empty arrays and empty objects are returned as
        ``no_type`` and dropped by the caller, because Spark would otherwise
        reject the whole sample.
        """
        if value is None:
            return no_type
        if isinstance(value, dict):
            fields = {}
            for key, item in value.items():
                converted = to_inferable(item)
                if converted is not no_type:
                    fields[key] = converted
            return Row(**fields) if fields else no_type
        if isinstance(value, list):
            items = [c for c in (to_inferable(i) for i in value) if c is not no_type]
            return items if items else no_type
        return value

    def merge_types(first, second):
        """Union struct fields recursively (also inside arrays); otherwise keep the first-seen type."""
        if isinstance(first, StructType) and isinstance(second, StructType):
            fields = {f.name: f for f in first.fields}
            for field in second.fields:
                seen = fields.get(field.name)
                if seen is None:
                    fields[field.name] = field
                else:
                    # Reassigning an existing key keeps its original position.
                    fields[field.name] = StructField(
                        field.name, merge_types(seen.dataType, field.dataType), True
                    )
            return StructType(list(fields.values()))
        if isinstance(first, ArrayType) and isinstance(second, ArrayType):
            return ArrayType(merge_types(first.elementType, second.elementType), True)
        return first

    sample_rows = (
        df.select("data")
        .filter(col("data").isNotNull())
        .limit(sample_limit)
        .collect()
    )

    schemas = []
    for sample in sample_rows:
        try:
            parsed = json.loads(sample["data"])
        except (TypeError, ValueError) as e:
            print(f"Skipping invalid JSON: {e}")
            continue

        inferable = to_inferable(parsed) if isinstance(parsed, dict) else no_type
        if inferable is no_type:
            print("Skipping JSON sample: not an object with at least one non-null field.")
            continue

        try:
            schemas.append(spark.createDataFrame([inferable]).schema)
        except Exception as e:
            # Spark's inference can still reject a sample, e.g. conflicting
            # element types inside one array.
            print(f"Skipping JSON sample whose schema could not be inferred: {e}")

    if not schemas:
        raise ValueError("No valid schemas found in sample.")

    merged_schema = schemas[0]
    for schema in schemas[1:]:
        merged_schema = merge_types(merged_schema, schema)

    return df.withColumn("data_parsed", from_json(col("data"), merged_schema)) \
             .select("document_id", "timestamp", "data_parsed.*")


def load_and_expand_bigquery_tables(key_base64: str, project_id: str, parent_project_id: str):
    """
    Load, expand and persist every BigQuery table that carries a JSON payload.

    This applies to each table of each dataset in ``project_id`` that has
    ``document_id``, ``timestamp`` and ``data`` columns. The table is read
    through the Spark ``bigquery`` data source, and its ``data`` JSON is
    expanded into columns by ``parse_and_expand_json_data``. The result is
    stored in ``globals()`` as ``df_expanded_<table_name>`` and passed to
    ``write_to_uc_bronze`` under the same table name.

    Args:
        key_base64: Base64-encoded service-account JSON key.
        project_id: Project whose datasets and tables are listed and read.
        parent_project_id: Value for the connector's ``parentProject`` option
            (presumably the billing project — confirm).

    Returns:
        None. Tables missing a required column are skipped with a warning.
        Any other error while processing a table is printed, and the loop
        moves on to the next table.
    """
    required_columns = ("document_id", "timestamp", "data")

    creds = service_account.Credentials.from_service_account_info(
        json.loads(base64.b64decode(key_base64).decode("utf-8"))
    )
    bq = bigquery.Client(project=project_id, credentials=creds)

    def expand_table(ds_name, tbl_name):
        """Read one table, expand its JSON, publish it as a global and write it to bronze."""
        reader = spark.read.format("bigquery")
        for opt_name, opt_value in (
            ("credentials", key_base64),
            ("project", project_id),
            ("parentProject", parent_project_id),
            ("dataset", ds_name),
            ("table", tbl_name),
            ("viewsEnabled", "true"),
        ):
            reader = reader.option(opt_name, opt_value)
        raw = reader.load()

        if any(name not in raw.columns for name in required_columns):
            print(f"⚠️ Skipping {tbl_name} — missing required columns.")
            return

        expanded = parse_and_expand_json_data(raw.select(*required_columns))
        var_name = f"df_expanded_{tbl_name}"
        globals()[var_name] = expanded
        print(f"✅ Loaded and expanded: {var_name}")
        write_to_uc_bronze(expanded, tbl_name)

    found = list(bq.list_datasets())
    if not found:
        print(f"No datasets found in project {project_id}")
        return

    for ds in found:
        ds_name = ds.dataset_id
        for tbl in list(bq.list_tables(ds_name)):
            tbl_name = tbl.table_id
            try:
                expand_table(ds_name, tbl_name)
            except Exception as e:
                print(f"❌ Error processing {ds_name}.{tbl_name}: {e}")

# Data Sources

## BigQuery

In [0]:
import base64, json

# NOTE(review): key_json is reassigned by the later cells, and key_dict is
# reassigned whenever the "bq-creds" secret parses. The BigQuery loads below use
# key_base64, which is derived from the "bq-creds" secret rather than this
# "shared-secrets" one. Confirm whether this cell is still needed.
b64_key = dbutils.secrets.get(key="bq-key-json-base64", scope="shared-secrets")
key_json = str(base64.b64decode(b64_key), "utf-8")
key_dict = json.loads(key_json)

In [0]:
import json

key_str = dbutils.secrets.get(scope="bq-creds", key="bq-key-json")

# Check that the secret parses as JSON. On failure, report the decoder's reason
# and line/column position instead of a bare "not valid" message.
try:
    key_dict = json.loads(key_str)
    key_json = json.dumps(key_dict)  # re-serialize to a single-line string
    print("✅ Secret is valid and parsed.")
except json.JSONDecodeError as e:
    print(f"❌ Secret is not valid JSON: {e}")

In [0]:
import base64
import json

# Get JSON from secret
key_str = dbutils.secrets.get(key="bq-key-json", scope="bq-creds")

# Round-trip through json: this validates the secret (json.loads raises on
# malformed input) and normalises it to a single line. The result is then
# base64-encoded, which is the form the loader helpers decode and pass to the
# connector's "credentials" option.
key_json = json.dumps(json.loads(key_str))
key_base64 = str(base64.b64encode(bytes(key_json, "utf-8")), "utf-8")

In [0]:
# Read every table of the project into a Spark DataFrame named df_<table_name>,
# trimmed to document_id / timestamp / data where present. The same project is
# used as both project_id and parentProject.
load_bigquery_tables_to_dfs(
    project_id="slydeapp-dc745",
    parent_project_id="slydeapp-dc745",
    key_base64=key_base64,
)

# Now you can access df_<table_name>, e.g.:
# df_events_raw_latest.display()

In [0]:
# For each table with document_id, timestamp and data columns: expand the JSON
# in `data` into columns, publish the result as df_expanded_<table_name>, and
# append it to the bronze table of the same name.
load_and_expand_bigquery_tables(
    parent_project_id="slydeapp-dc745",
    project_id="slydeapp-dc745",
    key_base64=key_base64,
)

# Now access: df_expanded_events_raw_latest, df_expanded_<other_table_name>, etc.