# STEPS TO FOLLOW 

- Pick the active, eligible tables from the config table where active_flag=1 and load_flag=1
- and bq_to_gcs_status='COMPLETED'
- Read the latest Parquet files from GCS (gcs_path/dt=<timestamp>/*.parquet), using the latest dt folder
- Write them to the Delta bronze path
- Create the table in the bronze dataset from the bronze path
- Update the config table

In [0]:
# Databricks widgets for the MySQL and GCP/Databricks parameters, declared as
# (widget name, default value) pairs in the order the widgets are created.
# NOTE(review): the mysql_password default is a plaintext credential kept in
# the notebook source; use a secret store in production.
_WIDGET_DEFAULTS = (
    ("mysql_host", "136.113.73.13"),      # MySQL server host/IP
    ("mysql_port", "3306"),               # MySQL server port
    ("mysql_db", "GCPmigrationMeta"),     # MySQL database name
    ("mysql_user", "root"),               # MySQL username
    ("mysql_password", "Admin@1234"),     # MySQL password (use secrets in production)
    (
        "GOOGLE_APPLICATION_CREDENTIALS",  # GCP service account key path
        "dbfs:/FileStore/shared_uploads/anurag.srivastava@koantekorg.onmicrosoft.com/datamigrationproject_475415_ffc1c7e9a773.json",
    ),
    ("hive_db", "bronze"),                # Hive database name for Delta tables
)
for _widget_name, _widget_default in _WIDGET_DEFAULTS:
    dbutils.widgets.text(_widget_name, _widget_default)

# Read the current widget values back into notebook-level variables; the
# unpacking order follows the declaration order above.
(
    mysql_host,
    mysql_port,
    mysql_db,
    mysql_user,
    mysql_password,
    GOOGLE_APPLICATION_CREDENTIALS,
    hive_db,
) = (dbutils.widgets.get(_declared_name) for _declared_name, _ in _WIDGET_DEFAULTS)

In [0]:
%pip install -q mysql-connector-python google-cloud-bigquery google-cloud-storage

In [0]:
# If needed once per cluster:
# %pip install mysql-connector-python google-cloud-storage

# COMMAND ----------
import os, re, shutil, json
import mysql.connector as mc
from contextlib import contextmanager
from urllib.parse import urlparse
from google.cloud import storage
from pyspark.sql.functions import col, to_timestamp, to_date
from pyspark.sql.types import TimestampType

# ---------------- Configuration ----------------
# MySQL connection settings, read from the notebook widgets.
# The password is taken verbatim (not stripped) so that significant
# whitespace in a credential is preserved.
MYSQL = {
    "host": dbutils.widgets.get("mysql_host").strip(),
    "port": int(dbutils.widgets.get("mysql_port")),
    "db":   dbutils.widgets.get("mysql_db").strip(),
    "user": dbutils.widgets.get("mysql_user").strip(),
    "pwd":  dbutils.widgets.get("mysql_password"),
}
# GCP service account key path (convert dbfs:/ to /dbfs/); surrounding
# whitespace from the widget is dropped so it cannot break the path lookup.
GCP_KEY = dbutils.widgets.get("GOOGLE_APPLICATION_CREDENTIALS").strip().replace("dbfs:/", "/dbfs/")
# Hive database name for Delta tables
HIVE_DB = dbutils.widgets.get("hive_db").strip()


# Fail fast with an explicit exception: an `assert` would be skipped when
# Python runs with optimizations enabled, letting a missing key go unnoticed
# until the first GCS call.
if not os.path.exists(GCP_KEY):
    raise FileNotFoundError(f"GCP key not found at {GCP_KEY}")
os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = GCP_KEY  # auth for GCS SDK

In [0]:
@contextmanager
def mysql_conn():
    """Yield a MySQL connection built from the MYSQL settings.

    The connection is closed when the ``with`` block exits, whether it
    finishes normally or raises. This helper never calls ``commit``;
    callers commit explicitly.
    """
    connect_kwargs = {
        "host": MYSQL["host"],
        "port": MYSQL["port"],
        "user": MYSQL["user"],
        "password": MYSQL["pwd"],
        "database": MYSQL["db"],
    }
    connection = mc.connect(**connect_kwargs)
    try:
        yield connection
    finally:
        connection.close()

In [0]:
def fetch_eligible_rows():
    """
    Return the config_table rows that are ready for the GCS -> bronze load.

    A row qualifies when all of the following hold:
    - active_flag=1
    - load_flag=1
    - bq_to_gcs_status='COMPLETED'
    - gcs_to_bronze_status in ('NOT_STARTED','FAILED')

    Rows are ordered by table_name and fetched through a dictionary cursor,
    so each row is keyed by the selected columns (table_name, gcs_path,
    target_path).
    """
    eligible_sql = """
          SELECT table_name, gcs_path, target_path
          FROM config_table
          WHERE active_flag=1
            AND load_flag=1
            AND bq_to_gcs_status='COMPLETED'
            AND gcs_to_bronze_status IN ('NOT_STARTED','FAILED')
          ORDER BY table_name
        """
    with mysql_conn() as conn:
        cursor = conn.cursor(dictionary=True)
        cursor.execute(eligible_sql)
        eligible = cursor.fetchall()
        cursor.close()
        return eligible


In [0]:
def set_bronze_status(table_name, status, err=None):
    """
    Record the GCS -> bronze load status of one table in config_table.

    - 'IN_PROGRESS': sets the status, stamps last_run_ts and clears error_message.
    - 'COMPLETED':   sets the status and stamps last_success_ts.
    - anything else: treated as 'FAILED'; stores str(err) truncated to
      2000 characters, or the literal 'FAILED' when err is falsy.

    The change is committed before the connection is closed.
    """
    # Pick the statement and its parameters first, then run it once below.
    if status == "IN_PROGRESS":
        statement = """
              UPDATE config_table
                 SET gcs_to_bronze_status='IN_PROGRESS', last_run_ts=NOW(), error_message=NULL
               WHERE table_name=%s
            """
        params = (table_name,)
    elif status == "COMPLETED":
        statement = """
              UPDATE config_table
                 SET gcs_to_bronze_status='COMPLETED', last_success_ts=NOW()
               WHERE table_name=%s
            """
        params = (table_name,)
    else:  # FAILED
        statement = """
              UPDATE config_table
                 SET gcs_to_bronze_status='FAILED', error_message=%s
               WHERE table_name=%s
            """
        failure_text = str(err)[:2000] if err else "FAILED"
        params = (failure_text, table_name)

    with mysql_conn() as conn:
        cursor = conn.cursor()
        cursor.execute(statement, params)
        conn.commit()
        cursor.close()

In [0]:
def reset_load_flag(table_name):
    """Clear load_flag (set it to 0) for the given table and commit the change."""
    with mysql_conn() as conn:
        cursor = conn.cursor()
        cursor.execute(
            "UPDATE config_table SET load_flag=0 WHERE table_name=%s",
            (table_name,),
        )
        conn.commit()
        cursor.close()

In [0]:
def _gcs_client():
    """Build a new Google Cloud Storage client and return it.

    No credentials are passed explicitly; GOOGLE_APPLICATION_CREDENTIALS is
    exported in the configuration cell before this is called.
    """
    client = storage.Client()
    return client

**Sample URI**

gs://bigquerytogcsmigration/exports/ods/ods_order_items/dt=20251101T104630Z/000000000000.parquet

In [0]:

def _split_gs(gcs_uri: str):
    """
    Split a GCS URI (gs://bucket/path) into (bucket, path).

    The path is returned without leading slashes and is otherwise kept
    verbatim: characters such as '?', '#', ';' and '*' are legal in object
    names and wildcards, so the URI is split by plain string partitioning
    rather than URL parsing (which would cut them off as query, fragment or
    params).

    Raises:
        ValueError: if gcs_uri is not a string starting with 'gs://'.
    """
    scheme = "gs://"
    # Explicit check instead of `assert`, which is skipped under `python -O`.
    if not isinstance(gcs_uri, str) or not gcs_uri.startswith(scheme):
        raise ValueError(f"Expected a gs:// URI, got {gcs_uri!r}")
    bucket, _, path = gcs_uri[len(scheme):].partition("/")
    return bucket, path.lstrip("/")

In [0]:
def latest_dt_uri(gcs_base: str) -> str:
    """
    Build the Parquet wildcard URI for the newest dt=... folder under gcs_base.

    Every object whose name starts with '<base>/dt=' is listed and the
    'dt=<digits and T>Z/' component is extracted from its name. The
    lexicographically greatest stamp is treated as the latest, which matches
    chronological order for the YYYYMMDDTHHMMSSZ layout. When no such folder
    is found, a 'dt=*/*.parquet' wildcard URI is returned instead.
    """
    bucket_name, base_prefix = _split_gs(gcs_base.rstrip("/"))
    client = _gcs_client()

    dt_folder = re.compile(r"dt=([\dT]+Z)/")
    listing = client.list_blobs(bucket_name, prefix=f"{base_prefix}/dt=")
    hits = (dt_folder.search(blob.name) for blob in listing)
    stamps = {hit.group(1) for hit in hits if hit}

    if stamps:
        return f"gs://{bucket_name}/{base_prefix}/dt={max(stamps)}/*.parquet"
    # Fallback: no dt folder matched, so leave the dt component as a wildcard.
    return f"gs://{bucket_name}/{base_prefix}/dt=*/*.parquet"

In [0]:
def _ensure_dir(local_path: str):
    """Make sure local_path exists as a directory, creating missing parents.

    An already existing directory is left untouched.
    """
    os.makedirs(name=local_path, exist_ok=True)


In [0]:
def _prefix_from_wildcard(gcs_uri: str) -> tuple[str, str]:
    """
    Turn a GCS URI into the (bucket, prefix) pair used for listing objects.

    With a '*' anywhere in the path, the prefix is everything up to and
    including the last '/':
        gs://bucket/foo/bar/dt=.../*.parquet -> (bucket, 'foo/bar/dt=.../')
    Without a '*', the whole path is used, with a trailing '/' appended
    when it is missing.

    NOTE: only the final path component is cut off, so a '*' in an earlier
    component (e.g. 'dt=*/') stays in the returned prefix verbatim.
    """
    bucket, path = _split_gs(gcs_uri)
    if "*" not in path:
        if not path.endswith("/"):
            path += "/"
        return bucket, path
    parent, slash, _ = path.rpartition("/")
    return bucket, parent + slash


In [0]:
def copy_gcs_prefix_to_dbfs(gcs_uri_wildcard: str, dbfs_dir: str) -> str:
    """
    Copy all objects under the wildcard's parent prefix from GCS to a DBFS directory.

    The target directory is deleted first if it exists, then recreated, so
    it only ever holds the files of the current run. Objects are stored flat
    under their base name; names ending in '/' (folder placeholders) are
    skipped.

    Args:
        gcs_uri_wildcard: GCS URI, typically ending in '*.parquet'.
        dbfs_dir: destination directory as a 'dbfs:/...' path.

    Returns:
        dbfs_dir, unchanged, for use as the read path.

    Raises:
        FileNotFoundError: if no object was found under the prefix.
        FileExistsError: if two objects share a base name, since the flat
            layout would otherwise silently overwrite one with the other.
    """
    bucket, prefix = _prefix_from_wildcard(gcs_uri_wildcard)
    cli = _gcs_client()

    # The download API writes to a local filename, so use the /dbfs/ path form.
    local_dir = dbfs_dir.replace("dbfs:/", "/dbfs/")
    if os.path.exists(local_dir):
        shutil.rmtree(local_dir)
    _ensure_dir(local_dir)

    n = 0
    for blob in cli.list_blobs(bucket, prefix=prefix):
        if blob.name.endswith("/"):
            continue
        local_file = os.path.join(local_dir, os.path.basename(blob.name))
        # The directory was just emptied, so an existing file can only come
        # from an earlier object in this same listing with the same base name.
        if os.path.exists(local_file):
            raise FileExistsError(
                f"Duplicate file name {os.path.basename(blob.name)!r} under "
                f"gs://{bucket}/{prefix}; refusing to overwrite {local_file}"
            )
        blob.download_to_filename(local_file)
        n += 1
    if n == 0:
        raise FileNotFoundError(f"No objects found under gs://{bucket}/{prefix}")
    print(f"↳ Copied {n} file(s) from gs://{bucket}/{prefix} → {dbfs_dir}")
    return dbfs_dir


In [0]:
def to_dbfs(path: str) -> str:
    """
    Return path in 'dbfs:' form.

    Surrounding whitespace is stripped first. A value that already starts
    with 'dbfs:' is returned as is; an absolute path ('/x') becomes
    'dbfs:/x'; anything else is treated as relative to the DBFS root and
    becomes 'dbfs:/<path>'.
    """
    cleaned = path.strip()
    if cleaned.startswith("dbfs:"):
        return cleaned
    # Add the separating slash only when the path does not bring its own.
    separator = "" if cleaned.startswith("/") else "/"
    return f"dbfs:{separator}{cleaned}"

## MAIN ETL LOGIC

In [0]:

# Main driver: load every eligible table from its latest GCS export into a
# Delta table at its bronze path, tracking progress in config_table.
rows = fetch_eligible_rows()
if not rows:
    print("Nothing to process (eligible rows not found).")
else:
    # Ensure Hive database exists for Delta tables
    spark.sql(f"CREATE DATABASE IF NOT EXISTS {HIVE_DB}")
    print(f"Processing {len(rows)} table(s): {[r['table_name'] for r in rows]}")
    for r in rows:
        t = r["table_name"]
        try:
            set_bronze_status(t, "IN_PROGRESS")

            # Resolve the paths inside the try block: a bad config value
            # (e.g. a NULL target_path) must mark only this table FAILED
            # instead of aborting the run for all remaining tables.
            gcs_base = r["gcs_path"]
            dst_path = to_dbfs(r["target_path"])

            # 1) Pick latest dt folder in GCS for this table
            src_uri = latest_dt_uri(gcs_base)
            print(f"\n{t}: staging {src_uri}")

            # 2) Copy Parquet files from GCS to DBFS staging directory
            stage_dir = f"dbfs:/tmp/gcs_stage/{t}"
            stage_dir = copy_gcs_prefix_to_dbfs(src_uri, stage_dir)

            # 3) Read Parquet files from DBFS and write to Delta table (overwrite mode)
            df = spark.read.parquet(stage_dir)
            # --- Normalize time columns (minimal) ---
            # Force 'order_ts' to TIMESTAMP regardless of source variation.
            # NOTE(review): intended to cover string, date and long inputs and
            # to pass timestamps through — confirm for epoch-millisecond data.
            if 'order_ts' in df.columns:
                df = df.withColumn('order_ts', to_timestamp(col('order_ts')))

            # overwriteSchema lets the new export replace the stored schema
            # together with the data.
            df.write.mode("overwrite").option("overwriteSchema", "true").format("delta").save(dst_path)

            # 4) Register Delta table in Hive (bronze DB)
            spark.sql(f"CREATE TABLE IF NOT EXISTS {HIVE_DB}.`{t}` USING DELTA LOCATION '{dst_path}'")

            set_bronze_status(t, "COMPLETED")
            reset_load_flag(t)
            print(f"✅ {t}: Bronze written at {dst_path}")
        except Exception as e:
            # Per-table boundary: record the failure and continue with the next table.
            set_bronze_status(t, "FAILED", err=e)
            print(f"❌ {t}: {e}")

# (Optional) Clean up DBFS staging directory after run
dbutils.fs.rm("dbfs:/tmp/gcs_stage", recurse=True)