# Goal: Multiple Time Series modeling using Apache Spark and Facebook Prophet

#### Data set: 
1. Crypto-currencies data set (last 5 years data for different crypto currencies)
2. Additional: If possible, the same approach will also be applied to an inflation (CPI) data set covering different countries (300,000+ rows for 200+ countries)

#### Processes: Data processing, EDA, multiple time series modeling
#### Technologies: Spark, Python (PySpark), Databricks

In [None]:
import pandas as pd

import matplotlib as mpl
import matplotlib.pyplot as plt

from fbprophet import Prophet

# Notebook-wide plot defaults: wide figures without a background grid.
plt.rcParams.update({
    'figure.figsize': (15, 8),
    'axes.grid': False,
})

In [None]:
!pip install pyspark # Install pyspark

In [None]:
from pyspark.sql import SparkSession 
import pyspark

# 'local[*]' gives Spark one worker thread per available core. Plain 'local'
# runs with a single thread, which would serialise the per-currency model
# fits that are distributed with Spark further down.
spark = SparkSession.builder.master('local[*]').getOrCreate()
# Load csv file in pandas
df = pd.read_csv("/content/sample_data/4BitcoinsLast10YearsData.csv")

In [None]:
print("Shape of the dataset: ", df.shape)
# DataFrame.info() writes its report to stdout and returns None, so it is
# called on its own; passing it to print() would append a stray "None".
print("\nSize of the pandas table:")
df.info()

In [None]:
# OBSERVATION: Number of rows we have: 10,015 with 4 features

In [None]:
# Preview the first rows of the raw data.
df.head()

In [None]:
# Keep only the columns used for modelling. The explicit copy makes `df` an
# independent frame, so the column assignments in later cells do not trigger
# pandas' SettingWithCopyWarning.
df = df[['Date', 'Close', 'CryptoName']].copy()

In [None]:
# Column dtypes before any conversion.
df.dtypes

In [None]:
# Count missing values per column.
df.isnull().sum()

In [None]:
# Parse the date strings into datetimes and make sure Close is a float.
# NOTE(review): newer pandas releases deprecate `infer_datetime_format`;
# confirm against the installed pandas version before relying on it.
df['Date'] = pd.to_datetime(df['Date'], infer_datetime_format=True)
df['Close']=df['Close'].astype(float)
df.head()

In [None]:
# Earliest and latest date present in the data.
df['Date'].min(), df['Date'].max()

In [None]:
# Distinct currency names present in the data.
df.CryptoName.unique()

In [None]:
# Plot Close against Date for the whole frame.
# NOTE(review): rows are not filtered by CryptoName here, so every currency
# is drawn as part of one series; the per-currency plots come later.
df.set_index("Date")[["Close"]].plot(figsize=(18, 8))

In [None]:
#df['Date'] = pd.to_datetime(df['Date'], infer_datetime_format=True)

In [None]:
# Date-indexed view holding just the closing price and the currency name.
item_df = df.set_index('Date')[['Close', 'CryptoName']]
item_df.head()

In [None]:
# Closing price over time for bitcoin only.
item_df[item_df['CryptoName'] == 'bitcoin']['Close'].plot()

In [None]:
# Integer id per currency name; an unknown name raises KeyError.
dictCrypto = {
    'bitcoin': 1,
    'ethereum': 2,
    'cardano': 3,
    'tether': 4,
}
df['CryptoId'] = [dictCrypto[name] for name in df['CryptoName']]

# 

In [None]:
# Column dtypes after the conversions and the added CryptoId column.
df.dtypes

In [None]:
# Rebuild the date-indexed frame (it now carries CryptoId) and plot the
# closing price of the currency whose id is 1.
item_df = df.set_index('Date')
id_one_rows = item_df[item_df['CryptoId'] == 1]
id_one_rows[['Close']].plot()
plt.show()

In [None]:
# One closing-price chart per currency, titled with the currency name.
for CryptoName in df['CryptoName'].unique():
  closes = item_df.loc[item_df['CryptoName'] == CryptoName, 'Close']
  closes.plot(title=CryptoName)
  plt.show()

In [None]:
# Stationarity Check
#zip basically combines result,labels
from statsmodels.tsa.stattools import adfuller
def adfuller_test(sales):
    """Run the Augmented Dickey-Fuller test on ``sales`` and print a summary.

    Prints the first four entries of the ``adfuller`` result (test statistic,
    p-value, lags used, number of observations) followed by a verdict based
    on a 0.05 p-value threshold. Returns nothing.
    """
    result = adfuller(sales)
    labels = (
        'ADF Test Statistic',
        'p-value',
        '#Lags Used',
        'Number of Observations Used',
    )
    # zip() stops at the shorter sequence, so only the labelled entries print.
    for label, value in zip(labels, result):
        print(' : '.join((label, str(value))))

    rejects_unit_root = result[1] <= 0.05
    verdict = (
        "strong evidence against the null hypothesis(Ho), reject the null hypothesis. Data has no unit root and is stationary"
        if rejects_unit_root
        else "weak evidence against null hypothesis, time series has a unit root, indicating it is non-stationary "
    )
    print(verdict)
        
# source: https://www.kaggle.com/code/avi111297/predicting-sales-using-arima-sarimax-tsf-model

# Stationarity report for the closing-price series of each currency.
for CryptoName in df['CryptoName'].unique():
  print("\nCrypto Currency: ", CryptoName)
  close_series = item_df.loc[item_df['CryptoName'] == CryptoName, 'Close']
  adfuller_test(close_series)

# 

In [None]:
import time 
from sklearn.metrics import mean_absolute_error
def forecast_sales(crypto_pd):
  """Fit a Prophet model to one currency's history and forecast 5 days ahead.

  Args:
    crypto_pd: pandas DataFrame with columns ``ds`` (date), ``y`` (value to
      model) and ``CryptoName``. All rows are treated as one currency: the
      name of the first row is applied to the whole result.

  Returns:
    pandas DataFrame with columns ``ds``, ``CryptoName``, ``y``, ``yhat``,
    ``yhat_upper`` and ``yhat_lower``: one row per date in Prophet's
    prediction frame (history plus 5 future periods). ``y`` is NaN wherever
    the input holds no observation for that date.
  """
  model = Prophet(interval_width=0.95, seasonality_mode='multiplicative', daily_seasonality=True, weekly_seasonality=True, yearly_seasonality=True)
  model.fit(crypto_pd)
  # 'D' is the canonical pandas alias for calendar-day frequency.
  future_pd = model.make_future_dataframe(periods=5, freq='D')
  forecast_pd = model.predict(future_pd)

  f_pd = forecast_pd[['ds', 'yhat', 'yhat_upper', 'yhat_lower']].set_index('ds')
  # Only `y` is joined back; CryptoName is filled in below for every row,
  # including the future rows that have no match in the input.
  st_pd = crypto_pd[['ds', 'y']].set_index('ds')
  result_pd = f_pd.join(st_pd, how='left').reset_index()
  result_pd['CryptoName'] = crypto_pd['CryptoName'].iloc[0]

  return result_pd[['ds', 'CryptoName', 'y', 'yhat', 'yhat_upper', 'yhat_lower']]


# Fit and score every currency one after another in plain pandas, timing the
# whole run so it can be compared with the Spark version below.
tick = time.time()
for cryptoName in df['CryptoName'].unique():
  pdIndividualCrypto = (
      df.loc[df['CryptoName'] == cryptoName, ['Date', 'CryptoName', 'Close']]
      .rename(columns={'Date': 'ds', 'Close': 'y'})
  )
  final_df = forecast_sales(pdIndividualCrypto)

  # In-sample Mean Absolute Error: only rows with an observed `y` are scored
  # (the forecast-only future rows have NaN `y` and are dropped).
  observed = final_df.dropna()
  y_true = observed['y'].values
  y_pred = observed['yhat'].values

  mae = mean_absolute_error(y_true, y_pred)
  print(cryptoName, ': MAE: %.3f' % mae)

  final_df[['y', 'yhat']].plot(title=cryptoName + ': MAE: %.3f' % mae)

tock = time.time()
TotalTime = tock - tick
print("Total time taken: {} sec.s".format(round(TotalTime, 3)))

# For individual Crypto it took:  54.514 sec.s

Measuring Performance


from fbprophet.diagnostics import performance_metrics
final_df = performance_metrics(final_df[['y', 'yhat']])
final_df.head()

In [None]:
#df.rename(columns={'Date': 'ds'}, inplace=True)

In [None]:
# Convert the pandas frame into a Spark DataFrame and inspect it.
sdf = spark.createDataFrame(df)
sdf.printSchema() # column names and Spark data types
sdf.show(5) # print the first 5 rows (Spark counterpart of pandas head)
sdf.count() # total row count (10,015 per the earlier observation)

In [None]:
# Row count per CryptoId using the DataFrame API.
id_column = sdf.select('CryptoId')
id_column.groupby('CryptoId').agg({'CryptoId': 'count'}).show()

In [None]:
# Register the frame as a SQL view and repeat the per-id row count in SQL.
sdf.createOrReplaceTempView("Crypto")
counts_by_id = spark.sql("select CryptoId, count(*) from Crypto group by CryptoId order by CryptoId")
counts_by_id.show()

In [None]:
# Query that shapes the data for Prophet: one row per (CryptoId, date) with
# the date exposed as `ds` and the summed Close as `y`. The `sql` string is
# reused by the repartitioning cell that follows.
sql = "SELECT CryptoId, Date as ds, sum(Close) as y FROM Crypto GROUP BY CryptoId, ds ORDER BY CryptoId, ds"
spark.sql(sql).show()

In [None]:
# Repartition by CryptoId so each currency's rows sit together, then mark the
# frame for caching.
store_part = (
    spark.sql(sql)
    .repartition(spark.sparkContext.defaultParallelism, ['CryptoId'])
    .cache()
)
# Show the plan of the frame that was just built; explaining `sdf` would
# only describe the untouched source frame.
store_part.explain()

In [None]:
from pyspark.sql.types import *
# Schema of the frame returned by the grouped forecasting UDF.
result_schema = (
    StructType()
    .add(StructField('ds', TimestampType()))
    .add(StructField('CryptoId', IntegerType()))
    .add(StructField('y', DoubleType()))
    .add(StructField('yhat', DoubleType()))
    .add(StructField('yhat_upper', DoubleType()))
    .add(StructField('yhat_lower', DoubleType()))
)

In [None]:
#forecast_sales(df[['']])

In [None]:
from pyspark.sql.functions import pandas_udf, PandasUDFType
# NOTE(review): PandasUDFType.GROUPED_MAP is the older grouped-map API;
# recent Spark releases recommend groupby(...).applyInPandas - confirm
# against the installed Spark version before migrating.
@pandas_udf(result_schema, PandasUDFType.GROUPED_MAP)
def forecast_sales(crypto_pd):
  """Grouped-map pandas UDF: fit Prophet on one group and forecast 5 weeks.

  Args:
    crypto_pd: pandas DataFrame holding the rows of one group, with columns
      ``ds``, ``y`` and ``CryptoId``. The id of the first row is applied to
      the whole result.

  Returns:
    pandas DataFrame with the columns of ``result_schema`` (``ds``,
    ``CryptoId``, ``y``, ``yhat``, ``yhat_upper``, ``yhat_lower``): one row
    per date in Prophet's prediction frame (history plus 5 weekly periods).
    ``y`` is NaN wherever the input holds no observation for that date.
  """
  model = Prophet(interval_width=0.95, seasonality_mode='multiplicative', daily_seasonality=True, weekly_seasonality=True, yearly_seasonality=True)
  model.fit(crypto_pd)
  # 'W' is the canonical pandas alias for weekly frequency.
  future_pd = model.make_future_dataframe(periods=5, freq='W')
  forecast_pd = model.predict(future_pd)

  f_pd = forecast_pd[['ds', 'yhat', 'yhat_upper', 'yhat_lower']].set_index('ds')
  # Only `y` is joined back; CryptoId is filled in below for every row,
  # including the future rows that have no match in the input.
  st_pd = crypto_pd[['ds', 'y']].set_index('ds')
  result_pd = f_pd.join(st_pd, how='left').reset_index()
  result_pd['CryptoId'] = crypto_pd['CryptoId'].iloc[0]

  return result_pd[['ds', 'CryptoId', 'y', 'yhat', 'yhat_upper', 'yhat_lower']]

In [None]:
from pyspark.sql.functions import current_date
# Run the grouped forecast on Spark and time it.
tick = time.time()
results = (
    store_part.groupby('CryptoId')
    .apply(forecast_sales)
    .withColumn('training_date', current_date())
)
results.cache()
results.show(5)
tock = time.time()
# time.time() differences are already in seconds; dividing by 60 would
# report minutes under a "seconds" label.
print("Total time taken: {} seconds".format(round(tock - tick, 3)))

In [None]:
# Spark DataFrames are immutable: a bare `results.coalesce(1)` returns a new
# frame and leaves `results` untouched, so no coalesce is applied here.
print(results.count())
results.createOrReplaceTempView('forecasted')
spark.sql("SELECT CryptoId, count(*) FROM  forecasted GROUP BY CryptoId").show()

In [None]:
# Collect the Spark forecast results into a pandas DataFrame on the driver.
final_df = results.toPandas()
final_df.head()

In [None]:
# Reverse lookup (id -> name) to label the forecast rows; an unknown id
# raises KeyError.
dictCrypto = {
    1: 'bitcoin',
    2: 'ethereum',
    3: 'cardano',
    4: 'tether',
}
final_df['CryptoName'] = [dictCrypto[crypto_id] for crypto_id in final_df['CryptoId']]
final_df.head()

In [None]:
# Actual (y) versus predicted (yhat) values for bitcoin.
final_df[final_df['CryptoName'] == 'bitcoin'][['y', 'yhat']].plot()

In [None]:
# Index by date for plotting. The guard keeps the cell re-runnable once 'ds'
# has already become the index.
if 'ds' in final_df.columns:
  final_df = final_df.set_index('ds')

for CryptoName in final_df['CryptoName'].unique():
  pdTemp = final_df.loc[final_df['CryptoName'] == CryptoName, ['y', 'yhat']]

  # In-sample Mean Absolute Error: only rows with an observed `y` are scored
  # (forecast-only rows have NaN `y` and are dropped).
  observed = pdTemp.dropna()
  y_true = observed['y'].values
  y_pred = observed['yhat'].values

  mae = mean_absolute_error(y_true, y_pred)
  print(CryptoName, ': MAE: %.3f' % mae)

  pdTemp.plot(title=CryptoName + ': MAE: %.3f' % mae)
  plt.show()