## Data Ingestion & Optimization

**Objective:** Raw CSVs are slow and heavy. We need to load them using PySpark (because Pandas will crash with Memory Errors) and convert them to Parquet.

In [1]:
import os
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, to_date, count, when, isnan, stddev
from pyspark.sql.types import DoubleType

In [2]:
# --- SPARK SESSION SETUP ---
# The driver memory is set explicitly to reduce the risk of running out of
# memory (OOM) on a local machine. On a managed cluster (Databricks / AWS EMR)
# this is normally handled by the cluster manager instead.
spark = (
    SparkSession.builder
    .appName("Backblaze_Failure_Prediction")
    .config("spark.driver.memory", "8g")
    .getOrCreate()
)

print("Spark Session Created")

Spark Session Created


In [3]:
# --- PATHS ---
# Both locations are derived from a single quarter label so the input and
# output paths cannot drift apart. Update DATA_ROOT to where the data was
# unzipped, and DATA_QUARTER to process a different quarter.
DATA_ROOT = "./data"
DATA_QUARTER = "Q3_2025"

# Glob pattern matching every raw CSV of the selected quarter.
RAW_DATA_PATH = f"{DATA_ROOT}/raw/data_{DATA_QUARTER}/*.csv"
# Directory the Parquet dataset is written to.
OUTPUT_PATH = f"{DATA_ROOT}/parquet_{DATA_QUARTER}"

In [None]:
# --- STEP 1: READ RAW DATA ---
# The *.csv glob in RAW_DATA_PATH makes Spark read every matching file into a
# single DataFrame. Schema inference is convenient but slow; production jobs
# usually declare the schema by hand instead.
print("Reading CSVs... this might take a minute.")

# samplingRatio=0.01: guess the column types from about 1% of the data only.
df = spark.read.options(header=True, inferSchema=True, samplingRatio=0.01).csv(RAW_DATA_PATH)

Reading CSVs... this might take a minute.


In [None]:
print("Reading CSVs (Lazy Mode)...")

# inferSchema=False skips the type-inference scan: the data itself is not
# touched yet, only the header line is needed to name the columns.
# This is expected to finish within a couple of seconds.
df = spark.read.csv(RAW_DATA_PATH, header=True, inferSchema=False)

print("Initial Read Done.")

# With schema inference disabled, every column is a string. Any column used
# in numeric work must therefore be cast explicitly first (e.g. to DoubleType,
# which is already imported in the imports cell at the top of the notebook).

Reading CSVs (Lazy Mode)...


In [None]:
# --- SEAGATE FEATURE SELECTION + PARQUET EXPORT (HANDLES STRING INPUTS) ---
# Steps performed by this cell:
#   1. keep only Seagate models and parse the date column,
#   2. drop SMART raw columns with too many missing values,
#   3. drop SMART raw columns whose values (almost) never change,
#   4. write identifiers + surviving features (cast to Double) to Parquet.

print("Filtering for Seagate (ST) drives...")
# Seagate model names are assumed to start with "ST". The match is anchored at
# the start of the string because a substring test (contains) would also select
# models that merely contain "ST", such as names beginning with "HGST".
df_seagate = df.filter(col("model").startswith("ST"))
df_seagate = df_seagate.withColumn("date", to_date(col("date"), "yyyy-MM-dd"))

# Mark the filtered frame for caching so the scans below can reuse it.
# The first action (the count) is what actually triggers the read from disk.
df_seagate.cache()

print("Counting Rows... (This triggers the actual read)")
total_rows = df_seagate.count()
print(f"Total Seagate Rows: {total_rows}")

if total_rows == 0:
    # Without this guard the missing-ratio division below fails with a bare
    # ZeroDivisionError that hides the real cause.
    raise ValueError(
        "No rows matched the Seagate model filter; check RAW_DATA_PATH and the model prefix."
    )

# --- STAGE 1: NULL CHECK ---
print("\n--- STAGE 1: NULL CHECK ---")
smart_cols = [c for c in df_seagate.columns if c.startswith('smart_') and c.endswith('_raw')]

# The cast to Double happens inside the check so string-typed columns are
# handled; every row whose cast result is null is counted as missing.
select_expr = [count(when(col(c).cast(DoubleType()).isNull(), c)).alias(c) for c in smart_cols]
null_counts = df_seagate.select(select_expr).collect()[0]

# A column is kept only if strictly less than this fraction of rows is missing.
DROP_THRESHOLD = 0.10
valid_features = [c for c in smart_cols if null_counts[c] / total_rows < DROP_THRESHOLD]

print(f"Features passed Null Check: {len(valid_features)}")

# --- STAGE 2: VARIANCE CHECK ---
print("\n--- STAGE 2: VARIANCE CHECK ---")

# Cast to DoubleType explicitly so stddev works on string-typed columns.
std_expr = [stddev(col(c).cast(DoubleType())).alias(c) for c in valid_features]
std_devs = df_seagate.select(std_expr).collect()[0]

# Columns with a null standard deviation, or one not above the small
# tolerance, are treated as constant and dropped.
final_features = [
    c for c in valid_features
    if std_devs[c] is not None and std_devs[c] > 0.0001
]

print(f"Final Selected Features: {len(final_features)}")

# --- SAVE ---
# Identifier / label columns are written as they are.
# NOTE(review): if df came from the inferSchema=False read, 'failure' and
# 'capacity_bytes' are still strings here - confirm whether the next
# notebook expects numeric types for them.
base_cols = ['date', 'serial_number', 'model', 'failure', 'capacity_bytes']
cols_to_save = [col(c) for c in base_cols]

# Feature columns are cast to Double permanently so the next notebook does
# not have to deal with strings.
cols_to_save += [col(c).cast(DoubleType()).alias(c) for c in final_features]

print("Saving to Parquet...")
df_seagate.select(cols_to_save).write.mode("overwrite").parquet(OUTPUT_PATH)
print("Done.")

In [None]:
# --- STEP 2: INITIAL INSPECTION ---
# Count the rows of the loaded DataFrame (this runs a Spark job), then show
# the column names and types.
row_total = df.count()
print(f"Total Rows: {row_total}")
print("Schema:")
df.printSchema()

In [None]:
# Check the class imbalance of the target label right away.
print("Distribution of Failures (0 = Healthy, 1 = Failed):")
failure_counts = df.groupBy("failure").count()
failure_counts.show()

In [None]:
# --- STEP 3: DATA CLEANING (LIGHT) ---
# Parse the "date" column (pattern yyyy-MM-dd) into a real date type.
df = df.withColumn("date", to_date("date", "yyyy-MM-dd"))

In [None]:
# List the five most common drive models. This cell does not filter anything
# itself; the output is used to decide which manufacturer to focus on
# (manufacturers are not mixed in one model initially).
print("Top 5 Hard Drive Models:")
model_counts = df.groupBy("model").count()
model_counts.orderBy("count", ascending=False).show(5)

1. Filter for our Manufacturer (Seagate).
2. Calculate the percentage of missing values for every column.
3. Drop columns where 30% or more of the values are missing.
4. Drop columns that have zero variance (the value is always "0" or always "100").

In [None]:
# Filter for Seagate drives before any per-column analysis.
# Different manufacturers use different columns, so analysing nulls across
# mixed manufacturers is misleading.
#
# Seagate model names are assumed to start with "ST" (e.g. 'ST4000DM000');
# adjust the prefix if the top-models output above suggests otherwise.
# The match is anchored at the start of the name because a substring test
# (contains) would also select models that merely contain "ST", such as
# names beginning with "HGST".
df_seagate = df.filter(col("model").startswith("ST"))

print(f"Seagate Drives Count: {df_seagate.count()}")

In [None]:
# INDUSTRY TECHNIQUE: AUTOMATED NULL ANALYSIS
# Goal: find out which columns are actually populated.
print("Calculating Null percentages per column... (This takes time)")

# Identifier / label columns are left out of the null analysis for now.
skip_cols = ['date', 'serial_number', 'model', 'failure', 'capacity_bytes']
candidate_cols = [name for name in df_seagate.columns if name not in skip_cols]

# Collects the columns that pass the density check.
valid_features = []

# Total row count, used as the denominator of the missing-value ratios.
total_rows = df_seagate.count()

In [None]:

# Running one Spark query per column would be slow, so all counts are computed
# in a single aggregation pass. Each expression counts the rows in which the
# column is NaN or null.
# NOTE(review): if df was read with inferSchema=False these columns are
# strings, so isnan() depends on Spark casting them implicitly - confirm, or
# cast to DoubleType explicitly.
aggregations = [
    count(when(isnan(name) | col(name).isNull(), name)).alias(name)
    for name in candidate_cols
]

# Run the query; the single result row holds one missing-count per column.
null_counts = df_seagate.select(aggregations).collect()[0]

# Threshold on the missing fraction: a column is kept only while its share of
# missing values stays below this value. A sensor that reports 99% of the
# time is useful; one that only reports 70% of the time is not.
DROP_THRESHOLD = 0.3


In [None]:
print("\n--- Feature Selection Report ---")

# Rebuild the list from scratch so that re-running this cell does not append
# the same columns a second time.
valid_features = []

if total_rows == 0:
    # Without this guard the ratio below fails with a bare ZeroDivisionError.
    raise ValueError("df_seagate has no rows; cannot compute missing-value percentages.")

for c in candidate_cols:
    null_pct = null_counts[c] / total_rows

    if null_pct < DROP_THRESHOLD:
        print(f"[KEEP] {c}: {null_pct:.2%} missing")
        valid_features.append(c)
    # Columns at or above the threshold are dropped without printing,
    # to keep the output clean.

print(f"\nSelected {len(valid_features)} features based on data density.")

In [None]:
# 4. FEATURE SELECTION STAGE 2: VARIANCE CHECK
print("\n--- STAGE 2: VARIANCE CHECK ---")
# A column whose standard deviation is zero never changes and carries no signal.

# Build one stddev expression per column that survived the null check.
# The cast to Double avoids errors with integer-only columns.
std_expr = [
    stddev(col(feature).cast(DoubleType())).alias(feature)
    for feature in valid_features
]

# Evaluate all of them in a single scan of the data.
std_devs = df_seagate.select(std_expr).collect()[0]

# Keep only the columns whose standard deviation is above a small tolerance.
final_features = []

for feature in valid_features:
    spread = std_devs[feature]

    # Drop when the value is missing or is not strictly above 0.0001.
    # Written as "not (x > t)" so a NaN result is dropped as well.
    if spread is None or not (spread > 0.0001):
        print(f"[DROP] {feature} (Zero Variance)")
        continue

    final_features.append(feature)

print("-" * 30)
print(f"Original Candidate Features: {len(candidate_cols)}")
print(f"Features after Null Check:   {len(valid_features)}")
print(f"Final Selected Features:     {len(final_features)}")

In [None]:
# --- STEP 5: SAVE FINAL DATASET ---

# Final column list: the identifier / label columns plus the features that
# passed both the null check and the variance check.
base_cols = ['date', 'serial_number', 'model', 'failure', 'capacity_bytes']
final_cols = base_cols + final_features

print(f"Selecting {len(final_cols)} columns...")
# Feature columns are cast to Double on the way out, matching the earlier save
# step that writes to the same OUTPUT_PATH, so the Parquet files hold numeric
# features instead of the raw column type. Identifier columns are kept as is.
df_final = df_seagate.select(
    [col(c) for c in base_cols]
    + [col(c).cast(DoubleType()).alias(c) for c in final_features]
)

print("Writing cleaned dataset to Parquet...")
df_final.write.mode("overwrite").parquet(OUTPUT_PATH)
print("Done. ETL Pipeline Complete.")