In [1]:
from xml.etree.ElementTree import fromstring
from pyspark.sql import SparkSession
# from Transformers import data_aggregation, impute_mean, lagtransformer, logtransformer, negative_sales, test_train_split,antilogtransformer
from Preprocessing.data_manipulation import DataPreparation
from Transformers.data_aggregation import AggregateData
from Transformers.impute_mean import ImputePrice
from Transformers.negative_sales import NegativeSales
from Transformers.logtransformer import Log
from pyspark.ml import Pipeline
from Transformers.scalar_na_filler import ScallerNAFiller
from Transformers.lagtransformer import Lags
from Transformers.test_train_split import Split
from pyspark.ml.feature import VectorAssembler, StringIndexer
from Evaluator.Mape import MAPE
# Import VectorAssembler and Vectors
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import VectorAssembler
from Estimator.random_forest import RandomForest

In [2]:
if __name__ == "__main__":
    spark = SparkSession \
        .builder \
        .appName("project_spark") \
        .master("local[*]") \
        .config("spark.driver.memory", "15g")\
        .getOrCreate()

In [3]:
data = DataPreparation()
df = data.get_data()

In [4]:
df = data.filter_store(df, "WI_1")
df.show(5)

+--------+-----------+--------+--------------------+-------+------+--------+-----+----------+---------+----+-----+----+-------------+------------+------------+------------+-------+-------+-------+----------+
|store_id|    item_id|wm_yr_wk|                  id|dept_id|cat_id|state_id|sales|      date|  weekday|wday|month|year| event_name_1|event_type_1|event_name_2|event_type_2|snap_CA|snap_TX|snap_WI|sell_price|
+--------+-----------+--------+--------------------+-------+------+--------+-----+----------+---------+----+-----+----+-------------+------------+------------+------------+-------+-------+-------+----------+
|    WI_1|FOODS_1_001|   11507|FOODS_1_001_WI_1_...|FOODS_1| FOODS|      WI|    0|2015-03-14| Saturday|   1|    3|2015|         null|        null|        null|        null|      0|      0|      1|      2.24|
|    WI_1|FOODS_1_001|   11507|FOODS_1_001_WI_1_...|FOODS_1| FOODS|      WI|    0|2015-03-15|   Sunday|   2|    3|2015|         null|        null|        null|        n

In [5]:
df.columns

['store_id',
 'item_id',
 'wm_yr_wk',
 'id',
 'dept_id',
 'cat_id',
 'state_id',
 'sales',
 'date',
 'weekday',
 'wday',
 'month',
 'year',
 'event_name_1',
 'event_type_1',
 'event_name_2',
 'event_type_2',
 'snap_CA',
 'snap_TX',
 'snap_WI',
 'sell_price']

# Initiating different Transformers #

In [6]:
imputeNegativePrice = ImputePrice()
negativeSales = NegativeSales(column="sales")
aggregate = AggregateData(columns=["store_id", "dept_id", "year", "month"],
                                expressions={"sales": "sum",
                                "sell_price": "avg",
                                "snap_WI": "sum"})

In [7]:
log_transform = Log(inputCols=["sales","sell_price"])

In [8]:
lag_feature_transform = Lags(lags=[1,2,3], target="sales", partitionBy=["store_id","dept_id"], orderBy=["year", "month"])

In [9]:
na_filler = ScallerNAFiller()

In [10]:
storeIndexer = StringIndexer(inputCol="store_id", outputCol="store_id_index")
yearIndexer = StringIndexer(inputCol="year", outputCol="year_index")
#departIndexer = StringIndexer(inputCol="dept_id", outputCol="dept_id_indexer")

In [18]:
inputColumns = ['month',
        'sell_price',
        'snap_WI',
        'lag_1',
        'lag_2',
        'lag_3',
        'store_id_index',
        'year_index']
assembler = VectorAssembler(inputCols=inputColumns, 
                                    outputCol="features")

#  Preprocessing/FeatureEngineering Pipeline Complete #

In [26]:
transformed = Pipeline(stages=[imputeNegativePrice, negativeSales, aggregate, 
                       log_transform, lag_feature_transform, storeIndexer, yearIndexer, na_filler, assembler]).fit(df).transform(df)

In [27]:
transformed.show()

+--------+-----------+----+-----+------------------+-------+------------------+------------------+------------------+------------------+--------------+----------+--------------------+
|store_id|    dept_id|year|month|        sell_price|snap_WI|             sales|             lag_1|             lag_2|             lag_3|store_id_index|year_index|            features|
+--------+-----------+----+-----+------------------+-------+------------------+------------------+------------------+------------------+--------------+----------+--------------------+
|    WI_1|HOUSEHOLD_2|2011|    1|1.7451427262895642|      0|5.8998973535824915|               0.0|               0.0|               0.0|           0.0|       0.0|(8,[0,1],[1.0,1.7...|
|    WI_1|HOUSEHOLD_2|2011|    2| 1.745142726289564|   5150| 8.001689978099135|5.8998973535824915|               0.0|               0.0|           0.0|       0.0|(8,[0,1,2,3],[2.0...|
|    WI_1|HOUSEHOLD_2|2011|    3| 1.745142726289565|   5150|  8.09009578318096| 

In [28]:
transformed.columns

['store_id',
 'dept_id',
 'year',
 'month',
 'sell_price',
 'snap_WI',
 'sales',
 'lag_1',
 'lag_2',
 'lag_3',
 'store_id_index',
 'year_index',
 'features']

In [29]:
transformed.select("features").show(5)

+--------------------+
|            features|
+--------------------+
|(8,[0,1],[1.0,1.7...|
|(8,[0,1,2,3],[2.0...|
|[3.0,1.7451427262...|
|[4.0,1.7451427262...|
|[5.0,1.7451427262...|
+--------------------+
only showing top 5 rows



# TEST/Train Transform #

In [30]:
test_train_splitting = Split(year=2015)

In [32]:
train, test = test_train_splitting.transform(transformed)

ValueError: Param Param(parent='Split_a96a2ee4dc8d', name='split_value', doc='Column remove negative sales from') does not belong to Split_a96a2ee4dc8d.

In [None]:
rfModel = RandomForest(featuresCol="features", labelCol="sales").fit(train)