In [0]:
from pyspark.sql import SparkSession
from pyspark.ml.feature import VectorAssembler
from pyspark.sql.functions import lit
from pyspark.ml.regression import LinearRegression
from pyspark.sql import functions as F
from pyspark.sql import SparkSession
from pyspark.sql.functions import lit
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import RandomForestRegressor

In [0]:
spark = SparkSession.builder.appName("HiveQuery").enableHiveSupport().getOrCreate()

query = "SELECT * FROM hive_metastore.default.daily_rainfall_flags_removed"
df = spark.sql(query)
df.show()

+----------+----------+---------------+--------------+-------------+----------+----+--------+-----------+
|station_id|state_code|station_list_no|network_div_no|element_units|      date|hour|rainfall|      state|
+----------+----------+---------------+--------------+-------------+----------+----+--------+-----------+
|  22079200|        22|            792|             0|           HI|1955-12-03|2500|     297|Mississippi|
|  22079200|        22|            792|             0|           HI|1955-12-04|2500|      22|Mississippi|
|  22079200|        22|            792|             0|           HI|1955-12-05|2500|       9|Mississippi|
|  22079200|        22|            792|             0|           HI|1955-12-06|2500|      26|Mississippi|
|  22079200|        22|            792|             0|           HI|1955-12-18|2500|      83|Mississippi|
|  22079200|        22|            792|             0|           HI|1956-01-01|2500|       0|Mississippi|
|  22079200|        22|            792|       

In [0]:
df = df.withColumn("month", F.month("date")).withColumn("year", F.year("date"))


In [0]:
# List of columns to be removed
columns_to_remove = ["station_id", "state_code", "station_list_no", "network_div_no", "element_units","hour"]

In [0]:
# Remove the specified columns
df= df.drop(*columns_to_remove)

# Display the cleaned DataFrame
df.show()

+----------+--------+-----------+-----+----+
|      date|rainfall|      state|month|year|
+----------+--------+-----------+-----+----+
|1955-12-03|     297|Mississippi|   12|1955|
|1955-12-04|      22|Mississippi|   12|1955|
|1955-12-05|       9|Mississippi|   12|1955|
|1955-12-06|      26|Mississippi|   12|1955|
|1955-12-18|      83|Mississippi|   12|1955|
|1956-01-01|       0|Mississippi|    1|1956|
|1956-01-03|      11|Mississippi|    1|1956|
|1956-01-18|      40|Mississippi|    1|1956|
|1956-01-22|     260|Mississippi|    1|1956|
|1956-01-23|      44|Mississippi|    1|1956|
|1956-01-30|       5|Mississippi|    1|1956|
|1956-02-01|       5|Mississippi|    2|1956|
|1956-02-02|      11|Mississippi|    2|1956|
|1956-02-03|       5|Mississippi|    2|1956|
|1956-02-04|      10|Mississippi|    2|1956|
|1956-02-05|      37|Mississippi|    2|1956|
|1956-02-06|       8|Mississippi|    2|1956|
|1956-02-08|      84|Mississippi|    2|1956|
|1956-02-09|      17|Mississippi|    2|1956|
|1956-02-1

In [0]:
df.printSchema()

root
 |-- date: date (nullable = true)
 |-- rainfall: long (nullable = true)
 |-- state: string (nullable = true)
 |-- month: integer (nullable = true)
 |-- year: integer (nullable = true)



In [0]:
# Min and Max values of the 'rainfall' column
min_rainfall = df.agg({"rainfall": "min"}).collect()[0][0]
max_rainfall = df.agg({"rainfall": "max"}).collect()[0][0]

print(f"Minimum rainfall value: {min_rainfall}")
print(f"Maximum rainfall value: {max_rainfall}")

Minimum rainfall value: 0
Maximum rainfall value: 1320


In [0]:
# Test & Training


In [0]:
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml.evaluation import RegressionEvaluator

# Assuming you have already loaded and preprocessed the data into a DataFrame named 'df'

# Prepare the data for model training
vector_assembler = VectorAssembler(inputCols=["rainfall", "year"], outputCol="features")
df_model = vector_assembler.transform(df)

# Split the data into training and test sets (80% for training, 20% for testing)
train_data, test_data = df_model.randomSplit([0.8, 0.2], seed=42)

# Initialize the Random Forest regression model
rf = RandomForestRegressor(featuresCol="features", labelCol="rainfall")

# Train the Random Forest model on the training data
model = rf.fit(train_data)

# Make predictions on the test data
predictions = model.transform(test_data)

# Evaluate the model's performance using Mean Squared Error (MSE)
evaluator = RegressionEvaluator(labelCol="rainfall", predictionCol="prediction", metricName="mse")
mse = evaluator.evaluate(predictions)
print("Mean Squared Error (MSE):", mse)

# Calculate the R-squared (R2) value
r2 = evaluator.evaluate(predictions, {evaluator.metricName: "r2"})
print("R-squared (R2):", r2)


Mean Squared Error (MSE): 336.59683375558586
R-squared (R2): 0.861730301521382


In [0]:
# # Example of fine-tuning the Random Forest model's hyperparameters

# from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

# # Create a ParamGrid for hyperparameter tuning
# param_grid = (ParamGridBuilder()
#               .addGrid(rf.numTrees, [10, 20, 30])
#               .addGrid(rf.maxDepth, [5, 10, 15])
#               .build())

# # Initialize the CrossValidator with the Random Forest model and ParamGrid
# crossval = CrossValidator(estimator=rf,
#                           estimatorParamMaps=param_grid,
#                           evaluator=evaluator,
#                           numFolds=3)  # Number of folds for cross-validation

# # Run cross-validation to find the best hyperparameters
# cv_model = crossval.fit(train_data)

# # Get the best model with the tuned hyperparameters
# best_model = cv_model.bestModel

# # Make predictions on the test data using the best model
# predictions_tuned = best_model.transform(test_data)

# # Evaluate the best model's performance on the test data
# mse_test_tuned = evaluator.evaluate(predictions_tuned)
# r2_test_tuned = evaluator.evaluate(predictions_tuned, {evaluator.metricName: "r2"})

# print("Mean Squared Error (MSE) on Test Data (Tuned Model):", mse_test_tuned)
# print("R-squared (R2) on Test Data (Tuned Model):", r2_test_tuned)


In [0]:
from pyspark.sql import SparkSession
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml.evaluation import RegressionEvaluator

# Assuming you have already loaded and preprocessed the data into a DataFrame named 'df'

# Create a SparkSession (if not already created)
spark = SparkSession.builder.appName("RainfallPrediction").getOrCreate()

# Filter the data for New Jersey and New York states
df_nj = df.filter(df["state"] == "New Jersey")
df_ny = df.filter(df["state"] == "New York")

# Prepare the data for model training for New Jersey
vector_assembler_nj = VectorAssembler(inputCols=["rainfall", "year"], outputCol="features")
df_model_nj = vector_assembler_nj.transform(df_nj)

# Prepare the data for model training for New York
vector_assembler_ny = VectorAssembler(inputCols=["rainfall", "year"], outputCol="features")
df_model_ny = vector_assembler_ny.transform(df_ny)

# Split the data into training and test sets for New Jersey (80% for training, 20% for testing)
train_data_nj, test_data_nj = df_model_nj.randomSplit([0.8, 0.2], seed=42)

# Split the data into training and test sets for New York (80% for training, 20% for testing)
train_data_ny, test_data_ny = df_model_ny.randomSplit([0.8, 0.2], seed=42)

# Initialize the Random Forest regression model
rf = RandomForestRegressor(featuresCol="features", labelCol="rainfall")

# Train the Random Forest model on the training data for New Jersey
model_nj = rf.fit(train_data_nj)

# Train the Random Forest model on the training data for New York
model_ny = rf.fit(train_data_ny)

# Make predictions on the test data for New Jersey
predictions_nj = model_nj.transform(test_data_nj)

# Make predictions on the test data for New York
predictions_ny = model_ny.transform(test_data_ny)

# Evaluate the models' performance using Mean Squared Error (MSE) for New Jersey
mse_nj = evaluator.evaluate(predictions_nj)
print("Mean Squared Error (MSE) for New Jersey:", mse_nj)

# Calculate the R-squared (R2) value for New Jersey
r2_nj = evaluator.evaluate(predictions_nj, {evaluator.metricName: "r2"})
print("R-squared (R2) for New Jersey:", r2_nj)

# Evaluate the models' performance using Mean Squared Error (MSE) for New York
mse_ny = evaluator.evaluate(predictions_ny)
print("Mean Squared Error (MSE) for New York:", mse_ny)

# Calculate the R-squared (R2) value for New York
r2_ny = evaluator.evaluate(predictions_ny, {evaluator.metricName: "r2"})
print("R-squared (R2) for New York:", r2_ny)


Mean Squared Error (MSE) for New Jersey: 22.288775676779444
R-squared (R2) for New Jersey: 0.9504686822515916
Mean Squared Error (MSE) for New York: 167.16229902092638
R-squared (R2) for New York: 0.8927971477113349


In [0]:
df_ny = df.filter(df["state"] == "New York")


vector_assembler_ny = VectorAssembler(inputCols=["year", "month"], outputCol="features")
df_model_ny = vector_assembler_ny.transform(df_ny)


rf = RandomForestRegressor(featuresCol="features", labelCol="rainfall")


model_ny = rf.fit(df_model_ny)

df_2012 = spark.range(1, 13).selectExpr("id as month")
df_2012 = df_2012.withColumn("year", lit(2012))

df_model_2012 = vector_assembler_ny.transform(df_2012)

# Make predictions for the data of New York in the year 2012
predictions_2012 = model_ny.transform(df_model_2012)

# Show the predictions for New York in the year 2012
predictions_2012.select("year", "month", "prediction").show()


+----+-----+------------------+
|year|month|        prediction|
+----+-----+------------------+
|2012|    1|22.364033760841412|
|2012|    2|21.169451768323132|
|2012|    3| 27.86094301646122|
|2012|    4|24.184208763699555|
|2012|    5|25.057021730852135|
|2012|    6| 29.30191628609786|
|2012|    7|29.555952657970558|
|2012|    8| 28.02771304764036|
|2012|    9|27.720986862323553|
|2012|   10|27.266379905098383|
|2012|   11|27.391609524625817|
|2012|   12| 27.45301069255609|
+----+-----+------------------+

