In [0]:
from pyspark.sql.functions import *
import uuid
from datetime import datetime

# Read the raw input CSV: the first line is a header row, and Spark infers column types.
source_path = "/Volumes/workspace/default/delta_quality/delta_quality_input.csv"
df = spark.read.options(header=True, inferSchema=True).csv(source_path)
df.show()

# Persist the raw data as a managed Delta table, replacing any previous contents.
df.write.saveAsTable("data_quality_file", format="delta", mode="overwrite")

## **Create Data Quality Session Table**

In [0]:
from pyspark.sql.types import StructType, StructField, StringType, TimestampType
from pyspark.sql import SparkSession
from datetime import datetime

# Create schema if not exists
spark.sql("CREATE SCHEMA IF NOT EXISTS data_quality_file")

# Session log: one row per notebook run. The table is created once and rows are
# appended below, so history from earlier runs is preserved.
spark.sql("""
CREATE TABLE IF NOT EXISTS data_quality_file_session_log (
    session_id STRING,
    start_time TIMESTAMP,
    end_time TIMESTAMP
) USING DELTA
""")

# Take ONE timestamp so session_id and start_time describe the same instant.
# Two separate datetime.now() calls could straddle a second boundary.
# NOTE(review): second-resolution ids collide if two runs start within the same second.
start_time = datetime.now()
session_id = start_time.strftime("%Y%m%d%H%M%S")

# Explicit schema: end_time is None at this point, so it cannot be inferred.
schema = StructType([
    StructField("session_id", StringType(), True),
    StructField("start_time", TimestampType(), True),
    StructField("end_time", TimestampType(), True)
])

# end_time stays null until the session is closed later in the notebook.
session_df = spark.createDataFrame([(session_id, start_time, None)], schema=schema)

# Append (not overwrite): overwriting would erase every previously logged session.
session_df.write.format("delta").mode("append").saveAsTable("data_quality_file_session_log")


In [0]:
%sql
-- Inspect the contents of the session log table created in the previous cell.
select * from data_quality_file_session_log

## **Data Quality Summary Table**

In [0]:
from pyspark.sql import functions as F

# Ensure schema exists before saving
spark.sql("CREATE SCHEMA IF NOT EXISTS data_summary_file")

# Get column names
columns = df.columns

# Compute the row count and every column's null count in a SINGLE aggregation.
# That is one Spark job, instead of df.count() plus one filtered count() job per column.
# F.count skips nulls, and F.when(<is null>, 1) is non-null exactly on null rows,
# so each expression counts that column's nulls.
null_count_exprs = [
    F.count(F.when(F.col(col_name).isNull(), 1)).alias(f"null_{idx}")
    for idx, col_name in enumerate(columns)
]
stats_row = df.select(F.count(F.lit(1)).alias("total_rows"), *null_count_exprs).first()
# Positional unpacking: first the row count, then null counts in `columns` order.
total_rows, *null_counts = stats_row

file_name = "sales_data"  # Use real file name if available

# Build one summary record per column
summary_data = [
    {
        "id": str(uuid.uuid4()),
        "session_id": session_id,
        "file_name": file_name,
        "column_name": col_name,
        "total_rows": total_rows,
        "null_failed": null_count,
    }
    for col_name, null_count in zip(columns, null_counts)
]

# Convert to DataFrame
dq_summary_df = spark.createDataFrame(summary_data)

# Save summary
dq_summary_df.write.format("delta").mode("overwrite").saveAsTable("data_summary_file.dq_summary")


In [0]:
%sql
-- Inspect the per-column null-count summary saved as data_summary_file.dq_summary.
select * from data_summary_file.dq_summary

## **Update Session End Time**

In [0]:
from pyspark.sql.functions import when, col, lit
from datetime import datetime

# Close out this run: stamp end_time on the current session's row in the session log.
end_time = datetime.now()

# Rows belonging to other sessions keep their existing end_time (.otherwise).
# Without it, `when` would null out end_time for every other session.
session_log_df = spark.table("data_quality_file_session_log")
df_sess = session_log_df.withColumn(
    "end_time",
    when(col("session_id") == session_id, lit(end_time)).otherwise(col("end_time")),
)

# Rewrite the log with the updated row. The schema is unchanged, so no mergeSchema is needed.
df_sess.write.format("delta").mode("overwrite").saveAsTable("data_quality_file_session_log")

In [0]:
%sql
-- Show the session log, newest session first, to check the recorded end_time.
select * from data_quality_file_session_log order by start_time desc

## **PIT Data Quality Summary**

In [0]:
# Idempotent setup for the PIT summary: create the schema first, then the table inside it.
pit_setup_statements = (
    "CREATE SCHEMA IF NOT EXISTS delta_summary_file",
    """
CREATE TABLE IF NOT EXISTS delta_summary_file.dq_pit_summary (
    session_id STRING,
    total_columns INT,
    total_nulls INT,
    quality_score DOUBLE
) USING DELTA
""",
)
for ddl in pit_setup_statements:
    spark.sql(ddl)

# No cell above writes rows to this table, so on a fresh run this displays no rows.
spark.sql("SELECT * FROM delta_summary_file.dq_pit_summary").show()