In [None]:
!sudo apt update
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://dlcdn.apache.org/spark/spark-3.2.1/spark-3.2.1-bin-hadoop3.2.tgz
!tar xf spark-3.2.1-bin-hadoop3.2.tgz
!pip install -q findspark
!pip install pyspark
!pip install py4j boto3 awswrangler
!pip install pyngrok
!pip install streamlit
import os

from pyngrok import ngrok
import sys

import findspark
findspark.init()
findspark.find()

from pyspark.sql import DataFrame, SparkSession
from typing import List
import pyspark.sql.types as T
import pyspark.sql.functions as F
from pyspark.sql.functions import col, mean, lit, when,month,sum
from pyspark.ml.feature import VectorAssembler, StandardScaler,PCA
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression,RandomForestClassifier
from pyspark.ml.evaluation import BinaryClassificationEvaluator,MulticlassClassificationEvaluator

import awswrangler as wr
import boto3

# Single Spark session shared by every cell of the notebook.
spark = (
    SparkSession.builder
    .appName("Weather_anomaly_detection")
    .getOrCreate()
)

spark

# 2. Reading and processing historical meteorological data

The historical data are read from the S3 bucket, which holds one CSV file of historical weather data per city.

In [None]:
# Cities offered in the selector; the S3 bucket is expected to hold one
# historical CSV per city (see the download cell).
CITIES = "Bucharest London Paris Roma".split()
import requests

In [None]:
import ipywidgets as widgets
from IPython.display import display

# Dropdown for picking the single city that the rest of the notebook processes.
city_dropdown = widgets.Dropdown(
    options=CITIES,
    value=CITIES[0],
    description='Please select one city:',
)

# Initial selection; the observer registered below keeps it in sync with the widget.
# NOTE(review): later cells read ``selected_city`` at the moment they run, so changing
# the dropdown afterwards only affects cells that are re-executed.
selected_city = city_dropdown.value


def on_city_change(change):
    """Observer for the dropdown's ``value`` trait: store and echo the new city."""
    global selected_city
    chosen = change['new']
    selected_city = chosen
    print(f"Selected city is: {selected_city}")


city_dropdown.observe(on_city_change, names='value')
display(city_dropdown)


def get_selected_city():
    """Return the city currently stored in the module-level ``selected_city``."""
    return selected_city

In [None]:
def get_lat_lon(city_name: str, api_key=None):
    """Look up the coordinates of ``city_name`` with the OpenWeatherMap geocoding API.

    Args:
        city_name: City to look up. It is title-cased before the request.
        api_key: OpenWeatherMap API key. Defaults to the ``OPENWEATHER_API_KEY``
            environment variable, so the key is never stored in the notebook.

    Returns:
        dict with the ``"lat"`` and ``"lon"`` of the first geocoding match.

    Raises:
        ValueError: if no API key is available or the city is not found.
        requests.HTTPError: if the API answers with an error status.
    """
    key = api_key or os.environ.get("OPENWEATHER_API_KEY")
    if not key:
        raise ValueError("Set the OPENWEATHER_API_KEY environment variable or pass api_key.")

    # Passing params lets requests URL-encode city names with spaces or accents.
    # HTTPS keeps the API key out of plain-text traffic.
    response = requests.get(
        "https://api.openweathermap.org/geo/1.0/direct",
        params={"q": city_name.title(), "limit": 5, "appid": key},
        timeout=10,
    )
    response.raise_for_status()

    matches = response.json()
    if not matches:
        raise ValueError(f"No geocoding match found for city {city_name!r}.")
    first = matches[0]
    return {"lat": first["lat"], "lon": first["lon"]}

In [None]:
import json
# Geocode first and only then open the output file. Otherwise a failed lookup
# would leave an empty latitude_longitude_<city>.json behind, which the Streamlit
# app later tries to json.load.
city_coordinates = get_lat_lon(selected_city)
with open(f"latitude_longitude_{selected_city}.json", "w") as f:
    json.dump(city_coordinates, f, indent=4)

In [None]:
# Access the archived weather data file for the selected city and download it locally.
# Credentials come from the standard AWS environment variables instead of being
# pasted into the notebook. When they are unset (None), boto3 falls back to its
# default credential chain.
AWS_ACCESS_KEY = os.environ.get("AWS_ACCESS_KEY_ID")
AWS_SECRET_KEY = os.environ.get("AWS_SECRET_ACCESS_KEY")
S3_BUCKET = os.environ.get("S3_BUCKET", "")
AWS_REGION = os.environ.get("AWS_REGION")

if not S3_BUCKET:
    raise ValueError("Set the S3_BUCKET environment variable to the bucket holding the historical CSV files.")

fp = f"s3://{S3_BUCKET}/{selected_city}_10_years.csv"
aws_session = boto3.Session(
    aws_access_key_id=AWS_ACCESS_KEY,
    aws_secret_access_key=AWS_SECRET_KEY,
    region_name=AWS_REGION,
)
wr.s3.download(fp, f"historical_data_read_{selected_city}.csv", boto3_session=aws_session)

In [None]:
# Load the downloaded CSV and replace '.' with '_' in every column name
# (e.g. 'temperature.min' -> 'temperature_min'), so later cells can reference
# the columns without backtick quoting.
df = spark.read.csv(f"historical_data_read_{selected_city}.csv", inferSchema=True, header=True)
df = df.toDF(*[name.replace('.', '_') for name in df.columns])

# Location / timezone columns are not used by the later cells.
df = df.drop('lat', 'lon', 'tz')

df.show()

In [None]:
# Count missing values per column. isNull() cast to int is 1 for a missing value,
# so its sum is the column's null count. F.sum is used explicitly because the bare
# `sum` in this notebook is pyspark's, shadowing the Python builtin.
null_counts = df.select(*[F.sum(F.col(name).isNull().cast("int")).alias(name) for name in df.columns])

print("Number of null values:")
null_counts.show()

In [None]:
# Derive a 'season' column from the month of 'date':
# Mar-May Spring, Jun-Aug Summer, Sep-Nov Autumn, Dec-Feb Winter.
# A null date matches no branch, so its season stays null.
month_of_date = month(df['date'])
spring_condition = month_of_date.between(3, 5)
summer_condition = month_of_date.between(6, 8)
autumn_condition = month_of_date.between(9, 11)
winter_condition = month_of_date.isin([12, 1, 2])

season_by_condition = [
    (spring_condition, 'Spring'),
    (summer_condition, 'Summer'),
    (autumn_condition, 'Autumn'),
    (winter_condition, 'Winter'),
]
season_expr = when(*season_by_condition[0])
for condition, label in season_by_condition[1:]:
    season_expr = season_expr.when(condition, label)

df = df.withColumn('season', season_expr)

df.show(10)

In [None]:
# The historical temperatures are converted from Kelvin to Celsius, the unit the
# anomaly thresholds in section 3 are expressed in.
# NOTE(review): this cell is not idempotent; re-running it subtracts the offset again.
KELVIN_TO_CELSIUS_OFFSET = 273.15

temperature_columns = [
    'temperature_min', 'temperature_max', 'temperature_afternoon',
    'temperature_night', 'temperature_evening', 'temperature_morning'
]

for temp_col in temperature_columns:
    df = df.withColumn(temp_col, col(temp_col) - KELVIN_TO_CELSIUS_OFFSET)

df.show()

In [None]:
# Mean afternoon temperature per season, attached to every row as the
# 'season_avg_temp' feature.

def calculate_average_temperature(season, data=None):
    """Return the mean 'temperature_afternoon' of the rows whose 'season' equals ``season``.

    Args:
        season: Season label as stored in the 'season' column ('Spring', 'Summer', ...).
        data: Spark DataFrame to aggregate; defaults to the notebook-level ``df``.

    Returns:
        The mean as a float, or None when the season has no non-null temperatures.
    """
    source = df if data is None else data
    return (
        source.filter(col('season') == season)
        .agg(mean(col('temperature_afternoon')).alias('avg_temp'))
        .collect()[0]['avg_temp']
    )


# One aggregation job for all four seasons instead of one collect() per season.
season_avg_lookup = {
    row['season']: row['avg_temp']
    for row in df.groupBy('season').agg(mean(col('temperature_afternoon')).alias('avg_temp')).collect()
}
spring_avg_temp = season_avg_lookup.get('Spring')
summer_avg_temp = season_avg_lookup.get('Summer')
autumn_avg_temp = season_avg_lookup.get('Autumn')
winter_avg_temp = season_avg_lookup.get('Winter')

# NOTE(review): the means are computed over the whole dataset, before the
# train/test split in the modelling cells, so test rows contribute to this feature.
df = df.withColumn(
    "season_avg_temp",
    when(col('season') == 'Spring', lit(spring_avg_temp))
    .when(col('season') == 'Summer', lit(summer_avg_temp))
    .when(col('season') == 'Autumn', lit(autumn_avg_temp))
    .when(col('season') == 'Winter', lit(winter_avg_temp))
    .otherwise(None)
)

df.show()

# 3. Creating a static anomaly detection model using the threshold method

In [None]:
from pyspark.sql import functions as F

# Threshold for each rule-based anomaly (temperatures are the Celsius-converted columns).
threshold = {
    'temperature_frost': 0,                  # Celsius
    'temperature_heatwave': 35,              # Celsius
    'precipitation_floods': 40,              # Millimeters/day
    'wind_storm': 20,                        # Meters/s
    'humidity_increased': 95,                # Percentage
    'atmospheric_pressure': (900, 1100)      # Hectopascals
}

# Anomaly name -> boolean Spark condition that flags it.
anomaly_types = {
    'anomaly_frost': (F.col('temperature_min') < threshold['temperature_frost']),
    'anomaly_heatwave': (F.col('temperature_max') > threshold['temperature_heatwave']),
    'anomaly_floods': (F.col('precipitation_total') > threshold['precipitation_floods']),
    'anomaly_storm': (F.col('wind_max_speed') > threshold['wind_storm']),
    'anomaly_humidity_increased': (F.col('humidity_afternoon') > threshold['humidity_increased']),
    'anomaly_atmospheric_pressure': (F.col('pressure_afternoon') < threshold['atmospheric_pressure'][0]) |
                                     (F.col('pressure_afternoon') > threshold['atmospheric_pressure'][1])
}

# Numeric id per anomaly type: 1..N in declaration order, 0 ('does_not_exist') = no anomaly.
mapping_anomalies = {anomaly: idx + 1 for idx, anomaly in enumerate(anomaly_types.keys())}
mapping_anomalies['does_not_exist'] = 0

# Initialise both label columns: no anomaly.
df = df.withColumn('presence_anomaly', F.lit(0).cast('int'))
df = df.withColumn('type_anomaly', F.lit(mapping_anomalies['does_not_exist']).cast('int'))

# Assign the numeric id directly. Rules are applied in declaration order, so when
# several rules fire on the same day, the LAST matching rule decides type_anomaly.
# A null condition (missing measurement) keeps the previous value.
for anomaly, condition in anomaly_types.items():
    df = df.withColumn(
        'type_anomaly',
        F.when(condition, F.lit(mapping_anomalies[anomaly])).otherwise(F.col('type_anomaly')),
    )

# A day has an anomaly exactly when at least one rule fired, i.e. type_anomaly != 0.
# Unlike a blanket df.na.fill(...), this never touches nulls in other string
# columns such as 'season'.
df = df.withColumn('presence_anomaly', (F.col('type_anomaly') != 0).cast('int'))
df = df.withColumn('type_anomaly', df['type_anomaly'].cast('int'))
df.show()

In [None]:
# Number of days per anomaly id (0 = no anomaly).
label_counts = df.groupBy('type_anomaly').count()
label_counts_renamed = label_counts.select(
    'type_anomaly',
    col('count').alias('frequency_appearance'),
)

label_counts_renamed.show()

In [None]:
# Save the id -> label mapping for the Streamlit app. Keys are written as floats
# ("1.0", ...), matching how the app parses them back with float().

import json
id_to_label = {float(anomaly_id): label for label, anomaly_id in mapping_anomalies.items()}
with open('mapping_anomalies.json', "w") as f:
    json.dump(id_to_label, f, indent=4)

# 4. Training Logistic Regression machine learning models

In [None]:
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import MulticlassClassificationEvaluator, BinaryClassificationEvaluator
from pyspark.ml.feature import VectorAssembler, StandardScaler, PCA, StringIndexer
from pyspark.sql.functions import col

# Meteorological feature columns fed to both models.
# NOTE(review): five of these are the columns the static thresholds in section 3
# are computed from, so the models largely learn to reproduce those rules.
X_cols = ['temperature_afternoon', 'temperature_min', 'temperature_max',
          'precipitation_total', 'wind_max_direction',
          'wind_max_speed', 'pressure_afternoon',
          'season_avg_temp']

# Combine the feature columns into a single vector column.
assembler = VectorAssembler(inputCols=X_cols, outputCol="features")

# Standardise features to zero mean and unit variance before PCA.
scaler = StandardScaler(inputCol="features", outputCol="scaled_features", withStd=True, withMean=True)

# Project the scaled features onto their first principal components.
num_principal_components = 4
pca = PCA(k=num_principal_components, inputCol="scaled_features", outputCol="pca_features")

# Binary model: is there any anomaly on this day (presence_anomaly 0/1)?
lr_anomaly_presence = LogisticRegression(labelCol="presence_anomaly", featuresCol="pca_features")

# Multiclass model: which anomaly id (type_anomaly, 0 = none)?
lr_anomaly_type = LogisticRegression(labelCol="type_anomaly", featuresCol="pca_features", maxIter=10, family="multinomial")

pipeline_anomaly_presence_lr = Pipeline(stages=[assembler, scaler, pca, lr_anomaly_presence])
pipeline_anomaly_type_lr = Pipeline(stages=[assembler, scaler, pca, lr_anomaly_type])

train_data, test_data = df.randomSplit([0.8, 0.2], seed=42)
# Give every test row a unique id so the two models' predictions can be joined
# one-to-one. Joining on the feature vectors instead would multiply rows whenever
# two days share identical features. Cache so the ids are materialised only once.
test_data = test_data.withColumn("row_id", F.monotonically_increasing_id()).cache()

# Fit both models on the training split.
model_anomaly_presence_lr = pipeline_anomaly_presence_lr.fit(train_data)
model_anomaly_type_lr = pipeline_anomaly_type_lr.fit(train_data)

# Predict on the held-out split.
predictions_anomaly_presence = model_anomaly_presence_lr.transform(test_data)
predictions_anomaly_type = model_anomaly_type_lr.transform(test_data)

# BinaryClassificationEvaluator measures area under the ROC curve, not accuracy.
# The variable name is kept for any later cells that read it.
evaluator_anomaly_presence = BinaryClassificationEvaluator(labelCol="presence_anomaly", rawPredictionCol="rawPrediction", metricName="areaUnderROC")
accuracy_anomaly_presence = evaluator_anomaly_presence.evaluate(predictions_anomaly_presence)
print("Area under ROC of the anomaly presence prediction model = ", accuracy_anomaly_presence)

evaluator_anomaly_type = MulticlassClassificationEvaluator(labelCol="type_anomaly", predictionCol="prediction", metricName="accuracy")
accuracy_anomaly_type = evaluator_anomaly_type.evaluate(predictions_anomaly_type)
print("Accuracy of the anomaly type prediction model = ", accuracy_anomaly_type)

# Side-by-side view of the static (threshold) labels and both models' predictions.
predictions_combined = (
    predictions_anomaly_presence.select(
        "row_id",
        col("features"),
        col("scaled_features"),
        col("pca_features"),
        col("presence_anomaly").alias("presence_anomaly_static"),
        col("prediction").alias("presence_anomaly_predicted"),
    )
    .join(
        predictions_anomaly_type.select(
            "row_id",
            col("type_anomaly").alias("type_anomaly_static"),
            col("type_anomaly"),
            col("prediction").alias("type_anomaly_predicted"),
        ),
        on="row_id",
    )
    .drop("row_id")
)

predictions_combined.show()


# 5. Training Random Forest machine learning models

In [None]:
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import MulticlassClassificationEvaluator, BinaryClassificationEvaluator
from pyspark.ml.feature import VectorAssembler, StandardScaler, PCA, StringIndexer
from pyspark.sql.functions import col

# Meteorological feature columns fed to both models (same set as the logistic-regression section).
X_cols = ['temperature_afternoon', 'temperature_min', 'temperature_max',
          'precipitation_total', 'wind_max_direction',
          'wind_max_speed', 'pressure_afternoon',
          'season_avg_temp']

assembler = VectorAssembler(inputCols=X_cols, outputCol="features")

scaler = StandardScaler(inputCol="features", outputCol="scaled_features", withStd=True, withMean=True)

num_principal_components = 4
pca = PCA(k=num_principal_components, inputCol="scaled_features", outputCol="pca_features")

# NOTE(review): tree ensembles do not need scaling or PCA; the shared preprocessing
# presumably keeps the comparison with logistic regression like-for-like.

# Random Forest for anomaly presence (binary problem).
rf_anomaly_presence = RandomForestClassifier(labelCol="presence_anomaly", featuresCol="pca_features")

# Random Forest for anomaly type (multiclass problem).
rf_anomaly_type = RandomForestClassifier(labelCol="type_anomaly", featuresCol="pca_features")

pipeline_anomaly_presence_rf = Pipeline(stages=[assembler, scaler, pca, rf_anomaly_presence])
pipeline_anomaly_type_rf = Pipeline(stages=[assembler, scaler, pca, rf_anomaly_type])

train_data, test_data = df.randomSplit([0.8, 0.2], seed=42)
# Unique row id so both prediction sets can be joined one-to-one (joining on
# feature vectors multiplies rows for duplicate vectors); cached so ids are stable.
test_data = test_data.withColumn("row_id", F.monotonically_increasing_id()).cache()

model_anomaly_presence_rf = pipeline_anomaly_presence_rf.fit(train_data)
model_anomaly_type_rf = pipeline_anomaly_type_rf.fit(train_data)

predictions_anomaly_presence = model_anomaly_presence_rf.transform(test_data)
predictions_anomaly_type = model_anomaly_type_rf.transform(test_data)

# BinaryClassificationEvaluator measures area under the ROC curve, not accuracy.
evaluator_anomaly_presence = BinaryClassificationEvaluator(labelCol="presence_anomaly", rawPredictionCol="rawPrediction", metricName="areaUnderROC")
accuracy_anomaly_presence = evaluator_anomaly_presence.evaluate(predictions_anomaly_presence)
print("Area under ROC of the anomaly presence prediction model = ", accuracy_anomaly_presence)

evaluator_anomaly_type = MulticlassClassificationEvaluator(labelCol="type_anomaly", predictionCol="prediction", metricName="accuracy")
accuracy_anomaly_type = evaluator_anomaly_type.evaluate(predictions_anomaly_type)
print("Accuracy of the anomaly type prediction model = ", accuracy_anomaly_type)

# Side-by-side view of the static (threshold) labels and both models' predictions.
predictions_combined = (
    predictions_anomaly_presence.select(
        "row_id",
        col("features"),
        col("scaled_features"),
        col("pca_features"),
        col("presence_anomaly").alias("presence_anomaly_static"),
        col("prediction").alias("presence_anomaly_predicted"),
    )
    .join(
        predictions_anomaly_type.select(
            "row_id",
            col("type_anomaly").alias("type_anomaly_static"),
            col("type_anomaly"),
            col("prediction").alias("type_anomaly_predicted"),
        ),
        on="row_id",
    )
    .drop("row_id")
)

predictions_combined.show()


In [None]:
# Persist the logistic-regression pipelines; the Streamlit app loads them from
# these directories with PipelineModel.load.
for model_path, fitted_pipeline in (
    ("model_presence_anomaly", model_anomaly_presence_lr),
    ("model_type_anomaly", model_anomaly_type_lr),
):
    fitted_pipeline.write().overwrite().save(model_path)

# 6. Implementing the Python app with a Streamlit interface

In [None]:
%%writefile app.py

import json
import streamlit as st
import pandas as pd
from pyspark.sql.functions import col, mean, lit, when,month
from pyspark.ml import PipelineModel
from pyspark.sql.functions import monotonically_increasing_id
from pyspark.sql import Row
import findspark
findspark.init()
findspark.find()
import pandas as pd
import numpy as np
from pyspark.sql import DataFrame, SparkSession
from typing import List
import pyspark.sql.types as T
import pyspark.sql.functions as F

spark = SparkSession.builder.appName("Spark Session").getOrCreate()


# Anomaly id -> label mapping written by the notebook. JSON object keys are strings
# ("1.0", ...), so they are converted back to floats to match the prediction values.
with open('mapping_anomalies.json', 'r') as mapping_file:
    mapping_anomalies_data = {float(key): label for key, label in json.load(mapping_file).items()}

import smtplib
import requests
import datetime
import re
import smtplib
import ssl
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
from email.mime.base import MIMEBase
from email import encoders
import os

# Secrets and deployment settings are read from the environment so they are not
# committed with the code; the defaults reproduce the previous hard-coded values.
password = os.environ.get("SENDER_EMAIL_PASSWORD", "")
sender_email = os.environ.get("SENDER_EMAIL", "georgiana.iordache300@gmail.com")
city_name = os.environ.get("WEATHER_CITY", "Roma")

def generate_html_content():
    """Return the static HTML body of the results email (the forecast itself is attached)."""
    return """
    <html>
    <body>
        <h2>Result of the weather anomalies analysis</h2>
        <p>Download the attached file to see the forecast for the next few days.</p>
        <p>Thank you for choosing to use our app!</p>
  </body>
    </html>

    """


# Define the function to send mails to application users
def send_email(sender_email, recipient_email, password, content,
               subject=f"Weather anomaly detection for {city_name}",
               prediction_path=f"analysis_results_{city_name}.csv"):
    """Email ``content`` (HTML) with the CSV at ``prediction_path`` attached, via Gmail SMTP over SSL.

    Args:
        sender_email: Gmail address used both as sender and SMTP login.
        recipient_email: Destination address.
        password: SMTP password for ``sender_email``.
        content: HTML body of the message.
        subject: Message subject; defaults to a city-specific title.
        prediction_path: Path of the file to attach.

    Returns:
        True when the message was handed to the SMTP server, False when sending
        failed (the error is printed).

    Raises:
        OSError: if ``prediction_path`` cannot be read.
    """
    msg = MIMEMultipart()
    msg['From'] = sender_email
    msg['To'] = recipient_email
    msg['Subject'] = subject
    msg.attach(MIMEText(content, 'html'))

    # Read the attachment inside `with` so the file handle is always closed.
    with open(prediction_path, "rb") as attachment:
        part = MIMEBase("application", "octet-stream")
        part.set_payload(attachment.read())
    encoders.encode_base64(part)
    # add_header with a keyword parameter quotes the filename correctly.
    part.add_header("Content-Disposition", "attachment", filename=os.path.basename(prediction_path))
    msg.attach(part)

    smtp_server = "smtp.gmail.com"
    port = 465

    try:
        context = ssl.create_default_context()
        with smtplib.SMTP_SSL(smtp_server, port, context=context) as server:
            server.login(sender_email, password)
            server.sendmail(sender_email, recipient_email, msg.as_string())
    except Exception as e:
        # Best-effort delivery: report the failure to the caller instead of raising.
        print(f"Error in submission process: {e}")
        return False
    return True

def read_lat_lon(city_name):
    """Load and return the JSON saved as ``latitude_longitude_<city_name>.json``."""
    path = f"latitude_longitude_{city_name}.json"
    with open(path, "r") as handle:
        return json.load(handle)

# Coordinates saved by the notebook's geocoding cell for ``city_name``.
data = read_lat_lon(city_name)
# OpenWeatherMap One Call 3.0 key, read from the environment so it is never committed.
api_key = os.environ.get("OPENWEATHER_API_KEY", "")

def get_weather_data(date):
    """Fetch the One Call 3.0 daily summary for ``date`` at the coordinates in ``data``.

    Args:
        date: Day to query, formatted "YYYY-MM-DD".

    Returns:
        The decoded JSON payload (metric units), or None when the request fails or
        the API answers with a non-200 status.
    """
    params = {
        "lat": data["lat"],
        "lon": data["lon"],
        "units": "metric",
        "date": date,
        "appid": api_key,
    }
    try:
        response = requests.get(
            "https://api.openweathermap.org/data/3.0/onecall/day_summary",
            params=params,
            timeout=10,
        )
    except requests.RequestException as exc:
        print(f"Weather request for {date} failed: {exc}")
        return None

    if response.status_code != 200:
        print(f"Weather request for {date} returned HTTP {response.status_code}")
        return None
    return response.json()

def generate_date_range(day):
    """Return ``day`` consecutive local dates as "YYYY-MM-DD" strings, starting with today."""
    start = datetime.datetime.now()
    dates = []
    for offset in range(day):
        dates.append((start + datetime.timedelta(days=offset)).strftime("%Y-%m-%d"))
    return dates

def calculate_avg_temp(df, season):
    """Mean 'temperature_afternoon' over the rows of ``df`` whose 'season' equals ``season`` (NaN if none)."""
    in_season = df['season'] == season
    return df.loc[in_season, 'temperature_afternoon'].mean()

def preprocess_data(df, season_avg_temps=None):
    """Turn a flattened day_summary frame into the feature matrix the pipelines expect.

    Steps: replace '.' with '_' in column names, drop location columns, derive the
    season from 'date', attach the seasonal mean afternoon temperature, and keep
    only the feature columns, in training order.

    Args:
        df: pandas DataFrame with a 'date' column and the weather columns (dotted
            or underscored names). It is not modified.
        season_avg_temps: optional {'Spring'|'Summer'|'Autumn'|'Winter': mean}
            mapping. When omitted, the means are computed from ``df`` itself.
            NOTE(review): training used means over the historical data, so passing
            those here avoids a train/serve mismatch.

    Returns:
        DataFrame with the eight feature columns.
    """
    # rename() returns a copy, so the caller's frame keeps its column names.
    frame = df.rename(columns=lambda name: name.replace('.', '_'))
    # The Streamlit handler's rename map can produce 'atm_pressure_afternoon';
    # the models were trained on 'pressure_afternoon'.
    if 'pressure_afternoon' not in frame.columns and 'atm_pressure_afternoon' in frame.columns:
        frame = frame.rename(columns={'atm_pressure_afternoon': 'pressure_afternoon'})
    frame = frame.drop(columns=['tz', 'lat', 'lon'], errors='ignore')

    frame['date'] = pd.to_datetime(frame['date'])
    frame['month'] = frame['date'].dt.month

    conditions = [
        frame['month'].between(3, 5),
        frame['month'].between(6, 8),
        frame['month'].between(9, 11),
        frame['month'].isin([12, 1, 2])
    ]
    choices = ['Spring', 'Summer', 'Autumn', 'Winter']
    frame['season'] = np.select(conditions, choices, default=None)

    if season_avg_temps is None:
        season_avg_temps = {season: calculate_avg_temp(frame, season) for season in choices}
    frame['season_avg_temp'] = frame['season'].map(season_avg_temps)

    X_cols = [
        'temperature_afternoon', 'temperature_min', 'temperature_max',
        'precipitation_total', 'wind_max_direction',
        'wind_max_speed', 'pressure_afternoon',
        'season_avg_temp'
    ]
    return frame[X_cols]

def fetch_weather_data_with_progress(days):
    """Call get_weather_data for each of the next ``days`` dates, advancing a Streamlit progress bar.

    Returns a list aligned with the dates. Each entry is whatever get_weather_data
    returned for that date.
    """
    dates = generate_date_range(day=days)
    total = len(dates)
    bar = st.progress(0)

    results = []
    for done, current_date in enumerate(dates, start=1):
        results.append(get_weather_data(date=current_date))
        bar.progress(done / total)

    return results

st.title(f"{city_name} - Weather Alert")
model_anomaly_presence = PipelineModel.load("model_presence_anomaly")
model_anomaly_type = PipelineModel.load("model_type_anomaly")

# Flattened day_summary field names (pd.json_normalize joins nested keys with ".")
# -> column names used downstream. 'pressure.afternoon' must map to
# 'pressure_afternoon', the feature name the models were trained on.
API_COLUMN_RENAMES = {
    'date': 'date',
    'units': 'measurement_unit',
    'cloud_cover.afternoon': 'cloud_cover_afternoon',
    'humidity.afternoon': 'humidity_afternoon',
    'precipitation.total': 'precipitation_total',
    'temperature.min': 'temperature_min',
    'temperature.max': 'temperature_max',
    'temperature.afternoon': 'temperature_afternoon',
    'temperature.night': 'temperature_night',
    'temperature.evening': 'temperature_evening',
    'temperature.morning': 'temperature_morning',
    'pressure.afternoon': 'pressure_afternoon',
    'wind.max.speed': 'wind_max_speed',
    'wind.max.direction': 'wind_max_direction'
}


def _analyse_and_email(days, recipient):
    """Fetch ``days`` days of weather, predict anomaly types, and email the CSV report to ``recipient``."""
    # Days whose API call failed come back as None; pd.json_normalize cannot handle them.
    future_data = [entry for entry in fetch_weather_data_with_progress(days=days) if entry is not None]
    if not future_data:
        st.error("No weather data could be retrieved. Please try again later.")
        return

    df = pd.json_normalize(future_data).rename(columns=API_COLUMN_RENAMES)
    st.dataframe(df)
    dates_list = df['date'].reset_index(drop=True)

    # TODO(review): preprocess_data derives season_avg_temp from this forecast window,
    # whereas training used seasonal means over the historical data.
    features = preprocess_data(df)
    features.to_csv("future_data.csv", index=False)
    sparkdf = spark.read.csv("future_data.csv", inferSchema=True, header=True)

    # Only the anomaly-type prediction goes into the report; id 0 means no anomaly.
    # NOTE(review): assumes Spark returns this small single-file CSV in file order,
    # so predictions line up with dates_list by position.
    type_predictions = model_anomaly_type.transform(sparkdf).select("prediction").toPandas()["prediction"]

    combined_df = pd.DataFrame({
        'date': dates_list,
        'type_anomaly': type_predictions.map(mapping_anomalies_data),
    })
    # Blank out the "no anomaly" label. Series.where is used because, on some pandas
    # versions, Series.replace(value, None) forward-fills instead of inserting None.
    combined_df['type_anomaly'] = combined_df['type_anomaly'].where(
        combined_df['type_anomaly'] != 'does_not_exist', None
    )

    combined_df.to_csv(f"analysis_results_{city_name}.csv", index=False)
    sent = send_email(sender_email, recipient, password, generate_html_content())
    # Only an explicit False signals a failed delivery.
    if sent is False:
        st.error("The results could not be sent. Please check the e-mail address and try again.")
    else:
        st.success("The results have been sent. Please check your e-mail address.")


# Streamlit form to collect the user's details
with st.form(key="form"):
    days = st.slider(label="Please introduce the number of days for which you want the weather analysis", min_value=7, max_value=90)
    first_name = st.text_input(label="First name")
    last_name = st.text_input(label="Last name")
    email_address = st.text_input(label="Email address")
    submit = st.form_submit_button(label="Submit")

    if submit:
        if email_address.strip():
            _analyse_and_email(days, email_address.strip())
        else:
            st.error("Please enter an email address.")

In [None]:
# Register the ngrok auth token from the environment instead of committing it to the notebook.
ngrok.set_auth_token(os.environ["NGROK_AUTHTOKEN"])

In [None]:
# Open an ngrok tunnel to Streamlit's default port and show its public URL.
port = "8501"
tunnel = ngrok.connect(port)
public_url = tunnel.public_url
print(public_url)

In [None]:
# Start the Streamlit app in the background and expose port 8501 through localtunnel.
# NOTE(review): the previous cell already opened an ngrok tunnel to the same port;
# presumably only one of the two public URLs is needed.
!streamlit run app.py & npx localtunnel --port 8501