In [1]:
import findspark
findspark.init()
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("Project").getOrCreate()

In [2]:
spark

In [3]:
mydf = spark.read\
  .format('csv')\
  .option('header', 'true')\
  .option('inferSchema', 'true')\
  .load('user_log.csv')    

In [4]:
mydf.printSchema()

root
 |-- user_id: integer (nullable = true)
 |-- item_id: integer (nullable = true)
 |-- cat_id: integer (nullable = true)
 |-- merchant_id: integer (nullable = true)
 |-- brand_id: double (nullable = true)
 |-- month: integer (nullable = true)
 |-- day: integer (nullable = true)
 |-- action: integer (nullable = true)
 |-- age_range: integer (nullable = true)
 |-- gender: string (nullable = true)
 |-- province: string (nullable = true)



In [5]:
mydf.dropna(how='any')

In [6]:
## import the packages and methods we need
from pyspark.ml.feature import OneHotEncoder, StringIndexer, IndexToString, VectorAssembler
from pyspark.mllib.evaluation import BinaryClassificationMetrics
from pyspark.ml.feature import Binarizer
from pyspark.sql.types import DoubleType

In [7]:
mydf.groupBy('user_id','merchant_id','age_range','action','gender').count().orderBy('count').take(10)

[Row(user_id=404803, merchant_id=1535, age_range=6, action=2, gender='female', count=1),
 Row(user_id=348498, merchant_id=1724, age_range=0, action=2, gender='female', count=1),
 Row(user_id=404803, merchant_id=1535, age_range=4, action=0, gender='female', count=1),
 Row(user_id=209797, merchant_id=461, age_range=3, action=3, gender='female', count=1),
 Row(user_id=185159, merchant_id=3634, age_range=5, action=0, gender='female', count=1),
 Row(user_id=179739, merchant_id=1805, age_range=5, action=2, gender='female', count=1),
 Row(user_id=185159, merchant_id=606, age_range=6, action=0, gender='female', count=1),
 Row(user_id=313749, merchant_id=2206, age_range=6, action=0, gender='female', count=1),
 Row(user_id=185159, merchant_id=831, age_range=1, action=0, gender='female', count=1),
 Row(user_id=350592, merchant_id=4330, age_range=1, action=0, gender='female', count=1)]

In [8]:
newdf = mydf.groupBy('user_id','merchant_id','age_range','action','gender').count()

In [9]:
newdf.take(5)

[Row(user_id=185858, merchant_id=1636, age_range=4, action=0, gender='female', count=1),
 Row(user_id=375886, merchant_id=3716, age_range=3, action=0, gender='female', count=1),
 Row(user_id=375886, merchant_id=4047, age_range=4, action=0, gender='female', count=1),
 Row(user_id=253184, merchant_id=1428, age_range=5, action=0, gender='female', count=1),
 Row(user_id=308768, merchant_id=759, age_range=1, action=0, gender='female', count=1)]

In [10]:
newdf.printSchema()

root
 |-- user_id: integer (nullable = true)
 |-- merchant_id: integer (nullable = true)
 |-- age_range: integer (nullable = true)
 |-- action: integer (nullable = true)
 |-- gender: string (nullable = true)
 |-- count: long (nullable = false)



In [11]:
## since we are gonna use Logistic regression, we need to create a new column called if_return to identify whether or not
## a customer repeatedly buy items from a certain merchant
binarizer = Binarizer(threshold=1, inputCol="label", outputCol="labels")

In [12]:
newdf = newdf.withColumn("label", newdf["count"].cast(DoubleType()))

In [13]:
newdf = binarizer.transform(newdf)

In [14]:
newdf.printSchema()

root
 |-- user_id: integer (nullable = true)
 |-- merchant_id: integer (nullable = true)
 |-- age_range: integer (nullable = true)
 |-- action: integer (nullable = true)
 |-- gender: string (nullable = true)
 |-- count: long (nullable = false)
 |-- label: double (nullable = false)
 |-- labels: double (nullable = true)



#### ***Split into training and testing***

In [16]:
splitted_data = newdf.randomSplit([0.85, 0.15], 2333)
train_data = splitted_data[0]
test_data = splitted_data[1]

print("Number of training records: " + str(train_data.count()))
print("Number of testing records : " + str(test_data.count()))

Number of training records: 29214817
Number of testing records : 5158740


#### ***train logistic model***

In [17]:
from pyspark.ml.classification import LogisticRegression
from pyspark.ml import Pipeline, Model

In [18]:
stringIndexer_age_range = StringIndexer(inputCol="age_range", outputCol="age_range_id")
stringIndexer_gender = StringIndexer(inputCol="gender", outputCol="gender_id")
stringIndexer_action = StringIndexer(inputCol="action", outputCol="action_id")

In [19]:
label_fit = StringIndexer(inputCol="labels", outputCol="labels_id").fit(newdf)

In [28]:
input_col = [
            'age_range_id',
            'gender_id',
            "age_range_id"
]

In [29]:
## In the following step, create a feature vector by using the vectorAssembler method
vectorAssembler_features = VectorAssembler(
    inputCols=input_col, 
    outputCol="features")

In [30]:
## define estimator, use LogisticRegression estimator
lr = LogisticRegression(featuresCol='features', labelCol= 'labels')

In [31]:
## build the pipeline
pipeline_lr = Pipeline(stages=[
                                stringIndexer_action,
                                stringIndexer_age_range,
                                stringIndexer_gender,
                                vectorAssembler_features,
                                lr])

In [32]:
## train model
model_lr = pipeline_lr.fit(train_data)

In [36]:
## prediction
predictions = model_lr.transform(test_data)

In [37]:
prediction_label = predictions.select('prediction','labels').rdd

In [38]:
evaluator = BinaryClassificationMetrics(prediction_label)

In [41]:
accuracy = evaluator.areaUnderROC
print("Test Error = %g" % (1.0 - accuracy))

Test Error = 0.547


### Random Forest

In [45]:
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml import Pipeline, Model

In [46]:
rf = RandomForestClassifier(labelCol="labels", featuresCol="features", numTrees=20)

In [48]:
## build the pipeline
pipeline_rf = Pipeline(stages=[
                                stringIndexer_action,
                                stringIndexer_age_range,
                                stringIndexer_gender,
                                vectorAssembler_features,
                                rf])

In [49]:
## train model
model_rf = pipeline_rf.fit(train_data)

In [50]:
## prediction
predictions = model_rf.transform(test_data)

In [51]:
prediction_label = predictions.select('prediction','labels').rdd

In [52]:
evaluator = BinaryClassificationMetrics(prediction_label)

In [55]:
accuracy = evaluator.areaUnderROC
print("Test Error = %g" % (1.0 - accuracy))

Test Error = 0.561


In [56]:
spark.stop()