In [1]:
 

import pyspark
from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark.sql import functions as f
#sc =SparkContext()

spark = SparkSession.builder.appName('ML').getOrCreate()
sc = spark.sparkContext


In [2]:
from pyspark.sql import SQLContext
 
 
df = spark.read.csv("data-behaviour.csv", header=True, inferSchema= True)
 


In [3]:
df.show(5, truncate = False)
df.printSchema()

+---+---+-------------+-------------+--------------+----------------+------------+-------------+---------------+--------+------+-------------+-------------+------------+-------+-------+
|x  |age|feeding_env  |genetic_score|diet          |owner_evaluation|diet_formula|behaviour    |group_behaviour|race    |sex   |health_eval_1|health_eval_2|feeding_time|farm   |weight |
+---+---+-------------+-------------+--------------+----------------+------------+-------------+---------------+--------+------+-------------+-------------+------------+-------+-------+
|1  |25 |bio-protocol3|226802       |prot++fib+min+|7               |fomula2     |stressed--   |social+        |Gascon2 |Male  |0            |0            |40          |farm408|<=100kg|
|2  |38 |bio-protocol3|89814        |prot++fib++   |9               |fomula7     |active+      |social---      |Chinese1|Male  |0            |0            |50          |farm408|<=100kg|
|3  |28 |mixed        |336951       |carb-         |12              |f

In [4]:

 
# Import all from `sql.types`
from pyspark.sql.types import *

# Write a custom function to convert the data type of DataFrame columns
def convertColumn(df, names, newType):
    for name in names: 
        df = df.withColumn(name, df[name].cast(newType))
    return df 

# List of continuous features
CONTI_FEATURES  = ['age','health_eval_1', 'owner_evaluation', 'health_eval_2', 'feeding_time']
# Convert the type
df = convertColumn(df, CONTI_FEATURES, FloatType())
# Check the dataset
df.printSchema()

root
 |-- x: integer (nullable = true)
 |-- age: float (nullable = true)
 |-- feeding_env: string (nullable = true)
 |-- genetic_score: integer (nullable = true)
 |-- diet: string (nullable = true)
 |-- owner_evaluation: float (nullable = true)
 |-- diet_formula: string (nullable = true)
 |-- behaviour: string (nullable = true)
 |-- group_behaviour: string (nullable = true)
 |-- race: string (nullable = true)
 |-- sex: string (nullable = true)
 |-- health_eval_1: float (nullable = true)
 |-- health_eval_2: float (nullable = true)
 |-- feeding_time: float (nullable = true)
 |-- farm: string (nullable = true)
 |-- weight: string (nullable = true)



In [5]:
#Select columns
df.select('age','genetic_score').show(5)			

+----+-------------+
| age|genetic_score|
+----+-------------+
|25.0|       226802|
|38.0|        89814|
|28.0|       336951|
|44.0|       160323|
|18.0|       103497|
+----+-------------+
only showing top 5 rows



In [6]:
#Count by group
df.groupBy("diet").count().sort("count",ascending=True).show()			

+--------------+-----+
|          diet|count|
+--------------+-----+
|        prot--|   83|
|    prot++min+|  247|
|        carb++|  509|
|          met+|  594|
|   prot++min--|  657|
|         vitA+|  834|
|   prot++lip--|  955|
|         prot+| 1389|
|         carb-| 1601|
|prot++fib+min+| 1812|
|  prot++carb--| 2061|
|         lip++| 2657|
|        carb--| 8025|
|     met+cyst+|10878|
|   prot++fib++|16540|
+--------------+-----+



In [7]:
#Describe the data
df.describe().show()

+-------+------------------+------------------+------------------+------------------+------+------------------+------------+----------+---------------+--------+------+------------------+------------------+------------------+-----+-------+
|summary|                 x|               age|       feeding_env|     genetic_score|  diet|  owner_evaluation|diet_formula| behaviour|group_behaviour|    race|   sex|     health_eval_1|     health_eval_2|      feeding_time| farm| weight|
+-------+------------------+------------------+------------------+------------------+------+------------------+------------+----------+---------------+--------+------+------------------+------------------+------------------+-----+-------+
|  count|             48842|             48842|             48842|             48842| 48842|             48842|       48842|     48842|          48842|   48842| 48842|             48842|             48842|             48842|48842|  48842|
|   mean|           24421.5| 38.643585438761

In [9]:
df.describe('health_eval_1').show()

+-------+------------------+
|summary|      health_eval_1|
+-------+------------------+
|  count|             48842|
|   mean|1079.0676262233324|
| stddev| 7452.019057655418|
|    min|               0.0|
|    max|           99999.0|
+-------+------------------+



In [9]:
#Crosstab computation
#In some occasion, it can be interesting to see the descriptive statistics between two pairwise columns. For instance, you can count the number of people with weight below or above 50k by diet level.
#This operation is called a crosstab. 
df.crosstab('age', 'sex').sort("age_sex").show()			

+-------+------+----+
|age_sex|Female|Male|
+-------+------+----+
|   17.0|   295| 300|
|   18.0|   427| 435|
|   19.0|   514| 539|
|   20.0|   535| 578|
|   21.0|   502| 594|
|   22.0|   521| 657|
|   23.0|   547| 782|
|   24.0|   478| 728|
|   25.0|   466| 729|
|   26.0|   422| 731|
|   27.0|   429| 803|
|   28.0|   446| 834|
|   29.0|   420| 803|
|   30.0|   406| 872|
|   31.0|   426| 899|
|   32.0|   373| 880|
|   33.0|   379| 956|
|   34.0|   402| 901|
|   35.0|   366| 971|
|   36.0|   423| 925|
+-------+------+----+
only showing top 20 rows



In [10]:
df.drop('owner_evaluation').columns

['x',
 'age',
 'feeding_env',
 'genetic_score',
 'diet',
 'diet_formula',
 'behaviour',
 'group_behaviour',
 'race',
 'sex',
 'health_eval_1',
 'health_eval_2',
 'feeding_time',
 'farm',
 'weight']

In [11]:
#Filter data
df.filter(df.age > 40).count()

20211

In [13]:
#Step 2) Data preprocessing
#Add age square
from pyspark.sql.functions import *

# 1 Select the column
age_square = df.select(col("age")**2)

# 2 Apply the transformation and add it to the DataFrame
df = df.withColumn("age-square", col("age")**2)

df.printSchema()
 

root
 |-- x: integer (nullable = true)
 |-- age: float (nullable = true)
 |-- feeding_env: string (nullable = true)
 |-- genetic_score: integer (nullable = true)
 |-- diet: string (nullable = true)
 |-- owner_evaluation: float (nullable = true)
 |-- diet_formula: string (nullable = true)
 |-- behaviour: string (nullable = true)
 |-- group_behaviour: string (nullable = true)
 |-- race: string (nullable = true)
 |-- sex: string (nullable = true)
 |-- health_eval_1: float (nullable = true)
 |-- health_eval_2: float (nullable = true)
 |-- feeding_time: float (nullable = true)
 |-- farm: string (nullable = true)
 |-- weight: string (nullable = true)
 |-- age-square: double (nullable = true)



In [15]:
COLUMNS = ['age', 'age-square', 'feeding_env', 'genetic_score', 'diet', 'owner_evaluation', 'diet_formula',
           'behaviour', 'group_behaviour', 'race', 'sex', 'health_eval_1', 'health_eval_2',
           'feeding_time', 'farm','weight']
df = df.select(COLUMNS)
df.first()


Row(age=25.0, age-square=625.0, feeding_env='bio-protocol3', genetic_score=226802, diet='prot++fib+min+', owner_evaluation=7.0, diet_formula='fomula2', behaviour='stressed--', group_behaviour='social+', race='Gascon2', sex='Male', health_eval_1=0.0, health_eval_2=0.0, feeding_time=40.0, farm='farm408', weight='<=100kg')

In [16]:
df.groupby('farm').agg({'farm': 'count'}).sort(asc("count(farm)")).show()
from pyspark.sql.functions import col

print(df.filter(col('farm') == 'farm12').count())
print(df.filter(col('farm') != 'farm12').count())

df_remove = df.filter(col('farm') !=	'farm12')
df_remove.groupby('farm').agg({'farm': 'count'}).sort(asc("count(farm)")).show()



+--------------------+-----------+
|                farm|count(farm)|
+--------------------+-----------+
|            farm3267|         21|
|Outlying-US(Guam-...|         23|
|             farm328|         30|
|             farm327|         37|
|              farm40|         51|
|               farm5|         65|
|             farm323|         67|
|              farm33|         68|
|             farm321|         75|
|            farm3264|         87|
|             farm402|        103|
|             farm322|        106|
|              farm39|        108|
|             farm409|        116|
|               farm9|        122|
|             farm326|        126|
|              farm19|        155|
|             farm400|        184|
|             farm325|        242|
|              farm36|        250|
+--------------------+-----------+
only showing top 20 rows

0
48842
+--------------------+-----------+
|                farm|count(farm)|
+--------------------+-----------+
|            farm3267

In [17]:
#Step 3) Build a data processing pipeline
#First of all, you select the string column to index.
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler
stringIndexer = StringIndexer(inputCol="feeding_env", outputCol="feeding_env_encoded")

In [18]:
#Fit the data and transform it
model = stringIndexer.fit(df) 
indexed = model.transform(df)
 
encoder = OneHotEncoder(dropLast=False, inputCol="feeding_env_encoded", outputCol="workclass_vec")
encoder.setDropLast(False)
ohe = encoder.fit(indexed)
encoded = ohe.transform(indexed)
encoded.show(2)

+----+----------+-------------+-------------+--------------+----------------+------------+----------+---------------+--------+----+-------------+-------------+------------+-------+-------+-------------------+-------------+
| age|age-square|  feeding_env|genetic_score|          diet|owner_evaluation|diet_formula| behaviour|group_behaviour|    race| sex|health_eval_1|health_eval_2|feeding_time|   farm| weight|feeding_env_encoded|workclass_vec|
+----+----------+-------------+-------------+--------------+----------------+------------+----------+---------------+--------+----+-------------+-------------+------------+-------+-------+-------------------+-------------+
|25.0|     625.0|bio-protocol3|       226802|prot++fib+min+|             7.0|     fomula2|stressed--|        social+| Gascon2|Male|          0.0|          0.0|        40.0|farm408|<=100kg|                0.0|(9,[0],[1.0])|
|38.0|    1444.0|bio-protocol3|        89814|   prot++fib++|             9.0|     fomula7|   active+|      s

In [19]:
#Build the pipeline
#Encode the categorical data
from pyspark.ml import Pipeline
from pyspark.ml.feature import OneHotEncoder
CATE_FEATURES = ['feeding_env', 'diet', 'diet_formula', 'behaviour', 'group_behaviour', 'race', 'sex', 'farm','weight']
stages = [] # stages in our Pipeline
for categoricalCol in CATE_FEATURES:
    stringIndexer = StringIndexer(inputCol=categoricalCol, outputCol=categoricalCol + "Index")
    encoder = OneHotEncoder(inputCols=[stringIndexer.getOutputCol()],
                                     outputCols=[categoricalCol + "classVec"])
    stages += [stringIndexer, encoder]
    
    

In [20]:
#2. Index the label feature 
# Convert label into label indices using the StringIndexer
label_stringIdx =  StringIndexer(inputCol="weight", outputCol="newlabel")
stages += [label_stringIdx]


In [21]:
#3. Add continuous variable 
assemblerInputs = [c + "classVec" for c in CATE_FEATURES] + CONTI_FEATURES

In [22]:
#4. Assemble the steps.
assembler = VectorAssembler(inputCols=assemblerInputs, outputCol="features")
stages += [assembler]			

In [23]:
# Create a Pipeline.
pipeline = Pipeline(stages=stages)
pipelineModel = pipeline.fit(df_remove)
model = pipelineModel.transform(df_remove)


In [24]:
model.take(1)

[Row(age=25.0, age-square=625.0, feeding_env='bio-protocol3', genetic_score=226802, diet='prot++fib+min+', owner_evaluation=7.0, diet_formula='fomula2', behaviour='stressed--', group_behaviour='social+', race='Gascon2', sex='Male', health_eval_1=0.0, health_eval_2=0.0, feeding_time=40.0, farm='farm408', weight='<=100kg', feeding_envIndex=0.0, feeding_envclassVec=SparseVector(8, {0: 1.0}), dietIndex=5.0, dietclassVec=SparseVector(14, {5: 1.0}), diet_formulaIndex=1.0, diet_formulaclassVec=SparseVector(6, {1: 1.0}), behaviourIndex=6.0, behaviourclassVec=SparseVector(14, {6: 1.0}), group_behaviourIndex=2.0, group_behaviourclassVec=SparseVector(5, {2: 1.0}), raceIndex=1.0, raceclassVec=SparseVector(4, {1: 1.0}), sexIndex=0.0, sexclassVec=SparseVector(1, {0: 1.0}), farmIndex=0.0, farmclassVec=SparseVector(25, {0: 1.0}), weightIndex=0.0, weightclassVec=SparseVector(1, {0: 1.0}), newlabel=0.0, features=SparseVector(83, {0: 1.0, 13: 1.0, 23: 1.0, 34: 1.0, 44: 1.0, 48: 1.0, 51: 1.0, 52: 1.0, 77:

In [27]:
#Step 4) Build the classifier: logistic

# the computation faster, you convert model to a DataFrame.

#You need to select newlabel and features from model using map.

from pyspark.ml.linalg import DenseVector
input_data = model.rdd.map(lambda x: (x["newlabel"], DenseVector(x["features"])))

#You are ready to create the train data as a DataFrame. You use the sqlContext
sqlContext = SQLContext(sc)
df_train = sqlContext.createDataFrame(input_data, ["label", "features"])			

#Check the second row

df_train.show(20)			


+-----+--------------------+
|label|            features|
+-----+--------------------+
|  0.0|[1.0,0.0,0.0,0.0,...|
|  0.0|[1.0,0.0,0.0,0.0,...|
|  1.0|[0.0,0.0,1.0,0.0,...|
|  1.0|[1.0,0.0,0.0,0.0,...|
|  0.0|[0.0,0.0,0.0,1.0,...|
|  0.0|[1.0,0.0,0.0,0.0,...|
|  0.0|[0.0,0.0,0.0,1.0,...|
|  1.0|[0.0,1.0,0.0,0.0,...|
|  0.0|[1.0,0.0,0.0,0.0,...|
|  0.0|[1.0,0.0,0.0,0.0,...|
|  1.0|[1.0,0.0,0.0,0.0,...|
|  0.0|[0.0,0.0,0.0,0.0,...|
|  0.0|[1.0,0.0,0.0,0.0,...|
|  0.0|[0.0,0.0,0.0,1.0,...|
|  1.0|[1.0,0.0,0.0,0.0,...|
|  1.0|[1.0,0.0,0.0,0.0,...|
|  0.0|[0.0,0.0,0.0,0.0,...|
|  0.0|[1.0,0.0,0.0,0.0,...|
|  0.0|[1.0,0.0,0.0,0.0,...|
|  1.0|[1.0,0.0,0.0,0.0,...|
+-----+--------------------+
only showing top 20 rows



In [29]:
#Create a train/test set

#You split the dataset 80/20 with randomSplit.

# Split the data into train and test sets
train_data, test_data = df_train.randomSplit([.8,.2],seed=1234)

#Let's count how many animals with weight below/above 50k in both training and test set
print("-------")
train_data.groupby('label').agg({'label': 'count'}).show()

print("-------")

test_data.groupby('label').agg({'label': 'count'}).show()			

-------
+-----+------------+
|label|count(label)|
+-----+------------+
|  0.0|       29695|
|  1.0|        9330|
+-----+------------+

-------
+-----+------------+
|label|count(label)|
+-----+------------+
|  0.0|        7460|
|  1.0|        2357|
+-----+------------+



In [30]:
#Build the logistic regressor

 
# Import `LinearRegression`
from pyspark.ml.classification import LogisticRegression

# Initialize `lr`
lr = LogisticRegression(labelCol="label",
                        featuresCol="features",
                        maxIter=10,
                        regParam=0.3)

# Fit the data to the model
linearModel = lr.fit(train_data)

#You can see the coefficients from the regression

# Print the coefficients and intercept for logistic regression
print("Coefficients: " + str(linearModel.coefficients))
print("Intercept: " + str(linearModel.intercept))

 


Coefficients: [-0.031721754703633705,-0.046050456282018926,0.016849281639062078,-0.12673294214055283,-0.027771853561445556,0.20791567156831212,0.15335028817708612,-0.23327172280506156,-0.11810799681898179,-0.03995810640274398,0.16362133114249713,0.2711573185369212,0.0034731472384288927,-0.1744675521134223,0.016435815893512465,-0.18270790429639847,-0.2040479195348422,0.36998449004987444,-0.1460989894403246,0.3640855774700959,-0.16705498370284466,-0.1911950566602864,0.29254408683280525,-0.24492727724865668,-0.12678671546460077,-0.1516469134206673,-0.11795523739266717,-0.1149863547396856,0.1810848196865257,-0.022237968325231953,0.2307301458294434,-0.07721130303743942,0.03698545739001802,-0.19997837897089632,-0.1237675033916426,-0.12715680051501446,-0.05123639541641801,-0.1765412210319142,-0.19964982666293882,0.09416038266192625,0.08255669206077007,-0.19442141068754945,0.24806764880550516,-0.13784042657078444,-0.22414949188760735,-0.1725646037904694,0.2996613832516706,0.03496100776428215,-

In [31]:
#Step 5) Train and evaluate the model

 

# Make predictions on test data using the transform() method.
predictions = linearModel.transform(test_data)

 

predictions.printSchema()

root
 |-- label: double (nullable = true)
 |-- features: vector (nullable = true)
 |-- rawPrediction: vector (nullable = true)
 |-- probability: vector (nullable = true)
 |-- prediction: double (nullable = false)



In [32]:
selected = predictions.select("label", "prediction", "probability")
selected.show(20)			



+-----+----------+--------------------+
|label|prediction|         probability|
+-----+----------+--------------------+
|  0.0|       0.0|[0.95137424981967...|
|  0.0|       0.0|[0.95951459760685...|
|  0.0|       0.0|[0.86601705569820...|
|  0.0|       0.0|[0.56853336294609...|
|  0.0|       0.0|[0.60611001210972...|
|  0.0|       0.0|[0.93937565220683...|
|  0.0|       0.0|[0.94765619611021...|
|  0.0|       0.0|[0.60026513504351...|
|  0.0|       0.0|[0.90130382345788...|
|  0.0|       0.0|[0.90829619609276...|
|  0.0|       0.0|[0.83975005171014...|
|  0.0|       0.0|[0.90288325974483...|
|  0.0|       0.0|[0.85963065749982...|
|  0.0|       0.0|[0.89592839513254...|
|  0.0|       0.0|[0.88843828485160...|
|  0.0|       0.0|[0.88229165578475...|
|  0.0|       0.0|[0.91691713528470...|
|  0.0|       0.0|[0.90407934072028...|
|  0.0|       0.0|[0.88839898695408...|
|  0.0|       0.0|[0.72651835008274...|
+-----+----------+--------------------+
only showing top 20 rows



In [33]:
#Evaluate the model
'''
You need to look at the accuracy metric to see how well (or bad) the model performs.
Currently, there is no API to compute the accuracy measure in Spark. 
The default value is the ROC, receiver operating characteristic curve. 
It is a different metrics that take into account the false positive rate.

Before you look at the ROC, let's construct the accuracy measure. You are more familiar with this metric. 
The accuracy measure is the sum of the correct prediction over the total number of observations. 
'''

#You create a DataFrame with the label and the `prediction.

cm = predictions.select("label", "prediction")			

#You can check the number of class in the label and the prediction
print("cm.groupby('label').agg({'label': 'count'}).show()")
cm.groupby('label').agg({'label': 'count'}).show()			

print("cm.groupby('prediction').agg({'prediction': 'count'}).show()")
cm.groupby('prediction').agg({'prediction': 'count'}).show()

cm.groupby('label').agg({'label': 'count'}).show()
+-----+------------+
|label|count(label)|
+-----+------------+
|  0.0|        7460|
|  1.0|        2357|
+-----+------------+

cm.groupby('prediction').agg({'prediction': 'count'}).show()
+----------+-----------------+
|prediction|count(prediction)|
+----------+-----------------+
|       0.0|             7740|
|       1.0|             2077|
+----------+-----------------+



In [34]:
"""
For instance, in the test set, there is 2393 animals with an weight above 100kg and 7423 below. The classifier,
however, predicted 2109 animals with weight above 50k.

You can compute the accuracy by computing the count when the label are correctly classified over the total number of rows.
"""
cm.filter(cm.label == cm.prediction).count() / cm.count()			

0.9714780482835897

In [32]:
#You can wrap everything together and write a function to compute the accuracy. 
def accuracy_m(model): 
    predictions = model.transform(test_data)
    cm = predictions.select("label", "prediction")
    acc = cm.filter(cm.label == cm.prediction).count() / cm.count()
    print("Model accuracy: %.3f%%" % (acc * 100)) 
accuracy_m(model = linearModel)
 

Model accuracy: 97.086%


In [35]:
"""

ROC metrics

The module BinaryClassificationEvaluator includes the ROC measures. 
The Receiver Operating Characteristic curve is another common tool used 
with binary classification. It is very similar to the precision/recall curve,
but instead of plotting precision versus recall, the ROC curve shows
the true positive rate (i.e. recall) against the false positive rate. 
The false positive rate is the ratio of negative instances that are incorrectly
classified as positive. It is equal to one minus the true negative rate.
The true negative rate is also called specificity.
Hence the ROC curve plots sensitivity (recall) versus 1 - specificity 
"""

### Use ROC 
from pyspark.ml.evaluation import BinaryClassificationEvaluator

# Evaluate model
evaluator = BinaryClassificationEvaluator(rawPredictionCol="rawPrediction")
print(evaluator.evaluate(predictions))
print(evaluator.getMetricName())
print(evaluator.evaluate(predictions))


0.9995942438301971
areaUnderROC
0.9995942438301971


In [None]:
#Step 6) Tune the hyperparameter
"""
Last but not least, you can tune the hyperparameters. Similar to scikit learn you create a parameter grid, and you add the parameters you want to tune.

To reduce the time of the computation, you only tune the regularization parameter with only two values.
"""
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

# Create ParamGrid for Cross Validation
paramGrid = (ParamGridBuilder()
             .addGrid(lr.regParam, [0.01, 0.5])
             .build())

#Finally, you evaluate the model with using the cross valiation method with 5 folds. It takes alot of time to train.

from time import *
start_time = time()

# Create 5-fold CrossValidator
cv = CrossValidator(estimator=lr,
                    estimatorParamMaps=paramGrid,
                    evaluator=evaluator, numFolds=5)

# Run cross validations
cvModel = cv.fit(train_data)



In [36]:

# likely take a fair amount of time
end_time = time()
elapsed_time = end_time - start_time
print("Time to train model: %.3f seconds" % elapsed_time)

#Time to train model: 978.807 seconds

#The best regularization hyperparameter is 0.01, with an accuracy of 85.316 percent.

print("Model accuracy: ")

accuracy_m(model = cvModel)

#You can extract the recommended parameter by chaining cvModel.bestModel with extractParamMap()

bestModel = cvModel.bestModel
bestModel.extractParamMap()




Time to train model: 1139.828 seconds
Model accuracy: 
Model accuracy: 99.990%


{Param(parent='LogisticRegression_9416b3699939', name='aggregationDepth', doc='suggested depth for treeAggregate (>= 2).'): 2,
 Param(parent='LogisticRegression_9416b3699939', name='elasticNetParam', doc='the ElasticNet mixing parameter, in range [0, 1]. For alpha = 0, the penalty is an L2 penalty. For alpha = 1, it is an L1 penalty.'): 0.0,
 Param(parent='LogisticRegression_9416b3699939', name='family', doc='The name of family which is a description of the label distribution to be used in the model. Supported options: auto, binomial, multinomial'): 'auto',
 Param(parent='LogisticRegression_9416b3699939', name='featuresCol', doc='features column name.'): 'features',
 Param(parent='LogisticRegression_9416b3699939', name='fitIntercept', doc='whether to fit an intercept term.'): True,
 Param(parent='LogisticRegression_9416b3699939', name='labelCol', doc='label column name.'): 'label',
 Param(parent='LogisticRegression_9416b3699939', name='maxBlockSizeInMB', doc='maximum memory in MB for s