In [1]:
# Mount Google Drive at /content/drive. The Spark archive and the project
# CSV files referenced by the following cells are read from this mount.
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [2]:
#######################################
###!@0 START INIT ENVIRONMENT
!ls /content/drive/MyDrive/spark-3.5.2-bin-hadoop3.tgz
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!tar xf /content/drive/MyDrive/spark-3.5.2-bin-hadoop3.tgz
!pip install -q findspark
!pip install -q pyspark
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.5.2-bin-hadoop3"
###!@0 END INIT ENVIRONMENT

/content/drive/MyDrive/spark-3.5.2-bin-hadoop3.tgz


In [3]:
# Stage the weather CSVs under /content/data as symlinks to the files on Drive.
!mkdir -p /content/data
# Drop links left over from a previous run so they can be recreated below.
!rm -rf /content/data/*.csv
# NOTE(review): this glob targets a `weather_data/` directory, while the later
# cells read `DataSet/weather_data.csv` straight from Drive and never touch
# /content/data - confirm this staging step is still needed.
!ln -s /content/drive/MyDrive/DES_Project/DataSet/weather_data/*.csv /content/data/

In [4]:
# Input file locations on the mounted Google Drive.

# Weather observations (loaded later as `weather_data`).
csv_path1 = "/content/drive/MyDrive/DES_Project/DataSet/weather_data.csv"
# Electricity consumption readings (loaded later as `electricity_data`).
csv_path2 = "/content/drive/MyDrive/DES_Project/DataSet/electricity_consumption_delhi.csv"

In [5]:
#######################################
###!@1 START OF PYSPARK INIT
import findspark
# findspark.init() must run before the pyspark import below: it makes the
# Spark distribution configured through SPARK_HOME importable.
findspark.init()
findspark.find()
from pyspark.sql import SparkSession
input_type = 'sample'
# Create (or reuse) the notebook's Spark session: local master, app name
# "Colab", Spark UI on port 4050.
spark = (
    SparkSession.builder
    .master("local")
    .appName("Colab")
    .config('spark.ui.port', '4050')
    .getOrCreate()
)
# Spark is ready to go within Colab!
###!@1 END OF PYSPARK INIT

In [6]:
# Read both source CSVs into Spark DataFrames. The first line of each file is
# used as the header and column types are inferred from the data.

weather_data = spark.read.csv(
    path=csv_path1,
    inferSchema=True,
    header=True,
)
electricity_data = spark.read.csv(
    path=csv_path2,
    inferSchema=True,
    header=True,
)

In [7]:
#Import required Libraries

import pyspark.sql.functions as F
from pyspark.sql.functions import col, count, when, isnan
from pyspark.sql.functions import to_date
from pyspark.ml.feature import VectorAssembler
from pyspark.sql.functions import avg, date_trunc
from pyspark.sql.window import Window
from pyspark.ml.regression import GBTRegressor
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.sql.types import DateType
from datetime import timedelta, datetime

In [8]:
# Display the first 5 rows of each dataset as a quick sanity check of the
# parsed columns and values.

electricity_data.show(5)
weather_data.show(5)

+-----+-------------------+----------------+
| City|               Date|Consumption (MW)|
+-----+-------------------+----------------+
|Delhi|2000-01-01 00:00:00|            2508|
|Delhi|2000-01-01 00:01:00|            2537|
|Delhi|2000-01-01 00:02:00|            2548|
|Delhi|2000-01-01 00:03:00|            2509|
|Delhi|2000-01-01 00:04:00|            2443|
+-----+-------------------+----------------+
only showing top 5 rows

+-----+-------------------+---------------+--------------+------------+--------------+-------------------+----------------+--------------+------------+-------------------+-------------------+
| City|               Date|Temperature (C)|Feels Like (C)|Humidity (%)|Pressure (hPa)|Weather Description|Wind Speed (m/s)|Cloudiness (%)|Rain (1h mm)|            Sunrise|             Sunset|
+-----+-------------------+---------------+--------------+------------+--------------+-------------------+----------------+--------------+------------+-------------------+---------------

In [9]:
# Print the schema of each dataset. Both were read with inferSchema=True, so
# this shows the column types Spark inferred from the CSV contents.

electricity_data.printSchema()
weather_data.printSchema()

root
 |-- City: string (nullable = true)
 |-- Date: timestamp (nullable = true)
 |-- Consumption (MW): integer (nullable = true)

root
 |-- City: string (nullable = true)
 |-- Date: timestamp (nullable = true)
 |-- Temperature (C): double (nullable = true)
 |-- Feels Like (C): double (nullable = true)
 |-- Humidity (%): integer (nullable = true)
 |-- Pressure (hPa): integer (nullable = true)
 |-- Weather Description: string (nullable = true)
 |-- Wind Speed (m/s): double (nullable = true)
 |-- Cloudiness (%): integer (nullable = true)
 |-- Rain (1h mm): double (nullable = true)
 |-- Sunrise: timestamp (nullable = true)
 |-- Sunset: timestamp (nullable = true)



In [10]:
# Check data statistics: describe() reports count, mean, stddev, min and max
# per column (see the summary rows in the output below).

electricity_data.describe().show()
weather_data.describe().show()

+-------+--------+------------------+
|summary|    City|  Consumption (MW)|
+-------+--------+------------------+
|  count|13099680|          13099680|
|   mean|    NULL| 5488.466762623209|
| stddev|    NULL|1797.6923665615125|
|    min|   Delhi|              2400|
|    max|   Delhi|              8600|
+-------+--------+------------------+

+-------+--------+------------------+------------------+------------------+-----------------+-------------------+------------------+------------------+------------------+
|summary|    City|   Temperature (C)|    Feels Like (C)|      Humidity (%)|   Pressure (hPa)|Weather Description|  Wind Speed (m/s)|    Cloudiness (%)|      Rain (1h mm)|
+-------+--------+------------------+------------------+------------------+-----------------+-------------------+------------------+------------------+------------------+
|  count|13066560|          13066560|          13066560|          13066560|         13066560|           13066560|          13066560|          13

In [11]:
# Check for missing values

def missing_values(df):
    """Return a one-row DataFrame with the fraction of missing values per column.

    For every column ``c`` of ``df`` the result contains a column named
    ``"<c>_missing"`` holding (rows where ``c`` is missing) / (total rows).
    NULL always counts as missing; for ``double``/``float`` columns NaN
    counts as missing as well.

    Args:
        df: The Spark DataFrame to inspect.

    Returns:
        A single-row Spark DataFrame with one ratio column per input column.
    """
    # Build the name -> dtype mapping once instead of once per column.
    dtype_of = dict(df.dtypes)
    # Count the rows inside the same aggregation rather than with a separate
    # df.count() action, so the data is scanned once instead of twice.
    total_rows = count(F.lit(1))

    ratios = []
    for c in df.columns:
        is_missing = col(c).isNull()
        if dtype_of[c] in ("double", "float"):
            # NaN can only occur in floating-point columns.
            is_missing = isnan(c) | is_missing
        ratios.append((count(when(is_missing, c)) / total_rows).alias(f"{c}_missing"))
    return df.select(ratios)

# Show the per-column missing-value ratios for each dataset
# (0.0 means the column has no missing entries).
missing_values(electricity_data).show()

missing_values(weather_data).show()

+------------+------------+------------------------+
|City_missing|Date_missing|Consumption (MW)_missing|
+------------+------------+------------------------+
|         0.0|         0.0|                     0.0|
+------------+------------+------------------------+

+------------+------------+-----------------------+----------------------+--------------------+----------------------+---------------------------+------------------------+----------------------+--------------------+---------------+--------------+
|City_missing|Date_missing|Temperature (C)_missing|Feels Like (C)_missing|Humidity (%)_missing|Pressure (hPa)_missing|Weather Description_missing|Wind Speed (m/s)_missing|Cloudiness (%)_missing|Rain (1h mm)_missing|Sunrise_missing|Sunset_missing|
+------------+------------+-----------------------+----------------------+--------------------+----------------------+---------------------------+------------------------+----------------------+--------------------+---------------+---------

In [12]:
# Reduce the `Date` timestamp to a calendar date in both datasets, so the
# readings can be grouped per day in the following cells.

# Weather: convert, then show the result to verify.
weather_data = weather_data.withColumn("Date", F.to_date("Date"))
weather_data.show()

# Electricity: same conversion, then show the result to verify.
electricity_data = electricity_data.withColumn("Date", F.to_date("Date"))
electricity_data.show()

+-----+----------+---------------+--------------+------------+--------------+-------------------+----------------+--------------+------------+-------------------+-------------------+
| City|      Date|Temperature (C)|Feels Like (C)|Humidity (%)|Pressure (hPa)|Weather Description|Wind Speed (m/s)|Cloudiness (%)|Rain (1h mm)|            Sunrise|             Sunset|
+-----+----------+---------------+--------------+------------+--------------+-------------------+----------------+--------------+------------+-------------------+-------------------+
|Delhi|2000-01-01|          16.47|         15.25|          33|           997|               cold|            2.58|            62|        2.53|2000-01-01 06:30:00|2000-01-01 18:30:00|
|Delhi|2000-01-01|           6.83|          8.02|          58|           997|               cold|            3.95|             3|        1.89|2000-01-01 06:30:00|2000-01-01 18:30:00|
|Delhi|2000-01-01|            4.4|          3.47|          64|          1025|        

In [13]:
# Build one weather row per (Date, City): the daily mean of every numeric
# measurement plus the most frequent Weather Description of that day.

# Removing 'Sunrise' and 'Sunset' columns
weather_data = weather_data.drop("Sunrise", "Sunset")

# Columns that identify one daily record. Every step below uses the same
# key, so descriptions of different cities are never mixed together or
# duplicated by the join.
weather_keys = ["Date", "City"]

# Aggregating numerical columns
numerical_agg = weather_data.groupBy(*weather_keys).agg(
    *[
        F.avg(F.col(name)).alias(name)
        for name in weather_data.columns
        if name not in weather_keys + ["Weather Description"]
    ]
)

# Finding the most frequent category for Weather Description
weather_description_count = weather_data.groupBy(
    *weather_keys, "Weather Description"
).count()

# Rank the descriptions of each (Date, City) by frequency. The secondary sort
# on the description itself makes the winner deterministic when counts tie.
window_spec = Window.partitionBy(*weather_keys).orderBy(
    F.desc("count"), F.asc("Weather Description")
)
most_frequent_description = (
    weather_description_count
    .withColumn("rank", F.row_number().over(window_spec))
    .filter(F.col("rank") == 1)
    .select(*weather_keys, "Weather Description")
)

# Joining numerical aggregates with the most frequent Weather Description
final_weather_data = numerical_agg.join(
    most_frequent_description, on=weather_keys, how="inner"
)

# Show the resulting DataFrame
final_weather_data.show()

+----------+-----+------------------+------------------+------------------+------------------+------------------+------------------+------------------+-------------------+
|      Date| City|   Temperature (C)|    Feels Like (C)|      Humidity (%)|    Pressure (hPa)|  Wind Speed (m/s)|    Cloudiness (%)|      Rain (1h mm)|Weather Description|
+----------+-----+------------------+------------------+------------------+------------------+------------------+------------------+------------------+-------------------+
|2000-01-01|Delhi|13.414423611111115|13.423902777777807| 50.34166666666667|          998.3875|2.5098333333333325|           39.2875|2.4560208333333313|               cold|
|2000-01-02|Delhi|13.526083333333345|13.521673611111115| 50.58958333333333|1000.7111111111111| 2.576916666666667| 41.62222222222222| 2.536604166666667|              windy|
|2000-01-03|Delhi|13.324187499999963|13.342222222222222| 50.09236111111111| 999.5020833333333|2.5229791666666666| 41.74791666666667|2.526923

In [14]:
# Collapse the electricity readings to one row per (Date, City) by averaging
# every column that is not part of that key.
final_electricity_data = electricity_data.groupBy("Date", "City").agg(
    *(
        F.avg(F.col(column_name)).alias(column_name)
        for column_name in electricity_data.columns
        if column_name not in {"Date", "City"}
    )
)

# Show the resulting DataFrame
final_electricity_data.show()

+----------+-----+------------------+
|      Date| City|  Consumption (MW)|
+----------+-----+------------------+
|2000-01-19|Delhi|2498.4215277777776|
|2000-04-24|Delhi| 2501.273611111111|
|2000-07-23|Delhi|2502.6097222222224|
|2001-12-20|Delhi|2751.0645833333333|
|2002-01-11|Delhi|2996.2527777777777|
|2004-03-20|Delhi|3500.3458333333333|
|2004-04-21|Delhi|        3498.66875|
|2005-07-21|Delhi|3751.0715277777776|
|2005-07-25|Delhi|3748.7444444444445|
|2006-05-16|Delhi| 3998.720138888889|
|2006-12-28|Delhi|4001.2791666666667|
|2007-08-15|Delhi| 4250.251388888889|
|2007-08-29|Delhi| 4252.347916666667|
|2000-06-29|Delhi|2500.7805555555556|
|2001-08-11|Delhi|2749.6319444444443|
|2001-11-20|Delhi| 2751.246527777778|
|2002-05-20|Delhi|3000.2805555555556|
|2002-05-31|Delhi|3000.9861111111113|
|2002-10-26|Delhi|3001.0430555555554|
|2002-10-30|Delhi|3001.8118055555556|
+----------+-----+------------------+
only showing top 20 rows



In [15]:
# Merge the daily weather and daily consumption DataFrames.
# Both inputs hold one row per (Date, City), so the join uses both columns:
# joining on Date alone would pair every city's weather with every city's
# consumption for that date. Joining on a list of column names also keeps a
# single copy of each key column, so no duplicate City column needs dropping.
merged_data = final_weather_data.join(
    final_electricity_data, on=["Date", "City"], how="inner"
)

# Show the resulting merged DataFrame
merged_data.show()

+----------+-----+------------------+------------------+------------------+------------------+------------------+------------------+------------------+-------------------+------------------+
|      Date| City|   Temperature (C)|    Feels Like (C)|      Humidity (%)|    Pressure (hPa)|  Wind Speed (m/s)|    Cloudiness (%)|      Rain (1h mm)|Weather Description|  Consumption (MW)|
+----------+-----+------------------+------------------+------------------+------------------+------------------+------------------+------------------+-------------------+------------------+
|2000-01-01|Delhi|13.414423611111115|13.423902777777807| 50.34166666666667|          998.3875|2.5098333333333325|           39.2875|2.4560208333333313|               cold|2501.7951388888887|
|2000-01-02|Delhi|13.526083333333345|13.521673611111115| 50.58958333333333|1000.7111111111111| 2.576916666666667| 41.62222222222222| 2.536604166666667|              windy|2499.1965277777776|
|2000-01-03|Delhi|13.324187499999963|13.34222

In [16]:
# Extracting date-related features from the Date column.
# NOTE: Year / Month / DayOfWeek are added to merged_data but are not part of
# the feature vector assembled below.
merged_data = (
    merged_data
    .withColumn("Year", F.year(F.col("Date")))
    .withColumn("Month", F.month(F.col("Date")))
    .withColumn("DayOfWeek", F.dayofweek(F.col("Date")))
)

# Prepare Historical Data
# Use only Temperature (C) as the predictor
assembler = VectorAssembler(inputCols=["Temperature (C)"], outputCol="features")
# Cache the small assembled dataset. Spark evaluates lazily, so without the
# cache every action below and in the training / evaluation cells would
# re-read both source CSVs and redo the aggregations and the join.
temp_data = (
    assembler.transform(merged_data)
    .select("features", F.col("Consumption (MW)").alias("label"))
    .cache()
)

# Split into train and test sets (80% / 20%, fixed seed)
train_data, test_data = temp_data.randomSplit([0.8, 0.2], seed=42)

# Display the training and testing data counts
print(f"Training Data Count: {train_data.count()}")
print(f"Testing Data Count: {test_data.count()}")


Training Data Count: 7333
Testing Data Count: 1741


In [17]:
# # Train the model using GBTRegressor (disabled: the last run was interrupted - see the KeyboardInterrupt below)
# gbt = GBTRegressor(featuresCol="features", labelCol="label", maxIter=200, maxDepth=10)
# model = gbt.fit(train_data)

ERROR:root:KeyboardInterrupt while sending command.
Traceback (most recent call last):
  File "/content/spark-3.5.2-bin-hadoop3/python/lib/py4j-0.10.9.7-src.zip/py4j/java_gateway.py", line 1038, in send_command
    response = connection.send_command(command)
  File "/content/spark-3.5.2-bin-hadoop3/python/lib/py4j-0.10.9.7-src.zip/py4j/clientserver.py", line 511, in send_command
    answer = smart_decode(self.stream.readline()[:-1])
  File "/usr/lib/python3.10/socket.py", line 705, in readinto
    return self._sock.recv_into(b)
KeyboardInterrupt


KeyboardInterrupt: 

In [18]:
from pyspark.ml.regression import RandomForestRegressor

# Fit a random-forest regressor on the training split: 100 trees, depth
# limited to 10, fixed seed.
rf = RandomForestRegressor(
    seed=42,
    maxDepth=10,
    numTrees=100,
    labelCol="label",
    featuresCol="features",
)
model = rf.fit(train_data)

In [19]:
# Validate model performance: score the held-out test split and show the
# features, the true label and the model's prediction side by side.
# NOTE: `predictions` is rebound to the future forecast in a later cell.
predictions = model.transform(test_data)
predictions.show()

+--------------------+------------------+-----------------+
|            features|             label|       prediction|
+--------------------+------------------+-----------------+
|[12.935465277777762]|          7997.875|5601.378920092127|
|[12.977972222222222]| 7999.406944444445|5601.378920092127|
|[13.003284722222201]| 2497.961111111111|5601.378920092127|
| [13.03618055555555]| 5002.302777777778|5601.378920092127|
|[13.068527777777803]| 2500.334027777778|5601.378920092127|
|[13.078625000000013]| 3497.509722222222|5601.378920092127|
|[13.097798611111127]| 4751.114583333333|5601.378920092127|
|[13.114430555555593]|3250.6993055555554|5601.378920092127|
|[13.138423611111104]| 7749.614583333333|5601.378920092127|
|[13.139916666666672]| 7500.459722222222|5601.378920092127|
|[13.140618055555533]|3248.7208333333333|5601.378920092127|
|[13.141555555555533]| 4750.328472222222|5601.378920092127|
|[13.143111111111107]| 4250.352083333333|5601.378920092127|
| [13.14550000000001]| 5251.978472222222

In [20]:
# Evaluate the model on the test data: root mean squared error between the
# "label" and "prediction" columns.

evaluator = RegressionEvaluator(
    metricName="rmse",
    predictionCol="prediction",
    labelCol="label",
)
rmse = evaluator.evaluate(predictions)
print(f"Root Mean Squared Error (RMSE) on test data: {rmse}")

Root Mean Squared Error (RMSE) on test data: 1762.6810338249086


In [21]:
# Load the future data to forecast for. The following cells use its `Date`
# and `Temp` columns.
future_csv_path = "/content/drive/MyDrive/DES_Project/DataSet/Predict_Date.csv"
future_data = spark.read.csv(
    path=future_csv_path,
    inferSchema=True,
    header=True,
)

In [22]:
# Prepare the features for the future data loaded from CSV. This rebinds
# `assembler`, which previously assembled "Temperature (C)".
assembler = VectorAssembler(
    outputCol="features",
    inputCols=["Temp"],  # Replace "Temp" with the exact column name
)
future_assembled = (
    assembler
    .transform(future_data)
    .select("Date", "Temp", "features")
)

In [23]:
# Forecast Consumption: apply the trained model to the future feature vectors.
# NOTE: this rebinds `predictions`, which until now held the test-set
# predictions. The new frame has no "label" column, so re-running the
# evaluation cell after this one would no longer score the test split.
predictions = model.transform(future_assembled)

In [24]:
# Keep the date, the input temperature and the model output, exposing the
# prediction under a descriptive column name.
predicted_data = predictions.select(
    F.col("Date"),
    F.col("Temp"),
    F.col("prediction").alias("Forecasted Avg Consumption (MW)"),
)

In [25]:
# Show a preview of the forecast (it is written to disk in the next cell).
predicted_data.show()

+----------+----------+-------------------------------+
|      Date|      Temp|Forecasted Avg Consumption (MW)|
+----------+----------+-------------------------------+
| 11/4/2024| 14.545566|              5236.020391956263|
| 11/5/2024| 15.142779|              5236.020391956263|
| 11/6/2024| 15.459622|              5236.020391956263|
| 11/7/2024| 15.641195|              5236.020391956263|
| 11/8/2024|  15.75013|              5236.020391956263|
| 11/9/2024| 15.817327|              5236.020391956263|
|11/10/2024| 15.859499|              5236.020391956263|
|11/11/2024| 15.886254|              5236.020391956263|
|11/12/2024| 15.903345|              5236.020391956263|
|11/13/2024| 15.914311|              5236.020391956263|
|11/14/2024|  15.92137|              5236.020391956263|
|11/15/2024|  15.92592|              5236.020391956263|
|11/16/2024| 15.928855|              5236.020391956263|
|11/17/2024| 15.930751|              5236.020391956263|
|11/18/2024| 15.931973|              5236.020391

In [26]:
# Save the predictions as CSV with a header row, overwriting any existing
# output at the same path.
# NOTE: Spark's DataFrameWriter.csv creates a directory at `output_path`
# containing part-*.csv files rather than a single CSV file.
output_path = "/content/drive/MyDrive/DES_Project/DataSet/predicted_consumption.csv"  # Replace with desired output path
predicted_data.write.csv(output_path, header=True, mode="overwrite")

In [None]:
# # Generate the Next 30 Days
# last_date = merged_data.agg(F.max("Date").alias("last_date")).collect()[0]["last_date"]
# start_date = last_date
# future_dates_list = [(start_date + timedelta(days=i)) for i in range(1, 31)]
# future_dates_df = spark.createDataFrame(future_dates_list, DateType()).toDF("Date")

In [None]:
# # Add date-derived features
# future_dates_df = future_dates_df.withColumn("Year", F.year(F.col("Date")))
# future_dates_df = future_dates_df.withColumn("Month", F.month(F.col("Date")))
# future_dates_df = future_dates_df.withColumn("DayOfWeek", F.dayofweek(F.col("Date")))

In [None]:
# # Add placeholder values for weather features (adjust values as needed)
# future_dates_df = future_dates_df.withColumn("Temperature (C)", F.lit(25.0))  # Example value
# future_dates_df = future_dates_df.withColumn("Humidity (%)", F.lit(60.0))
# future_dates_df = future_dates_df.withColumn("Wind Speed (m/s)", F.lit(3.0))
# future_dates_df = future_dates_df.withColumn("Cloudiness (%)", F.lit(50.0))
# future_dates_df = future_dates_df.withColumn("Rain (1h mm)", F.lit(0.0))

In [None]:
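# # Prepare Future Data for Prediction
# # NOTE: `vector_assembler` is not defined in the cells above (the assemblers are bound to `assembler`); define it before re-enabling this cell.
# future_features = vector_assembler.transform(future_dates_df).select("Date", "features")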

In [None]:
# # Forecast on Future Dates
# future_predictions = model.transform(future_features)
# forecast = future_predictions.select("Date", "prediction").withColumnRenamed("prediction", "Forecasted Avg Consumption (MW)")

In [None]:
# # Show the forecast for the next 30 days
# forecast.show()