## 05_bronze_ingest_supporting_files
## One-time ingestion of supporting CSV + metadata JSON into Bronze

In [0]:
from pyspark.sql import functions as F

# Target catalog and Bronze schema for every table created by this notebook
CAT = "zillow"
BRONZE = "zillow_bronze"

# Folder (a /Volumes path) that holds the source CSV and JSON files
ASIS = "/Volumes/zillow/zillow_medallion/raw-converted_format/as_is"

# Source CSV file name -> Bronze table name it is landed into
CSV_FILES = {
    "City_time_series.csv": "city_ts_bronze",
    "Metro_time_series.csv": "metro_ts_bronze",
    "CountyCrossWalk_Zillow.csv": "county_crosswalk_bronze",
    "cities_crosswalk.csv": "cities_crosswalk_bronze",
    "DataDictionary.csv": "data_dictionary_bronze",
}

# Full paths of the two JSON metadata files
ALL_METRICS_JSON = f"{ASIS}/all_available_metrics.json"
FIELDS_PER_LEVEL_JSON = f"{ASIS}/fields_per_level.json"

# Create the catalog first, then the schema inside it; both are no-ops when
# the object already exists.
for ddl in (
    f"CREATE CATALOG IF NOT EXISTS {CAT}",
    f"CREATE SCHEMA IF NOT EXISTS {CAT}.{BRONZE}",
):
    spark.sql(ddl)

In [0]:
# Helper: (re)create an empty Delta staging table shaped like a CSV file
def create_stg_from_csv(path: str, stg_table: str):
    """Recreate ``stg_table`` as an empty Delta table matching the CSV at ``path``.

    The CSV is read with a header row and schema inference. Any existing
    staging table is dropped, then a zero-row DataFrame carrying the inferred
    columns is saved under the same name.

    Args:
        path: Location of the CSV file to inspect.
        stg_table: Name of the staging table to drop and recreate.

    Returns:
        The schema of the DataFrame read from the CSV.
    """
    reader = spark.read.options(header="true", inferSchema="true")
    csv_df = reader.csv(path)

    spark.sql(f"DROP TABLE IF EXISTS {stg_table}")

    # limit(0) keeps the column layout but writes no rows
    empty_writer = csv_df.limit(0).write.format("delta").mode("overwrite")
    empty_writer.saveAsTable(stg_table)

    return csv_df.schema

def _sql_string_literal(value: str) -> str:
    """Return ``value`` wrapped in single quotes, safe to embed in a SQL statement.

    Backslashes are escaped first and single quotes second, so the escape
    characters added for the quotes are not themselves doubled.
    """
    escaped = value.replace("\\", "\\\\").replace("'", "\\'")
    return f"'{escaped}'"


def ensure_target_from_stg(stg: str, tgt: str, comment: str):
    """Create the target table from the staging layout if missing, then set its comment.

    The target gets every staging column plus three audit columns
    (``load_dt``, ``source_path``, ``ingest_mode``). ``WHERE 1=0`` makes the
    CTAS copy the column layout only, without any rows. An existing target is
    left as it is; only its table comment is (re)applied.

    Args:
        stg: Name of the staging table supplying the column layout.
        tgt: Name of the target table to create when absent.
        comment: Free-text table comment; quotes and backslashes are escaped
            before the text is embedded in the SQL statement.
    """
    spark.sql(f"""
    CREATE TABLE IF NOT EXISTS {tgt}
    USING DELTA
    AS SELECT
      s.*,
      CAST(NULL AS TIMESTAMP) AS load_dt,
      CAST(NULL AS STRING) AS source_path,
      CAST(NULL AS STRING) AS ingest_mode
    FROM {stg} s
    WHERE 1=0
    """)
    # An unescaped apostrophe in the comment would end the SQL literal early
    spark.sql(f"COMMENT ON TABLE {tgt} IS {_sql_string_literal(comment)}")

In [0]:
# 1) CSVs via COPY INTO (one-time loads)
# For each reference CSV: build an empty staging table from the inferred
# schema, make sure the audited target exists, COPY INTO staging, then replace
# the target's contents with the staged rows plus the audit columns.
for fname, tname in CSV_FILES.items():
    src = f"{ASIS}/{fname}"
    stg = f"{CAT}.{BRONZE}._stg_{tname}"
    tgt = f"{CAT}.{BRONZE}.{tname}"

    print(f"\n=== CSV -> COPY INTO: {fname} ===")

    try:
        # create staging with inferred schema (the returned schema is not needed here)
        create_stg_from_csv(src, stg)

        # ensure target has audit cols
        ensure_target_from_stg(
            stg, tgt,
            f"Bronze table for {fname} ingested via COPY INTO; includes audit cols."
        )

        # load into staging (TRUNCATE is a defensive no-op on the fresh table)
        spark.sql(f"TRUNCATE TABLE {stg}")
        spark.sql(f"""
        COPY INTO {stg}
        FROM '{src}'
        FILEFORMAT = CSV
        FORMAT_OPTIONS ('header'='true', 'inferSchema'='true')
        """)

        # Replace the target's rows rather than appending: staging is rebuilt
        # on every run, so a plain INSERT INTO would duplicate the whole file
        # each time this cell is re-executed.
        spark.sql(f"""
        INSERT OVERWRITE {tgt}
        SELECT
          s.*,
          current_timestamp() AS load_dt,
          '{src}' AS source_path,
          'copy_into_csv_as_is' AS ingest_mode
        FROM {stg} s
        """)

        rows = spark.sql(f"SELECT COUNT(*) AS c FROM {stg}").collect()[0]["c"]
        print("Rows inserted:", rows)
    finally:
        # always remove the staging table, even when a step above failed
        spark.sql(f"DROP TABLE IF EXISTS {stg}")

# Quick evidence
display(spark.sql(f"""
SHOW TABLES IN {CAT}.{BRONZE}
"""))

In [0]:
# 2) JSON metadata -> Bronze tables (one-time)
# all_available_metrics.json is a JSON array of strings -> parse from text.
# wholetext=True loads the file as a single row, so the document is parsed
# exactly as written; stitching per-line rows back together with collect_list
# gives no ordering guarantee for a multi-line file.
raw = spark.read.text(ALL_METRICS_JSON, wholetext=True)
raw_row = raw.first()
json_str = raw_row["value"] if raw_row is not None else ""

df_metrics_parsed = (spark.createDataFrame([(json_str,)], ["json"])
                     .select(F.from_json("json", "array<string>").alias("metrics")))

# from_json returns NULL for text it cannot parse; stop here instead of
# overwriting the Bronze table with zero rows.
if df_metrics_parsed.first()["metrics"] is None:
    raise ValueError(f"Could not parse {ALL_METRICS_JSON} as a JSON array of strings")

# One row per metric name, plus the audit columns
df_metrics = (df_metrics_parsed
              .select(F.explode("metrics").alias("metric"))
              .withColumn("load_dt", F.current_timestamp())
              .withColumn("source_path", F.lit(ALL_METRICS_JSON))
              .withColumn("ingest_mode", F.lit("json_text_parse"))
)

metrics_tgt = f"{CAT}.{BRONZE}.all_available_metrics_bronze"
df_metrics.write.format("delta").mode("overwrite").saveAsTable(metrics_tgt)
spark.sql(f"COMMENT ON TABLE {metrics_tgt} IS 'List of available metric names from all_available_metrics.json (array of strings).'")

# Count the written table so the number reflects what actually landed
print("Loaded metrics:", spark.table(metrics_tgt).count())
display(spark.sql(f"SELECT * FROM {metrics_tgt} LIMIT 20"))

In [0]:
# fields_per_level.json is expected to be a map<string, array<string>> -> flatten.
# wholetext=True loads the file as a single row, so the document is parsed
# exactly as written; stitching per-line rows back together with collect_list
# gives no ordering guarantee for a multi-line file.
raw2 = spark.read.text(FIELDS_PER_LEVEL_JSON, wholetext=True)
raw2_row = raw2.first()
json_str2 = raw2_row["value"] if raw2_row is not None else ""

df_fields_map = (spark.createDataFrame([(json_str2,)], ["json"])
                 .select(F.from_json("json", "map<string,array<string>>").alias("m")))

# from_json returns NULL for text it cannot parse; stop here instead of
# overwriting the Bronze table with zero rows.
if df_fields_map.first()["m"] is None:
    raise ValueError(
        f"Could not parse {FIELDS_PER_LEVEL_JSON} as a JSON object of string arrays"
    )

# First explode: one row per (level, fields array); second: one row per field
df_fields = (df_fields_map
             .select(F.explode("m").alias("level", "fields"))
             .select("level", F.explode("fields").alias("field"))
             .withColumn("load_dt", F.current_timestamp())
             .withColumn("source_path", F.lit(FIELDS_PER_LEVEL_JSON))
             .withColumn("ingest_mode", F.lit("json_text_parse"))
)

fields_tgt = f"{CAT}.{BRONZE}.fields_per_level_bronze"
df_fields.write.format("delta").mode("overwrite").saveAsTable(fields_tgt)
spark.sql(f"COMMENT ON TABLE {fields_tgt} IS 'Fields per level from fields_per_level.json flattened to (level, field).'")

# Count the written table so the number reflects what actually landed
print("Loaded fields rows:", spark.table(fields_tgt).count())
display(spark.sql(f"SELECT * FROM {fields_tgt} ORDER BY level, field LIMIT 50"))