In [1]:
from psutil import virtual_memory
ram_gb = virtual_memory().total / 1e9
print('Your runtime has {:.1f} gigabytes of available RAM\n'.format(ram_gb))

if ram_gb < 20:
  print('Not using a high-RAM runtime')
else:
  print('You are using a high-RAM runtime!')

Your runtime has 54.8 gigabytes of available RAM

You are using a high-RAM runtime!


In [2]:
!cat /proc/meminfo | grep MemTotal

MemTotal:       53470708 kB


In [3]:
import os
# Set Spark version
spark_version = 'spark-3.5.4'
os.environ['SPARK_VERSION'] = spark_version

# Install Spark and Java
!apt-get update
!apt-get install openjdk-11-jdk-headless -qq > /dev/null
!wget -q https://downloads.apache.org/spark/$SPARK_VERSION/$SPARK_VERSION-bin-hadoop3.tgz
!tar xf $SPARK_VERSION-bin-hadoop3.tgz
!pip install -q findspark

# Set Environment Variables
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"
os.environ["SPARK_HOME"] = f"/content/{spark_version}-bin-hadoop3"

# Start a SparkSession
import findspark
findspark.init()

from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("ReadBestModel").getOrCreate()

# Mount Google Drive
from google.colab import drive
drive.mount('/content/drive')

# Load the best trained model
import tensorflow as tf
from keras import backend as K

# Define the custom metric
def mse(y_true, y_pred):
    return K.mean(K.square(y_pred - y_true), axis=-1)

# Load the pre-trained best model
best_model_path = "/content/drive/My Drive/TeamFiles/best_neural_network.h5"
best_model = tf.keras.models.load_model(best_model_path, custom_objects={"mse": mse})
print("Best model reloaded successfully!")

# Load cleaned data
file_path = "/content/drive/My Drive/TeamFiles/cleaned_data.parquet"
cleaned_df = spark.read.parquet(file_path)

print("Cleaned data loaded successfully!")
cleaned_df.show(5)


Hit:1 https://cloud.r-project.org/bin/linux/ubuntu jammy-cran40/ InRelease
Hit:2 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu2204/x86_64  InRelease
Get:3 http://security.ubuntu.com/ubuntu jammy-security InRelease [129 kB]
Hit:4 http://archive.ubuntu.com/ubuntu jammy InRelease
Hit:5 https://r2u.stat.illinois.edu/ubuntu jammy InRelease
Get:6 http://archive.ubuntu.com/ubuntu jammy-updates InRelease [128 kB]
Hit:7 https://ppa.launchpadcontent.net/deadsnakes/ppa/ubuntu jammy InRelease
Hit:8 https://ppa.launchpadcontent.net/graphics-drivers/ppa/ubuntu jammy InRelease
Hit:9 https://ppa.launchpadcontent.net/ubuntugis/ppa/ubuntu jammy InRelease
Hit:10 http://archive.ubuntu.com/ubuntu jammy-backports InRelease
Fetched 257 kB in 2s (126 kB/s)
Reading package lists... Done
W: Skipping acquire of configured file 'main/source/Sources' as repository 'https://r2u.stat.illinois.edu/ubuntu jammy InRelease' does not seem to provide it (sources.list entry misspelt?)
Drive already mounte



Best model reloaded successfully!
Cleaned data loaded successfully!
+---+----------+----+-------------+----------+-----------------+-------------+---------------------------+----+-----+---+
|FSA|      DATE|HOUR|CUSTOMER_TYPE|PRICE_PLAN|TOTAL_CONSUMPTION|PREMISE_COUNT|AVG_CONSUMPTION_PER_PREMISE|YEAR|MONTH|DAY|
+---+----------+----+-------------+----------+-----------------+-------------+---------------------------+----+-----+---+
|M1V|2024-07-01|   1|    SGS <50kW|    Tiered|           3125.6|         1716|         1.8214452214452215|2024|    7|  1|
|L7M|2024-07-01|   1|    SGS <50kW|       TOU|           1563.8|          683|         2.2896046852122987|2024|    7|  1|
|L9W|2024-07-01|   1|  Residential|    Tiered|            925.0|         1232|         0.7508116883116883|2024|    7|  1|
|M2J|2024-07-01|   1|    SGS <50kW|    Tiered|            598.0|          267|         2.2397003745318353|2024|    7|  1|
|K2C|2024-07-01|   1|  Residential|       TOU|           5095.2|         7714|

In [4]:
from pyspark.sql.functions import monotonically_increasing_id

# Add row_id BEFORE train-test split
cleaned_df = cleaned_df.withColumn("row_id", monotonically_increasing_id())

# Show first few rows to verify row_id
cleaned_df.show(5)


+---+----------+----+-------------+----------+-----------------+-------------+---------------------------+----+-----+---+----------+
|FSA|      DATE|HOUR|CUSTOMER_TYPE|PRICE_PLAN|TOTAL_CONSUMPTION|PREMISE_COUNT|AVG_CONSUMPTION_PER_PREMISE|YEAR|MONTH|DAY|    row_id|
+---+----------+----+-------------+----------+-----------------+-------------+---------------------------+----+-----+---+----------+
|M1V|2024-07-01|   1|    SGS <50kW|    Tiered|           3125.6|         1716|         1.8214452214452215|2024|    7|  1|8589934592|
|L7M|2024-07-01|   1|    SGS <50kW|       TOU|           1563.8|          683|         2.2896046852122987|2024|    7|  1|8589934593|
|L9W|2024-07-01|   1|  Residential|    Tiered|            925.0|         1232|         0.7508116883116883|2024|    7|  1|8589934594|
|M2J|2024-07-01|   1|    SGS <50kW|    Tiered|            598.0|          267|         2.2397003745318353|2024|    7|  1|8589934595|
|K2C|2024-07-01|   1|  Residential|       TOU|           5095.2|     

In [5]:
from pyspark.ml.feature import StringIndexer, VectorAssembler, MinMaxScaler
from pyspark.ml import Pipeline

# Encode categorical columns
customer_type_indexer = StringIndexer(inputCol="CUSTOMER_TYPE", outputCol="CUSTOMER_TYPE_INDEX")
price_plan_indexer = StringIndexer(inputCol="PRICE_PLAN", outputCol="PRICE_PLAN_INDEX")

# Assemble feature columns (keeping all important features)
feature_columns = [
    "HOUR", "PREMISE_COUNT", "AVG_CONSUMPTION_PER_PREMISE",
    "CUSTOMER_TYPE_INDEX", "PRICE_PLAN_INDEX", "YEAR", "MONTH", "DAY"
]

# Assemble & scale features
assembler = VectorAssembler(inputCols=feature_columns, outputCol="features")
scaler = MinMaxScaler(inputCol="features", outputCol="scaled_features")

# Create a pipeline
pipeline = Pipeline(stages=[customer_type_indexer, price_plan_indexer, assembler, scaler])

# Apply transformations
prepared_data = pipeline.fit(cleaned_df).transform(cleaned_df)

print("Data successfully transformed for prediction!")
prepared_data.show(5)


Data successfully transformed for prediction!
+---+----------+----+-------------+----------+-----------------+-------------+---------------------------+----+-----+---+----------+-------------------+----------------+--------------------+--------------------+
|FSA|      DATE|HOUR|CUSTOMER_TYPE|PRICE_PLAN|TOTAL_CONSUMPTION|PREMISE_COUNT|AVG_CONSUMPTION_PER_PREMISE|YEAR|MONTH|DAY|    row_id|CUSTOMER_TYPE_INDEX|PRICE_PLAN_INDEX|            features|     scaled_features|
+---+----------+----+-------------+----------+-----------------+-------------+---------------------------+----+-----+---+----------+-------------------+----------------+--------------------+--------------------+
|M1V|2024-07-01|   1|    SGS <50kW|    Tiered|           3125.6|         1716|         1.8214452214452215|2024|    7|  1|8589934592|                1.0|             1.0|[1.0,1716.0,1.821...|[0.0,0.0118100395...|
|L7M|2024-07-01|   1|    SGS <50kW|       TOU|           1563.8|          683|         2.2896046852122987|

In [6]:
prepared_data.write.partitionBy("MONTH").mode("overwrite").parquet("partitioned_data")

In [7]:
# Remove any previous Spark installation
!rm -rf spark-3.5.4-bin-hadoop3
!rm -rf /usr/local/lib/python*/dist-packages/pyspark
!rm -rf /usr/local/lib/python*/dist-packages/py4j

# Install Spark and Java again
!apt-get update
!apt-get install openjdk-11-jdk-headless -qq > /dev/null
!wget -q https://downloads.apache.org/spark/spark-3.5.4/spark-3.5.4-bin-hadoop3.tgz
!tar xf spark-3.5.4-bin-hadoop3.tgz
!pip install -q findspark

# Set Environment Variables
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.5.4-bin-hadoop3"

Hit:1 https://cloud.r-project.org/bin/linux/ubuntu jammy-cran40/ InRelease
Hit:2 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu2204/x86_64  InRelease
Hit:3 http://security.ubuntu.com/ubuntu jammy-security InRelease
Hit:4 http://archive.ubuntu.com/ubuntu jammy InRelease
Hit:5 http://archive.ubuntu.com/ubuntu jammy-updates InRelease
Hit:6 https://r2u.stat.illinois.edu/ubuntu jammy InRelease
Hit:7 http://archive.ubuntu.com/ubuntu jammy-backports InRelease
Hit:8 https://ppa.launchpadcontent.net/deadsnakes/ppa/ubuntu jammy InRelease
Hit:9 https://ppa.launchpadcontent.net/graphics-drivers/ppa/ubuntu jammy InRelease
Hit:10 https://ppa.launchpadcontent.net/ubuntugis/ppa/ubuntu jammy InRelease
Reading package lists... Done
W: Skipping acquire of configured file 'main/source/Sources' as repository 'https://r2u.stat.illinois.edu/ubuntu jammy InRelease' does not seem to provide it (sources.list entry misspelt?)


In [8]:
import findspark
findspark.init()

from pyspark.sql import SparkSession

# Start a new Spark session
spark = SparkSession.builder \
    .appName("PartitionedPredictions") \
    .config("spark.master", "local[*]") \
    .getOrCreate()

print("Spark restarted successfully!")


Spark restarted successfully!


In [9]:
print(spark)

<pyspark.sql.session.SparkSession object at 0x7ea28c36d910>


In [10]:
test_partition = "partitioned_data/MONTH=11"

try:
    print(f"Trying to load: {test_partition}")
    prepared_data = spark.read.format("parquet").load(test_partition)
    prepared_data.show(5)
except Exception as e:
    print(f"Error loading partition: {e}")


Trying to load: partitioned_data/MONTH=11
+---+----------+----+-------------+----------+-----------------+-------------+---------------------------+----+---+-----------+-------------------+----------------+--------------------+--------------------+
|FSA|      DATE|HOUR|CUSTOMER_TYPE|PRICE_PLAN|TOTAL_CONSUMPTION|PREMISE_COUNT|AVG_CONSUMPTION_PER_PREMISE|YEAR|DAY|     row_id|CUSTOMER_TYPE_INDEX|PRICE_PLAN_INDEX|            features|     scaled_features|
+---+----------+----+-------------+----------+-----------------+-------------+---------------------------+----+---+-----------+-------------------+----------------+--------------------+--------------------+
|N1G|2024-11-23|   8|    SGS <50kW|       TOU|           1881.1|          627|         3.0001594896331736|2024| 23|34360756377|                1.0|             0.0|[8.0,627.0,3.0001...|[0.30434782608695...|
|K2H|2024-11-16|  24|  Residential|       TOU|           7146.9|         9226|         0.7746477346629091|2024| 16|34360465529|   

In [11]:
import numpy as np
import pandas as pd

# Path where partitions are stored
partitioned_data_path = "partitioned_data"

# Get the list of available partitions
available_partitions = [folder for folder in os.listdir(partitioned_data_path) if folder.startswith("MONTH=")]
available_partitions.sort()  # Sort to maintain order

print(f"Found partitions: {available_partitions}")

# Initialize an empty list to store Pandas DataFrames
all_prepared_pandas = []

# Load each partition separately
for partition_folder in available_partitions:
    partition_path = f"{partitioned_data_path}/{partition_folder}"

    print(f"Loading partition: {partition_path}")

    try:
        # Read partitioned data from Spark
        prepared_data = spark.read.format("parquet").load(partition_path)

        # Convert Spark DataFrame to Pandas
        prepared_pandas = prepared_data.select(
            "row_id", "FSA", "DATE", "HOUR", "CUSTOMER_TYPE", "PRICE_PLAN",
            "PREMISE_COUNT", "TOTAL_CONSUMPTION", "scaled_features"
        ).toPandas()

        # Store in the list for merging later
        all_prepared_pandas.append(prepared_pandas)

        print(f"Successfully loaded partition: {partition_path}")

    except Exception as e:
        print(f"Error processing partition {partition_path}: {e}")

# Merge all partitions into one DataFrame (if needed)
if all_prepared_pandas:
    prepared_pandas = pd.concat(all_prepared_pandas, ignore_index=True)
    X_full = np.array(prepared_pandas["scaled_features"].tolist())
    print(f"Data ready for prediction! Shape: {X_full.shape}")
else:
    prepared_pandas = pd.DataFrame()
    print("No valid partitions found!")


Found partitions: ['MONTH=11', 'MONTH=4', 'MONTH=7']
Loading partition: partitioned_data/MONTH=11
Successfully loaded partition: partitioned_data/MONTH=11
Loading partition: partitioned_data/MONTH=4
Successfully loaded partition: partitioned_data/MONTH=4
Loading partition: partitioned_data/MONTH=7
Successfully loaded partition: partitioned_data/MONTH=7
Data ready for prediction! Shape: (4180454, 8)


In [12]:
import numpy as np

# Convert features into NumPy array
X_full = np.array(prepared_pandas["scaled_features"].tolist(), dtype=np.float32)

print(f"Feature array shape: {X_full.shape}")


Feature array shape: (4180454, 8)


In [13]:
# Load the best trained model
import tensorflow as tf
from keras import backend as K

# Define the custom metric
def mse(y_true, y_pred):
    return K.mean(K.square(y_pred - y_true), axis=-1)

# Load the pre-trained best model
best_model_path = "/content/drive/My Drive/TeamFiles/best_neural_network.h5"
best_model = tf.keras.models.load_model(best_model_path, custom_objects={"mse": mse})
print("Best model reloaded successfully!")




Best model reloaded successfully!


In [14]:
# Make predictions
y_full_pred = best_model.predict(X_full)

# Convert predictions to Pandas DataFrame
predictions_pandas = pd.DataFrame({
    "row_id": prepared_pandas["row_id"],
    "PREDICTED_CONSUMPTION": y_full_pred.flatten()
})

print("Predictions successfully generated!")
print(predictions_pandas.head())

[1m130640/130640[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m145s[0m 1ms/step
Predictions successfully generated!
        row_id  PREDICTED_CONSUMPTION
0  34360756377            1864.750000
1  34360465529            7268.404297
2  34360756378            5176.888672
3  34360465530            9463.594727
4  34360756379            1097.583130


In [15]:
# Merge predictions with prepared data
merged_pandas = prepared_pandas.merge(predictions_pandas, on="row_id", how="left")

# Convert back to Spark DataFrame
final_cleaned_df = spark.createDataFrame(merged_pandas)

print("Predictions merged successfully into the dataset!")


Predictions merged successfully into the dataset!


In [16]:
best_model.summary()

In [17]:
final_cleaned_df.printSchema()


root
 |-- row_id: long (nullable = true)
 |-- FSA: string (nullable = true)
 |-- DATE: date (nullable = true)
 |-- HOUR: long (nullable = true)
 |-- CUSTOMER_TYPE: string (nullable = true)
 |-- PRICE_PLAN: string (nullable = true)
 |-- PREMISE_COUNT: long (nullable = true)
 |-- TOTAL_CONSUMPTION: double (nullable = true)
 |-- scaled_features: vector (nullable = true)
 |-- PREDICTED_CONSUMPTION: double (nullable = true)

