In [0]:
from pyspark.sql import Row
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.functions import *

# Load the data (it is already saved as a table in the catalog)

In [0]:
# Get (or create) the Spark session for this job, then preview a handful of rows
# from the bronze credit-card table.
spark = (
    SparkSession.builder
    .appName("CreditCardIngestion")
    .getOrCreate()
)
df = spark.read.table("finance_catalog.bronze.creditcard_bronze")
display(df.limit(5))

In [0]:
# Show which values appear in the `Class` column (presumably the fraud label — confirm).
display(df.select("Class").dropDuplicates())

# Data Quality Checks

In [0]:
# Name components. The catalog and schema prefixes carry their own trailing dot,
# so fully qualified names are formed by plain concatenation.
catalog_name = "finance_catalog."
ingestion_database_name = "bronze."
bronze_table = "creditcard_bronze"
dq_audit_table = "bronze_audits"

# Fully qualified <catalog>.<schema>.<table> names used by the cells below.
full_bronze_table = f"{catalog_name}{ingestion_database_name}{bronze_table}"
full_bronze_audit_table = f"{catalog_name}{ingestion_database_name}{dq_audit_table}"

print("Audit table for bronze : ", full_bronze_audit_table)
print("Bronze table :", full_bronze_table)

In [0]:
# Table-level metrics that every per-column audit row below reuses.
df_bronze = spark.read.table(full_bronze_table)
row_count = df_bronze.count()
# 1 when the bronze table has at least one row, else 0.
row_count_check = 1 if row_count > 0 else 0
# Number of fully distinct rows (duplicates collapsed across all columns).
distinct_row_count = df_bronze.dropDuplicates().count()

## Column-level quality checks

In [0]:
critical_cols = ["Time", "Amount", "Class"]

# Null counts for every critical column in ONE aggregation pass, instead of one Spark job
# per column. F.when(...) without otherwise() yields null when the condition is false and
# F.count skips nulls, so each aggregate counts exactly the rows where that column is null.
# An ungrouped agg always returns a single row, even for an empty table.
null_counts = (
    df_bronze
    .agg(*[F.count(F.when(F.col(c).isNull(), 1)).alias(c) for c in critical_cols])
    .first()
    .asDict()
)

audit_results = []
# The loop variable is deliberately not called `col`: that name would shadow pyspark's
# `col` function brought in by the wildcard import in the first cell.
for attribute_name in critical_cols:
    null_count = null_counts[attribute_name]
    if row_count > 0:
        # Completeness = percentage of rows where the column is populated.
        completeness = 100 - null_count / row_count * 100
    else:
        # Empty table: avoid ZeroDivisionError. Keep a float (not None) so the audit
        # DataFrame built from these Rows can still infer the completeness column's type.
        completeness = 0.0
    audit_results.append(Row(attribute_name=attribute_name,
                             row_count=row_count,
                             distinct_row_count=distinct_row_count,
                             null_count=null_count,
                             completeness=completeness))
print(audit_results)


In [0]:
# Build the audit DataFrame from the per-column Rows, stamp it with run metadata
# (timestamp, source table, date), and fix the output column order.
data_quality_df = (
    spark.createDataFrame(audit_results)
    .withColumn("report_time", current_timestamp())
    .withColumn("table_name", lit(bronze_table))
    .withColumn("report_date", current_date())
    .select(
        "report_date",
        "table_name",
        "attribute_name",
        "row_count",
        "distinct_row_count",
        "null_count",
        "completeness",
        "report_time",
    )
)
display(data_quality_df)

# Append this run to the Delta audit table; mergeSchema allows the append to
# evolve the table's schema if the audit columns change.
(
    data_quality_df.write
    .format("delta")
    .mode("append")
    .option("mergeSchema", "true")
    .saveAsTable(full_bronze_audit_table)
)

In [0]:
# Read the audit table back to confirm the appended rows are present.
df = spark.read.table(full_bronze_audit_table)
display(df)