In [0]:
import os

# Storage account that hosts the data lake.
ACCOUNT = "miningganaa01"

# SAS token for the container. It is read from the environment instead of being
# pasted into the notebook: a token in source (or in saved notebook output)
# grants storage access to anyone who can read the file.
# NOTE(review): the token previously hardcoded here was committed in plain text
# and should be revoked/rotated. On Databricks, prefer a secret scope
# (dbutils.secrets.get(scope=..., key=...)) to populate ADLS_SAS_TOKEN.
SAS = os.environ.get("ADLS_SAS_TOKEN", "")
if not SAS:
    print("WARNING: ADLS_SAS_TOKEN is not set; ABFS access will likely fail to authenticate.")

In [0]:
CONTAINER = "dls"
# ADLS Gen2 folder holding the raw Kaggle flotation CSVs (Bronze landing zone).
BRONZE_DIR = "abfss://{}@{}.dfs.core.windows.net/bronze/kaggle/quality_prediction".format(CONTAINER, ACCOUNT)

In [0]:
# Configure SAS authentication for the ABFS driver (abfss://...dfs.core.windows.net).
# The key set here previously, fs.azure.sas.<container>.<account>..., is the
# legacy WASB (wasbs://...blob.core.windows.net) form; the ABFS driver instead
# authenticates through these three account-scoped keys: auth type, SAS token
# provider class, and the fixed SAS token.
spark.conf.set(f"fs.azure.account.auth.type.{ACCOUNT}.dfs.core.windows.net", "SAS")
spark.conf.set(
    f"fs.azure.sas.token.provider.type.{ACCOUNT}.dfs.core.windows.net",
    "org.apache.hadoop.fs.azurebfs.sas.FixedSASTokenProvider",
)
spark.conf.set(f"fs.azure.sas.fixed.token.{ACCOUNT}.dfs.core.windows.net", SAS)

In [0]:
import os

ACCOUNT   = "miningganaa01"                     # your storage account
CONTAINER = "dls"                               # your container

# SAS token for the container, read from the environment so it never lives in
# the notebook source or its saved outputs. The token previously pasted here
# was committed in plain text and should be revoked/rotated.
# The original instructions required a leading "?"; normalise to exactly one
# whether or not the environment value already carries it. "" means not set.
_sas_from_env = os.environ.get("ADLS_SAS_TOKEN", "")
SAS = ("?" + _sas_from_env.lstrip("?")) if _sas_from_env else ""
if not SAS:
    print("WARNING: ADLS_SAS_TOKEN is not set; ABFS access will likely fail to authenticate.")

In [0]:
# --- Switch the ABFS driver to SAS auth for this storage account ---
# Three account-scoped keys: auth type, token provider class, and the token itself.
_abfs_host = f"{ACCOUNT}.dfs.core.windows.net"
_sas_conf = {
    f"fs.azure.account.auth.type.{_abfs_host}": "SAS",
    f"fs.azure.sas.token.provider.type.{_abfs_host}": "org.apache.hadoop.fs.azurebfs.sas.FixedSASTokenProvider",
    f"fs.azure.sas.fixed.token.{_abfs_host}": SAS,
}
for _conf_key, _conf_value in _sas_conf.items():   # dicts keep insertion order
    spark.conf.set(_conf_key, _conf_value)

In [0]:
# --- Bronze folder URI. The SAS is supplied via Spark conf (FixedSASTokenProvider),
#     so the path itself carries no query string. ---
_abfss_root = f"abfss://{CONTAINER}@{ACCOUNT}.dfs.core.windows.net"
BRONZE_DIR = f"{_abfss_root}/bronze/kaggle/quality_prediction"

In [0]:
# --- Load every CSV under the Bronze folder ---
csv_reader = (spark.read
              .option("header", True)          # first row = column names
              .option("inferSchema", True))    # let Spark guess column types
df_raw = csv_reader.csv(BRONZE_DIR + "/*")

display(df_raw.limit(10))    # peek at the first rows
df_raw.printSchema()         # confirm column names and inferred types

In [0]:
# --- Bronze Delta copy: deferred until column names are sanitized ---
# The raw CSV headers contain spaces (and '%'). Without column mapping, Delta
# rejects column names containing spaces or any of ",;{}()\n\t=". Saving df_raw
# as-is therefore raised an AnalysisException here and broke Restart & Run All.
# This is the problem the "FIX: sanitize column names" cells below address.
# The Bronze table is written by those cells. Here we only make sure the target
# database exists and report the raw volume.
spark.sql("CREATE DATABASE IF NOT EXISTS mining")

print("Row count:", df_raw.count())

In [0]:
# ============================================
# FIX: sanitize column names, then save BRONZE
# ============================================
from pyspark.sql import functions as F
import re

def normalize_col(colname: str) -> str:
    """Turn a raw CSV header into a Delta/SQL-friendly column name.

    Applied in order: trim and lowercase; spell out '%' as 'percent';
    turn each '/' into '_'; turn each whitespace run into '_'; squeeze
    runs of underscores down to one; drop underscores at either end.

    Example: "% Iron Feed" -> "percent_iron_feed".
    """
    name = colname.strip().lower().replace("%", "percent")
    # slashes, whitespace runs, then repeated underscores all become a single "_"
    for pattern in (r"[\/]", r"\s+", r"__+"):
        name = re.sub(pattern, "_", name)
    return name.strip("_")

In [0]:
# Rename raw headers to their normalized form. DataFrames are immutable, so
# each rename yields a new frame and df_raw itself is left untouched.
rename_map = {c: normalize_col(c) for c in df_raw.columns}
df_bronze = df_raw
for old_name, new_name in ((o, n) for o, n in rename_map.items() if o != n):
    df_bronze = df_bronze.withColumnRenamed(old_name, new_name)

In [0]:
# Standardize the timestamp column name: 'date' becomes 'ts' (a rename does not change its type).
df_bronze = df_bronze.withColumnRenamed("date", "ts") if "date" in df_bronze.columns else df_bronze

In [0]:
# Persist Bronze as a Delta table now that every column name is Delta-safe.
bronze_table_name = "mining.bronze_flotation"
spark.sql("CREATE DATABASE IF NOT EXISTS mining")
df_bronze.write.format("delta").mode("overwrite").saveAsTable(bronze_table_name)

saved_bronze = spark.table(bronze_table_name)
print("Bronze saved. Row count:", saved_bronze.count())
saved_bronze.printSchema()

In [0]:
# ===========================================================
# NEXT STEP: Create the SILVER layer (clean & standardized data)
# ===========================================================
# Business context:
# The raw mining plant data (Bronze) is messy: all numbers are stored as text,
# sometimes with commas or stray symbols. Before engineers or analysts can
# trust the data, we need to clean and type-cast it.
#
# Why it matters:
# Clean, numeric data enables quality checks, dashboards, and advanced models
# (for example: tracking silica impurity trends that impact ore quality).
# ===========================================================

from pyspark.sql import functions as F, types as T
import re

# 1) Load Bronze
bronze = spark.table("mining.bronze_flotation")

# 2) Make sure the timestamp column is called 'ts' (rename 'date' only if no 'ts' exists yet)
needs_ts_rename = "ts" not in bronze.columns and "date" in bronze.columns
df = bronze.withColumnRenamed("date", "ts") if needs_ts_rename else bronze

In [0]:
# 3) Treat every column other than the 'ts' timestamp as numeric.
numeric_cols = list(filter(lambda name: name != "ts", df.columns))

In [0]:
# 4) Clean every numeric column in ONE projection. Per column:
#    a) trim leading/trailing spaces (trim keeps NULL as NULL, so the old
#       explicit when(isNull) guard was redundant)
#    b) European decimal comma -> dot, e.g. "12,34" -> "12.34"
#    c) drop stray symbols (anything but digits, '.', '-'), e.g. '%'
#    d) cast the cleaned string to DOUBLE
# A single select() replaces 4 chained withColumn() calls per column: the
# PySpark docs warn that calling withColumn in a loop bloats the logical plan
# and can cause performance issues or StackOverflowException.
numeric_col_set = set(numeric_cols)
cleaned_cols = []
for c in df.columns:
    if c not in numeric_col_set:
        cleaned_cols.append(F.col(c))                      # e.g. 'ts' passes through
        continue
    col_expr = F.trim(F.col(c))                              # a)
    col_expr = F.regexp_replace(col_expr, ",", ".")          # b)
    col_expr = F.regexp_replace(col_expr, r"[^0-9\.\-]", "") # c)
    cleaned_cols.append(col_expr.cast(T.DoubleType()).alias(c))  # d) keep the name
df = df.select(cleaned_cols)  # original column order preserved

In [0]:
# 5) Quick validation: total rows plus a NULL count for each numeric column
#    (a NULL after the cast means the value could not be parsed or was missing).
row_count = df.count()
null_exprs = [F.sum(F.when(F.col(c).isNull(), 1).otherwise(0)).alias(c) for c in numeric_cols]
null_row = df.select(*null_exprs).collect()[0]
nulls = null_row.asDict()

print(f"[SILVER PREVIEW] Total rows = {row_count}")
print("[Sample of NULL counts by column]")
for col_name, n_null in list(nulls.items())[:5]:
    print(f"  {col_name}: {n_null}")

In [0]:
# 6) Persist the cleaned frame as the Silver Delta table.
#    Full overwrite (including schema) keeps reruns reproducible.
spark.sql("CREATE DATABASE IF NOT EXISTS mining")
silver_writer = (df.write
                 .format("delta")
                 .mode("overwrite")
                 .option("overwriteSchema", "true"))
silver_writer.saveAsTable("mining.silver_flotation")

print("✅ Silver table saved as mining.silver_flotation")
display(df.limit(10))  # preview of the cleaned rows

In [0]:
# ===========================================================
# STEP 9: Create the GOLD layer (business KPIs)
# ===========================================================
# Business context:
# Engineers don’t want to stare at millions of rows of sensor data.
# They need summarized metrics that answer key questions:
#   - What was the average silica concentration each day?
#   - Did silica ever exceed thresholds that hurt ore quality?
#   - How stable was the process (min/max levels, records count)?
#
# Why it matters:
# These Gold metrics become the backbone for dashboards and
# decision-making — linking raw data to operational outcomes.
# ===========================================================

from pyspark.sql import functions as F

# 1) Gold aggregates start from the persisted, already-typed Silver table
silver_table_name = "mining.silver_flotation"
silver = spark.table(silver_table_name)

In [0]:
# 2) One row per calendar day: mean/max/min silica plus the number of readings
silica_col = "percent_silica_concentrate"
daily_aggs = [
    F.avg(silica_col).alias("avg_silica"),
    F.max(silica_col).alias("max_silica"),
    F.min(silica_col).alias("min_silica"),
    F.count("*").alias("records"),
]
daily_kpis = (
    silver
    .withColumn("day", F.to_date("ts"))
    .groupBy("day")
    .agg(*daily_aggs)
    .orderBy("day")
)


In [0]:
# 3) Persist the daily KPIs as the Gold Delta table (full refresh each run)
gold_writer = (daily_kpis.write
               .format("delta")
               .mode("overwrite")
               .option("overwriteSchema", "true"))
gold_writer.saveAsTable("mining.gold_daily_kpis")

print("✅ Gold KPIs saved as mining.gold_daily_kpis")
display(daily_kpis.limit(10))

In [0]:
# ===========================================================
# STEP 10: Add a threshold-based KPI to Gold
# ===========================================================
# Business context:
# High silica content reduces iron ore quality and increases costs.
# Engineers need to know not just averages, but how often
# the process runs above critical thresholds (e.g., 2% silica).
#
# Why it matters:
# This metric acts like a "process stability score."
# It highlights days where intervention may be needed.
# ===========================================================

from pyspark.sql import functions as F

# Start this step from the persisted, cleaned Silver table
silver_source = "mining.silver_flotation"
silver = spark.table(silver_source)

In [0]:
# Silica level (%) above which a reading counts as out of spec (domain value; tune as needed)
THRESHOLD = 2.0

# 1 for an out-of-spec reading, else 0 (NULL readings also count as 0)
silica_reading = F.col("percent_silica_concentrate")
above_flag = F.when(silica_reading > THRESHOLD, 1).otherwise(0)

# Daily silica KPIs plus the share (%) of that day's readings above THRESHOLD
daily_kpis_threshold = (
    silver
    .withColumn("day", F.to_date("ts"))
    .groupBy("day")
    .agg(
        F.avg("percent_silica_concentrate").alias("avg_silica"),
        F.max("percent_silica_concentrate").alias("max_silica"),
        F.min("percent_silica_concentrate").alias("min_silica"),
        (F.sum(above_flag) / F.count("*") * 100).alias("pct_above_threshold"),
        F.count("*").alias("records"),
    )
    .orderBy("day")
)

In [0]:
# Replace the Gold table with the threshold-enriched KPIs (schema may gain a column)
threshold_writer = (daily_kpis_threshold.write
                    .format("delta")
                    .mode("overwrite")
                    .option("overwriteSchema", "true"))
threshold_writer.saveAsTable("mining.gold_daily_kpis")

print("✅ Gold table updated with silica threshold KPI")
display(daily_kpis_threshold.limit(10))

In [0]:
# ===========================================================
# RE-INGEST RAW CSV WITH EXPLICIT QUOTING OPTIONS
# ===========================================================
# Why: The file uses comma separators, AND commas inside numbers,
# but fields are quoted. We tell Spark to respect quotes/escapes.

ACCOUNT   = "miningganaa01"    # your account
CONTAINER = "dls"
BRONZE_DIR = f"abfss://{CONTAINER}@{ACCOUNT}.dfs.core.windows.net/bronze/kaggle/quality_prediction"

# CSV parsing options, applied in this order:
csv_options = {
    "header": True,                    # first row holds the column names
    "sep": ",",                        # fields are comma-separated
    "quote": '"',                      # values are wrapped in double quotes...
    "escape": '"',                     # ...and embedded quotes are doubled ("")
    "multiLine": False,                # one record per line
    "ignoreLeadingWhiteSpace": True,
    "ignoreTrailingWhiteSpace": True,
    "inferSchema": False,              # keep everything as strings; Silver casts explicitly
}
csv_reader = spark.read.format("csv")
for opt_name, opt_value in csv_options.items():
    csv_reader = csv_reader.option(opt_name, opt_value)
df_raw = csv_reader.load(f"{BRONZE_DIR}/*")

display(df_raw.limit(3))
df_raw.printSchema()


In [0]:
# ===========================================================
# BRONZE: safe column names, keep raw values (date stays string here)
# ===========================================================
import re
def normalize_col(colname: str) -> str:
    """Return a Delta/SQL-friendly version of a raw CSV header.

    Lowercases and trims, writes '%' as 'percent', maps '/' and whitespace
    runs to '_', collapses repeated '_' and strips it from both ends.

    NOTE(review): this re-defines the normalize_col from earlier in the
    notebook and silently shadows it; keep the two in sync or keep only one.
    """
    cleaned = colname.strip().lower()
    cleaned = cleaned.replace("%", "percent")
    cleaned = re.sub(r"__+", "_", re.sub(r"\s+", "_", re.sub(r"[\/]", "_", cleaned)))
    return cleaned.strip("_")

# Map every raw header to its normalized name; rename only where it differs.
rename_pairs = {c: normalize_col(c) for c in df_raw.columns}
df_bronze = df_raw
for src_name, dst_name in rename_pairs.items():
    if src_name == dst_name:
        continue
    df_bronze = df_bronze.withColumnRenamed(src_name, dst_name)

# Write Bronze; overwriteSchema allows replacing a table written with a different schema.
spark.sql("CREATE DATABASE IF NOT EXISTS mining")
bronze_writer = (df_bronze.write
                 .format("delta")
                 .mode("overwrite")
                 .option("overwriteSchema", "true"))
bronze_writer.saveAsTable("mining.bronze_flotation")

print("✅ Bronze rebuilt")
spark.table("mining.bronze_flotation").printSchema()


In [0]:
# ===========================================================
# SILVER: parse timestamp + cast numerics
# ===========================================================
from pyspark.sql import functions as F, types as T

bronze = spark.table("mining.bronze_flotation")

# a) Parse the 'date' string into a real timestamp column 'ts'
df = bronze.withColumn("ts", F.to_timestamp(F.col("date"), "yyyy-MM-dd HH:mm:ss"))

# b) Every column other than 'date' and 'ts' is numeric
numeric_cols = [c for c in df.columns if c not in ("date", "ts")]

# c) Clean & cast the numeric columns in ONE select(). Chaining 4 withColumn()
#    calls per column in a Python loop grows the logical plan with every call
#    (the PySpark docs warn about this); the per-column steps are unchanged.
#    trim() already maps NULL to NULL, so no explicit null guard is needed.
numeric_col_set = set(numeric_cols)
cleaned_cols = []
for c in df.columns:
    if c not in numeric_col_set:
        cleaned_cols.append(F.col(c))                          # 'date' / 'ts' untouched
        continue
    col_expr = F.trim(F.col(c))
    col_expr = F.regexp_replace(col_expr, ",", ".")            # "55,2" -> "55.2"
    col_expr = F.regexp_replace(col_expr, r"[^0-9\.\-]", "")   # strip %, spaces, etc.
    cleaned_cols.append(col_expr.cast(T.DoubleType()).alias(c))
df = df.select(cleaned_cols)  # original column order preserved

# d) Quick sanity check: ts should NOT be null now
print("Null ts count:", df.filter(F.col("ts").isNull()).count())

# e) Save Silver: keep ts + numerics, drop the raw 'date' string
(df.select(["ts"] + numeric_cols)
   .write
   .format("delta")
   .mode("overwrite")
   .option("overwriteSchema","true")
   .saveAsTable("mining.silver_flotation"))

print("✅ Silver rebuilt with real timestamps")
display(df.select("ts", "percent_silica_concentrate").limit(5))

In [0]:
# ===========================================================
# GOLD: daily silica KPIs (now with real 'day')
# ===========================================================
from pyspark.sql import functions as F

silver = spark.table("mining.silver_flotation")

# Out-of-spec silica level (%). This was a bare 2.0 literal here; it is now
# the same named THRESHOLD the other threshold cells use. It is declared in
# this cell so the cell does not depend on an earlier one having run.
THRESHOLD = 2.0

# One row per day: silica mean/max/min, % of readings above THRESHOLD
# (NULL readings count as "not above" but are still in the denominator),
# and the number of readings.
daily_kpis = (silver
    .withColumn("day", F.to_date("ts"))
    .groupBy("day")
    .agg(
        F.avg("percent_silica_concentrate").alias("avg_silica"),
        F.max("percent_silica_concentrate").alias("max_silica"),
        F.min("percent_silica_concentrate").alias("min_silica"),
        (F.sum(F.when(F.col("percent_silica_concentrate") > THRESHOLD, 1).otherwise(0))
         / F.count("*") * 100).alias("pct_above_threshold"),
        F.count("*").alias("records")
    )
    .orderBy("day"))

(daily_kpis.write
   .format("delta")
   .mode("overwrite")
   .option("overwriteSchema","true")
   .saveAsTable("mining.gold_daily_kpis"))

print("✅ Gold rebuilt with daily grouping")
display(daily_kpis.limit(10))

In [0]:
# Key insights from this sample:
# - On 2017-03-10, average silica was only ~1.74% and
#   just 22% of readings were above the 2% threshold.
#   This indicates a stable, high-quality production day.
#
# - On 2017-03-14, average silica jumped to ~3.26% with
#   nearly 80% of readings above threshold, signaling a
#   potential operational issue that would require attention.
#
# - Daily record counts (~4,320 per day) confirm that the
#   sensors record every 20 seconds, consistent with real
#   industrial process historian data.
#
# Why this matters for the business:
# Instead of sifting through 700,000+ raw rows, engineers
# now see a clear, day-by-day picture of process stability
# and quality risks. This kind of pipeline directly supports
# data-driven decision making in mining operations.

In [0]:
# ===========================================================
# STEP 11: Visualization - Daily Silica Trends
# ===========================================================

import matplotlib.pyplot as plt

# 1) Collect the Gold KPIs to pandas (one row per day, so small)
gold = spark.table("mining.gold_daily_kpis").toPandas()

# 2) Daily average silica, with the 2% quality threshold as a reference line
fig, ax = plt.subplots(figsize=(12,6))
ax.plot(gold["day"], gold["avg_silica"], marker="o", label="Avg Silica (%)")
ax.axhline(y=2.0, color="red", linestyle="--", label="Threshold (2%)")

# 3) Business-readable title and axis labels
ax.set_title("Daily Silica Concentration Trends", fontsize=16)
ax.set_xlabel("Day", fontsize=12)
ax.set_ylabel("Silica Concentration (%)", fontsize=12)
ax.tick_params(axis="x", labelrotation=45)
ax.legend()

plt.show()


In [0]:
# This visualization highlights the volatility of silica levels
# in the flotation process. The red line marks the 2% threshold
# where ore quality begins to deteriorate.
#
# - We see periods of stability (e.g., March, late May) where
#   silica averages are consistently below 2%.
# - Other periods (April, August) show frequent threshold
#   breaches, with daily averages reaching as high as 4.5%.
# - About half of the days cross the threshold, indicating a
#   significant quality risk.

In [0]:
# ===========================================================
# Visualization - % of Readings Above Threshold
# ===========================================================


import matplotlib.pyplot as plt

# 1) Collect the Gold KPIs to pandas (one row per day, so small)
gold = spark.table("mining.gold_daily_kpis").toPandas()

# 2) Share of each day's readings above the silica threshold
fig, ax = plt.subplots(figsize=(14,6))
ax.bar(gold["day"], gold["pct_above_threshold"], color="orange")

# 3) Title, axis labels, and a 50% "half the day out of spec" reference line
ax.set_title("% of Daily Readings Above 2% Silica", fontsize=16)
ax.set_xlabel("Day", fontsize=12)
ax.set_ylabel("Percent of Readings Above Threshold (%)", fontsize=12)
ax.tick_params(axis="x", labelrotation=45)
ax.axhline(y=50, color="red", linestyle="--", label="50% of day above risk level")
ax.legend()

plt.show()


In [0]:
# This bar chart reveals *how much of each day* was spent
# above the 2% silica threshold. While averages provide a
# general trend, this KPI exposes the severity of instability:
#
# - Many days exceed 50%, meaning most of the day’s
#   production was out of spec.
# - On several days, 100% of readings were above 2%,
#   signaling critical process issues.
# - Only a few days show low-risk operation (<20%).

In [0]:
# ===========================================================
#  Pipeline Monitoring & Data Quality Log
# ===========================================================
# What this cell does:
# - Captures run metadata (time, duration)
# - Records row counts for Bronze/Silver/Gold
# - Checks data quality (nulls, ranges, % out-of-spec)
# - Appends a single record per run to a Delta log table
#   => mining.monitoring_log
# ===========================================================

import time
from pyspark.sql import functions as F, types as T

# Wall-clock start of this monitoring run, in epoch seconds.
# F.current_timestamp() only builds a lazy Column expression. It is evaluated
# whenever a query that uses it runs, so it never captured when this run started.
start_ts = time.time()

# 1) Load layers
bronze = spark.table("mining.bronze_flotation")
silver = spark.table("mining.silver_flotation")
gold   = spark.table("mining.gold_daily_kpis")

In [0]:
# --- Row count per layer (did we process the expected volume?) ---
# Counted in order: Bronze, Silver, Gold.
bronze_rows, silver_rows, gold_rows = (layer.count() for layer in (bronze, silver, gold))

In [0]:
# --- Time coverage: earliest and latest Silver timestamps ---
# (min/max only; gaps between readings are not checked here)
coverage_row = silver.agg(F.min("ts").alias("ts_min"), F.max("ts").alias("ts_max")).first()
ts_min = coverage_row["ts_min"]
ts_max = coverage_row["ts_max"]

In [0]:
# --- NULL counts on the key business fields, in one aggregate pass ---
critical_cols = ["percent_silica_concentrate", "percent_iron_concentrate", "ore_pulp_ph"]
null_count_exprs = [
    F.sum(F.when(F.col(name).isNull(), 1).otherwise(0)).alias("null_" + name)
    for name in critical_cols
]
null_counts = silver.select(null_count_exprs).first().asDict()

In [0]:
# --- Business sanity check: pH readings must lie within 0..14 ---
ph = F.col("ore_pulp_ph")
below_zero = ph < 0
above_fourteen = ph > 14
ph_out_of_range = silver.filter(below_zero | above_fourteen).count()

In [0]:
# Example: Flows/levels should not be negative
nonneg_cols = [
    "starch_flow","amina_flow","ore_pulp_flow",
    "flotation_column_01_air_flow","flotation_column_02_air_flow","flotation_column_03_air_flow",
    "flotation_column_04_air_flow","flotation_column_05_air_flow","flotation_column_06_air_flow",
    "flotation_column_07_air_flow",
    "flotation_column_01_level","flotation_column_02_level","flotation_column_03_level",
    "flotation_column_04_level","flotation_column_05_level","flotation_column_06_level",
    "flotation_column_07_level"
]

# Total number of negative cells across the listed columns present in Silver.
# This is computed in ONE aggregate pass instead of one filter().count() job
# per column. SUM over an empty table is NULL, so missing values count as 0
# (matching the per-column count() result).
present_nonneg_cols = [c for c in nonneg_cols if c in silver.columns]
if present_nonneg_cols:
    neg_counts_row = silver.select([
        F.sum(F.when(F.col(c) < 0, 1).otherwise(0)).alias(c) for c in present_nonneg_cols
    ]).first()
    neg_violations = sum(int(v or 0) for v in neg_counts_row)
else:
    neg_violations = 0


In [0]:
# --- Optional: today's risk metric (only meaningful if Silver has rows dated today) ---
THRESHOLD = 2.0

# A global .agg() (no groupBy) always returns exactly one row, so .first() is
# never None; the old "daily_today is None" branch could not trigger. When no
# reading falls on today, avg() over zero rows is NULL and comes back as None.
daily_today = (silver
    .withColumn("day", F.to_date("ts"))
    .filter(F.col("day") == F.to_date(F.current_timestamp()))
    .agg((F.avg(F.when(F.col("percent_silica_concentrate") > THRESHOLD, 1).otherwise(0)) * 100)
         .alias("pct_above_threshold_today"))
    .first())
pct_above_threshold_today = daily_today["pct_above_threshold_today"]

In [0]:
# --- Build one log row (run_ts is added in a later step) ---
schema = T.StructType([
    T.StructField("bronze_rows", T.LongType(), False),
    T.StructField("silver_rows", T.LongType(), False),
    T.StructField("gold_rows",   T.LongType(), False),
    T.StructField("ts_min", T.TimestampType(), True),
    T.StructField("ts_max", T.TimestampType(), True),
    T.StructField("null_percent_silica_concentrate", T.LongType(), True),
    T.StructField("null_percent_iron_concentrate",   T.LongType(), True),
    T.StructField("null_ore_pulp_ph",                T.LongType(), True),
    T.StructField("ph_out_of_range", T.LongType(), True),
    T.StructField("neg_violations",  T.LongType(), True),
    T.StructField("silica_threshold", T.DoubleType(), True),
    T.StructField("pct_above_threshold_today", T.DoubleType(), True),
])

# Null counts come from F.sum(...), which is NULL (None) when Silver is empty.
# `.get(k) or 0` maps both a missing key and None to 0; a plain int(None)
# would raise TypeError.
row = [(
    bronze_rows, silver_rows, gold_rows,
    ts_min, ts_max,
    int(null_counts.get("null_percent_silica_concentrate") or 0),
    int(null_counts.get("null_percent_iron_concentrate") or 0),
    int(null_counts.get("null_ore_pulp_ph") or 0),
    int(ph_out_of_range),
    int(neg_violations),
    float(THRESHOLD),
    None if pct_above_threshold_today is None else float(pct_above_threshold_today),
)]

log_df = spark.createDataFrame(row, schema=schema)

In [0]:
# Stamp the snapshot with the query-time timestamp; withColumn replaces any
# existing run_ts, so re-running this cell stays safe.
run_ts_col = F.current_timestamp()
log_df = log_df.withColumn("run_ts", run_ts_col)

In [0]:
# --- Append this run's snapshot to the monitoring log ---
# mergeSchema is requested only when the table already exists, so columns
# added to log_df later can be merged into the stored schema.
log_table = "mining.monitoring_log"
writer = log_df.write.format("delta").mode("append")
writer = writer.option("mergeSchema", "true") if spark.catalog.tableExists(log_table) else writer

writer.saveAsTable(log_table)

print("✅ Monitoring snapshot saved")
display(spark.table(log_table).orderBy(F.col("run_ts").desc()).limit(5))

In [0]:
# -----------------------------------------------------------
# One-off reset of the monitoring table (to clear schema conflicts)
# -----------------------------------------------------------
# This cell used to DROP and recreate mining.monitoring_log unconditionally.
# On every "Run All" that wiped the history the previous cell had just
# appended to. It is now opt-in: set the flag to True for one run, then set
# it back to False.
from pyspark.sql import functions as F, types as T

RESET_MONITORING_LOG = False

if RESET_MONITORING_LOG:
    spark.sql("DROP TABLE IF EXISTS mining.monitoring_log")

    # Recreate from the current log_df; withColumn replaces any existing run_ts.
    log_df = log_df.withColumn("run_ts", F.current_timestamp())

    (log_df.write
        .format("delta")
        .mode("overwrite")        # create fresh
        .saveAsTable("mining.monitoring_log"))

    print("✅ monitoring_log recreated with a clean schema")

display(spark.table("mining.monitoring_log").orderBy(F.col("run_ts").desc()).limit(5))
