In [0]:
from typing import List
from pyspark.sql import DataFrame, Row
from pyspark.sql.functions import lit, current_timestamp, col, split, trim, expr, regexp_replace
from pyspark.sql.types import ArrayType, StringType, StructType, StructField

In [0]:
# Fully qualified name (catalog.schema.table) of the raw document table.
CATALOG = "workspace"
SCHEMA = "med"
RAW_TABLE = f"{CATALOG}.{SCHEMA}.raw_data"

# Schema applied to incoming record dicts before standardization.
# Every field is a plain string at this stage (synonyms included);
# only doc_id is declared non-nullable.
RAW_RECORD_SCHEMA = StructType([
    StructField(field_name, StringType(), nullable)
    for field_name, nullable in (
        ("doc_id", False),
        ("category", True),
        ("title", True),
        ("synonyms", True),
        ("url", True),
        ("raw_text", True),
        ("meta_json", True),
    )
])

In [0]:
def ensure_raw_table():
    """Create the Delta table ``RAW_TABLE`` unless it already exists.

    The DDL declares the document columns plus ``source`` and ``ingested_at``
    (with ``synonyms`` as ``ARRAY<STRING>``). Because it uses
    ``CREATE TABLE IF NOT EXISTS``, running it against an existing table is a no-op.
    """
    spark.sql(
        f"""
    CREATE TABLE IF NOT EXISTS {RAW_TABLE} (
      doc_id      STRING,
      category    STRING,
      source      STRING,
      title       STRING,
      synonyms    ARRAY<STRING>,
      url         STRING,
      raw_text    STRING,
      meta_json   STRING,
      ingested_at TIMESTAMP
    )
    USING DELTA
    """
    )

In [0]:
def cast_synonyms_to_array(df: DataFrame, colname: str = "synonyms") -> DataFrame:
    """Normalize column ``colname`` of ``df`` to ``array<string>``.

    * Column absent: it is added as a null ``array<string>`` column.
    * Column already an array: its elements are cast to string (not trimmed).
    * Otherwise (expected to be a delimited string): the value is split on
      ``,`` or ``;``. Each token is trimmed and empty tokens are dropped, so
      ``"a; b,,c,"`` becomes ``["a", "b", "c"]`` and ``""`` becomes ``[]``.
      A null value stays null.

    Args:
        df: Input DataFrame.
        colname: Name of the column to normalize.

    Returns:
        ``df`` with ``colname`` replaced (or added) as an ``array<string>`` column.
    """
    if colname not in df.columns:
        return df.withColumn(colname, lit(None).cast("array<string>"))

    # Backtick-quote the name so col()/expr() treat it literally
    # (spaces, dots, etc.); embedded backticks are escaped by doubling.
    quoted = "`" + colname.replace("`", "``") + "`"

    dtype = dict(df.dtypes).get(colname)
    if dtype is not None and dtype.startswith("array"):
        return df.withColumn(colname, col(quoted).cast(ArrayType(StringType())))

    # split() takes a regex, so a single pass handles both separators.
    df = df.withColumn(colname, split(col(quoted).cast("string"), "[;,]"))
    # Trim every token, then drop the empty ones left by "", doubled or
    # trailing separators.
    return df.withColumn(
        colname,
        expr(f"filter(transform({quoted}, x -> trim(x)), x -> x <> '')"),
    )

In [0]:
def standardize_docs(df: DataFrame, source_value: str) -> DataFrame:
    """Project ``df`` onto the document columns and stamp load metadata.

    Steps:
    * Any expected document column that ``df`` lacks is added as a null string.
    * ``synonyms`` is normalized via ``cast_synonyms_to_array``.
    * Every column outside the expected list is dropped.
    * Two columns are appended: ``source`` (the constant ``source_value``) and
      ``ingested_at`` (``current_timestamp()``).

    Args:
        df: Input documents.
        source_value: Value written to the ``source`` column of every row.

    Returns:
        A new DataFrame with the document columns followed by ``source`` and
        ``ingested_at``.
    """
    doc_columns: List[str] = ["doc_id", "category", "title", "synonyms", "url", "raw_text", "meta_json"]
    absent = [name for name in doc_columns if name not in df.columns]

    padded = df
    for name in absent:
        padded = padded.withColumn(name, lit(None).cast("string"))

    normalized = cast_synonyms_to_array(padded, "synonyms")
    projected = normalized.select(*doc_columns)
    with_source = projected.withColumn("source", lit(source_value))
    return with_source.withColumn("ingested_at", current_timestamp())

In [0]:
def write_raw_docs(df: DataFrame):
    """Append every row of ``df`` to the Delta table ``RAW_TABLE``.

    Existing rows are kept. Writing the same data twice therefore stores it twice.
    """
    writer = df.write.format("delta").mode("append")
    writer.saveAsTable(RAW_TABLE)

In [0]:
def _sql_string_literal(value: str) -> str:
    """Return ``value`` rendered as a single-quoted Spark SQL string literal.

    Backslashes and single quotes are backslash-escaped. Spark SQL reads them
    back that way under its default ``spark.sql.parser.escapedStringLiterals=false``,
    so the literal denotes exactly ``value``.
    """
    escaped = value.replace("\\", "\\\\").replace("'", "\\'")
    return f"'{escaped}'"


def overwrite_source(df: DataFrame, source_value: str):
    """Replace the rows of ``RAW_TABLE`` whose ``source`` equals ``source_value`` with ``df``.

    The write is a Delta ``overwrite`` restricted by ``replaceWhere``, so rows of
    other sources are left untouched. ``df`` is expected to carry
    ``source == source_value`` on every row, e.g. the output of
    ``standardize_docs`` called with the same ``source_value``.

    Args:
        df: Rows to write. They replace the existing rows for this source.
        source_value: Source whose rows are replaced.

    Raises:
        ValueError: if ``source_value`` is None. The predicate would otherwise
            compare against the literal string ``'None'``.
    """
    if source_value is None:
        raise ValueError("source_value must not be None")
    # Escape instead of interpolating raw: a quote in source_value could otherwise
    # rewrite the predicate (e.g. "x' OR '1'='1" would overwrite the whole table).
    predicate = f"source = {_sql_string_literal(str(source_value))}"
    (
        df.write
          .format("delta")
          .mode("overwrite")
          .option("replaceWhere", predicate)
          .saveAsTable(RAW_TABLE)
    )

In [0]:
def load_records_to_raw_data(records: List[dict], source_value: str, preview: bool = False):
    """Load ``records`` into ``RAW_TABLE`` as the full contents for ``source_value``.

    Rows already stored for ``source_value`` are replaced; other sources are untouched.

    Args:
        records: One dict per document, keyed by the field names of
            ``RAW_RECORD_SCHEMA``.
            * Missing keys are loaded as null.
            * A missing or null ``doc_id`` is rejected by createDataFrame's
              schema check, because that field is non-nullable.
            * Keys not in the schema are ignored.
        source_value: Value stored in the ``source`` column. It is also used to
            select the rows to replace.
        preview: If True, display the first 5 parsed records before writing.
    """
    if not records:
        print(f"No records to load for source {source_value}")
        return

    # Idempotent (CREATE TABLE IF NOT EXISTS): makes sure the table exists with
    # the declared DDL before the selective overwrite below.
    ensure_raw_table()

    # Project each dict onto the schema's field order. Row(**r) failed on records
    # missing any schema key; .get() turns missing optional fields into nulls.
    field_names = RAW_RECORD_SCHEMA.fieldNames()
    rows = [tuple(record.get(name) for name in field_names) for record in records]
    df = spark.createDataFrame(rows, schema=RAW_RECORD_SCHEMA)

    if preview:
        display(df.limit(5))

    standardized = standardize_docs(df, source_value=source_value)
    overwrite_source(standardized, source_value=source_value)