In [179]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q http://archive.apache.org/dist/spark/spark-3.1.1/spark-3.1.1-bin-hadoop3.2.tgz
!tar xf spark-3.1.1-bin-hadoop3.2.tgz
!pip install -q findspark

In [180]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.1.1-bin-hadoop3.2"

!ls

drive			       spark-3.1.1-bin-hadoop3.2.tgz.1	spark-3.1.1-bin-hadoop3.2.tgz.5
sample_data		       spark-3.1.1-bin-hadoop3.2.tgz.2	weatherAUS.csv
spark-3.1.1-bin-hadoop3.2      spark-3.1.1-bin-hadoop3.2.tgz.3
spark-3.1.1-bin-hadoop3.2.tgz  spark-3.1.1-bin-hadoop3.2.tgz.4


In [181]:
!pip install -q pyspark
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [182]:
import findspark
findspark.init()
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[*]").getOrCreate()
spark.conf.set("spark.sql.repl.eagerEval.enabled", True) # Property used to format output tables better
spark

In [183]:
# Import necessary libraries
from pyspark.sql import SparkSession
from pyspark.ml.feature import OneHotEncoder, StringIndexer
from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.sql.functions import year, month, dayofmonth
from pyspark.ml.feature import Imputer
from pyspark.ml.feature import StandardScaler
from pyspark.sql.functions import col
from pyspark.sql.functions import isnan, when, count, col

In [184]:
# Create a Spark session
spark = SparkSession.builder.appName("RainPrediction").getOrCreate()



In [185]:
# Load the dataset
df_weather = spark.read.csv("/content/drive/MyDrive/weatherAUS.csv", header=True, inferSchema=True)

In [186]:
df_weather


Date,Location,MinTemp,MaxTemp,Rainfall,Evaporation,Sunshine,WindGustDir,WindGustSpeed,WindDir9am,WindDir3pm,WindSpeed9am,WindSpeed3pm,Humidity9am,Humidity3pm,Pressure9am,Pressure3pm,Cloud9am,Cloud3pm,Temp9am,Temp3pm,RainToday,RISK_MM,RainTomorrow
2008-12-01,Albury,13.4,22.9,0.6,,,W,44,W,WNW,20.0,24,71,22,1007.7,1007.1,8.0,,16.9,21.8,No,0.0,No
2008-12-02,Albury,7.4,25.1,0.0,,,WNW,44,NNW,WSW,4.0,22,44,25,1010.6,1007.8,,,17.2,24.3,No,0.0,No
2008-12-03,Albury,12.9,25.7,0.0,,,WSW,46,W,WSW,19.0,26,38,30,1007.6,1008.7,,2.0,21.0,23.2,No,0.0,No
2008-12-04,Albury,9.2,28.0,0.0,,,NE,24,SE,E,11.0,9,45,16,1017.6,1012.8,,,18.1,26.5,No,1.0,No
2008-12-05,Albury,17.5,32.3,1.0,,,W,41,ENE,NW,7.0,20,82,33,1010.8,1006.0,7.0,8.0,17.8,29.7,No,0.2,No
2008-12-06,Albury,14.6,29.7,0.2,,,WNW,56,W,W,19.0,24,55,23,1009.2,1005.4,,,20.6,28.9,No,0.0,No
2008-12-07,Albury,14.3,25.0,0.0,,,W,50,SW,W,20.0,24,49,19,1009.6,1008.2,1.0,,18.1,24.6,No,0.0,No
2008-12-08,Albury,7.7,26.7,0.0,,,W,35,SSE,W,6.0,17,48,19,1013.4,1010.1,,,16.3,25.5,No,0.0,No
2008-12-09,Albury,9.7,31.9,0.0,,,NNW,80,SE,NW,7.0,28,42,9,1008.9,1003.6,,,18.3,30.2,No,1.4,Yes
2008-12-10,Albury,13.1,30.1,1.4,,,W,28,S,SSE,15.0,11,58,27,1007.0,1005.7,,,20.1,28.2,Yes,0.0,No


In [187]:
# Display the schema of the DataFrame
df_weather.printSchema()


root
 |-- Date: string (nullable = true)
 |-- Location: string (nullable = true)
 |-- MinTemp: string (nullable = true)
 |-- MaxTemp: string (nullable = true)
 |-- Rainfall: string (nullable = true)
 |-- Evaporation: string (nullable = true)
 |-- Sunshine: string (nullable = true)
 |-- WindGustDir: string (nullable = true)
 |-- WindGustSpeed: string (nullable = true)
 |-- WindDir9am: string (nullable = true)
 |-- WindDir3pm: string (nullable = true)
 |-- WindSpeed9am: string (nullable = true)
 |-- WindSpeed3pm: string (nullable = true)
 |-- Humidity9am: string (nullable = true)
 |-- Humidity3pm: string (nullable = true)
 |-- Pressure9am: string (nullable = true)
 |-- Pressure3pm: string (nullable = true)
 |-- Cloud9am: string (nullable = true)
 |-- Cloud3pm: string (nullable = true)
 |-- Temp9am: string (nullable = true)
 |-- Temp3pm: string (nullable = true)
 |-- RainToday: string (nullable = true)
 |-- RISK_MM: double (nullable = true)
 |-- RainTomorrow: string (nullable = true)



In [188]:
# 1.Explore the dataset to find what are the categorical variables in the dataset. List categorical variable names

# Identify list of columns based on data types
list_of_columns = [col_name for col_name, data_type in df_weather.dtypes if data_type == 'string']

# Display the list of categorical variable names
print("list of columns:", list_of_columns )

list of columns: ['Date', 'Location', 'MinTemp', 'MaxTemp', 'Rainfall', 'Evaporation', 'Sunshine', 'WindGustDir', 'WindGustSpeed', 'WindDir9am', 'WindDir3pm', 'WindSpeed9am', 'WindSpeed3pm', 'Humidity9am', 'Humidity3pm', 'Pressure9am', 'Pressure3pm', 'Cloud9am', 'Cloud3pm', 'Temp9am', 'Temp3pm', 'RainToday', 'RainTomorrow']


In [189]:
from pyspark.sql.types import FloatType
# Columns to convert to numeric types
continuous_variables = ['MinTemp', 'MaxTemp', 'Rainfall', 'Evaporation', 'Sunshine', 'WindGustSpeed', 'WindSpeed9am', 'WindSpeed3pm',
                   'Humidity9am', 'Humidity3pm', 'Pressure9am', 'Pressure3pm', 'Cloud9am', 'Cloud3pm', 'Temp9am', 'Temp3pm', 'RISK_MM']

# Convert numeric columns to FloatType
for col in continuous_variables:
    df_weather = df_weather.withColumn(col, df_weather[col].cast(FloatType()))

# Get the names of all columns in the DataFrame
all_columns = df_weather.columns

# Get the names of string columns (columns not in numeric_columns)
categorical_variables= [col for col in all_columns if col not in continuous_variables]

# Display the updated schema
df_weather.printSchema()

# Display the list of numeric and string columns
print("continuous_variables:", continuous_variables)
print(" categorical_variables:", categorical_variables)



root
 |-- Date: string (nullable = true)
 |-- Location: string (nullable = true)
 |-- MinTemp: float (nullable = true)
 |-- MaxTemp: float (nullable = true)
 |-- Rainfall: float (nullable = true)
 |-- Evaporation: float (nullable = true)
 |-- Sunshine: float (nullable = true)
 |-- WindGustDir: string (nullable = true)
 |-- WindGustSpeed: float (nullable = true)
 |-- WindDir9am: string (nullable = true)
 |-- WindDir3pm: string (nullable = true)
 |-- WindSpeed9am: float (nullable = true)
 |-- WindSpeed3pm: float (nullable = true)
 |-- Humidity9am: float (nullable = true)
 |-- Humidity3pm: float (nullable = true)
 |-- Pressure9am: float (nullable = true)
 |-- Pressure3pm: float (nullable = true)
 |-- Cloud9am: float (nullable = true)
 |-- Cloud3pm: float (nullable = true)
 |-- Temp9am: float (nullable = true)
 |-- Temp3pm: float (nullable = true)
 |-- RainToday: string (nullable = true)
 |-- RISK_MM: float (nullable = true)
 |-- RainTomorrow: string (nullable = true)

continuous_variables

In [190]:
# 2. Explore how many categorical variables have null values.

from pyspark.sql.functions import count, when, col
df_weather_na = df_weather.replace('NA', None)
null_counts = df_weather_na.select([count(when(col(c).isNull(), c)).alias(c) for c in categorical_variables])
columns_with_na = [col_name for col_name in categorical_variables if null_counts.select(col_name).head()[col_name] > 0]
num_categorical_na = len(columns_with_na)
print("Number of Categorical Variables with 'NA' Values:", num_categorical_na)

Number of Categorical Variables with 'NA' Values: 4


In [191]:
# 3. Get the frequency count of each categorical variable. For instance, how many discrete values in each categorical variable and how many datapoints from each distinct value

from pyspark.sql.functions import col, countDistinct
 # Loop through each categorical variable to get frequency counts
for col_name in categorical_variables:
    print(f"Frequency count for '{col_name}':")
    df_weather.groupBy(col_name).count().show()

    print(f"Number of discrete values for '{col_name}':")
    distinct_count =df_weather.select(countDistinct(col_name)).collect()[0][0]
    print(distinct_count)

Frequency count for 'Date':
+----------+-----+
|      Date|count|
+----------+-----+
|2008-12-03|   23|
|2009-01-04|   45|
|2009-06-23|   46|
|2009-12-04|   46|
|2010-02-12|   44|
|2010-09-24|   46|
|2011-01-29|   44|
|2013-03-14|   49|
|2014-02-16|   48|
|2014-02-22|   48|
|2014-05-27|   49|
|2014-12-13|   47|
|2015-05-01|   46|
|2016-08-17|   49|
|2017-05-14|   49|
|2008-04-20|    2|
|2008-10-26|    8|
|2008-11-19|    8|
|2010-03-06|   45|
|2012-03-04|   46|
+----------+-----+
only showing top 20 rows

Number of discrete values for 'Date':
3436
Frequency count for 'Location':
+-------------+-----+
|     Location|count|
+-------------+-----+
|       Cairns| 2988|
|NorfolkIsland| 2964|
|      Bendigo| 3034|
|      Walpole| 2819|
|     Canberra| 3418|
|      Woomera| 2990|
|     Adelaide| 3090|
|        Cobar| 2988|
|SydneyAirport| 3005|
| PerthAirport| 3009|
|   Wollongong| 2983|
|  Williamtown| 2553|
|        Moree| 2854|
|      Mildura| 3007|
|     Portland| 2996|
|       Albany| 301

In [194]:
# 4. Print the first five rows of the dataset.
# Load the dataset
df_weather = spark.read.csv("/content/drive/MyDrive/weatherAUS.csv", header=True, inferSchema=True)

# Display the first five rows of the DataFrame
df_weather.show(5)

+----------+--------+-------+-------+--------+-----------+--------+-----------+-------------+----------+----------+------------+------------+-----------+-----------+-----------+-----------+--------+--------+-------+-------+---------+-------+------------+
|      Date|Location|MinTemp|MaxTemp|Rainfall|Evaporation|Sunshine|WindGustDir|WindGustSpeed|WindDir9am|WindDir3pm|WindSpeed9am|WindSpeed3pm|Humidity9am|Humidity3pm|Pressure9am|Pressure3pm|Cloud9am|Cloud3pm|Temp9am|Temp3pm|RainToday|RISK_MM|RainTomorrow|
+----------+--------+-------+-------+--------+-----------+--------+-----------+-------------+----------+----------+------------+------------+-----------+-----------+-----------+-----------+--------+--------+-------+-------+---------+-------+------------+
|2008-12-01|  Albury|   13.4|   22.9|     0.6|         NA|      NA|          W|           44|         W|       WNW|          20|          24|         71|         22|     1007.7|     1007.1|       8|      NA|   16.9|   21.8|       No|  

In [195]:
# 5. What are the available columns of the dataset?

# Display available columns
print("Available columns:", df_weather.columns)

Available columns: ['Date', 'Location', 'MinTemp', 'MaxTemp', 'Rainfall', 'Evaporation', 'Sunshine', 'WindGustDir', 'WindGustSpeed', 'WindDir9am', 'WindDir3pm', 'WindSpeed9am', 'WindSpeed3pm', 'Humidity9am', 'Humidity3pm', 'Pressure9am', 'Pressure3pm', 'Cloud9am', 'Cloud3pm', 'Temp9am', 'Temp3pm', 'RainToday', 'RISK_MM', 'RainTomorrow']


In [196]:
6# Drop the 'RISK_MM' variable column
df_weather = df_weather.drop("RISK_MM")
df_weather.show()

+----------+--------+-------+-------+--------+-----------+--------+-----------+-------------+----------+----------+------------+------------+-----------+-----------+-----------+-----------+--------+--------+-------+-------+---------+------------+
|      Date|Location|MinTemp|MaxTemp|Rainfall|Evaporation|Sunshine|WindGustDir|WindGustSpeed|WindDir9am|WindDir3pm|WindSpeed9am|WindSpeed3pm|Humidity9am|Humidity3pm|Pressure9am|Pressure3pm|Cloud9am|Cloud3pm|Temp9am|Temp3pm|RainToday|RainTomorrow|
+----------+--------+-------+-------+--------+-----------+--------+-----------+-------------+----------+----------+------------+------------+-----------+-----------+-----------+-----------+--------+--------+-------+-------+---------+------------+
|2008-12-01|  Albury|   13.4|   22.9|     0.6|         NA|      NA|          W|           44|         W|       WNW|          20|          24|         71|         22|     1007.7|     1007.1|       8|      NA|   16.9|   21.8|       No|          No|
|2008-12-02|

In [197]:
7# Display summary of the dataset
df_weather_na.describe().show()

+-------+----------+--------+------------------+------------------+-----------------+-----------------+------------------+-----------+------------------+----------+----------+-----------------+-----------------+------------------+------------------+------------------+------------------+------------------+-----------------+-----------------+------------------+---------+------------------+------------+
|summary|      Date|Location|           MinTemp|           MaxTemp|         Rainfall|      Evaporation|          Sunshine|WindGustDir|     WindGustSpeed|WindDir9am|WindDir3pm|     WindSpeed9am|     WindSpeed3pm|       Humidity9am|       Humidity3pm|       Pressure9am|       Pressure3pm|          Cloud9am|         Cloud3pm|          Temp9am|           Temp3pm|RainToday|           RISK_MM|RainTomorrow|
+-------+----------+--------+------------------+------------------+-----------------+-----------------+------------------+-----------+------------------+----------+----------+-----------------

In [198]:
8# Display first five rows for categorical variables

# Get the names of all columns in the DataFrame
all_columns = df_weather.columns

# Get the names of string columns (columns not in numeric_columns)
categorical_variables= [col for col in all_columns if col not in continuous_variables]
df_weather.select(categorical_variables).show(5)


+----------+--------+-----------+----------+----------+---------+------------+
|      Date|Location|WindGustDir|WindDir9am|WindDir3pm|RainToday|RainTomorrow|
+----------+--------+-----------+----------+----------+---------+------------+
|2008-12-01|  Albury|          W|         W|       WNW|       No|          No|
|2008-12-02|  Albury|        WNW|       NNW|       WSW|       No|          No|
|2008-12-03|  Albury|        WSW|         W|       WSW|       No|          No|
|2008-12-04|  Albury|         NE|        SE|         E|       No|          No|
|2008-12-05|  Albury|          W|       ENE|        NW|       No|          No|
+----------+--------+-----------+----------+----------+---------+------------+
only showing top 5 rows



In [199]:
# 9.Decompose the date field into year, month, and day fields. Then drop the original date
# field.
from pyspark.sql.functions import year, month, dayofmonth
# Decompose date field into year, month, and day fields
df_weather = df_weather.withColumn("Year", year(df_weather["Date"]))
df_weather= df_weather.withColumn("Month", month(df_weather["Date"]))
df_weather= df_weather.withColumn("Day", dayofmonth(df_weather["Date"]))
df_weather_dd= df_weather.drop("Date")
df_weather_dd.show()

+--------+-------+-------+--------+-----------+--------+-----------+-------------+----------+----------+------------+------------+-----------+-----------+-----------+-----------+--------+--------+-------+-------+---------+------------+----+-----+---+
|Location|MinTemp|MaxTemp|Rainfall|Evaporation|Sunshine|WindGustDir|WindGustSpeed|WindDir9am|WindDir3pm|WindSpeed9am|WindSpeed3pm|Humidity9am|Humidity3pm|Pressure9am|Pressure3pm|Cloud9am|Cloud3pm|Temp9am|Temp3pm|RainToday|RainTomorrow|Year|Month|Day|
+--------+-------+-------+--------+-----------+--------+-----------+-------------+----------+----------+------------+------------+-----------+-----------+-----------+-----------+--------+--------+-------+-------+---------+------------+----+-----+---+
|  Albury|   13.4|   22.9|     0.6|         NA|      NA|          W|           44|         W|       WNW|          20|          24|         71|         22|     1007.7|     1007.1|       8|      NA|   16.9|   21.8|       No|          No|2008|   12| 

In [200]:
# 10. How many unique locations are there in the dataset
unique_locations_count = df_weather.select("Location").distinct().count()
print("Number of unique locations:", unique_locations_count)

Number of unique locations: 49


In [201]:
# 11. Print the number of times each unique location appears in the dataset.
location_counts = df_weather.groupBy("Location").count()
location_counts.show()

+-------------+-----+
|     Location|count|
+-------------+-----+
|       Cairns| 2988|
|NorfolkIsland| 2964|
|      Bendigo| 3034|
|      Walpole| 2819|
|     Canberra| 3418|
|      Woomera| 2990|
|     Adelaide| 3090|
|        Cobar| 2988|
|SydneyAirport| 3005|
| PerthAirport| 3009|
|   Wollongong| 2983|
|  Williamtown| 2553|
|        Moree| 2854|
|      Mildura| 3007|
|     Portland| 2996|
|       Albany| 3016|
|   SalmonGums| 2955|
|     Brisbane| 3161|
|       Sydney| 3337|
|        Perth| 3193|
+-------------+-----+
only showing top 20 rows



In [202]:
# 12. Perform One Hot Encoding for each categorical variable

from pyspark.ml.feature import StringIndexer, OneHotEncoder
from pyspark.ml.pipeline import Pipeline

# Filter out 'Date' column if not needed for encoding
string_columns_filtered = [col_name for col_name in categorical_variables if col_name != 'Date']
string_columns_filtered
# Initialize lists to store stages for the pipeline
indexers = []
encoders = []

# Perform One-Hot Encoding for each categorical variable excluding 'Date'
for col_name in string_columns_filtered:
    # StringIndexer for categorical variable
    indexer = StringIndexer(inputCol=col_name, outputCol=col_name + "_index")

    # OneHotEncoder for indexed categorical variable
    encoder = OneHotEncoder(inputCol=col_name + "_index", outputCol=col_name + "_encoded")

    # Add indexer and encoder to respective lists
    indexers.append(indexer)
    encoders.append(encoder)

# Create a pipeline with all indexers and encoders
pipeline = Pipeline(stages=indexers + encoders)

# Fit and transform the pipeline on the DataFrame
df_weather_encoded = pipeline.fit(df_weather_dd).transform(df_weather_dd)#dd

# Display the DataFrame with One-Hot Encoded columns
df_weather_encoded.show(5)

+--------+-------+-------+--------+-----------+--------+-----------+-------------+----------+----------+------------+------------+-----------+-----------+-----------+-----------+--------+--------+-------+-------+---------+------------+----+-----+---+--------------+-----------------+----------------+----------------+---------------+------------------+----------------+-------------------+------------------+------------------+-----------------+--------------------+
|Location|MinTemp|MaxTemp|Rainfall|Evaporation|Sunshine|WindGustDir|WindGustSpeed|WindDir9am|WindDir3pm|WindSpeed9am|WindSpeed3pm|Humidity9am|Humidity3pm|Pressure9am|Pressure3pm|Cloud9am|Cloud3pm|Temp9am|Temp3pm|RainToday|RainTomorrow|Year|Month|Day|Location_index|WindGustDir_index|WindDir9am_index|WindDir3pm_index|RainToday_index|RainTomorrow_index|Location_encoded|WindGustDir_encoded|WindDir9am_encoded|WindDir3pm_encoded|RainToday_encoded|RainTomorrow_encoded|
+--------+-------+-------+--------+-----------+--------+----------

In [203]:
# 13. The RainTomorrow is the label, and all other fields are features

from pyspark.sql.functions import lit

# Create a dummy 'Date' column with a constant value to include in the DataFrame
df_weather_dd= df_weather_dd.withColumn("Date", lit("2024-03-11"))
# Convert string columns to numeric using StringIndexer
indexers = [StringIndexer(inputCol=col, outputCol=col+"_index").fit(df_weather_dd) for col in categorical_variables]
df_indexed = df_weather_dd
for indexer in indexers:
    df_indexed = indexer.transform(df_indexed)

# Define the features (excluding 'RainTomorrow' and any other non-feature columns)
feature_columns = [col + "_index" for col in categorical_variables if col != 'RainTomorrow']

# Assemble the features into a single feature vector
assembler = VectorAssembler(inputCols=feature_columns, outputCol="features")

# Transform the DataFrame to include the feature vector
df_features = assembler.transform(df_indexed)

# Select only the 'features' and 'RainTomorrow' columns for model training
df_model = df_features.select("features", "RainTomorrow")

# Display the DataFrame with features and label
df_model.show(20, truncate=False)

+-----------------------------+------------+
|features                     |RainTomorrow|
+-----------------------------+------------+
|[0.0,14.0,0.0,7.0,7.0,0.0]   |No          |
|[0.0,14.0,10.0,10.0,3.0,0.0] |No          |
|[0.0,14.0,7.0,7.0,3.0,0.0]   |No          |
|[0.0,14.0,14.0,2.0,10.0,0.0] |No          |
|[0.0,14.0,0.0,11.0,8.0,0.0]  |No          |
|[0.0,14.0,10.0,7.0,1.0,0.0]  |No          |
|[0.0,14.0,0.0,8.0,1.0,0.0]   |No          |
|[0.0,14.0,0.0,4.0,1.0,0.0]   |No          |
|[0.0,14.0,15.0,2.0,8.0,0.0]  |Yes         |
|[0.0,14.0,0.0,6.0,5.0,1.0]   |No          |
|[0.0,14.0,4.0,4.0,9.0,0.0]   |Yes         |
|[0.0,14.0,16.0,13.0,14.0,1.0]|Yes         |
|[0.0,14.0,0.0,10.0,13.0,1.0] |Yes         |
|[0.0,14.0,8.0,7.0,12.0,1.0]  |No          |
|[0.0,14.0,10.0,1.0,7.0,2.0]  |No          |
|[0.0,14.0,12.0,14.0,10.0,0.0]|Yes         |
|[0.0,14.0,0.0,0.0,7.0,1.0]   |Yes         |
|[0.0,14.0,5.0,16.0,4.0,1.0]  |No          |
|[0.0,14.0,5.0,2.0,13.0,0.0]  |No          |
|[0.0,14.0

In [204]:
# 14 You may use the median value (median imputation) of each field to fill null values if it is a numerical column. Use the most frequent value in case of a categorical column
from pyspark.sql.functions import col
from pyspark.ml.feature import Imputer

# Define numerical columns and categorical variables based on your DataFrame structure
numerical_columns = ['MinTemp', 'MaxTemp', 'Rainfall', 'Evaporation', 'Sunshine', 'WindGustSpeed', 'WindSpeed9am', 'WindSpeed3pm',
                   'Humidity9am', 'Humidity3pm', 'Pressure9am', 'Pressure3pm', 'Cloud9am', 'Cloud3pm', 'Temp9am', 'Temp3pm']  # Define your numerical columns here
categorical_variables = ['Location', 'WindGustDir', 'WindDir9am', 'WindDir3pm', 'RainToday', 'RainTomorrow']  # Define your categorical variables here

# Convert numerical columns to numerical type
for col_name in numerical_columns:
    df_weather_dd = df_weather_dd.withColumn(col_name, col(col_name).cast('double'))

# Impute missing values with median for numerical columns
imputer = Imputer(strategy='median', inputCols=numerical_columns, outputCols=[col + "_imputed" for col in numerical_columns])
df_imputed = imputer.fit(df_weather_dd).transform(df_weather_dd)

# Define a function to impute missing values with most frequent value for categorical columns
def fill_missing_with_mode(df_weather_dd, column):
    mode_value = df_weather_dd.select(column).groupBy(column).count().orderBy(col("count").desc()).first()[column]
    return df_weather_dd.withColumn(column + "_imputed", when(col(column).isNull(), mode_value).otherwise(col(column)))

# Impute missing values with mode for categorical columns
for col_name in categorical_variables:
    df_imputed = fill_missing_with_mode(df_imputed, col_name)

# Select only the imputed columns and 'RainTomorrow' for model training
imputed_columns = [col + "_imputed" for col in numerical_columns] + [col + "_imputed" for col in categorical_variables]
df_final = df_imputed.select(imputed_columns + ["RainTomorrow"])

# Display the DataFrame with imputed values
df_final.show(20, truncate=False)


+---------------+---------------+----------------+-------------------+----------------+---------------------+--------------------+--------------------+-------------------+-------------------+-------------------+-------------------+----------------+----------------+---------------+---------------+----------------+-------------------+------------------+------------------+-----------------+--------------------+------------+
|MinTemp_imputed|MaxTemp_imputed|Rainfall_imputed|Evaporation_imputed|Sunshine_imputed|WindGustSpeed_imputed|WindSpeed9am_imputed|WindSpeed3pm_imputed|Humidity9am_imputed|Humidity3pm_imputed|Pressure9am_imputed|Pressure3pm_imputed|Cloud9am_imputed|Cloud3pm_imputed|Temp9am_imputed|Temp3pm_imputed|Location_imputed|WindGustDir_imputed|WindDir9am_imputed|WindDir3pm_imputed|RainToday_imputed|RainTomorrow_imputed|RainTomorrow|
+---------------+---------------+----------------+-------------------+----------------+---------------------+--------------------+--------------------

In [205]:
 # 15. Normalize each numerical column (bring it to a value between 0 and 1)
from pyspark.ml.feature import MinMaxScaler
from pyspark.ml.feature import VectorAssembler

# Assemble all imputed numerical columns into a single feature vector
assembler = VectorAssembler(inputCols=[col + "_imputed" for col in numerical_columns], outputCol="numerical_features")
df_assembled = assembler.transform(df_final)

# Initialize MinMaxScaler to normalize each numerical column
scaler = MinMaxScaler(inputCol="numerical_features", outputCol="scaled_numerical_features")

# Compute summary statistics and generate the scaler model
scaler_model = scaler.fit(df_assembled)

# Normalize each numerical column to a value between 0 and 1
df_normalized = scaler_model.transform(df_assembled)

# Display the DataFrame with normalized numerical columns
df_normalized.show(5, truncate=False)

+---------------+---------------+----------------+-------------------+----------------+---------------------+--------------------+--------------------+-------------------+-------------------+-------------------+-------------------+----------------+----------------+---------------+---------------+----------------+-------------------+------------------+------------------+-----------------+--------------------+------------+--------------------------------------------------------------------------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|MinTemp_imputed|MaxTemp_imputed|Rainfall_imputed|Evaporation_imputed|Sunshine_imputed|WindGustSpeed_imputed|WindSpeed9am_imputed|WindSpeed3pm_imputed|Humidity9am_imputed|Humidity3pm_imputed|Pressur

In [206]:
# 16. Train your Logistic Regression model on the training dataset (70% 30% split)
# Split the dataset into training and test sets
from pyspark.ml.evaluation import BinaryClassificationEvaluator

# Split the data into training and test sets (70% - 30%)
train_data, test_data = df_normalized.randomSplit([0.7, 0.3], seed=42)

# Convert the target variable 'RainTomorrow' to a numeric format
indexer = StringIndexer(inputCol='RainTomorrow', outputCol='label')
train_data_indexed = indexer.fit(train_data).transform(train_data)

# Initialize Logistic Regression model
lr = LogisticRegression(featuresCol='scaled_numerical_features', labelCol='label')

# Create a Pipeline with the Logistic Regression model
pipeline = Pipeline(stages=[lr])

# Train the Logistic Regression model on the training dataset
model = pipeline.fit(train_data_indexed)

# Make predictions on the test set
test_data_indexed = indexer.fit(test_data).transform(test_data)
predictions = model.transform(test_data_indexed)

# Evaluate the model using BinaryClassificationEvaluator
evaluator = BinaryClassificationEvaluator(labelCol='label')
auc = evaluator.evaluate(predictions)

# Display the Area Under ROC for the model
print("Area Under ROC: ", auc)

Area Under ROC:  0.8624691198798907


In [207]:
# 17. Predict RainTomorrow for test set
# Make predictions on the test set using the trained model
predictions_test = model.transform(test_data_indexed)

# Display predictions for RainTomorrow in the test set
predictions_test.select("RainTomorrow", "prediction").show(10)

+------------+----------+
|RainTomorrow|prediction|
+------------+----------+
|          No|       0.0|
|          No|       1.0|
|          No|       1.0|
|          No|       0.0|
|         Yes|       1.0|
|          No|       1.0|
|          No|       0.0|
|          No|       0.0|
|          No|       0.0|
|          No|       0.0|
+------------+----------+
only showing top 10 rows



In [209]:
# 18 Describe the performance of your model using the confusion matrix and print TP, TN,FP, and FN. In addition, provide the accuracy and F1 score of your model
from pyspark.mllib.evaluation import MulticlassMetrics

# Convert predictions to RDD
prediction_and_label = predictions_test.select("prediction", "label").rdd

# Instantiate metrics object
metrics = MulticlassMetrics(prediction_and_label)

# Calculate confusion matrix
confusion_matrix = metrics.confusionMatrix().toArray()

# Extract TP, TN, FP, FN from confusion matrix
TP = confusion_matrix[1, 1]
TN = confusion_matrix[0, 0]
FP = confusion_matrix[0, 1]
FN = confusion_matrix[1, 0]

# Calculate accuracy
accuracy = metrics.accuracy

# Calculate F1 score
f1_score = metrics.fMeasure(1.0)

# Display TP, TN, FP, FN, accuracy, and F1 score
print("True Positives (TP):", TP)
print("True Negatives (TN):", TN)
print("False Positives (FP):", FP)
print("False Negatives (FN):", FN)
print("Accuracy:", accuracy)
print("F1 Score:", f1_score)

True Positives (TP): 4585.0
True Negatives (TN): 31291.0
False Positives (FP): 1806.0
False Negatives (FN): 4923.0
Accuracy: 0.8420607909869734
F1 Score: 0.576765834329203


In [177]:
# Stop SparkSession
spark.stop()
