# Spatial Patterns of Extremes

## Imports

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import * 
from pyspark.sql.functions import sum as _sum
import matplotlib.pyplot as plt
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import LinearRegression

## Stop session if needed

In [None]:
#spark.stop()

## Start Spark session

In [None]:
spark = SparkSession.builder.master('local[*]').appName('Climalyticsat').config('spark.driver.memory', '8g').config('spark.executor.memory', '8g').getOrCreate()

## Load data and write out as Parquet

### Load all station's climate data

In [None]:
climate_data = spark.read.csv('climate_all_stations.csv', header=True, inferSchema=True)

### Extract a “year” column for efficient time‐based pruning

In [None]:
# turn string into a proper date, then pull out the year
climate_data = (
    climate_data
      .withColumn('date',   to_date('date', 'yyyy-MM'))
      .withColumn('year',   year('date'))
)

### Load and join stations metadata

In [None]:
stations = (
    spark.read
         .csv('stations_metadata.csv', header=True, inferSchema=True)
         .withColumnRenamed('id', 'station_id')
         .withColumnRenamed('Höhe [m]', 'elevation')
         .select('station_id', 'elevation')
)

climate = climate_data.join(stations, on='station_id', how='inner')

#### Bucket elevation into (250m) bands

In [None]:
climate = climate.withColumn(
    'elevation_band',
    (floor(climate.elevation / 250) * 250).cast('int')  # 0–249m → 0, 250–499m → 250, etc.
)

### Write out as Parquet, partitioned by year & elevation_band

In [None]:
climate.write.partitionBy('year', 'elevation_band').parquet('extremes_parquet/')

## RQ2 - Spatial Patterns of Extremes
Which geographic zones (valleys, plateaus, alpine corridors) show the largest shifts in “hot days” (≥ 30 °C) and “frost days” (≤ 0 °C) since 1970?

### Load Parquet (qear, elevation_band)

In [None]:
climate = spark.read.parquet('extremes_parquet/')

### Add geographical zones

In [None]:
# needed columns
# tage_tropen,Tropentage,"Zahl der Tropentage,  Tagesmaximum der Lufttemperatur in 2 m Höhe >=30.0°C",d
# tage_frost,Frosttage,"Zahl der Frosttage, 24-Stunden-Minimalwert der Lufttemperatur in 2m Höhe < 0.0 °C",d

climate = climate.withColumn(
    'zone',
    when(col('elevation') <= 700,      'valley')
   .when((col('elevation') > 700) & (col('elevation') <= 1500), 'plateau')
   .otherwise('alpine')
)

### Compute the “shift” since 1970

#### End-minus-start difference

In [None]:
# 1) total per station×year
zone_year_station = (
    climate
      .groupBy('zone','station_id','year')
      .agg(
        _sum('tage_tropen').alias('hot_days'),
        _sum('tage_frost').alias('frost_days')
      )
)

# 2) average per zone×year
zone_year = (
    zone_year_station
      .groupBy('zone','year')
      .agg(
        avg('hot_days' ).alias('hot_days'),
        avg('frost_days').alias('frost_days')
      )
)

In [None]:
first_decade = (zone_year
    .filter(col('year').between(1970,1979))
    .groupBy('zone')
    .agg(avg('hot_days').alias('hot_70s'),
         avg('frost_days').alias('frost_70s'))
)

last_decade = (zone_year
    .filter(col('year').between(2014,2023))
    .groupBy('zone')
    .agg(avg('hot_days').alias('hot_10s'),
         avg('frost_days').alias('frost_10s'))
)

shift = (first_decade
    .join(last_decade, 'zone')
    .withColumn('hot_shift',   col('hot_10s')   - col('hot_70s'))
    .withColumn('frost_shift', col('frost_10s') - col('frost_70s'))
)


In [None]:
# print the per-zone, per-decade averages
first_decade.show(truncate=False)
last_decade.show(truncate=False)

# print the computed shifts
shift.select('zone','hot_70s','hot_10s','hot_shift',
             'frost_70s','frost_10s','frost_shift') \
     .show(truncate=False)


#### Visualize with matplotlib

In [None]:

shift_pd = shift.toPandas()

zones      = shift_pd['zone']
hot_shift  = shift_pd['hot_shift']
frost_shift= shift_pd['frost_shift']
x = range(len(zones))

plt.figure()
plt.bar([i - 0.2 for i in x], hot_shift,   width=0.4, label='Hot Days Shift')
plt.bar([i + 0.2 for i in x], frost_shift, width=0.4, label='Frost Days Shift')
plt.xticks(x, zones)
plt.xlabel('Zone')
plt.ylabel('Change in Average Number of Days')
plt.title('Shift in Hot (≥30 °C) & Frost (≤0 °C) Days Since 1970 by Zone')
plt.legend()
plt.tight_layout()
plt.show()


In [None]:
station_counts = (
    climate
      .select('station_id', 'zone')
      .distinct()
      .groupBy('zone')
      .agg(countDistinct('station_id').alias('num_stations'))
      .orderBy('zone')
)

# 3) Bring to Pandas & plot
station_counts_pd = station_counts.toPandas()

import matplotlib.pyplot as plt

zones  = station_counts_pd['zone']
counts = station_counts_pd['num_stations']
x      = range(len(zones))

plt.figure()
plt.bar(x, counts)
plt.xticks(x, zones)
plt.xlabel('Zone')
plt.ylabel('Number of Stations')
plt.title('Number of Weather Stations per Zone')
plt.tight_layout()
plt.show()

### Build your per‐zone time series (mean hot/frost days per station per year)

In [None]:
zone_year = (
    climate
      # only keep years up to 2023, since for 2024 no or very little data is available
      .filter(col('year') <= 2023)
      .groupBy('zone','station_id','year')
      .agg(
        _sum('tage_tropen').alias('hot_days'),
        _sum('tage_frost').alias('frost_days')
      )
      .groupBy('zone','year')
      .agg(
        avg('hot_days').alias('hot_days'),
        avg('frost_days').alias('frost_days')
      )
      .orderBy('zone','year')
)

#### Plot the full time-series by zone

In [None]:
for metric in ['hot_days','frost_days']:
    plt.figure()
    for zone, grp in zone_year.toPandas().groupby('zone'):
        plt.plot(grp['year'], grp[metric], marker='o', label=zone)
    plt.title(f"Yearly avg. {metric.replace('_',' ').title()} by Zone")
    plt.xlabel('Year')
    plt.ylabel(f"Avg. {metric.replace('_',' ').title()}")
    plt.legend()
    plt.tight_layout()
    plt.show()

### Linear Regression

In [None]:

# Fit one regression per zone
slopes = []
for zone_name in ['valley','plateau','alpine']:
    sdf = zone_year.filter(col('zone') == zone_name)
    
    # assemble the feature vector (just year)
    va = VectorAssembler(inputCols=['year'], outputCol='features')
    sdf = va.transform(sdf)

    # hot‐day trend
    lr_hot = LinearRegression(featuresCol='features',
                              labelCol='hot_days')
    m_hot  = lr_hot.fit(sdf)
    # frost‐day trend
    lr_frost = LinearRegression(featuresCol='features',
                                labelCol='frost_days')
    m_frost  = lr_frost.fit(sdf)

    # extract and cast to plain Python floats
    slopes.append((
      zone_name,
      float(m_hot.coefficients[0]),
      float(m_frost.coefficients[0])
    ))

    s_hot = m_hot.summary
    print(f"{zone_name} — hot days:")
    print(f"  slope = {m_hot.coefficients[0]:.3f} days/yr, R² = {s_hot.r2:.3f}, p = {s_hot.pValues[1]:.3e}")

    s_f = m_frost.summary
    print(f"{zone_name} — frost days:")
    print(f"  slope = {m_frost.coefficients[0]:.3f} days/yr, R² = {s_f.r2:.3f}, p = {s_f.pValues[1]:.3e}")
    print()


slopes_df = spark.createDataFrame(
    slopes,
    schema=['zone','slope_hot','slope_frost']
)


In [None]:
# 4) Plot with matplotlib
pd = slopes_df.toPandas()
x = range(len(pd))
plt.figure()
plt.bar([i-0.2 for i in x], pd['slope_hot'],   width=0.4, label='Hot days / yr')
plt.bar([i+0.2 for i in x], pd['slope_frost'], width=0.4, label='Frost days / yr')
plt.xticks(x, pd['zone'])
plt.ylabel('Days per Year Change')
plt.title('Linear Trend in Hot & Frost Days by Zone (1970–2023)')
plt.legend()
plt.tight_layout()
plt.show()