In [1]:
import json
import pandas as pd
import numpy as np
import time
from datetime import datetime, timedelta
import uuid 
import sklearn.metrics as metrics
from prophet import Prophet
from prophet.serialize import model_to_json, model_from_json
import matplotlib
import matplotlib.pyplot as plt
plt.style.use("fivethirtyeight")
import import_ipynb

from utils import generate_prophet_time_frame,load_model,nonetype_timestamp,nonetype_float

Importing plotly failed. Interactive plots will not work.


importing Jupyter notebook from utils.ipynb


In [2]:

class timedata:
    """Container for a time-indexed pandas DataFrame.

    Takes a pandas DataFrame whose index holds the timestamps and whose
    columns hold the measures.

    Parameters
    -----------
    data : Pandas Dataframe with datetime index (frequency = "D").

    Attributes
    -----------
    initial_data : the DataFrame passed to the constructor
    data : the current DataFrame (initially the same object as initial_data)
    frequency : frequency string inferred from the index, 'D' when it
        cannot be inferred
    first_timestamp, end_timestamp : first and last index values of data
    nexttimestamp : end_timestamp plus one day
    """
    def __init__(self, data):
        if not isinstance(data, pd.DataFrame):
            raise TypeError("Data must be Pandas dataframe")
        self.initial_data = data
        self.data = data

        # infer_freq raises TypeError/ValueError for indexes it cannot handle
        # (non-datetime, fewer than 3 dates) and *returns* None for irregular
        # spacing; both cases fall back to daily.
        try:
            inferred = pd.infer_freq(self.data.index)
        except (TypeError, ValueError):
            inferred = None
        self.frequency = inferred or 'D'

        self.first_timestamp = data.index[0]
        self.end_timestamp = data.index[-1]
        self.nexttimestamp = self.end_timestamp + timedelta(days=1)

    @property
    def length(self):
        """Number of rows currently held in ``data``."""
        return len(self.data)

    def hold_out_data(self, hold_out_percentage: float = 0.2) -> tuple:
        """Split ``data`` chronologically into train and test sets.

        Parameters
        -----------
        hold_out_percentage : fraction (0..1) of the rows, taken from the
            end of the data, to reserve for testing.

        Returns
        --------
        (train, test) : tuple of two DataFrames; ``test`` holds the last
            ``round(hold_out_percentage * len(data))`` rows.

        Raises
        --------
        ValueError : if hold_out_percentage is outside [0, 1].
        """
        if not 0 <= hold_out_percentage <= 1:
            raise ValueError("hold_out_percentage must be between 0 and 1")

        hold_out_size = round(hold_out_percentage * len(self.data))
        split_at = len(self.data) - hold_out_size

        # iloc makes the positional intent explicit regardless of index type.
        return self.data.iloc[:split_at], self.data.iloc[split_at:]


class data_extended(timedata):
    """An extended data class to add incoming data.

    Not used in the current implementation, mainly used for testing purposes.

    Attributes (in addition to those of ``timedata``)
    -----------
    incoming_df : DataFrame holding only the most recently added rows,
        ``None`` until ``incoming_new`` is called
    incoming_counter : initialised to 0; not modified by this class
    """

    def __init__(self, data):
        super().__init__(data)
        self.incoming_df = None
        self.incoming_counter = 0

    def incoming_new(self, measure, value_list):
        """Append new daily observations of one measure to the data.

        The new rows start at ``nexttimestamp`` and are spaced one day apart.
        Every column other than ``measure`` is filled with ``None`` for the
        new rows. ``incoming_df`` holds the new rows only.

        Note : We can deal with time indexed dataframe that contains a lot of
        measures.

        Parameters
        -----------
        measure : name of the column the new values belong to
        value_list : the new values, in chronological order

        Returns
        --------
        (data, incoming_df) : the expanded data and the newly added rows

        Raises
        --------
        KeyError : if ``measure`` is not a column of the data (the values
            would otherwise be silently discarded).
        """
        if measure not in self.data.columns:
            raise KeyError(f"Measure {measure!r} is not a column of the data")

        n_new = len(value_list)
        new_columns = {
            column_name: (value_list if column_name == measure else [None] * n_new)
            for column_name in self.data.columns.values
        }

        self.incoming_df = pd.DataFrame(
            data=new_columns,
            index=pd.date_range(self.nexttimestamp, periods=n_new, freq='D'),
        )
        self.incoming_df.index.names = ['date']

        # DataFrame.append was removed in pandas 2.0; concat is the replacement.
        self.data = pd.concat([self.data, self.incoming_df])

        # Advance the cursor so a later call continues after these rows
        # instead of re-using the same dates.
        if n_new:
            self.end_timestamp = self.data.index[-1]
            self.nexttimestamp = self.end_timestamp + timedelta(days=1)

        return self.data, self.incoming_df

    def reset_incoming(self):
        """Discard all incoming rows and restore the initial data."""
        self.incoming_df = None
        self.incoming_counter = 0
        self.data = self.initial_data
        self.end_timestamp = self.data.index[-1]
        self.nexttimestamp = self.end_timestamp + timedelta(days=1)

class model_build:
    """
    A Prophet forecasting model
    Attributes
    ----------
    model_id : str
        UUID for the model
    model : Prophet model class
    train_data : pd.Series
        Measure values that were used in training the model
    test_data : pd.Series
        Measure values that were used in testing the model
    predicted_train : pd.dataframe
        Data predictions for train
    predicted_test : pd.dataframe
        Data predictions for test
    train_MAE : float
        Mean absolute error for train predictions
    test_MAE : float
        Mean absolute error for test predictions
    forecasts : pd.dataframe
        Data forecasted by the model
    train_time_complexity: float
        Wall-clock seconds spent constructing and fitting the model

    Every attribute is None until the method that produces it has run.
    When a model instance is fit again , attributes will change
    """

    # Columns of Prophet's prediction output that are kept.
    _PREDICTION_COLUMNS = ['ds', 'yhat', 'yhat_lower', 'yhat_upper']

    def __init__(self, prior_model_path: str = None, prior_model_id: str = None):
        """Create an empty model, or load a previously saved one.

        A saved model is loaded only when BOTH ``prior_model_path`` and
        ``prior_model_id`` are given; otherwise the instance starts empty
        and must be fit before use.
        """
        # Define every attribute up front so later methods can test
        # ``is None`` instead of failing on a missing attribute.
        self.model = None
        self.model_id = None
        self.train_data = None
        self.test_data = None
        self.prophet_train_data = None
        self.predicted_train = None
        self.predicted_test = None
        self.train_MAE = None
        self.test_MAE = None
        self.forecasts = None
        self.train_time_complexity = None
        self.modeltrain_startime = None
        self.modeltrain_endtime = None
        self.modeltest_starttime = None
        self.modeltest_endtime = None
        self.modelforecast_startime = None
        self.modelforecast_endtime = None

        if prior_model_path is not None and prior_model_id is not None:
            self.model = load_model(prior_model_path)
            self.model_id = prior_model_id

    def _set_start_end_timestamps(self, data):
        """Return the first and last index value of ``data``.

        Returns ``(None, None)`` when ``data`` is None or empty.
        """
        if data is None or len(data) == 0:
            return None, None
        return data.index[0], data.index[-1]

    @staticmethod
    def _to_prophet_frame(data, measure):
        """Return a two-column frame (``ds``, ``y``) built from ``data``.

        ``ds`` is taken from the column named 'date' after resetting the
        index; when there is no such column the (former) index column is
        used, so indexes with a different or missing name also work.
        """
        frame = data.reset_index()
        ds_column = 'date' if 'date' in frame.columns else frame.columns[0]
        return frame[[ds_column, measure]].rename(columns={ds_column: 'ds', measure: 'y'})

    def fit(self, train_data: pd.DataFrame, measure: str, test_data: pd.DataFrame = None, **kwargs):
        """
        Builds a prophet model and fits it on train Data .Can accept test data as well .
        Sets the following attributes
        -train_data,test_data
        -predicted_train , predicted_test for evaluation
        -modeltrain_startime,modeltrain_endtime,modeltest_starttime,modeltest_endtime via _set_start_end_timestamps
        -model, model_id (a fresh UUID), train_time_complexity
        -forecasts = None

        Parameters
        -------------
        train_data : time-indexed DataFrame containing the ``measure`` column
        measure : name of the column to model
        test_data : optional time-indexed DataFrame with the same column;
            an empty frame is treated like None
        **kwargs : passed unchanged to the Prophet constructor
        """
        self.train_data = train_data[measure]
        self.prophet_train_data = self._to_prophet_frame(train_data, measure)

        start = time.time()
        fitted_model = Prophet(**kwargs)
        fitted_model.fit(self.prophet_train_data)
        self.train_time_complexity = time.time() - start

        self.model = fitted_model
        self.model_id = str(uuid.uuid4())

        self.modeltrain_startime, self.modeltrain_endtime = self._set_start_end_timestamps(self.train_data)

        # In-sample predictions over the training history.
        train_ds = self.model.make_future_dataframe(periods=0, include_history=True)
        self.predicted_train = self.model.predict(train_ds)[self._PREDICTION_COLUMNS].set_index('ds')

        if test_data is not None and len(test_data) > 0:
            self.test_data = test_data[measure]
            # Predict on the actual test timestamps so the predictions stay
            # aligned with test_data even if it is not daily or does not
            # start right after the training data.
            test_ds = pd.DataFrame({'ds': self.test_data.index})
            self.predicted_test = self.model.predict(test_ds)[self._PREDICTION_COLUMNS].set_index('ds')
        else:
            self.test_data = None
            self.predicted_test = None

        self.modeltest_starttime, self.modeltest_endtime = self._set_start_end_timestamps(self.test_data)

        self.forecasts = None  # The model has not forecasted out of sample yet

        print(f"Train data is between {self.modeltrain_startime.strftime('%d-%m-%Y')} and {self.modeltrain_endtime.strftime('%d-%m-%Y')}")

        if self.test_data is not None:
            print(f"Test data is between {self.modeltest_starttime.strftime('%d-%m-%Y')} and {self.modeltest_endtime.strftime('%d-%m-%Y')}")

    def hypertune(self, train_data: pd.DataFrame, measure: str, test_data: pd.DataFrame = None, **kwargs):
        """Cross-validate a Prophet configuration on the training data.

        Fits a temporary Prophet model built from ``**kwargs`` and runs
        Prophet's cross validation with a 30 day horizon. The instance's
        ``model`` is not replaced. Sets ``train_data`` and
        ``prophet_train_data``.

        Parameters
        -------------
        train_data : time-indexed DataFrame containing the ``measure`` column
        measure : name of the column to model
        test_data : accepted for signature parity with ``fit``; unused
        **kwargs : passed unchanged to the Prophet constructor

        Returns
        ---------
        The DataFrame returned by ``prophet.diagnostics.cross_validation``.
        """
        # Imported here because the file's import cell does not provide it.
        from prophet.diagnostics import cross_validation

        self.train_data = train_data[measure]
        self.prophet_train_data = self._to_prophet_frame(train_data, measure)

        candidate = Prophet(**kwargs)
        # cross_validation requires a model that has already been fit.
        candidate.fit(self.prophet_train_data)

        return cross_validation(candidate, horizon='30 days')

    def evaluate(self) -> dict:
        '''Method to evaluate MAE for predictions . Called after fit method

           Sets train_MAE and test_MAE (None when no test data was given).

           Returns
           --------
           model_output_data (dict): dictionary contain model properties(model_id,model_MAE,etc).
               Every value is a single-element list.
        '''
        self.train_MAE = metrics.mean_absolute_error(self.train_data, self.predicted_train['yhat'])
        print('Train Mean Absolute Error:', self.train_MAE)

        if self.test_data is not None:
            self.test_MAE = metrics.mean_absolute_error(self.test_data, self.predicted_test['yhat'])
            print('Test Mean Absolute Error:', self.test_MAE)
        else:
            self.test_MAE = None

        # nonetype_timestamp / nonetype_float come from utils; presumably they
        # map None to a storable placeholder - TODO confirm against utils.
        model_output_data = {'model_id': [self.model_id],
                             'model_path': [f"{self.model_id}.json"],
                             'model_type': ['Prophet'],
                             'model_training_date': [datetime.now().strftime("%Y-%m-%d %H:%M:%S")],
                             'model_train_starttime': [self.modeltrain_startime],
                             'model_train_endtime': [self.modeltrain_endtime],
                             'time_complexity': [self.train_time_complexity],
                             'model_test_starttime': [nonetype_timestamp(self.modeltest_starttime)],
                             'model_test_endtime': [nonetype_timestamp(self.modeltest_endtime)],
                             'training_type': ['fit'],
                             'model_train_MAE': [self.train_MAE],
                             'model_test_MAE': [nonetype_float(self.test_MAE)]
                             }

        return model_output_data

    def forecast(self, start_date: datetime = None, forecast_steps: int = 1):
        """Use the model to forecast a number of steps.
        If start date is not given, forecast from the day after the end of the test .
        If no test set given before in fit ,method forecasts from the day after the end of train.
        Sets the following attributes
        -modelforecast_startime,modelforecast_endtime.
        -forecasts.

        Parameters
        -------------
        start_date : Start date of forecasting
        forecast_steps : Number of steps to forecast

        Returns
        ---------
        forecasts(dataframe) : Dataframe having 'ds','yhat','yhat_lower','yhat_upper',
            'forecasted_date','model_id'

        Raises
        ---------
        ValueError : if start_date is omitted and the model has no train/test
            data to derive it from (e.g. it was loaded, not fit).
        """
        if start_date is None:
            if self.test_data is None:
                last_known = self.modeltrain_endtime
            else:
                last_known = self.modeltest_endtime
            if last_known is None:
                raise ValueError("start_date is required when the model has not been fit on data")
            start_date = last_known + timedelta(days=1)

        forecast_timeframe = generate_prophet_time_frame(start_date=start_date, forecast_steps=forecast_steps)
        self.forecasts = self.model.predict(forecast_timeframe)[self._PREDICTION_COLUMNS].set_index('ds')

        self.modelforecast_startime, self.modelforecast_endtime = self._set_start_end_timestamps(self.forecasts)
        self.forecasts['forecasted_date'] = datetime.now().strftime("%Y-%m-%d %H:%M:%S")

        # assign broadcasts the id to every row, whatever the frame length.
        return self.forecasts.reset_index().assign(model_id=self.model_id)

    def return_model_json_string(self) -> str:
        """Return the model serialised with model_to_json, wrapped by json.dumps.

        NOTE(review): the extra json.dumps layer presumably mirrors what
        utils.load_model expects - confirm before changing.
        """
        return json.dumps(model_to_json(self.model))

    def plot_fit(self):
        """Show the fit of the model.

        Plots the training observations with the train predictions, plus the
        test predictions when test data was given to fit, and the
        uncertainty intervals. Displays the figure; returns nothing.
        """
        # Hand our own axes to Prophet; otherwise it opens a second figure
        # and the one created here stays empty.
        _, ax = plt.subplots(figsize=(12, 5))

        if self.test_data is None:
            fitted = self.predicted_train
        else:
            # Reuse the predictions computed in fit instead of predicting again.
            fitted = pd.concat([self.predicted_train, self.predicted_test])

        self.model.plot(fitted.reset_index(), ax=ax)

        ax.set_xlabel("Date")
        ax.legend()

        plt.show()

    def plot(self, incoming_df=None, measure=None) -> None:
        """Show the forecasts of the model.

        Plots train, test and predicted test (if test data was given),
        forecasts (if forecast has been called) and incoming data (if
        present), with uncertainty intervals. Displays the figure; returns
        nothing.

        Parameters
        -------------
        incoming_df : optional DataFrame of newly arrived observations
        measure : column of incoming_df to plot; required when incoming_df
            is given
        """
        plt.figure(figsize=(12, 5))

        plt.plot(self.train_data, label="Train")

        if self.test_data is not None:
            plt.plot(self.test_data, label="Test")
            plt.plot(self.predicted_test['yhat'], label='Test Predictions')
            plt.fill_between(self.test_data.index, self.predicted_test['yhat_lower'], self.predicted_test['yhat_upper'], color='k', alpha=0.15)

        if self.forecasts is not None:
            plt.plot(self.forecasts['yhat'], label='Forecasts')
            plt.fill_between(self.forecasts.index, self.forecasts['yhat_lower'], self.forecasts['yhat_upper'], color='k', alpha=0.15)

        if incoming_df is not None:
            plt.plot(incoming_df[measure], label="Incoming")

        plt.xlabel("Date")
        plt.legend()

        plt.show()

        
