In [1]:
!apt-get install openjdk-8-jdk-headless

Reading package lists... Done
Building dependency tree... Done
Reading state information... Done
The following additional packages will be installed:
  libxtst6 openjdk-8-jre-headless
Suggested packages:
  openjdk-8-demo openjdk-8-source libnss-mdns fonts-dejavu-extra
  fonts-ipafont-gothic fonts-ipafont-mincho fonts-wqy-microhei
  fonts-wqy-zenhei fonts-indic
The following NEW packages will be installed:
  libxtst6 openjdk-8-jdk-headless openjdk-8-jre-headless
0 upgraded, 3 newly installed, 0 to remove and 8 not upgraded.
Need to get 39.7 MB of archives.
After this operation, 144 MB of additional disk space will be used.
Get:1 http://archive.ubuntu.com/ubuntu jammy/main amd64 libxtst6 amd64 2:1.2.3-1build4 [13.4 kB]
Get:2 http://archive.ubuntu.com/ubuntu jammy-updates/universe amd64 openjdk-8-jre-headless amd64 8u372-ga~us1-0ubuntu1~22.04 [30.8 MB]
Get:3 http://archive.ubuntu.com/ubuntu jammy-updates/universe amd64 openjdk-8-jdk-headless amd64 8u372-ga~us1-0ubuntu1~22.04 [8,860 kB]
Fe

In [2]:
# installing hadoop
!wget https://archive.apache.org/dist/spark/spark-3.2.1/spark-3.2.1-bin-hadoop2.7.tgz

--2023-07-23 12:51:32--  https://archive.apache.org/dist/spark/spark-3.2.1/spark-3.2.1-bin-hadoop2.7.tgz
Resolving archive.apache.org (archive.apache.org)... 65.108.204.189, 2a01:4f9:1a:a084::2
Connecting to archive.apache.org (archive.apache.org)|65.108.204.189|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 272637746 (260M) [application/x-gzip]
Saving to: ‘spark-3.2.1-bin-hadoop2.7.tgz’


2023-07-23 12:51:47 (17.4 MB/s) - ‘spark-3.2.1-bin-hadoop2.7.tgz’ saved [272637746/272637746]



In [3]:
#unzinpping the folder
!tar xf /content/spark-3.2.1-bin-hadoop2.7.tgz

In [4]:
# installing findspark library
!pip install -q findspark

In [5]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.2.1-bin-hadoop2.7"

In [6]:
# locating spark system
import findspark
findspark.init()
findspark.find()

'/content/spark-3.2.1-bin-hadoop2.7'

In [7]:
# setting the emvoironment
import pyspark
import numpy as np
import pandas as pd

In [8]:
from pyspark.context import SparkContext
from pyspark.sql.session import SparkSession
sc = SparkContext.getOrCreate();
spark = SparkSession(sc)

In [9]:
flamingo = spark.read.csv('/content/drive/MyDrive/Big Data Management/datasets/flamingo-data/game-clicks.csv', sep=',', header=True, inferSchema=True, nullValue='NA')

flamingo.show(5)

+--------------+-------+---------+----------+-----+
|     timestamp|clickId|teamLevel|count_hits|isHit|
+--------------+-------+---------+----------+-----+
|6/16/2016 8:11| 750154|        8|         8|    0|
|6/16/2016 8:11| 750503|        7|         5|    0|
|6/16/2016 8:11| 750692|        6|         2|    0|
|6/16/2016 8:11| 750788|        4|         4|    0|
|6/16/2016 8:11| 750133|        8|         0|    0|
+--------------+-------+---------+----------+-----+
only showing top 5 rows



In [10]:
flamingo.count()

3497

In [11]:
# Remove the useless features
flamingo1 = flamingo.drop('clickId')
flamingo2 = flamingo1.drop('timestamp')

flamingo2.show()


+---------+----------+-----+
|teamLevel|count_hits|isHit|
+---------+----------+-----+
|        8|         8|    0|
|        7|         5|    0|
|        6|         2|    0|
|        4|         4|    0|
|        8|         0|    0|
|        7|         5|    0|
|        8|         5|    0|
|        1|         8|    0|
|        5|         6|    0|
|        8|         9|    0|
|        8|         6|    0|
|        7|         9|    1|
|        8|         6|    0|
|        8|        14|    0|
|        8|         7|    0|
|        8|         8|    0|
|        7|        21|    0|
|        8|         4|    0|
|        5|        10|    0|
|        8|         2|    0|
+---------+----------+-----+
only showing top 20 rows



In [13]:
from pyspark.sql.functions import round


# Create 'label' column indicating it is a hit (1) or not(0)
flamingo2 = flamingo2.withColumn('label', (flamingo2.isHit >= 1).cast('integer'))

flamingo3 = flamingo2.drop('isHit')

# Check first five records
flamingo3.show(5)

+---------+----------+-----+
|teamLevel|count_hits|label|
+---------+----------+-----+
|        8|         8|    0|
|        7|         5|    0|
|        6|         2|    0|
|        4|         4|    0|
|        8|         0|    0|
+---------+----------+-----+
only showing top 5 rows



In [14]:
from pyspark.ml.feature import VectorAssembler

# Create an assembler object
assembler = VectorAssembler(inputCols=[
    'teamLevel', 'count_hits',
], outputCol='features')

# Consolidate predictor columns
flamingo_assembled = assembler.transform(flamingo3)

# Check the resulting column
flamingo_assembled.select('features', 'label').show(5, truncate=False)

+---------+-----+
|features |label|
+---------+-----+
|[8.0,8.0]|0    |
|[7.0,5.0]|0    |
|[6.0,2.0]|0    |
|[4.0,4.0]|0    |
|[8.0,0.0]|0    |
+---------+-----+
only showing top 5 rows



**Setup the naive bayes**

In [16]:
#performing the train test split, this will done for the both classifiers 80 and 20
(train, test) = flamingo_assembled.randomSplit([0.8, 0.2])

In [18]:
# testing the predicting model
from pyspark.ml.classification import NaiveBayes
nb = NaiveBayes(smoothing=1.0, modelType="multinomial")
nb = nb.fit(train)

pred = nb.transform(test)
pred.show(3)

+---------+----------+-----+---------+--------------------+--------------------+----------+
|teamLevel|count_hits|label| features|       rawPrediction|         probability|prediction|
+---------+----------+-----+---------+--------------------+--------------------+----------+
|        1|         0|    0|[1.0,0.0]|[-1.3061058772919...|[0.89326098705566...|       0.0|
|        1|         2|    0|[1.0,2.0]|[-2.0270467018149...|[0.89479032542860...|       0.0|
|        1|         2|    0|[1.0,2.0]|[-2.0270467018149...|[0.89479032542860...|       0.0|
+---------+----------+-----+---------+--------------------+--------------------+----------+
only showing top 3 rows



In [20]:
# now creating confusing matrix and accuracy  for the naive bayes
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from sklearn.metrics import confusion_matrix
evaluator=MulticlassClassificationEvaluator(predictionCol="prediction")
acc = evaluator.evaluate(pred)

print("Prediction Accuracy: ", acc)

y_pred=pred.select("prediction").collect()
y_orig=pred.select("label").collect()

cm = confusion_matrix(y_orig, y_pred)
print("Confusion Matrix:")
print(cm)

Prediction Accuracy:  0.8611474219317357
Confusion Matrix:
[[616   0]
 [ 64   0]]


**now setup the decision tree**

In [22]:
# Split into training and test sets in a 80:20 ratio
flamingo_train, flamingo_test = flamingo_assembled.randomSplit([0.8, 0.2], seed=42)

# Check that training set has around 80% of records
training_ratio = flamingo_train.count() / flamingo_assembled.count()
print(training_ratio)

0.8149842722333429


**build decesion tree**

In [23]:
from pyspark.ml.classification import DecisionTreeClassifier

# Create a classifier object and fit to the training data
tree = DecisionTreeClassifier()
tree_model = tree.fit(flamingo_train)

# Create predictions for the testing data and take a look at the predictions

prediction = tree_model.transform(flamingo_test)
prediction.select('label', 'prediction', 'probability').show(5, False)

+-----+----------+----------------------------------------+
|label|prediction|probability                             |
+-----+----------+----------------------------------------+
|0    |0.0       |[0.8989473684210526,0.10105263157894737]|
|0    |0.0       |[0.8989473684210526,0.10105263157894737]|
|0    |0.0       |[0.8989473684210526,0.10105263157894737]|
|1    |0.0       |[0.8989473684210526,0.10105263157894737]|
|0    |0.0       |[0.8989473684210526,0.10105263157894737]|
+-----+----------+----------------------------------------+
only showing top 5 rows



In [24]:
# Create a confusion matrix
prediction.groupBy('label', 'prediction').count().show()

# Calculate the elements of the confusion matrix
TN = prediction.filter('prediction = 0 AND label = prediction').count()
TP = prediction.filter('prediction = 1 AND label = prediction').count()
FN = prediction.filter('prediction = 0 AND label = 1').count()
FP = prediction.filter('prediction = 1 AND label = 0').count()

# Accuracy measures the proportion of correct predictions
accuracy = (TN + TP) / (TN + TP + FN + FP)
print(accuracy)

+-----+----------+-----+
|label|prediction|count|
+-----+----------+-----+
|    1|       0.0|   71|
|    0|       0.0|  576|
+-----+----------+-----+

0.8902627511591963


In [30]:
!jupyter nbconvert --to html /content/Machine_learning_classification_ALgorithm_1_algorithm_2 (1).ipynb

/bin/bash: -c: line 1: syntax error near unexpected token `('
/bin/bash: -c: line 1: `jupyter nbconvert --to html /content/Machine_learning_classification_ALgorithm_1_algorithm_2 (1).ipynb'
