## Instalasi PySpark dan Persiapan

In [None]:
!apt-get install openjdk-11-jdk-headless -qq > /dev/null
!wget -q https://downloads.apache.org/spark/spark-3.4.0/spark-3.4.0-bin-hadoop3.tgz

!tar xf spark-3.4.0-bin-hadoop3.tgz
!pip install -q findspark

In [None]:
pip install pyspark==2.4.4

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting pyspark==2.4.4
  Downloading pyspark-2.4.4.tar.gz (215.7 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m215.7/215.7 MB[0m [31m2.2 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Collecting py4j==0.10.7 (from pyspark==2.4.4)
  Downloading py4j-0.10.7-py2.py3-none-any.whl (197 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m197.3/197.3 kB[0m [31m16.7 MB/s[0m eta [36m0:00:00[0m
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-2.4.4-py2.py3-none-any.whl size=216130372 sha256=a1162b6d1533e130440213e616b88071a66ca7307c5ff81fa31a27d5b1b82665
  Stored in directory: /root/.cache/pip/wheels/d1/bb/c7/1323feaa6ff889a2471f9c82d07a83bd2ce52e8fb12f86a45a
Successfully built pyspark
Installing collec

In [None]:
import os

# Tell Spark where the JDK and the unpacked Spark distribution live.
# /usr/lib/jvm is the canonical install location of the apt OpenJDK package;
# the bare /lib/jvm form only resolves on images where /lib links to /usr/lib.
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.4.0-bin-hadoop3"

In [None]:
import findspark

# Initialise findspark before the pyspark imports below.
findspark.init()

from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession

# Local-mode session; "local[*]" is the master URL passed to the builder.
spark = (
    SparkSession.builder
    .master("local[*]")
    .getOrCreate()
)

# Handle to the SparkContext (created above or reused if one already exists).
sc = SparkContext.getOrCreate()

In [None]:
# Mount Google Drive into the Colab filesystem at /content/drive.
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


### Organizing data in DataFrames

In [None]:
# Location of the iris CSV file under the mounted Google Drive.
irisTxtPath = '/content/drive/MyDrive/data/iris.csv'

In [None]:
# iris.csv has no header row, so every line must be read as data. With
# header=True the first record is consumed as the column names and one row is
# silently dropped from the DataFrame. Columns are left as strings here
# (no schema inference); re-run the cells below to refresh their output.
empDF = spark.read.csv(irisTxtPath, header=False)

In [None]:
# Bare expression: display the DataFrame's repr (column names and types).
empDF

DataFrame[5.1: string, 3.5: string, 1.4: string, 0.2: string, Iris-setosa: string]

In [None]:
# The schema as a StructType object.
empDF.schema

StructType([StructField('5.1', StringType(), True), StructField('3.5', StringType(), True), StructField('1.4', StringType(), True), StructField('0.2', StringType(), True), StructField('Iris-setosa', StringType(), True)])

In [None]:
# Print the same schema as an indented tree.
empDF.printSchema()

root
 |-- 5.1: string (nullable = true)
 |-- 3.5: string (nullable = true)
 |-- 1.4: string (nullable = true)
 |-- 0.2: string (nullable = true)
 |-- Iris-setosa: string (nullable = true)



In [None]:
# Column names as a plain Python list.
empDF.columns

['5.1', '3.5', '1.4', '0.2', 'Iris-setosa']

In [None]:
# First five rows, returned as a list of Row objects.
empDF.take(5)

[Row(5.1='4.9', 3.5='3.0', 1.4='1.4', 0.2='0.2', Iris-setosa='Iris-setosa'),
 Row(5.1='4.7', 3.5='3.2', 1.4='1.3', 0.2='0.2', Iris-setosa='Iris-setosa'),
 Row(5.1='4.6', 3.5='3.1', 1.4='1.5', 0.2='0.2', Iris-setosa='Iris-setosa'),
 Row(5.1='5.0', 3.5='3.6', 1.4='1.4', 0.2='0.2', Iris-setosa='Iris-setosa'),
 Row(5.1='5.4', 3.5='3.9', 1.4='1.7', 0.2='0.4', Iris-setosa='Iris-setosa')]

In [None]:
# Number of rows in the DataFrame.
empDF.count()

149

In [None]:
# Draw a roughly 10% sample without replacement. The fixed seed makes the
# sample, and therefore the count shown below, repeatable across runs.
sampleDF = empDF.sample(withReplacement=False, fraction=0.1, seed=1)
sampleDF.count()

18

In [None]:
from pyspark.sql.functions import *
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.feature import StringIndexer

In [None]:
# Read the same CSV again, this time asking Spark to infer the column types.
irisDF = spark.read.option("inferSchema", True).csv(irisTxtPath)

In [None]:
# The file was read without a header, so the columns carry Spark's
# auto-generated names (_c0 ... _c4).
irisDF.columns

['_c0', '_c1', '_c2', '_c3', '_c4']

In [None]:
# Give the five columns meaningful names. toDF renames positionally, so it does
# not reference the auto-generated _c0.._c4 names; re-running this cell after
# irisDF has already been renamed therefore still works, whereas a
# select(col('_c0')...) would fail on the second run.
irisDF = irisDF.toDF('sepal_length',
                     'sepal_width',
                     'petal_length',
                     'petal_width',
                     'species')

In [None]:
# Check the first row under the new column names.
irisDF.take(1)

[Row(sepal_length=5.1, sepal_width=3.5, petal_length=1.4, petal_width=0.2, species='Iris-setosa')]

In [None]:
# Confirm the inferred types: four double columns and one string column.
irisDF.printSchema()

root
 |-- sepal_length: double (nullable = true)
 |-- sepal_width: double (nullable = true)
 |-- petal_length: double (nullable = true)
 |-- petal_width: double (nullable = true)
 |-- species: string (nullable = true)



In [None]:
# Combine the four numeric measurement columns into one vector column
# named 'features'.
featureCols = ['sepal_length', 'sepal_width', 'petal_length', 'petal_width']
vectorAssembler = VectorAssembler(inputCols=featureCols, outputCol='features')

In [None]:
# Add the assembled 'features' vector column to the iris DataFrame.
virisDF = vectorAssembler.transform(irisDF)
# Show the first row to confirm the new column is present.
virisDF.take(1)

[Row(sepal_length=5.1, sepal_width=3.5, petal_length=1.4, petal_width=0.2, species='Iris-setosa', features=DenseVector([5.1, 3.5, 1.4, 0.2]))]

In [None]:
# Encode the string 'species' column as a numeric 'label' column:
# fit the indexer on the data, then apply the fitted model to the same data.
indexer = StringIndexer(inputCol='species', outputCol='label')
indexerModel = indexer.fit(virisDF)
iVirisDF = indexerModel.transform(virisDF)
iVirisDF

DataFrame[sepal_length: double, sepal_width: double, petal_length: double, petal_width: double, species: string, features: vector, label: double]

### Naive Bayes Classification

In [None]:
# First row of the prepared data: original columns plus 'features' and 'label'.
iVirisDF.take(1)

[Row(sepal_length=5.1, sepal_width=3.5, petal_length=1.4, petal_width=0.2, species='Iris-setosa', features=DenseVector([5.1, 3.5, 1.4, 0.2]), label=0.0)]

In [None]:
from pyspark.ml.classification import NaiveBayes
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

In [None]:
# Seeded random split with weights 0.6 (training) and 0.4 (test).
splits = iVirisDF.randomSplit([0.6, 0.4], seed=1)
trainDF, testDF = splits

In [None]:
# Number of rows in the training split.
trainDF.count()

98

In [None]:
# Number of rows in the test split.
testDF.count()

52

In [None]:
# Total rows before splitting; the two split counts should add up to this.
iVirisDF.count()

150

In [None]:
# Naive Bayes estimator using the multinomial model type; all other
# parameters are left at their defaults.
nb = NaiveBayes(modelType='multinomial')

In [None]:
# Fit the classifier on the training split.
nbModel = nb.fit(trainDF)

In [None]:
# Score the held-out test split with the fitted model.
predictionsDF = nbModel.transform(testDF)

In [None]:
# Inspect one scored row: the model adds rawPrediction, probability and
# prediction columns to the input columns.
predictionsDF.take(1)

[Row(sepal_length=4.3, sepal_width=3.0, petal_length=1.1, petal_width=0.1, species='Iris-setosa', features=DenseVector([4.3, 3.0, 1.1, 0.1]), label=0.0, rawPrediction=DenseVector([-9.9894, -11.3476, -11.902]), probability=DenseVector([0.7118, 0.183, 0.1051]), prediction=0.0)]

In [None]:
# Evaluator that compares the 'prediction' column against 'label' and
# reports accuracy.
evaluator = MulticlassClassificationEvaluator(
    labelCol='label',
    predictionCol='prediction',
    metricName='accuracy',
)

In [None]:
# Accuracy of the Naive Bayes predictions on the test split.
nbAccuracy = evaluator.evaluate(predictionsDF)
# Bare expression so the notebook displays the value.
nbAccuracy

0.9807692307692307