In [1]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q http://archive.apache.org/dist/spark/spark-3.1.1/spark-3.1.1-bin-hadoop3.2.tgz
!tar xf spark-3.1.1-bin-hadoop3.2.tgz
!pip install -q findspark

In [2]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.1.1-bin-hadoop3.2"

In [12]:
import findspark
findspark.init()
from pyspark.sql import SparkSession
from pyspark.ml import Pipeline
from pyspark.ml.feature import MinMaxScaler, VectorAssembler
import numpy as np
import pyspark.sql.functions as F
from pyspark.sql.types import ArrayType, DoubleType
from pyspark.mllib.evaluation import RegressionMetrics
import sys
from pyspark.sql.functions import when
from pyspark.sql.functions import col
import matplotlib.pyplot as plt

spark = SparkSession.builder.appName("Jena_Climate").getOrCreate()

In [13]:
#load data
def load_data():
    df=spark.read.option("header", "true").csv("jena_climate_2009_2016.csv")
    return df

#Use MinMaxScaler to get data between 0 and 1
def preprocess_data(df):
    #consider temperature as feature to predict weather
    df_ftrs=df.select(['T (degC)'])
    #normalize data
    vector_asblr = VectorAssembler(inputCols=df_ftrs.columns, outputCol="x")
    norm_scalar = MinMaxScaler(inputCol="x", outputCol="y")
    pipe_line = Pipeline(stages=[vector_asblr, norm_scalar])
    scale_model = pipe_line.fit(df_ftrs)
    scale_features = scale_model.transform(df_ftrs)

    #separate feature columns
    scale_features = scale_features.select(create_lst_from_arr(F.col("y")).alias("split_ftrs")).select([F.col("split_ftrs")[f].alias("T_Scaled") for f in range(len(df_ftrs.columns))])

    return scale_features

# split array
def create_lst_from_arr(colum):
    def create_lst(vec):
        return vec.toArray().tolist()
    return F.udf(create_lst, ArrayType(DoubleType()))(colum)

#Preparing dataset based on time sequence
def prepare_data(wt_data, time_sequence = 1):
    i=0
    return_data = []
    wt_data = [z[0] for z in wt_data.select('T_Scaled').toLocalIterator()]
    while i < len(wt_data) - time_sequence - 1:
       return_data.append((wt_data[i:(i + time_sequence)], wt_data[i + time_sequence]))
       i += 1
    return return_data

#Split 80 % train and 20 % test
def train_test_split(wt_data,  time_sequence):
    total_smpls = wt_data.count()
    train_data_size=int(total_smpls*0.80)

    traind = wt_data.limit(train_data_size)
    testd  = wt_data.subtract(traind)

    traind = spark.createDataFrame(prepare_data(traind), ['X','Y'], time_sequence)
    testd = spark.createDataFrame(prepare_data(testd), ['X','Y'],  )

    return traind, testd

In [14]:
#perform forward propagation
def frwd_prop(inp, inp_w, outp_w, hid_w):
    i=0
    # out_p=[]
    state = []
    state.append(np.zeros((hd_lyr, 1)))
    while i < inp.shape[0]:
        z = (inp_w @ inp[[i]].T) + (hid_w @ state[-1])
        o = activefunc(z)
        state.append(o)
        i += 1
    out_p =  outp_w @ state[-1]
    return (state,out_p)

# activation function - tanh
def activefunc(inpt, differen= False):
    if(differen== True):
        return (1 - np.square(np.tanh(inpt)))
    return np.tanh(inpt)

def backpropogation(inputs, targets, hidden_states, out_ps, wt_ih,  wt_ho,wt_hh):
    loss = np.mean(np.square(inputs - out_ps))
    # out_p layer gradients
    out_err = out_ps - targets
    gradient_out_p = out_err.dot(hidden_states[-1].T)

    # Initialize gradients
    grd_hh = np.zeros_like(wt_hh)
    grd_ih = np.zeros_like(wt_ih)
    grd_ho = np.zeros_like(wt_ho)
    next_error = np.zeros((wt_hh.shape[0], 1))

    for t in reversed(range(len(inputs))):
        error = np.dot(wt_ho.T, out_err) + np.dot(wt_hh.T, next_error)
        gradient = (1 - hidden_states[t] ** 2) * error
        grd_ho += out_err.dot(hidden_states[t].T)
        grd_hh += np.dot(gradient, hidden_states[t - 1].T)
        grd_ih += np.dot(gradient, inputs[t].reshape(-1, 1))
        next_error = np.dot(wt_hh.T, gradient)

    # Update weights using gradients and learning rate
    wt_ho -= learn_rate * grd_ho
    wt_hh -= learn_rate * grd_hh
    wt_ih -= learn_rate * grd_ih

    return wt_ih, wt_hh, wt_ho

#model training for # of epochs
def train_model(train,epochs):
    #initialize w
    inp_w,outp_w,hid_w = (np.random.uniform(0, 1, (hd_lyr, i_lyr)) / 2),(np.random.uniform(0, 1, (o_lyr, hd_lyr)) / 2),(np.random.uniform(0, 1, (hd_lyr, hd_lyr)) / 2)

    for iter in range(epochs):
        if(iter == epochs-1):
            trn_prd = []
        for i in train.rdd.toLocalIterator():
            xd = np.array(i['X'])
            xd = xd.reshape((1,xd.shape[0]))
            state, out_p = frwd_prop(xd,  inp_w, outp_w, hid_w)
            if(iter == epochs-1):
                trn_prd.append(out_p.tolist()[0])
            inp_w, hid_w, outp_w = backpropogation(xd, i['Y'], state,out_p, inp_w, outp_w, hid_w)
    trn_prd = np.array(trn_prd).T[0]
    return trn_prd, inp_w, hid_w, outp_w

#Based on weights trained, test model
def test_model(testd, inp_w, hid_w, outp_w):
    tst_prd = []
    for i in testd.rdd.toLocalIterator():
        xd = np.array(i['X'])
        xd = xd.reshape((xd.shape[0],1))
        state, out_p = frwd_prop(xd, inp_w, hid_w, outp_w)
        tst_prd.append(out_p.tolist()[0])
    tst_prd = np.array(tst_prd).T[0]
    return tst_prd

In [None]:
if __name__ == "__main__":
    #load and preprocess data
    data = load_data()

    print("-----Weather Data-----")
    data.show()
    print("-----Weather Data Schema-----")
    data.printSchema()

    data = data.withColumn("p (mbar)", when(col("p (mbar)") == "", None).otherwise(col("p (mbar)").cast("double")))
    data = data.withColumn("T (degC)", when(col("T (degC)") == "", None).otherwise(col("T (degC)").cast("double")))
    data = data.withColumn("VPmax (mbar)", when(col("VPmax (mbar)") == "", None).otherwise(col("VPmax (mbar)").cast("double")))
    data = data.withColumn("VPdef (mbar)", when(col("VPdef (mbar)") == "", None).otherwise(col("VPdef (mbar)").cast("double")))
    data = data.withColumn("sh (g/kg)", when(col("sh (g/kg)") == "", None).otherwise(col("sh (g/kg)").cast("double")))
    data = data.withColumn("rho (g/m**3)", when(col("rho (g/m**3)") == "", None).otherwise(col("rho (g/m**3)").cast("double")))
    data = data.withColumn("wv (m/s)", when(col("wv (m/s)") == "", None).otherwise(col("wv (m/s)").cast("double")))

    # Check the schema again and proceed with further analysis
    print("-----Processed Weather Data Schema-----")
    data.printSchema()
    preprocessed_data = preprocess_data(data)

    print("----Processed Data-----")
    preprocessed_data.show()

    #set layers size
    i_lyr,hd_lyr,o_lyr = 1,20,1

    #setting hyperparameters
    learn_rate = 0.01
    time_sequence = [1,5,10]
    epochs = [25,50]

    for steps in time_sequence:
        for iter in epochs:

            print("----***----RNN training with time_sequence : {} and epochs : {}".format(steps, iter))
            #perform split data
            train, test = train_test_split(preprocessed_data, steps)

            #Model- rnn train
            train_prd, input_w, hidden_w, output_w = train_model(train, iter)

            #Model- rnn test
            test_prd = test_model(test, input_w, hidden_w, output_w)

            #To evaluate, convert into proper format
            prd_trn = train_prd.tolist()
            prd_tst = test_prd.tolist()

            actual_train = train.rdd.map(lambda x: x[1]).collect()
            actual_test = test.rdd.map(lambda x: x[1]).collect()

            train = spark.createDataFrame(zip(actual_train, prd_trn), ['actual_output', 'predicted_output'])
            test = spark.createDataFrame(zip(actual_test, prd_tst), ['actual_output', 'predicted_output'])

            train_values = train.rdd.map(tuple)
            test_values = test.rdd.map(tuple)


            #Performance evaluation of the model
            print("\n\n----Training Evaluation----")
            performance = RegressionMetrics(train_values)

            print("MSE: {}".format(performance.meanSquaredError))
            print("RMSE: {}".format(performance.rootMeanSquaredError))
            print("MAE: {}".format(performance.meanAbsoluteError))


            print("\n\n----Testing Evaluation----")
            performance = RegressionMetrics(test_values)

            print("MSE: {}".format(performance.meanSquaredError))
            print("RMSE: {}".format(performance.rootMeanSquaredError))
            print("MAE: {}\n\n".format(performance.meanAbsoluteError))



-----Weather Data-----
+-------------------+--------+--------+--------+-----------+------+------------+------------+------------+---------+---------------+------------+--------+-------------+--------+
|          Date Time|p (mbar)|T (degC)|Tpot (K)|Tdew (degC)|rh (%)|VPmax (mbar)|VPact (mbar)|VPdef (mbar)|sh (g/kg)|H2OC (mmol/mol)|rho (g/m**3)|wv (m/s)|max. wv (m/s)|wd (deg)|
+-------------------+--------+--------+--------+-----------+------+------------+------------+------------+---------+---------------+------------+--------+-------------+--------+
|01.01.2009 00:10:00|  996.52|   -8.02|  265.40|      -8.90| 93.30|        3.33|        3.11|        0.22|     1.94|           3.12|     1307.75|    1.03|         1.75|  152.30|
|01.01.2009 00:20:00|  996.57|   -8.41|  265.01|      -9.28| 93.40|        3.23|        3.02|        0.21|     1.89|           3.03|     1309.80|    0.72|         1.50|  136.10|
|01.01.2009 00:30:00|  996.53|   -8.51|  264.91|      -9.31| 93.90|        3.21|       

In [None]:
# actual output vs predicted output plot
plt.title('RNN')
plt.scatter(actual_test,prd_tst)
plt.xlabel('Actual Output')
plt.ylabel('Predicted Output')
plt.legend(['Weather Data'])

In [None]:
# performance metric (RMSE,MSE,MAE) vs time sequence plot
time=[1,5,10]
#for epochs=25
mse=[0.006698249149405647,0.006786473715583001,0.0023744362953510172]
rmse=[0.0818428319977116,0.08238005654029985,0.0487281878931591]
mae=[0.07379011546905719,0.07313467459501904,0.041061530588503654]
#for epochs 50
# mse=[0.17913360633229894,0.03965016704468165,]
# rmse=[0.42324178235649057,0.19912349696779044,]
# mae=[0.422092780183203,0.19474719884296077,]
plt.title('RNN')
plt.plot(time,rmse,label='RMSE',color='b')
plt.plot(time,mse,label='MSE',color='g')
plt.plot(time,mae,label='MAE',color='r')
plt.xlabel('time sequence')
plt.ylabel('performance metric')
plt.legend()