In [None]:
from pyspark.sql import SparkSession
import sys
from pyspark.sql import functions as F


def _build_spark_session():
    """Create (or reuse) the SparkSession used throughout this notebook.

    The executor PYTHONPATH is set to the driver's ``sys.path`` joined with ":",
    presumably so executors resolve modules the same way the driver does.
    """
    # NOTE(review): "spark.{executor,driver}.extraPythonPackages" are not documented
    # Spark properties; Spark accepts unknown keys silently — confirm they have any effect.
    session_options = {
        "spark.executor.extraPythonPackages": "spark_tensorflow_distributor,tensorflow",
        "spark.driver.extraPythonPackages": "spark_tensorflow_distributor,tensorflow",
        "spark.executorEnv.PYTHONPATH": ":".join(sys.path),
    }
    builder = SparkSession.builder.appName("Data_serving_LSTM")
    for option_key, option_value in session_options.items():
        builder = builder.config(option_key, option_value)
    return builder.getOrCreate()


spark = _build_spark_session()
spark.sparkContext.setLogLevel("INFO")

# Cleaned input data produced by an earlier pipeline step.
df = spark.read.parquet("hdfs:///project/cleaned_data_parquet")
df.show()

In [None]:
from pyspark.sql import functions as F
from pyspark.sql import Window

# Chronological split fractions; the test split takes every remaining (latest) row.
TRAIN_FRACTION = 0.6
VAL_FRACTION = 0.2

# `limit` on an unordered DataFrame returns an arbitrary subset, and `subtract` is a
# set difference (EXCEPT DISTINCT) that also drops duplicate rows. Instead, number
# the rows once in date order and cut the splits by row number. The window has no
# partitioning, so Spark moves all rows to one partition (it logs a warning); that is
# acceptable here because every split is collected to the driver later anyway.
indexed_df = (
    df.withColumn("_row_idx", F.row_number().over(Window.orderBy(F.col("date").asc())))
    .cache()
)

total_count = indexed_df.count()

train_size = int(total_count * TRAIN_FRACTION)
val_size = int(total_count * VAL_FRACTION)
test_size = total_count - train_size - val_size


def _rows_by_rank(first_idx, last_idx=None):
    """Rows whose 1-based date rank lies in [first_idx, last_idx] (open-ended if last_idx is None).

    The result is sorted by date and has the helper column removed, so its
    columns are in the same order as `df`.
    """
    condition = F.col("_row_idx") >= first_idx
    if last_idx is not None:
        condition = condition & (F.col("_row_idx") <= last_idx)
    return indexed_df.filter(condition).orderBy("_row_idx").drop("_row_idx")


train_df = _rows_by_rank(1, train_size)
val_df = _rows_by_rank(train_size + 1, train_size + val_size)
test_df = _rows_by_rank(train_size + val_size + 1)

# Show sizes of the splits
print(f"Train size: {train_df.count()}")
print(f"Validation size: {val_df.count()}")
print(f"Test size: {test_df.count()}")


train_df.show()
# Print one row of the dataframe
print(train_df.first())


In [None]:
from pyspark.sql import DataFrame
from datetime import datetime

def create_sequences(df, input_length=60, output_length=30):
    """Slide a window over the collected rows of ``df`` to build (input, target) pairs.

    Each input is ``input_length`` consecutive rows, each reduced to
    ``[row[1], round(row[2], 2), round(row[3], 2)]``. The matching target is the
    value at position 1 of the ``output_length`` rows that immediately follow.
    Windows advance one row at a time, so consecutive samples overlap.

    Args:
        df: object whose ``collect()`` returns the rows in the order the windows
            should follow (in this notebook, a date-sorted Spark DataFrame).
        input_length: number of rows in each input window.
        output_length: number of future rows in each target.

    Returns:
        ``(sequences, targets)``: two lists of equal length. Both are empty when
        ``df`` has fewer than ``input_length + output_length`` rows.
    """
    rows = df.collect()
    span = input_length + output_length
    window_count = len(rows) - span + 1

    def row_features(row):
        # Column 1 is kept as-is; columns 2 and 3 are rounded to two decimals.
        return [row[1], round(row[2], 2), round(row[3], 2)]

    sequences = [
        [row_features(row) for row in rows[start:start + input_length]]
        for start in range(window_count)
    ]
    targets = [
        [row[1] for row in rows[start + input_length:start + span]]
        for start in range(window_count)
    ]
    return sequences, targets

# Window each chronological split on its own, so no sample straddles a split boundary.
(x_train, y_train), (x_val, y_val), (x_test, y_test) = (
    create_sequences(split_df) for split_df in (train_df, val_df, test_df)
)

# Peek at the latest training window and the targets it should predict.
print(x_train[-1])
print(y_train[-1])

## LSTM forecaster: distributed training and test-set evaluation

In [None]:
import numpy as np
from spark_tensorflow_distributor import MirroredStrategyRunner
import tensorflow as tf
import os
import matplotlib.pyplot as plt 
import time
from pyspark.accumulators import AccumulatorParam
import json

class ListAccumulatorParam(AccumulatorParam):
    """Spark accumulator semantics for Python lists: start empty, merge by concatenation."""

    def zero(self, value):
        # The initial value is ignored; every accumulator copy starts from an empty list.
        return list()

    def addInPlace(self, value1, value2):
        # Append value2's items onto value1 in place, and hand value1 back to Spark.
        value1 += value2
        return value1

class DistributedTrainingCallback(tf.keras.callbacks.Callback):
    """Keras callback that records each epoch's training and validation loss.

    After every epoch it adds a one-element list holding a dict with keys
    ``"epoch"`` (1-based), ``"loss"`` and ``"val_loss"`` to ``accumulator``.
    A loss missing from Keras' logs is recorded as 0.0.
    """

    def __init__(self, accumulator):
        super().__init__()
        self.accumulator = accumulator

    def on_epoch_end(self, epoch, logs=None):
        metrics = logs if logs else {}
        train_loss = float(metrics.get('loss', 0))
        validation_loss = float(metrics.get('val_loss', 0))
        self.accumulator.add([
            {"epoch": epoch + 1, "loss": train_loss, "val_loss": validation_loss}
        ])

# Driver-side accumulator that collects the per-epoch loss records added by
# DistributedTrainingCallback; it has to exist before `train` is run.
training_accumulator = spark.sparkContext.accumulator(
    value=[],
    accum_param=ListAccumulatorParam(),
)

def train():
    """Fit the LSTM forecaster and evaluate it on the test windows.

    Executed through ``MirroredStrategyRunner.run``. Reads the module-level
    ``x_train``/``y_train``, ``x_val``/``y_val`` and ``x_test``/``y_test`` lists
    built by ``create_sequences``. Per-epoch losses are reported through
    ``DistributedTrainingCallback(training_accumulator)``.

    The model's input shape and output width are taken from the training data.
    They therefore always agree with the window sizes passed to
    ``create_sequences`` and are not duplicated here.

    Returns:
        dict with the test-set ``"MAE"``, ``"MSE"`` and ``"RMSE"`` as Python
        floats, and ``"y_pred"``, the output of ``model.predict`` on the test set.

    Raises:
        ValueError: if the training windows are empty or not shaped
            ``(samples, timesteps, features)`` / ``(samples, horizon)``.
    """
    BATCH_SIZE = 64
    EPOCHS = 200

    def to_arrays(features, targets):
        # Explicit float32 (Keras' default float dtype) rather than letting TF infer
        # a dtype from nested Python lists.
        return np.asarray(features, dtype=np.float32), np.asarray(targets, dtype=np.float32)

    train_x, train_y = to_arrays(x_train, y_train)
    val_x, val_y = to_arrays(x_val, y_val)
    test_x, test_y = to_arrays(x_test, y_test)

    if train_x.ndim != 3 or train_y.ndim != 2 or len(train_x) == 0:
        raise ValueError(
            "expected non-empty training windows shaped (samples, timesteps, features) "
            f"and targets shaped (samples, horizon); got {train_x.shape} and {train_y.shape}"
        )

    def make_datasets():
        train_dataset = tf.data.Dataset.from_tensor_slices((train_x, train_y)).batch(BATCH_SIZE)
        val_dataset = tf.data.Dataset.from_tensor_slices((val_x, val_y)).batch(BATCH_SIZE)

        # In-memory tensors have no files to shard, so shard by element across workers.
        options = tf.data.Options()
        options.experimental_distribute.auto_shard_policy = tf.data.experimental.AutoShardPolicy.DATA
        train_dataset = train_dataset.with_options(options)
        val_dataset = val_dataset.with_options(options)

        return train_dataset, val_dataset

    def make_test_dataset():
        return tf.data.Dataset.from_tensor_slices((test_x, test_y)).batch(BATCH_SIZE)

    # https://medium.com/@sebastienwebdev/forecasting-weather-patterns-with-lstm-a-python-guide-without-dates-433f0356136c
    def build_and_compile_lstm_model(input_shape, horizon):
        """Two stacked 50-unit LSTM layers with dropout, then a linear head of `horizon` outputs."""
        model = tf.keras.Sequential([
            tf.keras.layers.LSTM(50, return_sequences=True, input_shape=input_shape),
            tf.keras.layers.Dropout(0.2),
            tf.keras.layers.LSTM(50),
            tf.keras.layers.Dropout(0.2),
            tf.keras.layers.Dense(horizon)
        ])
        model.compile(
            loss="mean_squared_error",
            optimizer=tf.keras.optimizers.Adam(learning_rate=0.001),
            metrics=["mae", "mse"]
        )
        return model

    # Load datasets
    train_datasets, val_datasets = make_datasets()
    test_datasets = make_test_dataset()

    # Build model: (timesteps, features) from the windows, one output per forecast step.
    model = build_and_compile_lstm_model(
        input_shape=tuple(train_x.shape[1:]),
        horizon=int(train_y.shape[1]),
    )

    # Early stopping callback to prevent overfitting
    early_stopping_cb = tf.keras.callbacks.EarlyStopping(
        monitor="val_loss",
        patience=50,
        restore_best_weights=True
    )

    distributed_callback = DistributedTrainingCallback(training_accumulator)

    # Train model
    model.fit(
        x=train_datasets,
        epochs=EPOCHS,
        verbose=1,
        validation_data=val_datasets,
        callbacks=[early_stopping_cb, distributed_callback]
    )

    # Evaluate and calculate metrics
    results = model.evaluate(test_datasets, return_dict=True)
    y_pred = model.predict(test_datasets)

    mse = float(results["mse"])
    return {
        "MAE": float(results["mae"]),
        "MSE": mse,
        # Plain float like MAE/MSE (np.sqrt alone would return a numpy scalar).
        "RMSE": float(np.sqrt(mse)),
        "y_pred": y_pred
    }

# Wall-clock timing of everything runner.run does, including train()'s fit/evaluate/predict.
start_time = time.time()

# Run `train` via spark-tensorflow-distributor on 3 CPU-only slots. The value
# returned by train() is used below as the dict of test metrics.
runner = MirroredStrategyRunner(
    num_slots=3,
    use_gpu=False,
)
metrics_results = runner.run(train)

end_time = time.time()
execution_time = end_time - start_time
print(f"Execution time: {execution_time}")

In [None]:
from datetime import datetime
import matplotlib.pyplot as plt
import numpy as np

# Must equal the `input_length` used when windowing the test split with
# create_sequences: window i of y_pred forecasts test rows INPUT_LENGTH + i onward.
INPUT_LENGTH = 60
# A single forecast counts as "correct" when it is within this absolute distance of the actual value.
ACCURACY_TOLERANCE = 1
# Label one x-axis tick every this many test dates.
TICK_EVERY = 90

training_info = training_accumulator.value
print(f"Training info: {training_info}")

print("\nMetrics Results:")
print("MAE: ", metrics_results["MAE"])
print("MSE: ", metrics_results["MSE"])
print("RMSE: ", metrics_results["RMSE"])

y_pred = np.array(metrics_results["y_pred"])

# Collect both columns at once so dates and actual values come from the same ordered scan.
# The first INPUT_LENGTH rows only serve as model input and have no forecasts.
# NOTE(review): assumes "temperature" is the column create_sequences used as its target
# (column index 1) — confirm against the parquet schema.
test_rows = test_df.select("date", "temperature").collect()[INPUT_LENGTH:]
dates = [datetime.strptime(row["date"], '%Y-%m-%d') for row in test_rows]
true_values = [row["temperature"] for row in test_rows]

# Window i, horizon step j forecasts dates[i + j]; group every forecast under that date.
predictions = {}
for i, window in enumerate(y_pred):
    for j, value in enumerate(window):
        predictions.setdefault(dates[i + j], []).append(value)

# Look actual values up by date rather than relying on dict insertion order
# lining up position-by-position with `true_values`.
actual_by_date = dict(zip(dates, true_values))

y_pred_mean = []
all_pred_dates = []
all_predictions = []
count_correct = 0
count_total = 0
for date, preds in predictions.items():
    y_pred_mean.append(np.mean(preds))
    true_value = actual_by_date[date]
    for pred in preds:
        all_pred_dates.append(date)
        all_predictions.append(pred)
        if abs(true_value - pred) <= ACCURACY_TOLERANCE:
            count_correct += 1
        count_total += 1

# An empty test split yields no forecasts; report NaN instead of dividing by zero.
accuracy = count_correct / count_total if count_total else float("nan")
print("Accuracy: ", accuracy)

fig, (ax1, ax2) = plt.subplots(2, 1, figsize=(15, 12), sharex=True)

ax1.plot(dates, true_values, label="Actual", color="blue", linewidth=1.5)
# Plot each mean against the date it was computed for, so x and y always have equal length.
ax1.plot(list(predictions), y_pred_mean, color="red", label="Mean Predictions", linewidth=1.5)
ax1.set_ylabel("Temperature", fontsize=12)
ax1.set_title("Temperature Over Time: Actual vs Mean Predictions", fontsize=14, pad=10)
ax1.legend()

ax2.plot(dates, true_values, label="Actual", color="blue", linewidth=1.5)
ax2.scatter(all_pred_dates, all_predictions, color="red", alpha=1, s=20, label="Individual Predictions")
ax2.set_xlabel("Date", fontsize=12)
ax2.set_ylabel("Temperature", fontsize=12)
ax2.set_title("Temperature Over Time: Actual vs Individual Predictions", fontsize=14, pad=10)
ax2.legend()

# Configure ticks on an explicit Axes instead of pyplot's implicit "current axes";
# with sharex=True the tick locations are shared by both panels.
tick_dates = dates[::TICK_EVERY]
ax2.set_xticks(tick_dates)
ax2.set_xticklabels([d.strftime('%Y-%m-%d') for d in tick_dates], rotation=45)

fig.tight_layout()

plt.show()

# spark.stop()