## import libraries

In [1]:
# Run findspark before pyspark is imported in the next cell.
# NOTE(review): findspark.init() is expected to make the local Spark installation
# importable — confirm against the findspark documentation for this environment.
import findspark
findspark.init()

In [2]:
import pyspark
from pyspark.sql import SparkSession
# Reuse the active SparkSession if there is one, otherwise create it.
# The transformer classes further down reference this module-level `spark`
# directly when converting pandas frames back to Spark DataFrames.
spark = SparkSession.builder.getOrCreate()

In [3]:
from IPython.display import display, HTML
# Inject a CSS rule forcing `white-space: pre` on <pre> elements in the notebook page.
# Presumably this keeps wide `.show()` tables from wrapping — confirm in the rendered notebook.
display(HTML("<style>pre { white-space: pre !important; }</style>"))

In [4]:
from pyspark.sql.functions import percent_rank
from pyspark.sql import Window
from pyspark.sql.functions import to_date
from pyspark import keyword_only
from pyspark.ml import Pipeline,Estimator,Transformer
from pyspark.sql.functions import hour, dayofweek, month

In [6]:
# Import the helper module (utils1, aliased as `utl`) whose functions the
# estimator/transformer classes below call with pandas DataFrames.
# The explicit reload re-executes utils1 so the alias reflects the current file contents.
import importlib
import utils1 as utl
importlib.reload(utl)

<module 'utils1' from 'C:\\Users\\user\\Downloads\\utils1.py'>

## Estimators, Transformers and Pipeline

In [12]:
# Estimator interface
class Estimator_in(Estimator):
    """Common base class for the custom estimators defined in this notebook.

    Concrete subclasses override `_fit` and return a fitted transformer.
    """

    def _fit(self, df, y=None):
        """Fit hook; the base implementation does nothing.

        `self` was previously missing from the signature, which bound the
        instance to `df` and the dataset to `y` when pyspark called the hook.

        Args:
            df: DataFrame to fit on.
            y: Optional second argument; unused here.

        Returns:
            None. Subclasses return a transformer instead.
        """
        pass

In [13]:
#transformer interface
class Transformer_in(Transformer):
    """Common base class for the custom transformers defined in this notebook.

    Concrete subclasses override `_transform` and return a DataFrame.
    """

    def _transform(self, df, y=None):
        """Transform hook; the base implementation does nothing.

        `self` was previously missing from the signature, which bound the
        instance to `df` and the dataset to `y` when pyspark called the hook.

        Args:
            df: DataFrame to transform.
            y: Optional second argument; unused here.

        Returns:
            None. Subclasses return the transformed DataFrame instead.
        """
        pass

### missing values

In [18]:
class MissingValuesEstimator(Estimator_in):
    """Estimator stage that yields a `MissingValuesTransformer`.

    Nothing is learned from the data; the configured date column is simply
    forwarded to the transformer.
    """

    @keyword_only
    def __init__(self, date_col=None):
        """Create the estimator.

        Args:
            date_col: Name of the date column forwarded to the transformer.
                Must be passed by keyword (enforced by `keyword_only`).
        """
        # Initialise the pyspark Params/Identifiable state (uid, param maps);
        # without it repr(), copy() and similar base-class methods fail.
        super().__init__()
        self._date_col = date_col

    def _fit(self, df, y=None):
        """Return a `MissingValuesTransformer` for the configured date column.

        Args:
            df: Training DataFrame (not inspected).
            y: Unused.
        """
        return MissingValuesTransformer(date_col=self._date_col)

In [19]:
class MissingValuesTransformer(Transformer_in):
    """Transformer that delegates missing-value handling to `utl.missing_handler`."""

    @keyword_only
    def __init__(self, date_col=None):
        """Create the transformer.

        Args:
            date_col: Name of the column used as the pandas index while the
                helper runs. Must be passed by keyword.
        """
        # Initialise the pyspark Params/Identifiable state (uid, param maps);
        # without it repr(), copy() and similar base-class methods fail.
        super().__init__()
        self._date_col = date_col

    def _transform(self, df, y=None):
        """Run `utl.missing_handler` on the data and return a Spark DataFrame.

        The Spark DataFrame is collected to pandas, indexed by the date
        column, passed to the helper, and the index is turned back into a
        regular column before converting to Spark again.

        Args:
            df: Input Spark DataFrame.
            y: Unused.

        Returns:
            A Spark DataFrame built with the module-level `spark` session.
        """
        indexed = df.toPandas().set_index(self._date_col)
        handled = utl.missing_handler(indexed)
        # Restore the date index as an ordinary column so Spark keeps it.
        restored = handled.reset_index()
        return spark.createDataFrame(restored)

### Outliers

In [20]:
class OutlierHandlerEstimator(Estimator_in):
    """Estimator that obtains outlier cappers from `utl.outlier_handler`."""

    def _fit(self, df, y=None):
        """Compute cappers on the training data and wrap them in a transformer.

        Args:
            df: Training Spark DataFrame; collected to pandas for the helper.
            y: Unused.

        Returns:
            An `OutlierHandlerTransformer` holding the helper's result.
        """
        training_frame = df.toPandas()
        # The helper receives an empty dict to populate and returns the cappers.
        fitted_cappers = utl.outlier_handler(training_frame, {})
        return OutlierHandlerTransformer(fitted_cappers)

In [21]:
class OutlierHandlerTransformer(Transformer_in):
    """Transformer that applies previously fitted cappers via `utl.outlier_capper`."""

    def __init__(self, cappers):
        """Store the fitted cappers.

        Args:
            cappers: Object returned by `utl.outlier_handler` during fitting.
        """
        # Initialise the pyspark Params/Identifiable state (uid, param maps);
        # without it repr(), copy() and similar base-class methods fail.
        super().__init__()
        self._cappers = cappers

    def _transform(self, df, y=None):
        """Apply the cappers to the data and return a Spark DataFrame.

        Args:
            df: Input Spark DataFrame; collected to pandas for the helper.
            y: Unused.

        Returns:
            A Spark DataFrame built with the module-level `spark` session.
        """
        capped = utl.outlier_capper(df.toPandas(), self._cappers)
        return spark.createDataFrame(capped)

### OneHotEncoding

In [39]:
class OneHotEncnoderEstimator(Estimator_in):
    """Estimator that fits a one-hot encoder through `utl.oheesimate`."""

    def _fit(self, df, y=None):
        """Fit the encoder on the training data.

        Args:
            df: Training Spark DataFrame; collected to pandas for the helper.
            y: Unused.

        Returns:
            A `OneHotEncnoderTransformer` holding the two values returned by
            the helper (categorical columns and encoder).
        """
        training_frame = df.toPandas()
        fit_result = utl.oheesimate(training_frame)
        categorical_cols, encoder = fit_result
        return OneHotEncnoderTransformer(categorical_cols, encoder)

In [50]:
class OneHotEncnoderTransformer(Transformer_in):
    """Transformer that applies a fitted one-hot encoder via `utl.oheeapply`."""

    def __init__(self, categorical_cols, encoder):
        """Store the fitted encoder state.

        Args:
            categorical_cols: First value returned by `utl.oheesimate`.
            encoder: Second value returned by `utl.oheesimate`.
        """
        # Initialise the pyspark Params/Identifiable state (uid, param maps);
        # without it repr(), copy() and similar base-class methods fail.
        super().__init__()
        self._categorical_cols = categorical_cols
        self._encoder = encoder

    def _transform(self, df, y=None):
        """Encode the data and return a Spark DataFrame.

        Args:
            df: Input Spark DataFrame; collected to pandas for the helper.
            y: Unused.

        Returns:
            A Spark DataFrame built with the module-level `spark` session.
        """
        encoded = utl.oheeapply(df.toPandas(), self._categorical_cols, self._encoder)
        return spark.createDataFrame(encoded)

### Stationarity

In [22]:
class StationarityCheckEstimator(Estimator_in):
    """Estimator that obtains differencing information from `utl.get_diff`.

    Derives from `Estimator_in` and accepts the optional `y` argument so it
    matches the other estimators in this notebook.
    """

    def _fit(self, dataset, y=None):
        """Fit on the training data and return the matching transformer.

        Args:
            dataset: Training Spark DataFrame; collected to pandas for the helper.
            y: Unused.

        Returns:
            A `StationarityCheckTransformer` holding the helper's results.
        """
        pandas_frame = dataset.toPandas()
        # The helper returns (columns, diff); the transformer takes (diff, columns).
        columns, diff = utl.get_diff(pandas_frame)
        return StationarityCheckTransformer(diff, columns)

In [23]:
class StationarityCheckTransformer(Transformer_in):
    """Transformer that applies fitted differencing via `utl.apply_diff`.

    Derives from `Transformer_in` and accepts the optional `y` argument so it
    matches the other transformers in this notebook.
    """

    def __init__(self, diff, columns):
        """Store the fitted differencing state.

        Args:
            diff: Second value returned by `utl.get_diff`.
            columns: First value returned by `utl.get_diff`.
        """
        # Initialise the pyspark Params/Identifiable state (uid, param maps);
        # without it repr(), copy() and similar base-class methods fail.
        super().__init__()
        self.diff = diff
        self.columns = columns

    def _transform(self, dataset, y=None):
        """Apply the differencing helper and return a Spark DataFrame.

        Args:
            dataset: Input Spark DataFrame; collected to pandas for the helper.
            y: Unused.

        Returns:
            A Spark DataFrame built with the module-level `spark` session.
        """
        differenced = utl.apply_diff(dataset.toPandas(), self.diff, self.columns)
        return spark.createDataFrame(differenced)

### Lagged features

In [82]:
class laggerEstimator(Estimator_in):
    """Estimator that selects lag features through `utl.lagger`."""

    @keyword_only
    def __init__(self, max_lag=30):
        """Create the estimator.

        Args:
            max_lag: Value forwarded to `utl.lagger`. Must be passed by keyword.
        """
        # Initialise the pyspark Params/Identifiable state (uid, param maps);
        # without it repr(), copy() and similar base-class methods fail.
        super().__init__()
        self._max_lag = max_lag

    def _fit(self, df, y=None):
        """Fit on the training data and return the matching transformer.

        Args:
            df: Training Spark DataFrame; collected to pandas for the helper.
            y: Unused.

        Returns:
            A `laggerTransformer` holding the helper's result.
        """
        # The helper receives an empty dict to populate and returns the selection.
        sig_fet = utl.lagger(df.toPandas(), self._max_lag, {})
        return laggerTransformer(sig_fet)

In [83]:
class laggerTransformer(Transformer_in):
    """Transformer that adds lag features via `utl.lagger_apply`."""

    def __init__(self, sig_fet):
        """Store the fitted lag selection.

        Args:
            sig_fet: Object returned by `utl.lagger` during fitting.
        """
        # Initialise the pyspark Params/Identifiable state (uid, param maps);
        # without it repr(), copy() and similar base-class methods fail.
        super().__init__()
        self._sig_fet = sig_fet

    def _transform(self, df, y=None):
        """Apply the lag helper and return a Spark DataFrame.

        Args:
            df: Input Spark DataFrame; collected to pandas for the helper.
            y: Unused.

        Returns:
            A Spark DataFrame built with the module-level `spark` session.
        """
        lagged = utl.lagger_apply(df.toPandas(), self._sig_fet)
        return spark.createDataFrame(lagged)

### Seasonality

In [110]:
class seasonalityEstimator(Estimator_in):
    """Estimator that fits seasonality features through `utl.seasonality_fit`."""

    @keyword_only
    def __init__(self, target, power_threshold=2, length_threshold=30, num_terms=3):
        """Create the estimator. All arguments must be passed by keyword.

        Args:
            target: Name of the target column forwarded to the helper.
            power_threshold: Forwarded to `utl.seasonality_fit`.
            length_threshold: Forwarded to `utl.seasonality_fit`.
            num_terms: Forwarded to the helper and to the transformer.
        """
        # Initialise the pyspark Params/Identifiable state (uid, param maps);
        # without it repr(), copy() and similar base-class methods fail.
        super().__init__()
        self._power_threshold = power_threshold
        self._length_threshold = length_threshold
        self._num_terms = num_terms
        self._target = target

    def _fit(self, df, y=None):
        """Fit on the training data and return the matching transformer.

        Args:
            df: Training Spark DataFrame; collected to pandas for the helper.
            y: Unused.

        Returns:
            A `seasonalityTransformer` holding the helper's two results.
        """
        significant_periods, feature_columns = utl.seasonality_fit(
            df.toPandas(),
            self._target,
            self._power_threshold,
            self._length_threshold,
            self._num_terms,
        )
        return seasonalityTransformer(
            self._target, significant_periods, feature_columns, self._num_terms
        )

In [111]:
class seasonalityTransformer(Transformer_in):
    """Transformer that adds seasonality features via `utl.seasonality_transform`."""

    def __init__(self, target, significant_periods, feature_columns, num_terms):
        """Store the fitted seasonality state.

        Args:
            target: Name of the target column.
            significant_periods: First value returned by `utl.seasonality_fit`.
            feature_columns: Second value returned by `utl.seasonality_fit`.
            num_terms: Same value that was used during fitting.
        """
        # Initialise the pyspark Params/Identifiable state (uid, param maps);
        # without it repr(), copy() and similar base-class methods fail.
        super().__init__()
        self._significant_periods = significant_periods
        self._feature_columns = feature_columns
        self._num_terms = num_terms
        self._target = target

    def _transform(self, df, y=None):
        """Apply the seasonality helper and return a Spark DataFrame.

        Args:
            df: Input Spark DataFrame; collected to pandas for the helper.
            y: Unused.

        Returns:
            A Spark DataFrame built with the module-level `spark` session.
        """
        with_features = utl.seasonality_transform(
            df.toPandas(),
            self._target,
            self._significant_periods,
            self._feature_columns,
            self._num_terms,
        )
        return spark.createDataFrame(with_features)

### Time features

In [14]:
class time_estimator(Estimator_in):
    """Estimator stage that yields a `time_transformer`.

    Nothing is learned from the data; the configured date column is simply
    forwarded to the transformer.
    """

    @keyword_only
    def __init__(self, date_col=None):
        """Create the estimator.

        Args:
            date_col: Name of the date column forwarded to the transformer.
                Must be passed by keyword.
        """
        # Initialise the pyspark Params/Identifiable state (uid, param maps);
        # without it repr(), copy() and similar base-class methods fail.
        super().__init__()
        self._date_col = date_col

    def _fit(self, df, y=None):
        """Return a `time_transformer` for the configured date column.

        Args:
            df: Training DataFrame (not inspected).
            y: Unused.
        """
        return time_transformer(date_col=self._date_col)


In [15]:
class time_transformer(Transformer_in):
    """Transformer that adds calendar features derived from the date column."""

    @keyword_only
    def __init__(self, date_col=None):
        """Create the transformer.

        Args:
            date_col: Name of the column the features are extracted from.
                Must be passed by keyword.
        """
        # Initialise the pyspark Params/Identifiable state (uid, param maps);
        # without it repr(), copy() and similar base-class methods fail.
        super().__init__()
        self._date_col = date_col

    def _transform(self, df, y=None):
        """Add `hour_of_day`, `day_of_week` and `month_of_year` columns.

        Runs entirely in Spark (no pandas round trip).

        Args:
            df: Input Spark DataFrame containing the date column.
            y: Unused.

        Returns:
            The DataFrame with the three extra columns appended in that order.
        """
        extractors = (
            ("hour_of_day", hour),
            ("day_of_week", dayofweek),
            ("month_of_year", month),
        )
        result = df
        for feature_name, extract in extractors:
            result = result.withColumn(feature_name, extract(result[self._date_col]))
        return result


### removing nans

In [117]:
class dropnansEstimator(Estimator_in):
    """Estimator stage that yields a `dropnansTransformer`; nothing is learned."""

    def _fit(self, df, y=None):
        """Return a fresh `dropnansTransformer`.

        Args:
            df: Training DataFrame (not inspected).
            y: Unused.
        """
        transformer = dropnansTransformer()
        return transformer

In [118]:
class dropnansTransformer(Transformer_in):
    """Transformer that calls `dropna()` on the DataFrame."""

    def _transform(self, df, y=None):
        """Return the result of `df.dropna()`.

        Args:
            df: Input Spark DataFrame.
            y: Unused.
        """
        return df.dropna()

### removing constant features

In [52]:
class ConstantFeatureEstimator(Estimator_in):
    """Estimator that identifies constant columns through `utl.constantIdentifier`."""

    def _fit(self, df, y=None):
        """Identify constant features and wrap them in a transformer.

        The Spark DataFrame is passed to the helper directly (no pandas
        conversion), together with its column names and an empty list.

        Args:
            df: Training Spark DataFrame.
            y: Unused.

        Returns:
            A `ConstantFeatureTransformer` holding the helper's result.
        """
        detected = utl.constantIdentifier(df, df.columns, [])
        return ConstantFeatureTransformer(detected)

In [17]:
class ConstantFeatureTransformer(Transformer_in):
    """Transformer that drops the columns identified as constant during fitting."""

    def __init__(self, constant_features=None):
        """Store the columns to drop.

        Args:
            constant_features: Column names to remove. `None` (the default)
                is treated as "nothing to drop"; previously it made
                `_transform` fail when unpacking `None`.
        """
        # Initialise the pyspark Params/Identifiable state (uid, param maps);
        # without it repr(), copy() and similar base-class methods fail.
        super().__init__()
        self._constant_features = [] if constant_features is None else constant_features

    def _transform(self, df, y=None):
        """Return the DataFrame without the stored constant columns.

        Args:
            df: Input Spark DataFrame.
            y: Unused.
        """
        # Nothing to remove: hand the input back untouched.
        if not self._constant_features:
            return df
        return df.drop(*self._constant_features)

# reading the data and applying the pipeline

In [128]:
# Read the CSV with a header row and let Spark infer the column types.
df = spark.read.csv('airline-passengers.csv',header=True,inferSchema=True)
# Convert the "Month" column to a date using the given pattern.
# NOTE(review): the pattern includes a time-of-day part — confirm it matches the values in the file.
df = df.withColumn("Month", to_date("Month", "yyyy-MM-dd HH:mm:ss"))

                                             

In [129]:
# Chronological train/test split.
# percent_rank() over a window ordered by "Month" gives each row its relative position in time;
# the empty partitionBy() places all rows in a single window partition.
# Rows with rank <= 0.8 become the training set, the rest the test set; the helper column is dropped.
df_copy = df.withColumn("rank", percent_rank().over(Window.partitionBy().orderBy("Month")))
train_df = df_copy.where("rank <= .8").drop("rank")
test_df = df_copy.where("rank > .8").drop("rank")   

In [130]:
# Column names used to configure the pipeline stages below:
# the date column and the target column.
timestamp = 'Month'
target_col = 'Passengers'

In [131]:
# Create one estimator instance per preprocessing stage.
# `timestamp` and `target_col` are the column names defined in the previous cell.
missing = MissingValuesEstimator(date_col = timestamp)
outlier = OutlierHandlerEstimator()
timed = time_estimator(date_col =timestamp)
constremove = ConstantFeatureEstimator()
ohe = OneHotEncnoderEstimator()
stationarity = StationarityCheckEstimator()
lagger = laggerEstimator(max_lag= 30)
seasonality = seasonalityEstimator(target = target_col)
nans_removal = dropnansEstimator()

# Assemble the pipeline; the stages are given in the order listed.
# NOTE: the same `nans_removal` instance appears twice (after `stationarity` and as the last stage).
pipe_line = Pipeline(stages= [missing,outlier,ohe,stationarity,nans_removal,lagger,seasonality,timed,constremove,nans_removal])


In [132]:
# Fit the pipeline on the training split only; `fitted` is reused below
# to transform both the training and the test split.
fitted = pipe_line.fit(train_df)

          Month  Passengers
0    1949-01-01       112.0
1    1949-02-01       118.0
2    1949-03-01       132.0
3    1949-04-01       129.0
4    1949-05-01       121.0
..          ...         ...
110  1958-03-01       362.0
111  1958-04-01       348.0
112  1958-05-01       363.0
113  1958-06-01       435.0
114  1958-07-01       491.0

[115 rows x 2 columns]


In [133]:
# Apply the fitted pipeline to both splits.
trans_train = fitted.transform(train_df)
trans_test = fitted.transform(test_df)

In [134]:
# Print the first rows of the transformed training split.
trans_train.show()

+----------+------------------+-------------------+----------------------+----------------------+----------------------+-------------------+-------------------+--------------------+-------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+-----------+-----------+-------------+
|     Month|        Passengers|   Passengers_found|lag_Passengers_found_1|lag_Passengers_found_2|lag_Passengers_found_4|          Feature_1|          Feature_2|           Feature_3|          Feature_4|           Feature_5|           Feature_6|           Feature_7|           Feature_8|           Feature_9|          Feature_10|          Feature_11|          Feature_12|hour_of_day|day_of_week|month_of_year|
+----------+------------------+-------------------+----------------------+----------------------+----------------------+-------------------+-------------------+--------------------+---

In [135]:
# Print the first rows of the transformed test split.
trans_test.show()

+----------+----------+----------------+----------------------+----------------------+----------------------+-------------------+-------------------+--------------------+-------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+-----------+-----------+-------------+
|     Month|Passengers|Passengers_found|lag_Passengers_found_1|lag_Passengers_found_2|lag_Passengers_found_4|          Feature_1|          Feature_2|           Feature_3|          Feature_4|           Feature_5|           Feature_6|           Feature_7|           Feature_8|           Feature_9|          Feature_10|          Feature_11|          Feature_12|hour_of_day|day_of_week|month_of_year|
+----------+----------+----------------+----------------------+----------------------+----------------------+-------------------+-------------------+--------------------+-------------------+----------------