In [45]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, lit, regexp_extract, to_date, sum as spark_sum, when, count
import pyspark.sql.functions as F
import matplotlib.pyplot as plt
import seaborn as sns

# Initialize (or reuse, via getOrCreate) the Spark session for this notebook.
spark = (
    SparkSession.builder
    .appName("example_app")
    .getOrCreate()
)

StatementMeta(, 6a46af5f-9c0a-4ac6-9ada-7a0c42be3672, 47, Finished, Available, Finished)

## Data Quality Functions

###### Function 1 - Primary key validation

In [46]:

from pyspark.sql.window import Window

def keyValidation(df, subset_columns):
    """Return how many key values occur on more than one row.

    The result is the number of distinct key combinations whose group size
    is greater than 1 -- not the number of surplus rows. A result of 0 means
    the key is unique across ``df``.

    Parameters
    ----------
    df : DataFrame
        Data to check.
    subset_columns : str or list of str
        Column(s) forming the (candidate) primary key; passed to ``groupBy``.
    """
    rows_per_key = df.groupBy(subset_columns).count()
    repeated_keys = rows_per_key.where(F.col("count") > 1)
    return repeated_keys.count()

def handle_duplicates(df, subset_columns, strategy='remove', keep='last'):
    """Resolve rows that share the same key value(s).

    Parameters
    ----------
    df : DataFrame
        Input data.
    subset_columns : str or list of str
        Column(s) forming the key. A single column may be given as a string.
    strategy : {'remove', 'flag', 'aggregate'}
        'remove'    -> keep exactly one row per key (chosen by ``keep``).
        'flag'      -> keep every row and add a boolean ``is_duplicate``
                       column, True when the key occurs more than once.
        'aggregate' -> one row per key; each non-key column takes
                       ``F.first`` of its group (which row wins is not
                       guaranteed by Spark).
    keep : {'first', 'last'}
        Only used by strategy='remove'. Occurrences are ordered by
        ``monotonically_increasing_id`` (the DataFrame's current
        partition/row order -- NOTE: not stable if upstream order changes).

    Returns
    -------
    DataFrame

    Raises
    ------
    ValueError
        If ``strategy`` is unknown, or ``keep`` is invalid for 'remove'.
    """
    # Normalise to a list so every branch handles str and list keys alike:
    # a bare str would otherwise be substring-matched by `in` below, and a
    # list would be nested by dropDuplicates([...]).
    if isinstance(subset_columns, str):
        key_cols = [subset_columns]
    else:
        key_cols = list(subset_columns)

    if strategy == 'remove':
        if keep not in ('first', 'last'):
            raise ValueError("Invalid 'keep' parameter. Use 'first' or 'last'.")
        # Materialise a row id first, then rank within each key. 'last' must
        # sort descending so that row_number() == 1 is the LAST occurrence.
        row_id = "_dq_row_id"
        row_num = "_dq_row_num"  # prefixed to avoid clobbering user columns
        order = F.col(row_id).asc() if keep == 'first' else F.col(row_id).desc()
        window_spec = Window.partitionBy(*key_cols).orderBy(order)
        return (
            df.withColumn(row_id, F.monotonically_increasing_id())
              .withColumn(row_num, F.row_number().over(window_spec))
              .filter(F.col(row_num) == 1)
              .drop(row_id, row_num)
        )

    elif strategy == 'flag':
        window_spec = Window.partitionBy(*key_cols)
        return df.withColumn("is_duplicate", F.count("*").over(window_spec) > 1)

    elif strategy == 'aggregate':
        # Take the first value of every non-key column per key.
        agg_exprs = [F.first(c).alias(c) for c in df.columns if c not in key_cols]
        if not agg_exprs:
            # groupBy().agg() rejects an empty expression list; with only key
            # columns, one row per key is simply the distinct keys.
            return df.select(*key_cols).distinct()
        return df.groupBy(*key_cols).agg(*agg_exprs)

    else:
        raise ValueError("Invalid strategy provided. Use 'remove', 'flag', or 'aggregate'.")

StatementMeta(, 6a46af5f-9c0a-4ac6-9ada-7a0c42be3672, 48, Finished, Available, Finished)

###### Function 2 - Null value validation

In [75]:
# Step 1: Detect missing values
def detect_missing_values(df):
    """Count the nulls in every column of ``df``.

    Returns a one-row DataFrame with the same column names (and order) as
    ``df``; each cell holds the number of null values in that column.
    """
    null_count_exprs = []
    for column_name in df.columns:
        null_as_int = col(column_name).isNull().cast("int")
        null_count_exprs.append(spark_sum(null_as_int).alias(column_name))
    return df.select(null_count_exprs)

# Step 2: Analyze the impact
def detect_missing_values_percentages(df, mv):
    """Convert per-column null counts into percentages of the row count.

    Parameters
    ----------
    df : DataFrame
        The data the counts were computed from; only its row count is used.
    mv : DataFrame
        Null counts, expected to be the one-row frame returned by
        ``detect_missing_values(df)``.

    Returns
    -------
    DataFrame
        Same columns as ``mv``; each value is ``count / rows * 100``.
        If ``df`` has no rows, every percentage is reported as 0.0.
    """
    # Use the df argument (not a notebook global) so this works for any table.
    total_rows = df.count()
    if total_rows == 0:
        # Avoid dividing by zero (null result, or an error under ANSI mode).
        return mv.select([lit(0.0).alias(c) for c in mv.columns])
    # Iterate over mv's own columns so the projection always matches its input.
    return mv.select([(col(c) / lit(total_rows) * 100).alias(c) for c in mv.columns])

# Step 3: Decide on a strategy
def handle_missing_values(df):
    """Replace nulls in every column with a type-dependent placeholder.

    - numeric columns (tinyint, smallint, int, bigint, float, double,
      decimal)                      -> 0
    - string and boolean columns    -> "Unknown"
    - any other type (date, ...)    -> "Error"

    NOTE(review): filling a boolean or non-string column with a string
    literal makes Spark apply type coercion to that column (likely turning
    it into a string) -- kept as-is for compatibility, confirm it is wanted.

    Returns a new DataFrame with the same column names and order.
    """
    from pyspark.sql.types import BooleanType, NumericType, StringType

    def _fill_value(data_type):
        # Any NumericType gets 0; previously only int/bigint/double did, and
        # float/decimal/smallint/tinyint wrongly received the "Error" string.
        if isinstance(data_type, NumericType):
            return 0
        if isinstance(data_type, (StringType, BooleanType)):
            return "Unknown"
        return "Error"

    # One projection instead of a withColumn per column (cheaper query plan).
    filled_columns = [
        when(col(field.name).isNull(), _fill_value(field.dataType))
        .otherwise(col(field.name))
        .alias(field.name)
        for field in df.schema.fields
    ]
    return df.select(filled_columns)

StatementMeta(, 6a46af5f-9c0a-4ac6-9ada-7a0c42be3672, 77, Finished, Available, Finished)

## Customer

In [48]:
# Load the bronze-layer Customer extract as Parquet.
# NOTE(review): the folder name ends in "_delta_bronze"; if it is a Delta
# table, spark.read.format("delta").load(...) would honour the transaction
# log, which a plain Parquet read does not -- confirm.
df_customer = spark.read.parquet(
    "Files/SalesLT.Customer_W01_data_delta_bronze"
)

# Shape as (row count, column count), a separator, then the schema.
print((df_customer.count(), len(df_customer.columns)))
print('----------')
df_customer.printSchema()

StatementMeta(, 6a46af5f-9c0a-4ac6-9ada-7a0c42be3672, 50, Finished, Available, Finished)

(100, 15)
----------
root
 |-- CustomerID: long (nullable = true)
 |-- NameStyle: long (nullable = true)
 |-- Title: string (nullable = true)
 |-- FirstName: string (nullable = true)
 |-- MiddleName: string (nullable = true)
 |-- LastName: string (nullable = true)
 |-- Suffix: string (nullable = true)
 |-- CompanyName: string (nullable = true)
 |-- SalesPerson: string (nullable = true)
 |-- EmailAddress: string (nullable = true)
 |-- Phone: string (nullable = true)
 |-- PasswordHash: string (nullable = true)
 |-- PasswordSalt: string (nullable = true)
 |-- rowguid: string (nullable = true)
 |-- ModifiedDate: string (nullable = true)



In [52]:

# Check CustomerID uniqueness; de-duplicate only when needed.
duplicate_count = keyValidation(df_customer, 'CustomerID')

if duplicate_count <= 0:
    print("No duplicate values found, the primary key is unique.")
    df_customer_F1 = df_customer
else:
    print("The primary key is not unique.")
    df_customer_F1 = handle_duplicates(df_customer, 'CustomerID')


StatementMeta(, 6a46af5f-9c0a-4ac6-9ada-7a0c42be3672, 54, Finished, Available, Finished)

No duplicate values found, the primary key is unique.


In [76]:
# Step 1: per-column null counts on the de-duplicated customers.
missing_counts = detect_missing_values(df_customer_F1)
print("Missing value counts:")
missing_counts.show()

# Step 2: the same counts expressed as a percentage of all rows.
missing_percentages = detect_missing_values_percentages(
    df_customer_F1, missing_counts
)
print("Missing value percentages:")
missing_percentages.show()

# Steps 3-4: apply the imputation strategy.
df_customer_F2 = handle_missing_values(df_customer_F1)

# Step 5: re-run the null check; every count is expected to be zero now.
cleaned_missing_counts = detect_missing_values(df_customer_F2)
print("Missing value counts after cleaning:")
cleaned_missing_counts.show()

StatementMeta(, 6a46af5f-9c0a-4ac6-9ada-7a0c42be3672, 78, Finished, Available, Finished)

Missing value counts:
+----------+---------+-----+---------+----------+--------+------+-----------+-----------+------------+-----+------------+------------+-------+------------+
|CustomerID|NameStyle|Title|FirstName|MiddleName|LastName|Suffix|CompanyName|SalesPerson|EmailAddress|Phone|PasswordHash|PasswordSalt|rowguid|ModifiedDate|
+----------+---------+-----+---------+----------+--------+------+-----------+-----------+------------+-----+------------+------------+-------+------------+
|         0|        0|    2|        0|        38|       0|    96|          0|          0|           0|    0|           0|           0|      0|           0|
+----------+---------+-----+---------+----------+--------+------+-----------+-----------+------------+-----+------------+------------+-------+------------+

Missing value percentages:
+----------+---------+-----+---------+----------+--------+------+-----------+-----------+------------+-----+------------+------------+-------+------------+
|CustomerID|Na