### Arima Model

Reference: https://github.com/Eligijus112/Vilnius-weather-LSTM

### Get Data

In [1]:
%%bash
if [ ! -d '/content/data' ]; then
  git clone https://gitlab.com/Dimu_1020/big_data_final '/content/data'
  cd '/content/data'
else
  echo "Dataset already downloaded in '/content/data'"
fi

Cloning into '/content/data'...


### Import dependencies

In [2]:
pip install pyspark

Collecting pyspark
  Downloading pyspark-3.2.0.tar.gz (281.3 MB)
[K     |████████████████████████████████| 281.3 MB 35 kB/s 
[?25hCollecting py4j==0.10.9.2
  Downloading py4j-0.10.9.2-py2.py3-none-any.whl (198 kB)
[K     |████████████████████████████████| 198 kB 36.0 MB/s 
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.2.0-py2.py3-none-any.whl size=281805912 sha256=b4d69a8e96f7e1c5b5f57ca24c4243ebe6b2a251fe4da6b2a327528bce024ee5
  Stored in directory: /root/.cache/pip/wheels/0b/de/d2/9be5d59d7331c6c2a7c1b6d1a4f463ce107332b1ecd4e80718
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9.2 pyspark-3.2.0


In [3]:
# Data wrangling
import pandas as pd 

# Visualization
import matplotlib.pyplot as plt 
import seaborn as sns 

# Date wrangling
import datetime

# Math operations
import numpy as np

# Random sampling
import random

# sklearn
from sklearn.preprocessing import StandardScaler
from sklearn.model_selection import train_test_split
from sklearn.pipeline import make_pipeline

# Base
import os

# Keras API 
from tensorflow import keras

# Deep learning 
from keras.models import Input, Model, Sequential
from keras.layers import Dense, Dropout, LSTM, Concatenate, SimpleRNN, Masking, Flatten
from keras import losses
from keras.callbacks import EarlyStopping
from keras.initializers import RandomNormal

# pyspark related
# pip install pyspark
import pyspark
from pyspark.sql.functions import split, explode, to_timestamp, from_unixtime, from_utc_timestamp
from pyspark import SparkConf, SparkContext,SQLContext
from pyspark.sql import SparkSession
from pyspark.sql.types import IntegerType,ArrayType,StringType,DoubleType
from pyspark.sql.functions import *
from pyspark.sql import Window
# from pyspark.ml.feature import StandardScaler
from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler
#from ts.flint import FlintContext
sc = SparkContext.getOrCreate()
sqlContext = SQLContext(sc)



## Data Cleaning

### Convert into rdd

In [4]:
spark = SparkSession.builder.appName("DataFrame").getOrCreate()
path_to_file = 'data/HistoryBulk_NY.csv'
df = spark.read.csv(path_to_file, header=True)
df.show()

+---------+--------------------+--------+---------+---------+----------+------+----------+--------+--------+--------+---------+----------+--------+----------+--------+-------+-------+-------+-------+----------+----------+------------+--------------------+------------+
|       dt|              dt_iso|timezone|city_name|      lat|       lon|  temp|feels_like|temp_min|temp_max|pressure|sea_level|grnd_level|humidity|wind_speed|wind_deg|rain_1h|rain_3h|snow_1h|snow_3h|clouds_all|weather_id|weather_main| weather_description|weather_icon|
+---------+--------------------+--------+---------+---------+----------+------+----------+--------+--------+--------+---------+----------+--------+----------+--------+-------+-------+-------+-------+----------+----------+------------+--------------------+------------+
|283996800|1979-01-01 00:00:...|  -18000| New York|40.712775|-74.005973|280.34|    276.58|  280.33|  280.35|    1030|     null|      null|      93|       6.7|     170|   null|   null|   null|  

### Convert to datetime

In [5]:
# convert to datestamp
spark.conf.set('spark.sql.session.timeZone', 'UTC')
df = df.withColumn("dt",from_unixtime(unix_timestamp(col("dt_iso"),"yyyy-MM-dd HH:mm:ss '+0000 UTC'"),'yyyy-MM-dd HH:mm:ss').cast("timestamp"))
# Sorting by the date 
df = df.sort(asc("dt"))
df.show()

+-------------------+--------------------+--------+---------+---------+----------+------+----------+--------+--------+--------+---------+----------+--------+----------+--------+-------+-------+-------+-------+----------+----------+------------+--------------------+------------+
|                 dt|              dt_iso|timezone|city_name|      lat|       lon|  temp|feels_like|temp_min|temp_max|pressure|sea_level|grnd_level|humidity|wind_speed|wind_deg|rain_1h|rain_3h|snow_1h|snow_3h|clouds_all|weather_id|weather_main| weather_description|weather_icon|
+-------------------+--------------------+--------+---------+---------+----------+------+----------+--------+--------+--------+---------+----------+--------+----------+--------+-------+-------+-------+-------+----------+----------+------------+--------------------+------------+
|1979-01-01 00:00:00|1979-01-01 00:00:...|  -18000| New York|40.712775|-74.005973|280.34|    276.58|  280.33|  280.35|    1030|     null|      null|      93|      

Check the first date and the last date in the dataset

In [6]:
# Listing the min and the max dates
first = df.agg({'dt': "min"}).collect()[0][0] 
last = df.agg({'dt': "max"}).collect()[0][0]
print(f"First date {first}")
print(f"Most recent date {last}")

First date 1979-01-01 00:00:00
Most recent date 2021-12-11 23:00:00


In [7]:
df = df.withColumn("date",to_date("dt_iso"))
df.show()

+-------------------+--------------------+--------+---------+---------+----------+------+----------+--------+--------+--------+---------+----------+--------+----------+--------+-------+-------+-------+-------+----------+----------+------------+--------------------+------------+----------+
|                 dt|              dt_iso|timezone|city_name|      lat|       lon|  temp|feels_like|temp_min|temp_max|pressure|sea_level|grnd_level|humidity|wind_speed|wind_deg|rain_1h|rain_3h|snow_1h|snow_3h|clouds_all|weather_id|weather_main| weather_description|weather_icon|      date|
+-------------------+--------------------+--------+---------+---------+----------+------+----------+--------+--------+--------+---------+----------+--------+----------+--------+-------+-------+-------+-------+----------+----------+------------+--------------------+------------+----------+
|1979-01-01 00:00:00|1979-01-01 00:00:...|  -18000| New York|40.712775|-74.005973|280.34|    276.58|  280.33|  280.35|    1030|   

### Get hourly data

In [8]:
# Aggregating to hourly level
features = ['temp', 'pressure', 'wind_speed']
data_col = ['dt', 'date'] + features
df_data = df.select(data_col)\
                   .withColumn("temp", avg("temp").over(Window.partitionBy("dt"))) \
                   .withColumn("pressure", avg("pressure").over(Window.partitionBy("dt"))) \
                   .withColumn("wind_speed", avg("wind_speed").over(Window.partitionBy("dt")))
df_data.show()

+-------------------+----------+------+--------+----------+
|                 dt|      date|  temp|pressure|wind_speed|
+-------------------+----------+------+--------+----------+
|1979-01-01 00:00:00|1979-01-01|280.34|  1030.0|       6.7|
|1979-01-01 00:00:00|1979-01-01|280.34|  1030.0|       6.7|
|1979-01-01 00:00:00|1979-01-01|280.34|  1030.0|       6.7|
|1979-01-01 01:00:00|1979-01-01|280.34|  1030.0|       6.7|
|1979-01-01 01:00:00|1979-01-01|280.34|  1030.0|       6.7|
|1979-01-01 01:00:00|1979-01-01|280.34|  1030.0|       6.7|
|1979-01-01 02:00:00|1979-01-01|280.51|  1029.0|       6.7|
|1979-01-01 03:00:00|1979-01-01|281.34|  1029.0|       5.2|
|1979-01-01 04:00:00|1979-01-01|281.34|  1029.0|       5.2|
|1979-01-01 05:00:00|1979-01-01|281.51|  1026.0|      5.67|
|1979-01-01 06:00:00|1979-01-01|281.91|  1026.0|       7.2|
|1979-01-01 07:00:00|1979-01-01|281.91|  1025.0|       6.6|
|1979-01-01 07:00:00|1979-01-01|281.91|  1025.0|       6.6|
|1979-01-01 08:00:00|1979-01-01|282.21| 

Final Features used to feed models includes: 
- $X_{temp}$
- $X_{day_{sin}}$ = $sin(\frac{2\pi X_{hour}}{24} )$
- $X_{day_{cos}}$ = $cos(\frac{2\pi X_{hour}}{24} )$
- $X_{month_{sin}}$ = $sin(\frac{2\pi X_{timestamp}}{365.25\times 24\times 60 \times 60} )$
- $X_{month_{cos}}$ = $cos(\frac{2\pi X_{timestamp}}{365.25\times 24\times 60 \times 60} )$
- $X_{pressure}$
- $X_{wind_{speed}}$

In [9]:
# get integer timestamp
df_data = df_data.withColumn('timestamp',unix_timestamp(col('dt'), format='yyyy-MM-dd HH:mm:ss').alias('unix_timestamp'))

# hours in day
HOUR = 24
# Seconds in day 
SECOND = HOUR * 60 * 60
# Seconds in year 
YEAR = (365.25) * SECOND

features_final = ['temp', 'day_cos', 'day_sin', 'month_sin', 'month_cos', 'pressure', 'wind_speed']
df_final = df_data \
       .withColumn('hour', hour(df_data.dt)) \
       .withColumn('month', month(df_data.dt)) \
       .withColumn('day_cos', cos(col('hour') * (2 * np.pi / HOUR))) \
       .withColumn('day_sin', sin(col('hour') * (2 * np.pi / HOUR))) \
       .withColumn('month_cos', cos(col('timestamp') * (2 * np.pi /YEAR))) \
       .withColumn('month_sin', sin(col('timestamp') * (2 * np.pi /YEAR))) \
       .select(features_final)
df_final.show()

+------+--------------------+--------------------+--------------------+------------------+--------+----------+
|  temp|             day_cos|             day_sin|           month_sin|         month_cos|pressure|wind_speed|
+------+--------------------+--------------------+--------------------+------------------+--------+----------+
|280.34|                 1.0|                 0.0|-0.00430059270297...|0.9999907524084426|  1030.0|       6.7|
|280.34|                 1.0|                 0.0|-0.00430059270297...|0.9999907524084426|  1030.0|       6.7|
|280.34|                 1.0|                 0.0|-0.00430059270297...|0.9999907524084426|  1030.0|       6.7|
|280.34|  0.9659258262890683| 0.25881904510252074|-0.00358383062804...|0.9999935780583941|  1030.0|       6.7|
|280.34|  0.9659258262890683| 0.25881904510252074|-0.00358383062804...|0.9999935780583941|  1030.0|       6.7|
|280.34|  0.9659258262890683| 0.25881904510252074|-0.00358383062804...|0.9999935780583941|  1030.0|       6.7|
|

In [16]:
df_arima = df_data \
       .withColumn('hour', hour(df_data.dt)) \
       .withColumn('month', month(df_data.dt)) \
       .select(['dt', 'temp'] )
df_arima.show()

+-------------------+------+
|                 dt|  temp|
+-------------------+------+
|1979-01-01 00:00:00|280.34|
|1979-01-01 00:00:00|280.34|
|1979-01-01 00:00:00|280.34|
|1979-01-01 01:00:00|280.34|
|1979-01-01 01:00:00|280.34|
|1979-01-01 01:00:00|280.34|
|1979-01-01 02:00:00|280.51|
|1979-01-01 03:00:00|281.34|
|1979-01-01 04:00:00|281.34|
|1979-01-01 05:00:00|281.51|
|1979-01-01 06:00:00|281.91|
|1979-01-01 07:00:00|281.91|
|1979-01-01 07:00:00|281.91|
|1979-01-01 08:00:00|282.21|
|1979-01-01 09:00:00|282.14|
|1979-01-01 10:00:00|282.14|
|1979-01-01 11:00:00|283.03|
|1979-01-01 12:00:00|282.89|
|1979-01-01 13:00:00|282.86|
|1979-01-01 14:00:00|283.72|
+-------------------+------+
only showing top 20 rows



In [18]:
import os
if not os.path.exists("/content/data/pd_final"):
      pd_final = df_final.toPandas()
      pd_final.to_pickle('/content/data/pd_final.pkl')
else:
      pd_final = pd.read_pickle("/content/data/pd_final.pkl")
display(pd_final)
if not os.path.exists("/content/data/pd_arima"):
      pd_arima = df_arima.toPandas()
      pd_arima.to_pickle('/content/data/pd_arima.pkl')
else:
      pd_arima = pd.read_pickle("/content/data/pd_arima.pkl")
pd_arima

Unnamed: 0,temp,day_cos,day_sin,month_sin,month_cos,pressure,wind_speed
0,280.34,1.000000,0.000000,-0.004301,0.999991,1030.0,6.70
1,280.34,1.000000,0.000000,-0.004301,0.999991,1030.0,6.70
2,280.34,1.000000,0.000000,-0.004301,0.999991,1030.0,6.70
3,280.34,0.965926,0.258819,-0.003584,0.999994,1030.0,6.70
4,280.34,0.965926,0.258819,-0.003584,0.999994,1030.0,6.70
...,...,...,...,...,...,...,...
416544,289.09,0.258819,-0.965926,-0.340673,0.940182,1004.0,0.45
416545,289.64,0.500000,-0.866025,-0.339999,0.940426,1003.0,0.45
416546,291.16,0.707107,-0.707107,-0.339325,0.940669,1003.0,0.89
416547,291.23,0.866025,-0.500000,-0.338650,0.940912,1003.0,0.89


Unnamed: 0,dt,temp
0,1979-01-01 00:00:00,280.34
1,1979-01-01 00:00:00,280.34
2,1979-01-01 00:00:00,280.34
3,1979-01-01 01:00:00,280.34
4,1979-01-01 01:00:00,280.34
...,...,...
416544,2021-12-11 19:00:00,289.09
416545,2021-12-11 20:00:00,289.64
416546,2021-12-11 21:00:00,291.16
416547,2021-12-11 22:00:00,291.23


In [None]:
from google.colab import files
files.download("data/pd_final.pkl")
files.download("data/pd_arima.pkl")