<a href="https://colab.research.google.com/github/Yuhan3636/Homework1/blob/main/Yuhan_Hu_Final_Project.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Fitting the model #

In [47]:
# pandas is required by pd.read_csv below. It was previously imported only in a
# later cell, so running this cell on a fresh kernel raised NameError.
import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql.types import DoubleType, IntegerType, StringType, StructType, StructField, LongType
from pyspark.sql.functions import col, stddev, cast

# Initialize Spark
spark = SparkSession.builder \
    .appName("PowerConsumptionPrediction") \
    .getOrCreate()

# 1. Load and prepare data with proper type conversion
# The CSV is downloaded with pandas and then handed to Spark.
data_url = "https://www4.stat.ncsu.edu/online/datasets/power_ml_data.csv"
pdf = pd.read_csv(data_url)

# Convert to Spark DataFrame
df = spark.createDataFrame(pdf)

# Check schema and convert any LongType to DoubleType
print("Original Schema:")
df.printSchema()

# Cast every integral (long / int) column to double; columns that are already
# double are left untouched.
for field in df.schema.fields:
    if isinstance(field.dataType, (LongType, IntegerType)):
        df = df.withColumn(field.name, col(field.name).cast(DoubleType()))

print("\nModified Schema:")
df.printSchema()

Original Schema:
root
 |-- Temperature: double (nullable = true)
 |-- Humidity: double (nullable = true)
 |-- Wind_Speed: double (nullable = true)
 |-- General_Diffuse_Flows: double (nullable = true)
 |-- Diffuse_Flows: double (nullable = true)
 |-- Power_Zone_1: double (nullable = true)
 |-- Power_Zone_2: double (nullable = true)
 |-- Power_Zone_3: double (nullable = true)
 |-- Month: long (nullable = true)
 |-- Hour: long (nullable = true)


Modified Schema:
root
 |-- Temperature: double (nullable = true)
 |-- Humidity: double (nullable = true)
 |-- Wind_Speed: double (nullable = true)
 |-- General_Diffuse_Flows: double (nullable = true)
 |-- Diffuse_Flows: double (nullable = true)
 |-- Power_Zone_1: double (nullable = true)
 |-- Power_Zone_2: double (nullable = true)
 |-- Power_Zone_3: double (nullable = true)
 |-- Month: double (nullable = true)
 |-- Hour: double (nullable = true)



# Data Summarization #

In [48]:
# Numerical summaries
numeric_cols = [f.name for f in df.schema.fields if isinstance(f.dataType, (DoubleType, IntegerType))]
summary = df.select(numeric_cols).summary("count", "mean", "stddev", "min", "max", "50%")
summary.show()

# Correlations (computed in pandas, so the selected columns are collected to the driver)
corr_matrix = df.select(numeric_cols).toPandas().corr()
print("Correlation Matrix:")
print(corr_matrix)

# Contingency tables
print("One-way table for Month:")
df.groupBy("Month").count().orderBy("Month").show()

print("One-way table for Hour:")
df.groupBy("Hour").count().orderBy("Hour").show()

print("Two-way table for Month and Hour:")
df.groupBy("Month", "Hour").count().orderBy("Month", "Hour").show()

# Group by Month and calculate means and stddevs.
# The grouping key itself is excluded from the aggregated columns: Month is
# constant inside each group, and aliasing stddev(Month) as "Month" produced a
# second, meaningless column with the same name as the key.
per_month_cols = [c for c in numeric_cols if c != "Month"]

print("Means by Month:")
df.groupBy("Month").mean(*per_month_cols).show()

print("Standard Deviations by Month:")
df.groupBy("Month").agg(*[stddev(col(c)).alias(c) for c in per_month_cols]).show()


+-------+------------------+------------------+------------------+---------------------+------------------+-----------------+-----------------+-----------------+------------------+------------------+
|summary|       Temperature|          Humidity|        Wind_Speed|General_Diffuse_Flows|     Diffuse_Flows|     Power_Zone_1|     Power_Zone_2|     Power_Zone_3|             Month|              Hour|
+-------+------------------+------------------+------------------+---------------------+------------------+-----------------+-----------------+-----------------+------------------+------------------+
|  count|             47174|             47174|             47174|                47174|             47174|            47174|            47174|            47174|             47174|             47174|
|   mean|18.813219803281488| 68.28839827023333| 1.961621401619489|   182.53118043838074| 74.98721140882569|32335.16869049581|21027.20497598018|17831.19760781678| 6.510599058803578|11.488383431551279|


# Data Preparation #

In [51]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from pyspark.sql.types import StructType, StructField, DoubleType, IntegerType

from pyspark.ml import Pipeline
from pyspark.ml.feature import (
    Binarizer,
    StringIndexer,
    OneHotEncoder,
    VectorAssembler,
    PCA
)
from pyspark.ml.regression import LinearRegression


# Make sure Hour is a double before it reaches the Binarizer.
# The earlier check only looked for IntegerType, but integer columns coming
# from pandas reach Spark as LongType (see the original schema printout), so
# the guard never fired on raw data. Casting whenever the column is not
# already DoubleType covers both integral types.
if not isinstance(df.schema["Hour"].dataType, DoubleType):
    df = df.withColumn("Hour", col("Hour").cast(DoubleType()))

# Pipeline setup
# Binarize Hour at 6.5. Binarizer emits 1.0 for values strictly above the
# threshold, so this flag is 1.0 for Hour > 6.5 and 0.0 otherwise.
# NOTE(review): that is the opposite of what the name "is_night" suggests; the
# column name is kept so the fitted pipeline's columns stay unchanged.
binarizer = Binarizer(inputCol="Hour", outputCol="is_night", threshold=6.5)

# One-hot encode Month (indexed as a category first)
month_indexer = StringIndexer(inputCol="Month", outputCol="MonthIndex")
month_encoder = OneHotEncoder(inputCol="MonthIndex", outputCol="MonthVec")

# PCA for selected features: the five weather columns are reduced to 2 components
pca_cols = ["Temperature", "Humidity", "Wind_Speed", "General_Diffuse_Flows", "Diffuse_Flows"]
vec_assembler = VectorAssembler(inputCols=pca_cols, outputCol="features_for_pca")
pca = PCA(k=2, inputCol="features_for_pca", outputCol="pca_features")

# Final feature assembly
final_features = ["pca_features", "is_night", "Power_Zone_1", "Power_Zone_2", "MonthVec"]
final_assembler = VectorAssembler(inputCols=final_features, outputCol="features")

# Rename response variable
df = df.withColumnRenamed("Power_Zone_3", "label")

# Define the pipeline (stage order matters: each stage consumes columns
# produced by the stages before it)
pipeline = Pipeline(stages=[
    binarizer,
    month_indexer,
    month_encoder,
    vec_assembler,
    pca,
    final_assembler
])

# Fit the pipeline and materialise the "features" column used for training
preprocess_model = pipeline.fit(df)
processed_data = preprocess_model.transform(df)

# Model Training #

In [56]:
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml.evaluation import RegressionEvaluator


# Define Elastic Net model (regParam / elasticNetParam are chosen by the grid below)
lr = LinearRegression(featuresCol="features", labelCol="label")

# Parameter grid for cross-validation: 11 x 11 combinations
param_grid = ParamGridBuilder() \
    .addGrid(lr.regParam, [0, 0.05, 0.1, 0.25, 0.5, 0.75, 0.9, 0.95, 0.98, 0.99, 1]) \
    .addGrid(lr.elasticNetParam, [0, 0.05, 0.1, 0.25, 0.5, 0.75, 0.9, 0.95, 0.98, 0.99, 1]) \
    .build()

# Evaluator
evaluator = RegressionEvaluator(metricName="rmse")

# Cross-validator. The seed pins the fold assignment so the selected
# hyperparameters can be reproduced on a re-run.
cv = CrossValidator(estimator=lr,
                   estimatorParamMaps=param_grid,
                   evaluator=evaluator,
                   numFolds=5,
                   seed=42)

# Fit the model
cv_model = cv.fit(processed_data)

# Get best model
best_model = cv_model.bestModel

# Evaluate on training data (this is an in-sample error, not a held-out estimate)
predictions = best_model.transform(processed_data)
rmse = evaluator.evaluate(predictions)
print(f"Training RMSE: {rmse}")

# Add residual column
predictions = predictions.withColumn("residual", col("label") - col("prediction"))

# Save the complete model (preprocessing + regression).
# overwrite() lets the cell be re-run: a plain save() raises once the
# "power_ml_model" directory already exists.
from pyspark.ml import PipelineModel
full_model = PipelineModel(stages=[preprocess_model, best_model])
full_model.write().overwrite().save("power_ml_model")

Training RMSE: 2147.0977406268803


# Streaming Part #

# Reading a stream #

In [81]:
import pandas as pd
import time
import os

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, DoubleType, IntegerType
from pyspark.sql.functions import col
from pyspark.ml.feature import VectorAssembler, StringIndexer, OneHotEncoder, PCA
from pyspark.ml.regression import LinearRegression, LinearRegressionModel
from pyspark.ml import PipelineModel
import pyspark.sql.functions as F


In [82]:
# Session handle for the streaming section, configured with 2 shuffle partitions.
spark = (
    SparkSession.builder
    .appName("PowerStreamingFinal")
    .config("spark.sql.shuffle.partitions", "2")
    .getOrCreate()
)


# Transform/Aggregation Step #

In [83]:
# Read the static dataset with pandas; the producer cell later samples rows from it.
static_df = pd.read_csv("/content/power_streaming_data.csv")

# Folder that the streaming reader watches
streaming_folder = "/content/streaming_folder"
os.makedirs(streaming_folder, exist_ok=True)

# Seed the folder with one initial 100-row batch (fixed random_state)
first_batch = static_df.sample(100, random_state=1)
first_batch.to_csv(f"{streaming_folder}/batch1.csv", index=False)

# Hand the pandas frame to Spark
static_spark_df = spark.createDataFrame(static_df)

# Column groups: the weather columns feed PCA; they and the power/hour columns
# are cast to double, Month to integer.
weather_cols = ["Temperature", "Humidity", "Wind_Speed", "General_Diffuse_Flows", "Diffuse_Flows"]
double_cols = weather_cols + ["Power_Zone_1", "Power_Zone_2", "Power_Zone_3", "Hour"]
for name in double_cols:
    static_spark_df = static_spark_df.withColumn(name, F.col(name).cast(DoubleType()))
static_spark_df = static_spark_df.withColumn("Month", F.col("Month").cast(IntegerType()))

# Weather vector -> 3 principal components
weather_assembler = VectorAssembler(inputCols=weather_cols, outputCol="weather_features")
pca = PCA(k=3, inputCol="weather_features", outputCol="pca_features")
static_weather_df = weather_assembler.transform(static_spark_df)
pca_model = pca.fit(static_weather_df)

# Apply PCA, then add the hour flag (1 when Hour < 6.5, else 0)
static_weather_df = pca_model.transform(static_weather_df).withColumn(
    "Hour_Binary", (F.col("Hour") < 6.5).cast(IntegerType())
)

# Month: index, then one-hot encode. The fitted models are kept for the stream.
month_indexer = StringIndexer(inputCol="Month", outputCol="Month_Index")
month_indexer_model = month_indexer.fit(static_weather_df)
month_encoded = month_indexer_model.transform(static_weather_df)

month_encoder = OneHotEncoder(inputCols=["Month_Index"], outputCols=["Month_OHE"])
month_encoder_model = month_encoder.fit(month_encoded)
month_encoded = month_encoder_model.transform(month_encoded)

# Final feature vector
final_assembler = VectorAssembler(
    inputCols=["pca_features", "Hour_Binary", "Power_Zone_1", "Power_Zone_2", "Month_OHE"],
    outputCol="features",
)
static_final = final_assembler.transform(month_encoded)

# Elastic-net regression with fixed hyperparameters, predicting Power_Zone_3
lr = LinearRegression(
    featuresCol="features",
    labelCol="Power_Zone_3",
    elasticNetParam=0.5,
    regParam=0.1,
)
best_model = lr.fit(static_final)

# Persist the fitted regression, replacing any earlier copy
best_model.write().overwrite().save("/content/power_consumption_model")

print("Model trained and saved successfully!")


Model trained and saved successfully!


In [84]:
# Read the persisted regression back from disk; process_batch scores with this object.
saved_model_path = "/content/power_consumption_model"
model = LinearRegressionModel.load(saved_model_path)
print("Model loaded successfully!")


Model loaded successfully!


In [86]:
# Schema handed to the streaming CSV reader: (column name, type) in file order.
# Every field is nullable.
schema_spec = [
    ("Temperature", DoubleType()),
    ("Humidity", DoubleType()),
    ("Wind_Speed", DoubleType()),
    ("General_Diffuse_Flows", DoubleType()),
    ("Diffuse_Flows", DoubleType()),
    ("Power_Zone_1", DoubleType()),
    ("Power_Zone_2", DoubleType()),
    ("Power_Zone_3", DoubleType()),
    ("Month", IntegerType()),
    ("Hour", DoubleType()),
]
streaming_schema = StructType(
    [StructField(field_name, field_type, True) for field_name, field_type in schema_spec]
)


In [87]:
def process_batch(df, epoch_id):
    """Score one micro-batch with the loaded model and print a preview.

    Passed to ``foreachBatch`` by the streaming writer. Uses the notebook-level
    fitted transformers (``weather_assembler``, ``pca_model``,
    ``month_indexer_model``, ``month_encoder_model``, ``final_assembler``) and
    the loaded regression ``model``.

    Args:
        df: micro-batch DataFrame; must contain the raw columns the
            transformers read plus ``Power_Zone_3``.
        epoch_id: batch identifier passed in by the caller; used only in the
            printed output.

    Returns:
        None. Up to five rows of label / prediction / residual are printed.
        Any exception is caught and reported (message with batch id, plus the
        traceback) instead of being re-raised.
    """
    import traceback  # local import: only needed on the error path

    try:
        # Feature engineering: same steps, in the same order, as at training time
        features_df = df.withColumn("Hour_Binary", (F.col("Hour") < 6.5).cast(IntegerType()))
        features_df = weather_assembler.transform(features_df)
        features_df = pca_model.transform(features_df)
        features_df = month_indexer_model.transform(features_df)
        features_df = month_encoder_model.transform(features_df)
        features_df = final_assembler.transform(features_df)

        # Expose the observed response as "label" and score the batch
        prepared_df = features_df.withColumn("label", F.col("Power_Zone_3"))
        predictions = model.transform(prepared_df)

        # Residual = observed - predicted
        predictions = predictions.withColumn("residual", F.col("label") - F.col("prediction"))

        print(f"\n=== Predictions for Batch {epoch_id} ===")
        predictions.select("label", "prediction", "residual").show(5, truncate=False)

    except Exception as e:
        # Still best-effort, but name the failing batch and keep the traceback;
        # the previous one-line message gave no way to locate the failure.
        print(f"Processing error in batch {epoch_id}: {str(e)}")
        traceback.print_exc()


# Writing Step #

In [88]:
# Reader: CSV files with a header row from the watched folder, parsed with
# streaming_schema, at most one file per trigger.
incoming_stream = (
    spark.readStream
    .schema(streaming_schema)
    .option("header", "true")
    .option("maxFilesPerTrigger", 1)
    .csv("/content/streaming_folder")
)

# Writer: every micro-batch is handed to process_batch.
query = (
    incoming_stream.writeStream
    .foreachBatch(process_batch)
    .outputMode("update")
    .start()
)

print("Streaming started! Now run producer to create files.")


Streaming started! Now run producer to create files.


# Produce Data #

In [None]:
# Produce 50 small CSV batches manually.
# Each file is first written to a staging folder and then moved into the
# watched folder. Writing directly into the watched folder lets the streaming
# reader pick up a file while pandas is still writing it.
staging_folder = "/content/streaming_staging"
os.makedirs(staging_folder, exist_ok=True)

for i in range(50):
    file_name = f"batch_from_loop_{i}.csv"
    staged_path = os.path.join(staging_folder, file_name)
    static_df.sample(3).to_csv(staged_path, index=False)
    # Single rename into the watched folder (both folders are under /content)
    os.replace(staged_path, f"/content/streaming_folder/{file_name}")
    print(f" Batch {i+1}/50: batch_from_loop_{i}.csv created.")
    time.sleep(10)  # Pause 10 seconds between batches

print("Finished creating all 50 batches!")


 Batch 1/50: batch_from_loop_0.csv created.
+-----------+------------------+-------------------+
|label      |prediction        |residual           |
+-----------+------------------+-------------------+
|10867.04453|11841.095620648875|-974.0510906488762 |
|14504.09639|13277.815225982691|1226.2811640173095 |
|11776.0    |15382.783092625416|-3606.7830926254155|
+-----------+------------------+-------------------+


=== Predictions for Batch 1 ===
+-----------+-----------------+-------------------+
|label      |prediction       |residual           |
+-----------+-----------------+-------------------+
|9334.933974|11559.40560330067|-2224.4716293006695|
|21174.16928|22545.19852689332|-1371.0292468933221|
|20521.45749|20985.26678692278|-463.80929692278005|
+-----------+-----------------+-------------------+


=== Predictions for Batch 2 ===
+-----------+------------------+------------------+
|label      |prediction        |residual          |
+-----------+------------------+-----------------

In [None]:
# Stop the streaming query that was started in the "Writing Step" cell.
query.stop()