In [0]:
# Train a product ranker with a Spark ML gradient-boosted trees (GBT) classifier.
# This notebook runs on dummy data.

In [0]:
import operator

from pyspark.ml import Pipeline
from pyspark.ml import PipelineModel
from pyspark.ml.regression import GBTRegressor
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.classification import GBTClassifier,RandomForestClassifier
from pyspark.ml.feature import StringIndexer, VectorAssembler,OneHotEncoder, StandardScaler


from pyspark.sql import functions as F
from pyspark.sql.functions import udf,struct,col
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.tuning import ParamGridBuilder, TrainValidationSplit
from pyspark.sql.types import IntegerType,FloatType,ArrayType,StringType,MapType,DoubleType

# UDF that converts an ML vector column into an array<double> column.
# toArray() densifies the vector first, so element i of the resulting array is
# always element i of the vector. Reading `.values` directly is only
# position-correct for dense vectors: on a SparseVector it holds just the
# explicitly stored entries, which would make positional indexing downstream
# (e.g. `pred[1]`) pick the wrong element or return null.
sparse_values = udf(lambda v: v.toArray().tolist(), ArrayType(DoubleType()))

In [0]:
# Load the train and test samples from their Parquet files using the
# ambient `spark` session.
train_samples = spark.read.parquet('stage_2_train.parquet')
test_samples = spark.read.parquet('stage_2_test.parquet')

In [0]:
# Row counts of the two splits, displayed as a (train, test) tuple.
tuple(samples.count() for samples in (train_samples, test_samples))

In [1]:
# Columns packed into the model's feature vector, in this order.
assemblerInputs = [
    'product_l2_score',
    'product_l2_index',
    'product_l1_score',
    'product_l1_index',
    'product_l3_score',
    'product_l3_index',
    'rwf',
]

# Combines the selected columns into a single "features" vector column.
assembler = VectorAssembler(
    inputCols=assemblerInputs,
    outputCol="features",
)

# NOTE(review): this scaler is constructed but is not one of the pipeline
# stages below, and the classifier reads "features" rather than "scaled".
standard_scaler = StandardScaler(
    inputCol='features',
    outputCol='scaled',
)

# Gradient-boosted trees classifier that predicts order_flag from "features".
gbt = GBTClassifier(
    labelCol="order_flag",
    featuresCol="features",
    maxIter=40,
    maxDepth=6,
)

# Two-stage pipeline: assemble the feature vector, then fit the classifier.
order_flag_prediction_pipeline = Pipeline(stages=[assembler, gbt])

def _with_predicted_prob(fitted_pipeline, samples):
    """Score ``samples`` and expose one probability element as a double column.

    Args:
        fitted_pipeline: Fitted model whose ``transform`` output contains a
            ``probability`` vector column.
        samples: DataFrame to score.

    Returns:
        The transformed DataFrame with two extra columns: ``pred`` (the
        probability vector converted to an array) and ``predicted_prob``
        (element 1 of that array).
    """
    return (
        fitted_pipeline
        .transform(samples)
        .withColumn("pred", sparse_values("probability"))
        .withColumn('predicted_prob', F.col('pred')[1])
    )


# Fit the pipeline on the training samples only.
product_probability_fit = order_flag_prediction_pipeline.fit(train_samples)

# Score both splits through the same helper so the train and test columns
# are always derived identically.
product_probability = _with_predicted_prob(product_probability_fit, train_samples)
test_product_probability = _with_predicted_prob(product_probability_fit, test_samples)

# Area under the ROC curve, computed from the scalar predicted_prob column
# against the order_flag label.
evaluator = BinaryClassificationEvaluator(rawPredictionCol='predicted_prob', labelCol='order_flag', metricName='areaUnderROC')
test_evaluator = BinaryClassificationEvaluator(rawPredictionCol='predicted_prob', labelCol='order_flag', metricName='areaUnderROC')

print("AUC train data  : ", evaluator.evaluate(product_probability))
print("AUC test data  : ", test_evaluator.evaluate(test_product_probability))

# Print each input column with its importance, highest importance first.
# The importances come from the last stage of the fitted pipeline.
feature_importances = product_probability_fit.stages[-1].featureImportances

# Pair every column name with the importance at the same position, then
# order the pairs by importance, descending.
list1 = list(zip(assemblerInputs, feature_importances))
list1.sort(key=operator.itemgetter(1), reverse=True)

for ranked_feature in list1:
    print(ranked_feature)