In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql import Row
from pyspark.sql.functions import *
import math
# Reuse the already-running SparkSession if there is one; otherwise create it.
ss = (
    SparkSession
    .builder
    .getOrCreate()
)

## Create iris dataframe using the irisSchema

In [2]:
# Explicit schema for the iris CSV: four nullable double measurements followed
# by the nullable class label as a string.
irisSchema = (
    StructType()
    .add("sepal_length", DoubleType(), True)
    .add("sepal_width", DoubleType(), True)
    .add("petal_length", DoubleType(), True)
    .add("petal_width", DoubleType(), True)
    .add("class", StringType(), True)
)

In [3]:
# The file is read without a header row, so column names and types come from irisSchema.
iris = ss.read.csv(path='../Data/iris.csv', header=False, schema=irisSchema)

## Change "class" to 1 if it is "Iris-versicolor", otherwise 0

In [4]:
# Class names that count as the positive (1) label.
classes = ["Iris-versicolor"]


def check_class(x, codes):
    """Return 1 when ``x`` is a member of ``codes``, otherwise 0."""
    if x not in codes:
        return 0
    return 1

def check_class_udf(codes):
    """Build a Spark UDF that flags membership in ``codes``.

    Args:
        codes: Collection of values that should map to 1.

    Returns:
        A UDF with IntegerType return type that yields 1 when its input value
        is in ``codes`` and 0 otherwise (delegates to check_class()).
    """
    # Close over the `codes` argument rather than the module-level `classes`
    # list, so the UDF honours whatever collection the caller passes in.
    return udf(lambda x: check_class(x, codes), IntegerType())

In [5]:
# Add the 0/1 indicator as "class_int", drop the original string column, then
# let the indicator take over the "class" name.
iris_binary_class = (
    iris
    .withColumn("class_int", check_class_udf(classes)("class"))
    .drop("class")
    .withColumnRenamed("class_int", "class")
)

In [6]:
# Preview the relabeled data to check that "class" now holds the 0/1 indicator.
iris_binary_class.show()

+------------+-----------+------------+-----------+-----+
|sepal_length|sepal_width|petal_length|petal_width|class|
+------------+-----------+------------+-----------+-----+
|         5.1|        3.5|         1.4|        0.2|    0|
|         4.9|        3.0|         1.4|        0.2|    0|
|         4.7|        3.2|         1.3|        0.2|    0|
|         4.6|        3.1|         1.5|        0.2|    0|
|         5.0|        3.6|         1.4|        0.2|    0|
|         5.4|        3.9|         1.7|        0.4|    0|
|         4.6|        3.4|         1.4|        0.3|    0|
|         5.0|        3.4|         1.5|        0.2|    0|
|         4.4|        2.9|         1.4|        0.2|    0|
|         4.9|        3.1|         1.5|        0.1|    0|
|         5.4|        3.7|         1.5|        0.2|    0|
|         4.8|        3.4|         1.6|        0.2|    0|
|         4.8|        3.0|         1.4|        0.1|    0|
|         4.3|        3.0|         1.1|        0.1|    0|
|         5.8|

In [7]:
# Measurement columns that are assembled into the feature vector.
input_cols = [
    "sepal_length",
    "sepal_width",
    "petal_length",
    "petal_width",
]

## Create a feature column combining
## "sepal_length","sepal_width","petal_length","petal_width" and rename "class" to "label"

In [8]:
from pyspark.ml.feature import VectorAssembler
# Pack the measurement columns into a single vector column named "features".
va = VectorAssembler(inputCols=input_cols, outputCol="features")

# lpoints - labeled data: the feature vector plus the 0/1 target as "label".
lpoints = (
    va.transform(iris_binary_class)
    .select("features", "class")
    .withColumnRenamed("class", "label")
)

In [9]:
# Show the first 5 rows without truncating the feature vectors.
lpoints.show(5, truncate=False)

+-----------------+-----+
|features         |label|
+-----------------+-----+
|[5.1,3.5,1.4,0.2]|0    |
|[4.9,3.0,1.4,0.2]|0    |
|[4.7,3.2,1.3,0.2]|0    |
|[4.6,3.1,1.5,0.2]|0    |
|[5.0,3.6,1.4,0.2]|0    |
+-----------------+-----+
only showing top 5 rows



## Rescale the "features" column using MinMaxScaler
https://spark.apache.org/docs/latest/ml-features.html#minmaxscaler

In [10]:
from pyspark.ml.feature import MinMaxScaler
from pyspark.ml.feature import VectorAssembler

def min_max_scaler(input_df):
    """Rescale the "features" vector column with a fitted MinMaxScaler.

    The scaler is fitted on ``input_df`` and then applied to that same frame.
    The unscaled "features" column is dropped and the scaled column is renamed
    to take its place.

    Args:
        input_df: DataFrame containing a "features" vector column.

    Returns:
        DataFrame holding the remaining columns of ``input_df`` followed by
        the rescaled "features" column.
    """
    scaler_model = MinMaxScaler(outputCol="features_Scaled",
                                inputCol="features").fit(input_df)

    # MinMaxScaler linearly rescales each feature into its [min, max] range,
    # which is [0, 1] with the defaults used here.
    return (
        scaler_model.transform(input_df)
        .drop("features")
        .withColumnRenamed("features_Scaled", "features")
    )


# Scale the labeled points and cache the result, since it is reused by the
# display and the train/validation split that follow.
df = min_max_scaler(lpoints).cache()

In [11]:
df.show(truncate=False)

+-----+----------------------------------------------------------------------------------+
|label|features                                                                          |
+-----+----------------------------------------------------------------------------------+
|0    |[0.22222222222222213,0.625,0.06779661016949151,0.04166666666666667]               |
|0    |[0.1666666666666668,0.41666666666666663,0.06779661016949151,0.04166666666666667]  |
|0    |[0.11111111111111119,0.5,0.05084745762711865,0.04166666666666667]                 |
|0    |[0.08333333333333327,0.4583333333333333,0.0847457627118644,0.04166666666666667]   |
|0    |[0.19444444444444448,0.6666666666666666,0.06779661016949151,0.04166666666666667]  |
|0    |[0.30555555555555564,0.7916666666666665,0.11864406779661016,0.12500000000000003]  |
|0    |[0.08333333333333327,0.5833333333333333,0.06779661016949151,0.08333333333333333]  |
|0    |[0.19444444444444448,0.5833333333333333,0.0847457627118644,0.04166666666666667]   |

## Randomly split the data set into training (80%) and validation (20%) sets, using seed value 1.

In [12]:
# 80/20 random split with a fixed seed (1).
splits = df.randomSplit(weights=[0.8, 0.2], seed=1)

# cache(): the algorithm is iterative, so the training and validation sets are
# going to be reused many times.
train, valid = splits[0].cache(), splits[1].cache()

## Train a model, using the training data set with regParam=0.01, maxIter=10000, fitIntercept=True.

In [13]:
from pyspark.ml.classification import LogisticRegression
# Configure the estimator, then fit it on the training split.
lr = LogisticRegression(
    maxIter=10000,
    regParam=0.01,
    fitIntercept=True,
)
lrmodel = lr.fit(train)

## Apply the model to the validation set, and return areaUnderROC

In [14]:
# Score the held-out validation rows; the result carries the rawPrediction,
# probability and prediction columns alongside label and features.
validpredicts = lrmodel.transform(valid)
validpredicts.show()

+-----+--------------------+--------------------+--------------------+----------+
|label|            features|       rawPrediction|         probability|prediction|
+-----+--------------------+--------------------+--------------------+----------+
|    0|[0.05555555555555...|[-1.2655224380680...|[0.22002470028658...|       1.0|
|    0|[0.13888888888888...|[0.88335243716583...|[0.70751644644265...|       0.0|
|    0|[0.13888888888888...|[1.78618666690854...|[0.85645911411034...|       0.0|
|    0|[0.13888888888888...|[1.70525473495273...|[0.84621978807722...|       0.0|
|    0|[0.16666666666666...|[0.60888545685682...|[0.6476865178204,...|       0.0|
|    0|[0.22222222222222...|[1.66179572899768...|[0.84047891105382...|       0.0|
|    0|[0.22222222222222...|[2.15397602005891...|[0.89603973684408...|       0.0|
|    0|[0.22222222222222...|[2.82420875959698...|[0.94397008648642...|       0.0|
|    0|[0.30555555555555...|[1.59434820606471...|[0.83122698602890...|       0.0|
|    0|[0.305555

In [15]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator
# Evaluator left at its default settings; print "<metric name>:<value>" for the
# validation predictions.
bceval = BinaryClassificationEvaluator()
print(":".join([bceval.getMetricName(), str(bceval.evaluate(validpredicts))]))

areaUnderROC:0.8166666666666667


In [16]:
# Shut down the Spark session now that the analysis is finished.
ss.stop()