In [1]:
# Model training using Random Forest
#
# Loads the preprocessed CTR data, one-hot encodes the categorical columns,
# assembles a feature vector, splits train/test and fits a RandomForestClassifier.
# Names produced for later cells: spark, spark_df, cols, stages, pipelineModel,
# preppedDataDF, train, test, rf, rfModel, predictions, start_time, end_time,
# elapsed_time.
from time import perf_counter

from pyspark.ml import Pipeline
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.feature import OneHotEncoderEstimator, StringIndexer, VectorAssembler
from pyspark.sql import SparkSession

# Configuration: everything a reader might want to tune, in one place.
DATA_PATH = 'hdfs://lakhwinder/preprocessed_500000.csv'
TRAIN_TEST_WEIGHTS = [0.90, 0.10]
SPLIT_SEED = 2
# Pin the forest's seed explicitly. PySpark's default seed is derived from
# hash() of the class name, which is not stable across Python processes, so an
# unseeded forest is not reproducible across kernel restarts.
RF_SEED = 42

spark = SparkSession.builder.appName('ctr-prediction').getOrCreate()

# Load the data. `_c0` is dropped; presumably a leftover row-index column from
# the CSV export (confirm against the preprocessing step).
spark_df = spark.read.csv(DATA_PATH, header=True, inferSchema=True)
spark_df = spark_df.drop('_c0')
cols = spark_df.columns

# Handling categorical data: index each categorical column, then one-hot encode
# the resulting index column.
catergoricalFeature = ["banner_pos", "site_category", "app_category", "device_type", "device_conn_type"]
stages = []  # stages in our Pipeline
for catergoricalFeat in catergoricalFeature:
    # StringIndexer for category indexing; "skip" drops rows with invalid
    # values (e.g. categories not seen when the indexer was fit).
    strIndexer = StringIndexer(inputCol=catergoricalFeat, outputCol=catergoricalFeat + "Index").setHandleInvalid("skip")
    # One-hot encode the index column into a sparse "<col>classVec" vector.
    ohencoder = OneHotEncoderEstimator(inputCols=[strIndexer.getOutputCol()], outputCols=[catergoricalFeat + "classVec"])
    stages += [strIndexer, ohencoder]

# Index the `click` target into `label`. alphabetAsc makes the mapping explicit
# (click 0 -> 0.0, click 1 -> 1.0) instead of depending on which class happens
# to be more frequent, as the default frequency ordering does.
# Assumes `click` only takes the values 0/1 -- confirm.
label_output = StringIndexer(inputCol="click", outputCol="label", stringOrderType="alphabetAsc")
stages += [label_output]

# NOTE(review): `hour` is fed in as one continuous feature; confirm the
# preprocessing left it in a form where that is meaningful.
numericCols = ["hour"]
assemblerInputs = [c + "classVec" for c in catergoricalFeature] + numericCols
assembler = VectorAssembler(inputCols=assemblerInputs, outputCol="features")
stages += [assembler]

partialPipeline = Pipeline().setStages(stages)
# NOTE(review): the pipeline is fit on the full dataset before the split, so the
# indexers/encoders also see test-set categories.
pipelineModel = partialPipeline.fit(spark_df)
preppedDataDF = pipelineModel.transform(spark_df)

train, test = preppedDataDF.randomSplit(TRAIN_TEST_WEIGHTS, seed=SPLIT_SEED)

# Applying the Random Forest algorithm and timing only the fit. perf_counter is
# a monotonic clock intended for measuring intervals.
rf = RandomForestClassifier(featuresCol='features', labelCol='label', seed=RF_SEED)
start_time = perf_counter()
rfModel = rf.fit(train)
end_time = perf_counter()
elapsed_time = end_time - start_time
print("Time to train the Model RF : %.3f seconds" % elapsed_time)

# transform() only builds a lazy DataFrame; the scoring work runs when the
# predictions are consumed (e.g. by the evaluator in the next cell).
predictions = rfModel.transform(test)

Time to train the Model LR : 51.810 seconds


In [2]:
# Evaluate the Random Forest on the held-out split.
# The evaluator compares the `rawPrediction` column of `predictions` against
# `label` and reports area under the ROC curve; the arguments below spell out
# the evaluator's own defaults.
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.mllib.evaluation import BinaryClassificationMetrics  # not used in this cell

evaluator = BinaryClassificationEvaluator(
    rawPredictionCol="rawPrediction",
    labelCol="label",
    metricName="areaUnderROC",
)

# NOTE: despite its name, `accuracy_test` holds the ROC AUC, not accuracy.
accuracy_test = evaluator.evaluate(predictions)
print("Test Area Under ROC: %s" % accuracy_test)

Test Area Under ROC: 0.59742228126983
