In [0]:
import yaml
# Load ingestion settings. `databricks_config` must provide `volume_path`
# (base directory that contains the dev snapshot below) and `catalog`
# (target catalog for the ingested schemas/tables).
with open('./ingestion_config.yaml', 'r') as config_file:
    config = yaml.safe_load(config_file)

databricks_config = config["databricks_config"]
volume_path = databricks_config["volume_path"]
catalog = databricks_config["catalog"]

# Snapshot directory inside the volume; update when a newer export is uploaded.
DEV_SNAPSHOT = "dev_20240627"
# Normalise the separator so `volume_path` works with or without a trailing slash.
dev_snapshot_dir = f"{volume_path.rstrip('/')}/{DEV_SNAPSHOT}"
metadata_path = f"{dev_snapshot_dir}/dev_tables.json"
# Directory holding one `<db_id>/<db_id>.sqlite` file per database; the trailing
# slash is intentional because callers append `<db_id>/...` directly.
sql_lite_fqdn = f"{dev_snapshot_dir}/dev_databases/dev_databases/"

In [0]:
# Read the database metadata once, show it for inspection, then collect it to
# the driver: each row supplies a `db_id` and its `table_names_original` for the
# ingestion loop.
table_metadata_df = spark.read.option("multiline", True).json(metadata_path)
display(table_metadata_df)
table_metadata = table_metadata_df.collect()

In [0]:
import pandas as pd
import sqlite3
import re

def clean_column_names(df):
    """Return ``df`` rebuilt with sanitized column names.

    Every space, comma, semicolon, brace, parenthesis, newline, tab or ``=``
    in a column name is replaced by ``_``; all other characters are kept and
    column order is preserved. Presumably these are the characters the target
    table format rejects in column names — confirm against the Delta docs.

    Args:
        df: a frame exposing ``.columns`` and ``.toDF(*names)`` (a Spark
            DataFrame in this notebook).

    Returns:
        The result of ``df.toDF`` called with the sanitized names.
    """
    sanitized = []
    for original_name in df.columns:
        sanitized.append(re.sub(r'[ ,;{}()\n\t=]', '_', original_name))
    return df.toDF(*sanitized)

def table_exists(catalog, schema, table):
  """Return True if a non-temporary table named `table` exists in `catalog`.`schema`.

  Lists the schema's tables and compares names exactly, case-insensitively
  (Spark identifiers are case-insensitive). The name is deliberately NOT used as
  a `SHOW TABLES ... LIKE` argument: Spark treats that argument as a pattern
  (`*` and `|` are wildcards, other characters follow regex rules), so names
  with metacharacters could match the wrong table, and a quote in the name
  would break the statement.

  Any failure (e.g. the schema does not exist yet) is printed and reported as
  "does not exist", so callers fall through to attempting the write.
  """
  try:
    query = f"SHOW TABLES IN `{catalog}`.`{schema}`"
    result = spark.sql(query).collect()
    wanted = table.lower()
    # Temporary views are listed by SHOW TABLES too; they are not persisted tables.
    return any(
      (not row.isTemporary) and row.tableName.lower() == wanted
      for row in result
    )
  except Exception as e:
    print(f"Table check failed for {table} with error {str(e)}")
    return False

# Tables whose first load attempt failed, as {"schema", "table", "error"} dicts;
# a later cell retries them.
failed_table_writes = []

for db_meta in table_metadata:
  # Each metadata row describes one SQLite database: `db_id` becomes the target
  # schema and `table_names_original` lists the tables to copy.
  table_list = db_meta.table_names_original
  schema_name = db_meta.db_id

  try:
    spark.sql(f"CREATE SCHEMA IF NOT EXISTS {catalog}.{schema_name}")
  except Exception as e:
    # Best-effort: the schema may already exist even if CREATE is not permitted,
    # so keep going and let each table write report its own failure.
    print(f"Schema creation failed for {schema_name} with error: {str(e)}")

  sqlite_path = f"{sql_lite_fqdn}{schema_name}/{schema_name}.sqlite"

  for t_name in table_list:
    if table_exists(catalog, schema_name, t_name):
      print(f"Table already exists: {catalog}.{schema_name}.{t_name}. Skipping...")
      continue

    try:
      con = sqlite3.connect(sqlite_path)
      try:
        pdf = pd.read_sql_query(f"SELECT * from `{t_name}`", con)
      finally:
        # Release the SQLite handle even when the read fails.
        con.close()

      sdf = spark.createDataFrame(pdf)
      sdf_final = clean_column_names(sdf)
      sdf_final.write.saveAsTable(f"{catalog}.{schema_name}.{t_name}")
    except Exception as e:
      print(f"Table creation failed for {t_name} with error: {str(e)}")
      failed_table_writes.append({"schema": schema_name, "table": t_name, "error": str(e)})
  

In [0]:
# Inspect the tables whose first load failed (schema, table, error); the next cell retries them.
failed_table_writes

In [0]:
import os

# Retry the tables recorded in `failed_table_writes`. Each source database is
# copied from the volume to local disk (once per schema) and opened as a
# read-only, immutable SQLite URI. Presumably in-place access on the volume is
# what failed on the first pass — TODO confirm.
LOCAL_SQLITE_CACHE_DIR = "/tmp/doan/bird"

for ft in failed_table_writes:
  retry_schema = ft['schema']
  retry_table = ft['table']
  # Dedicated name so the config-level `volume_path` is not overwritten.
  sqlite_volume_path = f"{sql_lite_fqdn}{retry_schema}/{retry_schema}.sqlite"
  local_path = f"{LOCAL_SQLITE_CACHE_DIR}/{retry_schema}/{retry_schema}.sqlite"

  # Makes the cell safe to re-run after a partially successful retry.
  if table_exists(catalog, retry_schema, retry_table):
    print(f"Table already exists: {catalog}.{retry_schema}.{retry_table}. Skipping...")
    continue

  if not os.path.exists(local_path):
    dbutils.fs.cp(sqlite_volume_path, f"file:{local_path}")

  # uri=True makes sqlite3 parse the `file:` URI and its mode=ro / immutable=1
  # parameters instead of treating the whole string as a literal file name.
  con = sqlite3.connect(f"file:{local_path}?mode=ro&immutable=1", uri=True)
  try:
    pdf = pd.read_sql_query(f"SELECT * from `{retry_table}`", con)
  finally:
    con.close()

  sdf = spark.createDataFrame(pdf)
  sdf_final = clean_column_names(sdf)
  sdf_final.write.saveAsTable(f"{catalog}.{retry_schema}.{retry_table}")