## Data Preprocessing

In [1]:
# Import Libraries
from pyspark.sql import SparkSession
from pyspark.sql.types import FloatType
from pyspark.sql.functions import col, to_date,mean, when

In [2]:
# Initialize SparkSession
spark = SparkSession.builder.appName("Weather_Data").getOrCreate()

In [3]:
# Load weather data 
weather_data = spark.read.csv("data/SriLanka_Weather_Dataset.csv", header=True)

In [4]:
weather_data.count()

147480

#### Handle Missing Values

In [5]:
# Explore missing values in each column
for column in weather_data.columns:
    missing_count =  weather_data.filter(col(column).isNull()).count()
    print(f"Missing values in column '{column}': {missing_count}")

Missing values in column 'time': 0
Missing values in column 'weathercode': 0
Missing values in column 'temperature_2m_max': 0
Missing values in column 'temperature_2m_min': 0
Missing values in column 'temperature_2m_mean': 0
Missing values in column 'apparent_temperature_max': 0
Missing values in column 'apparent_temperature_min': 0
Missing values in column 'apparent_temperature_mean': 0
Missing values in column 'sunrise': 0
Missing values in column 'sunset': 0
Missing values in column 'shortwave_radiation_sum': 0
Missing values in column 'precipitation_sum': 0
Missing values in column 'rain_sum': 0
Missing values in column 'snowfall_sum': 0
Missing values in column 'precipitation_hours': 0
Missing values in column 'windspeed_10m_max': 0
Missing values in column 'windgusts_10m_max': 0
Missing values in column 'winddirection_10m_dominant': 0
Missing values in column 'et0_fao_evapotranspiration': 0
Missing values in column 'latitude': 0
Missing values in column 'longitude': 0
Missing val

In [6]:
weather_data.printSchema()

root
 |-- time: string (nullable = true)
 |-- weathercode: string (nullable = true)
 |-- temperature_2m_max: string (nullable = true)
 |-- temperature_2m_min: string (nullable = true)
 |-- temperature_2m_mean: string (nullable = true)
 |-- apparent_temperature_max: string (nullable = true)
 |-- apparent_temperature_min: string (nullable = true)
 |-- apparent_temperature_mean: string (nullable = true)
 |-- sunrise: string (nullable = true)
 |-- sunset: string (nullable = true)
 |-- shortwave_radiation_sum: string (nullable = true)
 |-- precipitation_sum: string (nullable = true)
 |-- rain_sum: string (nullable = true)
 |-- snowfall_sum: string (nullable = true)
 |-- precipitation_hours: string (nullable = true)
 |-- windspeed_10m_max: string (nullable = true)
 |-- windgusts_10m_max: string (nullable = true)
 |-- winddirection_10m_dominant: string (nullable = true)
 |-- et0_fao_evapotranspiration: string (nullable = true)
 |-- latitude: string (nullable = true)
 |-- longitude: string (nu

#### Drop Unwanted Coloumns

In [7]:
weather_data = weather_data.drop("weathercode").drop("sunrise").drop("sunset").drop("snowfall_sum").drop("precipitation_hours")
weather_data = weather_data.drop("winddirection_10m_dominant").drop("latitude").drop("longitude").drop("elevation").drop("country")

In [8]:
weather_data.printSchema()

root
 |-- time: string (nullable = true)
 |-- temperature_2m_max: string (nullable = true)
 |-- temperature_2m_min: string (nullable = true)
 |-- temperature_2m_mean: string (nullable = true)
 |-- apparent_temperature_max: string (nullable = true)
 |-- apparent_temperature_min: string (nullable = true)
 |-- apparent_temperature_mean: string (nullable = true)
 |-- shortwave_radiation_sum: string (nullable = true)
 |-- precipitation_sum: string (nullable = true)
 |-- rain_sum: string (nullable = true)
 |-- windspeed_10m_max: string (nullable = true)
 |-- windgusts_10m_max: string (nullable = true)
 |-- et0_fao_evapotranspiration: string (nullable = true)
 |-- city: string (nullable = true)



#### Convert Data Types

In [9]:
# List of columns to convert to numeric
numeric_columns = [
    "temperature_2m_max", "temperature_2m_min", "temperature_2m_mean",
    "apparent_temperature_max", "apparent_temperature_min", "apparent_temperature_mean",
    "shortwave_radiation_sum", "precipitation_sum", "rain_sum",
    "windspeed_10m_max", "windgusts_10m_max", "et0_fao_evapotranspiration"
]

# Convert columns to numeric
for column in numeric_columns:
    weather_data = weather_data.withColumn(column, weather_data[column].cast("float"))

# Convert time column to DateType
weather_data = weather_data.withColumn("time", to_date(col("time"), 'yyyy-MM-dd'))

# Check the schema after conversion
weather_data.printSchema()

root
 |-- time: date (nullable = true)
 |-- temperature_2m_max: float (nullable = true)
 |-- temperature_2m_min: float (nullable = true)
 |-- temperature_2m_mean: float (nullable = true)
 |-- apparent_temperature_max: float (nullable = true)
 |-- apparent_temperature_min: float (nullable = true)
 |-- apparent_temperature_mean: float (nullable = true)
 |-- shortwave_radiation_sum: float (nullable = true)
 |-- precipitation_sum: float (nullable = true)
 |-- rain_sum: float (nullable = true)
 |-- windspeed_10m_max: float (nullable = true)
 |-- windgusts_10m_max: float (nullable = true)
 |-- et0_fao_evapotranspiration: float (nullable = true)
 |-- city: string (nullable = true)



In [10]:
weather_data.printSchema()

root
 |-- time: date (nullable = true)
 |-- temperature_2m_max: float (nullable = true)
 |-- temperature_2m_min: float (nullable = true)
 |-- temperature_2m_mean: float (nullable = true)
 |-- apparent_temperature_max: float (nullable = true)
 |-- apparent_temperature_min: float (nullable = true)
 |-- apparent_temperature_mean: float (nullable = true)
 |-- shortwave_radiation_sum: float (nullable = true)
 |-- precipitation_sum: float (nullable = true)
 |-- rain_sum: float (nullable = true)
 |-- windspeed_10m_max: float (nullable = true)
 |-- windgusts_10m_max: float (nullable = true)
 |-- et0_fao_evapotranspiration: float (nullable = true)
 |-- city: string (nullable = true)



 #### Filter Values

In [11]:
from pyspark.sql.functions import col

# Define the list of cities
cities = ['Colombo', 'Badulla', 'Kandy', 'Jaffna', 'Matara', 'Kurunegala']

# Filter the data for the specified date range and cities
filtered_weather_data = weather_data.filter(
    (col("time") >= "2019-01-01") & 
    (col("time") <= "2023-12-31") & 
    (col("city").isin(cities))
)

# Show the filtered data
filtered_weather_data.count()

9774

In [12]:
# Select the "city" column and get unique values
unique_cities = filtered_weather_data.select("city").distinct()

unique_cities.show()

+----------+
|      city|
+----------+
|   Colombo|
|     Kandy|
|    Matara|
|    Jaffna|
|   Badulla|
|Kurunegala|
+----------+



#### Replace Station Names 

In [13]:
# Replace values in the NAME column

filtered_weather_data = filtered_weather_data.na.replace(['Colombo'], ['Colombo Proper'], 'city')
filtered_weather_data = filtered_weather_data.na.replace(['Kurunegala'], ['Kurunegala Proper'], 'city')
filtered_weather_data = filtered_weather_data.na.replace(['Badulla'], ['Nuwara Eliya Proper'], 'city')
filtered_weather_data = filtered_weather_data.na.replace(['Jaffna'], ['Jaffna Proper'], 'city')
filtered_weather_data = filtered_weather_data.na.replace(['Matara'], ['Deniyaya, Matara'], 'city')
filtered_weather_data = filtered_weather_data.na.replace(['Kandy'], ['Kandy Proper'], 'city')

In [14]:
# Select the "city" column and get unique values
unique_cities = filtered_weather_data.select("city").distinct()

unique_cities.show()

+-------------------+
|               city|
+-------------------+
|     Colombo Proper|
|       Kandy Proper|
|   Deniyaya, Matara|
|      Jaffna Proper|
|Nuwara Eliya Proper|
|  Kurunegala Proper|
+-------------------+



#### Change Column Names

In [15]:
# Rename the columns
filtered_weather_data = filtered_weather_data \
    .withColumnRenamed("time", "Timestamp") \
    .withColumnRenamed("temperature_2m_max", "Temperature_Max") \
    .withColumnRenamed("temperature_2m_min", "Temperature_Min") \
    .withColumnRenamed("temperature_2m_mean", "Temperature_Mean") \
    .withColumnRenamed("apparent_temperature_max", "Apparent_Temperature_Max") \
    .withColumnRenamed("apparent_temperature_min", "Apparent_Temperature_Min") \
    .withColumnRenamed("apparent_temperature_mean", "Apparent_Temperature_Mean") \
    .withColumnRenamed("shortwave_radiation_sum", "Shortwave_Radiation_Sum") \
    .withColumnRenamed("precipitation_sum", "Precipitation_Sum") \
    .withColumnRenamed("rain_sum", "Rain_Sum") \
    .withColumnRenamed("windspeed_10m_max", "Wind_Speed_Max") \
    .withColumnRenamed("windgusts_10m_max", "Wind_Gusts_Max") \
    .withColumnRenamed("et0_fao_evapotranspiration", "Evapotranspiration") \
    .withColumnRenamed("city", "City")

# Show the renamed columns
filtered_weather_data.printSchema()

root
 |-- Timestamp: date (nullable = true)
 |-- Temperature_Max: float (nullable = true)
 |-- Temperature_Min: float (nullable = true)
 |-- Temperature_Mean: float (nullable = true)
 |-- Apparent_Temperature_Max: float (nullable = true)
 |-- Apparent_Temperature_Min: float (nullable = true)
 |-- Apparent_Temperature_Mean: float (nullable = true)
 |-- Shortwave_Radiation_Sum: float (nullable = true)
 |-- Precipitation_Sum: float (nullable = true)
 |-- Rain_Sum: float (nullable = true)
 |-- Wind_Speed_Max: float (nullable = true)
 |-- Wind_Gusts_Max: float (nullable = true)
 |-- Evapotranspiration: float (nullable = true)
 |-- City: string (nullable = true)



In [16]:
location_counts=filtered_weather_data.groupBy("City").count()
location_counts.show()

+-------------------+-----+
|               City|count|
+-------------------+-----+
|     Colombo Proper| 1629|
|       Kandy Proper| 1629|
|   Deniyaya, Matara| 1629|
|      Jaffna Proper| 1629|
|Nuwara Eliya Proper| 1629|
|  Kurunegala Proper| 1629|
+-------------------+-----+



In [18]:
# Save the DataFrame as a CSV file
filtered_weather_data.toPandas().to_csv('data/SriLanka_Weather_new.csv')