In [43]:
import re
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import FloatType
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler
from pyspark.ml import Pipeline
from pyspark.ml.regression import DecisionTreeRegressor
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator, CrossValidatorModel
from pyspark.ml.evaluation import RegressionEvaluator
import logging

In [11]:
def get_unique_value_count(df, display_threshold=25):
    """Profile every column of a Spark DataFrame.

    For each column this prints a progress line, computes the distinct-value
    count (as reported by ``F.countDistinct``) and the number of rows where
    the column is null, and, when the distinct count is below
    ``display_threshold``, also prints the distinct values themselves.

    Args:
        df: Spark DataFrame to profile.
        display_threshold: Columns with fewer distinct values than this have
            their distinct values collected and printed. Defaults to 25.

    Returns:
        A tuple ``(count_dict, na_count_dict)``; both map column name to an
        integer (distinct count and null-row count respectively).
    """
    count_dict = {}
    na_count_dict = {}
    total_cols = len(df.columns)
    for idx, col in enumerate(df.columns):
        print("--"*50)
        print(f"At field: {col}, {idx+1} out of  {total_cols}")

        # Compute both statistics in a single aggregation so the source data
        # is scanned once per column instead of twice.
        # F.when(...) without otherwise() yields NULL for non-matching rows,
        # and F.count skips NULLs, so this counts exactly the null rows.
        stats_row = df.agg(
            F.countDistinct(col),
            F.count(F.when(df[col].isNull(), 1)),
        ).collect()[0]
        unique_value_count = stats_row[0]
        na_value_count = stats_row[1]

        # Only low-cardinality columns are worth listing in full.
        if unique_value_count < display_threshold:
            unique_values = df.select(col).distinct().collect()
            print(unique_values)
        count_dict[col] = unique_value_count
        na_count_dict[col] = na_value_count
    return count_dict, na_count_dict

In [39]:
if __name__ == "__main__":
    # Fixed seed so the train/test split and the cross-validation folds (and
    # therefore the reported metrics) are the same on every run.
    SEED = 42

    spark = SparkSession.builder.getOrCreate()
    # No schema inference is requested, so columns arrive as strings;
    # the label column is cast explicitly below.
    df_train_test = spark.read.csv("data/train.csv", header=True)
    print(get_unique_value_count(df_train_test))

    df_train_test = df_train_test.drop("Product_Category_2", "Product_Category_3")
    # Every column except the first two and the last one is used as a
    # categorical feature (presumably the last column is the "Purchase"
    # label -- verify against the CSV header).
    CATEGORICAL_FIELDS = df_train_test.columns[2:-1]
    indexed_fields = [field + "_LE" for field in CATEGORICAL_FIELDS]
    encoded_fields = [field + "_OHE" for field in CATEGORICAL_FIELDS]

    df_train_test = df_train_test.withColumn("Purchase", df_train_test["Purchase"].cast(FloatType()))
    df_train, df_test = df_train_test.randomSplit([0.7, 0.3], seed=SEED)

    # preprocessing
    # The stages are only declared here; they are fitted as part of the
    # pipeline inside CrossValidator, so no standalone fit is needed.
    string_indexer = StringIndexer(inputCols=CATEGORICAL_FIELDS,
                                   outputCols=indexed_fields)
    one_hot_encoder = OneHotEncoder(inputCols=indexed_fields,
                                    outputCols=encoded_fields,
                                    dropLast=True)
    vector_assembler = VectorAssembler(inputCols=encoded_fields,
                                       outputCol="features")
    # training
    # algorithm
    # maxDepth given here is only a starting value; the parameter grid below
    # supplies the values actually tried during cross-validation.
    dt = DecisionTreeRegressor(labelCol="Purchase", featuresCol="features", maxDepth=10)

    pipeline = Pipeline(stages=[string_indexer, one_hot_encoder, vector_assembler, dt])
    # evaluation
    evaluator = RegressionEvaluator(predictionCol="prediction",
                                    labelCol="Purchase",
                                    metricName="r2")
    # params
    param_grid = ParamGridBuilder().addGrid(param=dt.maxDepth,
                                            values=[3, 5, 7, 10, 15]).build()
    crossval = CrossValidator(estimator=pipeline,
                              estimatorParamMaps=param_grid,
                              evaluator=evaluator,
                              seed=SEED)
    model = crossval.fit(df_train)
    # predictions
    df_train_predictions = model.transform(df_train).select(["Purchase", "prediction"])
    # test predictions
    df_test_predictions = model.transform(df_test).select(["Purchase", "prediction"])

    train_score = evaluator.evaluate(df_train_predictions)
    test_score = evaluator.evaluate(df_test_predictions)
    print("--"*50)
    print("Metrics")
    print(train_score, test_score)
    

----------------------------------------------------------------------------------------------------
At field: User_ID, 1 out of  12
----------------------------------------------------------------------------------------------------
At field: Product_ID, 2 out of  12
----------------------------------------------------------------------------------------------------
At field: Gender, 3 out of  12
[Row(Gender='F'), Row(Gender='M')]
----------------------------------------------------------------------------------------------------
At field: Age, 4 out of  12
[Row(Age='18-25'), Row(Age='26-35'), Row(Age='0-17'), Row(Age='46-50'), Row(Age='51-55'), Row(Age='36-45'), Row(Age='55+')]
----------------------------------------------------------------------------------------------------
At field: Occupation, 5 out of  12
[Row(Occupation='7'), Row(Occupation='15'), Row(Occupation='11'), Row(Occupation='3'), Row(Occupation='8'), Row(Occupation='16'), Row(Occupation='0'), Row(Occupation='5'), Row

# Persisting model

In [41]:
# Overwrite any previous export so that re-running this cell does not fail
# because the target directory already exists.
model.write().overwrite().save("models/blackfriday_purchase_prediction")

# Loading model

In [44]:
# Read the persisted cross-validated model back from disk into a new variable.
test_model = CrossValidatorModel.load(
    "models/blackfriday_purchase_prediction/"
)