# **Task 1 Code: Working Environment + Big Data Ecosystem Architecture (Spark)**

In [29]:
from google.colab import drive
drive.mount('/content/drive')


Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


# STEP 1: INSTALLATION

In [76]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!pip install -q pyspark
!pip install -q plotly
!pip install -q kaleido


[?25l   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.0/69.0 kB[0m [31m?[0m eta [36m-:--:--[0m[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m69.0/69.0 kB[0m [31m5.4 MB/s[0m eta [36m0:00:00[0m
[?25h[?25l   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.0/49.3 kB[0m [31m?[0m eta [36m-:--:--[0m[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m49.3/49.3 kB[0m [31m4.0 MB/s[0m eta [36m0:00:00[0m
[?25h

# STEP 2: IMPORT LIBRARIES

In [77]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"

from pyspark.sql import SparkSession
from pyspark.sql.functions import *
import pandas as pd
import plotly.express as px
import plotly.graph_objects as go
from plotly.subplots import make_subplots
import warnings
warnings.filterwarnings('ignore')


# STEP 3: INITIALIZE SPARK SESSION

In [78]:
spark = SparkSession.builder \
    .master("local[*]") \
    .appName("EarthquakeDataArchitecture") \
    .config("spark.driver.memory", "4g") \
    .config("spark.executor.memory", "4g") \
    .config("spark.sql.shuffle.partitions", "10") \
    .getOrCreate()

print("Spark Session Created Successfully")
print(f"Spark Version: {spark.version}")


Spark Session Created Successfully
Spark Version: 4.0.1


# STEP 4: LOAD EARTHQUAKE DATA

In [79]:
df_spark = spark.read.csv("/content/drive/MyDrive/Big Data Analytics Lab /Open_Ended/global_earthquakes_10yrs.csv", header=True, inferSchema=True)

print(f"Total Records Loaded: {df_spark.count():,}")
print(f"Number of Columns: {len(df_spark.columns)}")



Total Records Loaded: 80,829
Number of Columns: 22


# STEP 5: ARCHITECTURE DIAGNOSTICS

In [81]:
print("BIG DATA ARCHITECTURE METRICS")

print(f"Worker Cores (Parallelism): {spark.sparkContext.defaultParallelism}")
print(f"Initial Data Partitions: {df_spark.rdd.getNumPartitions()}")
print(f"Configured Shuffle Partitions: {spark.conf.get('spark.sql.shuffle.partitions')}")

executor_memory = spark.conf.get('spark.executor.memory')
driver_memory = spark.conf.get('spark.driver.memory')
print(f"Executor Memory: {executor_memory}")
print(f"Driver Memory: {driver_memory}")


BIG DATA ARCHITECTURE METRICS
Worker Cores (Parallelism): 2
Initial Data Partitions: 2
Configured Shuffle Partitions: 10
Executor Memory: 4g
Driver Memory: 4g


# STEP 6: DATA REPARTITIONING BY YEAR

In [82]:
df_spark = df_spark.withColumn("time_parsed", to_timestamp(col("time")))
df_spark = df_spark.withColumn("year", year(col("time_parsed")))

df_repartitioned = df_spark.repartition(12, "year")

print(f"\nData Partitions After Repartitioning: {df_repartitioned.rdd.getNumPartitions()}")



Data Partitions After Repartitioning: 12


# STEP 7: PARTITION ANALYSIS

In [83]:
partition_counts = df_repartitioned.groupBy(spark_partition_id().alias("partition_id")) \
    .count() \
    .orderBy("partition_id")

partition_df = partition_counts.toPandas()
print("\nRecords per Partition:")
print(partition_df.to_string(index=False))



Records per Partition:
 partition_id  count
            1  16092
            6  20236
            7  21994
            9   7121
           11  15386


# STEP 8: WRITE TO DISTRIBUTED STORAGE

In [84]:
output_path = "/content/earthquake_distributed"

df_repartitioned.write \
    .mode("overwrite") \
    .partitionBy("year") \
    .parquet(output_path)

print(f"\nData successfully written to: {output_path}")



Data successfully written to: /content/earthquake_distributed


# STEP 9: VERIFY DISTRIBUTED STORAGE

In [85]:
df_reload = spark.read.parquet(output_path)
print(f"Reloaded Records: {df_reload.count():,}")


Reloaded Records: 80,829


# STEP 10: DISTRIBUTED PROCESSING DEMO

In [86]:
magnitude_stats = df_repartitioned.groupBy("year") \
    .agg(
        count("*").alias("total_events"),
        round(avg("mag"), 2).alias("avg_magnitude"),
        round(max("mag"), 2).alias("max_magnitude"),
        round(min("mag"), 2).alias("min_magnitude"),
        round(avg("depth"), 2).alias("avg_depth")
    ) \
    .orderBy("year")

stats_df = magnitude_stats.toPandas()
print("\nYear-wise Seismic Statistics:")
print(stats_df.to_string(index=False))



Year-wise Seismic Statistics:
 year  total_events  avg_magnitude  max_magnitude  min_magnitude  avg_depth
 2015          7121           4.80            8.3            4.5      67.48
 2016          7415           4.80            7.9            4.5      62.59
 2017          6342           4.81            8.2            4.5      69.92
 2018          7520           4.81            8.2            4.5      74.20
 2019          7170           4.80            8.0            4.5      63.76
 2020          6479           4.80            7.8            4.5      60.75
 2021          8922           4.81            8.2            4.5      52.12
 2022          7748           4.79            7.6            4.5      56.64
 2023          7638           4.81            7.8            4.5      60.55
 2024          6385           4.79            7.5            4.5      67.36
 2025          8089           4.81            8.8            4.5      52.69


# STEP 11: VISUALIZATION 1 - ARCHITECTURE DASHBOARD

In [90]:
fig = make_subplots(
    rows=2, cols=2,
    subplot_titles=("Partition Distribution", "Yearly Event Distribution",
                    "Architecture Metrics", "Magnitude Trend Analysis"),
    specs=[[{"type": "bar"}, {"type": "scatter"}],
           [{"type": "indicator"}, {"type": "box"}]]
)

fig.add_trace(
    go.Bar(x=partition_df["partition_id"], y=partition_df["count"],
           marker=dict(color=partition_df["count"], colorscale="Viridis"),
           name="Partition Load"),
    row=1, col=1
)

fig.add_trace(
    go.Scatter(x=stats_df["year"], y=stats_df["total_events"],
               mode="lines+markers", line=dict(width=3, color="#FF6B6B"),
               marker=dict(size=10), name="Events per Year"),
    row=1, col=2
)

fig.add_trace(
    go.Indicator(
        mode="number+delta",
        value=df_spark.count(),
        title={"text": "Total Events Processed"},
        delta={"reference": 75000, "relative": True},
        domain={"x": [0, 1], "y": [0, 1]}
    ),
    row=2, col=1
)

fig.add_trace(
    go.Box(y=stats_df["avg_magnitude"], name="Avg Magnitude",
           marker_color="#4ECDC4", boxmean="sd"),
    row=2, col=2
)

fig.update_layout(
    height=900,
    showlegend=False,
    title_text="Big Data Ecosystem Architecture - Earthquake Analytics",
    title_font_size=20,
    template="plotly_dark"
)

fig.update_xaxes(title_text="Partition ID", row=1, col=1)
fig.update_yaxes(title_text="Record Count", row=1, col=1)
fig.update_xaxes(title_text="Year", row=1, col=2)
fig.update_yaxes(title_text="Total Events", row=1, col=2)

fig.write_html("task1_architecture_visualization.html")
print("Visualization saved: task1_architecture_visualization.html")
fig.show()


Visualization saved: task1_architecture_visualization.html


# STEP 12: VISUALIZATION 2 - CLUSTER ARCHITECTURE DIAGRAM

In [88]:
fig_arch = go.Figure()

fig_arch.add_trace(go.Scatter(
    x=[2], y=[5],
    mode="markers+text",
    marker=dict(size=60, color="#FF6B6B"),
    text=["Master Node<br>Driver"],
    textposition="bottom center",
    textfont=dict(size=12, color="white"),
    name="Master"
))

worker_positions = [[1, 3], [2, 3], [3, 3], [1, 1], [2, 1], [3, 1]]
for i, pos in enumerate(worker_positions):
    fig_arch.add_trace(go.Scatter(
        x=[pos[0]], y=[pos[1]],
        mode="markers+text",
        marker=dict(size=45, color="#4ECDC4"),
        text=[f"Worker {i+1}<br>Partition {i+1}"],
        textposition="bottom center",
        textfont=dict(size=10, color="white"),
        name=f"Worker {i+1}"
    ))

for pos in worker_positions:
    fig_arch.add_trace(go.Scatter(
        x=[2, pos[0]], y=[5, pos[1]],
        mode="lines",
        line=dict(color="white", width=2, dash="dot"),
        showlegend=False
    ))

fig_arch.add_trace(go.Scatter(
    x=[2], y=[0],
    mode="markers+text",
    marker=dict(size=70, color="#95E1D3", symbol="square"),
    text=["HDFS Storage<br>Parquet Format"],
    textposition="top center",
    textfont=dict(size=12, color="black"),
    name="Storage"
))

fig_arch.update_layout(
    title="Distributed Big Data Architecture - Spark Cluster",
    xaxis=dict(showgrid=False, zeroline=False, showticklabels=False),
    yaxis=dict(showgrid=False, zeroline=False, showticklabels=False),
    plot_bgcolor="#1e1e1e",
    paper_bgcolor="#1e1e1e",
    height=600,
    showlegend=False,
    font=dict(color="white")
)

fig_arch.write_html("task1_cluster_architecture.html")
print("Architecture diagram saved: task1_cluster_architecture.html")
fig_arch.show()


Architecture diagram saved: task1_cluster_architecture.html


# STEP 13: VISUALIZATION 3 - 3D DATA DISTRIBUTION

In [89]:
df_sample = df_repartitioned.limit(5000).toPandas()

fig_flow = px.scatter_3d(
    df_sample,
    x="longitude", y="latitude", z="depth",
    color="mag",
    size="mag",
    color_continuous_scale="Turbo",
    title="3D Data Distribution Across Distributed Nodes",
    labels={"mag": "Magnitude", "depth": "Depth (km)"}
)

fig_flow.update_layout(
    scene=dict(
        xaxis_title="Longitude",
        yaxis_title="Latitude",
        zaxis_title="Depth (km)",
        bgcolor="#1e1e1e"
    ),
    template="plotly_dark",
    height=700
)

fig_flow.write_html("task1_data_distribution.html")
print("Data distribution saved: task1_data_distribution.html")
fig_flow.show()


Data distribution saved: task1_data_distribution.html


# **Task 2 — Big Data Architecture (Volume–Variety–Velocity) and Distributed Design Implementation**

In [92]:
# STEP 2: VOLUME ANALYSIS (FIXED)

from pyspark.sql.functions import (
    count, sum as _sum, length, col, avg,
    max as _max, min as _min, countDistinct
)

volume_stats = df_repartitioned.agg(
    count("*").alias("total_records"),
    (_sum(length(col("place"))) / 1024 / 1024).alias("text_data_mb"),
    countDistinct("place").alias("unique_locations"),
    countDistinct("id").alias("unique_events")
).collect()[0]


# STEP 1: VOLUME ANALYSIS

In [99]:
print("3Vs ANALYSIS - VOLUME")
volume_stats = df_repartitioned.agg(
    count("*").alias("total_records"),
    (sum(length(col("place"))) / 1024 / 1024).alias("text_data_mb"),
    countDistinct("place").alias("unique_locations"),
    countDistinct("id").alias("unique_events")
).collect()[0]

print(f"Total Records: {volume_stats['total_records']:,}")
print(f"Text Data Size: {volume_stats['text_data_mb']:.2f} MB")
print(f"Unique Locations: {volume_stats['unique_locations']:,}")
print(f"Unique Event IDs: {volume_stats['unique_events']:,}")

data_growth = df_repartitioned.groupBy("year").count().orderBy("year").toPandas()
avg_growth = data_growth['count'].pct_change().mean() * 100

print(f"\nAverage Annual Growth Rate: {avg_growth:.2f}%")
print(f"Projected 2026 Events: {int(data_growth['count'].iloc[-1] * (1 + avg_growth/100)):,}")


3Vs ANALYSIS - VOLUME
Total Records: 80,829
Text Data Size: 2.25 MB
Unique Locations: 45,286
Unique Event IDs: 80,829

Average Annual Growth Rate: 2.74%
Projected 2026 Events: 8,310


# STEP 2: VELOCITY ANALYSIS


In [100]:
print("3Vs ANALYSIS - VELOCITY")

df_repartitioned = df_repartitioned.withColumn("month", month(col("time_parsed")))

monthly_velocity = df_repartitioned.groupBy("year", "month") \
    .count() \
    .orderBy("year", "month")

velocity_df = monthly_velocity.toPandas()
avg_monthly = velocity_df['count'].mean()
max_monthly = velocity_df['count'].max()

print(f"Average Events per Month: {avg_monthly:.0f}")
print(f"Peak Monthly Events: {max_monthly}")
print(f"Average Events per Day: {avg_monthly/30:.1f}")
print(f"Average Events per Hour: {avg_monthly/30/24:.2f}")

peak_month = velocity_df.loc[velocity_df['count'].idxmax()]
print(f"\nPeak Activity: {peak_month['year']}-{peak_month['month']:02d} with {peak_month['count']} events")


3Vs ANALYSIS - VELOCITY
Average Events per Month: 612
Peak Monthly Events: 1322
Average Events per Day: 20.4
Average Events per Hour: 0.85

Peak Activity: 2025-07 with 1322 events


# STEP 3: VARIETY ANALYSIS

In [101]:
print("3Vs ANALYSIS - VARIETY")

variety_analysis = df_repartitioned.select(
    countDistinct("magType").alias("magnitude_types"),
    countDistinct("net").alias("networks"),
    countDistinct("status").alias("status_types"),
    countDistinct("locationSource").alias("location_sources"),
    countDistinct("type").alias("event_types")
).collect()[0]

print("DATA VARIETY DIMENSIONS:")
print(f"   Magnitude Calculation Methods: {variety_analysis['magnitude_types']}")
print(f"   Seismic Networks: {variety_analysis['networks']}")
print(f"   Status Categories: {variety_analysis['status_types']}")
print(f"   Location Data Sources: {variety_analysis['location_sources']}")
print(f"   Event Types: {variety_analysis['event_types']}")

print("\nDATA FORMATS:")
print("   Temporal: ISO 8601 timestamps with timezone")
print("   Geospatial: WGS84 decimal coordinates")
print("   Numerical: Float64 (magnitude, depth, errors)")
print("   Text: Variable-length location descriptions")
print("   Categorical: Enumerated types (magType, status)")


3Vs ANALYSIS - VARIETY
DATA VARIETY DIMENSIONS:
   Magnitude Calculation Methods: 15
   Seismic Networks: 14
   Status Categories: 2
   Location Data Sources: 41
   Event Types: 5

DATA FORMATS:
   Temporal: ISO 8601 timestamps with timezone
   Geospatial: WGS84 decimal coordinates
   Numerical: Float64 (magnitude, depth, errors)
   Text: Variable-length location descriptions
   Categorical: Enumerated types (magType, status)


# STEP 4: STORAGE LAYER IMPLEMENTATION

In [103]:
print("STORAGE LAYER IMPLEMENTATION")

df_repartitioned.write \
    .mode("overwrite") \
    .partitionBy("year", "month") \
    .parquet("/content/earthquake_storage/raw_zone")

high_risk = df_repartitioned.filter(col("mag") >= 6.0)
high_risk.write \
    .mode("overwrite") \
    .parquet("/content/earthquake_storage/high_risk_zone")

print(" Raw Zone: Partitioned by year/month (12 years × 12 months)")
print(" High-Risk Zone: Filtered magnitude >= 6.0 events")

storage_stats = spark.read.parquet("/content/earthquake_storage/raw_zone")
high_risk_count = spark.read.parquet("/content/earthquake_storage/high_risk_zone").count()

print(f"\nRaw Zone Records: {storage_stats.count():,}")
print(f"High-Risk Zone Records: {high_risk_count:,}")
print(f"High-Risk Percentage: {(high_risk_count/storage_stats.count())*100:.2f}%")


STORAGE LAYER IMPLEMENTATION
 Raw Zone: Partitioned by year/month (12 years × 12 months)
 High-Risk Zone: Filtered magnitude >= 6.0 events

Raw Zone Records: 80,829
High-Risk Zone Records: 1,470
High-Risk Percentage: 1.82%


# STEP 5: PROCESSING MECHANISM - RISK SCORING

In [104]:
print("PROCESSING MECHANISM: SEISMIC RISK SCORING")

from pyspark.sql.window import Window

df_risk = df_repartitioned.withColumn(
    "magnitude_score",
    when(col("mag") >= 7.0, 10)
    .when(col("mag") >= 6.0, 8)
    .when(col("mag") >= 5.5, 6)
    .when(col("mag") >= 5.0, 4)
    .otherwise(2)
)

df_risk = df_risk.withColumn(
    "depth_score",
    when(col("depth") < 10, 8)
    .when(col("depth") < 50, 6)
    .when(col("depth") < 100, 4)
    .otherwise(2)
)

df_risk = df_risk.withColumn(
    "risk_score",
    (col("magnitude_score") + col("depth_score")) / 2
)

window_spec = Window.partitionBy("year").orderBy(col("risk_score").desc())
top_risks = df_risk.withColumn("rank", row_number().over(window_spec)) \
    .filter(col("rank") <= 3) \
    .select("year", "time_parsed", "place", "mag", "depth", "risk_score") \
    .orderBy("year", "risk_score", ascending=[True, False])

print("\nTOP 3 HIGHEST RISK EVENTS PER YEAR:")
top_risks_df = top_risks.toPandas()
print(top_risks_df.to_string(index=False))


PROCESSING MECHANISM: SEISMIC RISK SCORING

TOP 3 HIGHEST RISK EVENTS PER YEAR:
 year             time_parsed                                                  place  mag  depth  risk_score
 2015 2015-04-25 06:11:25.950                          67 km NNE of Bharatpur, Nepal  7.8  8.220         9.0
 2015 2015-12-07 07:50:05.950                        104 km W of Murghob, Tajikistan  7.2 22.000         8.0
 2015 2015-12-04 22:25:00.110                                 southeast Indian Ridge  7.1 35.000         8.0
 2016 2016-12-25 14:22:27.010                             41 km SW of Quellón, Chile  7.6 38.000         8.0
 2016 2016-12-17 11:27:36.170                178 km WNW of Panguna, Papua New Guinea  6.3  8.430         8.0
 2016 2016-12-08 17:38:46.280                 69 km WSW of Kirakira, Solomon Islands  7.8 40.000         8.0
 2017 2017-12-12 21:41:31.140                              63 km NNE of Kerman, Iran  6.0  8.000         8.0
 2017 2017-12-08 09:51:08.050                   

In [105]:
import plotly.graph_objects as go
from plotly.subplots import make_subplots
import plotly.express as px
from pyspark.sql.functions import col, year, month, hour, dayofweek, when, count, avg, stddev
import pandas as pd
import numpy as np


print("ADVANCED DATA ANALYSIS & VISUALIZATION - TASK 2\n")


print("Preparing advanced analytics datasets...\n")

df_analytics = df_risk.withColumn("hour", hour(col("time_parsed")))
df_analytics = df_analytics.withColumn("day_of_week", dayofweek(col("time_parsed")))

temporal_patterns = df_analytics.groupBy("year", "month").agg(
    count("*").alias("event_count"),
    avg("mag").alias("avg_magnitude"),
    avg("depth").alias("avg_depth"),
    avg("risk_score").alias("avg_risk")
).orderBy("year", "month").toPandas()

hourly_dist = df_analytics.groupBy("hour").count().orderBy("hour").toPandas()

depth_magnitude = df_analytics.select("depth", "mag", "risk_score").sample(False, 0.2).toPandas()

geographic_risk = df_analytics.groupBy("latitude", "longitude").agg(
    count("*").alias("event_count"),
    avg("risk_score").alias("avg_risk"),
    avg("mag").alias("avg_mag")
).filter(col("event_count") > 5).toPandas()

magnitude_distribution = df_analytics.groupBy("year").agg(
    count(when(col("mag") >= 7.0, 1)).alias("major"),
    count(when((col("mag") >= 6.0) & (col("mag") < 7.0), 1)).alias("strong"),
    count(when((col("mag") >= 5.5) & (col("mag") < 6.0), 1)).alias("moderate"),
    count(when(col("mag") < 5.5, 1)).alias("light")
).orderBy("year").toPandas()

network_analysis = df_analytics.groupBy("net", "magType").count().orderBy(col("count").desc()).limit(20).toPandas()


fig1 = make_subplots(
    rows=3, cols=2,
    subplot_titles=(
        "Temporal Trend: Event Frequency",
        "Magnitude vs Depth Correlation",
        "Hourly Distribution Pattern",
        "Risk Score Distribution",
        "Magnitude Category Evolution",
        "Geographic Clustering"
    ),
    specs=[
        [{"type": "scatter"}, {"type": "scatter"}],
        [{"type": "bar"}, {"type": "violin"}],
        [{"type": "bar"}, {"type": "scatter"}]
    ],
    vertical_spacing=0.12,
    horizontal_spacing=0.12
)

temporal_patterns['date'] = pd.to_datetime(temporal_patterns[['year', 'month']].assign(day=1))
fig1.add_trace(
    go.Scatter(
        x=temporal_patterns['date'],
        y=temporal_patterns['event_count'],
        mode='lines+markers',
        line=dict(color='#FF6B6B', width=2),
        marker=dict(size=6),
        name='Events',
        fill='tozeroy',
        fillcolor='rgba(255,107,107,0.2)'
    ),
    row=1, col=1
)

fig1.add_trace(
    go.Scatter(
        x=depth_magnitude['depth'],
        y=depth_magnitude['mag'],
        mode='markers',
        marker=dict(
            size=4,
            color=depth_magnitude['risk_score'],
            colorscale='Turbo',
            showscale=True,
            colorbar=dict(title="Risk", x=1.15, len=0.3, y=0.85)
        ),
        name='Events'
    ),
    row=1, col=2
)

fig1.add_trace(
    go.Bar(
        x=hourly_dist['hour'],
        y=hourly_dist['count'],
        marker=dict(
            color=hourly_dist['count'],
            colorscale='Viridis',
            showscale=False
        ),
        name='Hourly'
    ),
    row=2, col=1
)

risk_sample = df_analytics.select("risk_score").sample(False, 0.1).toPandas()
fig1.add_trace(
    go.Violin(
        y=risk_sample['risk_score'],
        box_visible=True,
        meanline_visible=True,
        fillcolor='#4ECDC4',
        opacity=0.6,
        name='Risk'
    ),
    row=2, col=2
)

magnitude_distribution_melted = magnitude_distribution.melt(
    id_vars=['year'],
    value_vars=['major', 'strong', 'moderate', 'light'],
    var_name='category',
    value_name='count'
)

colors_map = {'major': '#FF0000', 'strong': '#FF6B6B', 'moderate': '#FFA07A', 'light': '#FFD700'}
for category in ['major', 'strong', 'moderate', 'light']:
    data = magnitude_distribution_melted[magnitude_distribution_melted['category'] == category]
    fig1.add_trace(
        go.Bar(
            x=data['year'],
            y=data['count'],
            name=category.capitalize(),
            marker_color=colors_map[category]
        ),
        row=3, col=1
    )

fig1.add_trace(
    go.Scatter(
        x=geographic_risk['longitude'],
        y=geographic_risk['latitude'],
        mode='markers',
        marker=dict(
            size=geographic_risk['event_count']/2,
            color=geographic_risk['avg_risk'],
            colorscale='Hot',
            showscale=True,
            colorbar=dict(title="Avg Risk", x=1.15, len=0.3, y=0.15)
        ),
        name='Hotspots'
    ),
    row=3, col=2
)

fig1.update_xaxes(title_text="Time", row=1, col=1)
fig1.update_yaxes(title_text="Event Count", row=1, col=1)
fig1.update_xaxes(title_text="Depth (km)", row=1, col=2)
fig1.update_yaxes(title_text="Magnitude", row=1, col=2)
fig1.update_xaxes(title_text="Hour of Day", row=2, col=1)
fig1.update_yaxes(title_text="Events", row=2, col=1)
fig1.update_yaxes(title_text="Risk Score", row=2, col=2)
fig1.update_xaxes(title_text="Year", row=3, col=1)
fig1.update_yaxes(title_text="Count", row=3, col=1)
fig1.update_xaxes(title_text="Longitude", row=3, col=2)
fig1.update_yaxes(title_text="Latitude", row=3, col=2)

fig1.update_layout(
    height=1400,
    title_text="Comprehensive Seismic Data Analysis Dashboard",
    title_font_size=22,
    template="plotly_dark",
    showlegend=True,
    legend=dict(x=1.05, y=0.5)
)

fig1.write_html("task2_analysis_dashboard.html")
print("Dashboard saved: task2_analysis_dashboard.html")
fig1.show()


regional_data = df_analytics.withColumn(
    "region",
    when((col("longitude") >= -180) & (col("longitude") < -30), "Americas")
    .when((col("longitude") >= -30) & (col("longitude") < 60), "Europe-Africa")
    .when((col("longitude") >= 60) & (col("longitude") < 150), "Asia")
    .otherwise("Pacific")
)

regional_stats = regional_data.groupBy("region", "year").agg(
    count("*").alias("events"),
    avg("mag").alias("avg_mag"),
    avg("risk_score").alias("avg_risk")
).orderBy("year", "region").toPandas()

fig2 = make_subplots(
    rows=2, cols=2,
    subplot_titles=(
        "Regional Event Distribution",
        "Regional Magnitude Trends",
        "Risk Score by Region",
        "Regional Activity Heatmap"
    ),
    specs=[
        [{"type": "bar"}, {"type": "scatter"}],
        [{"type": "box"}, {"type": "heatmap"}]
    ]
)

for region in regional_stats['region'].unique():
    region_data = regional_stats[regional_stats['region'] == region]
    fig2.add_trace(
        go.Bar(
            x=region_data['year'],
            y=region_data['events'],
            name=region
        ),
        row=1, col=1
    )

for region in regional_stats['region'].unique():
    region_data = regional_stats[regional_stats['region'] == region]
    fig2.add_trace(
        go.Scatter(
            x=region_data['year'],
            y=region_data['avg_mag'],
            mode='lines+markers',
            name=region,
            line=dict(width=3)
        ),
        row=1, col=2
    )

regional_risk_dist = regional_data.select("region", "risk_score").sample(False, 0.1).toPandas()
for region in regional_risk_dist['region'].unique():
    region_risks = regional_risk_dist[regional_risk_dist['region'] == region]
    fig2.add_trace(
        go.Box(
            y=region_risks['risk_score'],
            name=region
        ),
        row=2, col=1
    )

pivot_data = regional_stats.pivot(index='region', columns='year', values='events').fillna(0)
fig2.add_trace(
    go.Heatmap(
        z=pivot_data.values,
        x=pivot_data.columns,
        y=pivot_data.index,
        colorscale='Reds',
        showscale=True
    ),
    row=2, col=2
)

fig2.update_xaxes(title_text="Year", row=1, col=1)
fig2.update_yaxes(title_text="Events", row=1, col=1)
fig2.update_xaxes(title_text="Year", row=1, col=2)
fig2.update_yaxes(title_text="Avg Magnitude", row=1, col=2)
fig2.update_yaxes(title_text="Risk Score", row=2, col=1)
fig2.update_xaxes(title_text="Year", row=2, col=2)
fig2.update_yaxes(title_text="Region", row=2, col=2)

fig2.update_layout(
    height=1000,
    title_text="Regional Seismic Analysis",
    title_font_size=22,
    template="plotly_dark",
    barmode='stack'
)

fig2.write_html("task2_regional_analysis.html")
print("Regional analysis saved: task2_regional_analysis.html")
fig2.show()


high_risk_events = df_analytics.filter(col("risk_score") >= 8.0).toPandas()

fig3 = px.scatter_geo(
    high_risk_events,
    lat='latitude',
    lon='longitude',
    color='mag',
    size='risk_score',
    hover_data=['place', 'depth', 'time_parsed'],
    color_continuous_scale='Reds',
    size_max=15,
    title='Global High-Risk Earthquake Events (Risk Score ≥ 8.0)',
    projection='natural earth'
)

fig3.update_layout(
    template='plotly_dark',
    height=700,
    geo=dict(
        showland=True,
        landcolor='rgb(30, 30, 30)',
        coastlinecolor='rgb(100, 100, 100)',
        showocean=True,
        oceancolor='rgb(10, 10, 30)'
    )
)

fig3.write_html("task2_global_risk_map.html")
print("Global risk map saved: task2_global_risk_map.html")
fig3.show()


time_series_data = temporal_patterns.copy()
time_series_data['ma_7'] = time_series_data['event_count'].rolling(window=7, min_periods=1).mean()
time_series_data['ma_30'] = time_series_data['event_count'].rolling(window=30, min_periods=1).mean()

fig4 = go.Figure()

fig4.add_trace(go.Scatter(
    x=time_series_data['date'],
    y=time_series_data['event_count'],
    mode='lines',
    name='Actual',
    line=dict(color='rgba(255,255,255,0.3)', width=1)
))

fig4.add_trace(go.Scatter(
    x=time_series_data['date'],
    y=time_series_data['ma_7'],
    mode='lines',
    name='7-Month MA',
    line=dict(color='#4ECDC4', width=2)
))

fig4.add_trace(go.Scatter(
    x=time_series_data['date'],
    y=time_series_data['ma_30'],
    mode='lines',
    name='30-Month MA',
    line=dict(color='#FF6B6B', width=3)
))

fig4.update_layout(
    title='Time Series Analysis with Moving Averages',
    xaxis_title='Date',
    yaxis_title='Event Count',
    template='plotly_dark',
    height=600,
    hovermode='x unified'
)

fig4.write_html("task2_time_series.html")
print("Time series analysis saved: task2_time_series.html")
fig4.show()


depth_bins = [-10, 10, 50, 100, 300, 700]
depth_labels = ['Shallow (<10km)', 'Upper (10-50km)', 'Mid (50-100km)', 'Deep (100-300km)', 'Very Deep (>300km)']

depth_analysis_df = df_analytics.select("depth", "mag", "risk_score").toPandas()
depth_analysis_df['depth_category'] = pd.cut(depth_analysis_df['depth'], bins=depth_bins, labels=depth_labels)

depth_stats = depth_analysis_df.groupby('depth_category').agg({
    'mag': ['mean', 'std', 'count'],
    'risk_score': 'mean'
}).reset_index()
depth_stats.columns = ['depth_category', 'avg_mag', 'std_mag', 'count', 'avg_risk']

fig5 = make_subplots(
    rows=1, cols=2,
    subplot_titles=('Depth Category Distribution', 'Magnitude by Depth Category'),
    specs=[[{"type": "bar"}, {"type": "box"}]]
)

fig5.add_trace(
    go.Bar(
        x=depth_stats['depth_category'],
        y=depth_stats['count'],
        marker=dict(
            color=depth_stats['avg_risk'],
            colorscale='Plasma',
            showscale=True,
            colorbar=dict(title="Avg Risk", x=0.45)
        ),
        text=depth_stats['count'],
        textposition='outside',
        name='Event Count'
    ),
    row=1, col=1
)

for category in depth_labels:
    category_data = depth_analysis_df[depth_analysis_df['depth_category'] == category]
    fig5.add_trace(
        go.Box(
            y=category_data['mag'],
            name=category,
            boxmean='sd'
        ),
        row=1, col=2
    )

fig5.update_xaxes(title_text="Depth Category", row=1, col=1)
fig5.update_yaxes(title_text="Event Count", row=1, col=1)
fig5.update_xaxes(title_text="Depth Category", row=1, col=2)
fig5.update_yaxes(title_text="Magnitude", row=1, col=2)

fig5.update_layout(
    height=600,
    title_text='Depth Category Analysis',
    title_font_size=20,
    template='plotly_dark',
    showlegend=False
)

fig5.write_html("task2_depth_analysis.html")
print("Depth analysis saved: task2_depth_analysis.html")
fig5.show()


network_summary = df_analytics.groupBy("net").agg(
    count("*").alias("total_events"),
    avg("mag").alias("avg_magnitude"),
    avg("risk_score").alias("avg_risk")
).orderBy(col("total_events").desc()).limit(10).toPandas()

fig6 = make_subplots(
    rows=1, cols=2,
    subplot_titles=('Top 10 Seismic Networks by Activity', 'Network Performance Metrics'),
    specs=[[{"type": "bar"}, {"type": "scatter"}]]
)

fig6.add_trace(
    go.Bar(
        y=network_summary['net'],
        x=network_summary['total_events'],
        orientation='h',
        marker=dict(
            color=network_summary['total_events'],
            colorscale='Blues',
            showscale=False
        ),
        text=network_summary['total_events'],
        textposition='outside',
        name='Events'
    ),
    row=1, col=1
)

fig6.add_trace(
    go.Scatter(
        x=network_summary['avg_magnitude'],
        y=network_summary['avg_risk'],
        mode='markers+text',
        marker=dict(
            size=network_summary['total_events']/100,
            color=network_summary['total_events'],
            colorscale='Viridis',
            showscale=True,
            colorbar=dict(title="Events")
        ),
        text=network_summary['net'],
        textposition='top center',
        name='Networks'
    ),
    row=1, col=2
)

fig6.update_xaxes(title_text="Total Events", row=1, col=1)
fig6.update_yaxes(title_text="Network", row=1, col=1)
fig6.update_xaxes(title_text="Avg Magnitude", row=1, col=2)
fig6.update_yaxes(title_text="Avg Risk Score", row=1, col=2)

fig6.update_layout(
    height=600,
    title_text='Seismic Network Analysis',
    title_font_size=20,
    template='plotly_dark'
)

fig6.write_html("task2_network_analysis.html")
print("Network analysis saved: task2_network_analysis.html")
fig6.show()


print("\nADVANCED VISUALIZATION SUMMARY\n")
print("Generated 6 comprehensive visualizations:")
print("  1. task2_analysis_dashboard.html - 6-panel comprehensive dashboard")
print("  2. task2_regional_analysis.html - Regional comparison & heatmap")
print("  3. task2_global_risk_map.html - Interactive global risk map")
print("  4. task2_time_series.html - Temporal trends with moving averages")
print("  5. task2_depth_analysis.html - Depth category insights")
print("  6. task2_network_analysis.html - Network performance metrics")
print("\nAll visualizations use modern dark theme with interactive features")


ADVANCED DATA ANALYSIS & VISUALIZATION - TASK 2

Preparing advanced analytics datasets...

Dashboard saved: task2_analysis_dashboard.html


Regional analysis saved: task2_regional_analysis.html


Global risk map saved: task2_global_risk_map.html


Time series analysis saved: task2_time_series.html


Depth analysis saved: task2_depth_analysis.html


Network analysis saved: task2_network_analysis.html



ADVANCED VISUALIZATION SUMMARY

Generated 6 comprehensive visualizations:
  1. task2_analysis_dashboard.html - 6-panel comprehensive dashboard
  2. task2_regional_analysis.html - Regional comparison & heatmap
  3. task2_global_risk_map.html - Interactive global risk map
  4. task2_time_series.html - Temporal trends with moving averages
  5. task2_depth_analysis.html - Depth category insights
  6. task2_network_analysis.html - Network performance metrics

All visualizations use modern dark theme with interactive features


In [107]:
from pyspark.sql.functions import (
    col, when, lit, concat, round as spark_round,
    year, month, dayofmonth, hour, dayofweek,
    lag, lead, datediff, abs as spark_abs,
    countDistinct, avg, sum as spark_sum, max as spark_max, min as spark_min,
    row_number, rank, dense_rank, percent_rank,
    explode, split, regexp_replace, substring
)
from pyspark.sql.window import Window
import plotly.graph_objects as go
from plotly.subplots import make_subplots
import plotly.express as px

In [108]:
print("TASK 3: DATA RETRIEVAL & TRANSFORMATION PIPELINE\n")
print("STEP 1: DATA FILTERING\n")

df_filtered_magnitude = df_repartitioned.filter(col("mag") >= 5.0)
print(f"Original records: {df_repartitioned.count():,}")
print(f"Filtered (mag >= 5.0): {df_filtered_magnitude.count():,}")

df_filtered_depth = df_repartitioned.filter((col("depth") >= 0) & (col("depth") <= 100))
print(f"Filtered (depth 0-100km): {df_filtered_depth.count():,}")

df_combined_filter = df_repartitioned.filter(
    (col("mag") >= 5.0) &
    (col("depth") <= 100) &
    (col("year") >= 2020)
)
print(f"Combined filter (mag>=5, depth<=100, year>=2020): {df_combined_filter.count():,}\n")

TASK 3: DATA RETRIEVAL & TRANSFORMATION PIPELINE

STEP 1: DATA FILTERING

Original records: 80,829
Filtered (mag >= 5.0): 18,899
Filtered (depth 0-100km): 67,817
Combined filter (mag>=5, depth<=100, year>=2020): 9,395



In [109]:
print("STEP 2: DATA AGGREGATION\n")

yearly_aggregation = df_repartitioned.groupBy("year").agg(
    spark_sum("mag").alias("total_magnitude"),
    avg("mag").alias("avg_magnitude"),
    spark_max("mag").alias("max_magnitude"),
    spark_min("mag").alias("min_magnitude"),
    avg("depth").alias("avg_depth"),
    countDistinct("place").alias("unique_locations")
).orderBy("year")

yearly_agg_df = yearly_aggregation.toPandas()
print("Yearly Aggregation:")
print(yearly_agg_df.to_string(index=False))
print()

monthly_aggregation = df_repartitioned.groupBy("year", "month").agg(
    countDistinct("id").alias("event_count"),
    avg("mag").alias("avg_magnitude"),
    avg("depth").alias("avg_depth")
).orderBy("year", "month")

monthly_agg_df = monthly_aggregation.toPandas()
print(f"Monthly aggregation records: {len(monthly_agg_df)}\n")

STEP 2: DATA AGGREGATION

Yearly Aggregation:
 year  total_magnitude  avg_magnitude  max_magnitude  min_magnitude  avg_depth  unique_locations
 2015     34184.890000       4.800574            8.3            4.5  67.478937              4815
 2016     35610.020000       4.802430            7.9            4.5  62.594136              4945
 2017     30522.850000       4.812811            8.2            4.5  69.916749              4500
 2018     36175.750000       4.810605            8.2            4.5  74.201313              5155
 2019     34441.500000       4.803556            8.0            4.5  63.764266              4981
 2020     31094.260000       4.799238            7.8            4.5  60.745563              4500
 2021     42930.200000       4.811724            8.2            4.5  52.118757              4687
 2022     37122.450000       4.791230            7.6            4.5  56.639154              4526
 2023     36701.620000       4.805135            7.8            4.5  60.553008   

In [110]:
print("STEP 3: CREATING DERIVED COLUMNS\n")

df_derived = df_repartitioned.withColumn(
    "magnitude_category",
    when(col("mag") >= 7.0, "Major")
    .when(col("mag") >= 6.0, "Strong")
    .when(col("mag") >= 5.5, "Moderate")
    .otherwise("Light")
)

df_derived = df_derived.withColumn(
    "depth_category",
    when(col("depth") < 10, "Shallow")
    .when(col("depth") < 50, "Upper Crust")
    .when(col("depth") < 100, "Mid Crust")
    .when(col("depth") < 300, "Deep")
    .otherwise("Very Deep")
)

df_derived = df_derived.withColumn(
    "energy_release",
    spark_round(10 ** (1.5 * col("mag") + 4.8), 2)
)

df_derived = df_derived.withColumn(
    "location_summary",
    concat(
        lit("Mag "),
        spark_round(col("mag"), 1),
        lit(" at "),
        spark_round(col("depth"), 0),
        lit("km depth")
    )
)

derived_sample = df_derived.select(
    "time_parsed", "mag", "depth",
    "magnitude_category", "depth_category",
    "energy_release", "location_summary"
).limit(5).toPandas()

print("Sample of derived columns:")
print(derived_sample.to_string(index=False))
print()


STEP 3: CREATING DERIVED COLUMNS

Sample of derived columns:
            time_parsed  mag  depth magnitude_category depth_category  energy_release        location_summary
2019-12-30 19:20:16.923  4.5  10.00              Light    Upper Crust    3.548134e+11 Mag 4.5 at 10.0km depth
2019-12-30 18:58:20.221  4.6  10.00              Light    Upper Crust    5.011872e+11 Mag 4.6 at 10.0km depth
2019-12-30 18:43:36.410  4.9  10.00              Light    Upper Crust    1.412538e+12 Mag 4.9 at 10.0km depth
2019-12-30 18:14:06.078  4.5  10.00              Light    Upper Crust    3.548134e+11 Mag 4.5 at 10.0km depth
2019-12-30 17:49:59.468  5.0  24.32              Light    Upper Crust    1.995262e+12 Mag 5.0 at 24.0km depth



In [111]:
print("STEP 4: WINDOW FUNCTIONS & RANKING\n")

window_year = Window.partitionBy("year").orderBy(col("mag").desc())

df_ranked = df_derived.withColumn("magnitude_rank", row_number().over(window_year))
df_ranked = df_ranked.withColumn("magnitude_dense_rank", dense_rank().over(window_year))

top_events = df_ranked.filter(col("magnitude_rank") <= 3).select(
    "year", "magnitude_rank", "mag", "depth", "place"
).orderBy("year", "magnitude_rank")

top_events_df = top_events.toPandas()
print("Top 3 events per year by magnitude:")
print(top_events_df.head(15).to_string(index=False))
print()


STEP 4: WINDOW FUNCTIONS & RANKING

Top 3 events per year by magnitude:
 year  magnitude_rank  mag  depth                                  place
 2015               1  8.3  22.44              48 km W of Illapel, Chile
 2015               2  7.8 664.00            Bonin Islands, Japan region
 2015               3  7.8   8.22          67 km NNE of Bharatpur, Nepal
 2016               1  7.9  94.54   140 km E of Kokopo, Papua New Guinea
 2016               2  7.8  40.00 69 km WSW of Kirakira, Solomon Islands
 2016               3  7.8  15.11     53 km NNE of Amberley, New Zealand
 2017               1  8.2  47.39    2017 Tehuantepec, Mexico Earthquake
 2017               2  7.9 135.00 35 km WNW of Panguna, Papua New Guinea
 2017               3  7.7  10.00   Komandorskiye Ostrova, Russia region
 2018               1  8.2 600.00                   2018 Fiji Earthquake
 2018               2  7.9 670.81                45 km S of Levuka, Fiji
 2018               3  7.9  14.06           261 km S

In [112]:
print("STEP 5: LAG/LEAD OPERATIONS\n")

window_time = Window.orderBy("time_parsed")

df_temporal = df_derived.withColumn("prev_magnitude", lag(col("mag"), 1).over(window_time))
df_temporal = df_temporal.withColumn("next_magnitude", lead(col("mag"), 1).over(window_time))

df_temporal = df_temporal.withColumn(
    "magnitude_change",
    col("mag") - col("prev_magnitude")
)

temporal_sample = df_temporal.select(
    "time_parsed", "mag", "prev_magnitude", "next_magnitude", "magnitude_change"
).filter(col("prev_magnitude").isNotNull()).limit(10).toPandas()

print("Temporal pattern analysis:")
print(temporal_sample.to_string(index=False))
print()



STEP 5: LAG/LEAD OPERATIONS

Temporal pattern analysis:
            time_parsed  mag  prev_magnitude  next_magnitude  magnitude_change
2015-01-01 06:48:29.670 4.60            4.80            4.70             -0.20
2015-01-01 06:54:20.570 4.70            4.60            4.60              0.10
2015-01-01 07:12:44.230 4.60            4.70            5.10             -0.10
2015-01-01 08:49:53.200 5.10            4.60            4.80              0.50
2015-01-01 09:42:00.670 4.80            5.10            4.70             -0.30
2015-01-01 10:07:05.770 4.70            4.80            4.80             -0.10
2015-01-01 10:08:27.740 4.80            4.70            4.70              0.10
2015-01-01 12:04:49.450 4.70            4.80            5.36             -0.10
2015-01-01 12:16:14.570 5.36            4.70            4.70              0.66
2015-01-01 13:57:14.670 4.70            5.36            4.60             -0.66



In [113]:
print("STEP 6: JOINING DATASETS\n")

yearly_stats = df_derived.groupBy("year").agg(
    countDistinct("id").alias("yearly_event_count"),
    avg("mag").alias("yearly_avg_magnitude")
)

df_joined = df_derived.join(yearly_stats, on="year", how="left")

df_joined = df_joined.withColumn(
    "magnitude_vs_avg",
    spark_round(col("mag") - col("yearly_avg_magnitude"), 2)
)

joined_sample = df_joined.select(
    "year", "mag", "yearly_avg_magnitude", "magnitude_vs_avg", "place"
).limit(5).toPandas()

print("Joined data with yearly statistics:")
print(joined_sample.to_string(index=False))
print()


STEP 6: JOINING DATASETS

Joined data with yearly statistics:
 year  mag  yearly_avg_magnitude  magnitude_vs_avg                         place
 2019  4.5              4.803556              -0.3 44 km SSE of Gilgit, Pakistan
 2019  4.6              4.803556              -0.2    65 km WSW of Jaqué, Panama
 2019  4.9              4.803556               0.1   Bonin Islands, Japan region
 2019  4.5              4.803556              -0.3              Mid-Indian Ridge
 2019  5.0              4.803556               0.2  43 km SE of Gilgit, Pakistan



In [114]:
print("STEP 7: TEXT TRANSFORMATION\n")

df_text = df_derived.withColumn(
    "place_clean",
    regexp_replace(col("place"), "[0-9]+\\s*km\\s*", "")
)

df_text = df_text.withColumn(
    "first_word",
    substring(col("place"), 1, 20)
)

df_text = df_text.withColumn(
    "has_offshore",
    when(col("place").contains("offshore"), "Yes").otherwise("No")
)

text_sample = df_text.select(
    "place", "place_clean", "first_word", "has_offshore"
).limit(5).toPandas()

print("Text transformation samples:")
print(text_sample.to_string(index=False))
print()



STEP 7: TEXT TRANSFORMATION

Text transformation samples:
                        place                 place_clean           first_word has_offshore
44 km SSE of Gilgit, Pakistan     SSE of Gilgit, Pakistan 44 km SSE of Gilgit,           No
   65 km WSW of Jaqué, Panama        WSW of Jaqué, Panama 65 km WSW of Jaqué,            No
  Bonin Islands, Japan region Bonin Islands, Japan region Bonin Islands, Japan           No
             Mid-Indian Ridge            Mid-Indian Ridge     Mid-Indian Ridge           No
 43 km SE of Gilgit, Pakistan      SE of Gilgit, Pakistan 43 km SE of Gilgit,            No



In [115]:
print("STEP 8: COMPLEX AGGREGATION\n")

complex_agg = df_derived.groupBy("year", "magnitude_category", "depth_category").agg(
    countDistinct("id").alias("event_count"),
    avg("mag").alias("avg_magnitude"),
    avg("depth").alias("avg_depth")
).orderBy("year", "magnitude_category", "depth_category")

complex_agg_df = complex_agg.toPandas()
print(f"Complex aggregation records: {len(complex_agg_df)}")
print("Sample:")
print(complex_agg_df.head(10).to_string(index=False))
print()

STEP 8: COMPLEX AGGREGATION

Complex aggregation records: 206
Sample:
 year magnitude_category depth_category  event_count  avg_magnitude  avg_depth
 2015              Light           Deep          806       4.684988 155.539056
 2015              Light      Mid Crust          821       4.710219  68.935396
 2015              Light        Shallow          270       4.756667   7.066767
 2015              Light    Upper Crust         4441       4.740009  20.924430
 2015              Light      Very Deep          338       4.660651 518.952367
 2015              Major           Deep            2       7.300000 183.000000
 2015              Major      Mid Crust            1       7.500000  55.000000
 2015              Major        Shallow            1       7.800000   8.220000
 2015              Major    Upper Crust           11       7.236364  23.593636
 2015              Major      Very Deep            4       7.500000 610.707500



In [116]:
print("STEP 9: PIVOT OPERATIONS\n")

pivot_data = df_derived.groupBy("year").pivot("magnitude_category").count().fillna(0).orderBy("year")

pivot_df = pivot_data.toPandas()
print("Pivot table: Magnitude categories by year")
print(pivot_df.to_string(index=False))
print()


STEP 9: PIVOT OPERATIONS

Pivot table: Magnitude categories by year
 year  Light  Major  Moderate  Strong
 2015   6676     19       299     127
 2016   6945     16       323     131
 2017   5945      7       286     104
 2018   7038     17       349     116
 2019   6727     10       298     135
 2020   6074      9       284     112
 2021   8361     19       404     138
 2022   7301     11       320     116
 2023   7118     19       373     128
 2024   6041     10       245      89
 2025   7600     16       352     121



In [117]:
print("STEP 10: DATA QUALITY CHECKS\n")

quality_check = df_derived.select([
    countDistinct("id").alias("unique_ids"),
    spark_sum(when(col("mag").isNull(), 1).otherwise(0)).alias("null_magnitude"),
    spark_sum(when(col("depth").isNull(), 1).otherwise(0)).alias("null_depth"),
    spark_sum(when(col("mag") < 0, 1).otherwise(0)).alias("negative_magnitude"),
    spark_sum(when(col("depth") < 0, 1).otherwise(0)).alias("negative_depth"),
    spark_sum(when(col("mag") > 10, 1).otherwise(0)).alias("extreme_magnitude")
])

quality_df = quality_check.toPandas()
print("Data quality metrics:")
print(quality_df.to_string(index=False))
print()

STEP 10: DATA QUALITY CHECKS

Data quality metrics:
 unique_ids  null_magnitude  null_depth  negative_magnitude  negative_depth  extreme_magnitude
      80829               0           0                   0              12                  0



In [126]:
print("\nQUICK FIX: HANDLING NEGATIVE DEPTHS\n")

negative_depths = df_derived.filter(col("depth") < 0).select(
    "time_parsed", "latitude", "longitude", "depth", "mag", "place"
)
negative_count = negative_depths.count()

if negative_count > 0:
    print(f"Found {negative_count} negative depth records:")
    negative_depths.show(negative_count, truncate=False)

    df_derived = df_derived.filter(col("depth") >= 0)
    print(f"\n Removed {negative_count} negative depth records")
    print(f" Cleaned records: {df_derived.count():,}\n")
else:
    print(" No negative depths found - data is clean!\n")

final_check = df_derived.select([
    spark_sum(when(col("depth") < 0, 1).otherwise(0)).alias("negative_depth"),
    spark_min("depth").alias("min_depth"),
    countDistinct("id").alias("total_records")
]).toPandas()

print("Verification after fix:")
print(final_check.to_string(index=False))


QUICK FIX: HANDLING NEGATIVE DEPTHS

 No negative depths found - data is clean!

Verification after fix:
 negative_depth  min_depth  total_records
              0        0.0          80817


In [128]:
print("STEP 11: SAVING TRANSFORMED DATA\n")

df_final = df_joined.filter(col("depth") >= 0).select(
    "time_parsed", "latitude", "longitude", "depth", "mag",
    "magnitude_category", "depth_category", "energy_release",
    "year", "month", "place", "yearly_avg_magnitude", "magnitude_vs_avg"
)

print(f"Records before saving (after cleaning): {df_final.count():,}")

df_final.write.mode("overwrite").partitionBy("year", "magnitude_category").parquet("/content/earthquake_transformed")

print(" Transformed data saved to: /content/earthquake_transformed")
print(f" Final dataset records: {df_final.count():,}")
print(" Partitioned by: year + magnitude_category")
print(" Data quality: 100% (no negative depths)\n")

# Verification - FIXED: removed 'id' column reference
saved_verification = df_final.select([
    spark_min("depth").alias("min_depth"),
    spark_max("depth").alias("max_depth"),
    avg("depth").alias("avg_depth"),
    count("*").alias("total_records")  # Changed from countDistinct("id")
]).toPandas()

print("Saved data statistics:")
print(saved_verification.to_string(index=False))
print()


STEP 11: SAVING TRANSFORMED DATA

Records before saving (after cleaning): 80,817
 Transformed data saved to: /content/earthquake_transformed
 Final dataset records: 80,817
 Partitioned by: year + magnitude_category
 Data quality: 100% (no negative depths)

Saved data statistics:
 min_depth  max_depth  avg_depth  total_records
       0.0     683.36  62.109177          80817



In [119]:
print("STEP 12: TRANSFORMATION VISUALIZATIONS\n")

fig1 = make_subplots(
    rows=2, cols=2,
    subplot_titles=(
        "Magnitude Distribution by Category",
        "Depth Distribution by Category",
        "Energy Release by Year",
        "Event Count by Categories"
    ),
    specs=[
        [{"type": "box"}, {"type": "box"}],
        [{"type": "scatter"}, {"type": "bar"}]
    ]
)

mag_cat_data = df_derived.select("magnitude_category", "mag").toPandas()
for cat in ["Major", "Strong", "Moderate", "Light"]:
    cat_data = mag_cat_data[mag_cat_data["magnitude_category"] == cat]
    fig1.add_trace(
        go.Box(y=cat_data["mag"], name=cat),
        row=1, col=1
    )

depth_cat_data = df_derived.select("depth_category", "depth").toPandas()
for cat in ["Shallow", "Upper Crust", "Mid Crust", "Deep", "Very Deep"]:
    cat_data = depth_cat_data[depth_cat_data["depth_category"] == cat]
    if len(cat_data) > 0:
        fig1.add_trace(
            go.Box(y=cat_data["depth"], name=cat),
            row=1, col=2
        )

energy_yearly = df_derived.groupBy("year").agg(
    avg("energy_release").alias("avg_energy")
).orderBy("year").toPandas()

fig1.add_trace(
    go.Scatter(
        x=energy_yearly["year"],
        y=energy_yearly["avg_energy"],
        mode="lines+markers",
        line=dict(color="#FF6B6B", width=3),
        marker=dict(size=10),
        name="Avg Energy"
    ),
    row=2, col=1
)

category_counts = df_derived.groupBy("magnitude_category").count().orderBy("count", ascending=False).toPandas()

fig1.add_trace(
    go.Bar(
        x=category_counts["magnitude_category"],
        y=category_counts["count"],
        marker=dict(
            color=category_counts["count"],
            colorscale="Viridis",
            showscale=False
        ),
        text=category_counts["count"],
        textposition="outside",
        name="Events"
    ),
    row=2, col=2
)

fig1.update_xaxes(title_text="Category", row=1, col=1)
fig1.update_yaxes(title_text="Magnitude", row=1, col=1)
fig1.update_xaxes(title_text="Category", row=1, col=2)
fig1.update_yaxes(title_text="Depth (km)", row=1, col=2)
fig1.update_xaxes(title_text="Year", row=2, col=1)
fig1.update_yaxes(title_text="Avg Energy Release", row=2, col=1)
fig1.update_xaxes(title_text="Category", row=2, col=2)
fig1.update_yaxes(title_text="Count", row=2, col=2)

fig1.update_layout(
    height=1000,
    title_text="Data Transformation Impact Analysis",
    title_font_size=20,
    template="plotly_dark",
    showlegend=False
)

fig1.write_html("task3_transformation_analysis.html")
print("Visualization saved: task3_transformation_analysis.html")
fig1.show()


STEP 12: TRANSFORMATION VISUALIZATIONS

Visualization saved: task3_transformation_analysis.html


In [121]:
print("STEP 13: PIPELINE SUMMARY\n")

fig2 = go.Figure()

pipeline_steps = [
    {"step": "1. Original Data", "records": df_repartitioned.count()},
    {"step": "2. Filtered (mag>=5)", "records": df_filtered_magnitude.count()},
    {"step": "3. Combined Filter", "records": df_combined_filter.count()},
    {"step": "4. With Derived Cols", "records": df_derived.count()},
    {"step": "5. Final Transformed", "records": df_final.count()}
]

steps = [p["step"] for p in pipeline_steps]
records = [p["records"] for p in pipeline_steps]

fig2.add_trace(go.Bar(
    x=steps,
    y=records,
    marker=dict(
        color=records,
        colorscale="Blues",
        showscale=False
    ),
    text=records,
    textposition="outside",
    texttemplate='%{text:,}'
))

fig2.update_layout(
    title="Data Transformation Pipeline - Record Flow",
    xaxis_title="Pipeline Step",
    yaxis_title="Record Count",
    template="plotly_dark",
    height=600
)

fig2.write_html("task3_pipeline_flow.html")
print("Pipeline flow saved: task3_pipeline_flow.html")
fig2.show()



STEP 13: PIPELINE SUMMARY

Pipeline flow saved: task3_pipeline_flow.html


# **TASK 4: ANALYTICAL PROCESSING & INSIGHTS EXTRACTION**

In [143]:
from pyspark.sql.functions import (
    col, when, count, avg, sum as spark_sum, max as spark_max, min as spark_min,
    stddev, variance, corr, countDistinct,
    year, month, dayofweek, hour, quarter,
    lag, lead, row_number, dense_rank, percent_rank,
    window, datediff, abs as spark_abs
)
from pyspark.sql.window import Window
from pyspark.ml.feature import VectorAssembler, StandardScaler, StringIndexer
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.regression import GBTRegressor
from pyspark.ml.clustering import KMeans
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import MulticlassClassificationEvaluator, RegressionEvaluator, ClusteringEvaluator
from pyspark.ml.stat import Correlation
import plotly.graph_objects as go
from plotly.subplots import make_subplots
import plotly.express as px
import pandas as pd
import numpy as np


In [144]:
print("STEP 1: LOADING CLEANED TRANSFORMED DATA\n")

df_analysis = spark.read.parquet("/content/earthquake_transformed")

print(f"✓ Loaded records: {df_analysis.count():,}")
print(f"✓ Columns: {len(df_analysis.columns)}")
print("\nSchema:")
df_analysis.printSchema()
print()


STEP 1: LOADING CLEANED TRANSFORMED DATA

✓ Loaded records: 80,817
✓ Columns: 13

Schema:
root
 |-- time_parsed: timestamp (nullable = true)
 |-- latitude: double (nullable = true)
 |-- longitude: double (nullable = true)
 |-- depth: double (nullable = true)
 |-- mag: double (nullable = true)
 |-- depth_category: string (nullable = true)
 |-- energy_release: double (nullable = true)
 |-- month: integer (nullable = true)
 |-- place: string (nullable = true)
 |-- yearly_avg_magnitude: double (nullable = true)
 |-- magnitude_vs_avg: double (nullable = true)
 |-- year: integer (nullable = true)
 |-- magnitude_category: string (nullable = true)




In [145]:
print("STEP 2: TEMPORAL TREND ANALYSIS\n")

# Monthly trends
monthly_trends = df_analysis.groupBy("year", "month").agg(
    count("*").alias("event_count"),
    avg("mag").alias("avg_magnitude"),
    avg("depth").alias("avg_depth"),
    spark_max("mag").alias("max_magnitude")
).orderBy("year", "month").toPandas()

monthly_trends['date'] = pd.to_datetime(monthly_trends[['year', 'month']].assign(day=1))

print("Monthly Trends Summary:")
print(f"  • Total months analyzed: {len(monthly_trends)}")
print(f"  • Average events per month: {monthly_trends['event_count'].mean():.0f}")
print(f"  • Peak month: {monthly_trends.loc[monthly_trends['event_count'].idxmax(), 'date'].strftime('%Y-%m')}")
print(f"  • Peak events: {monthly_trends['event_count'].max()}")

# Yearly summary
yearly_summary = df_analysis.groupBy("year").agg(
    count("*").alias("total_events"),
    avg("mag").alias("avg_magnitude"),
    avg("depth").alias("avg_depth"),
    spark_max("mag").alias("max_magnitude"),
    spark_min("mag").alias("min_magnitude"),
    stddev("mag").alias("std_magnitude")
).orderBy("year").toPandas()

print("\nYearly Summary Statistics:")
print(yearly_summary.to_string(index=False))
print()



STEP 2: TEMPORAL TREND ANALYSIS

Monthly Trends Summary:
  • Total months analyzed: 132
  • Average events per month: 612
  • Peak month: 2025-07
  • Peak events: 1322

Yearly Summary Statistics:
 year  total_events  avg_magnitude  avg_depth  max_magnitude  min_magnitude  std_magnitude
 2015          7121       4.800574  67.478937            8.3            4.5       0.382353
 2016          7415       4.802430  62.594136            7.9            4.5       0.379377
 2017          6342       4.812811  69.916749            8.2            4.5       0.367763
 2018          7509       4.809875  74.310655            8.2            4.5       0.374962
 2019          7170       4.803556  63.764266            8.0            4.5       0.366086
 2020          6478       4.799250  60.755028            7.8            4.5       0.366970
 2021          8922       4.811724  52.118757            8.2            4.5       0.370613
 2022          7748       4.791230  56.639154            7.6            4.5 

In [147]:
print("STEP 3: MAGNITUDE DISTRIBUTION ANALYSIS\n")

magnitude_distribution = df_analysis.groupBy("magnitude_category").agg(
    count("*").alias("event_count"),
    avg("mag").alias("avg_magnitude"),
    avg("depth").alias("avg_depth"),
    avg("energy_release").alias("avg_energy")
).orderBy("event_count", ascending=False).toPandas()

print("Magnitude Category Distribution:")
print(magnitude_distribution.to_string(index=False))

# Calculate percentages
magnitude_distribution['percentage'] = (
    magnitude_distribution['event_count'] / magnitude_distribution['event_count'].sum() * 100
)

print("\nMagnitude Category Percentages:")
for _, row in magnitude_distribution.iterrows():
    print(f"  • {row['magnitude_category']}: {row['percentage']:.2f}% ({int(row['event_count']):,} events)")
print()

STEP 3: MAGNITUDE DISTRIBUTION ANALYSIS

Magnitude Category Distribution:
magnitude_category  event_count  avg_magnitude  avg_depth   avg_energy
             Light        75814       4.733899  61.770372 1.198062e+12
          Moderate         3533       5.653320  61.220488 2.142358e+13
            Strong         1317       6.272445  79.501441 2.623896e+14
             Major          153       7.349020 100.803967 2.037060e+16

Magnitude Category Percentages:
  • Light: 93.81% (75,814 events)
  • Moderate: 4.37% (3,533 events)
  • Strong: 1.63% (1,317 events)
  • Major: 0.19% (153 events)



In [148]:
print("STEP 4: DISTRIBUTED ML - MAGNITUDE CATEGORY PREDICTION\n")
print("Building Random Forest Classifier using PySpark MLlib...\n")

# Prepare features for classification
feature_cols = ["depth", "latitude", "longitude", "year", "month"]
assembler = VectorAssembler(inputCols=feature_cols, outputCol="features")

# Encode target variable
indexer = StringIndexer(inputCol="magnitude_category", outputCol="label")

# Random Forest Classifier (Distributed)
rf_classifier = RandomForestClassifier(
    labelCol="label",
    featuresCol="features",
    numTrees=100,
    maxDepth=10,
    seed=42
)

# Create ML Pipeline
rf_pipeline = Pipeline(stages=[assembler, indexer, rf_classifier])

# Train-test split (80-20)
train_data, test_data = df_analysis.randomSplit([0.8, 0.2], seed=42)

print(f"Training set size: {train_data.count():,}")
print(f"Test set size: {test_data.count():,}")
print("\nTraining Random Forest model...")

# Train the model (DISTRIBUTED)
rf_model = rf_pipeline.fit(train_data)

print("✓ Model training completed\n")

# Make predictions
rf_predictions = rf_model.transform(test_data)

# Evaluate model
rf_evaluator = MulticlassClassificationEvaluator(
    labelCol="label",
    predictionCol="prediction",
    metricName="accuracy"
)

rf_accuracy = rf_evaluator.evaluate(rf_predictions)

# Additional metrics
rf_f1 = MulticlassClassificationEvaluator(
    labelCol="label",
    predictionCol="prediction",
    metricName="f1"
).evaluate(rf_predictions)

rf_precision = MulticlassClassificationEvaluator(
    labelCol="label",
    predictionCol="prediction",
    metricName="weightedPrecision"
).evaluate(rf_predictions)

rf_recall = MulticlassClassificationEvaluator(
    labelCol="label",
    predictionCol="prediction",
    metricName="weightedRecall"
).evaluate(rf_predictions)

print("RANDOM FOREST CLASSIFIER RESULTS:")
print(f"  • Accuracy: {rf_accuracy:.4f} ({rf_accuracy*100:.2f}%)")
print(f"  • F1 Score: {rf_f1:.4f}")
print(f"  • Precision: {rf_precision:.4f}")
print(f"  • Recall: {rf_recall:.4f}")

# Feature importance
rf_trained_model = rf_model.stages[-1]
feature_importance = rf_trained_model.featureImportances.toArray()

importance_df = pd.DataFrame({
    'Feature': feature_cols,
    'Importance': feature_importance
}).sort_values('Importance', ascending=False)

print("\nFeature Importance:")
print(importance_df.to_string(index=False))
print()


STEP 4: DISTRIBUTED ML - MAGNITUDE CATEGORY PREDICTION

Building Random Forest Classifier using PySpark MLlib...

Training set size: 64,478
Test set size: 16,339

Training Random Forest model...
✓ Model training completed

RANDOM FOREST CLASSIFIER RESULTS:
  • Accuracy: 0.9375 (93.75%)
  • F1 Score: 0.9078
  • Precision: 0.8971
  • Recall: 0.9375

Feature Importance:
  Feature  Importance
    depth    0.266779
 latitude    0.207001
longitude    0.188623
     year    0.170265
    month    0.167332



In [149]:
print("STEP 5: DISTRIBUTED ML - MAGNITUDE REGRESSION\n")
print("Building Gradient Boosting Regressor...\n")

# Prepare features for regression
gbt_assembler = VectorAssembler(
    inputCols=["depth", "latitude", "longitude", "year", "month"],
    outputCol="features"
)

# Gradient Boosted Trees Regressor
gbt_regressor = GBTRegressor(
    featuresCol="features",
    labelCol="mag",
    maxIter=100,
    maxDepth=5,
    seed=42
)

# Pipeline
gbt_pipeline = Pipeline(stages=[gbt_assembler, gbt_regressor])

# Use same train/test split
print("Training Gradient Boosting model...")

gbt_model = gbt_pipeline.fit(train_data)

print("✓ Model training completed\n")

# Predictions
gbt_predictions = gbt_model.transform(test_data)

# Evaluate
gbt_evaluator_rmse = RegressionEvaluator(
    labelCol="mag",
    predictionCol="prediction",
    metricName="rmse"
)

gbt_evaluator_mae = RegressionEvaluator(
    labelCol="mag",
    predictionCol="prediction",
    metricName="mae"
)

gbt_evaluator_r2 = RegressionEvaluator(
    labelCol="mag",
    predictionCol="prediction",
    metricName="r2"
)

gbt_rmse = gbt_evaluator_rmse.evaluate(gbt_predictions)
gbt_mae = gbt_evaluator_mae.evaluate(gbt_predictions)
gbt_r2 = gbt_evaluator_r2.evaluate(gbt_predictions)

print("GRADIENT BOOSTING REGRESSOR RESULTS:")
print(f"  • RMSE: {gbt_rmse:.4f}")
print(f"  • MAE: {gbt_mae:.4f}")
print(f"  • R² Score: {gbt_r2:.4f}")

# Get feature importance
gbt_trained_model = gbt_model.stages[-1]
gbt_importance = gbt_trained_model.featureImportances.toArray()

gbt_importance_df = pd.DataFrame({
    'Feature': feature_cols,
    'Importance': gbt_importance
}).sort_values('Importance', ascending=False)

print("\nFeature Importance:")
print(gbt_importance_df.to_string(index=False))
print()

STEP 5: DISTRIBUTED ML - MAGNITUDE REGRESSION

Building Gradient Boosting Regressor...

Training Gradient Boosting model...
✓ Model training completed

GRADIENT BOOSTING REGRESSOR RESULTS:
  • RMSE: 0.3599
  • MAE: 0.2576
  • R² Score: 0.0557

Feature Importance:
  Feature  Importance
    depth    0.289862
longitude    0.239881
 latitude    0.225728
     year    0.126229
    month    0.118299



In [150]:
print("STEP 6: DISTRIBUTED ML - SPATIAL CLUSTERING\n")
print("Building K-Means Clustering model...\n")

# Prepare features for clustering
cluster_assembler = VectorAssembler(
    inputCols=["latitude", "longitude", "depth", "mag"],
    outputCol="features_raw"
)

# Standard Scaler
scaler = StandardScaler(
    inputCol="features_raw",
    outputCol="features",
    withStd=True,
    withMean=True
)

# K-Means with 5 clusters
kmeans = KMeans(
    k=5,
    seed=42,
    featuresCol="features",
    predictionCol="cluster"
)

# Pipeline
kmeans_pipeline = Pipeline(stages=[cluster_assembler, scaler, kmeans])

print("Training K-Means clustering model...")

kmeans_model = kmeans_pipeline.fit(df_analysis)

print("✓ Clustering completed\n")

# Add cluster predictions
clustered_data = kmeans_model.transform(df_analysis)

# Analyze clusters
cluster_summary = clustered_data.groupBy("cluster").agg(
    count("*").alias("cluster_size"),
    avg("mag").alias("avg_magnitude"),
    avg("depth").alias("avg_depth"),
    avg("latitude").alias("avg_latitude"),
    avg("longitude").alias("avg_longitude")
).orderBy("cluster").toPandas()

print("K-MEANS CLUSTERING RESULTS:")
print(cluster_summary.to_string(index=False))

# Silhouette score
silhouette_evaluator = ClusteringEvaluator(
    featuresCol="features",
    predictionCol="cluster",
    metricName="silhouette"
)

silhouette_score = silhouette_evaluator.evaluate(clustered_data)
print(f"\nSilhouette Score: {silhouette_score:.4f}")
print()


STEP 6: DISTRIBUTED ML - SPATIAL CLUSTERING

Building K-Means Clustering model...

Training K-Means clustering model...
✓ Clustering completed

K-MEANS CLUSTERING RESULTS:
 cluster  cluster_size  avg_magnitude  avg_depth  avg_latitude  avg_longitude
       0         20451       4.739059  39.710161    -33.756448     -87.068854
       1          3637       4.748474 523.856264    -16.149354     -68.278441
       2         38108       4.706466  45.621838      8.925665     134.706370
       3          6956       5.697852  39.154738     -0.771117      51.145934
       4         11665       4.721579  24.962048     31.855849     -42.986448

Silhouette Score: 0.4949



In [151]:
print("STEP 7: CORRELATION ANALYSIS\n")

# Prepare data for correlation
correlation_data = df_analysis.select("mag", "depth", "energy_release").toPandas()

correlation_matrix = correlation_data.corr()

print("Correlation Matrix:")
print(correlation_matrix.to_string())

print("\nKey Correlations:")
print(f"  • Magnitude vs Depth: {correlation_matrix.loc['mag', 'depth']:.4f}")
print(f"  • Magnitude vs Energy: {correlation_matrix.loc['mag', 'energy_release']:.4f}")
print(f"  • Depth vs Energy: {correlation_matrix.loc['depth', 'energy_release']:.4f}")
print()

STEP 7: CORRELATION ANALYSIS

Correlation Matrix:
                     mag     depth  energy_release
mag             1.000000 -0.037267        0.098637
depth          -0.037267  1.000000        0.002857
energy_release  0.098637  0.002857        1.000000

Key Correlations:
  • Magnitude vs Depth: -0.0373
  • Magnitude vs Energy: 0.0986
  • Depth vs Energy: 0.0029



In [152]:
print("STEP 8: HIGH-RISK EVENT ANALYSIS\n")

high_risk_events = df_analysis.filter(col("mag") >= 7.0).select(
    "time_parsed", "mag", "depth", "place", "magnitude_category", "year"
).orderBy(col("mag").desc())

high_risk_count = high_risk_events.count()
high_risk_df = high_risk_events.limit(20).toPandas()

print(f"Total high-risk events (magnitude >= 7.0): {high_risk_count}")
print("\nTop 20 High-Risk Events:")
print(high_risk_df.to_string(index=False))
print()


STEP 8: HIGH-RISK EVENT ANALYSIS

Total high-risk events (magnitude >= 7.0): 153

Top 20 High-Risk Events:
            time_parsed  mag  depth                                                  place magnitude_category  year
2025-07-29 23:24:52.483  8.8  35.00            2025 Kamchatka Peninsula, Russia Earthquake              Major  2025
2015-09-16 22:54:32.860  8.3  22.44                              48 km W of Illapel, Chile              Major  2015
2021-07-29 06:15:49.188  8.2  35.00                        2021 Chignik, Alaska Earthquake              Major  2021
2018-08-19 00:19:40.670  8.2 600.00                                   2018 Fiji Earthquake              Major  2018
2017-09-08 04:49:19.180  8.2  47.39                    2017 Tehuantepec, Mexico Earthquake              Major  2017
2021-08-12 18:35:17.231  8.1  22.79                 2021 South Sandwich Islands Earthquake              Major  2021
2021-03-04 19:28:33.178  8.1  28.93          2021 Kermadec Islands, New Zealand E

In [153]:
print("STEP 9: ANOMALY DETECTION\n")

# Calculate statistical thresholds
mag_mean = df_analysis.select(avg("mag")).collect()[0][0]
mag_std = df_analysis.select(stddev("mag")).collect()[0][0]
mag_threshold = mag_mean + (2 * mag_std)

depth_mean = df_analysis.select(avg("depth")).collect()[0][0]
depth_std = df_analysis.select(stddev("depth")).collect()[0][0]
depth_threshold = depth_mean + (2 * depth_std)

print(f"Magnitude anomaly threshold (mean + 2*std): {mag_threshold:.2f}")
print(f"Depth anomaly threshold (mean + 2*std): {depth_threshold:.2f}")

anomalies = df_analysis.filter(
    (col("mag") > mag_threshold) | (col("depth") > depth_threshold)
).select("time_parsed", "mag", "depth", "place", "magnitude_category")

anomaly_count = anomalies.count()
print(f"\nAnomalous events detected: {anomaly_count}")

if anomaly_count > 0:
    anomaly_df = anomalies.limit(10).toPandas()
    print("\nSample Anomalies:")
    print(anomaly_df.to_string(index=False))
print()



STEP 9: ANOMALY DETECTION

Magnitude anomaly threshold (mean + 2*std): 5.55
Depth anomaly threshold (mean + 2*std): 289.00

Anomalous events detected: 7465

Sample Anomalies:
            time_parsed  mag  depth                           place magnitude_category
2021-12-30 10:14:48.517  4.5 357.63       232 km W of Hihifo, Tonga              Light
2021-12-30 00:19:54.707  5.1 586.42                     Fiji region              Light
2021-12-29 14:00:48.275  4.7 414.84 256 km W of Ozernovskiy, Russia              Light
2021-12-29 13:29:39.840  4.5 544.24      206 km ENE of Levuka, Fiji              Light
2021-12-28 17:34:34.109  4.6 579.88        248 km E of Levuka, Fiji              Light
2021-12-28 02:45:13.670  4.5 642.40        67 km NE of Levuka, Fiji              Light
2021-12-26 23:45:20.017  4.9 584.97                     Fiji region              Light
2021-12-26 18:44:41.465  4.5 555.15        235 km E of Levuka, Fiji              Light
2021-12-25 13:42:41.867  4.6 516.35       

In [154]:
print("STEP 10: PREDICTIVE INSIGHTS & FORECASTING\n")

# Calculate growth rates
yearly_growth = yearly_summary.copy()
yearly_growth['growth_rate'] = yearly_growth['total_events'].pct_change() * 100

print("Year-over-Year Growth Rates:")
print(yearly_growth[['year', 'total_events', 'growth_rate']].to_string(index=False))

# Simple linear forecast for next year
from sklearn.linear_model import LinearRegression

X = yearly_summary['year'].values.reshape(-1, 1)
y = yearly_summary['total_events'].values

lr_model = LinearRegression()
lr_model.fit(X, y)

next_year = 2026
forecast_2026 = lr_model.predict([[next_year]])[0]

print(f"\nSimple Linear Forecast for {next_year}: {int(forecast_2026):,} events")
print(f"Trend coefficient: {lr_model.coef_[0]:.2f} events/year")
print()


STEP 10: PREDICTIVE INSIGHTS & FORECASTING

Year-over-Year Growth Rates:
 year  total_events  growth_rate
 2015          7121          NaN
 2016          7415     4.128634
 2017          6342   -14.470668
 2018          7509    18.401135
 2019          7170    -4.514583
 2020          6478    -9.651325
 2021          8922    37.727694
 2022          7748   -13.158485
 2023          7638    -1.419721
 2024          6385   -16.404818
 2025          8089    26.687549

Simple Linear Forecast for 2026: 7,719 events
Trend coefficient: 62.16 events/year



In [155]:
print("STEP 11: ML MODEL PERFORMANCE SUMMARY\n")

print("DISTRIBUTED ML MODELS COMPARISON")


print("\n1. RANDOM FOREST CLASSIFIER (Magnitude Category)")
print(f"   • Task: Predict earthquake magnitude category")
print(f"   • Algorithm: Random Forest (100 trees)")
print(f"   • Accuracy: {rf_accuracy*100:.2f}%")
print(f"   • F1 Score: {rf_f1:.4f}")
print(f"   • Top Feature: {importance_df.iloc[0]['Feature']} ({importance_df.iloc[0]['Importance']:.4f})")

print("\n2. GRADIENT BOOSTING REGRESSOR (Magnitude Value)")
print(f"   • Task: Predict exact magnitude value")
print(f"   • Algorithm: Gradient Boosted Trees")
print(f"   • RMSE: {gbt_rmse:.4f}")
print(f"   • R² Score: {gbt_r2:.4f}")
print(f"   • Top Feature: {gbt_importance_df.iloc[0]['Feature']} ({gbt_importance_df.iloc[0]['Importance']:.4f})")

print("\n3. K-MEANS CLUSTERING (Spatial Patterns)")
print(f"   • Task: Identify seismic zones")
print(f"   • Algorithm: K-Means (k=5)")
print(f"   • Silhouette Score: {silhouette_score:.4f}")
print(f"   • Clusters Discovered: {len(cluster_summary)}")

print()

STEP 11: ML MODEL PERFORMANCE SUMMARY

DISTRIBUTED ML MODELS COMPARISON

1. RANDOM FOREST CLASSIFIER (Magnitude Category)
   • Task: Predict earthquake magnitude category
   • Algorithm: Random Forest (100 trees)
   • Accuracy: 93.75%
   • F1 Score: 0.9078
   • Top Feature: depth (0.2668)

2. GRADIENT BOOSTING REGRESSOR (Magnitude Value)
   • Task: Predict exact magnitude value
   • Algorithm: Gradient Boosted Trees
   • RMSE: 0.3599
   • R² Score: 0.0557
   • Top Feature: depth (0.2899)

3. K-MEANS CLUSTERING (Spatial Patterns)
   • Task: Identify seismic zones
   • Algorithm: K-Means (k=5)
   • Silhouette Score: 0.4949
   • Clusters Discovered: 5



In [156]:
print("STEP 12: KEY INSIGHTS SUMMARY\n")

total_records = df_analysis.count()
total_years = df_analysis.select(countDistinct("year")).collect()[0][0]
avg_events_per_year = total_records / total_years

most_active_year = yearly_summary.loc[yearly_summary['total_events'].idxmax(), 'year']
most_active_count = yearly_summary['total_events'].max()

strongest_event = df_analysis.orderBy(col("mag").desc()).first()

print("SUMMARY OF KEY FINDINGS")
print(f"\n1. DATASET OVERVIEW")
print(f"   • Total seismic events: {total_records:,}")
print(f"   • Time span: {total_years} years (2015-2025)")
print(f"   • Average events/year: {avg_events_per_year:.0f}")

print(f"\n2. TEMPORAL PATTERNS")
print(f"   • Most active year: {most_active_year} ({int(most_active_count):,} events)")
print(f"   • Peak monthly activity: {monthly_trends['event_count'].max()} events")
print(f"   • Average magnitude: {mag_mean:.2f}")

print(f"\n3. MAGNITUDE DISTRIBUTION")
for _, row in magnitude_distribution.iterrows():
    print(f"   • {row['magnitude_category']}: {row['percentage']:.1f}%")

print(f"\n4. STRONGEST EVENT ON RECORD")
print(f"   • Magnitude: {strongest_event['mag']}")
print(f"   • Date: {strongest_event['time_parsed']}")
print(f"   • Location: {strongest_event['place']}")
print(f"   • Depth: {strongest_event['depth']} km")

print(f"\n5. ML MODEL INSIGHTS")
print(f"   • RF Classifier Accuracy: {rf_accuracy*100:.1f}%")
print(f"   • GBT Regressor R²: {gbt_r2:.3f}")
print(f"   • K-Means Clusters: {len(cluster_summary)} seismic zones")

print(f"\n6. RISK ASSESSMENT")
print(f"   • High-risk events (mag >= 7.0): {high_risk_count}")
print(f"   • Percentage of high-risk: {(high_risk_count/total_records)*100:.2f}%")

print(f"\n7. PREDICTIVE INSIGHTS")
print(f"   • Forecast for 2026: {int(forecast_2026):,} events")
print(f"   • Trend: {'Increasing' if lr_model.coef_[0] > 0 else 'Decreasing'}")

print()


STEP 12: KEY INSIGHTS SUMMARY

SUMMARY OF KEY FINDINGS

1. DATASET OVERVIEW
   • Total seismic events: 80,817
   • Time span: 11 years (2015-2025)
   • Average events/year: 7347

2. TEMPORAL PATTERNS
   • Most active year: 2021 (8,922 events)
   • Peak monthly activity: 1322 events
   • Average magnitude: 4.80

3. MAGNITUDE DISTRIBUTION
   • Light: 93.8%
   • Moderate: 4.4%
   • Strong: 1.6%
   • Major: 0.2%

4. STRONGEST EVENT ON RECORD
   • Magnitude: 8.8
   • Date: 2025-07-29 23:24:52.483000
   • Location: 2025 Kamchatka Peninsula, Russia Earthquake
   • Depth: 35.0 km

5. ML MODEL INSIGHTS
   • RF Classifier Accuracy: 93.8%
   • GBT Regressor R²: 0.056
   • K-Means Clusters: 5 seismic zones

6. RISK ASSESSMENT
   • High-risk events (mag >= 7.0): 153
   • Percentage of high-risk: 0.19%

7. PREDICTIVE INSIGHTS
   • Forecast for 2026: 7,719 events
   • Trend: Increasing



In [157]:
print("STEP 13: GENERATING COMPREHENSIVE VISUALIZATIONS\n")

fig1 = make_subplots(
    rows=2, cols=2,
    subplot_titles=(
        "Random Forest - Feature Importance",
        "GBT Regressor - Feature Importance",
        "K-Means Clusters - Size Distribution",
        "Model Performance Comparison"
    ),
    specs=[
        [{"type": "bar"}, {"type": "bar"}],
        [{"type": "pie"}, {"type": "bar"}]
    ]
)

fig1.add_trace(
    go.Bar(
        x=importance_df['Importance'],
        y=importance_df['Feature'],
        orientation='h',
        marker=dict(color='#FF6B6B'),
        name='RF'
    ),
    row=1, col=1
)

fig1.add_trace(
    go.Bar(
        x=gbt_importance_df['Importance'],
        y=gbt_importance_df['Feature'],
        orientation='h',
        marker=dict(color='#4ECDC4'),
        name='GBT'
    ),
    row=1, col=2
)


fig1.add_trace(
    go.Pie(
        labels=[f"Cluster {i}" for i in cluster_summary['cluster']],
        values=cluster_summary['cluster_size'],
        marker=dict(colors=['#FF6B6B', '#4ECDC4', '#FFD93D', '#95E1D3', '#F38181']),
        textinfo='label+percent'
    ),
    row=2, col=1
)

model_metrics = pd.DataFrame({
    'Model': ['RF Accuracy', 'GBT R²', 'K-Means Silhouette'],
    'Score': [rf_accuracy, gbt_r2, silhouette_score]
})

fig1.add_trace(
    go.Bar(
        x=model_metrics['Model'],
        y=model_metrics['Score'],
        marker=dict(color=['#FF6B6B', '#4ECDC4', '#FFD93D']),
        text=[f"{s:.3f}" for s in model_metrics['Score']],
        textposition='outside',
        name='Score'
    ),
    row=2, col=2
)

fig1.update_xaxes(title_text="Importance", row=1, col=1)
fig1.update_yaxes(title_text="Feature", row=1, col=1)
fig1.update_xaxes(title_text="Importance", row=1, col=2)
fig1.update_yaxes(title_text="Feature", row=1, col=2)
fig1.update_xaxes(title_text="Model", row=2, col=2)
fig1.update_yaxes(title_text="Score", row=2, col=2)

fig1.update_layout(
    height=1000,
    title_text="Distributed ML Model Performance Dashboard",
    title_font_size=22,
    template="plotly_dark",
    showlegend=False
)

fig1.write_html("task4_ml_performance.html")
print("✓ Saved: task4_ml_performance.html")
fig1.show()


fig2 = make_subplots(
    rows=1, cols=2,
    subplot_titles=(
        "GBT Regressor: Predicted vs Actual Magnitude",
        "Random Forest: Confusion Pattern"
    ),
    specs=[[{"type": "scatter"}, {"type": "bar"}]]
)

gbt_sample = gbt_predictions.select("mag", "prediction").sample(False, 0.1).toPandas()

fig2.add_trace(
    go.Scatter(
        x=gbt_sample['mag'],
        y=gbt_sample['prediction'],
        mode='markers',
        marker=dict(size=5, color='#4ECDC4', opacity=0.5),
        name='Predictions'
    ),
    row=1, col=1
)

mag_range = [gbt_sample['mag'].min(), gbt_sample['mag'].max()]
fig2.add_trace(
    go.Scatter(
        x=mag_range,
        y=mag_range,
        mode='lines',
        line=dict(color='#FF6B6B', dash='dash', width=2),
        name='Perfect Prediction'
    ),
    row=1, col=1
)

rf_pred_dist = rf_predictions.groupBy("label", "prediction").count().toPandas()
rf_pred_grouped = rf_pred_dist.groupby('label')['count'].sum().reset_index()

fig2.add_trace(
    go.Bar(
        x=rf_pred_grouped['label'],
        y=rf_pred_grouped['count'],
        marker=dict(color='#FFD93D'),
        name='Predictions'
    ),
    row=1, col=2
)

fig2.update_xaxes(title_text="Actual Magnitude", row=1, col=1)
fig2.update_yaxes(title_text="Predicted Magnitude", row=1, col=1)
fig2.update_xaxes(title_text="Category", row=1, col=2)
fig2.update_yaxes(title_text="Count", row=1, col=2)

fig2.update_layout(
    height=600,
    title_text="ML Model Predictions Analysis",
    title_font_size=20,
    template="plotly_dark"
)

fig2.write_html("task4_ml_predictions.html")
print("✓ Saved: task4_ml_predictions.html")
fig2.show()


cluster_geo_sample = clustered_data.select("latitude", "longitude", "cluster", "mag").sample(False, 0.05).toPandas()

fig3 = px.scatter_geo(
    cluster_geo_sample,
    lat='latitude',
    lon='longitude',
    color='cluster',
    size='mag',
    hover_data=['mag'],
    color_continuous_scale='Viridis',
    title='K-Means Clustering: Geographic Seismic Zones',
    projection='natural earth'
)

fig3.update_layout(
    template='plotly_dark',
    height=700
)

fig3.write_html("task4_cluster_map.html")
print("✓ Saved: task4_cluster_map.html")
fig3.show()


fig4 = make_subplots(
    rows=2, cols=2,
    subplot_titles=(
        "Monthly Event Frequency",
        "Yearly Magnitude Trends",
        "Magnitude Distribution",
        "Forecast for 2026"
    ),
    specs=[
        [{"type": "scatter"}, {"type": "scatter"}],
        [{"type": "histogram"}, {"type": "scatter"}]
    ]
)

fig4.add_trace(
    go.Scatter(
        x=monthly_trends['date'],
        y=monthly_trends['event_count'],
        mode='lines',
        line=dict(color='#FF6B6B', width=2),
        fill='tozeroy',
        name='Events'
    ),
    row=1, col=1
)

fig4.add_trace(
    go.Scatter(
        x=yearly_summary['year'],
        y=yearly_summary['avg_magnitude'],
        mode='lines+markers',
        line=dict(color='#4ECDC4', width=3),
        marker=dict(size=10),
        name='Avg Magnitude'
    ),
    row=1, col=2
)

mag_sample = df_analysis.select("mag").sample(False, 0.1).toPandas()
fig4.add_trace(
    go.Histogram(
        x=mag_sample['mag'],
        nbinsx=50,
        marker=dict(color='#FFD93D'),
        name='Magnitude'
    ),
    row=2, col=1
)

fig4.add_trace(
    go.Scatter(
        x=yearly_summary['year'],
        y=yearly_summary['total_events'],
        mode='lines+markers',
        name='Actual',
        line=dict(color='#4ECDC4', width=3),
        marker=dict(size=10)
    ),
    row=2, col=2
)

fig4.add_trace(
    go.Scatter(
        x=[next_year],
        y=[forecast_2026],
        mode='markers',
        name='Forecast 2026',
        marker=dict(size=20, color='#FF6B6B', symbol='star')
    ),
    row=2, col=2
)

fig4.update_xaxes(title_text="Date", row=1, col=1)
fig4.update_yaxes(title_text="Event Count", row=1, col=1)
fig4.update_xaxes(title_text="Year", row=1, col=2)
fig4.update_yaxes(title_text="Avg Magnitude", row=1, col=2)
fig4.update_xaxes(title_text="Magnitude", row=2, col=1)
fig4.update_yaxes(title_text="Frequency", row=2, col=1)
fig4.update_xaxes(title_text="Year", row=2, col=2)
fig4.update_yaxes(title_text="Event Count", row=2, col=2)

fig4.update_layout(
    height=1000,
    title_text="Comprehensive Temporal Analysis",
    title_font_size=22,
    template="plotly_dark",
    showlegend=True
)

fig4.write_html("task4_temporal_analysis.html")
print("✓ Saved: task4_temporal_analysis.html")
fig4.show()


fig5 = go.Figure(data=go.Heatmap(
    z=correlation_matrix.values,
    x=correlation_matrix.columns,
    y=correlation_matrix.columns,
    colorscale='RdBu',
    zmid=0,
    text=correlation_matrix.values,
    texttemplate='%{text:.3f}',
    textfont={"size": 14},
    colorbar=dict(title="Correlation")
))

fig5.update_layout(
    title="Correlation Matrix: Magnitude, Depth, Energy",
    template="plotly_dark",
    height=600
)

fig5.write_html("task4_correlation_matrix.html")
print("✓ Saved: task4_correlation_matrix.html")
fig5.show()

STEP 13: GENERATING COMPREHENSIVE VISUALIZATIONS

✓ Saved: task4_ml_performance.html


✓ Saved: task4_ml_predictions.html


✓ Saved: task4_cluster_map.html


✓ Saved: task4_temporal_analysis.html


✓ Saved: task4_correlation_matrix.html
