In [0]:
# --- Unity Catalog identifiers for the bronze fundamentals table ---
CATALOG = "stock_pipeline"
SCHEMA = "bronze"
TABLE = "fmp_fundamentals"
FULL_TABLE_NAME = f"{CATALOG}.{SCHEMA}.{TABLE}"

# --- S3 locations; every path below is derived from this bucket root ---
S3_BUCKET = "s3://stock-pipeline-data-dev-mc"

# Storage path of the bronze table. The %sql cell hard-codes this same string in its
# LOCATION clause; no Python cell shown here reads this constant.
BRONZE_TABLE_PATH = f"{S3_BUCKET}/bronze/fmp_fundamentals/"
# Streaming write checkpoint root (the write cell appends "/stream" to it).
CHECKPOINT_PATH = f"{S3_BUCKET}/_checkpoints/bronze_fundamentals/"
# Passed to Auto Loader as cloudFiles.schemaLocation.
SCHEMA_LOC_PATH = f"{S3_BUCKET}/_checkpoints/bronze_fundamentals_schema/"

## Ensure Catalog/Schema exist
# Catalog first, then the schema inside it; both statements are no-ops when the object exists.
for _ddl in (
    f"CREATE CATALOG IF NOT EXISTS {CATALOG}",
    f"CREATE SCHEMA IF NOT EXISTS {CATALOG}.{SCHEMA}",
):
    spark.sql(_ddl)


In [0]:
%sql
-- Create the bronze Delta table at an explicit S3 LOCATION if it is not already defined.
-- No column list is declared in this statement.
-- The table name and path are hard-coded copies of FULL_TABLE_NAME and BRONZE_TABLE_PATH
-- from the config cell; if either constant changes, update this statement by hand.
CREATE TABLE IF NOT EXISTS stock_pipeline.bronze.fmp_fundamentals
USING DELTA
LOCATION 's3://stock-pipeline-data-dev-mc/bronze/fmp_fundamentals/'

In [0]:
## Auto Loader Stream
from pyspark.sql.functions import col, input_file_name, current_timestamp, to_date, to_timestamp

# The trailing "*/" glob matches every statement-type folder under raw/fmp/.
RAW_FUNDAMENTALS_PATH = f"{S3_BUCKET}/raw/fmp/*/"

# Reader options gathered in one mapping and applied with a single .options() call.
_autoloader_options = {
    "cloudFiles.format": "json",  # NDJSON lines (gz supported)
    "cloudFiles.inferColumnTypes": "true",
    "cloudFiles.schemaLocation": SCHEMA_LOC_PATH,
    # Per the Auto Loader docs, "rescue" mode does not evolve the schema; fields outside it
    # are captured in the rescued-data column named below.
    "cloudFiles.schemaEvolutionMode": "rescue",
    "recursiveFileLookup": "true",
    "rescuedDataColumn": "_rescued_data",
    # "false": files already under the path when the stream first starts are skipped and only
    # later arrivals are ingested; "true" would also pick up the existing files.
    # NOTE(review): the write cell is titled "Backfill once" -- confirm "false" is intended.
    "cloudFiles.includeExistingFiles": "false",
}

df = (
    spark.readStream
        .format("cloudFiles")
        .options(**_autoloader_options)
        .load(RAW_FUNDAMENTALS_PATH)
)

# Stage 1: convert the three date fields with to_date, keeping their column names.
_dates_parsed_df = (
    df
    .withColumn("as_of_date", to_date("as_of_date"))
    .withColumn("fiscal_period_end", to_date("fiscal_period_end"))
    .withColumn("filing_date", to_date("filing_date"))
)

# Stage 2: add the parsed fetch timestamp plus lineage columns
# (source file path from _metadata, and the time this pipeline processed the row).
bronze_df = (
    _dates_parsed_df
    .withColumn("fetched_at_ts", to_timestamp("fetched_at"))
    .withColumn("_input_file", col("_metadata.file_path"))
    .withColumn("_processing_time", current_timestamp())
)


In [0]:
## Write to Bronze Delta (Backfill once)
# NOTE(review): CHECKPOINT_PATH already ends with "/", so the checkpoint string below contains
# "bronze_fundamentals//stream". It is deliberately left as-is: pointing an existing stream at a
# different checkpoint location makes it start without its recorded progress.
_bronze_writer = (
    bronze_df.writeStream
    .format("delta")
    .outputMode("append")
    .options(
        checkpointLocation=f"{CHECKPOINT_PATH}/stream",
        mergeSchema="true",
        optimizeWrite="true",
        autoCompact="true",
    )
    # availableNow: process what is currently available, then stop instead of running forever.
    .trigger(availableNow=True)
)

# toTable() starts the streaming query against the bronze table.
q = _bronze_writer.toTable(FULL_TABLE_NAME)

# Block this cell until the run finishes so the cells below see the written data.
q.awaitTermination()

# Enable table conveniences
# Sets three Delta table properties (optimized writes, auto compaction, change data feed).
# The statement is issued each time this cell runs.
_table_properties_sql = f"""
ALTER TABLE {FULL_TABLE_NAME}
SET TBLPROPERTIES (
    delta.autoOptimize.optimizeWrite = true,
    delta.autoOptimize.autoCompact = true,
    delta.enableChangeDataFeed = true
)
"""
spark.sql(_table_properties_sql)

## Verify
# Batch read of the bronze table, used for the spot checks below.
bronze = spark.read.table(FULL_TABLE_NAME)

# Row count per endpoint, sorted by endpoint name.
print("Counts by endpoint:")
_endpoint_counts = bronze.groupBy("endpoint").count().orderBy("endpoint")
display(_endpoint_counts)

# The 20 symbols with the most rows, largest first.
print("Counts by symbol (top 20):")
_top_symbol_counts = (
    bronze
    .groupBy("symbol")
    .count()
    .orderBy("count", ascending=False)
    .limit(20)
)
display(_top_symbol_counts)

# Ten rows restricted to the identifying and date/timestamp columns.
print("Sample records:")
_sample_cols = ["symbol", "endpoint", "fiscal_period_end", "filing_date", "fetched_at_ts"]
display(bronze.select(*_sample_cols).limit(10))


## Quick Data Quality
from pyspark.sql.functions import sum as _sum

# Columns whose NULL counts are reported.
_dq_columns = ["symbol", "endpoint", "payload", "hash"]

# One-row frame: for each column, the number of rows where it is NULL
# (isNull -> 0/1 flag, then summed).
nulls = bronze.select(
    *[col(c).isNull().cast("int").alias(c) for c in _dq_columns]
).agg(*[_sum(c).alias(c) for c in _dq_columns])

display(nulls)

# Hash values that occur on more than one row.
# NULL hashes are excluded first: GROUP BY places all NULLs in a single group, so two or more
# rows with a missing hash would otherwise be reported as one "duplicate". Missing hashes are
# already counted in `nulls` above.
dupes = (
    bronze
    .filter(col("hash").isNotNull())
    .groupBy("hash")
    .count()
    .filter(col("count") > 1)
)
print("Duplicate payload hashes:", dupes.count())

