<a href="https://colab.research.google.com/github/eksalailia/BigData_Genap_2022/blob/main/BigData_UAS.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

## Setting PySpark

In [1]:
# Install java 8
!apt-get install openjdk-8-jdk-headless -qq > /dev/null

# Download Apache Spark binary:
!wget -q https://dlcdn.apache.org/spark/spark-3.2.1/spark-3.2.1-bin-hadoop3.2.tgz

# Unzip file
!tar xf spark-3.2.1-bin-hadoop3.2.tgz

# Install findspark:
!pip install -q findspark

# Install pyspark
!pip install pyspark

In [2]:
# Menambahkan variable
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.2.1-bin-hadoop3.2"
import findspark
findspark.init()

## Initialize SparkSession

In [5]:
from pyspark.sql import SparkSession

spark = SparkSession.builder \
        .master("local") \
        .appName("Hands-on PySpark on Google Colab") \
        .getOrCreate()

In [6]:
spark

## Read Data

In [7]:
!wget -q https://archive.ics.uci.edu/ml/machine-learning-databases/wine-quality/winequality-red.csv -P sample_data/

In [24]:
filepath = "sample_data/winequality-red.csv"
spark_df = spark.read.format('csv').options(header='true', inferSchema='true', delimiter=";").load(filepath)
spark_df.show(5, truncate=False)

+-------------+----------------+-----------+--------------+---------+-------------------+--------------------+-------+----+---------+-------+-------+
|fixed acidity|volatile acidity|citric acid|residual sugar|chlorides|free sulfur dioxide|total sulfur dioxide|density|pH  |sulphates|alcohol|quality|
+-------------+----------------+-----------+--------------+---------+-------------------+--------------------+-------+----+---------+-------+-------+
|7.4          |0.7             |0.0        |1.9           |0.076    |11.0               |34.0                |0.9978 |3.51|0.56     |9.4    |5      |
|7.8          |0.88            |0.0        |2.6           |0.098    |25.0               |67.0                |0.9968 |3.2 |0.68     |9.8    |5      |
|7.8          |0.76            |0.04       |2.3           |0.092    |15.0               |54.0                |0.997  |3.26|0.65     |9.8    |5      |
|11.2         |0.28            |0.56       |1.9           |0.075    |17.0               |60.0       

## Introduction

In [26]:
from pyspark.sql import functions as F
from pyspark.sql import types as T

# StringIndexer memberikan label untuk tiap kategori
# OneHotEncoder membuat vektor pengkodean
from pyspark.ml.feature import StringIndexer, OneHotEncoder

# VectorAssembler digunakan untuk membuat vector dari fungsi. Pemodelan mengambil vektor sebagai input
from pyspark.ml.feature import VectorAssembler

# DecisionTreeClassifier digunakan untuk klasifikasi masalah
from pyspark.ml.classification import DecisionTreeClassifier

In [29]:
# Membuat column kategori untuk menjelaskan tujuan
spark_df = spark_df.withColumn("alcohol", F.when(F.col("alcohol") > 10.5, "Tinggi").otherwise("Rendah"))
spark_df.show(3, truncate=False)

spark_df.groupby("alcohol").count().show(), spark_df.select("quality").distinct().show()

+-------------+----------------+-----------+--------------+---------+-------------------+--------------------+-------+----+---------+-------+-------+
|fixed acidity|volatile acidity|citric acid|residual sugar|chlorides|free sulfur dioxide|total sulfur dioxide|density|pH  |sulphates|alcohol|quality|
+-------------+----------------+-----------+--------------+---------+-------------------+--------------------+-------+----+---------+-------+-------+
|7.4          |0.7             |0.0        |1.9           |0.076    |11.0               |34.0                |0.9978 |3.51|0.56     |Rendah |5      |
|7.8          |0.88            |0.0        |2.6           |0.098    |25.0               |67.0                |0.9968 |3.2 |0.68     |Rendah |5      |
|7.8          |0.76            |0.04       |2.3           |0.092    |15.0               |54.0                |0.997  |3.26|0.65     |Rendah |5      |
+-------------+----------------+-----------+--------------+---------+-------------------+-----------

(None, None)

In [28]:
(train_df, test_df) = spark_df.randomSplit([0.8, 0.2], 11)
print("Jumlah sampel training : " + str(train_df.count()))
print("Jumlah sampel testing: " + str(test_df.count()))

Jumlah sampel training : 1279
Jumlah sampel testing: 320


## Modeling

In [12]:
train_df.show(3, truncate=False)

+-------------+----------------+-----------+--------------+---------+-------------------+--------------------+-------+----+---------+-------+-------+
|fixed acidity|volatile acidity|citric acid|residual sugar|chlorides|free sulfur dioxide|total sulfur dioxide|density|pH  |sulphates|alcohol|quality|
+-------------+----------------+-----------+--------------+---------+-------------------+--------------------+-------+----+---------+-------+-------+
|4.6          |0.52            |0.15       |2.1           |0.054    |8.0                |65.0                |0.9934 |3.9 |0.56     |High   |4      |
|4.7          |0.6             |0.17       |2.3           |0.058    |17.0               |106.0               |0.9932 |3.85|0.6      |High   |6      |
|4.9          |0.42            |0.0        |2.1           |0.048    |16.0               |42.0                |0.99154|3.71|0.74     |High   |7      |
+-------------+----------------+-----------+--------------+---------+-------------------+-----------

In [13]:
alcohol_indexer = StringIndexer(inputCol="alcohol", outputCol="alcoholIndex")
alcohol_indexer = alcohol_indexer.fit(train_df)
train_df = alcohol_indexer.transform(train_df)
train_df.show(3, truncate=False)

+-------------+----------------+-----------+--------------+---------+-------------------+--------------------+-------+----+---------+-------+-------+------------+
|fixed acidity|volatile acidity|citric acid|residual sugar|chlorides|free sulfur dioxide|total sulfur dioxide|density|pH  |sulphates|alcohol|quality|alcoholIndex|
+-------------+----------------+-----------+--------------+---------+-------------------+--------------------+-------+----+---------+-------+-------+------------+
|4.6          |0.52            |0.15       |2.1           |0.054    |8.0                |65.0                |0.9934 |3.9 |0.56     |High   |4      |1.0         |
|4.7          |0.6             |0.17       |2.3           |0.058    |17.0               |106.0               |0.9932 |3.85|0.6      |High   |6      |1.0         |
|4.9          |0.42            |0.0        |2.1           |0.048    |16.0               |42.0                |0.99154|3.71|0.74     |High   |7      |1.0         |
+-------------+-------

In [14]:
print(train_df.columns)

['fixed acidity', 'volatile acidity', 'citric acid', 'residual sugar', 'chlorides', 'free sulfur dioxide', 'total sulfur dioxide', 'density', 'pH', 'sulphates', 'alcohol', 'quality', 'alcoholIndex']


In [15]:
inputCols = ['fixed acidity', 'volatile acidity', 'citric acid', 'residual sugar', 'chlorides', 'free sulfur dioxide', 'total sulfur dioxide', 'density', 'pH', 'sulphates', 'alcoholIndex']

outputCol = "features"
vector_assembler = VectorAssembler(inputCols = inputCols, outputCol = outputCol)
train_df = vector_assembler.transform(train_df)

In [16]:
train_df.show(3, truncate=False)

+-------------+----------------+-----------+--------------+---------+-------------------+--------------------+-------+----+---------+-------+-------+------------+--------------------------------------------------------+
|fixed acidity|volatile acidity|citric acid|residual sugar|chlorides|free sulfur dioxide|total sulfur dioxide|density|pH  |sulphates|alcohol|quality|alcoholIndex|features                                                |
+-------------+----------------+-----------+--------------+---------+-------------------+--------------------+-------+----+---------+-------+-------+------------+--------------------------------------------------------+
|4.6          |0.52            |0.15       |2.1           |0.054    |8.0                |65.0                |0.9934 |3.9 |0.56     |High   |4      |1.0         |[4.6,0.52,0.15,2.1,0.054,8.0,65.0,0.9934,3.9,0.56,1.0]  |
|4.7          |0.6             |0.17       |2.3           |0.058    |17.0               |106.0               |0.9932 |3.

In [17]:
modeling_df = train_df.select(['features', 'quality'])
modeling_df.show(3, truncate=False)

+--------------------------------------------------------+-------+
|features                                                |quality|
+--------------------------------------------------------+-------+
|[4.6,0.52,0.15,2.1,0.054,8.0,65.0,0.9934,3.9,0.56,1.0]  |4      |
|[4.7,0.6,0.17,2.3,0.058,17.0,106.0,0.9932,3.85,0.6,1.0] |6      |
|[4.9,0.42,0.0,2.1,0.048,16.0,42.0,0.99154,3.71,0.74,1.0]|7      |
+--------------------------------------------------------+-------+
only showing top 3 rows



In [18]:
# Membuat model DecisionTreeClassifier 
dt_model = DecisionTreeClassifier(labelCol="quality", featuresCol="features")

# Training model dengan Training Data
dt_model = dt_model.fit(modeling_df)

In [19]:
predictions = dt_model.transform(modeling_df)
predictions.show(5, truncate=False)

+--------------------------------------------------------+-------+----------------------------------------+----------------------------------------------------------------------------------------------------------------------+----------+
|features                                                |quality|rawPrediction                           |probability                                                                                                           |prediction|
+--------------------------------------------------------+-------+----------------------------------------+----------------------------------------------------------------------------------------------------------------------+----------+
|[4.6,0.52,0.15,2.1,0.054,8.0,65.0,0.9934,3.9,0.56,1.0]  |4      |[0.0,0.0,0.0,3.0,4.0,31.0,11.0,1.0,0.0] |[0.0,0.0,0.0,0.06,0.08,0.62,0.22,0.02,0.0]                                                                            |5.0       |
|[4.7,0.6,0.17,2.3,0.058,17.0,106.0,0.9932,3.85,

In [20]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

evaluatorDT = MulticlassClassificationEvaluator(labelCol="quality")
area_under_curve = evaluatorDT.evaluate(predictions)

print(area_under_curve)

0.6096614519053557


## Test predictions

In [21]:
# Pada data uji - tranformasi data uji menggunakan semua transformator dan estimasi dalam urutan yang sama
test_df = alcohol_indexer.transform(test_df)
test_df = vector_assembler.transform(test_df)
test_predictions = dt_model.transform(test_df)

test_predictions.show(3, truncate=False)

+-------------+----------------+-----------+--------------+---------+-------------------+--------------------+-------+----+---------+-------+-------+------------+---------------------------------------------------------+----------------------------------------+---------------------------------------------------------------------------------------------------+----------+
|fixed acidity|volatile acidity|citric acid|residual sugar|chlorides|free sulfur dioxide|total sulfur dioxide|density|pH  |sulphates|alcohol|quality|alcoholIndex|features                                                 |rawPrediction                           |probability                                                                                        |prediction|
+-------------+----------------+-----------+--------------+---------+-------------------+--------------------+-------+----+---------+-------+-------+------------+---------------------------------------------------------+----------------------------------

In [22]:
area_under_curve = evaluatorDT.evaluate(test_predictions)
area_under_curve

0.5816548284862043