In [9]:
from pyspark.sql.types import *
from pyspark.sql import SparkSession
from pyspark.sql import Window
from pyspark.sql.functions import udf
from pyspark.sql import functions as F
from pyspark.ml.linalg import Vectors, VectorUDT
import numpy as np
from pyspark.ml import Pipeline
from pyspark.ml.feature import PCA
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler, Imputer
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.regression import *


In [38]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName('Time Series').getOrCreate()
df = spark.read.csv('/content/train.csv', header = True, inferSchema = True)
df.printSchema()

root
 |-- date: string (nullable = true)
 |-- store: integer (nullable = true)
 |-- item: integer (nullable = true)
 |-- sales: integer (nullable = true)



In [39]:
schema = StructType([
StructField("date", DateType()),
StructField("store", IntegerType()),
StructField("item", IntegerType()),
StructField("sales", IntegerType())])



In [40]:
df = (df.withColumn('yearday', F.dayofyear(F.col("date"))).withColumn('year', F.year(F.col('date'))))


In [41]:
from pyspark.mllib.linalg import SparseVector, DenseVector

In [42]:
df_final=df.toPandas()
df_final.nunique()

date       1826
store        10
item         50
sales       213
yearday     366
year          5
dtype: int64

In [43]:
numeric_col= ["sales"]

In [44]:
imputer = Imputer(inputCols=numeric_col, outputCols=["{}_imputed".format(c) for c in numeric_col])


In [45]:
categorical_col= ["store", "item","year"]
indexers = [StringIndexer(inputCol=c, outputCol="{0}_indexedd".format(c), handleInvalid = 'skip') for c in categorical_col]

In [46]:
encoders = [OneHotEncoder(dropLast=True,inputCol=indexer.getOutputCol(),outputCol="{0}_encodedd".format(indexer.getOutputCol())) for indexer in indexers]

In [52]:
assembler = VectorAssembler(inputCols= [encoder.getOutputCol() for encoder in encoders] + \
 [x +'_imputed' for x in numeric_col] + ['yearday'],
  outputCol="Features")


In [53]:
pca = PCA(k=2, inputCol="Features", outputCol="pcaFeatures")

In [54]:
pipeline = Pipeline(stages =[imputer] + indexers + encoders +  [assembler]+ [pca])

In [55]:
model= pipeline.fit(df)
final_dataset = model.transform(df)


In [56]:
target = 'sales'

In [63]:
gbt = GBTRegressor(featuresCol = 'Features', labelCol=target)
dt = DecisionTreeRegressor(featuresCol = 'Features', labelCol=target)
lr = LinearRegression(featuresCol = 'Features', labelCol=target)

In [75]:
final_dataset.groupBy("year").count().show(truncate=False)

+----+------+
|year|count |
+----+------+
|2015|182500|
|2013|182500|
|2014|182500|
|2016|183000|
|2017|182500|
+----+------+



In [76]:
X_train = (final_dataset.filter(F.col('date').between("2013-01-01", "2016-06-01"))
    .withColumn(target, F.log1p(F.col(target)))
    )

X_test = (final_dataset.filter(F.col('DATE') > "2016-06-01")
    .withColumn(target, F.log1p(F.col(target))))

In [77]:
fitted = gbt.fit(X_train)

In [78]:
X_test.show()

+----------+-----+----+------------------+-------+----+-------------+--------------+-------------+-------------+-----------------------+----------------------+----------------------+--------------------+--------------------+
|      date|store|item|             sales|yearday|year|sales_imputed|store_indexedd|item_indexedd|year_indexedd|store_indexedd_encodedd|item_indexedd_encodedd|year_indexedd_encodedd|            Features|         pcaFeatures|
+----------+-----+----+------------------+-------+----+-------------+--------------+-------------+-------------+-----------------------+----------------------+----------------------+--------------------+--------------------+
|2016-06-02|    1|   1| 3.258096538021482|    154|2016|           25|           0.0|          0.0|          0.0|          (9,[0],[1.0])|        (49,[0],[1.0])|         (4,[0],[1.0])|(64,[0,9,58,62,63...|[154.586738440685...|
|2016-06-03|    1|   1|3.6635616461296463|    155|2016|           38|           0.0|          0.0|  

In [79]:
yhat = (fitted.transform(X_test).withColumn("prediction", F.expm1(F.col("prediction"))).withColumn(target, F.expm1(F.col(target))))

In [80]:
yhat.show()

+----------+-----+----+------------------+-------+----+-------------+--------------+-------------+-------------+-----------------------+----------------------+----------------------+--------------------+--------------------+------------------+
|      date|store|item|             sales|yearday|year|sales_imputed|store_indexedd|item_indexedd|year_indexedd|store_indexedd_encodedd|item_indexedd_encodedd|year_indexedd_encodedd|            Features|         pcaFeatures|        prediction|
+----------+-----+----+------------------+-------+----+-------------+--------------+-------------+-------------+-----------------------+----------------------+----------------------+--------------------+--------------------+------------------+
|2016-06-02|    1|   1|25.000000000000004|    154|2016|           25|           0.0|          0.0|          0.0|          (9,[0],[1.0])|        (49,[0],[1.0])|         (4,[0],[1.0])|(64,[0,9,58,62,63...|[154.586738440685...| 25.49962830379753|
|2016-06-03|    1|   1| 

In [81]:
eval_ = RegressionEvaluator(labelCol= target, predictionCol= "prediction", metricName="rmse")

rmse = eval_.evaluate(yhat)

print('rmse is %.2f' %rmse)

mae = eval_.evaluate(yhat, {eval_.metricName: "mae"})
print('mae is %.2f' %mae)

r2 = eval_.evaluate(yhat, {eval_.metricName: "r2"})
print('r2 is %.2f' %r2)


rmse is 4.78
mae is 1.81
r2 is 0.98


In [84]:
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

In [83]:
paramGrid = (ParamGridBuilder()
             .addGrid(gbt.maxDepth, [5, 8, 10, 12])
             .addGrid(gbt.maxBins, [64])
             .build())

cv = CrossValidator(estimator=gbt,
                          estimatorParamMaps=paramGrid,
                          evaluator=eval_,
                          numFolds=3)  
cvModel = cv.fit(X_train)

In [85]:
yhat = (cvModel.transform(X_test)
    .withColumn("prediction", F.expm1(F.col("prediction")))
    .withColumn(target, F.expm1(F.col(target)))
    )


In [86]:
eval_ = RegressionEvaluator(labelCol= target, predictionCol= "prediction", metricName="rmse")

rmse = eval_.evaluate(yhat)

print('rmse is %.2f' %rmse)

mae = eval_.evaluate(yhat, {eval_.metricName: "mae"})
print('mae is %.2f' %mae)

r2 = eval_.evaluate(yhat, {eval_.metricName: "r2"})
print('r2 is %.2f' %r2)


fi = fitted.featureImportances.toArray()

rmse is 3.30
mae is 0.91
r2 is 0.99
