# Ghor al Safi's weather data

This notebook cleanses, visualizes, and explores weather data for the Ghor al Safi region, obtained from ArabiaWeather. For further details, please check the technical documentation.

In [None]:
# Record the wall-clock start time; the timing cell at the end of the notebook
# subtracts it from the end time to report the total run time.
import time

start_time = time.time()

# Reading the data

In [None]:
# Install pyspark because weather data is big data and we need pyspark to deal with it
!pip install pyspark

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting pyspark
  Downloading pyspark-3.4.0.tar.gz (310.8 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m310.8/310.8 MB[0m [31m4.6 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.4.0-py2.py3-none-any.whl size=311317130 sha256=435733e038d028067187434070b8ac765190dd9515f00fc20c770f86a59b4204
  Stored in directory: /root/.cache/pip/wheels/7b/1b/4b/3363a1d04368e7ff0d408e57ff57966fcdf00583774e761327
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.4.0


In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.types import (
    DateType,
    FloatType,
    IntegerType,
    StringType,
    StructField,
    StructType,
)

# Run Spark on this single machine using every available core (local[*]).
# To run on a cluster instead, pass the cluster's master URL (e.g. a standalone
# master listening on port 7077) to .master().
spark = (
    SparkSession.builder
    .appName("SparkApp")
    .master("local[*]")
    .getOrCreate()
)

In [None]:
# Path to the Ghor weather dataset
WEATHER_GOR = "Gor.csv"

# Explicit schema so Spark does not have to infer the datatypes.
# Each entry is (column name, Spark type); the order here is the order of the
# fields in the schema, and every column is nullable.
_COLUMN_TYPES = [
    ('Station', StringType),
    ('Date/Time', DateType),
    ('Air Dew Point', IntegerType),
    ('Air Temperature (OC)', IntegerType),
    ('Humidity %', IntegerType),
    ('Manual Present Weather', StringType),
    ('Cloud Type', StringType),
    ('Clouds Cover (Okta)', IntegerType),
    ('Cloud Cover %', IntegerType),
    ('Wind Direction (Degrees)', IntegerType),
    ('Wind Speed (MPS)', IntegerType),
    ('Wind Type', StringType),
]
Myschema = StructType(
    [StructField(name, dtype(), True) for name, dtype in _COLUMN_TYPES]
)

# Load the CSV (first line is the header) into a Spark DataFrame
df = spark.read.csv(WEATHER_GOR, header=True, schema=Myschema)

In [None]:
# Preview the first 10 rows of the raw data
df.show(n=10)

+------------+----------+-------------+--------------------+----------+----------------------+----------+-------------------+-------------+------------------------+----------------+---------+
|     Station| Date/Time|Air Dew Point|Air Temperature (OC)|Humidity %|Manual Present Weather|Cloud Type|Clouds Cover (Okta)|Cloud Cover %|Wind Direction (Degrees)|Wind Speed (MPS)|Wind Type|
+------------+----------+-------------+--------------------+----------+----------------------+----------+-------------------+-------------+------------------------+----------------+---------+
|Ghor El Safi|2017-01-02|            6|                  10|        75|                  null|   cumulus|                  3|           38|                    null|               0|     calm|
|Ghor El Safi|2017-01-02|            7|                  20|        42|                  null|   cumulus|                  1|           13|                    null|               0|     calm|
|Ghor El Safi|2017-01-03|            6| 

In [None]:
# Summary statistics (count, mean, ...) for each column
summary_stats = df.describe()
summary_stats.show()

+-------+------------+------------------+--------------------+------------------+----------------------+-----------+-------------------+-----------------+------------------------+------------------+---------+
|summary|     Station|     Air Dew Point|Air Temperature (OC)|        Humidity %|Manual Present Weather| Cloud Type|Clouds Cover (Okta)|    Cloud Cover %|Wind Direction (Degrees)|  Wind Speed (MPS)|Wind Type|
+-------+------------+------------------+--------------------+------------------+----------------------+-----------+-------------------+-----------------+------------------------+------------------+---------+
|  count|        6959|               886|                6854|               886|                   467|        445|                444|             6959|                    4667|              6956|     6958|
|   mean|        null|14.705417607223476|  27.048001167201633|51.635440180586905|                  null|       null| 2.6013513513513513|2.091248742635436|      187.

# Handling missing values

In [None]:
# Count the null values in every column.
# NOTE(review): the star import shadows Python builtins that share a name with a
# Spark function, and a later cell uses `month` without importing it itself —
# confirm nothing depends on this before narrowing the import.
from pyspark.sql.functions import *

# when(...) without otherwise() yields null for non-null cells, and count()
# skips nulls, so each expression counts only the null cells of its column.
null_count_exprs = [
    count(when(col(column_name).isNull(), column_name)).alias(column_name)
    for column_name in df.columns
]
col_null_cnt_df = df.select(null_count_exprs)
col_null_cnt_df.show()

+-------+---------+-------------+--------------------+----------+----------------------+----------+-------------------+-------------+------------------------+----------------+---------+
|Station|Date/Time|Air Dew Point|Air Temperature (OC)|Humidity %|Manual Present Weather|Cloud Type|Clouds Cover (Okta)|Cloud Cover %|Wind Direction (Degrees)|Wind Speed (MPS)|Wind Type|
+-------+---------+-------------+--------------------+----------+----------------------+----------+-------------------+-------------+------------------------+----------------+---------+
|      0|        0|         6073|                 105|      6073|                  6492|      6514|               6515|            0|                    2292|               3|        1|
+-------+---------+-------------+--------------------+----------+----------------------+----------+-------------------+-------------+------------------------+----------------+---------+



In [None]:
# Drop features that are mostly null and are not expected to strongly affect agriculture
sparse_columns = [
    'Air Dew Point',           # can be known from humidity
    'Manual Present Weather',  # the general weather; can be known from other features
    'Cloud Type',              # the cloud type doesn't affect production
    'Clouds Cover (Okta)',     # cloud cover is already available in %
]
df = df.drop(*sparse_columns)

In [None]:
# Only a small number of rows have nulls in these columns, so drop any row
# that is missing at least one of them.
required_columns = ['Air Temperature (OC)', 'Cloud Cover %', 'Wind Speed (MPS)', 'Wind Type']
df = df.dropna(how='any', subset=required_columns)  # 'any' is the default

In [None]:
# Forward-fill nulls in the wind direction column: each row takes the most recent
# non-null value at or before it when rows are ordered by date.

from pyspark.sql.functions import col, last
from pyspark.sql.window import Window

target_column = "Wind Direction (Degrees)"

# Running window from the first row up to and including the current row.
# NOTE(review): the window has no partitionBy, so Spark presumably processes all
# rows in a single partition — confirm this is acceptable for the data size.
window_spec = (
    Window
    .orderBy("Date/Time")
    .rowsBetween(Window.unboundedPreceding, Window.currentRow)
)

# Sort the DataFrame by date (ascending) before applying the window
sorted_df = df.orderBy("Date/Time")

# With ignorenulls=True, last() over the running window is the latest non-null
# value seen so far; rows before the first non-null value stay null.
latest_known_direction = last(col(target_column), ignorenulls=True).over(window_spec)
df = sorted_df.withColumn(target_column, latest_known_direction)

df.show()

+------------+----------+--------------------+----------+-------------+------------------------+----------------+---------+
|     Station| Date/Time|Air Temperature (OC)|Humidity %|Cloud Cover %|Wind Direction (Degrees)|Wind Speed (MPS)|Wind Type|
+------------+----------+--------------------+----------+-------------+------------------------+----------------+---------+
|Ghor El Safi|2017-01-02|                  10|        75|           38|                    null|               0|     calm|
|Ghor El Safi|2017-01-02|                  20|        42|           13|                    null|               0|     calm|
|Ghor El Safi|2017-01-03|                  11|        71|            0|                    null|               0|     calm|
|Ghor El Safi|2017-01-03|                  20|        44|           13|                    null|               0|     calm|
|Ghor El Safi|2017-01-04|                  20|        52|           13|                    null|               0|     calm|
|Ghor El

In [None]:
# Rows that had no earlier non-null wind direction to copy from are still null
# after the forward fill; drop them.
df = df.dropna(how='any', subset=['Wind Direction (Degrees)'])  # 'any' is the default

In [None]:
# Re-count the nulls per column after the wind direction clean-up
from pyspark.sql.functions import *

null_count_exprs = []
for column_name in df.columns:
    # Emit the column name only for null cells; count() then counts those cells
    null_marker = when(col(column_name).isNull(), column_name)
    null_count_exprs.append(count(null_marker).alias(column_name))

col_null_cnt_df = df.select(null_count_exprs)
col_null_cnt_df.show()

+-------+---------+--------------------+----------+-------------+------------------------+----------------+---------+
|Station|Date/Time|Air Temperature (OC)|Humidity %|Cloud Cover %|Wind Direction (Degrees)|Wind Speed (MPS)|Wind Type|
+-------+---------+--------------------+----------+-------------+------------------------+----------------+---------+
|      0|        0|                   0|      5967|            0|                       0|               0|        0|
+-------+---------+--------------------+----------+-------------+------------------------+----------------+---------+



In [None]:
# Fit a linear regression that predicts humidity from the other numeric features,
# so that the missing humidity values can be imputed from the fitted formula.

from pyspark.sql.functions import col, isnan
from pyspark.ml.feature import StringIndexer, VectorAssembler
from pyspark.ml.regression import LinearRegression

# Train only on rows where humidity is actually known (neither null nor NaN)
data = df.filter(col("Humidity %").isNotNull() & ~isnan("Humidity %"))

# Numeric predictors, in the order their coefficients appear in model.coefficients.
# The categorical Station and Wind Type columns are not used as predictors, so
# they are not indexed here.
feature_columns = [
    "Air Temperature (OC)",
    "Cloud Cover %",
    "Wind Direction (Degrees)",
    "Wind Speed (MPS)",
]
assembler = VectorAssembler(inputCols=feature_columns, outputCol="features")
data = assembler.transform(data)

# Keep only the feature vector and the label for the regression
selected_data = data.select("features", "Humidity %")

# 80/20 train/test split; the fixed seed keeps the split reproducible
train_data, test_data = selected_data.randomSplit([0.8, 0.2], seed=42)

# Create and fit the linear regression model
lr = LinearRegression(featuresCol="features", labelCol="Humidity %")
model = lr.fit(train_data)


In [None]:
from pyspark.ml.evaluation import RegressionEvaluator

# Score the held-out test set with the fitted model
predictions = model.transform(test_data)

# R-squared of the predicted humidity against the actual humidity
evaluator = RegressionEvaluator(
    labelCol="Humidity %",
    predictionCol="prediction",
    metricName="r2",
)
r2 = evaluator.evaluate(predictions)

print(f"R-squared: {r2}")

R-squared: 0.6293776229624954


In [None]:
# Extract the fitted formula: one coefficient per input feature, plus the intercept
coefficients, intercept = model.coefficients, model.intercept

for fitted_value in (coefficients, intercept):
    print(fitted_value)

[-1.3288296519514737,-0.025095181077803817,-0.006767128617596566,-2.1903271481300397]
89.61556485178714


In [None]:
# Fill nulls in the humidity column with predictions from the fitted linear formula

from pyspark.sql.functions import col, when

# Intercept and coefficients of the fitted regression (values printed by the model).
# The coefficients are listed in the same order as feature_columns below.
coefficients = [-1.3288296519514737,-0.025095181077803817,-0.006767128617596566,-2.1903271481300397]
intercept = 89.61556485178714

feature_columns = [
    "Air Temperature (OC)",
    "Cloud Cover %",
    "Wind Direction (Degrees)",
    "Wind Speed (MPS)",
]

# predicted humidity = intercept + coefficient_0 * feature_0 + ... + coefficient_3 * feature_3
# Each feature must be multiplied by its own coefficient, so the terms are built by
# pairing coefficients and features by position rather than by hand-written indices.
# (A plain loop is used because an earlier star import shadows the builtin sum.)
predicted_humidity = intercept + coefficients[0] * col(feature_columns[0])
for weight, column_name in zip(coefficients[1:], feature_columns[1:]):
    predicted_humidity = predicted_humidity + weight * col(column_name)

# Use the prediction only where humidity is missing; keep measured values as they are
df = df.withColumn(
    "Humidity %",
    when(col("Humidity %").isNull(), predicted_humidity).otherwise(col("Humidity %")),
)

# Show the updated DataFrame
df.show()

+------------+----------+--------------------+----------+-------------+------------------------+----------------+---------+
|     Station| Date/Time|Air Temperature (OC)|Humidity %|Cloud Cover %|Wind Direction (Degrees)|Wind Speed (MPS)|Wind Type|
+------------+----------+--------------------+----------+-------------+------------------------+----------------+---------+
|Ghor El Safi|2017-01-08|                  21|      37.0|           38|                     190|               3|   normal|
|Ghor El Safi|2017-01-09|                  13|      53.0|           38|                     190|               0|     calm|
|Ghor El Safi|2017-01-09|                  20|      29.0|           38|                     260|               2|   normal|
|Ghor El Safi|2017-01-10|                  10|      61.0|            0|                     260|               0|     calm|
|Ghor El Safi|2017-01-10|                  20|      47.0|           38|                     350|               2|   normal|
|Ghor El

In [None]:
# Final check: every column should now report zero nulls
from pyspark.sql.functions import *


def _null_count(column_name):
    """Return an aggregate expression counting the null cells of `column_name`."""
    return count(when(col(column_name).isNull(), column_name)).alias(column_name)


col_null_cnt_df = df.select([_null_count(column_name) for column_name in df.columns])
col_null_cnt_df.show()

+-------+---------+--------------------+----------+-------------+------------------------+----------------+---------+
|Station|Date/Time|Air Temperature (OC)|Humidity %|Cloud Cover %|Wind Direction (Degrees)|Wind Speed (MPS)|Wind Type|
+-------+---------+--------------------+----------+-------------+------------------------+----------------+---------+
|      0|        0|                   0|         0|            0|                       0|               0|        0|
+-------+---------+--------------------+----------+-------------+------------------------+----------------+---------+



In [None]:
# Number of rows remaining after cleansing
row_count = df.count()
row_count

6840

# EDA and visualizations

In [None]:
# Plot each weather feature against time, one figure per feature

import plotly.graph_objects as go
import pandas as pd

features = ["Station", "Date/Time", "Air Temperature (OC)", "Humidity %", "Cloud Cover %", "Wind Direction (Degrees)", "Wind Speed (MPS)", "Wind Type"]
time_column = "Date/Time"

# Collect the data to pandas once instead of running one Spark job per feature;
# this also guarantees that x and y values of every plot come from the same rows.
weather_pd = df.select(features).toPandas()

for feature in features:
    # The time column is the x-axis of every plot. Plotting it against itself is
    # meaningless, and selecting it twice would produce duplicate column names.
    if feature == time_column:
        continue

    # Create a Plotly figure with a single line trace for this feature
    fig = go.Figure()
    fig.add_trace(go.Scatter(x=weather_pd[time_column], y=weather_pd[feature], mode='lines', name=feature))

    # Set the layout
    fig.update_layout(title=feature + ' Trends', xaxis_title='Date/Time', yaxis_title=feature)

    # Show the plot
    fig.show()


In [None]:
# Drop features that will not help predict agricultural production
non_predictive_columns = ['Wind Type', 'Cloud Cover %', 'Wind Direction (Degrees)']
df = df.drop(*non_predictive_columns)

In [None]:
# Show the first 20 rows (the default for show()) of the reduced DataFrame
df.show(n=20)

+------------+----------+--------------------+----------+----------------+
|     Station| Date/Time|Air Temperature (OC)|Humidity %|Wind Speed (MPS)|
+------------+----------+--------------------+----------+----------------+
|Ghor El Safi|2017-01-08|                  21|      37.0|               3|
|Ghor El Safi|2017-01-09|                  13|      53.0|               0|
|Ghor El Safi|2017-01-09|                  20|      29.0|               2|
|Ghor El Safi|2017-01-10|                  10|      61.0|               0|
|Ghor El Safi|2017-01-10|                  20|      47.0|               2|
|Ghor El Safi|2017-01-11|                   7|      79.0|               0|
|Ghor El Safi|2017-01-11|                  20|      47.0|               0|
|Ghor El Safi|2017-01-12|                  12|      65.0|               0|
|Ghor El Safi|2017-01-12|                  20|      36.0|               0|
|Ghor El Safi|2017-01-13|                  15|      56.0|               0|
|Ghor El Safi|2017-01-13|

In [None]:
# Plot the monthly average temperature, one figure per year

import plotly.express as px
import pandas as pd

# Bring the Spark DataFrame into pandas for easier manipulation
data_pd = df.toPandas()

# Derive the calendar year and month from the Date/Time column
timestamps = pd.to_datetime(data_pd['Date/Time'])
data_pd['Year'] = timestamps.dt.year
data_pd['Month'] = timestamps.dt.month

# Mean temperature for every (Year, Month) pair
average_temp = (
    data_pd
    .groupby(['Year', 'Month'])['Air Temperature (OC)']
    .mean()
    .reset_index()
)

# One line chart per year, in ascending year order
for year, year_data in average_temp.groupby('Year'):
    fig = px.line(year_data, x='Month', y='Air Temperature (OC)')
    fig.update_layout(
        title=f'Average Air Temperature by Month - {year}',
        xaxis_title='Month',
        yaxis_title='Air Temperature (OC)',
    )
    fig.show()


In [None]:
# Plot the monthly average humidity, one figure per year

import plotly.express as px
import pandas as pd

# Collect the Spark DataFrame into pandas for easier manipulation
data_pd = df.toPandas()

# Add Year and Month columns derived from Date/Time
reading_dates = pd.to_datetime(data_pd['Date/Time'])
data_pd['Year'], data_pd['Month'] = reading_dates.dt.year, reading_dates.dt.month

# Average humidity per (Year, Month); as_index=False keeps the keys as columns
average_humidity = data_pd.groupby(['Year', 'Month'], as_index=False)['Humidity %'].mean()

# Draw a separate chart for each year present in the data
for year, year_data in average_humidity.groupby('Year'):
    fig = px.line(year_data, x='Month', y='Humidity %')
    fig.update_layout(
        title=f'Average Humidity by Month - {year}',
        xaxis_title='Month',
        yaxis_title='Humidity %',
    )
    fig.show()


In [None]:
# Plot the monthly average wind speed, one figure per year

import plotly.express as px
import pandas as pd

# Convert to pandas, then attach the Year and Month of each reading
data_pd = df.toPandas()
data_pd = data_pd.assign(
    Year=lambda frame: pd.to_datetime(frame['Date/Time']).dt.year,
    Month=lambda frame: pd.to_datetime(frame['Date/Time']).dt.month,
)

# Mean wind speed for each (Year, Month) combination
grouped_speed = data_pd.groupby(['Year', 'Month'])['Wind Speed (MPS)']
average_wind_speed = grouped_speed.mean().reset_index()

# One figure per year
for year in average_wind_speed['Year'].unique():
    in_year = average_wind_speed['Year'] == year
    year_data = average_wind_speed.loc[in_year]

    fig = px.line(year_data, x='Month', y='Wind Speed (MPS)')
    fig.update_layout(
        title=f'Average Wind Speed by Month - {year}',
        xaxis_title='Month',
        yaxis_title='Wind Speed (MPS)',
    )
    fig.show()


In [None]:
# Plot temperature and humidity together on one time axis

import pandas as pd
import plotly.graph_objects as go

# Pandas copy of the Spark DataFrame (also used by the box plot in the next cell)
df_pandas = df.toPandas()

# One line trace per measure: (source column, legend label)
measures = (
    ('Air Temperature (OC)', 'Temperature'),
    ('Humidity %', 'Humidity'),
)
traces = [
    go.Scatter(x=df_pandas['Date/Time'], y=df_pandas[column], name=label)
    for column, label in measures
]

fig = go.Figure(data=traces)
fig.update_layout(
    title='Temperature and Humidity Over Time',
    xaxis_title='Date/Time',
    yaxis_title='Value',
)
fig.show()

In [None]:
# Box plot of the temperature distribution for each calendar month.
# Relies on `df_pandas`, `pd` and `px` created/imported by earlier cells.
month_of_reading = pd.to_datetime(df_pandas['Date/Time']).dt.month
df_pandas['Month'] = month_of_reading

fig = px.box(
    df_pandas,
    x='Month',
    y='Air Temperature (OC)',
    title='Distribution of Temperature by Month',
)
fig.update_layout(xaxis_title='Month', yaxis_title='Temperature (OC)')
fig.show()

# Data transformation

In [None]:
# Aggregate the readings of each day into one row per day using the mean value
# (so that night-time readings are reflected in the daily figure)

from pyspark.sql.functions import mean
from pyspark.sql.functions import to_date

# Calendar date of each reading, used as the grouping key
df = df.withColumn("Date", to_date(df["Date/Time"]))

df = (
    df.groupBy("Date")
    .agg(
        mean("Air Temperature (OC)").alias("Mean Air Temperature"),
        mean("Humidity %").alias("Mean Humidity"),
        mean("Wind Speed (MPS)").alias("Mean Wind Speed"),
    )
    # groupBy gives no ordering guarantee; sort explicitly so the preview, the
    # time-series plot and the exported file are all in chronological order.
    .orderBy("Date")
)

df.show()

+----------+--------------------+-------------+---------------+
|      Date|Mean Air Temperature|Mean Humidity|Mean Wind Speed|
+----------+--------------------+-------------+---------------+
|2017-01-08|                21.0|         37.0|            3.0|
|2017-01-09|                16.5|         41.0|            1.0|
|2017-01-10|                15.0|         54.0|            1.0|
|2017-01-11|                13.5|         63.0|            0.0|
|2017-01-12|                16.0|         50.5|            0.0|
|2017-01-13|                18.5|         51.5|            0.0|
|2017-01-14|                19.5|         55.0|            1.0|
|2017-01-15|                14.5|         68.0|            0.0|
|2017-01-16|                18.0|         55.5|            1.0|
|2017-01-17|                16.5|         62.0|            0.0|
|2017-01-18|                17.0|         65.5|            0.0|
|2017-01-19|                18.5|         66.5|            0.0|
|2017-01-20|                17.5|       

In [None]:
# Number of rows (one per day) after the daily aggregation
daily_row_count = df.count()
daily_row_count

2129

In [None]:
# Plot the daily mean measures over time
import plotly.express as px

# Collect every column of the aggregated Spark DataFrame into pandas
pandas_df = df.toPandas()

# One line per measure, all sharing the Date axis
measure_columns = ["Mean Air Temperature", "Mean Humidity", "Mean Wind Speed"]
fig = px.line(
    pandas_df,
    x="Date",
    y=measure_columns,
    labels={"value": "Value"},
    title="Mean Measures",
)
fig.update_layout(xaxis_tickangle=-45)
fig.show()


In [None]:
# Mean wind speed barely changes over time, so it is not expected to affect
# production; drop it as well.
near_constant_column = "Mean Wind Speed"
df = df.drop(near_constant_column)

In [None]:
# Label each day as Winter (November-March) or Summer (April-October)

# `month` is imported explicitly so this cell does not depend on a star import
# executed by an earlier cell.
from pyspark.sql.functions import col, count, month, when, year

# Make sure the "Date" column is a date type
df = df.withColumn("Date", col("Date").cast("date"))

# Months that count as winter; every other month is treated as summer
WINTER_MONTHS = [11, 12, 1, 2, 3]
winter_condition = month(col("Date")).isin(WINTER_MONTHS)

# Create the "Season" column from the winter condition
df = df.withColumn("Season", when(winter_condition, "Winter").otherwise("Summer"))

df.show()

+----------+--------------------+-------------+------+
|      Date|Mean Air Temperature|Mean Humidity|Season|
+----------+--------------------+-------------+------+
|2017-01-08|                21.0|         37.0|Winter|
|2017-01-09|                16.5|         41.0|Winter|
|2017-01-10|                15.0|         54.0|Winter|
|2017-01-11|                13.5|         63.0|Winter|
|2017-01-12|                16.0|         50.5|Winter|
|2017-01-13|                18.5|         51.5|Winter|
|2017-01-14|                19.5|         55.0|Winter|
|2017-01-15|                14.5|         68.0|Winter|
|2017-01-16|                18.0|         55.5|Winter|
|2017-01-17|                16.5|         62.0|Winter|
|2017-01-18|                17.0|         65.5|Winter|
|2017-01-19|                18.5|         66.5|Winter|
|2017-01-20|                17.5|         51.0|Winter|
|2017-01-21|                22.0|         46.0|Winter|
|2017-01-22|                17.5|         62.5|Winter|
|2017-01-2

In [None]:
# Save the cleansed DataFrame as CSV so it can be used with the production data.
# Spark writes a directory of part files at this path. mode='overwrite' replaces
# the output of a previous run; the default save mode raises an error when the
# path already exists, which would break re-running the notebook.
df.write.csv('/content/Ghor_Cleansed_final.csv', header=True, mode='overwrite')

# Measuring Loading Time

In [None]:
# Elapsed wall-clock time since start_time was recorded in the first cell
end_time = time.time()
loading_time = end_time - start_time

print(f"Loading Time: {loading_time} seconds")

Loading Time: 367.71215558052063 seconds


# Measuring Computing resources

In [None]:
!pip install psutil

import psutil

cpu_usage = psutil.cpu_percent()
ram_usage = psutil.virtual_memory().percent

print("CPU Usage:", cpu_usage, "%")
print("RAM Usage:", ram_usage, "%")

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
CPU Usage: 25.8 %
RAM Usage: 16.3 %


# Measuring Storage resources

In [None]:
# Shell escape: report disk usage of each mounted filesystem in human-readable units
!df -h

Filesystem      Size  Used Avail Use% Mounted on
overlay         108G   25G   83G  24% /
tmpfs            64M     0   64M   0% /dev
shm             5.8G     0  5.8G   0% /dev/shm
/dev/root       2.0G 1005M  952M  52% /usr/sbin/docker-init
tmpfs           6.4G  564K  6.4G   1% /var/colab
/dev/sda1        41G   28G   13G  69% /etc/hosts
tmpfs           6.4G     0  6.4G   0% /proc/acpi
tmpfs           6.4G     0  6.4G   0% /proc/scsi
tmpfs           6.4G     0  6.4G   0% /sys/firmware
