In [None]:
import os

# Pin the Spark driver and the workers to the same Python 3.11 interpreter.
# Both variables must be set before the SparkSession is created.
for _pyspark_env_var in ("PYSPARK_PYTHON", "PYSPARK_DRIVER_PYTHON"):
    os.environ[_pyspark_env_var] = r"C:\Users\Mohamad\AppData\Local\Programs\Python\Python311\python.exe"


In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, coalesce

# Create the Spark session used by every cell below (or reuse a running one).
spark = (
    SparkSession.builder
    .appName('Testing')
    .getOrCreate()
)

In [None]:
# Load the sample policies; Spark infers the schema from the JSON records.
json_reader = spark.read
df_testing = json_reader.json('Data/chatgpt_generated_1.json')

# Show a sample of rows first, then the inferred schema.
df_testing.show()
df_testing.printSchema()

In [None]:
# Put each struct, its nested field and the flat column next to each other
# so the competing representations can be compared row by row.
comparison_columns = [
    'driver', col('driver.age'), 'driver_age',
    'vehicle', col('vehicle.plate'), 'plate_number',
]
df_testing.select(*comparison_columns).show(truncate=False)


In [None]:
# Manual standardisation: for each target column, take the first non-null
# value among its candidate source columns (order = priority).
std_column_sources = {
    "driver_age_std": ("driver_age", col("driver.age")),
    "plate_number_std": ("plate_number", col("vehicle.plate"), "vehicle_plate"),
    "policy_start_date_std": ("policy_start_date", col("policy.start"), "start_date"),
    "policy_end_date_std": ("policy_end_date", col("policy.end"), "end_date"),
}

# Always start from df_testing so re-running the cell gives the same result.
manual_df = df_testing
for std_name, candidates in std_column_sources.items():
    manual_df = manual_df.withColumn(std_name, coalesce(*candidates))

# Show every source column beside the standardised value derived from it.
review_columns = [
    "policy_number",
    "driver_age", "driver.age", "driver_age_std",
    "plate_number", "vehicle.plate", "vehicle_plate", "plate_number_std",
    "policy_start_date", "policy.start", "start_date", "policy_start_date_std",
    "policy_end_date", "policy.end", "end_date", "policy_end_date_std",
]
manual_df.select(*review_columns).show(truncate=False)

In [None]:
# Name the two predicates once: a plate is usable when it is neither null nor
# the empty string, and unusable otherwise.
plate_col = col("plate_number")
has_plate = plate_col.isNotNull() & (plate_col != "")
lacks_plate = plate_col.isNull() | (plate_col == "")

valid_plates = df_testing.filter(has_plate).count()
invalid_plates = df_testing.filter(lacks_plate).count()
print(f"Valid plates: {valid_plates}, Invalid plates: {invalid_plates}")

# List the offending policies.
df_testing.filter(lacks_plate).select("policy_number", "plate_number").show()


In [None]:
from src.metadata_loader import load_metadata
from src.reader import read_sources
from src.transformations import apply_transformations

# Run the metadata-driven pipeline for the first dataflow in the metadata file.
metadata = load_metadata("metadata_motor.json")
dataflow = metadata["dataflows"][0]

# Read the sources, then feed them straight into the transformations.
frames = apply_transformations(dataflow, read_sources(spark, dataflow))

# Inspect the standardized policy frame: rows first, then schema.
policy_df = frames["policy_standardized"]
policy_df.show(truncate=False)
policy_df.printSchema()

In [None]:
from src.validator import apply_validations

# Pick the first "validate_fields" transformation declared in the dataflow.
validation_transformation = next(
    step for step in dataflow["transformations"] if step["type"] == "validate_fields"
)
validation_params = validation_transformation["params"]
rules = validation_params["validations"]
input_df = frames[validation_params["input"]]

# Split the input frame into the two frames returned by the validator.
ok_df, ko_df = apply_validations(input_df, rules)

print("OK count:", ok_df.count())
print("KO count:", ko_df.count())

for banner, frame in (
    ("=== STANDARD_OK sample ===", ok_df),
    ("=== STANDARD_KO sample ===", ko_df),
):
    print(banner)
    frame.show(truncate=False)

In [None]:
# Same narrow view for both outputs: key, validated fields, recorded errors.
error_view_columns = ["policy_number", "driver_age", "plate_number", "validation_errors"]
for frame in (ok_df, ko_df):
    frame.select(*error_view_columns).show(truncate=False)

In [None]:
from pyspark.sql.functions import col, explode

# One output row per (policy, error) pair.
error_col = explode("validation_errors").alias("error")
ko_with_errors = ko_df.select("policy_number", error_col)
ko_with_errors.show(truncate=False)

In [None]:
# This attempt failed completely: I tried multiple fixes and none of them worked, so I will get back to it later.
# (My guess was that it fails because the column does not have a single data type, so defining a schema
# and converting every value to string might work - to be verified.)
# Update: it turned out to be a Python 3.12 error.

# import pandas as pd

# test_data = [
#     {"policy_number": "TEST-001", "driver_age": 25, "plate_number": "TEST-123"},
#     {"policy_number": "TEST-002", "driver_age": "25", "plate_number": "TEST-456"},
#     {"policy_number": "TEST-003", "driver_age": 15, "plate_number": "TEST-789"},
#     {"policy_number": "TEST-004", "driver_age": 101, "plate_number": "TEST-000"},
#     {"policy_number": "TEST-005", "driver_age": None, "plate_number": "TEST-111"},
#     {"policy_number": "TEST-006", "driver_age": 30, "plate_number": ""},
#     {"policy_number": "TEST-007", "driver_age": 30, "plate_number": None},
# ]

# pd_df = pd.DataFrame(test_data)
# test_df = spark.createDataFrame(pd_df)
# test_df.show()

In [None]:
# Two plate edge cases: an empty string and a null, both with a valid age.
test_rows = [
    {"policy_number": "TEST-006", "driver_age": 30, "plate_number": ""},
    {"policy_number": "TEST-007", "driver_age": 30, "plate_number": None},
]
test_df = spark.createDataFrame(test_rows)

test_rules = [
    {"field": "plate_number", "validations": ["notEmpty"]},
    {"field": "driver_age", "validations": ["notNull", "isNumeric", "min:18", "max:100"]},
]

test_ok, test_ko = apply_validations(test_df, test_rules)

print("Test OK count:", test_ok.count())
print("Test KO count:", test_ko.count())

# Show the rejected rows together with their recorded errors.
test_ko_columns = ["policy_number", "driver_age", "plate_number", "validation_errors"]
test_ko.select(*test_ko_columns).show(truncate=False)


In [None]:
# (policy_number, start, end): a well-ordered pair, a reversed pair,
# an unparseable start date, and a missing end date.
date_case_values = (
    ("DATE-001", "2024-01-01", "2025-01-01"),
    ("DATE-002", "2025-01-01", "2024-01-01"),
    ("DATE-003", "invalid-date", "2025-01-01"),
    ("DATE-004", "2024-01-01", None),
)
date_test_cases = [
    {"policy_number": number, "policy_start_date": start, "policy_end_date": end}
    for number, start, end in date_case_values
]

date_test_df = spark.createDataFrame(date_test_cases)
date_test_df.show()

date_rules = [
    {"field": "policy_start_date", "validations": ["isDate", "dateBefore:policy_end_date"]},
    {"field": "policy_end_date", "validations": ["isDate"]},
]

date_ok, date_ko = apply_validations(date_test_df, date_rules)
print("Date validation OK:", date_ok.count())
print("Date validation KO:", date_ko.count())

date_ko_columns = ["policy_number", "policy_start_date", "policy_end_date", "validation_errors"]
date_ko.select(*date_ko_columns).show(truncate=False)

In [None]:
# Inputs for the isNumeric rule: an int, a numeric string, a non-numeric
# string, a float and a null.
numeric_test_cases = [
    {"policy_number": "NUM-001", "driver_age": 25},
    {"policy_number": "NUM-002", "driver_age": "25"},
    {"policy_number": "NUM-003", "driver_age": "twenty-five"},
    {"policy_number": "NUM-004", "driver_age": 25.5},
    {"policy_number": "NUM-005", "driver_age": None},
]

# A Spark column has exactly one type, and schema inference cannot reliably
# reconcile int / str / float values in the same field (it fails or behaves
# differently depending on the PySpark version). Represent every non-null age
# as text and declare the schema explicitly so the frame is built the same way
# everywhere; nulls stay null.
numeric_test_rows = [
    {
        "policy_number": case["policy_number"],
        "driver_age": None if case["driver_age"] is None else str(case["driver_age"]),
    }
    for case in numeric_test_cases
]
numeric_test_df = spark.createDataFrame(
    numeric_test_rows,
    schema="policy_number string, driver_age string",
)
numeric_test_df.show()

numeric_rules = [
    {"field": "driver_age", "validations": ["isNumeric"]},
]

numeric_ok, numeric_ko = apply_validations(numeric_test_df, numeric_rules)
print("Numeric validation OK:", numeric_ok.count())
print("Numeric validation KO:", numeric_ko.count())
numeric_ko.select("policy_number", "driver_age", "validation_errors").show(truncate=False)

In [None]:
# (policy_number, driver_age): both boundaries (18, 100), one value just
# outside each boundary (17, 101) and one mid-range value (50).
range_case_values = (
    ("RANGE-001", 18),
    ("RANGE-002", 100),
    ("RANGE-003", 17),
    ("RANGE-004", 101),
    ("RANGE-005", 50),
)
range_test_cases = [
    {"policy_number": number, "driver_age": age}
    for number, age in range_case_values
]

range_test_df = spark.createDataFrame(range_test_cases)
range_test_df.show()

range_rules = [
    {"field": "driver_age", "validations": ["min:18", "max:100"]},
]

range_ok, range_ko = apply_validations(range_test_df, range_rules)
print("Range validation OK:", range_ok.count())
print("Range validation KO:", range_ko.count())

range_ko_columns = ["policy_number", "driver_age", "validation_errors"]
range_ko.select(*range_ko_columns).show(truncate=False)

In [None]:
# List the names of the frames available in `frames` (populated by the
# pipeline cell above); the cells below look frames up by these names.
frames.keys()

In [None]:
# Show the ingestion_dt column next to the policy key.
ingestion_view = frames["policy_with_ingestion"].select("policy_number", "ingestion_dt")
ingestion_view.show(truncate=False)

In [None]:
# Check that both pipeline validation outputs carry the ingestion_dt column.
ok_with_ingestion = frames["validation_ok"]
ko_with_ingestion = frames["validation_ko"]

ingestion_checks = (
    ("OK records have ingestion_dt:", ok_with_ingestion),
    ("KO records have ingestion_dt:", ko_with_ingestion),
)

# Report column presence for both frames first ...
for message, frame in ingestion_checks:
    print(message, "ingestion_dt" in frame.columns)

# ... then show a five-row sample of each.
for _, frame in ingestion_checks:
    frame.select("policy_number", "ingestion_dt").show(5, truncate=False)

In [None]:
from pyspark.sql.functions import count, when, isnan, col

# Count rejected rows overall and per field mentioned in their errors.
#
# `validation_errors` is exploded in an earlier cell, so it is a collection
# column, while Column.contains() is a *string* predicate and is rejected by
# Spark on a collection. Casting to string first gives a text rendering of the
# whole collection that can be searched for the field name.
# NOTE(review): this is a substring match on the rendered errors - confirm that
# each error entry mentions the field name it refers to.
errors_text = col("validation_errors").cast("string")

validation_summary = ko_df.select(
    count(when(col("validation_errors").isNotNull(), 1)).alias("total_ko"),
    count(when(errors_text.contains("plate_number"), 1)).alias("plate_errors"),
    count(when(errors_text.contains("driver_age"), 1)).alias("age_errors"),
    count(when(errors_text.contains("policy_start_date"), 1)).alias("date_errors"),
)

validation_summary.show()

In [None]:
# Edge case: run the validator with no rules at all and report how the
# records are split.
empty_rules = list()
empty_ok, empty_ko = apply_validations(policy_df, empty_rules)

for label, frame in (
    ("Empty rules - OK count:", empty_ok),
    ("Empty rules - KO count:", empty_ko),
):
    print(label, frame.count())

In [None]:
# One rule entry per standardized field, built from (field, checks) pairs.
all_fields_checks = (
    ("policy_number", ["notNull"]),
    ("driver_age", ["notNull", "isNumeric", "min:18", "max:100"]),
    ("plate_number", ["notEmpty"]),
    ("policy_start_date", ["isDate"]),
    ("policy_end_date", ["isDate"]),
)
all_fields_test_rules = [
    {"field": field, "validations": checks}
    for field, checks in all_fields_checks
]

all_ok, all_ko = apply_validations(policy_df, all_fields_test_rules)
print("All fields validation - OK:", all_ok.count())
print("All fields validation - KO:", all_ko.count())

# List every rejected policy with its recorded errors.
all_ko_errors = all_ko.select("policy_number", "validation_errors")
all_ko_errors.show(truncate=False)

In [None]:
# Reconcile record counts across the pipeline stages.
# Every count() is a Spark action that re-runs the frame's lineage, so compute
# each one exactly once and reuse it; this also guarantees the sum check is
# built from the very same numbers that are printed above it.
total_input_count = df_testing.count()
normalized_count = policy_df.count()
ok_count = ok_df.count()
ko_count = ko_df.count()

print("Total input records:", total_input_count)
print("After normalization:", normalized_count)
print("After validation - OK:", ok_count)
print("After validation - KO:", ko_count)
print("Sum check (OK + KO):", ok_count + ko_count)

In [None]:
# Report which of the raw nested structs are still columns of the
# standardized frame, then list all of its columns.
cols = policy_df.columns
raw_struct_names = ("driver", "vehicle", "policy")
leftover_structs = [name for name in raw_struct_names if name in cols]

print("Raw struct columns still present:", leftover_structs)
print("All columns:", cols)