# Machine learning with Spark

Now that we have a brief idea of Spark and SQLContext, we are ready to build our first Machine learning program.
we will proceed as follow:

    Step 1) Basic operation with PySpark
    Step 2) Data preprocessing
    Step 3) Build a data processing pipeline
    Step 4) Build the classifier
    Step 5) Train and evaluate the model
    Step 6) Tune the hyperparameter

In [1]:
import findspark
findspark.init()
import pyspark # only run after findspark.init()
from pyspark.sql import SparkSession
from pyspark.context import SparkContext 
from pyspark.sql.functions import *
spark = SparkSession.builder.getOrCreate()#create spark session 
sc = spark.sparkContext#create sparkContext
from pyspark.sql.types import  (StructType, 
                                StructField, 
                                DateType, 
                                BooleanType,
                                DoubleType,
                                IntegerType,
                                StringType,
                                DecimalType,
                                LongType,
                                ArrayType,
                                TimestampType)
sample_schema = StructType([StructField("id",  IntegerType(), True),
                            StructField("timeAtServer", DoubleType(), True),
                            StructField("aircraft",  IntegerType(), True ),
                            StructField("latitude", DoubleType(), True),
                            StructField("longitude", DoubleType(), True  ),
                            StructField("baroAltitude", DoubleType(), True ),
                            StructField("geoAltitude", DoubleType(), True ),
                            StructField("numM",IntegerType(), True),
                            StructField("measurements", StringType(), True)           
                            ])
sample_aircarft = spark.read.csv("C:/PFE/TEST/training_1_category_1.csv",
                       header = True, 
                        schema = sample_schema)
#filter only right data :
sample_aircarft_filtred=sample_aircarft.filter((sample_aircarft.latitude !=0)
                                               |(sample_aircarft.longitude != 0)
                                               | (sample_aircarft.baroAltitude != 0)
                                               | (sample_aircarft.geoAltitude!= 0)
                                               | (sample_aircarft.geoAltitude!= 'null')
                                               | (sample_aircarft.measurements != 'NA'))
sample_aircarft =sample_aircarft.orderBy('aircraft','timeAtServer')
sensors_schema = StructType([StructField("serial", LongType(), True),
                            StructField("latitudes", DoubleType(), True ),
                            StructField("longitudes",DoubleType(), True),
                            StructField("height",DoubleType(), True),
                            StructField("type", StringType(), True)
                            ])
sensors = spark.read.csv("C:/PFE/TEST/sensors.csv",
                       header = True, 
                        schema = sensors_schema)
sample_aircarft.na.drop(how = 'all')
sample_aircarft.dropDuplicates()
sensors.na.drop(how = 'all')
sensors.dropDuplicates()
sensors_filtred=sensors.filter((sensors.latitudes !=0) & (sensors.longitudes != 0))
#let's check our work !
sample_aircarft_filtred.filter((sample_aircarft_filtred["latitude"] == "") | sample_aircarft_filtred["latitude"].isNull() | isnan(sample_aircarft_filtred["latitude"])).count()
#Amazing  we have 0 null , nan  values !!!! #eleminating null data 
sample_aircarft.na.drop(how = 'all')
sample_aircarft.dropDuplicates()
from pyspark.sql.functions import *
sample_aircarft_filtred = sample_aircarft_filtred.withColumn("ArrayOfString",
                                                             split(col("measurements"), "\],\s*\[")
                                                             .cast(ArrayType(StringType()))
                                                             .alias("ArrayOfString"))                                           
sample_aircarft_filtred = sample_aircarft_filtred.withColumn("sensors_informations",
                                                             explode_outer('ArrayOfString'))
sample_aircarft_filtred=sample_aircarft_filtred.withColumn("sensors_informations", 
                                                           regexp_replace(col("sensors_informations"), "[\\[\\]]", ""))
sample_aircarft_filtred=sample_aircarft_filtred.withColumn("sensors_informations",
                                                           split(col("sensors_informations"), ",\s*")
                                                           .cast(ArrayType(IntegerType())).alias("sensors informations"))
sample_aircarft_filtred = sample_aircarft_filtred.withColumn("serial_F",
                                                             sample_aircarft_filtred["sensors_informations"]
                                                             .getItem(0).cast(DoubleType()))
sample_aircarft_filtred = sample_aircarft_filtred.withColumn("timestamp",
                                                             sample_aircarft_filtred["sensors_informations"]
                                                             .getItem(1).cast(LongType()))
sample_aircarft_filtred= sample_aircarft_filtred.withColumn("signalstrength",
                                                            sample_aircarft_filtred["sensors_informations"]
                                                            .getItem(2).cast(DoubleType()))
#==>Data integration: Using multiple databases
#spatial join sesnors data + aircrafts data , i used inner join 
joined_aircarft_sensors=sample_aircarft_filtred.join(sensors_filtred,
                                                     sample_aircarft_filtred.serial_F == sensors_filtred.serial,
                                                     'inner')

# regourp our sensors informations from joined data in the same row  : 

In [2]:
joined_aircarft_sensors.select('aircraft',"timeAtServer",'baroAltitude','serial_F',
                               'latitudes','longitudes','signalstrength',
                               'timestamp','height','latitude','longitude').show(3, False)

+--------+------------+------------+--------+---------+----------+--------------+---------+-------+----------------+----------------+
|aircraft|timeAtServer|baroAltitude|serial_F|latitudes|longitudes|signalstrength|timestamp|height |latitude        |longitude       |
+--------+------------+------------+--------+---------+----------+--------------+---------+-------+----------------+----------------+
|1787    |0.0         |6400.8      |463.0   |49.471601|7.696532  |4.0           |963309455|273.985|49.5238952636719|7.80282271535773|
|1787    |0.0         |6400.8      |424.0   |49.42498 |7.75332   |27.0          |963315122|277.015|49.5238952636719|7.80282271535773|
|1787    |0.0         |6400.8      |412.0   |49.287572|7.603982  |0.0           |963373222|410.652|49.5238952636719|7.80282271535773|
+--------+------------+------------+--------+---------+----------+--------------+---------+-------+----------------+----------------+
only showing top 3 rows



### 1-method using 'ARRAY' function:

In [3]:
import pyspark.sql.functions as F
from pyspark.sql.window import Window
joined_aircarft_sensors =joined_aircarft_sensors.orderBy('aircraft','timeAtServer')
columns = [F.col("serial_F"),F.col("latitudes"),
           F.col("longitudes"),F.col("signalstrength"),
            F.col("height")] 
Grouped_aircarft_sensors = joined_aircarft_sensors.withColumn('sensor_infos',F.array(columns))

In [4]:
Grouped_aircarft_sensors.filter('aircraft=1787').select('aircraft','timeAtServer','baroAltitude',
                                'sensor_infos','latitude','longitude').show(3, False)

+--------+------------+------------+------------------------------------------+----------------+----------------+
|aircraft|timeAtServer|baroAltitude|sensor_infos                              |latitude        |longitude       |
+--------+------------+------------+------------------------------------------+----------------+----------------+
|1787    |0.0         |6400.8      |[463.0, 49.471601, 7.696532, 4.0, 273.985]|49.5238952636719|7.80282271535773|
|1787    |0.0         |6400.8      |[412.0, 49.287572, 7.603982, 0.0, 410.652]|49.5238952636719|7.80282271535773|
|1787    |0.0         |6400.8      |[424.0, 49.42498, 7.75332, 27.0, 277.015] |49.5238952636719|7.80282271535773|
+--------+------------+------------+------------------------------------------+----------------+----------------+
only showing top 3 rows



### 2-Group all sensors informations from all stations using 'collect_list':

In [5]:
Grouped_aircarft_sensors=Grouped_aircarft_sensors.orderBy('aircraft','timeAtServer')
my_window = Window.partitionBy('aircraft','timeAtServer').orderBy("timeAtServer")
Grouped_aircarft_sensors=Grouped_aircarft_sensors.withColumn('All_sensor_infos', 
                                                            collect_list('sensor_infos'              
                                                            ).over(my_window))

In [6]:
Grouped_aircarft_sensors.filter('aircraft=1787').select('baroAltitude','All_sensor_infos','latitude','longitude').show(3, False)

+------------+-----------------------------------------------------------------------------------------------------------------------------------+----------------+----------------+
|baroAltitude|All_sensor_infos                                                                                                                   |latitude        |longitude       |
+------------+-----------------------------------------------------------------------------------------------------------------------------------+----------------+----------------+
|6400.8      |[[463.0, 49.471601, 7.696532, 4.0, 273.985], [424.0, 49.42498, 7.75332, 27.0, 277.015], [412.0, 49.287572, 7.603982, 0.0, 410.652]]|49.5238952636719|7.80282271535773|
|6400.8      |[[463.0, 49.471601, 7.696532, 4.0, 273.985], [424.0, 49.42498, 7.75332, 27.0, 277.015], [412.0, 49.287572, 7.603982, 0.0, 410.652]]|49.5238952636719|7.80282271535773|
|6400.8      |[[463.0, 49.471601, 7.696532, 4.0, 273.985], [424.0, 49.42498, 7.75332, 27.0, 277

### 3- Removing duplicates row: 

In [7]:
Grouped_aircarft_sensors=Grouped_aircarft_sensors.dropDuplicates(['aircraft', 'timeAtServer'])

In [8]:
Grouped_aircarft_sensors.filter('aircraft=1787').select('baroAltitude','All_sensor_infos','latitude','longitude').show(3, False)

+------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+----------------+----------------+
|baroAltitude|All_sensor_infos                                                                                                                                                                                                                                                                                                              |latitude        |longitude       |
+------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

In [9]:
import geohash2
import pyspark.sql.functions as F
from pyspark.sql.types import *
import pandas as pd
from math import radians, cos, sin, asin, sqrt
from geopy.distance import geodesic
from geopy.geocoders import Nominatim
from geopy.extra.rate_limiter import RateLimiter
#to create geohash positions we  need float num (lat, long)
Grouped_aircarft_sensors.baroAltitude.cast(DoubleType())
Grouped_aircarft_sensors.latitude.cast(DoubleType()) 
Grouped_aircarft_sensors.longitude.cast(DoubleType()) 

Column<b'CAST(longitude AS DOUBLE)'>

In [10]:
#to create geohash positions we  need float num (lat, long)
#we will create geohash with precision of  4 bit , error +20 km,-20km
def eval_results(x,y):
    try:
        return geohash2.encode(x,y,precision=4)
    except:
        return (None)
udf1 = F.udf(eval_results,StringType())
udf2= F.udf(lambda x,y,z,t: geodesic((x,y),(z,t)))

import numpy as np
import pandas as pd
# Enable Arrow-based columnar data transfers
spark.conf.set("spark.sql.execution.arrow.enabled", "true")
# Convert the Spark DataFrame back to a Pandas DataFrame using Arrow
Grouped_aircarft_sensors=Grouped_aircarft_sensors.withColumn('infos_Flatten',
                                                             flatten(Grouped_aircarft_sensors.All_sensor_infos).alias('infos_Flatten'))
# will pad our sequaences of data , max =13 * 5 =65 senors in our data 
pad_fix_length = F.udf(
    lambda arr: arr[:65] + [0] * (65 - len(arr[:65])), 
    ArrayType(DoubleType())
)

Grouped_aircarft_sensors=Grouped_aircarft_sensors.withColumn('infos_Flatten_pad',
                                                             pad_fix_length(Grouped_aircarft_sensors.infos_Flatten).alias('infos_Flatten_pad'))

# last thing convert our array to vectir 
from pyspark.ml.linalg import Vectors, VectorUDT
from pyspark.sql.functions import udf
list_to_vector_udf = F.udf(lambda l: Vectors.dense(l), VectorUDT())
Grouped_aircarft_sensors=Grouped_aircarft_sensors.withColumn('VectorUDT', list_to_vector_udf('infos_Flatten_pad').alias("VectorUDT"))
Grouped_aircarft_sensors=Grouped_aircarft_sensors.withColumn('Geohash_aircraft', udf1('latitude','longitude').cast(StringType()))

In [11]:
Grouped_aircarft_sensors.printSchema()

root
 |-- id: integer (nullable = true)
 |-- timeAtServer: double (nullable = true)
 |-- aircraft: integer (nullable = true)
 |-- latitude: double (nullable = true)
 |-- longitude: double (nullable = true)
 |-- baroAltitude: double (nullable = true)
 |-- geoAltitude: double (nullable = true)
 |-- numM: integer (nullable = true)
 |-- measurements: string (nullable = true)
 |-- ArrayOfString: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- sensors_informations: array (nullable = true)
 |    |-- element: integer (containsNull = true)
 |-- serial_F: double (nullable = true)
 |-- timestamp: long (nullable = true)
 |-- signalstrength: double (nullable = true)
 |-- serial: long (nullable = true)
 |-- latitudes: double (nullable = true)
 |-- longitudes: double (nullable = true)
 |-- height: double (nullable = true)
 |-- type: string (nullable = true)
 |-- sensor_infos: array (nullable = false)
 |    |-- element: double (containsNull = true)
 |-- All_sensor_infos: a

# Classification using Geoash_aircraft class:
   ### Feature Engineering in pyspark 
The most commonly used data pre-processing techniques in approaches in Spark are as follows

 1) VectorAssembler

 2)Bucketing

 3)Scaling and normalization

 4) Working with categorical features

 5) Text data transformers

 6) Feature Manipulation

 7) PCA

In [12]:
# Create features colmun
from pyspark.ml.feature import VectorAssembler
assembler = VectorAssembler(inputCols=["baroAltitude",'VectorUDT'],outputCol="featuresc")
assembler=assembler.transform(Grouped_aircarft_sensors)
assembler.show(3)

+---+-----------------+--------+----------------+----------------+------------+-----------+----+--------------------+--------------------+--------------------+--------+----------+--------------+------+---------+----------+---------+---------+--------------------+--------------------+--------------------+--------------------+--------------------+----------------+--------------------+
| id|     timeAtServer|aircraft|        latitude|       longitude|baroAltitude|geoAltitude|numM|        measurements|       ArrayOfString|sensors_informations|serial_F| timestamp|signalstrength|serial|latitudes|longitudes|   height|     type|        sensor_infos|    All_sensor_infos|       infos_Flatten|   infos_Flatten_pad|           VectorUDT|Geohash_aircraft|           featuresc|
+---+-----------------+--------+----------------+----------------+------------+-----------+----+--------------------+--------------------+--------------------+--------+----------+--------------+------+---------+----------+------

In [13]:
#Take a sample :
assembler=assembler.sample(False,0.4)
assembler=assembler.select('featuresc', 'Geohash_aircraft')
assembler.printSchema()

root
 |-- featuresc: vector (nullable = true)
 |-- Geohash_aircraft: string (nullable = true)



In [15]:
from pyspark.sql import Row
from pyspark.ml.linalg import Vectors
# I provide two other ways to build the features and labels
# method 1 (good for small feature):
def transData_A(row):
    return Row(label=row['Geohash_aircraft'],features=Vectors.dense(row['baroAltitude'],row['VectorUDT']))
# Method 2 (good for large features):
def transData(data):
    return data.rdd.map(lambda r: [Vectors.dense(r[0]),r[-1]]).toDF(['featuresc','label'])
transformed= transData(assembler)
transformed.show(3)

+--------------------+-----+
|           featuresc|label|
+--------------------+-----+
|[2400.3,440.0,50....| u0vg|
|[2407.92,101.0,50...| u0vg|
|[2438.4,440.0,50....| u0vg|
+--------------------+-----+
only showing top 3 rows



### General Pipeline to deal with CategoricalCols:
Note: You are strongly encouraged to try my **get_dummy** function for dealing with the categorical data
in complex dataset.

In [16]:
def get_dummy(df,categoricalCols,continuousCols,labelCol):
    from pyspark.ml import Pipeline
    from pyspark.ml.feature import StringIndexer, OneHotEncoder,VectorAssembler
    from pyspark.sql.functions import col
    indexers = [ StringIndexer(inputCol=c, outputCol="{0}_indexed".format(c)) for c in categoricalCols ]
     # default setting: dropLast=True
    encoders = [ OneHotEncoder(inputCol=indexer.getOutputCol(),outputCol="{0}_encoded".format(indexer.getOutputCol())) for indexer in indexers ]
    
    assembler = VectorAssembler(inputCols=[encoder.getOutputCol() for encoder in encoders]+ continuousCols, outputCol="features")
    
    pipeline = Pipeline(stages=indexers + encoders + [assembler])
    model=pipeline.fit(df)
    data = model.transform(df)
    data = data.withColumn('label',col(labelCol))
    return data.select('features','label')

catcols = []
num_cols = ['featuresc']
labelCol ='label'
data = get_dummy(transformed,catcols,num_cols,labelCol)
data.show(3)

+--------------------+-----+
|            features|label|
+--------------------+-----+
|(66,[0,1,2,3,4,5,...| u0vg|
|(66,[0,1,2,3,4,5,...| u0vg|
|(66,[0,1,2,3,5,6,...| u0vg|
+--------------------+-----+
only showing top 3 rows



### Deal with Categorical Label and Variables

In [17]:
from pyspark.ml.feature import StringIndexer
# Index labels, adding metadata to the label column
#filter only right data :
data=data.select('features','label').dropna()
data=data.filter((data.label !=0)
                            & (data.label!= 'null')
                            & (data.label!= '')
                            & (data.label != 'na')
                            & (data.label != None)
                             )
data=data.dropna()
data=data.na.drop()
labelIndexer = StringIndexer(inputCol='label',outputCol='indexedLabel', handleInvalid="skip")#options are "keep", "error" or "skip")
labelIndexer.fit(data).transform(data).show(3, True)

+--------------------+-----+------------+
|            features|label|indexedLabel|
+--------------------+-----+------------+
|(66,[0,1,2,3,4,5,...| u0vg|        33.0|
|(66,[0,1,2,3,4,5,...| u0vg|        33.0|
|(66,[0,1,2,3,5,6,...| u0vg|        33.0|
+--------------------+-----+------------+
only showing top 3 rows



# Get all lables in a list 

In [38]:
labels = labelIndexer.fit(data).labels
labels [:3]

['u0yh', 'u0qj', 'u10j']

### Split the data to training and test data sets

In [39]:
data.printSchema()

root
 |-- features: vector (nullable = true)
 |-- label: string (nullable = true)



In [40]:
# Split the data into training and test sets (30% held out for testing)
(trainingData, testData) = data.randomSplit([0.8, 0.2])
trainingData.show(3,False)
testData.show(3,False)

+-------------------------------------------------------------------------------------------------------------------+-----+
|features                                                                                                           |label|
+-------------------------------------------------------------------------------------------------------------------+-----+
|(66,[0,1,2,3,4,5,6,7,8,9,10],[228.6,101.0,50.048584,8.487752,43.0,92.620644,440.0,50.048573,8.487899,34.0,82.381]) |u0yh |
|(66,[0,1,2,3,4,5,6,7,8,9,10],[251.46,101.0,50.048584,8.487752,51.0,92.620644,440.0,50.048573,8.487899,26.0,82.381])|u0yh |
|(66,[0,1,2,3,4,5,6,7,8,9,10],[259.08,101.0,50.048584,8.487752,50.0,92.620644,440.0,50.048573,8.487899,27.0,82.381])|u0yh |
+-------------------------------------------------------------------------------------------------------------------+-----+
only showing top 3 rows

+----------------------------------------------------------------------------------------------------------

### NaiveBayes



###  Pipeline Architecture:
  ###### Preporcessing Data+Model +label converter

In [41]:
from pyspark.ml.linalg import Vectors
from pyspark.ml import Pipeline
from pyspark.ml.feature import IndexToString,StringIndexer, VectorIndexer
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.feature import MinMaxScaler
labelIndexer = StringIndexer(inputCol='label',outputCol='indexedLabel', handleInvalid="skip")

# Let us create an object of MinMaxScaler class
MinMaxScaler=MinMaxScaler().setInputCol("features").setOutputCol("Scaled_features")

from pyspark.ml.classification import GBTClassifier

from pyspark.ml.classification import NaiveBayes

# create the trainer and set its parameters
nb = NaiveBayes(smoothing=1.0, modelType="multinomial")
# Train a GBT model.

nb =NaiveBayes(labelCol="indexedLabel", featuresCol="Scaled_features")
# Convert indexed labels back to original labels.
labelConverter = IndexToString(inputCol="prediction", outputCol="predictedLabel",labels=labels)
# Chain indexers and tree in a Pipeline
pipeline2 = Pipeline(stages=[ labelIndexer,MinMaxScaler ,nb,labelConverter])
# Train model
model2 = pipeline2.fit(trainingData)

### Make predictions

In [42]:
# Make predictions.
predictions = model2.transform(testData)
# Select example rows to display.
predictions.select("features","label","predictedLabel").show(5)

+--------------------+-----+--------------+
|            features|label|predictedLabel|
+--------------------+-----+--------------+
|(66,[0,1,2,3,4,5,...| u0yh|          u0yh|
|(66,[0,1,2,3,4,5,...| u0yh|          u0yh|
|(66,[0,1,2,3,4,5,...| u0yh|          u0yh|
|(66,[0,1,2,3,4,5,...| u0yh|          u0yh|
|(66,[0,1,2,3,4,5,...| u0yh|          u0yh|
+--------------------+-----+--------------+
only showing top 5 rows



### Evaluation

In [44]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
# Select (prediction, true label) and compute test error
evaluator = MulticlassClassificationEvaluator(
labelCol="indexedLabel", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(predictions)
print("Test Error = %g" % (1.0 - accuracy))


Test Error = 0.979011


In [47]:
print("Accuracy = %g" % (accuracy))

Accuracy = 0.0209893


In [48]:
rfModel = model2.stages[-2]
print(rfModel) # summary only

NaiveBayes_b25a191d6a55


In [49]:
spark.stop()