In [2]:
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql.functions import pandas_udf, PandasUDFType, max, col, lit
import os



from pyspark import Row


from datetime import datetime,timedelta

from fbprophet import Prophet
import pandas as pd
import numpy as np

In [3]:
def dfZipWithIndex (df, offset=1, colName="rowId"):
    '''
        Enumerates dataframe rows is native order, like rdd.ZipWithIndex(), but on a dataframe 
        and preserves a schema

        :param df: source dataframe
        :param offset: adjustment to zipWithIndex()'s index
        :param colName: name of the index column
    '''

    new_schema = StructType(
                    [StructField(colName,LongType(),True)]        # new added field in front
                    + df.schema.fields                            # previous schema
                )

    zipped_rdd = df.rdd.zipWithIndex()

    new_rdd = zipped_rdd.map(lambda args: ([args[1] + offset] + list(args[0])))

    return spark.createDataFrame(new_rdd, new_schema)

In [4]:
schema = StructType([
        StructField("ds", DateType(), True),
        StructField("y", DoubleType(), True),
        StructField("yhat", DoubleType(), True),
        StructField("rmse", DoubleType(), True)
    ])

In [5]:
@pandas_udf(schema, PandasUDFType.GROUPED_MAP)
def fit_pandas_udf(df):
    """
    :param df: Dataframe (train + test data)
    :return: predictions as defined in the output schema
    """

    def train_fitted_prophet(df, cutoff):
        
        names = df.columns
        
        #train
        ts_train = (df
                    .query('id <= @cutoff')
                    .rename(columns={names[1]: 'ds', names[2]: 'y'})
                    .sort_values('ds')
                    )[['ds','y']]
        
        print(ts_train.columns)
        
        
        # test
        ts_test = (df
                   .query('id > @cutoff')
                   .rename(columns={names[1]: 'ds', names[2]: 'y'})
                   .sort_values('ds')
                   .assign(ds=lambda x: pd.to_datetime(x["ds"]))
                   #.drop('y', axis=1)
                   )[['ds','y']]
        
        print(ts_test.columns)
        print(ts_test.ds.values[-28])
        
        floor = ts_train.y.min()*0.8
        cap = ts_train.y.max()*3    
        
        print(floor,cap)
        
        ts_train['floor'], ts_train['cap'] = floor, cap
        ts_test['floor'], ts_test['cap'] = floor, cap

        # init model
        m = Prophet(growth='logistic',
                    yearly_seasonality=True,
                    weekly_seasonality=True,
                    daily_seasonality=True)
        m.fit(ts_train)
        
        print("train",ts_train.columns)
        
        ts_hat = (m.predict(ts_test)
                  .assign(ds=lambda x: pd.to_datetime(x["ds"]))
                  ).merge(ts_test, on=["ds"], how="left")  
        
        
        
        #calculando rmse
        
        se = np.square(ts_hat.y[0:-28] - ts_hat.yhat[0:-28])
        mse = np.mean(se)
        rmse = np.sqrt(mse)
        
        ts_hat['rmse'] = rmse
        
        ts_train['yhat'] = None
        ts_train = ts_train.assign(ds=lambda x: pd.to_datetime(x["ds"]))
        
        ts = ts_train[['ds','y','yhat']].append(ts_hat[['ds','y','yhat']])
        
        ts['rmse'] = rmse
        
        print(ts_hat.columns)
        
        return pd.DataFrame(ts, columns=schema.fieldNames())

    return train_fitted_prophet(df, cutoff)

In [6]:
if __name__ == '__main__':
    spark = (SparkSession
             .builder
             .appName("forecasting")
             .getOrCreate()
             #.config('spark.sql.execution.arrow.enable', 'true')
             )
    
   # data = (spark
   #             .read
   #             .format("csv")
   #             .option('header', 'true')
   #             .option('inferSchema','true')
   #             .load('data_simulation.csv')
   #             #.load('Downloads/AEP_hourly.csv')
   #             
   #             
   #         )
    
    data =  spark\
            .read\
            .option("header","false")\
            .option('inferSchema','true')\
            .csv(f"{os.getcwd()}/dados_stream/part*.csv")\
            .selectExpr("_c0 as Datetime","_c1 as MW")\
            .sort(col("Datetime"))
    
    data.createOrReplaceTempView("data")
    data = spark.sql(f"SELECT LEFT(Datetime,10) AS Datetime, {data.columns[1]}  FROM data")
    data = data.groupBy("Datetime")\
               .mean("MW")\
               .sort(col('DateTime'))
    
    
    # 70% of the real dataset
    data_length = data.count()
    train_size = int(round(0.7 * data_length,0))
    
    
    ##Add future days to predict
    
    #last_day = data.tail(1)[0].__getitem__("Datetime")  # Não sei se é viável
    last_day = data.tail(1)[0].asDict()['Datetime']
    future_days = pd.date_range(start = last_day,
                                periods = 28)
    sequence_days = list(future_days.strftime("%Y-%m-%d"))[1:]
    future = spark.createDataFrame(sequence_days, 
                                   StringType())
    future.createOrReplaceTempView("future")
    future = spark.sql("SELECT value AS Datetime FROM future")
    future = future.withColumn(data.columns[1],
                               lit(None))
    

    
    df = (data.union(future)).sort(col('Datetime'))
    df = dfZipWithIndex(df,colName="id")
    
    
    
    cutoff = train_size
    # Apply forcasting
    global_predictions = (df
                          .groupBy()
                          .apply(fit_pandas_udf)
                          )



In [None]:
global_predictions.show(10000)

In [7]:
global_predictions\
    .write.mode('overwrite')\
    .option('header','true')\
    .csv('time_serie')

Index(['ds', 'y'], dtype='object')                                  (0 + 1) / 1]
Index(['ds', 'y'], dtype='object')
2007-06-27T00:00:00.000000000
6676.647619047619 52332.875
Initial log joint probability = -12.6529
    Iter      log prob        ||dx||      ||grad||       alpha      alpha0  # evals  Notes 
      99       1497.92    0.00251952        659.13      0.1419           1      148   
    Iter      log prob        ||dx||      ||grad||       alpha      alpha0  # evals  Notes 
     199       1501.56    0.00262777       535.167      0.7913      0.7913      282   
    Iter      log prob        ||dx||      ||grad||       alpha      alpha0  # evals  Notes 
     233       1502.69   1.23294e-05        85.249   1.116e-07       0.001      390  LS failed, Hessian reset 
     286       1502.94   1.19403e-05       81.4255   1.092e-07       0.001      515  LS failed, Hessian reset 
     299       1502.96   8.58874e-05       111.903           1           1      535   
    Iter      log prob    