#### Notebook for data preparation.
#### Current pipeline:
* Feature extraction
  * Cast date
  * Extract day
  * Extract Month
  * Extract Year
  * Extract Week day
  * Extract if day is weekend
* Normalize data
  * Min-max scaler
* Create series (each serie is 1 months, series alternate days)

In [2]:
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.feature import MinMaxScaler

In [3]:
%run ./custom_transformers

In [4]:
train_data = spark.sql("select * from store_item_demand_train_csv")

In [5]:
train, validation = train_data.randomSplit([0.8,0.2], seed=1234)

In [6]:
# Feature extraction
dc = DateConverter(inputCol='date', outputCol='dateFormated')
dex = DayExtractor(inputCol='dateFormated')
mex = MonthExtractor(inputCol='dateFormated')
yex = YearExtractor(inputCol='dateFormated')
wdex = WeekDayExtractor(inputCol='dateFormated')
wex = WeekendExtractor()
mbex = MonthBeginExtractor()
meex = MonthEndExtractor()
yqex = YearQuarterExtractor()
ydex = YearDayExtractor(inputCol='dateFormated')

# Data process
#tentar fazer 'day', 'month', 'year', 'weekday', 'weekend' (as colunas derivadas) ficarem de forma dinâmica, no lugar delas ficar a saída de seu respectivo transformer
va = VectorAssembler(inputCols=['store', 'item', 'day', 'month', 'year', 'weekday', 'weekend', 'monthbegin', 'monthend', 'yearquarter', 'yearday'], outputCol="features")
# scaler = StandardScaler(inputCol="features", outputCol="scaledFeatures", withStd=True, withMean=True)
scaler = MinMaxScaler(inputCol="features", outputCol="scaledFeatures")

# Serialize data
sm = SerieMaker(inputCol='scaledFeatures', dateCol='date', idCol=['store', 'item'], serieSize=10)

pipeline = Pipeline(stages=[dc, dex, mex, yex, wdex, wex, mbex, meex, yqex, ydex, va, scaler, sm])

In [7]:
pipiline_model = pipeline.fit(train)

In [8]:
train_transformed = pipiline_model.transform(train)
validation_transformed = pipiline_model.transform(validation)

In [9]:
# train_transformed = train_transformed.filter(F.col('filled_serie') == 0)
# validation_transformed = validation_transformed.filter(F.col('filled_serie') == 0)

# train_transformed = train_transformed.filter(F.col('rank') % 3 == 0)
# validation_transformed = validation_transformed.filter(F.col('rank') % 3 == 0)

In [10]:
# pipeline_path = '/dbfs/user/pipeline'
# pipiline_model.save(pipeline_path)
# pipiline_model.load(pipeline_path)

In [11]:
train_transformed.write.saveAsTable('train_transformed_10', mode='overwrite')
validation_transformed.write.saveAsTable('validation_transformed_10', mode='overwrite')

In [12]:
test_data = spark.sql("select * from store_item_demand_test_csv")
test_transformed = pipiline_model.transform(test_data)
test_transformed.write.saveAsTable('test_transformed_10', mode='overwrite')

In [13]:
print('Train raw: %s' % train.count())
print('Validation raw: %s' % validation.count())
print('Test raw: %s' % test_data.count())

In [14]:
print('Train transformed: %s' % train_transformed.count())
print('Validation transformed: %s' % validation_transformed.count())
print('Test transformed: %s' % test_transformed.count())

In [15]:
from pyspark.sql import Window
from pyspark.sql import types as T
from pyspark.sql import functions as F

# window = Window().partitionBy(['store', 'item', 'month']).orderBy('month')
window = Window().partitionBy(['store', 'item', 'month']).orderBy('month')
df = validation_transformed.withColumn('prev', F.last(F.col('sales')).over(window))

display(df.filter('store == 1').filter('item == 1').select('store', 'item', 'month', 'day', 'sales', 'prev'))

store,item,month,sales,prev
1,1,1,13,9
1,1,1,12,9
1,1,1,5,9
1,1,1,12,9
1,1,1,6,9
1,1,1,11,9
1,1,1,12,9
1,1,1,21,9
1,1,1,6,9
1,1,1,13,9


In [16]:
display(validation_transformed.filter('store == 1').filter('item == 1').select('store', 'item', 'month', 'day', 'sales'))

store,item,month,sales
1,1,1,13
1,1,1,12
1,1,1,5
1,1,1,12
1,1,1,6
1,1,2,16
1,1,2,11
1,1,2,10
1,1,2,7
1,1,2,12
