Load the raw points schema (provides `tennis_schema`, used when reading the CSV files)

In [0]:
%run "/Workspace/Users/ashleysekulic@gmail.com/tennis-analysis/notebooks/first_point_analysis/match_first_point_winner/tennis_points_raw_schema"

Create widgets for the bronze table location and the source volume path

In [0]:
# Notebook parameters (widget name -> default value). Registered in this order so the
# widget bar shows catalog, schema, table, then the source volume path.
widget_defaults = {
    "catalog": "workspace",
    "schema": "bronze",
    "table": "tennis_points_raw",
    "volume_path": "/Volumes/workspace/default/my_data_volume/tennis",
}
for widget_name, default_value in widget_defaults.items():
    dbutils.widgets.text(widget_name, default_value)

List candidate point files in the source volume

In [0]:
import re

# Normalise a trailing slash so derived paths (e.g. "<volume_path>/archive/...") have no "//".
volume_path = dbutils.widgets.get("volume_path").rstrip("/")

files = dbutils.fs.ls(volume_path)

# Expected names: "<4-digit year>-<letters>-points.csv".
# fullmatch (not match) anchors both ends, so names such as "...-points.csv.bak" or
# "...-points.csv.tmp" are rejected; directory entries (names ending in "/") never match.
pattern = re.compile(r"\d{4}-[a-zA-Z]+-points\.csv")

# Sort by name so files are ingested in a deterministic order on every run.
candidate_files = sorted(
    (f for f in files if pattern.fullmatch(f.name)),
    key=lambda f: f.name,
)
print(f"{len(candidate_files)} candidate file(s) in {volume_path}: {[f.name for f in candidate_files]}")



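Bronze ingestion: load each new file into the Delta table (tagged with `source_file` and `ingest_timestamp`), then move it to `archive/`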

In [0]:
from pyspark.sql.functions import current_timestamp, lit
from pyspark.sql.functions import col
from pyspark.sql.utils import AnalysisException

# Target table coordinates come from the widgets.
catalog = dbutils.widgets.get("catalog")
schema = dbutils.widgets.get("schema")
table = dbutils.widgets.get("table")
full_table_name = f"{catalog}.{schema}.{table}"

# `volume_path` and `candidate_files` are produced by the file-listing cell above.
archive_dir = f"{volume_path}/archive"

# Create the Unity Catalog schema and the archive folder (both no-ops if they already exist).
spark.sql(f"CREATE SCHEMA IF NOT EXISTS {catalog}.{schema}")
dbutils.fs.mkdirs(archive_dir)

# Names of files already present in the bronze table, read once up front instead of
# scanning the table for every candidate; kept current as files are written below.
# tableExists (rather than catching AnalysisException around spark.table) avoids treating
# unrelated analysis errors, e.g. missing permissions, as "table does not exist".
if spark.catalog.tableExists(full_table_name):
    ingested_files = {
        row["source_file"]
        for row in spark.table(full_table_name).select("source_file").distinct().collect()
    }
else:
    ingested_files = set()

for f in candidate_files:
    file_name = f.name
    file_path = f.path
    archive_path = f"{archive_dir}/{file_name}"

    # Already loaded (e.g. a previous run wrote the data but stopped before archiving):
    # finish the archive step without loading the rows a second time.
    if file_name in ingested_files:
        dbutils.fs.mv(file_path, archive_path)
        print(f"File {file_name} already ingested. Archiving then Skipping.")
        continue

    raw_df = (
        spark.read
        .option("header", "true")
        .schema(tennis_schema)
        .csv(file_path)
    )

    new_bronze_df = (
        raw_df
        .withColumn("source_file", lit(file_name))
        .withColumn("ingest_timestamp", current_timestamp())
    )

    # Always append. saveAsTable in append mode creates the table when it does not exist,
    # and it never replaces rows from files loaded earlier in this same run. Choosing
    # "overwrite" from a flag computed once before the loop would wipe those rows.
    (
        new_bronze_df.write
        .format("delta")
        .mode("append")
        .saveAsTable(full_table_name)
    )
    ingested_files.add(file_name)

    dbutils.fs.mv(file_path, archive_path)
    print(f"File {file_name} ingested and archived.")



Validation and Testing

In [0]:
%sql
-- Rows loaded per source file; use it to spot files that are missing or have unexpected row counts.
select
  source_file,
  count(1) as row_count
from workspace.bronze.tennis_points_raw
group by source_file
order by source_file;


In [0]:
%sql
-- NOTE(review): one-off repair script, intentionally disabled (every statement is commented out).
-- When enabled it:
--   1. builds a temp view keeping, per (source_file, match_id, PointNumber), only the row with
--      the earliest ingest_timestamp (rn = 1);
--   2. replaces workspace.bronze.tennis_points_raw with that view, minus the helper rn column.
-- Assumes (source_file, match_id, PointNumber) identifies a single point - confirm against the schema.
-- Caution: step 2 rewrites the table from a view that reads the same table, so take a backup
-- (or note the current Delta version) before running. The table name is hardcoded here
-- rather than taken from the catalog/schema/table widgets.
-- CREATE OR REPLACE TEMP VIEW bronze_dedup AS
-- SELECT *
-- FROM (
--   SELECT *,
--          ROW_NUMBER() OVER (
--            PARTITION BY source_file, match_id, PointNumber
--            ORDER BY ingest_timestamp ASC
--          ) AS rn
--   FROM workspace.bronze.tennis_points_raw
-- )
-- WHERE rn = 1;

-- CREATE OR REPLACE TABLE workspace.bronze.tennis_points_raw
-- USING DELTA
-- AS
-- SELECT
--   * EXCEPT (rn)
-- FROM bronze_dedup;




In [0]:
# %sql
# -- Disabled diagnostic: as written this is a Python cell containing only comments, so it does nothing.
# -- To run it, uncomment every line so that %sql becomes the first line of the cell.
# -- Per source_file it reports the number of distinct ingest_timestamp values (ingest_runs)
# -- and the total row count; ingest_runs > 1 suggests the file was written more than once.
# SELECT
#   source_file,
#   COUNT(DISTINCT ingest_timestamp) AS ingest_runs,
#   COUNT(*) AS total_rows
# FROM workspace.bronze.tennis_points_raw
# GROUP BY source_file
# ORDER BY source_file;
