# Data Classification Analysis

In [None]:
# Environment bootstrap (paths suggest Google Colab — TODO confirm):
# install a headless Java 8 JDK, download and unpack Spark 3.0.3 (Hadoop 3.2 build),
# and install findspark.
!apt update
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q http://archive.apache.org/dist/spark/spark-3.0.3/spark-3.0.3-bin-hadoop3.2.tgz 
!tar -xvf spark-3.0.3-bin-hadoop3.2.tgz
!pip install -q findspark
import os
# Point JAVA_HOME / SPARK_HOME at the JDK and the unpacked Spark distribution.
# SPARK_HOME assumes the archive was extracted under /content.
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.0.3-bin-hadoop3.2"
import findspark
# Must run after the environment variables above are set and before pyspark is imported.
findspark.init()


In [None]:
import pyspark

In [None]:
from pyspark import SparkContext
from pyspark.conf import SparkConf
from pyspark.sql import SparkSession

In [None]:
# Libraries
from pyspark.sql.functions import mean, stddev, col, log
from pyspark.sql.functions import to_date, dayofweek, to_timestamp
from pyspark.sql import types
from pyspark.sql.functions import col, udf
from datetime import datetime
from pyspark.sql.types import DateType
from pyspark.sql.functions import year, month
from pyspark.sql.functions import dayofmonth, weekofyear
from pyspark.sql.functions import split, explode
from pyspark.sql.functions import coalesce, first, lit
from pyspark.ml.feature import Binarizer
from pyspark.ml.feature import Bucketizer
from pyspark.ml.feature import OneHotEncoder, StringIndexer
from pyspark.sql.functions import regexp_extract, col


In [None]:
import numpy             as np
import pandas            as pd
import matplotlib.pyplot as plt
import seaborn           as sns
import plotly.graph_objs as go
import re
import string
from wordcloud   import WordCloud, STOPWORDS
from sklearn.metrics import classification_report,accuracy_score,confusion_matrix


import warnings
# Silence every warning for the rest of the session.
# NOTE(review): this also hides deprecation and pandas warnings — consider narrowing the filter.
warnings.filterwarnings("ignore")

In [None]:
# Get (or create) the SparkSession used by every Spark cell in this notebook
spark_builder = SparkSession.builder.appName('decision_tree')
spark = spark_builder.getOrCreate()

In [None]:
from google.colab import drive
# Mount Google Drive at /content/drive so the dataset path under /content/drive/MyDrive resolves
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [None]:
# Read combined-data.csv from the mounted Drive folder into a pandas DataFrame.
# `path` is the project root (its trailing slash is kept unchanged). The file location
# is built with os.path.join so the separator is not doubled, as it was with `path + '/DataSet/...'`.
# `os` is imported in the environment-setup cell.
path = "/content/drive/MyDrive/BCU/Assignment_Flamingo/"
data = pd.read_csv(os.path.join(path, 'DataSet', 'combined-data', 'combined-data.csv'))

### Dataset combined-data.csv

In [None]:
# Shape of the raw dataset as (rows, columns)
data.shape

(4619, 8)

In [None]:
# Discard every row that has at least one missing value
data = data.dropna(axis=0, how='any')

# Preview the first five remaining rows
data.head(5)

Unnamed: 0,userId,userSessionId,teamLevel,platformType,count_gameclicks,count_hits,count_buyId,avg_price
4,937,5652,1,android,39,0,1.0,1.0
11,1623,5659,1,iphone,129,9,1.0,10.0
13,83,5661,1,android,102,14,1.0,5.0
17,121,5665,1,android,39,4,1.0,3.0
18,462,5666,1,android,90,10,1.0,3.0


In [None]:
# Shape of the dataset after rows with missing values were dropped, as (rows, columns)
data.shape

(1411, 8)

### Attribute Creation

In [None]:
# Binary target derived from the average purchase price:
#   HighRollers   (1): avg_price >  $5.00
#   PennyPinchers (0): avg_price <= $5.00
# A vectorized comparison replaces the row-by-row apply(lambda ...); the explicit
# int64 cast keeps the same integer dtype the apply produced.
data['label'] = (data['avg_price'] > 5).astype('int64')
data.head()

Unnamed: 0,userId,userSessionId,teamLevel,platformType,count_gameclicks,count_hits,count_buyId,avg_price,label
4,937,5652,1,android,39,0,1.0,1.0,0
11,1623,5659,1,iphone,129,9,1.0,10.0,1
13,83,5661,1,android,102,14,1.0,5.0,0
17,121,5665,1,android,39,4,1.0,3.0,0
18,462,5666,1,android,90,10,1.0,3.0,0


In [None]:
# Class balance: number of rows for each label value
data['label'].value_counts()

0    836
1    575
Name: label, dtype: int64

## Data Partitioning and Modeling

In [None]:
# List the available columns before choosing the model inputs
data.columns

Index(['userId', 'userSessionId', 'teamLevel', 'platformType',
       'count_gameclicks', 'count_hits', 'count_buyId', 'avg_price', 'label'],
      dtype='object')

In [None]:
# Keep only the model inputs plus the target; userId, userSessionId and avg_price are left out
model_columns = [
    'teamLevel',
    'platformType',
    'count_gameclicks',
    'count_hits',
    'count_buyId',
    'label',
]
df = data[model_columns]
df.head()

Unnamed: 0,teamLevel,platformType,count_gameclicks,count_hits,count_buyId,label
4,1,android,39,0,1.0,0
11,1,iphone,129,9,1.0,1
13,1,android,102,14,1.0,0
17,1,android,39,4,1.0,0
18,1,android,90,10,1.0,0


In [None]:
# Convert the pandas frame into a Spark DataFrame and preview three rows
p_df = spark.createDataFrame(data=df)
p_df.show(n=3)

+---------+------------+----------------+----------+-----------+-----+
|teamLevel|platformType|count_gameclicks|count_hits|count_buyId|label|
+---------+------------+----------------+----------+-----------+-----+
|        1|     android|              39|         0|        1.0|    0|
|        1|      iphone|             129|         9|        1.0|    1|
|        1|     android|             102|        14|        1.0|    0|
+---------+------------+----------------+----------+-----------+-----+
only showing top 3 rows



In [None]:
from pyspark.ml.feature import StringIndexer, OneHotEncoder
from pyspark.sql.functions import datediff

In [None]:
# Estimator that turns each platformType string into a numeric category index
indexer = StringIndexer(
    outputCol='platformType_idx',
    inputCol='platformType',
)

# Fitting discovers the categories present in p_df
indexer_model = indexer.fit(dataset=p_df)

# Transforming appends the platformType_idx column
data_indexed = indexer_model.transform(dataset=p_df)


In [None]:
# Preview three rows to check the new platformType_idx column
data_indexed.show(3)

+---------+------------+----------------+----------+-----------+-----+----------------+
|teamLevel|platformType|count_gameclicks|count_hits|count_buyId|label|platformType_idx|
+---------+------------+----------------+----------+-----------+-----+----------------+
|        1|     android|              39|         0|        1.0|    0|             1.0|
|        1|      iphone|             129|         9|        1.0|    1|             0.0|
|        1|     android|             102|        14|        1.0|    0|             1.0|
+---------+------------+----------------+----------+-----------+-----+----------------+
only showing top 3 rows



In [None]:
# One-hot encode the platform index into the platformType_vec vector column
encoder = OneHotEncoder(inputCol="platformType_idx", outputCol="platformType_vec", dropLast=True)

# Because data_indexed is reassigned below, re-running this cell used to fail: the
# output column already existed on the input. Dropping it first makes the cell
# idempotent (DataFrame.drop is a no-op when the column is absent).
encoder_input = data_indexed.drop("platformType_vec")
data_indexed = encoder.fit(encoder_input).transform(encoder_input)


In [None]:
# Preview three rows to check the new platformType_vec column
data_indexed.show(3)

+---------+------------+----------------+----------+-----------+-----+----------------+----------------+
|teamLevel|platformType|count_gameclicks|count_hits|count_buyId|label|platformType_idx|platformType_vec|
+---------+------------+----------------+----------+-----------+-----+----------------+----------------+
|        1|     android|              39|         0|        1.0|    0|             1.0|   (4,[1],[1.0])|
|        1|      iphone|             129|         9|        1.0|    1|             0.0|   (4,[0],[1.0])|
|        1|     android|             102|        14|        1.0|    0|             1.0|   (4,[1],[1.0])|
+---------+------------+----------------+----------+-----------+-----+----------------+----------------+
only showing top 3 rows



In [None]:
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import VectorAssembler


In [None]:
# Columns available as inputs to the feature assembler
data_indexed.columns

['teamLevel',
 'platformType',
 'count_gameclicks',
 'count_hits',
 'count_buyId',
 'label',
 'platformType_idx',
 'platformType_vec']

In [None]:
# Combine the numeric inputs and the one-hot platform vector into a single 'features' column
feature_inputs = ['teamLevel', 'count_gameclicks', 'count_hits', 'count_buyId', 'platformType_vec']
assembler = VectorAssembler(outputCol='features', inputCols=feature_inputs)

In [None]:
# Apply the assembler to the indexed/encoded frame; the result is kept as data_pre
data_pre = assembler.transform(data_indexed)

In [None]:
# Check the assembled 'features' column next to the label (five rows, no truncation)
data_pre.select('features', 'label').show(5, truncate=False)

+------------------------------------+-----+
|features                            |label|
+------------------------------------+-----+
|(8,[0,1,3,5],[1.0,39.0,1.0,1.0])    |0    |
|[1.0,129.0,9.0,1.0,1.0,0.0,0.0,0.0] |1    |
|[1.0,102.0,14.0,1.0,0.0,1.0,0.0,0.0]|0    |
|[1.0,39.0,4.0,1.0,0.0,1.0,0.0,0.0]  |0    |
|[1.0,90.0,10.0,1.0,0.0,1.0,0.0,0.0] |0    |
+------------------------------------+-----+
only showing top 5 rows



In [None]:
# Keep just the feature vector and the target, then report the row count
final_data = data_pre.select(["features", "label"])
final_data.count()


1411

### BUILD MODEL

In [None]:
# Train/test split with weights 0.8 / 0.2.
# The seed is fixed so the split, and every metric computed from it, is repeatable
# across runs; without it each execution produced a different partition.
train_data, test_data = final_data.randomSplit([0.8, 0.2], seed=42)

### DECISION TREE

In [None]:
# Import the Decision Tree Classifier class
from pyspark.ml.classification import DecisionTreeClassifier


In [None]:
# Configure the decision tree classifier; this only builds the estimator, it is not fitted here
tree = DecisionTreeClassifier(featuresCol='features', labelCol='label', predictionCol='prediction')


In [None]:
# Fit the classifier on the training split; the fitted model is kept as tree_model
tree_model = tree.fit(train_data)


In [None]:
# Print the learned tree as nested if/else rules over feature indices
print(tree_model.toDebugString)

DecisionTreeClassificationModel: uid=DecisionTreeClassifier_f41048184ac6, depth=5, numNodes=29, numClasses=2, numFeatures=8
  If (feature 4 in {0.0})
   If (feature 7 in {1.0})
    Predict: 0.0
   Else (feature 7 not in {1.0})
    If (feature 3 <= 1.5)
     If (feature 2 <= 4.5)
      If (feature 1 <= 68.5)
       Predict: 0.0
      Else (feature 1 > 68.5)
       Predict: 1.0
     Else (feature 2 > 4.5)
      Predict: 0.0
    Else (feature 3 > 1.5)
     Predict: 0.0
  Else (feature 4 not in {0.0})
   If (feature 3 <= 1.5)
    If (feature 2 <= 12.5)
     If (feature 1 <= 36.5)
      Predict: 1.0
     Else (feature 1 > 36.5)
      If (feature 1 <= 49.5)
       Predict: 0.0
      Else (feature 1 > 49.5)
       Predict: 1.0
    Else (feature 2 > 12.5)
     If (feature 0 <= 1.5)
      If (feature 1 <= 74.5)
       Predict: 1.0
      Else (feature 1 > 74.5)
       Predict: 0.0
     Else (feature 0 > 1.5)
      Predict: 1.0
   Else (feature 3 > 1.5)
    If (feature 1 <= 358.5)
     Predict: 1

In [None]:
## Evaluate the results

# Score the test split; despite its name, test_model holds the DataFrame of predictions
test_model = tree_model.transform(test_data)


In [None]:
# Look at three predictions without truncating the probability vectors
prediction_columns = ['label', 'prediction', 'probability']
test_model.select(*prediction_columns).show(n=3, truncate=False)


+-----+----------+----------------------------------------+
|label|prediction|probability                             |
+-----+----------+----------------------------------------+
|0    |0.0       |[0.9230769230769231,0.07692307692307693]|
|1    |0.0       |[0.8202247191011236,0.1797752808988764] |
|0    |0.0       |[0.8202247191011236,0.1797752808988764] |
+-----+----------+----------------------------------------+
only showing top 3 rows



In [None]:
## Evaluate the results

# Create a confusion matrix: row counts per (label, prediction) pair
test_model.groupBy('label', 'prediction').count().show()


+-----+----------+-----+
|label|prediction|count|
+-----+----------+-----+
|    1|       0.0|   34|
|    0|       1.0|   15|
|    0|       0.0|  166|
|    1|       1.0|  105|
+-----+----------+-----+



In [None]:
# Confusion-matrix cells for the decision tree, with label 1 as the positive class.
# One grouped aggregation yields all four counts in a single Spark job, instead of
# four separate filter(...).count() passes over the un-cached predictions.
dt_cell_counts = {
    (int(row['label']), int(row['prediction'])): row['count']
    for row in test_model.groupBy('label', 'prediction').count().collect()
}

# Keys are (actual label, predicted label); a combination with no rows counts as 0.
TN = dt_cell_counts.get((0, 0), 0)
TP = dt_cell_counts.get((1, 1), 0)
FN = dt_cell_counts.get((1, 0), 0)
FP = dt_cell_counts.get((0, 1), 0)


In [None]:
# Report the four confusion-matrix counts on one line
confusion_summary = 'TN: {}, TP: {}, FN: {}, FP: {}'.format(TN, TP, FN, FP)
print(confusion_summary)

TN: 166, TP: 105, FN: 34, FP: 15


In [None]:
# Express each confusion-matrix cell as a percentage of all scored rows
total_samples = sum((TN, TP, FN, FP))

TN_percent, TP_percent, FN_percent, FP_percent = (
    (cell / total_samples) * 100 for cell in (TN, TP, FN, FP)
)

for template, value in (
    ('TN_percent: {:.2f}%', TN_percent),
    ('TP_percent: {:.2f}%', TP_percent),
    ('FN_percent: {:.2f}%', FN_percent),
    ('FP_percent: {:.2f}%', FP_percent),
):
    print(template.format(value))

TN_percent: 51.88%
TP_percent: 32.81%
FN_percent: 10.62%
FP_percent: 4.69%


In [None]:
# Accuracy = correctly classified rows / all scored rows
correct_predictions = TN + TP
all_predictions = TN + TP + FN + FP
accuracy = correct_predictions / all_predictions
print(accuracy)

0.846875


In [None]:
# Persist the fitted decision tree.
# write().overwrite() replaces an existing directory; a plain save() raises when the
# path already exists, which broke every run after the first.
tree_model.write().overwrite().save('tree_model_flamingo')

### RANDOM FOREST

In [None]:
## Build the model
from pyspark.ml.classification import RandomForestClassifier

In [None]:
# Random forest with default hyperparameters.
# The seed is pinned because the ensemble is built with random sampling; without it
# the fitted model and its metrics can differ from one session to the next.
rfc = RandomForestClassifier(featuresCol='features', labelCol='label', predictionCol='prediction', seed=42)

In [None]:
# Fit the random forest on the training split; the fitted model is kept as rfc_model
rfc_model = rfc.fit(train_data)

# Report the ensemble size and the relative importance of each feature index.
# getNumTrees is read as an attribute here (no call parentheses).
print("Number of trees:", rfc_model.getNumTrees)
print("Relative importance of features:", rfc_model.featureImportances)

Number of trees: 20
Relative importance of features: (8,[0,1,2,3,4,5,6,7],[0.01809367029557833,0.01660299721718844,0.014061993972637705,0.025058564939057205,0.7504755058950416,0.10864993634899589,0.052057802487568326,0.014999528843932302])


In [None]:
## Evaluate the results


In [None]:
# Score the test split with the random forest; rfc_test_model holds the predictions
rfc_test_model = rfc_model.transform(test_data)
# Look at three predictions without truncating the probability vectors
rfc_prediction_columns = ['label', 'prediction', 'probability']
rfc_test_model.select(*rfc_prediction_columns).show(n=3, truncate=False)

+-----+----------+----------------------------------------+
|label|prediction|probability                             |
+-----+----------+----------------------------------------+
|0    |0.0       |[0.8643267455314649,0.13567325446853506]|
|1    |0.0       |[0.8875204582476058,0.11247954175239408]|
|0    |0.0       |[0.8517130981650084,0.14828690183499155]|
+-----+----------+----------------------------------------+
only showing top 3 rows



In [None]:
## Random Forest

# Create a confusion matrix: row counts per (label, prediction) pair
rfc_test_model.groupBy('label', 'prediction').count().show()


+-----+----------+-----+
|label|prediction|count|
+-----+----------+-----+
|    1|       0.0|   29|
|    0|       1.0|   15|
|    0|       0.0|  166|
|    1|       1.0|  110|
+-----+----------+-----+



In [None]:
# Confusion-matrix cells for the random forest, with label 1 as the positive class.
# One grouped aggregation yields all four counts in a single Spark job, instead of
# four separate filter(...).count() passes over the un-cached predictions.
rf_cell_counts = {
    (int(row['label']), int(row['prediction'])): row['count']
    for row in rfc_test_model.groupBy('label', 'prediction').count().collect()
}

# Keys are (actual label, predicted label); a combination with no rows counts as 0.
TN = rf_cell_counts.get((0, 0), 0)
TP = rf_cell_counts.get((1, 1), 0)
FN = rf_cell_counts.get((1, 0), 0)
FP = rf_cell_counts.get((0, 1), 0)
print('TN: {}, TP: {}, FN: {}, FP: {}'.format(str(TN), str(TP), str(FN), str(FP)))


TN: 166, TP: 110, FN: 29, FP: 15


In [None]:
# Express each confusion-matrix cell as a percentage of all scored rows
total_samples = sum((TN, TP, FN, FP))

TN_percent, TP_percent, FN_percent, FP_percent = (
    (cell / total_samples) * 100 for cell in (TN, TP, FN, FP)
)

for template, value in (
    ('TN_percent: {:.2f}%', TN_percent),
    ('TP_percent: {:.2f}%', TP_percent),
    ('FN_percent: {:.2f}%', FN_percent),
    ('FP_percent: {:.2f}%', FP_percent),
):
    print(template.format(value))

TN_percent: 51.88%
TP_percent: 34.38%
FN_percent: 9.06%
FP_percent: 4.69%


In [None]:
# Accuracy = correctly classified rows / all scored rows
correct_predictions = TN + TP
all_predictions = TN + TP + FN + FP
accuracy = correct_predictions / all_predictions
print(accuracy)

0.8625


In [None]:
# Persist the fitted random forest.
# write().overwrite() replaces an existing directory; a plain save() raises when the
# path already exists, which broke every run after the first.
rfc_model.write().overwrite().save('rfc_model_flamingo')

### Comparing the results of the two models: Decision Tree vs. Random Forest

In [None]:
# Compare accuracy using MulticlassClassificationEvaluator
from pyspark.ml.evaluation import MulticlassClassificationEvaluator


In [None]:
# Score the same test split with both fitted models so they can be compared side by side
dtc_predictions = tree_model.transform(test_data)
rfc_predictions = rfc_model.transform(test_data)


In [None]:
# Evaluator that scores the "prediction" column against "label" using the accuracy metric
acc_evaluator = MulticlassClassificationEvaluator(
    metricName="accuracy",
    labelCol="label",
    predictionCol="prediction",
)

In [None]:
# Accuracy of each model on the test split
dtc_acc = acc_evaluator.evaluate(dtc_predictions)
rfc_acc = acc_evaluator.evaluate(rfc_predictions)


In [None]:
# Side-by-side accuracy report, shown as percentages
separator = '-'*60
dtc_line = 'A single decision tree has an accuracy of: {0:2.2f}%'.format(dtc_acc*100)
rfc_line = 'A random forest ensemble has an accuracy of: {0:2.2f}%'.format(rfc_acc*100)

print("Results")
print(separator)
print(dtc_line)
print(separator)
print(rfc_line)
print(separator)


Results
------------------------------------------------------------
A single decision tree has an accuracy of: 84.69%
------------------------------------------------------------
A random forest ensemble has an accuracy of: 86.25%
------------------------------------------------------------


### Comparing areaUnderROC with BinaryClassificationEvaluator

In [None]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator
# Compare AUC on the test data.
# The evaluator is built with its default settings (no columns or metric are passed);
# NOTE(review): presumably the default metric is areaUnderROC — confirm against the pyspark docs.
evaluator = BinaryClassificationEvaluator()


In [None]:
# Evaluate both models' test-set predictions with the binary evaluator
dtc_auc_2 = evaluator.evaluate(dtc_predictions)
rfc_auc_2 = evaluator.evaluate(rfc_predictions)


In [None]:
# Side-by-side AUC report; the scores are multiplied by 100 and shown as percentages
separator = '-'*60
dtc_line = 'A single decision tree has an AUC of: {0:2.2f}%'.format(dtc_auc_2*100)
rfc_line = 'A random forest ensemble has an AUC of: {0:2.2f}%'.format(rfc_auc_2*100)

print("Results")
print(separator)
print(dtc_line)
print(separator)
print(rfc_line)
print(separator)


Results
------------------------------------------------------------
A single decision tree has an AUC of: 80.28%
------------------------------------------------------------
A random forest ensemble has an AUC of: 86.92%
------------------------------------------------------------
