In [144]:
%%sql
CREATE TABLE IF NOT EXISTS fact_error_event (
    table_name STRING,
    error_timestamp TIMESTAMP,
    error_type STRING,
    severity STRING,
    columns_checked STRING,
    row_identifier STRING,
    error_description STRING,
    collision_id STRING,
    extraction_timestamp string
) USING DELTA;


StatementMeta(, 5f12afc4-62cb-41a0-9bbc-412ebe1bdf33, 144, Finished, Available, Finished)

<Spark SQL result set with 0 rows and 0 fields>

In [145]:
from pyspark.sql.functions import col, lit, current_timestamp, concat_ws

# Step 1: Extract null rows from nyc_crashes
null_rows = spark.sql("""
    SELECT * FROM nyc_crashes
    WHERE borough IS NULL
""").withColumn("columns_checked", 
      lit("borough,location")
).withColumn("error_type", 
      lit("NULL_CHECK")
).withColumn("severity", 
      lit("HIGH")
).withColumn("error_description", 
      lit("Nulls found in borough or location")
).withColumn("error_timestamp", 
      current_timestamp()
).withColumn("table_name", 
      lit("nyc_crashes")
).withColumn("row_identifier", 
      concat_ws("-", *spark.table("nyc_crashes").columns)
)

# Step 2: Load existing error fact rows
existing_errors = spark.table("fact_error_event").filter(
    col("error_type") == "NULL_CHECK"
).select("row_identifier", "columns_checked", "error_type")

# Step 3: Anti-join to exclude already logged issues
new_errors = null_rows.join(existing_errors, 
    on=["row_identifier", "columns_checked", "error_type"], 
    how="left_anti"
).select(
    "table_name", 
    "error_timestamp", 
    "error_type", 
    "severity",
    "columns_checked", 
    "row_identifier", 
    "error_description", 
    "collision_id",
    'extraction_timestamp'  # include this field
)

# Step 4: Append only new error rows
if new_errors.count() > 0:
    inserted_count = new_errors.count()    
    new_errors.write.mode("append").format("delta").saveAsTable("fact_error_event")
    print(f"Number of nulls records inserted: {inserted_count}")
else:
    print('No new errors identified')


StatementMeta(, 5f12afc4-62cb-41a0-9bbc-412ebe1bdf33, 145, Finished, Available, Finished)

Number of nulls records inserted: 34


In [146]:
from pyspark.sql.functions import col, lit, current_timestamp, concat_ws

# Step 1: Define columns to check for duplicates
duplicate_check_cols = ["crash_date", "borough", "vehicle_type_code1", "collision_id", "latitude", "longitude", "crash_time"]

# Step 2: Identify duplicate keys
duplicate_keys = (
    spark.table("nyc_crashes")
    .groupBy(duplicate_check_cols)
    .count()
    .filter(col("count") > 1)
    .drop("count")
)


# Ensure extraction_timestamp is present and properly typed
if "extraction_timestamp" not in null_rows.columns:
    null_rows = null_rows.withColumn("extraction_timestamp", lit(None).cast(TimestampType()))
else:
    null_rows = null_rows.withColumn("extraction_timestamp", col("extraction_timestamp").cast(TimestampType()))


# Step 3: Get full duplicate rows
duplicate_rows = (
    spark.table("nyc_crashes")
    .join(duplicate_keys, on=duplicate_check_cols, how="inner")
    .withColumn("columns_checked", lit(",".join(duplicate_check_cols)))
    .withColumn("error_type", lit("DUPLICATE_CHECK"))
    .withColumn("severity", lit("MEDIUM"))
    .withColumn("error_description", lit("Duplicate record found"))
    .withColumn("error_timestamp", current_timestamp())
    .withColumn("table_name", lit("nyc_crashes"))
    .withColumn("row_identifier", concat_ws("-", *duplicate_check_cols))
)

# Step 4: Load existing duplicate error logs
existing_duplicates = spark.table("fact_error_event").filter(
    col("error_type") == "DUPLICATE_CHECK"
).select("row_identifier", "columns_checked", "error_type")

# Step 5: Filter out already logged duplicates
new_duplicates = duplicate_rows.join(
    existing_duplicates,
    on=["row_identifier", "columns_checked", "error_type"],
    how="left_anti"
).select(
    "table_name",
    "error_timestamp",
    "error_type",
    "severity",
    "columns_checked",
    "row_identifier",
    "error_description",
    "collision_id",
    "extraction_timestamp"  # ensure collision_id is included
)

# Step 6: Write new duplicate errors to fact table
if new_duplicates.count() > 0:
    inserted_count = duplicate_rows.count()
    new_duplicates.write.mode("append").format("delta").saveAsTable("fact_error_event")
    print(f"Number of duplicate records inserted: {inserted_count}")
else:
    print('No new duplicate errors found')


StatementMeta(, 5f12afc4-62cb-41a0-9bbc-412ebe1bdf33, 146, Finished, Available, Finished)

No new duplicate errors found


In [147]:
df = spark.sql("SELECT * FROM Vscode.fact_error_event")
display(df)

StatementMeta(, 5f12afc4-62cb-41a0-9bbc-412ebe1bdf33, 147, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, dfbf9531-5266-4198-9b48-4f40a360f951)

In [153]:
from pyspark.sql.functions import *
## checking the error distribution and error event timestamp


error_df =df.groupBy('error_timestamp',
    'error_type', 
    to_date(col("extraction_timestamp")).alias('extraction_date')
    ).agg(count('*').alias('count'))

StatementMeta(, 5f12afc4-62cb-41a0-9bbc-412ebe1bdf33, 153, Finished, Available, Finished)

In [154]:
error_df.write.mode('overwrite').format('delta').saveAsTable('Error_Counts')

StatementMeta(, 5f12afc4-62cb-41a0-9bbc-412ebe1bdf33, 154, Finished, Available, Finished)

In [149]:
## reading the tables from the lakehouse
bronze_df = spark.table("nyc_crashes")
error_ids_df = spark.table("fact_error_event").select("collision_id").distinct()

 ## the left anti join collects everything from the  and left that is not on the right table, hence if succesful there will be no nulls or duplicate

clean_df = bronze_df.join(error_ids_df, on="collision_id", how="left_anti")

## creating a new table name silver_nyc_crashes enforcing the 3 layer method (bronze, silver and gold layers)
clean_df.write.mode("overwrite").format("delta").saveAsTable("silver_nyc_crashes")



StatementMeta(, 5f12afc4-62cb-41a0-9bbc-412ebe1bdf33, 149, Finished, Available, Finished)

In [150]:
%%sql
-- Checking if there is any nulls or duplicate using the fact error event table as reference
 
SELECT * FROM silver_nyc_crashes
JOIN fact_error_event ON silver_nyc_crashes.collision_id = fact_error_event.collision_id 

StatementMeta(, 5f12afc4-62cb-41a0-9bbc-412ebe1bdf33, 150, Finished, Available, Finished)

<Spark SQL result set with 0 rows and 20 fields>

In [156]:
%%sql
SELECT * FROM error_counts

StatementMeta(, 5f12afc4-62cb-41a0-9bbc-412ebe1bdf33, 156, Finished, Available, Finished)

<Spark SQL result set with 3 rows and 4 fields>

In [157]:
%%sql

SELECT COUNT(*) FROM silver_nyc_crashes

StatementMeta(, 5f12afc4-62cb-41a0-9bbc-412ebe1bdf33, 157, Finished, Available, Finished)

<Spark SQL result set with 1 rows and 1 fields>