In [0]:
%python

#uploading the test dataset
df_test = (
    spark.read
    .option("header", "true")
    .option("inferSchema", "true")
    .csv("/Volumes/workspace/default/scania_aps_failure_data/aps_failure_test_set.csv")
)

df_test.createOrReplaceTempView("aps_test")




In [0]:
%sql
-- Sanity check: preview the test split to confirm the CSV was parsed correctly.
SELECT *
FROM aps_test

In [0]:
%python

#uploading the training dataset

df_training = (
    spark.read
    .option("header", "true")
    .option("inferSchema", "true")
    .csv("/Volumes/workspace/default/scania_aps_failure_data/aps_failure_training_set.csv")
)

df_training.createOrReplaceTempView("aps_training")


In [0]:
%sql
-- Sanity check: preview the training split to confirm the CSV was parsed correctly.
SELECT *
FROM aps_training

In [0]:
%sql
-- Report: row count of each split, stacked into one small table.
SELECT 'Test_set' AS Set_type, COUNT(*) AS total_rows FROM aps_test
UNION ALL
SELECT 'Training_set' AS Set_type, COUNT(*) AS total_rows FROM aps_training;

In [0]:
%sql
-- Stack the test and training splits into one table (~76K rows) for the analysis below.
-- UNION ALL matches columns by position, so both CSVs must list their columns in the same order.
-- Note: the split each row came from is not recorded.
CREATE OR REPLACE TABLE aps_all AS
SELECT * FROM aps_test
UNION ALL
SELECT * FROM aps_training;

In [0]:
%python

# creating the same full dataset in python. the SQL 'aps_all' is 'df' in python.
df = spark.table("aps_all")

In [0]:
%sql
-- Preview the combined dataset (~76K rows).
SELECT *
FROM aps_all

In [0]:
%sql
-- Total number of rows in the combined dataset.
SELECT COUNT(1) AS total_rows
FROM aps_all;

In [0]:
%sql
-- Number of columns in aps_all.
-- information_schema.columns covers every schema of the current catalog, so restrict it to the
-- current schema; otherwise a same-named table in another schema would inflate the count.
SELECT COUNT(*) AS column_count
FROM information_schema.columns
WHERE table_name = 'aps_all'
  AND table_schema = current_schema();


In [0]:
%sql
-- Full column metadata for aps_all, in table column order.
-- Restricted to the current schema so a same-named table elsewhere in the catalog is not mixed in.
SELECT *
FROM information_schema.columns
WHERE table_name = 'aps_all'
  AND table_schema = current_schema()
ORDER BY ordinal_position;

In [0]:
%sql
-- Column names and data types of aps_all.
DESCRIBE TABLE aps_all;


In [0]:

# Finding: the columns came back as strings, so no arithmetic is possible yet (presumably the
# 'na' placeholders kept inferSchema from choosing a numeric type). Plan: turn every
# missing-value token into a real NULL first, then cast the sensor columns to integers.

# Count, per column, how many cells hold a "bad" (missing-value) token.
from pyspark.sql.functions import col, trim, lower, when, sum as _sum

bad_tokens = ['na', 'n/a', 'null', 'none', 'nan', '']   # compared after trim() + lower()

# One aggregation pass: for each column add 1 for every cell whose normalized value is a bad token.
# (No separate row count here: it was never used and cost an extra full scan.)
na_counts = df.select([
    _sum(when(lower(trim(col(c))).isin(bad_tokens), 1).otherwise(0)).alias(c)
    for c in df.columns
])

display(na_counts)   # single row: bad-token count per column


In [0]:

# The previous cell showed that every column contains many 'na' tokens. Replace each
# missing-value token with a real NULL so the columns can later be cast to integers.

from pyspark.sql.functions import trim, lower, when, col

bad_tokens = ['na','n/a','null','none','nan','']   # lowercase; matched against lower(trim(value))


def _null_if_bad_token(column_name):
    """Column expression: NULL when the trimmed, lower-cased value is a bad token, else the trimmed value."""
    trimmed = trim(col(column_name))
    return when(lower(trimmed).isin(bad_tokens), None).otherwise(trimmed).alias(column_name)


# df_clean: same columns as df, missing-value tokens -> NULL, all other values trimmed.
df_clean = df.select([_null_if_bad_token(name) for name in df.columns])

# Sanity check on three sensor columns.
df_clean.select("aa_000", "ab_000", "ac_000").show(20)

# df_clean is the new dataset name


In [0]:
# Verify the cleaning: count, per column, any value that still looks like a missing-value token.
# Use the same token list and normalization (trim + lower-case) as the cleaning step, so
# variants such as "NA", " na " or "n/a" are caught too, not only the exact string "na".

from pyspark.sql.functions import col, lower, trim, when, sum as _sum

bad_tokens = ['na', 'n/a', 'null', 'none', 'nan', '']

check_na = df_clean.select([
    _sum(when(lower(trim(col(c))).isin(bad_tokens), 1).otherwise(0)).alias(c)
    for c in df_clean.columns
])

display(check_na)

# Expected: 0 for every column (observed: no 'na' left in any column).

In [0]:
# Null count per column, to confirm the missing-value tokens became real NULLs.

null_count_exprs = [
    _sum(col(name).isNull().cast("int")).alias(name) for name in df_clean.columns
]
nulls = df_clean.select(null_count_exprs)
display(nulls)

# The counts match the original number of 'na' per column, so the replacement worked.

In [0]:
# Persist the cleaned DataFrame as a table so %sql cells can query it as aps_all_clean
# (an existing table of that name is replaced).
df_clean.write.saveAsTable("aps_all_clean", mode="overwrite")


In [0]:
%sql
-- Confirm the 'na' tokens now appear as NULLs before the columns are retyped.
SELECT *
FROM aps_all_clean;

-- Successfully converted all 'na' tokens to NULLs.

In [0]:
# Convert the sensor columns from string to integer: parse as a floating-point number,
# round to the nearest whole number, then cast to long (64-bit, avoids int32 overflow).
#
# Parse through DOUBLE, not FLOAT: a 32-bit float has a 24-bit mantissa, so any reading above
# 2**24 (16,777,216) would be silently snapped to a nearby representable value before the cast
# to long. DOUBLE represents every integer up to 2**53 exactly.
#
# Spark's round is imported under an alias so it does not shadow Python's built-in round()
# in the later cells of this notebook (the outlier cell calls round() on plain Python floats).

from pyspark.sql.functions import col, round as spark_round

# Every column except the target 'class' is a sensor reading.
numeric_cols = [c for c in df_clean.columns if c != 'class']

# Keep 'class' unchanged (first); parse, round and cast every sensor column.
df_clean_long = df_clean.select(
    col('class'),
    *[spark_round(col(c).cast('double')).cast('long').alias(c) for c in numeric_cols]
)

# Save as a new table for step-wise tracking.
df_clean_long.write.mode("overwrite").saveAsTable("aps_all_clean_long")

# df_clean_long is the new table_name
# in SQL it is aps_all_clean_long



In [0]:
%sql
-- Preview the retyped table to check that the conversion produced sensible values.
SELECT *
FROM aps_all_clean_long

In [0]:
%sql
-- Verify the column types after the conversion.
DESCRIBE TABLE aps_all_clean_long;

-- Sensor columns successfully changed from string to bigint.


In [0]:
# Null count for every column of the retyped data.

from pyspark.sql.functions import col, sum as _sum, when

null_counts = df_clean_long.select(
    *(
        _sum(when(col(name).isNull(), 1).otherwise(0)).alias(name)
        for name in df_clean_long.columns
    )
)

display(null_counts)


In [0]:
# Encode the target: 'pos' -> 1 (failure), 'neg' -> 0 (normal); any other label -> NULL.

from pyspark.sql.functions import when, col

label_expr = (
    when(col("class") == "pos", 1)
    .when(col("class") == "neg", 0)
    .otherwise(None)          # unexpected labels surface as NULL rather than a wrong 0/1
)
df_clean_final = df_clean_long.withColumn("class", label_expr)

# Final clean table for SQL.
df_clean_final.write.mode("overwrite").saveAsTable("aps_clean_final")


In [0]:
%sql
-- Check that 'pos'/'neg' were converted to 1/0.
SELECT *
FROM aps_clean_final;

### ✅ Data Cleaning Summary

We have completed the data cleaning phase for the APS Failure Dataset. The following steps were performed:

1. **Handled missing values**  
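   - Replaced missing-value tokens (`"na"`, `"n/a"`, `"null"`, `"none"`, `"nan"` and empty strings, matched case-insensitively after trimming) with proper `null` values.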

2. **Type conversion**  
   - Converted all sensor columns (except `class`) from string → floating point → rounded bigint so that numeric calculations are possible.  
   - Stored the result as `df_clean_long`.

3. **Target variable cleanup**  
   - Converted `class` column:  
     - `pos` → `1` (failure)  
     - `neg` → `0` (normal)  
   - Stored the result as `df_clean_final`.

4. **Table versions created for traceability (Python)**  
   - `df` → original combined dataset (train + test)  
   - `df_clean` → missing-token cleaned (na → NULL)  
   - `df_clean_long` → numeric conversion (floating point → rounded bigint)  
   - `df_clean_final` → final cleaned dataset with binary `class`

5. **Table versions created for traceability (SQL)**  
   - `aps_all` → combined training & test dataset.  
   - `aps_all_clean` → NA → null cleaning.  
   - `aps_all_clean_long` → numeric conversion.  
   - `aps_clean_final` → final cleaned dataset with binary target.

---
👉 The dataset is now fully cleaned and ready for **Exploratory Data Analysis (EDA)**.


In [0]:
# EDA - Exploratory Data Analysis

# Show the final cleaned dataset: 'class' encoded as 0/1, sensor columns as bigint.
display(df_clean_final)

### Questions for exploratory data analysis at the global level

1. What is the total number of rows and columns in df_clean_final?
(Shape of the dataset → overall size check)

2. How many positive (class=1) vs. negative (class=0) samples are there?
(Target variable distribution → class imbalance check)

3. How many missing/null values are there in each column, and which columns have the highest % of missing data?
(Data completeness overview)

4. Which 5 features (columns) have the highest variance, and which 5 have the lowest variance?
(Identify informative vs. flat/noisy features)

5. Which features are highly correlated with each other (correlation > 0.8 or < -0.8)?
(Detect redundancy / multicollinearity across the dataset)

In [0]:
# 1. What is the total number of rows and columns in df_clean_final?

from pyspark.sql import functions as F

# (rows, columns) of the final dataset.
shape = (df_clean_final.count(), len(df_clean_final.columns))

# Wrap the shape in a one-row DataFrame so display() renders it as a table.
summary_df = spark.createDataFrame([shape], ["Total Rows", "Total Columns"])

display(summary_df)


In [0]:
# 2. How many positive (class=1) vs. negative (class=0) samples are there?

from pyspark.sql import functions as F

total_rows = df_clean_final.count()

# Per-class row count plus its share of all rows (percentage rounded to 2 decimals).
class_distribution = (
    df_clean_final
    .groupBy("class")
    .agg(F.count("*").alias("count"))
    .withColumn("percentage", F.round(F.col("count") / total_rows * 100, 2))
    .orderBy("class")
)

display(class_distribution)


In [0]:
# 3. How many missing/null values are there in each column, and which columns have the highest % of missing data?

from pyspark.sql.functions import col, sum as _sum, count

total_rows = df_clean_final.count()

# One Spark pass yields a single row of null counts; transposing in pandas gives one row per column.
null_row = df_clean_final.select(
    [_sum(col(name).isNull().cast("int")).alias(name) for name in df_clean_final.columns]
)
null_counts = null_row.toPandas().T
null_counts.columns = ["null_count"]
null_counts["null_percentage"] = (null_counts["null_count"] / total_rows * 100).round(2)

# Columns with the largest share of missing values first.
null_counts_sorted = null_counts.sort_values("null_percentage", ascending=False)
null_counts_sorted


In [0]:
# 4. Which 5 features (columns) have the highest variance, and which 5 have the lowest variance?

from pyspark.sql.functions import col, variance as _variance

# Every column except the target 'class'.
numeric_cols = [c for c in df_clean_final.columns if c != "class"]

# Sample variance of every feature in one Spark pass; transpose to one row per feature.
variance_df = df_clean_final.select([
    _variance(col(c)).alias(c) for c in numeric_cols
]).toPandas().T

variance_df.columns = ["variance"]

# A feature whose variance is undefined (no or only one non-null value) comes back as
# NULL/NaN. sort_values() always places NaN last, so such features would otherwise be
# reported as the "lowest variance" ones; drop them before ranking.
variance_df_sorted = (
    variance_df
    .dropna(subset=["variance"])
    .sort_values("variance", ascending=False)
)

top_5 = variance_df_sorted.head(5)      # highest variance
bottom_5 = variance_df_sorted.tail(5)   # lowest variance

top_5, bottom_5


In [0]:
# 5. Which features are highly correlated with each other (correlation > 0.8 or < -0.8)?

import pandas as pd
import numpy as np

CORR_THRESHOLD = 0.8   # |r| strictly above this counts as highly correlated

# Features only (target excluded), pulled into pandas for the correlation matrix.
df_pd = df_clean_final.drop("class").toPandas()

# Pairwise Pearson correlation; pandas excludes NULL/NaN values pair-wise.
corr_matrix = df_pd.corr()

# Strict upper triangle: each pair appears once and the diagonal (r = 1) is excluded.
mask = np.triu(np.ones_like(corr_matrix, dtype=bool), k=1)

# Long format, one row per feature pair; stack() drops NaN cells (masked-out or undefined r).
corr_pairs = corr_matrix.where(mask).stack().reset_index()
corr_pairs.columns = ["Feature1", "Feature2", "Correlation"]

# Vectorized flag on |r| (covers both r > 0.8 and r < -0.8).
is_high = corr_pairs["Correlation"].abs() > CORR_THRESHOLD
corr_pairs["High_Correlation"] = np.where(is_high, "Highly Correlated", "Not Correlated")

# Rank strong pairs by strength |r|. A signed sort would push strong negative correlations
# (e.g. -0.95) below weaker positive ones (e.g. 0.81).
high_corr_pairs = corr_pairs[is_high].sort_values(
    by="Correlation", key=lambda s: s.abs(), ascending=False
)

display(high_corr_pairs)




### Questions for exploratory data analysis at the column level

1. For each numeric column, what are the mean, median, and mode values?
(Central tendency check)

2. For each numeric column, what are the standard deviation, variance, min, max, and range (max−min)?
(Data spread and scale check)

3. For each numeric column, what is the skewness and kurtosis?
(Distribution shape: symmetry & tails)

4. For each numeric column, what is the percentage of missing/null values?
(Column-level completeness check)

5. For each numeric column, are there extreme outliers (e.g., values beyond 1.5×IQR or z-score > 3)?
(Outlier detection)

6. How does the distribution (histogram) of each sensor column look? Is it normal, uniform, or skewed?
(Pattern detection per sensor)

7. How does each sensor column differ between class=1 vs. class=0 (group-wise means/medians)?
(Feature discrimination check)

In [0]:
# 1. For each numeric column, what are the mean, median, and mode values? (Central tendency check)

from pyspark.sql.functions import col, mean, expr, count, when, first
from pyspark.sql import functions as F

# All columns except the target 'class'.
numeric_cols = [c for c in df_clean_final.columns if c != "class"]

# Means of every column in ONE Spark pass (instead of one job per column).
means_row = df_clean_final.select([mean(col(c)).alias(c) for c in numeric_cols]).first()

# Medians of every column in ONE approxQuantile call (1% relative error). approxQuantile
# ignores NULLs and returns an empty list for a column with no non-null values.
median_lists = df_clean_final.approxQuantile(numeric_cols, [0.5], 0.01)

results = []
for c, median_list in zip(numeric_cols, median_lists):
    col_median = median_list[0] if median_list else None

    # Mode = most frequent NON-NULL value. Without the NULL filter, a column with many missing
    # readings reports NULL as its mode. Ties between equally frequent values are broken arbitrarily.
    mode_row = (
        df_clean_final.where(col(c).isNotNull())
        .groupBy(c)
        .count()
        .orderBy(F.desc("count"))
        .first()
    )
    col_mode = mode_row[c] if mode_row is not None else None

    results.append((c, means_row[c], col_median, col_mode))

# Explicit schema: type inference fails when a result column is entirely NULL.
eda_central_tendency = spark.createDataFrame(
    results, "Column string, Mean double, Median double, Mode bigint"
)

display(eda_central_tendency)


In [0]:
# 2. For each numeric column, what are the standard deviation, variance, min, max, and range (max−min)? (Data spread and scale check)

# Spark's min/max are imported under aliases so Python's built-in min()/max() stay usable in later cells.
from pyspark.sql.functions import col, stddev, variance, min as _min, max as _max

# All columns except the target 'class'.
numeric_cols = [c for c in df_clean_final.columns if c != "class"]

# All four statistics for every column in ONE Spark pass; aliases look like "<column>__<stat>".
stat_fns = (("stddev", stddev), ("variance", variance), ("min", _min), ("max", _max))
stats_row = df_clean_final.select(
    *[agg_fn(col(c)).alias(f"{c}__{stat}") for c in numeric_cols for stat, agg_fn in stat_fns]
).first()

spread_results = []
for c in numeric_cols:
    col_min = stats_row[f"{c}__min"]
    col_max = stats_row[f"{c}__max"]
    # min/max are NULL for a column with no non-null values; avoid None - None (TypeError).
    col_range = None if col_min is None or col_max is None else col_max - col_min
    spread_results.append(
        (c, stats_row[f"{c}__stddev"], stats_row[f"{c}__variance"], col_min, col_max, col_range)
    )

eda_spread = spark.createDataFrame(
    spread_results, ["Column", "StdDev", "Variance", "Min", "Max", "Range"]
)

display(eda_spread)


In [0]:
# 3. For each numeric column, what are the skewness and kurtosis values? (Data shape check)

from pyspark.sql.functions import skewness, kurtosis, col

# All columns except the target 'class'.
numeric_cols = [c for c in df_clean_final.columns if c != "class"]

# Skewness and kurtosis of every column in ONE Spark pass (instead of one job per column).
shape_row = df_clean_final.select(
    *[skewness(col(c)).alias(f"{c}__skewness") for c in numeric_cols],
    *[kurtosis(col(c)).alias(f"{c}__kurtosis") for c in numeric_cols],
).first()


def _is_missing(value):
    """Return True when a statistic is NULL or NaN (NaN is the only float not equal to itself)."""
    return value is None or value != value


shape_results = []

for c in numeric_cols:
    col_skewness = shape_row[f"{c}__skewness"]
    col_kurtosis = shape_row[f"{c}__kurtosis"]

    # Interpret skewness (|skew| <= 0.5 treated as approximately symmetric).
    if _is_missing(col_skewness):
        skew_desc = "No data"
    elif col_skewness > 0.5:
        skew_desc = "Positively skewed"
    elif col_skewness < -0.5:
        skew_desc = "Negatively skewed"
    else:
        skew_desc = "Approximately symmetric"

    # Interpret kurtosis. Spark's kurtosis() returns EXCESS kurtosis (Fisher definition), for
    # which a normal distribution scores 0, so the reference point is 0, not 3.
    if _is_missing(col_kurtosis):
        kurt_desc = "No data"
    elif col_kurtosis > 0:
        kurt_desc = "Heavy tails / Leptokurtic"
    elif col_kurtosis < 0:
        kurt_desc = "Light tails / Platykurtic"
    else:
        kurt_desc = "Normal tails / Mesokurtic"

    shape_results.append((c, col_skewness, skew_desc, col_kurtosis, kurt_desc))

eda_shape = spark.createDataFrame(
    shape_results, ["Column", "Skewness", "Skewness_Interpretation", "Kurtosis", "Kurtosis_Interpretation"]
)

display(eda_shape)



In [0]:
# 4. For each numeric column, what is the percentage of missing/null values? (Column-level completeness check)

from pyspark.sql.functions import col, sum as _sum

total_rows = df_clean_final.count()

feature_cols = df_clean_final.columns[1:]   # skip the first column, 'class'

# Single row of per-feature null counts, transposed to one row per feature.
null_counts = (
    df_clean_final
    .select([_sum(col(name).isNull().cast("int")).alias(name) for name in feature_cols])
    .toPandas()
    .T
)
null_counts.columns = ["null_count"]
null_counts["null_percentage"] = (null_counts["null_count"] / total_rows * 100).round(2)

# Most-missing features first.
null_counts_sorted = null_counts.sort_values("null_percentage", ascending=False)

display(null_counts_sorted)


In [0]:
# 5. For each numeric column, are there extreme outliers (e.g., values beyond 1.5×IQR or z-score > 3)? (Outlier detection)
# Only the 1.5×IQR rule is implemented in this cell.

import builtins
from pyspark.sql.functions import col
import pandas as pd

IQR_MULTIPLIER = 1.5

# All columns except the target 'class'.
numeric_cols = [c for c in df_clean_final.columns if c != "class"]

outlier_summary = []
total_rows = df_clean_final.count()

for c in numeric_cols:
    try:
        # Quartiles with 1% relative error. approxQuantile ignores NULLs and returns an empty
        # list for a column with no non-null values.
        quartiles = df_clean_final.approxQuantile(c, [0.25, 0.75], 0.01)
        if len(quartiles) < 2 or None in quartiles:
            outlier_summary.append((c, None, None, "No quartiles"))
            continue
        q1, q3 = quartiles

        iqr = q3 - q1
        lower_bound = q1 - IQR_MULTIPLIER * iqr
        upper_bound = q3 + IQR_MULTIPLIER * iqr

        # Count outliers (NULL readings compare as NULL and are not counted).
        count_outliers = df_clean_final.filter(
            (col(c) < lower_bound) | (col(c) > upper_bound)
        ).count()

        # builtins.round on purpose: an earlier cell may bind the bare name `round` to
        # pyspark.sql.functions.round, which raises on plain Python floats and would turn
        # every column into an "Error" row.
        percent_outliers = builtins.round(count_outliers / total_rows * 100, 2)

        # IQR == 0 (e.g. a mostly-zero sensor) does NOT mean the column is constant: the bounds
        # collapse onto q1 == q3 and every other value is counted, so flag it for interpretation.
        status = "Zero IQR" if iqr == 0 else "OK"
        outlier_summary.append((c, count_outliers, percent_outliers, status))

    except Exception as e:
        # Record the failure per column instead of aborting the whole scan.
        outlier_summary.append((c, None, None, f"Error: {str(e)}"))

# Convert to Pandas
outlier_df = pd.DataFrame(
    outlier_summary,
    columns=["Feature", "Outlier_Count", "Outlier_Percentage", "Status"]
).sort_values("Outlier_Percentage", ascending=False)

display(outlier_df)




### Outlier Detection – Observation  

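We applied **two methods** for outlier detection across all numeric columns:  

- **IQR method** (1.5 × IQR rule)  
- **Z-score method** (> 3 standard deviations)  

> **Note:** only the IQR method is implemented in the code cell above; the z-score check is not part of this notebook, so its results cannot be reproduced from it. A column with IQR = 0 (e.g. a mostly-zero sensor) is not necessarily constant, so such columns need separate interpretation.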

Both approaches consistently showed either **0% detected outliers** or produced constant values across many features. This suggests that:  

- The sensor readings are very stable within a narrow operating range.  
- There is low variability across features, making extreme deviations unlikely.  
- The dataset appears clean and consistent, with no significant anomalies or faulty sensor values.  

**Conclusion:** The APS dataset does not exhibit meaningful outliers at the column level, which aligns with expectations for structured sensor data.  


In [0]:
# 6. How does the distribution (histogram) of each sensor column look? Is it normal, uniform, or skewed? (Pattern detection per sensor)
# Skewness only measures asymmetry: it can flag skewed sensors but cannot tell a normal
# distribution from a uniform one.

from pyspark.sql.functions import col, skewness
import pandas as pd

# All columns except the target 'class'.
numeric_cols = [c for c in df_clean_final.columns if c != "class"]

# Skewness of every sensor in ONE Spark pass (instead of one job per column).
skew_row = df_clean_final.select([skewness(col(c)).alias(c) for c in numeric_cols]).first()


def _skew_label(sk_val):
    """Map a skewness value to a label: |s| <= 0.5 symmetric, <= 1 moderate, otherwise high."""
    # NULL (no data) or NaN: NaN fails every comparison below and would otherwise fall
    # through to "Highly Left-Skewed".
    if sk_val is None or sk_val != sk_val:
        return "Unknown"
    if sk_val > 1:
        return "Highly Right-Skewed"
    if sk_val > 0.5:
        return "Moderately Right-Skewed"
    if sk_val >= -0.5:
        return "Approximately Symmetric"
    if sk_val >= -1:
        return "Moderately Left-Skewed"
    return "Highly Left-Skewed"


skew_data = [(c, skew_row[c], _skew_label(skew_row[c])) for c in numeric_cols]

# Convert to Pandas for display
dist_df = pd.DataFrame(skew_data, columns=["Feature", "Skewness", "Distribution_Type"])

display(dist_df)


In [0]:
# 7. How does each sensor column differ between class=1 vs. class=0 (group-wise means/medians)? (Feature discrimination check)

from pyspark.sql import functions as F
from pyspark.sql.functions import col, mean
import pandas as pd

# All columns except the target 'class'.
numeric_cols = [c for c in df_clean_final.columns if c != "class"]
classes = [0, 1]  # class values: 0 = 'neg' (normal), 1 = 'pos' (failure)

# Mean and approximate median of every sensor, per class, in ONE grouped Spark pass
# (instead of separate mean and approxQuantile jobs per sensor and class).
# percentile_approx with accuracy=100 has a relative error of 1/100, matching a 1% tolerance.
# Both aggregates ignore NULL readings.
agg_exprs = []
for c in numeric_cols:
    agg_exprs.append(mean(col(c)).alias(f"{c}__mean"))
    agg_exprs.append(F.percentile_approx(col(c), 0.5, 100).alias(f"{c}__median"))

grouped_rows = (
    df_clean_final
    .where(col("class").isin(classes))
    .groupBy("class")
    .agg(*agg_exprs)
    .collect()
)
rows_by_class = {row["class"]: row for row in grouped_rows}

results = []
for cls in classes:
    row = rows_by_class.get(cls)   # None when the class has no rows at all
    for c in numeric_cols:
        mean_val = row[f"{c}__mean"] if row is not None else None
        median_val = row[f"{c}__median"] if row is not None else None
        results.append((c, cls, mean_val, median_val))

# Convert to Pandas for easier display
eda7_df = pd.DataFrame(results, columns=["Feature", "Class", "Mean", "Median"])

# Sort by Feature and Class so the two classes of a sensor sit next to each other.
eda7_df = eda7_df.sort_values(["Feature", "Class"])

display(eda7_df)



## Business Problems 

### 1. Part-to-Whole Analysis – Component Contribution

**Question:** Which sensors or systems contribute the most to overall abnormal readings or failure indicators?

**Business Value:** Helps prioritize maintenance or redesign for high-impact components. Fleet managers can focus on the sensors/parts that “matter most” for vehicle reliability.

In [0]:
from pyspark.sql.functions import col, mean, stddev, sum as _sum, coalesce, lit
import pandas as pd

# Business problem 1: which sensors contribute most to the overall count of abnormal readings?
# A reading is "abnormal" when it is above mean + 2 * stddev of its own sensor.

# All columns except the target 'class'.
numeric_cols = [c for c in df_clean_final.columns if c != "class"]
STD_MULTIPLIER = 2

# Pass 1: mean and stddev of every sensor.
summary_stats = df_clean_final.select([
    mean(col(c)).alias(c + "_mean") for c in numeric_cols
] + [
    stddev(col(c)).alias(c + "_std") for c in numeric_cols
]).collect()[0]

# Upper threshold per sensor. A NULL or NaN stddev (e.g. fewer than two non-null readings)
# gives no threshold, so that sensor's abnormal count is 0.
thresholds = {}
for c in numeric_cols:
    mean_val = summary_stats[c + "_mean"]
    std_val = summary_stats[c + "_std"]
    if mean_val is not None and std_val is not None and std_val == std_val:
        thresholds[c] = mean_val + STD_MULTIPLIER * std_val

# Pass 2: abnormal counts for ALL sensors in one Spark job (instead of one filter().count()
# per sensor). NULL readings compare as NULL and are not counted, same as filter(...).count().
abnormal_by_sensor = {}
if thresholds:
    count_row = df_clean_final.select([
        coalesce(_sum((col(c) > thr).cast("int")), lit(0)).alias(c)
        for c, thr in thresholds.items()
    ]).first()
    abnormal_by_sensor = {c: count_row[c] for c in thresholds}

# n_abnormal rather than `count`, so this cell does not clobber the pyspark `count`
# function that earlier cells import.
abnormal_counts = [(c, abnormal_by_sensor.get(c, 0)) for c in numeric_cols]
total_abnormal = sum(n for _, n in abnormal_counts)

# Contribution % (not rounded) and a has-abnormal flag per sensor.
abnormal_summary = []
for sensor, n_abnormal in abnormal_counts:
    percent = (n_abnormal / total_abnormal) * 100 if total_abnormal > 0 else 0.0
    flag = "Yes" if n_abnormal > 0 else "No"
    abnormal_summary.append((sensor, n_abnormal, percent, flag))

# Convert to Pandas for display, biggest contributors first.
abnormal_df = pd.DataFrame(abnormal_summary, columns=["Sensor", "Abnormal_Count", "Contribution_Percentage", "Has_Abnormal"])
abnormal_df = abnormal_df.sort_values("Contribution_Percentage", ascending=False)

display(abnormal_df)




### 2. Change Over Time / Trend Analysis

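**Question:** How do key sensor readings evolve over time for the fleet? Are there trends indicating degradation, wear, or unusual operational patterns? *(The dataset does not appear to include a timestamp column, so the analysis below uses the table's row order as a proxy for time; that order is not guaranteed to be chronological.)*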

**Business Value:** Identifies early signs of system wear, abnormal usage patterns, or performance drift across vehicles. Enables proactive maintenance and reduces unexpected breakdowns.

In [0]:
import pyspark.sql.functions as F
from pyspark.sql import Window
import pandas as pd

# Business problem 2: do sensor means drift across the dataset?
# NOTE: there is no timestamp column, so the table's row order serves as a proxy for time.
# aps_all was built as test rows UNION ALL training rows, and the scan order of a table is
# not guaranteed, so treat these "trends" as exploratory only.

# All columns except the target 'class' (defined here so the cell does not rely on an earlier one).
numeric_cols = [c for c in df_clean_final.columns if c != "class"]

# Step 1: consecutive 0-based row index.
# monotonically_increasing_id() alone is NOT consecutive: the partition id is stored in the upper
# bits, so range filters like [0, total_rows/4) silently miss most rows once the data spans more
# than one partition. row_number() over that id yields 0..n-1 in the same order. The window has
# no partitioning, so Spark moves all rows into one partition (acceptable for ~76K rows).
df_indexed = (
    df_clean_final
    .withColumn("_mono_id", F.monotonically_increasing_id())
    .withColumn("row_idx", F.row_number().over(Window.orderBy("_mono_id")) - 1)
    .drop("_mono_id")
)

# Step 2: total rows and quartile size.
total_rows = df_indexed.count()
quartile_size = total_rows // 4 or 1   # guard against dividing by zero on tiny inputs

# Step 3: mean of every sensor per row-quartile, in ONE grouped Spark pass.
# Rows [0, q), [q, 2q), [2q, 3q) form Q1-Q3; Q4 takes everything from 3q on (the remainder).
quartile_col = F.least(F.floor(F.col("row_idx") / quartile_size), F.lit(3)).cast("int")
quartile_rows = (
    df_indexed
    .withColumn("quartile", quartile_col)
    .groupBy("quartile")
    .agg(*[F.mean(F.col(c)).alias(c) for c in numeric_cols])
    .collect()
)
means_by_quartile = {row["quartile"]: row for row in quartile_rows}

trend_summary = []
for c in numeric_cols:
    # Mean per quartile; None when the quartile is empty or the sensor is all NULL in it.
    quartile_means = [
        means_by_quartile[q][c] if q in means_by_quartile else None
        for q in range(4)
    ]

    # --- Trend flag ---
    if any(m is None for m in quartile_means):
        trend = "insufficient data"      # sorted() cannot compare None with numbers
    elif all(m == quartile_means[0] for m in quartile_means):
        trend = "stable"
    elif quartile_means == sorted(quartile_means):
        trend = "increasing"
    elif quartile_means == sorted(quartile_means, reverse=True):
        trend = "decreasing"
    else:
        trend = "fluctuating"

    trend_summary.append((c, *quartile_means, trend))

# Step 4: Convert to Pandas DataFrame for display
trend_df = pd.DataFrame(
    trend_summary,
    columns=["Sensor", "Q1_mean", "Q2_mean", "Q3_mean", "Q4_mean", "Trend_Flag"]
)

display(trend_df)



### 3. Reporting / Aggregated Summaries

**Question:** How can we summarize sensor behavior across vehicles or time periods to quickly identify anomalies or hotspots?

**Business Value:** Creates a dashboard-like overview for management or maintenance teams. Highlights operational risk areas and supports quick, informed decision-making.

In [0]:
import pyspark.sql.functions as F
from pyspark.sql import Window
from pyspark.sql.functions import col, min as _min, max as _max, mean as _mean, stddev as _std, expr
import pandas as pd

# Per-sensor dashboard: summary statistics, abnormal-reading counts, a row-order trend flag
# and variability/anomaly flags, merged into one report table.

# Numeric columns (exclude 'class')
numeric_cols = [c for c in df_clean_final.columns if c != "class"]

# --------------------------
# Step 1: Compute Summary Stats (all sensors in one Spark pass)
# --------------------------
summary_exprs = []
for c in numeric_cols:
    summary_exprs += [
        _min(col(c)).alias(f"{c}__min"),
        _max(col(c)).alias(f"{c}__max"),
        _mean(col(c)).alias(f"{c}__mean"),
        _std(col(c)).alias(f"{c}__std"),
        F.percentile_approx(col(c), 0.5).alias(f"{c}__median"),   # default accuracy (10000)
    ]
stats_row = df_clean_final.select(summary_exprs).first()

summary_df = pd.DataFrame(
    [
        (c, stats_row[f"{c}__min"], stats_row[f"{c}__max"], stats_row[f"{c}__mean"],
         stats_row[f"{c}__median"], stats_row[f"{c}__std"])
        for c in numeric_cols
    ],
    columns=["Sensor", "Min", "Max", "Mean", "Median", "StdDev"],
)

# --------------------------
# Step 2: Abnormal readings (> Mean + 2*StdDev), counted for all sensors in one Spark pass
# --------------------------
# pandas stores a NULL StdDev as NaN, so test with pd.notna (`is not None` would let NaN through).
thresholds = {
    sensor: float(mean_val + 2 * std_val)
    for sensor, mean_val, std_val in zip(summary_df["Sensor"], summary_df["Mean"], summary_df["StdDev"])
    if pd.notna(mean_val) and pd.notna(std_val)
}

abnormal_by_sensor = {}
if thresholds:
    # NULL readings compare as NULL and are not counted, same as filter(...).count().
    count_row = df_clean_final.select([
        F.coalesce(F.sum((col(c) > thr).cast("int")), F.lit(0)).alias(c)
        for c, thr in thresholds.items()
    ]).first()
    abnormal_by_sensor = {c: count_row[c] for c in thresholds}

abnormal_df = pd.DataFrame(
    [(c, abnormal_by_sensor.get(c, 0)) for c in numeric_cols],
    columns=["Sensor", "Abnormal_Count"],
)
total_abnormal_all = int(abnormal_df["Abnormal_Count"].sum())
abnormal_df["Contribution_Percentage"] = abnormal_df["Abnormal_Count"].apply(
    lambda x: (x / total_abnormal_all * 100) if total_abnormal_all > 0 else 0
)

# --------------------------
# Step 3: Trend Flags (Quartile Means over row order)
# --------------------------
# monotonically_increasing_id() is not consecutive (the partition id sits in the upper bits), so
# range filters on it put most rows into no quartile once the data spans several partitions.
# row_number() over it gives a consecutive 0..n-1 index in the same order. The window has no
# partitioning, so Spark moves all rows into one partition (acceptable for ~76K rows).
df_indexed = (
    df_clean_final
    .withColumn("_mono_id", F.monotonically_increasing_id())
    .withColumn("row_idx", F.row_number().over(Window.orderBy("_mono_id")) - 1)
    .drop("_mono_id")
)
total_rows = df_indexed.count()
quartile_size = total_rows // 4 or 1   # guard against dividing by zero on tiny inputs

# Rows [0, q), [q, 2q), [2q, 3q) form Q1-Q3; Q4 takes everything from 3q on (the remainder).
quartile_rows = (
    df_indexed
    .withColumn("quartile", F.least(F.floor(col("row_idx") / quartile_size), F.lit(3)).cast("int"))
    .groupBy("quartile")
    .agg(*[F.mean(col(c)).alias(c) for c in numeric_cols])
    .collect()
)
means_by_quartile = {row["quartile"]: row for row in quartile_rows}

trend_list = []
for c in numeric_cols:
    quart_means = [
        means_by_quartile[q][c] if q in means_by_quartile else None
        for q in range(4)
    ]

    # Trend flag
    if any(m is None for m in quart_means):
        trend = "Insufficient Data"      # sorted() cannot compare None with numbers
    elif all(m == quart_means[0] for m in quart_means):
        trend = "Stable"                 # checked first: a constant sequence is also "sorted"
    elif quart_means == sorted(quart_means):
        trend = "Increasing"
    elif quart_means == sorted(quart_means, reverse=True):
        trend = "Decreasing"
    else:
        trend = "Fluctuating"
    trend_list.append((c, trend))

trend_df = pd.DataFrame(trend_list, columns=["Sensor", "Trend_Flag"])

# --------------------------
# Step 4: Variability & Anomaly Flags
# --------------------------
# High variability = StdDev more than twice the average StdDev across sensors.
std_mean = summary_df["StdDev"].mean()
summary_df["Variability_Flag"] = summary_df["StdDev"].apply(
    lambda x: "High Variability" if x > 2*std_mean else "Normal"
)
# High anomalies = sensor accounts for more than 5% of all abnormal readings.
abnormal_df["Anomaly_Flag"] = abnormal_df["Contribution_Percentage"].apply(
    lambda x: "High Anomalies" if x > 5 else "Normal"
)

# --------------------------
# Step 5: Merge All into Final Report
# --------------------------
final_report = summary_df.merge(abnormal_df, on="Sensor").merge(trend_df, on="Sensor")

# Display the polished report
display(final_report)


In [0]:
# Final look at the cleaned dataset ('class' encoded as 0/1, sensor columns as bigint).
display(df_clean_final)
