# Part 1 - Bronze layer
- Raw data ingestion (no parsing or transformation)
- Technical metadata columns: _ingestion_timestamp, _source, _status
- Error handling (missing file, unreadable rows, empty lines)
- Basic data quality metrics (line count, empty line count)
- Save to data/ukprop_bronze.parquet (Parquet is used as the standard choice here; the optimal file type is evaluated in the silver layer)
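
The "missing file" case from the list above is not checked explicitly in the cells below, so here is a minimal sketch of what such a pre-flight check could look like. The helper name `check_source` is hypothetical, and the sketch assumes the source is a single local file (Spark itself also accepts directories and glob patterns, which this check would reject).

```python
import os

def check_source(path):
    """Return (ok, message) describing whether `path` looks ingestible.

    Hypothetical helper: assumes a single local file, not a directory or glob.
    """
    if not os.path.exists(path):
        return False, f"missing file: {path}"
    if not os.path.isfile(path):
        return False, f"not a regular file: {path}"
    if os.path.getsize(path) == 0:
        return False, f"empty file: {path}"
    return True, "ok"

# Example: check the source before handing it to Spark
ok, message = check_source("data/ukprop_unstructured.txt")
print(ok, message)
```

Running a check like this before `textFile` would turn a late Spark failure into an early, readable message.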

### Imports

In [7]:
# Import Required Libraries
import os
import pandas as pd
import time
from datetime import datetime, timedelta
import random
import json
import findspark
import csv

findspark.init()

# PySpark imports
try:
    from pyspark.sql import SparkSession
    from pyspark.sql.functions import * # isnan, count, desc
    from pyspark.sql.types import *
    from pyspark.sql import Row

    pyspark_available = True
    print("PySpark is available!")
except ImportError:
    print("PySpark not found. Please install with: pip install pyspark")
    pyspark_available = False

if not pyspark_available:
    raise ImportError("Cannot proceed without PySpark. Please install PySpark first.")

# Create SparkSession with custom configuration
spark = SparkSession.builder \
    .appName("DAT535-Project") \
    .config("spark.sql.adaptive.enabled", "true") \
    .config("spark.sql.adaptive.coalescePartitions.enabled", "true") \
    .config("spark.driver.memory", "2g") \
    .config("spark.executor.memory", "1g") \
    .getOrCreate()

# Set log level to reduce verbose output
spark.sparkContext.setLogLevel("WARN")

print("✓ SparkSession created successfully!")
print(f"Spark Version: {spark.version}")
print(f"Scala version: {spark.sparkContext._gateway.jvm.scala.util.Properties.versionNumberString()}")
print(f"Application Name: {spark.sparkContext.appName}")
print(f"Master: {spark.sparkContext.master}")
print(f"Default Parallelism: {spark.sparkContext.defaultParallelism}")

PySpark is available!
✓ SparkSession created successfully!
Spark Version: 3.5.0
Scala version: 2.12.18
Application Name: DAT535-Project
Master: local[*]
Default Parallelism: 4


### RDD -> DataFrame hybrid approach
+ Demonstrates Map/Reduce fundamentals, as requested by Tomasz
+ Allows per-line error handling (try/except) without failing the whole pipeline, which is also useful to learn
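
To illustrate the per-line idea, here is a plain-Python sketch (no Spark needed) of a wrapper that tags each line instead of raising. The function name `tag_line` and the extra `"empty"` status are assumptions made for illustration; the notebook itself only uses `"valid"` and `"parse_error"`.

```python
import time

def tag_line(line, source):
    """Wrap one raw line with Bronze metadata; never raises."""
    record = {
        "_raw_data": line,
        "_ingestion_timestamp": time.time(),
        "_source": source,
        "_status": "valid",
    }
    try:
        # .strip() raises AttributeError for non-text input such as None
        if line.strip() == "":
            record["_status"] = "empty"  # hypothetical extra status
    except Exception:
        record["_status"] = "parse_error"
    return record

# The built-in map() plays the role of rdd.map() here
sample = ["A newly built leasehold flat", "   ", None]
tagged = list(map(lambda l: tag_line(l, "sample.txt"), sample))
print([r["_status"] for r in tagged])  # ['valid', 'empty', 'parse_error']
```

The bad record is kept and labeled rather than aborting the run, which is the property the RDD approach is chosen for.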

In [10]:
print("=== Bronze Layer: Raw Data Ingestion ===")

source = "data/ukprop_unstructured.txt"

# Wrap each raw line with technical metadata (Bronze layer pattern).
# Building the dict cannot realistically raise, so the except branch
# is a safety net that marks the line instead of failing the job.
def parse_line_safe(line):
    try:
        return {
            "_raw_data": line,
            "_ingestion_timestamp": time.time(),
            "_source": source,
            "_status": "valid"
        }
    except Exception:
        return {
            "_raw_data": line,
            "_ingestion_timestamp": time.time(),
            "_source": source,
            "_status": "parse_error"
        }

start_time = time.time()

# Create RDD from raw text
raw_rdd = spark.sparkContext.textFile(source)
bronze_rdd = raw_rdd.map(parse_line_safe)

df_bronze = spark.createDataFrame(bronze_rdd)
df_bronze.show(n=4, truncate=True)

end_time = time.time()

print(f"Execution time: {end_time - start_time:.2f} seconds")

print("Counting data for the data quality metrics")
total_records = df_bronze.count()
empty_records = df_bronze.filter(trim(col("_raw_data")) == "").count()
valid_records = df_bronze.filter(col("_status") == "valid").count()
error_records = df_bronze.filter(col("_status") == "parse_error").count()
success_rates = (valid_records / total_records) * 100 if total_records else 0.0  # avoid division by zero on an empty source

print("\nData Quality Metrics:")
print(f"total_records: {total_records}")
print(f"valid_records: {valid_records}")
print(f"empty_records: {empty_records}")
print(f"error_records: {error_records}")
print(f"success_rates: {success_rates:.1f}%")

# Not needed at the moment: no errors occurred in this dataset, so no further error handling was added.
# print("\nError type breakdown:")
# df_bronze.filter(col("_status") != "valid").groupBy("_status").count().show()

=== Bronze Layer: Raw Data Ingestion ===


25/11/24 12:07:02 WARN PythonRunner: Detected deadlock while completing task 0.0 in stage 54 (TID 687): Attempting to kill Python Worker
                                                                                

+--------------------+--------------------+--------------------+-------+
|_ingestion_timestamp|           _raw_data|             _source|_status|
+--------------------+--------------------+--------------------+-------+
|1.7639860183155077E9|A newly built lea...|data/ukprop_unstr...|  valid|
|1.7639860183155622E9|A newly built fre...|data/ukprop_unstr...|  valid|
|1.7639860183155687E9|This newly built ...|data/ukprop_unstr...|  valid|
|1.7639860183155925E9|An newly built fr...|data/ukprop_unstr...|  valid|
+--------------------+--------------------+--------------------+-------+
only showing top 4 rows

Execution time: 4.44 seconds
Counting data for the data quality metrics





Data Quality Metrics:
total_records: 5732838
valid_records: 5732838
empty_records: 0
error_records: 0
success_rates: 100.0%
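
The metrics above are computed with four separate `count()` calls. As a hedged illustration of the Map/Reduce idea, the same numbers can be gathered in a single pass; the plain-Python sketch below uses `map` and `functools.reduce` on a small in-memory list. The helper names (`to_counts`, `merge_counts`, `quality_metrics`) and the sample records are made up for illustration, and `str.strip()` only roughly mirrors Spark's `trim`.

```python
from functools import reduce

def to_counts(record):
    """Map step: turn one Bronze record into per-metric 0/1 counters."""
    raw = record["_raw_data"]
    return {
        "total": 1,
        "valid": int(record["_status"] == "valid"),
        "error": int(record["_status"] == "parse_error"),
        "empty": int(isinstance(raw, str) and raw.strip() == ""),
    }

def merge_counts(a, b):
    """Reduce step: add two counter dicts key by key."""
    return {key: a[key] + b[key] for key in a}

def quality_metrics(records):
    zero = {"total": 0, "valid": 0, "error": 0, "empty": 0}
    counts = reduce(merge_counts, map(to_counts, records), zero)
    # Guard against division by zero for an empty source
    total = counts["total"]
    counts["success_rate"] = 100.0 * counts["valid"] / total if total else 0.0
    return counts

records = [
    {"_raw_data": "A newly built leasehold flat", "_status": "valid"},
    {"_raw_data": "", "_status": "valid"},
    {"_raw_data": "???", "_status": "parse_error"},
    {"_raw_data": "A newly built freehold house", "_status": "valid"},
]
print(quality_metrics(records))
```

On an RDD the same two functions could in principle be passed to `map` and `reduce`, but that variant was not run for this notebook.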


                                                                                

### (Alternative) DataFrame approach
+ Faster (Catalyst optimizer)
+ Cleaner syntax
+ No manual parsing in Python
+ Easy metadata columns (current_timestamp, lit)
+ Downside: no per-line error handling, so the whole job fails if the read fails

In [9]:
# 1. Read raw text
source = "data/ukprop_unstructured.txt"

start_time = time.time()

df_bronze_raw = spark.read.text(source).withColumnRenamed("value", "_raw_data")

# 2. Add ingestion metadata
df_bronze = (
    df_bronze_raw
    .withColumn("_ingestion_timestamp", current_timestamp())
    .withColumn("_source", lit(source))
    .withColumn("_status", lit("valid"))
)

# 3. Preview the first rows (data-quality metrics follow below)
df_bronze.show(n=4, truncate=True)

end_time = time.time()

print(f"Execution time: {end_time - start_time:.2f} seconds")

# count empty lines
empty_records = df_bronze.filter(trim(col("_raw_data")) == "").count()
total_records = df_bronze.count()
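valid_records = total_records - empty_records  # no per-line status here: the read either succeeds or the whole job fails
error_records = 0
success_rates = (valid_records / total_records) * 100 if total_records else 0.0  # avoid division by zero on an empty source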

print("\nData Quality Metrics:")
print(f"total_records: {total_records}")
print(f"valid_records: {valid_records}")
print(f"empty_records: {empty_records}")
print(f"error_records: {error_records}")
print(f"success_rates: {success_rates:.1f}%")

+--------------------+--------------------+--------------------+-------+
|           _raw_data|_ingestion_timestamp|             _source|_status|
+--------------------+--------------------+--------------------+-------+
|A newly built lea...|2025-11-24 12:06:...|data/ukprop_unstr...|  valid|
|A newly built fre...|2025-11-24 12:06:...|data/ukprop_unstr...|  valid|
|This newly built ...|2025-11-24 12:06:...|data/ukprop_unstr...|  valid|
|An newly built fr...|2025-11-24 12:06:...|data/ukprop_unstr...|  valid|
+--------------------+--------------------+--------------------+-------+
only showing top 4 rows

Execution time: 0.08 seconds





Data Quality Metrics:
total_records: 5732838
valid_records: 5732838
empty_records: 0
error_records: 0
success_rates: 100.0%
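
One difference between the two previews is the type of `_ingestion_timestamp`: the RDD version stores epoch seconds as a float (`time.time()`), while the DataFrame version stores a timestamp (`current_timestamp()`). A small sketch of converting the float form to a readable UTC timestamp is shown below; the helper name `epoch_to_utc` is hypothetical, and treating the value as UTC is an assumption about how it should be displayed.

```python
from datetime import datetime, timezone

def epoch_to_utc(epoch_seconds):
    """Convert epoch seconds (as produced by time.time()) to an aware UTC datetime."""
    return datetime.fromtimestamp(epoch_seconds, tz=timezone.utc)

# First value from the RDD preview above: 1.7639860183155077E9
ts = epoch_to_utc(1763986018.3155077)
print(ts.strftime("%Y-%m-%d %H:%M:%S"))  # 2025-11-24 12:06:58
```

Normalizing both variants to one representation would make the Bronze output consistent regardless of which approach produced it.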


                                                                                

### Save data to bronze.parquet

In [5]:
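output_path = "data/ukprop_bronze.parquet"  # Spark writes this path as a directory of part files
# coalesce(1) produces a single part file, at the cost of write parallelism
df_bronze.coalesce(1).write.mode("overwrite").parquet(output_path)
print(f"Bronze layer saved to {output_path}")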

Bronze layer saved to data/ukprop_bronze.parquet


                                                                                

## Conclusion
+ RDD/Map-Reduce is the better technical fit for Bronze because its core strength is low-level, per-line fault handling and _status marking.
+ DataFrames are faster (0.08 s compared to 4.44 s in the runs above, timed up to the first show of 4 rows), but less fault-tolerant for parsing errors. Hence RDD/Map-Reduce is better for data integrity, resilience and auditability.
+ RDD/Map-Reduce is therefore the most effective implementation for the bronze layer, in that it guarantees that all raw data is cataloged, including the bad records.

# END OF BRONZE 

In [None]:
spark.stop()  # stop the Spark session when finished