# United States Weather Forecast with Spark

Projeto de mestrado de Ciência de dados do ISCTE - Instituto Universitário de Lisboa para a cadeira de Processamento, Modelação de Big Data.

## Authors

- João Silva
- Nuno Rodrigues
- Tiago Alves

## Datasets 

US weather history : https://www.kaggle.com/datasets/nachiketkamod/weather-dataset-us
US Noa Stations : assets/ghcnd-stations.txt

# Transformação/Modelação de dados

## Passo 1

Neste primeiro passo vamos converter o ficheiro que lista todas as estações meteorológicas e respetivos nomes das suas localizações.

Para isto vamos então usar o ghcnd-stations.txt  e converter para um .csv válido que possamos usar para cruzamento dos ids das estações listadas no dataset principal e as suas localizações.

Para isto vamos usar a biblioteca "csv" do python e declarar as variáveis para os caminhos do ficheiro de entrada e de saída.

In [ ]:
import csv

# Nome do arquivo de entrada e saída
input_file = './assets/ghcnd-stations.txt'
output_file = './assets/ghcnd_stations.csv'

Vamos agora correr a lógica responsável por abrir o ficheiro de saída, definir os cabeçalhos, abrir o ficheiro de entrada para então mapear linha a linha de forma a finalmente escrever o ficheiro final.

In [ ]:
# Abrir arquivo de saída
with open(output_file, mode='w', newline='') as csv_file:
    csv_writer = csv.writer(csv_file)

    # Escrever cabeçalho
    header = ['station_id', 'latitude', 'longitude', 'elevation', 'state', 'name', 'gsn_flag', 'hcn_crn_flag', 'wmo_id']
    csv_writer.writerow(header)

    # Ler arquivo de entrada e processar linha por linha
    with open(input_file, mode='r') as txt_file:
        for line in txt_file:
            # Extrair dados baseado nas posições fixas das colunas
            station_id = line[0:11].strip()
            latitude = line[12:20].strip()
            longitude = line[21:30].strip()
            elevation = line[31:37].strip()
            state = line[38:40].strip()
            name = line[41:71].strip()
            gsn_flag = line[72:75].strip()
            hcn_crn_flag = line[76:79].strip()
            wmo_id = line[80:85].strip()

            # Escrever linha no arquivo CSV
            csv_writer.writerow([station_id, latitude, longitude, elevation, state, name, gsn_flag, hcn_crn_flag, wmo_id])

In [ ]:
print(f"Arquivo CSV '{output_file}' criado com sucesso.")

## Passo 2 (Opcional)

Este passo é apenas necessário caso já exista uma versão de spark associada a outra versão de python diferente da qual se encontra associada a este projeto. Caso se encontre nesta situação deve então definir as variáveis de ambiente que apontaram para a versão correta.

Aqui fica o exemplo:

In [ ]:
import os

# Definir a mesma versão do Python para driver e worker
os.environ['PYSPARK_PYTHON'] = '/usr/local/bin/python3.11'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/usr/local/bin/python3.11'

## Passo 3

Vamos então agora iniciar a sessão de spark e importar as bibliotecas necessárias para os passos seguintes.

In [ ]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, round, floor

spark = SparkSession.builder \
    .appName("Weather Dataset with Location") \
    .getOrCreate()

## Passo 4

Vamos ler o dataset alvo deste projeto. 

In [ ]:
file_path = "./assets/sample_dataset.csv"
weather_df = spark.read.csv(file_path, header=True, inferSchema=True)

## Passo 5

Vamos ler o dataset que lista as estações e as suas localizações préviamente convertido de txt para csv como referido nos passos em cima.

In [ ]:
stations_file_path = "./assets/ghcnd_stations.csv"
stations_df = spark.read.csv(stations_file_path, header=True, inferSchema=True)

## Passo 6

De forma a evitar conflito entre as colunas de ambos os datasets que iremos cruzar, vamos então renomear algumas das colunas do dataset das estações cujo nome se encontra igual ao dataset principal.

In [ ]:
stations_df = stations_df.withColumnRenamed("elevation", "station_elevation")  \
                        .withColumnRenamed("latitude", "station_latitude") \
                        .withColumnRenamed("name", "Location") \
                        .withColumnRenamed("longitude", "station_longitude")

## Passo 7

Neste passo vamos combinar ambos os datasets de forma a finar com apenas uma única fonte de verdade para as tranformações seguintes.

In [ ]:
weather_df_with_location = weather_df.join(stations_df, weather_df.ID == stations_df.station_id, how='left')

## Passo 8

Como primeira transformação iremos converter as colunas referentes às temperaturas máximas e minimas para graus Celcius. Tendo em conta que os dados originais parecem encontrar-se multiplicados por 10, iremos inverter o racional dividindo por 10. 

Vamos também lidar com alguns possiveis outliers recorrendo ao método IQR e Z-Score.

Referência: https://www.machinelearningplus.com/pyspark/pyspark-outlier-detection-and-treatment/

Aqui iremos também aproveitar para renomear as colunas "TMAX" e "TMIN" para "temp_max" e "temp_min".

In [ ]:
weather_df_with_location = weather_df_with_location \
    .withColumn("temp_max", col("TMAX") / 10) \
    .withColumn("temp_min", col("TMIN") / 10)

Função IQR.

In [ ]:
def calcular_limites_iqr(df, coluna):
    quantiles = df.approxQuantile(coluna, [0.25, 0.75], 0.0)
    q1, q3 = quantiles
    iqr = q3 - q1
    limite_inferior = q1 - 1.5 * iqr
    limite_superior = q3 + 1.5 * iqr
    return limite_inferior, limite_superior

temp_max_inferior, temp_max_superior = calcular_limites_iqr(weather_df_with_location, "temp_max")
temp_min_inferior, temp_min_superior = calcular_limites_iqr(weather_df_with_location, "temp_min")

Filtrar entradas com base nos resultados.

In [ ]:
weather_df_with_location = weather_df_with_location.filter(
    (col("temp_max") >= temp_max_inferior) & (col("temp_max") <= temp_max_superior) &
    (col("temp_min") >= temp_min_inferior) & (col("temp_min") <= temp_min_superior)
)

Função z-score.

In [ ]:
# Método Z-score para detectar outliers
def calcular_limites_z_score(df, coluna):
    stats = df.select(_mean(col(coluna)).alias('mean'), _stddev(col(coluna)).alias('stddev')).collect()
    mean = stats[0]['mean']
    stddev = stats[0]['stddev']
    limite_inferior = mean - 3 * stddev
    limite_superior = mean + 3 * stddev
    return limite_inferior, limite_superior

temp_max_inferior, temp_max_superior = calcular_limites_z_score(weather_df_with_location, "temp_max")
temp_min_inferior, temp_min_superior = calcular_limites_z_score(weather_df_with_location, "temp_min")

Filtrar entradas com base nos resultados.

In [ ]:
weather_df_with_location = weather_df_with_location.filter(
    (col("temp_max") >= temp_max_inferior) & (col("temp_max") <= temp_max_superior) &
    (col("temp_min") >= temp_min_inferior) & (col("temp_min") <= temp_min_superior)
)

## Passo 9

Iremos agora arredondar os valores da coluna "Elevation" de forma inteiros mais legiveis.

In [ ]:
weather_df_with_location = weather_df_with_location.withColumn("Elevation", floor(round(col("Elevation"))))

## Passo 10

Vamos renomear as colunas EVAP e PRCP para "Evaporation" e "Precipitation".

In [ ]:
weather_df_with_location = weather_df_with_location.withColumnRenamed("EVAP", "Evaporation") \
                                                   .withColumnRenamed("PRCP", "Precipitation")

## Passo 11

Vamos filtrar os nossos dados por todas as entradas que não tenham as colunas "Precipitation" ou "temp_max" ou "temp_min" que não sejam nulas, ou seja, basta uma delas ter um valor nulo para ser descartada da nossa análise final.

In [ ]:
weather_df_with_location = weather_df_with_location.filter(
    col("Precipitation").isNotNull() &
    col("temp_max").isNotNull() &
    col("temp_min").isNotNull()
)

## Passo 12

Neste passo vamos definir quais as colunas que queremos considerar para o nosso dataset final.

In [ ]:
weather_df_with_location = weather_df_with_location.select(
    "ID", "DATE", "temp_max", "temp_min", "Precipitation",
    "Latitude", "Longitude", "Elevation", "Location"
)

## Passo 13

Vamos então agora escrever o ficheiro final.

In [ ]:
output_path = "./assets/sample_weather_dataset_with_location.csv"
weather_df_with_location.coalesce(1).write.csv(output_path, header=True, mode='overwrite')

## Passo final

Como passo final iremos imprimir, meramente de forma informativa, que todo o processo de transformação foi concluído com sucesso e gravado na pasta pré-definida, e dar a nossa sessão spark como encerrada. 

In [ ]:
print(f"Dataset final salvo em {output_path}")

# Encerrar a Spark Session
spark.stop()

_____________________________

# Random Forest

_____________________________

In [ ]:
import matplotlib.pyplot as plt
import seaborn as sns
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.sql import SparkSession
from pyspark import StorageLevel

In [ ]:
spark = SparkSession.builder \
    .appName("Weather Dataset with Random Forest") \
    .config("spark.executor.memory", "4g") \
    .config("spark.driver.memory", "4g") \
    .getOrCreate()

In [ ]:
file_path = "./assets/sample_dataset.csv"
weather_df = spark.read.csv(file_path, header=True, inferSchema=True)

In [ ]:
# Verificar as colunas
weather_df.printSchema()

In [ ]:
# Persistir em disco
weather_df.persist(StorageLevel.DISK_ONLY)

In [ ]:
# Preparar os dados para o modelo
feature_columns = ["Latitude", "Longitude", "Elevation"]
assembler = VectorAssembler(inputCols=feature_columns, outputCol="features")

In [ ]:
# Transformar o DataFrame
weather_df = assembler.transform(weather_df)

In [ ]:
# Dividir o conjunto de dados em treino e teste
train_df, test_df = weather_df.randomSplit([0.8, 0.2], seed=42)

# Treinar e avaliar o modelo

In [ ]:

def treinar_e_avaliar_random_forest(label_col, train_df, test_df):
    # Treinar o modelo de Random Forest
    rf = RandomForestRegressor(featuresCol="features", labelCol=label_col, numTrees=100)
    rf_model = rf.fit(train_df)

    # Fazer previsões no conjunto de teste
    predictions = rf_model.transform(test_df)

    # Avaliar o modelo
    evaluator = RegressionEvaluator(labelCol=label_col, predictionCol="prediction", metricName="rmse")
    rmse = evaluator.evaluate(predictions)
    print(f"Root Mean Squared Error (RMSE) no conjunto de teste para {label_col}: {rmse}")

    # Converter para Pandas para visualização
    predictions_pd = predictions.select("prediction", label_col).toPandas()

    # Gerar gráfico de dispersão
    plt.figure(figsize=(10, 6))
    sns.scatterplot(x=label_col, y="prediction", data=predictions_pd)
    plt.xlabel(f"Actual {label_col}")
    plt.ylabel(f"Predicted {label_col}")
    plt.title(f"Actual vs Predicted {label_col}")
    plt.show()

    return rf_model, predictions

# Iniciar Treino

In [ ]:
# temp_max
rf_model_temp_max, predictions_temp_max = treinar_e_avaliar_random_forest("temp_max", train_df, test_df)

# temp_min
rf_model_temp_min, predictions_temp_min = treinar_e_avaliar_random_forest("temp_min", train_df, test_df)

# Precipitation
rf_model_precipitation, predictions_precipitation = treinar_e_avaliar_random_forest("Precipitation", train_df, test_df)

# Liberar recursos de memória
weather_df.unpersist()

# Encerrar a Spark Session
spark.stop()