In [None]:
## INSTALL SPARK

!apt-get clean
!rm -rf /var/lib/apt/lists/*
!apt-get update
!apt-get upgrade

# innstall java
!apt-get install openjdk-8-jdk-headless -qq > /dev/null

# install spark (change the version number if needed)
!wget -q https://archive.apache.org/dist/spark/spark-3.0.0/spark-3.0.0-bin-hadoop3.2.tgz

# unzip the spark file to the current folder
!tar xf spark-3.0.0-bin-hadoop3.2.tgz

# set your spark folder to your system path environment. 
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.0.0-bin-hadoop3.2"


# install findspark using pip
!pip install -q findspark

import findspark
findspark.init()

In [97]:
from pyspark.sql import SparkSession

# 1. load data
spark = SparkSession.builder.getOrCreate()

df_train = spark.read.option("inferSchema", "true").csv("sample_data/Training.csv", header=True)
df_test = spark.read.option("inferSchema", "true").csv("sample_data/Testing.csv", header=True)

In [98]:
df_train.show()

+-----------+----------+------------+------+--------+-----------+-----+---------+
|CapDiameter|StemHeight|StemDiameter|Spores|BlackDot|  Accessory|Color|Poisonous|
+-----------+----------+------------+------+--------+-----------+-----+---------+
|      17880|     11540|        7849|  High|       6|White Gills|  Red|        1|
|       9320|      9305|       15931|  High|       5|       None|Brown|        0|
|      12638|      1471|        2925|   Low|       4|       None|Brown|        0|
|      13513|      6894|       16852|  High|       3|       None|White|        0|
|       4838|      2979|       17055|Medium|       3|White Gills|  Red|        1|
|       7409|      1568|       17718|  High|       0|       None|Brown|        0|
|      14691|      1381|       12840|  High|       1|       None|Brown|        0|
|      13623|      4617|        4983|   Low|       8|White Gills|  Red|        1|
|       9524|      7680|       10188|   Low|       8|White Gills|Brown|        1|
|      12209|   

In [99]:
df_test.show()

+-----------+----------+------------+------+--------+-----------+-----+---------+
|CapDiameter|StemHeight|StemDiameter|Spores|BlackDot|  Accessory|Color|Poisonous|
+-----------+----------+------------+------+--------+-----------+-----+---------+
|       7219|       477|        1513|   Low|       5|       None|Brown|        0|
|       4396|       689|         977|   Low|       4|       None|Brown|        0|
|       2575|       598|        1468|   Low|       1|       Ring|  Red|        0|
|       1931|      1701|         644|   Low|       4|White Gills|Brown|        0|
|       4465|      1087|         158|   Low|       5|       Ring|Brown|        0|
|       8445|      1364|         603|   Low|       6|       None|White|        0|
|       8612|       998|         534|Medium|       3|       None|Brown|        0|
|       3038|       326|        1532|   Low|       4|       None|Brown|        0|
|       3015|       828|        1268|Medium|       5|       None|Brown|        0|
|       3731|   

In [100]:
# 2. Select Features
df_train = df_train.select("Spores", "Accessory", "color", "Poisonous")
df_test = df_train.select("Spores", "Accessory", "color", "Poisonous")

In [101]:
from pyspark.sql.functions import when
from pyspark.ml.feature import VectorAssembler, StandardScaler

# 3. Preprocessing
def preprocessing(data):
  # 3. remove missing values
  data = data.na.drop()

  # 4. Transform Data
  data = data.withColumn("Spores", when(data['Spores'] == "Low", 0).when(data['Spores'] == "Medium", 1).otherwise(2))
  data = data.withColumn("Accessory", when(data['Accessory'] == "White Gills", 1).when(data['Accessory'] == "Ring", 2).otherwise(0))
  data = data.withColumn("color", when(data['color'] == "White", 0).when(data['color'] == "Brown", 1).otherwise(2))

  # 5. Normalization
  cols = data.columns
  cols.remove("Poisonous")
  data = VectorAssembler(inputCols = cols, outputCol="Features").transform(data)

  scaler = StandardScaler(inputCol = "Features", outputCol="Scaled_Features")
  data = scaler.fit(data).transform(data)

  return data

In [102]:
df_train = preprocessing(df_train)

In [103]:
df_test = preprocessing(df_test)

In [104]:
df_train.show()

+------+---------+-----+---------+-------------+--------------------+
|Spores|Accessory|color|Poisonous|     Features|     Scaled_Features|
+------+---------+-----+---------+-------------+--------------------+
|     2|        1|    2|        1|[2.0,1.0,2.0]|[2.23321947561635...|
|     2|        0|    1|        0|[2.0,0.0,1.0]|[2.23321947561635...|
|     0|        0|    1|        0|[0.0,0.0,1.0]|[0.0,0.0,1.425022...|
|     2|        0|    0|        0|[2.0,0.0,0.0]|[2.23321947561635...|
|     1|        1|    2|        1|[1.0,1.0,2.0]|[1.11660973780817...|
|     2|        0|    1|        0|[2.0,0.0,1.0]|[2.23321947561635...|
|     2|        0|    1|        0|[2.0,0.0,1.0]|[2.23321947561635...|
|     0|        1|    2|        1|[0.0,1.0,2.0]|[0.0,1.4666842005...|
|     0|        1|    1|        1|[0.0,1.0,1.0]|[0.0,1.4666842005...|
|     1|        2|    2|        1|[1.0,2.0,2.0]|[1.11660973780817...|
|     2|        0|    2|        0|[2.0,0.0,2.0]|[2.23321947561635...|
|     0|        1|  

In [105]:
df_test.show()

+------+---------+-----+---------+-------------+--------------------+
|Spores|Accessory|color|Poisonous|     Features|     Scaled_Features|
+------+---------+-----+---------+-------------+--------------------+
|     2|        1|    2|        1|[2.0,1.0,2.0]|[2.23321947561635...|
|     2|        0|    1|        0|[2.0,0.0,1.0]|[2.23321947561635...|
|     0|        0|    1|        0|[0.0,0.0,1.0]|[0.0,0.0,1.425022...|
|     2|        0|    0|        0|[2.0,0.0,0.0]|[2.23321947561635...|
|     1|        1|    2|        1|[1.0,1.0,2.0]|[1.11660973780817...|
|     2|        0|    1|        0|[2.0,0.0,1.0]|[2.23321947561635...|
|     2|        0|    1|        0|[2.0,0.0,1.0]|[2.23321947561635...|
|     0|        1|    2|        1|[0.0,1.0,2.0]|[0.0,1.4666842005...|
|     0|        1|    1|        1|[0.0,1.0,1.0]|[0.0,1.4666842005...|
|     1|        2|    2|        1|[1.0,2.0,2.0]|[1.11660973780817...|
|     2|        0|    2|        0|[2.0,0.0,2.0]|[2.23321947561635...|
|     0|        1|  

In [106]:
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator

# 6. Generate Model
model = LogisticRegression(featuresCol="Scaled_Features", labelCol="Poisonous", maxIter=10).fit(df_train)
prediction = model.transform(df_test)

# transform df_train pasti lebih gede akurasinya karena udah pernah di pelajari

In [107]:
prediction.show()

+------+---------+-----+---------+-------------+--------------------+--------------------+--------------------+----------+
|Spores|Accessory|color|Poisonous|     Features|     Scaled_Features|       rawPrediction|         probability|prediction|
+------+---------+-----+---------+-------------+--------------------+--------------------+--------------------+----------+
|     2|        1|    2|        1|[2.0,1.0,2.0]|[2.23321947561635...|[-0.3317139036092...|[0.41782366253835...|       1.0|
|     2|        0|    1|        0|[2.0,0.0,1.0]|[2.23321947561635...|[2.72488650084214...|[0.93847926565163...|       0.0|
|     0|        0|    1|        0|[0.0,0.0,1.0]|[0.0,0.0,1.425022...|[0.59964515183811...|[0.64557511836318...|       0.0|
|     2|        0|    0|        0|[2.0,0.0,0.0]|[2.23321947561635...|[4.74385627470264...|[0.99137011083530...|       0.0|
|     1|        1|    2|        1|[1.0,1.0,2.0]|[1.11660973780817...|[-1.3943345781112...|[0.19871666767591...|       1.0|
|     2|        

In [108]:
# 7. model testing and evaluation

# evaluator = BinaryClassificationEvaluator(rawPredictionCol="rawPrediction", labelCol="Poisonous")
evaluator = BinaryClassificationEvaluator(rawPredictionCol="prediction", labelCol="Poisonous")

print("Accuracy: {}".format(evaluator.evaluate(prediction)))

Accuracy: 0.8514811871750675
