In [1]:
import pandas as pd
import numpy as np

from pyspark.ml.feature import StringIndexer
from pyspark.ml.feature import VectorAssembler
from pyspark.ml import Pipeline
from pyspark.sql.functions import col, udf, transform, round

from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.linalg import Vectors
from pyspark.ml.classification import LogisticRegression

from pyspark.sql.functions import UserDefinedFunction

In [2]:
# Make a local Spark installation importable, then pull in the session entry point.
# NOTE(review): findspark.init() is meant to run before the first pyspark import,
# but the preceding cell already imports from pyspark.ml / pyspark.sql. On a fresh
# kernel where pyspark is not otherwise on sys.path that earlier cell would fail;
# consider moving these two lines to the very top of the notebook.
import findspark
findspark.init()

# NOTE(review): `pyspark` and `when` are not referenced in the visible cells.
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import when

In [3]:
# Build the SparkSession used by every later cell, or reuse one that already exists.
# NOTE(review): "spark.some.config.option" / "some-value" look like placeholder
# settings rather than a real Spark option — confirm whether they are needed.
spark = (
    SparkSession.builder
    .appName("Python Spark SQL basic example")
    .config("spark.some.config.option", "some-value")
    .getOrCreate()
)

In [4]:
# Load the iris CSV, taking column names from the first line of the file.
# No schema is supplied here; the next cell casts the measurement columns explicitly.
df = spark.read.option("header", True).csv("iris.csv")

In [5]:
# Cast each of the four measurement columns to decimal(10,2), replacing the
# column in place so names and column order stay as they were.
for measurement in ("sepal_length", "sepal_width", "petal_length", "petal_width"):
    df = df.withColumn(measurement, col(measurement).cast("decimal(10,2)"))

In [6]:
# Combine the four measurement columns into one vector column called "features".
feature_columns = ["sepal_length", "sepal_width", "petal_length", "petal_width"]
assembler = VectorAssembler(inputCols=feature_columns, outputCol="features")

In [7]:
# Encode the string column "species" as a numeric column "label".
indexer = StringIndexer(inputCol="species", outputCol="label")
# Fit the mapping on df, then apply it to the same frame.
indexer_model = indexer.fit(df)
indexed = indexer_model.transform(df)
# Print the first rows to check the new "label" column.
indexed.show()

+------------+-----------+------------+-----------+-------+-----+
|sepal_length|sepal_width|petal_length|petal_width|species|label|
+------------+-----------+------------+-----------+-------+-----+
|        5.10|       3.50|        1.40|       0.20| setosa|  0.0|
|        4.90|       3.00|        1.40|       0.20| setosa|  0.0|
|        4.70|       3.20|        1.30|       0.20| setosa|  0.0|
|        4.60|       3.10|        1.50|       0.20| setosa|  0.0|
|        5.00|       3.60|        1.40|       0.20| setosa|  0.0|
|        5.40|       3.90|        1.70|       0.40| setosa|  0.0|
|        4.60|       3.40|        1.40|       0.30| setosa|  0.0|
|        5.00|       3.40|        1.50|       0.20| setosa|  0.0|
|        4.40|       2.90|        1.40|       0.20| setosa|  0.0|
|        4.90|       3.10|        1.50|       0.10| setosa|  0.0|
|        5.40|       3.70|        1.50|       0.20| setosa|  0.0|
|        4.80|       3.40|        1.60|       0.20| setosa|  0.0|
|        4

In [8]:
# Append the assembled "features" vector to the indexed rows.
output = assembler.transform(indexed)
# Print the first 20 rows without shortening long cell values.
output.show(n=20, truncate=False)

+------------+-----------+------------+-----------+-------+-----+-----------------+
|sepal_length|sepal_width|petal_length|petal_width|species|label|features         |
+------------+-----------+------------+-----------+-------+-----+-----------------+
|5.10        |3.50       |1.40        |0.20       |setosa |0.0  |[5.1,3.5,1.4,0.2]|
|4.90        |3.00       |1.40        |0.20       |setosa |0.0  |[4.9,3.0,1.4,0.2]|
|4.70        |3.20       |1.30        |0.20       |setosa |0.0  |[4.7,3.2,1.3,0.2]|
|4.60        |3.10       |1.50        |0.20       |setosa |0.0  |[4.6,3.1,1.5,0.2]|
|5.00        |3.60       |1.40        |0.20       |setosa |0.0  |[5.0,3.6,1.4,0.2]|
|5.40        |3.90       |1.70        |0.40       |setosa |0.0  |[5.4,3.9,1.7,0.4]|
|4.60        |3.40       |1.40        |0.30       |setosa |0.0  |[4.6,3.4,1.4,0.3]|
|5.00        |3.40       |1.50        |0.20       |setosa |0.0  |[5.0,3.4,1.5,0.2]|
|4.40        |2.90       |1.40        |0.20       |setosa |0.0  |[4.4,2.9,1.

In [9]:
# Convert the Spark DataFrame (measurements, species, label, features) to pandas.
# The cells below slice this frame by row position to build the train/test sets.
pd_df = output.toPandas()
# Bare last expression so the notebook renders the frame.
pd_df

Unnamed: 0,sepal_length,sepal_width,petal_length,petal_width,species,label,features
0,5.10,3.50,1.40,0.20,setosa,0.0,"[5.1, 3.5, 1.4, 0.2]"
1,4.90,3.00,1.40,0.20,setosa,0.0,"[4.9, 3.0, 1.4, 0.2]"
2,4.70,3.20,1.30,0.20,setosa,0.0,"[4.7, 3.2, 1.3, 0.2]"
3,4.60,3.10,1.50,0.20,setosa,0.0,"[4.6, 3.1, 1.5, 0.2]"
4,5.00,3.60,1.40,0.20,setosa,0.0,"[5.0, 3.6, 1.4, 0.2]"
...,...,...,...,...,...,...,...
145,6.70,3.00,5.20,2.30,virginica,2.0,"[6.7, 3.0, 5.2, 2.3]"
146,6.30,2.50,5.00,1.90,virginica,2.0,"[6.3, 2.5, 5.0, 1.9]"
147,6.50,3.00,5.20,2.00,virginica,2.0,"[6.5, 3.0, 5.2, 2.0]"
148,6.20,3.40,5.40,2.30,virginica,2.0,"[6.2, 3.4, 5.4, 2.3]"


In [10]:
# Training set: row positions 0-39, 50-89 and 100-139 of pd_df.
# NOTE(review): the hard-coded ranges assume 150 rows in file order, grouped
# 50 per species — confirm against iris.csv.
train_pdf = pd.concat([pd_df.iloc[0:40], pd_df.iloc[50:90], pd_df.iloc[100:140]])
train = spark.createDataFrame(train_pdf)

In [11]:
# Held-out set: row positions 40-49, 90-99 and 140-149 of pd_df, i.e. the rows
# not taken by the training slices.
# NOTE(review): the hard-coded ranges assume 150 rows in file order, grouped
# 50 per species — confirm against iris.csv.
test_pdf = pd.concat([pd_df.iloc[40:50], pd_df.iloc[90:100], pd_df.iloc[140:150]])
test = spark.createDataFrame(test_pdf)

In [12]:
# Logistic-regression estimator reading the "features" vector and the "label" target.
lr = LogisticRegression(
    labelCol="label",
    featuresCol="features",
)
# Fit on the training rows; `model` is the fitted transformer used for prediction.
model = lr.fit(train)

In [13]:
# Score the held-out rows with the fitted model. The displayed result below
# shows the added rawPrediction, probability and prediction columns.
predictions = model.transform(test)

In [14]:
# Round the four measurement columns to one decimal place for display.
# `round` here is pyspark.sql.functions.round (imported in the first cell),
# not Python's builtin. The "features" vector column is left untouched.
for measurement in ("sepal_length", "sepal_width", "petal_length", "petal_width"):
    predictions = predictions.withColumn(measurement, round(col(measurement), 1))

In [15]:
# Convert the scored rows to pandas so the notebook can render them as a table.
pd_predictions = predictions.toPandas()
# Display the frame directly; the former `[::]` was a no-op full slice that
# only created a redundant copy before rendering.
pd_predictions

Unnamed: 0,sepal_length,sepal_width,petal_length,petal_width,species,label,features,rawPrediction,probability,prediction
0,5.0,3.5,1.3,0.3,setosa,0.0,"[5.0, 3.5, 1.3, 0.3]","[46.87256845326954, 6.345610850428187, -53.218...","[1.0, 2.508221838098697e-18, 3.397352102323663...",0.0
1,4.5,2.3,1.3,0.3,setosa,0.0,"[4.5, 2.3, 1.3, 0.3]","[15.559291134275485, 17.434746468157627, -32.9...","[0.1329117559168728, 0.8670882440831272, 1.089...",1.0
2,4.4,3.2,1.3,0.2,setosa,0.0,"[4.4, 3.2, 1.3, 0.2]","[43.36377979091983, 7.284644621182931, -50.648...","[0.9999999999999998, 2.143042006804204e-16, 1....",0.0
3,5.0,3.5,1.6,0.6,setosa,0.0,"[5.0, 3.5, 1.6, 0.6]","[39.62006215277764, 5.885782895487623, -45.505...","[0.9999999999999978, 2.2355720310467022e-15, 1...",0.0
4,5.1,3.8,1.9,0.4,setosa,0.0,"[5.1, 3.8, 1.9, 0.4]","[48.811399188421845, 2.8140700358586876, -51.6...","[1.0, 1.0558780623082706e-20, 2.40337903653242...",0.0
5,4.8,3.0,1.4,0.3,setosa,0.0,"[4.8, 3.0, 1.4, 0.3]","[33.05731480008957, 10.897805000688356, -43.95...","[0.9999999997621807, 2.378193415439408e-10, 3....",0.0
6,5.1,3.8,1.6,0.2,setosa,0.0,"[5.1, 3.8, 1.6, 0.2]","[54.36842223940347, 3.221974255386346, -57.590...","[1.0, 6.128862407592577e-23, 2.381786467039574...",0.0
7,4.6,3.2,1.4,0.2,setosa,0.0,"[4.6, 3.2, 1.4, 0.2]","[41.53689886957623, 7.977049790180082, -49.513...","[0.9999999999999973, 2.661599653439496e-15, 2....",0.0
8,5.3,3.7,1.5,0.2,setosa,0.0,"[5.3, 3.7, 1.5, 0.2]","[51.14595964472402, 5.206544687196427, -56.352...","[1.0, 1.1188338106554354e-20, 2.06067869900423...",0.0
9,5.0,3.3,1.4,0.2,setosa,0.0,"[5.0, 3.3, 1.4, 0.2]","[42.166794102839624, 8.475103863630576, -50.64...","[0.9999999999999976, 2.3328394080186602e-15, 4...",0.0


In [16]:
# Score the predictions against the "label" column.
# labelCol and metricName are spelled out with the evaluator's default values,
# so the number below is an F1 score, not accuracy.
evaluator = MulticlassClassificationEvaluator(
    predictionCol="prediction",
    labelCol="label",
    metricName="f1",
)
evaluator.evaluate(predictions)

0.9665831244778613