In [1]:
from psutil import virtual_memory
ram_gb = virtual_memory().total / 1e9
print('Your runtime has {:.1f} gigabytes of available RAM\n'.format(ram_gb))

if ram_gb < 20:
  print('Not using a high-RAM runtime')
else:
  print('You are using a high-RAM runtime!')

Your runtime has 54.8 gigabytes of available RAM

You are using a high-RAM runtime!


In [2]:
!cat /proc/meminfo | grep MemTotal

MemTotal:       53470708 kB


In [3]:
import os
# Set Spark version
spark_version = 'spark-3.5.4'
os.environ['SPARK_VERSION'] = spark_version

# Install Spark and Java
!apt-get update
!apt-get install openjdk-11-jdk-headless -qq > /dev/null
!wget -q https://downloads.apache.org/spark/$SPARK_VERSION/$SPARK_VERSION-bin-hadoop3.tgz
!tar xf $SPARK_VERSION-bin-hadoop3.tgz
!pip install -q findspark

# Set Environment Variables
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"
os.environ["SPARK_HOME"] = f"/content/{spark_version}-bin-hadoop3"

# Start a SparkSession
import findspark
findspark.init()

from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("ReadBestModel").getOrCreate()

# Mount Google Drive
from google.colab import drive
drive.mount('/content/drive')

# Load the best trained model
import tensorflow as tf
from keras import backend as K

# Define the custom metric
def mse(y_true, y_pred):
    return K.mean(K.square(y_pred - y_true), axis=-1)

# Load the pre-trained best model
best_model_path = "/content/drive/My Drive/TeamFiles/best_neural_network.h5"
best_model = tf.keras.models.load_model(best_model_path, custom_objects={"mse": mse})
print("Best model reloaded successfully!")

# Load cleaned data
file_path = "/content/drive/My Drive/TeamFiles/cleaned_data.parquet"
cleaned_df = spark.read.parquet(file_path)

print("Cleaned data loaded successfully!")
cleaned_df.show(5)


0% [Working]            Get:1 https://cloud.r-project.org/bin/linux/ubuntu jammy-cran40/ InRelease [3,632 B]
Get:2 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu2204/x86_64  InRelease [1,581 B]
Get:3 http://security.ubuntu.com/ubuntu jammy-security InRelease [129 kB]
Hit:4 http://archive.ubuntu.com/ubuntu jammy InRelease
Get:5 https://r2u.stat.illinois.edu/ubuntu jammy InRelease [6,555 B]
Get:6 http://archive.ubuntu.com/ubuntu jammy-updates InRelease [128 kB]
Get:7 https://r2u.stat.illinois.edu/ubuntu jammy/main amd64 Packages [2,661 kB]
Hit:8 https://ppa.launchpadcontent.net/deadsnakes/ppa/ubuntu jammy InRelease
Hit:9 https://ppa.launchpadcontent.net/graphics-drivers/ppa/ubuntu jammy InRelease
Get:10 http://security.ubuntu.com/ubuntu jammy-security/main amd64 Packages [2,610 kB]
Hit:11 https://ppa.launchpadcontent.net/ubuntugis/ppa/ubuntu jammy InRelease
Get:12 http://archive.ubuntu.com/ubuntu jammy-backports InRelease [127 kB]
Get:13 http://archive.ubuntu.com/ubun



Best model reloaded successfully!
Cleaned data loaded successfully!
+---+----------+----+-------------+----------+-----------------+-------------+---------------------------+----+-----+---+
|FSA|      DATE|HOUR|CUSTOMER_TYPE|PRICE_PLAN|TOTAL_CONSUMPTION|PREMISE_COUNT|AVG_CONSUMPTION_PER_PREMISE|YEAR|MONTH|DAY|
+---+----------+----+-------------+----------+-----------------+-------------+---------------------------+----+-----+---+
|M1V|2024-07-01|   1|    SGS <50kW|    Tiered|           3125.6|         1716|         1.8214452214452215|2024|    7|  1|
|L7M|2024-07-01|   1|    SGS <50kW|       TOU|           1563.8|          683|         2.2896046852122987|2024|    7|  1|
|L9W|2024-07-01|   1|  Residential|    Tiered|            925.0|         1232|         0.7508116883116883|2024|    7|  1|
|M2J|2024-07-01|   1|    SGS <50kW|    Tiered|            598.0|          267|         2.2397003745318353|2024|    7|  1|
|K2C|2024-07-01|   1|  Residential|       TOU|           5095.2|         7714|

In [4]:
from pyspark.sql.functions import monotonically_increasing_id

# Add row_id BEFORE train-test split
cleaned_df = cleaned_df.withColumn("row_id", monotonically_increasing_id())

# Show first few rows to verify row_id
cleaned_df.show(5)


+---+----------+----+-------------+----------+-----------------+-------------+---------------------------+----+-----+---+----------+
|FSA|      DATE|HOUR|CUSTOMER_TYPE|PRICE_PLAN|TOTAL_CONSUMPTION|PREMISE_COUNT|AVG_CONSUMPTION_PER_PREMISE|YEAR|MONTH|DAY|    row_id|
+---+----------+----+-------------+----------+-----------------+-------------+---------------------------+----+-----+---+----------+
|M1V|2024-07-01|   1|    SGS <50kW|    Tiered|           3125.6|         1716|         1.8214452214452215|2024|    7|  1|8589934592|
|L7M|2024-07-01|   1|    SGS <50kW|       TOU|           1563.8|          683|         2.2896046852122987|2024|    7|  1|8589934593|
|L9W|2024-07-01|   1|  Residential|    Tiered|            925.0|         1232|         0.7508116883116883|2024|    7|  1|8589934594|
|M2J|2024-07-01|   1|    SGS <50kW|    Tiered|            598.0|          267|         2.2397003745318353|2024|    7|  1|8589934595|
|K2C|2024-07-01|   1|  Residential|       TOU|           5095.2|     

In [5]:
from pyspark.ml.feature import StringIndexer, VectorAssembler, MinMaxScaler
from pyspark.ml import Pipeline

# Encode categorical columns
customer_type_indexer = StringIndexer(inputCol="CUSTOMER_TYPE", outputCol="CUSTOMER_TYPE_INDEX")
price_plan_indexer = StringIndexer(inputCol="PRICE_PLAN", outputCol="PRICE_PLAN_INDEX")

# Assemble feature columns (keeping all important features)
feature_columns = [
    "HOUR", "PREMISE_COUNT", "AVG_CONSUMPTION_PER_PREMISE",
    "CUSTOMER_TYPE_INDEX", "PRICE_PLAN_INDEX", "YEAR", "MONTH", "DAY"
]

# Assemble & scale features
assembler = VectorAssembler(inputCols=feature_columns, outputCol="features")
scaler = MinMaxScaler(inputCol="features", outputCol="scaled_features")

# Create a pipeline
pipeline = Pipeline(stages=[customer_type_indexer, price_plan_indexer, assembler, scaler])

# Apply transformations
prepared_data = pipeline.fit(cleaned_df).transform(cleaned_df)

print("Data successfully transformed for prediction!")
prepared_data.show(5)


Data successfully transformed for prediction!
+---+----------+----+-------------+----------+-----------------+-------------+---------------------------+----+-----+---+----------+-------------------+----------------+--------------------+--------------------+
|FSA|      DATE|HOUR|CUSTOMER_TYPE|PRICE_PLAN|TOTAL_CONSUMPTION|PREMISE_COUNT|AVG_CONSUMPTION_PER_PREMISE|YEAR|MONTH|DAY|    row_id|CUSTOMER_TYPE_INDEX|PRICE_PLAN_INDEX|            features|     scaled_features|
+---+----------+----+-------------+----------+-----------------+-------------+---------------------------+----+-----+---+----------+-------------------+----------------+--------------------+--------------------+
|M1V|2024-07-01|   1|    SGS <50kW|    Tiered|           3125.6|         1716|         1.8214452214452215|2024|    7|  1|8589934592|                1.0|             1.0|[1.0,1716.0,1.821...|[0.0,0.0118100395...|
|L7M|2024-07-01|   1|    SGS <50kW|       TOU|           1563.8|          683|         2.2896046852122987|

In [6]:
prepared_data.write.partitionBy("MONTH").mode("overwrite").parquet("partitioned_data")

In [7]:
# Remove any previous Spark installation
!rm -rf spark-3.5.4-bin-hadoop3
!rm -rf /usr/local/lib/python*/dist-packages/pyspark
!rm -rf /usr/local/lib/python*/dist-packages/py4j

# Install Spark and Java again
!apt-get update
!apt-get install openjdk-11-jdk-headless -qq > /dev/null
!wget -q https://downloads.apache.org/spark/spark-3.5.4/spark-3.5.4-bin-hadoop3.tgz
!tar xf spark-3.5.4-bin-hadoop3.tgz
!pip install -q findspark

# Set Environment Variables
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.5.4-bin-hadoop3"

0% [Working]            Hit:1 https://cloud.r-project.org/bin/linux/ubuntu jammy-cran40/ InRelease
Hit:2 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu2204/x86_64  InRelease
Hit:3 http://security.ubuntu.com/ubuntu jammy-security InRelease
Hit:4 http://archive.ubuntu.com/ubuntu jammy InRelease
Hit:5 https://r2u.stat.illinois.edu/ubuntu jammy InRelease
Hit:6 http://archive.ubuntu.com/ubuntu jammy-updates InRelease
Hit:7 https://ppa.launchpadcontent.net/deadsnakes/ppa/ubuntu jammy InRelease
Hit:8 http://archive.ubuntu.com/ubuntu jammy-backports InRelease
Hit:9 https://ppa.launchpadcontent.net/graphics-drivers/ppa/ubuntu jammy InRelease
Hit:10 https://ppa.launchpadcontent.net/ubuntugis/ppa/ubuntu jammy InRelease
Reading package lists... Done
W: Skipping acquire of configured file 'main/source/Sources' as repository 'https://r2u.stat.illinois.edu/ubuntu jammy InRelease' does not seem to provide it (sources.list entry misspelt?)


In [8]:
import findspark
findspark.init()

from pyspark.sql import SparkSession

# Start a new Spark session
spark = SparkSession.builder \
    .appName("PartitionedPredictions") \
    .config("spark.master", "local[*]") \
    .getOrCreate()

print("Spark restarted successfully!")


Spark restarted successfully!


In [9]:
print(spark)

<pyspark.sql.session.SparkSession object at 0x7c4878301f50>


In [10]:
test_partition = "partitioned_data/MONTH=11"

try:
    print(f"Trying to load: {test_partition}")
    prepared_data = spark.read.format("parquet").load(test_partition)
    prepared_data.show(5)
except Exception as e:
    print(f"Error loading partition: {e}")


Trying to load: partitioned_data/MONTH=11
+---+----------+----+-------------+----------+-----------------+-------------+---------------------------+----+---+----------+-------------------+----------------+--------------------+--------------------+
|FSA|      DATE|HOUR|CUSTOMER_TYPE|PRICE_PLAN|TOTAL_CONSUMPTION|PREMISE_COUNT|AVG_CONSUMPTION_PER_PREMISE|YEAR|DAY|    row_id|CUSTOMER_TYPE_INDEX|PRICE_PLAN_INDEX|            features|     scaled_features|
+---+----------+----+-------------+----------+-----------------+-------------+---------------------------+----+---+----------+-------------------+----------------+--------------------+--------------------+
|N0N|2024-11-02|  23|    SGS <50kW|    Tiered|            141.7|           96|         1.4760416666666665|2024|  2|8591461592|                1.0|             1.0|[23.0,96.0,1.4760...|[0.95652173913043...|
|L6R|2024-11-01|   1|  Residential|    Tiered|            697.2|         1177|         0.5923534409515718|2024|  1|8591371119|        

In [11]:
import numpy as np
import pandas as pd

# Path where partitions are stored
partitioned_data_path = "partitioned_data"

# Get the list of available partitions
available_partitions = [folder for folder in os.listdir(partitioned_data_path) if folder.startswith("MONTH=")]
available_partitions.sort()  # Sort to maintain order

print(f"Found partitions: {available_partitions}")

# Initialize an empty list to store Pandas DataFrames
all_prepared_pandas = []

# Load each partition separately
for partition_folder in available_partitions:
    partition_path = f"{partitioned_data_path}/{partition_folder}"

    print(f"Loading partition: {partition_path}")

    try:
        # Read partitioned data from Spark
        prepared_data = spark.read.format("parquet").load(partition_path)

        # Convert Spark DataFrame to Pandas
        prepared_pandas = prepared_data.select(
            "row_id", "FSA", "DATE", "HOUR", "CUSTOMER_TYPE", "PRICE_PLAN",
            "PREMISE_COUNT", "TOTAL_CONSUMPTION", "scaled_features"
        ).toPandas()

        # Store in the list for merging later
        all_prepared_pandas.append(prepared_pandas)

        print(f"Successfully loaded partition: {partition_path}")

    except Exception as e:
        print(f"Error processing partition {partition_path}: {e}")

# Merge all partitions into one DataFrame (if needed)
if all_prepared_pandas:
    prepared_pandas = pd.concat(all_prepared_pandas, ignore_index=True)
    X_full = np.array(prepared_pandas["scaled_features"].tolist())
    print(f"Data ready for prediction! Shape: {X_full.shape}")
else:
    prepared_pandas = pd.DataFrame()
    print("No valid partitions found!")


Found partitions: ['MONTH=11', 'MONTH=4', 'MONTH=7']
Loading partition: partitioned_data/MONTH=11
Successfully loaded partition: partitioned_data/MONTH=11
Loading partition: partitioned_data/MONTH=4
Successfully loaded partition: partitioned_data/MONTH=4
Loading partition: partitioned_data/MONTH=7
Successfully loaded partition: partitioned_data/MONTH=7
Data ready for prediction! Shape: (4180454, 8)


In [12]:
import numpy as np

# Convert features into NumPy array
X_full = np.array(prepared_pandas["scaled_features"].tolist(), dtype=np.float32)

print(f"Feature array shape: {X_full.shape}")


Feature array shape: (4180454, 8)


In [13]:
# Load the best trained model
import tensorflow as tf
from keras import backend as K

# Define the custom metric
def mse(y_true, y_pred):
    return K.mean(K.square(y_pred - y_true), axis=-1)

# Load the pre-trained best model
best_model_path = "/content/drive/My Drive/TeamFiles/best_neural_network.h5"
best_model = tf.keras.models.load_model(best_model_path, custom_objects={"mse": mse})
print("Best model reloaded successfully!")




Best model reloaded successfully!


In [14]:
# Make predictions
y_full_pred = best_model.predict(X_full)

# Convert predictions to Pandas DataFrame
predictions_pandas = pd.DataFrame({
    "row_id": prepared_pandas["row_id"],
    "PREDICTED_CONSUMPTION": y_full_pred.flatten()
})

print("Predictions successfully generated!")
print(predictions_pandas.head())

[1m130640/130640[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m140s[0m 1ms/step
Predictions successfully generated!
       row_id  PREDICTED_CONSUMPTION
0  8591461592             107.067856
1  8591371119             702.102112
2  8591461593            5241.788086
3  8591371120             861.344971
4  8591461594             788.427124


In [15]:
# Merge predictions with prepared data
merged_pandas = prepared_pandas.merge(predictions_pandas, on="row_id", how="left")

# Drop row_id as it's no longer needed
merged_pandas = merged_pandas.drop(columns=["row_id"])

# Convert back to Spark DataFrame
final_cleaned_df = spark.createDataFrame(merged_pandas)

print("Predictions merged successfully into the dataset!")


Predictions merged successfully into the dataset!


In [24]:
best_model.summary()

In [None]:
from pyspark.sql.functions import col

# Convert `scaled_features` to a string format
final_cleaned_df = final_cleaned_df.withColumn("scaled_features", col("scaled_features").cast("string"))

# Define CSV save path
local_csv_path = "/content/final_cleaned_data_with_predictions.csv"

# Save to CSV
final_cleaned_df.coalesce(1).write.mode("overwrite").option("header", "true").csv(local_csv_path)

print(f"CSV saved locally: {local_csv_path}")


In [None]:
from google.colab import files
files.download(local_csv_path)


In [None]:
print(f"Total rows in combined dataset: {final_cleaned_df.count()}")