# 03 - Z-Score Anomaly Detection

## Method 1 of 4: Statistical Approach

### What is a Z-Score?

A Z-Score measures **how far a value is from the mean**, expressed in standard deviations.

**Formula:**
```
Z-Score = (value - mean) / standard_deviation
```
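
As a small worked example, the formula can be applied to a handful of numbers in plain Python. The values below are made up for illustration and are not taken from the dataset. `statistics.stdev` (sample standard deviation) is used because Spark's `stddev` is also the sample version.

```python
import statistics

# Hypothetical hourly returns (illustrative values, not from the dataset)
values = [0.5, -0.2, 0.1, 0.3, -0.4, 0.0, 6.0]

mean_value = statistics.mean(values)
std_value = statistics.stdev(values)  # sample standard deviation (divides by n - 1)

# Apply the formula to every value
z_scores = [(v - mean_value) / std_value for v in values]

for v, z in zip(values, z_scores):
    print(f"value={v:>5.1f}  z={z:>6.2f}")
```

With only seven points, the extreme value inflates both the mean and the standard deviation, so its own Z-Score stays below 3. A threshold of 3 is therefore only meaningful on larger samples.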

### The Rule

- For normally distributed data, about 99.7% of values have a Z-Score between -3 and +3
- If |Z-Score| > 3 → the point is among the ~0.3% most extreme values (under that assumption) → **ANOMALY**
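
The 99.7% figure can be checked with the standard library. The sketch below computes the normal-distribution fraction within k standard deviations and, for comparison, the Chebyshev lower bound that holds for any distribution with finite variance. The function names are illustrative.

```python
import math

def normal_fraction_within(k: float) -> float:
    """Fraction of a normal distribution within k standard deviations of the mean."""
    return math.erf(k / math.sqrt(2))

def chebyshev_lower_bound(k: float) -> float:
    """Minimum fraction within k standard deviations for any finite-variance distribution."""
    return 1 - 1 / k**2

for k in (1, 2, 3):
    print(f"|z| <= {k}: normal = {normal_fraction_within(k):.4f}, "
          f"any distribution >= {chebyshev_lower_bound(k):.4f}")
```

Financial returns are often heavier-tailed than a normal distribution, so the share of points beyond |z| = 3 may well exceed 0.3% in practice.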

### Pros & Cons

| Pros | Cons |
|------|------|
| Simple & fast | Assumes normal distribution |
| Easy to interpret | Looks at features independently |
| No training needed | Misses multivariate anomalies |
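
To illustrate the last two cons, the sketch below uses synthetic data (not the BTC dataset) in which `y` equals `x` for every point except one. Neither feature looks unusual on its own, but their combination does. The helper `z_scores` is a hypothetical name for this example.

```python
import statistics

def z_scores(values):
    """Sample-based Z-Scores for a list of numbers."""
    m = statistics.mean(values)
    s = statistics.stdev(values)
    return [(v - m) / s for v in values]

# Synthetic data: y equals x for every point except the last one
xs = list(range(-10, 11)) + [8]
ys = list(range(-10, 11)) + [-8]

# Per-feature view: the largest |z| in each column
max_abs_z_x = max(abs(z) for z in z_scores(xs))
max_abs_z_y = max(abs(z) for z in z_scores(ys))

# Combined view: the difference x - y exposes the odd point
diffs = [x - y for x, y in zip(xs, ys)]
z_diff_last = z_scores(diffs)[-1]

print(f"max |z| for x:      {max_abs_z_x:.2f}")
print(f"max |z| for y:      {max_abs_z_y:.2f}")
print(f"z of (x - y), last: {z_diff_last:.2f}")
```

The last point breaks the relationship between the two features, yet a per-feature threshold of 3 would not flag it.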

---
## Step 1: Setup
---

In [None]:
# ═══════════════════════════════════════════════════════════════════
# IMPORTS
# ═══════════════════════════════════════════════════════════════════

from pyspark.sql import SparkSession
from pyspark.sql.functions import (
    col, mean, stddev, when, lit,
    abs as spark_abs,
    monotonically_increasing_id
)
import os

print("✓ Libraries imported")

In [None]:
# ═══════════════════════════════════════════════════════════════════
# CREATE SPARK SESSION
# ═══════════════════════════════════════════════════════════════════

spark = SparkSession.builder \
    .appName("ZScore_AnomalyDetection") \
    .master("local[*]") \
    .getOrCreate()

spark.sparkContext.setLogLevel("WARN")

print("✓ Spark Session created")
print(f"  Version: {spark.version}")

In [None]:
# ═══════════════════════════════════════════════════════════════════
# CONFIGURATION
# ═══════════════════════════════════════════════════════════════════

# File paths
DATA_PATH = "../data/processed/BTCUSDT_1h_processed.csv"
OUTPUT_PATH = "../data/results/"

# Feature columns (the 6 we engineered)
FEATURE_COLUMNS = [
    "return",
    "log_return",
    "volatility_24h",
    "volume_change",
    "volume_ratio",
    "price_range"
]

# Z-Score threshold (|z| > 3 is anomaly)
ZSCORE_THRESHOLD = 3.0

print("✓ Configuration set")
print(f"  Threshold: |Z-Score| > {ZSCORE_THRESHOLD}")

---
## Step 2: Load Data
---

In [None]:
# ═══════════════════════════════════════════════════════════════════
# LOAD DATA
# ═══════════════════════════════════════════════════════════════════

df = spark.read.csv(DATA_PATH, header=True, inferSchema=True)

print("✓ Data loaded")
print(f"  Rows: {df.count()}")
print(f"  Columns: {len(df.columns)}")

In [None]:
# Quick look at the data
df.select(["timestamp"] + FEATURE_COLUMNS).show(5)

---
## Step 3: Calculate Statistics
---

In [None]:
# ═══════════════════════════════════════════════════════════════════
# CALCULATE MEAN AND STDDEV FOR EACH FEATURE
# ═══════════════════════════════════════════════════════════════════

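# Compute every mean and standard deviation in a single aggregation
# (one Spark job instead of one per feature).
# Note: stddev() is the sample standard deviation (alias of stddev_samp).
agg_exprs = []
for feature in FEATURE_COLUMNS:
    agg_exprs.append(mean(col(feature)).alias(f"{feature}_mean"))
    agg_exprs.append(stddev(col(feature)).alias(f"{feature}_stddev"))

stats_row = df.agg(*agg_exprs).first()

# Store statistics for each feature
stats = {
    feature: {
        "mean": stats_row[f"{feature}_mean"],
        "stddev": stats_row[f"{feature}_stddev"],
    }
    for feature in FEATURE_COLUMNS
}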

# Display statistics
print("Feature Statistics:")
print("=" * 60)
print(f"{'Feature':<20} {'Mean':>15} {'StdDev':>15}")
print("-" * 60)
for feature, values in stats.items():
    print(f"{feature:<20} {values['mean']:>15.4f} {values['stddev']:>15.4f}")

---
## Step 4: Calculate Z-Scores
---

In [None]:
# ═══════════════════════════════════════════════════════════════════
# CALCULATE Z-SCORE FOR EACH FEATURE
# ═══════════════════════════════════════════════════════════════════

# Formula: z = (value - mean) / stddev

df_zscore = df

for feature in FEATURE_COLUMNS:
    feature_mean = stats[feature]["mean"]
    feature_std = stats[feature]["stddev"]
    
    # Add z-score column for this feature
    df_zscore = df_zscore.withColumn(
        f"{feature}_zscore",
        (col(feature) - lit(feature_mean)) / lit(feature_std)
    )

print("✓ Z-Scores calculated")
print("\nNew columns added:")
for feature in FEATURE_COLUMNS:
    print(f"  • {feature}_zscore")

In [None]:
# Show sample z-scores
df_zscore.select(
    "timestamp", "return", "return_zscore", "volume_change", "volume_change_zscore"
).show(5)
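
The division in this step is undefined when a feature's standard deviation is zero (a constant column) or when an input is null. Depending on Spark's ANSI setting, dividing by zero yields null or raises an error. The plain-Python helper below, under the hypothetical name `safe_zscore`, sketches one way to make that case explicit. It is an illustration, not part of the notebook's pipeline.

```python
from typing import Optional

def safe_zscore(value: Optional[float],
                mean_value: Optional[float],
                std_value: Optional[float]) -> Optional[float]:
    """Return the Z-Score, or None when it is undefined."""
    if value is None or mean_value is None or std_value is None:
        return None  # missing input
    if std_value == 0:
        return None  # constant feature: every value equals the mean
    return (value - mean_value) / std_value

print(safe_zscore(5.0, 2.0, 1.5))   # regular case
print(safe_zscore(5.0, 5.0, 0.0))   # constant feature
print(safe_zscore(None, 2.0, 1.5))  # missing value
```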

---
## Step 5: Flag Anomalies
---

In [None]:
# ═══════════════════════════════════════════════════════════════════
# FLAG ANOMALIES: |Z-Score| > THRESHOLD in ANY feature
# ═══════════════════════════════════════════════════════════════════

# Build condition: |z1| > 3 OR |z2| > 3 OR |z3| > 3 ...
anomaly_condition = None

for feature in FEATURE_COLUMNS:
    zscore_col = f"{feature}_zscore"
    condition = spark_abs(col(zscore_col)) > ZSCORE_THRESHOLD
    
    if anomaly_condition is None:
        anomaly_condition = condition
    else:
        anomaly_condition = anomaly_condition | condition  # OR

# Add anomaly label column
df_zscore = df_zscore.withColumn(
    "is_anomaly",
    when(anomaly_condition, 1).otherwise(0)
)

print("✓ Anomalies flagged")
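
One subtlety of the OR-chain: Spark follows SQL's three-valued logic, so a null Z-Score makes its comparison null rather than false, and `when(...)` treats a null condition as not matched. The plain-Python sketch below models that behavior with `None` standing in for null. `exceeds`, `sql_or`, and `flag_row` are hypothetical helper names used only for illustration.

```python
from functools import reduce
from typing import List, Optional

THRESHOLD = 3.0

def exceeds(z: Optional[float]) -> Optional[bool]:
    """|z| > THRESHOLD, or None when z is missing (SQL null)."""
    return None if z is None else abs(z) > THRESHOLD

def sql_or(a: Optional[bool], b: Optional[bool]) -> Optional[bool]:
    """SQL-style OR: True wins, otherwise None (unknown) wins over False."""
    if a is True or b is True:
        return True
    if a is None or b is None:
        return None
    return False

def flag_row(z_scores: List[Optional[float]]) -> int:
    """1 if any Z-Score exceeds the threshold, else 0 (a null condition maps to 0)."""
    condition = reduce(sql_or, (exceeds(z) for z in z_scores))
    return 1 if condition is True else 0

print(flag_row([0.5, -3.2, 1.0]))   # one feature beyond the threshold
print(flag_row([0.5, None, 4.1]))   # null plus one extreme feature
print(flag_row([0.5, None, 1.0]))   # null and nothing extreme
```

Rows that contain nulls and are otherwise unremarkable are therefore labeled normal, which may be worth checking if the processed data contains missing values.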

In [None]:
# ═══════════════════════════════════════════════════════════════════
# COUNT RESULTS
# ═══════════════════════════════════════════════════════════════════

total = df_zscore.count()
anomalies = df_zscore.filter(col("is_anomaly") == 1).count()
normal = total - anomalies

print("\n" + "=" * 50)
print("Z-SCORE RESULTS")
print("=" * 50)
print(f"Total data points:  {total}")
print(f"Normal:             {normal} ({100*normal/total:.1f}%)")
print(f"Anomalies:          {anomalies} ({100*anomalies/total:.1f}%)")
print("=" * 50)
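
When reading the anomaly percentage, it helps to have a baseline. Because a row is flagged if any of the six features exceeds the threshold, the expected rate is higher than 0.3% even for perfectly normal data. The sketch below computes that baseline under two simplifying assumptions that likely do not hold exactly here: normally distributed and mutually independent features.

```python
import math

ZSCORE_THRESHOLD = 3.0
N_FEATURES = 6

# Two-sided tail probability for one normally distributed feature
p_single = 1 - math.erf(ZSCORE_THRESHOLD / math.sqrt(2))

# Probability that at least one of N independent features exceeds the threshold
p_any = 1 - (1 - p_single) ** N_FEATURES

print(f"Single feature:    {100 * p_single:.2f}%")
print(f"Any of {N_FEATURES} features: {100 * p_any:.2f}%")
```

Features such as `return` and `log_return` are strongly correlated, so the real baseline probably sits between the two printed values. An observed rate well above it would point to heavy tails rather than to the any-feature rule alone.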

---
## Step 6: Examine Anomalies
---

In [None]:
# ═══════════════════════════════════════════════════════════════════
# VIEW DETECTED ANOMALIES
# ═══════════════════════════════════════════════════════════════════

print("Sample anomalies detected:")
print("-" * 50)

df_zscore.filter(col("is_anomaly") == 1) \
    .select("timestamp", "close", "return", "return_zscore", 
            "volume_change", "volume_change_zscore") \
    .show(10, truncate=False)

---
## Step 7: Save Results
---

In [None]:
# ═══════════════════════════════════════════════════════════════════
# SAVE RESULTS TO CSV
# ═══════════════════════════════════════════════════════════════════

# Create results directory
os.makedirs(OUTPUT_PATH, exist_ok=True)

# Select columns to save
result_columns = ["timestamp", "close", "return", "volume_change", 
                  "volatility_24h", "is_anomaly"]

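# Add a row ID for later joining.
# Note: monotonically_increasing_id() yields unique, increasing IDs that are
# not consecutive and depend on partitioning, so they only line up across
# notebooks if the data is read and partitioned the same way each time.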
df_result = df_zscore.withColumn("row_id", monotonically_increasing_id())
df_result = df_result.select(["row_id"] + result_columns)

# Rename anomaly column to indicate method
df_result = df_result.withColumnRenamed("is_anomaly", "anomaly_zscore")

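# Save (Spark writes a directory named "zscore_results" containing a part-*.csv file, not a single CSV file)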
output_file = OUTPUT_PATH + "zscore_results"
df_result.coalesce(1).write.mode("overwrite").option("header", "true").csv(output_file)

print(f"✓ Results saved to: {output_file}")
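
A note on the `row_id` column: Spark documents that `monotonically_increasing_id()` places the partition index in the upper bits and a per-partition row counter in the lower 33 bits. The plain-Python sketch below models that layout to show why the IDs jump between partitions. `simulated_id` is a hypothetical helper, not a Spark function.

```python
def simulated_id(partition_index: int, row_in_partition: int) -> int:
    """Model of the documented layout: partition index above a 33-bit row counter."""
    return (partition_index << 33) + row_in_partition

# Two partitions with three rows each
ids = [simulated_id(p, r) for p in range(2) for r in range(3)]
print(ids)
```

If the CSV happens to be read as a single partition, the IDs are consecutive. Otherwise they are not, so joining results from different notebooks on `timestamp` may be the safer choice.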

In [None]:
# ═══════════════════════════════════════════════════════════════════
# STOP SPARK
# ═══════════════════════════════════════════════════════════════════

spark.stop()
print("✓ Spark stopped")
print("\n" + "=" * 50)
print("Z-SCORE COMPLETE - Proceed to 04_kmeans.ipynb")
print("=" * 50)