# Initialization
---

### Loading the required Libraries for Time-Series Analysis

In [0]:
%pip install FBProphet

In [0]:
import logging

# Raise the 'py4j' logger threshold so only ERROR and above are emitted.
py4j_logger = logging.getLogger('py4j')
py4j_logger.setLevel(logging.ERROR)

### Loading the data

In [0]:
# Read the sample CSV with a header row, letting Spark infer the column types.
df1 = (
    spark.read
    .format("csv")
    .options(header="true", inferSchema="true")
    .load("dbfs:/FileStore/shared_uploads/goswamisagard@gmail.com/df1_sample.csv")
)

In [0]:
# Print the column names and types of the loaded frame.
df1.printSchema()

In [0]:
# Rename the unnamed CSV index column to ID, and the timestamp / speed columns
# to `ds` / `y`, the column names the Prophet model fitted later is given.
df1 = (
    df1
    .withColumnRenamed("_c0", "ID")
    .withColumnRenamed("DateTime", "ds")
    .withColumnRenamed("speed", "y")
)

In [0]:
from pyspark.sql.types import StringType

# Store link_id as a string column.
df1 = df1.withColumn("link_id", df1["link_id"].cast(StringType()))

In [0]:
# Keep only the `ds`, `link_id` and `y` columns.
df = df1.select(['ds', 'link_id', 'y'])

In [0]:
from pyspark.sql.functions import count, col

# Rows must fall in [2018-01-01, 2022-01-01) ...
in_date_range = (col('ds') >= '2018-01-01') & (col('ds') < '2022-01-01')
# ... and have a speed strictly between 0 and 68.
speed_in_bounds = (col('y') < 68) & (col('y') > 0)

cleaning_filters = in_date_range & speed_in_bounds

In [0]:
# Drop every row that does not satisfy the cleaning predicate.
df = df.where(cleaning_filters)

In [0]:
# Mark the filtered frame for caching, then print its schema and its first 10 rows.
df.cache()
df.printSchema()
df.show(10)

In [0]:
# Expose the filtered frame to Spark SQL under the name `data`.
df.createOrReplaceTempView('data')

# Time-Series Analysis | w.r.t. link_id

In [0]:
# Preparing the input for training model

# One row per (ds, link_id) holding the average of y, ordered by ds then link_id.
train = '''
SELECT ds, link_id, AVG(y) AS y
FROM data
GROUP BY ds, link_id
ORDER BY ds, link_id;
'''

# Spark DataFrame used by the per-link forecasting step (`groupBy('link_id')`).
# It has to stay a Spark frame: a pandas frame has no `groupBy(...).apply(udf)`.
df_link_train = spark.sql(train)

# Pandas copy of the same result, collected to the driver.
df_train = df_link_train.toPandas()

## Defining function to train Time-Series Models for each link_id

In [0]:
## Defining function to train Time-Series Models for each link_id

from pyspark.sql.functions import pandas_udf, PandasUDFType
from pyspark.sql.types import (
    FloatType,
    StringType,
    StructField,
    StructType,
    TimestampType,
)

# Schema of the frame returned by `forecast_link` for each link_id group.
# NOTE(review): `ds` is declared as a timestamp because the forecast is hourly
# and the `forecasts` table stores `ds` as a timestamp; the float columns mirror
# that table's `float` columns. Confirm against the intended output types.
df_link_forecast_schema = StructType([
    StructField('ds', TimestampType()),
    StructField('link_id', StringType()),
    StructField('y', FloatType()),
    StructField('yhat', FloatType()),
    StructField('yhat_upper', FloatType()),
    StructField('yhat_lower', FloatType()),
])


@pandas_udf( df_link_forecast_schema, PandasUDFType.GROUPED_MAP )
def forecast_link( train_data ):
    """Fit a Prophet model on one link's history and forecast two years ahead.

    Parameters
    ----------
    train_data : pandas.DataFrame
        All rows of a single `link_id` group, with columns `ds`, `link_id`
        and `y`.

    Returns
    -------
    pandas.DataFrame
        Columns `ds`, `link_id`, `y`, `yhat`, `yhat_upper`, `yhat_lower`:
        one row per historical and future hourly timestamp. `y` is the
        observed value where one exists for that `ds` and missing otherwise.
        An empty frame is returned when the group has fewer than two
        non-null rows, because Prophet cannot be fitted on that.
    """
    # Imported here so the name is available wherever the UDF body runs.
    from fbprophet import Prophet

    output_columns = ['ds', 'link_id', 'y', 'yhat', 'yhat_upper', 'yhat_lower']

    pred_days = 2 * 365  # Predicting for 2 years

    # Removing null values created from the aggregation
    history = train_data.dropna()

    # Prophet raises on fewer than two observations; skip such links instead
    # of failing the whole grouped job.
    if len(history) < 2:
        empty = history.iloc[0:0].assign(
            yhat=float('nan'),
            yhat_upper=float('nan'),
            yhat_lower=float('nan'),
        )
        return empty[output_columns]

    # Training the Time-Series Model
    ts_link_model = Prophet(
        daily_seasonality=True,
        weekly_seasonality=True,
        yearly_seasonality=True
    )
    ts_link_model.fit( history )

    # Generating predictions: hourly steps over the horizon, history included.
    df_link_future = ts_link_model.make_future_dataframe(
        periods = pred_days * 24,
        freq = 'H',
        include_history = True
    )
    df_link_forecast = ts_link_model.predict( df_link_future )

    # Assembling the predicted results: left-join the observed values onto the
    # forecast by timestamp so future rows keep a missing `y`.
    f_pd = df_link_forecast[ ['ds', 'yhat', 'yhat_upper', 'yhat_lower'] ].set_index('ds')
    h_pd = history[ ['ds', 'link_id', 'y'] ].set_index('ds')

    results_pd = f_pd.join( h_pd, how='left' ).reset_index()
    # Future rows have no link_id after the join; fill the group's id everywhere.
    results_pd['link_id'] = history['link_id'].iloc[0]

    # Returning the prediction results
    return results_pd[ output_columns ]

## Running the Time-Series Models for individual datasets

In [0]:
from pyspark.sql.functions import current_date

# Run forecast_link once per link_id group and stamp every output row with
# the current date as `training_date`.
link_groups = df_link_train.groupBy('link_id')
results = link_groups.apply(forecast_link).withColumn('training_date', current_date())

# Make the forecasts queryable from SQL as `link_forecasts`.
results.createOrReplaceTempView('link_forecasts')

## Displaying Results for Predictions

In [0]:
# Render the forecast frame (observed y, yhat with bounds, training_date).
display(results)

ds,link_id,y,yhat,yhat_upper,yhat_lower,training_date
2017-05-08,4329507,34.18,33.058647,44.75394,21.41242,2022-04-24
2017-05-17,4329507,34.18,25.970612,37.704174,13.823777,2022-04-24
2017-06-08,4329507,24.85,28.846054,40.447952,17.597525,2022-04-24
2017-06-08,4329507,16.78,22.64732,34.670284,12.302156,2022-04-24
2017-06-15,4329507,14.91,21.692205,32.6909,10.081255,2022-04-24
2017-06-20,4329507,27.96,22.059807,34.031826,10.509443,2022-04-24
2017-06-22,4329507,27.96,24.059952,35.295666,13.069384,2022-04-24
2017-06-28,4329507,26.72,19.561394,30.781313,7.534235,2022-04-24
2017-07-07,4329507,16.16,21.541931,33.520935,9.812858,2022-04-24
2017-07-15,4329507,26.72,21.76044,33.47502,10.213018,2022-04-24


In [0]:
# Total number of forecast rows across all link_ids.
results.count()

In [0]:
%sql
-- Delta table holding forecast runs, partitioned by the date of the run.
CREATE TABLE IF NOT EXISTS forecasts (
  ds                    TIMESTAMP,
  link_id               STRING,
  speed                 FLOAT,
  speed_predicted       FLOAT,
  speed_predicted_upper FLOAT,
  speed_predicted_lower FLOAT,
  training_date         DATE
)
USING DELTA
PARTITIONED BY (training_date);

In [0]:
%sql

-- Append the rows of link_forecasts to the forecasts table, mapping
-- y / yhat* onto the speed / speed_predicted* columns.
INSERT INTO forecasts
SELECT
  ds,
  link_id,
  y          AS speed,
  yhat       AS speed_predicted,
  yhat_upper AS speed_predicted_upper,
  yhat_lower AS speed_predicted_lower,
  training_date
FROM link_forecasts;

num_affected_rows,num_inserted_rows
2535280,2535280


In [0]:
# Print the column names and types of the forecast frame.
results.printSchema()

## Time-Series Analysis Results for individual link_id

In [0]:
%sql

-- List each link_id present in the link_forecasts view.
SELECT DISTINCT link_id FROM link_forecasts;

link_id
4616324
4616346
4456510
4616337
4456501
4616318
4763655
4616200
4616218
4616232


**---------- Insert a link_id in the query below to view its predictions. ----------**

In [0]:
%sql
-- All forecast rows for one link; change the quoted id to inspect another link.
SELECT * FROM link_forecasts
WHERE link_id = '4616192';

ds,link_id,y,yhat,yhat_upper,yhat_lower,training_date
2017-05-24,4616192,49.09,47.952213,62.51553,34.004345,2022-04-24
2017-11-07,4616192,35.41,41.692104,55.210163,27.164154,2022-04-24
2017-11-07,4616192,52.19,46.545486,59.820415,32.34341,2022-04-24
2017-11-08,4616192,39.14,43.991707,57.466694,30.714376,2022-04-24
2017-11-10,4616192,52.81,52.006927,65.83374,37.30972,2022-04-24
2017-11-14,4616192,52.81,51.092117,64.81968,35.851067,2022-04-24
2017-11-15,4616192,50.95,43.27437,56.9359,28.553305,2022-04-24
2017-11-23,4616192,47.84,45.15487,59.163403,30.530085,2022-04-24
2017-11-26,4616192,3.1,42.30861,55.922813,28.568178,2022-04-24
2017-11-30,4616192,26.71,45.637802,59.66028,30.666037,2022-04-24
