# Trucking Demo with IBM DSX

This demo covers data ingestion, data formatting, exploratory analysis, and model building. Data science involves a typical sequence of tasks:
- acquiring data,
- cleaning it,
- analyzing it for relationships, and
- building a model.

**Note:** Here we are submitting Spark jobs locally.

## First: Inspect the data on HDFS in the remote HDP cluster

In [1]:
# events data
!curl -i -L "http://172.26.232.240:50070/webhdfs/v1/tmp/enrichedEvents?op=OPEN" | tail -n 10

  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
  0     0    0     0    0     0      0      0 --:--:-- --:--:-- --:--:--     0
100 63570  100 63570    0     0   743k      0 --:--:-- --:--:-- --:--:--  743k
Overspeed,"Y","hours",45,2773,-90.07,35.68,0,1,1
Lane Departure,"Y","hours",45,2773,-90.04,35.19,1,1,0
Normal,"Y","hours",45,2773,-90.68,35.12,1,0,0
Normal,"Y","hours",45,2773,-91.14,34.96,0,0,0
Normal,"Y","hours",45,2773,-91.93,34.81,0,0,0
Normal,"Y","hours",45,2773,-92.31,34.78,0,1,0
Normal,"Y","hours",45,2773,-92.09,34.8,0,0,0
Normal,"Y","hours",45,2773,-91.93,34.81,0,0,0
Normal,"Y","hours",45,2773,-90.68,35.12,0,0,0
Normal,"Y","hours",45,2773,-91.74,34.89,0,0,0


In [2]:
# training Data
!curl -i -L "http://172.26.232.240:50070/webhdfs/v1/tmp/trainingData?op=OPEN" | tail -n 10

  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
  0     0    0     0    0     0      0      0 --:--:-- --:--:-- --:--:--     0
100 33084  100 33084    0     0   489k      0 --:--:-- --:--:-- --:--:--  489k
1, 0 0 0.45 0.2773 1 1 1
1, 0 0 0.45 0.2773 1 0 0
0, 0 0 0.45 0.2773 1 0 0
0, 0 0 0.45 0.2773 0 0 0
0, 0 0 0.45 0.2773 0 1 0
0, 0 0 0.45 0.2773 0 0 0
0, 0 0 0.45 0.2773 0 0 1
0, 0 0 0.45 0.2773 0 0 1
0, 0 0 0.45 0.2773 0 1 0
0, 0 0 0.45 0.2773 0 0 0
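
The two `curl` calls above use the WebHDFS REST endpoint with `op=OPEN`. As a minimal sketch, the same URLs can be assembled in Python; the helper name `webhdfs_url` is hypothetical, and the host and port are simply the ones used in the cells above.

```python
try:
    from urllib.parse import urlencode  # Python 3
except ImportError:
    from urllib import urlencode  # Python 2


def webhdfs_url(host, path, op="OPEN", port=50070, **params):
    """Build a WebHDFS REST URL for a file path on HDFS (hypothetical helper)."""
    # The operation comes first, followed by any extra query parameters
    query = [("op", op)] + sorted(params.items())
    return "http://{}:{}/webhdfs/v1{}?{}".format(host, port, path, urlencode(query))


events_url = webhdfs_url("172.26.232.240", "/tmp/enrichedEvents")
training_url = webhdfs_url("172.26.232.240", "/tmp/trainingData")
print(events_url)
print(training_url)
```

Extra keyword arguments become query parameters, so a call such as `webhdfs_url(host, path, length=1024)` would request only the first 1024 bytes of the file.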


## Second: Data Ingestion and Formatting

In [3]:
# import libraries and get a Spark session:
from pyspark import SparkContext
from pyspark.sql import SQLContext, SparkSession
#sc = SparkContext()
sparkSession = SparkSession.builder.getOrCreate()

### Events Data

In [4]:
# Events data: from HDFS
# Alternative: sc.textFile loads the data as a pipelined RDD, which then needs transformation:
#eventsFile = sc.textFile("hdfs://172.26.232.240:8020/tmp/enrichedEvents")
# read.csv loads it as a Spark DataFrame
eventsFile = sparkSession.read.csv("hdfs://172.26.232.240:8020/tmp/enrichedEvents", header="false", inferSchema="false")

In [5]:
print(eventsFile.count())
type(eventsFile)

1359


pyspark.sql.dataframe.DataFrame

In [6]:
# see the data
eventsFile.show(5)

+--------------+---+-----+---+----+------+-----+---+---+---+
|           _c0|_c1|  _c2|_c3| _c4|   _c5|  _c6|_c7|_c8|_c9|
+--------------+---+-----+---+----+------+-----+---+---+---+
|        Normal|  N|miles| 70|3300|-95.01|36.73|  0|  1|  1|
|Lane Departure|  N|miles| 70|3300|-91.99|37.94|  0|  0|  0|
|Lane Departure|  N|miles| 70|3300|-92.08|37.81|  0|  1|  1|
|Lane Departure|  N|miles| 70|3300| -95.5|36.37|  1|  1|  1|
|Lane Departure|  N|miles| 70|3300|-94.23|37.09|  1|  1|  1|
+--------------+---+-----+---+----+------+-----+---+---+---+
only showing top 5 rows



In [7]:
# print the schema (every column is read as a string because inferSchema is "false")
eventsFile.printSchema()

root
 |-- _c0: string (nullable = true)
 |-- _c1: string (nullable = true)
 |-- _c2: string (nullable = true)
 |-- _c3: string (nullable = true)
 |-- _c4: string (nullable = true)
 |-- _c5: string (nullable = true)
 |-- _c6: string (nullable = true)
 |-- _c7: string (nullable = true)
 |-- _c8: string (nullable = true)
 |-- _c9: string (nullable = true)



#### Assigning Names to fields

In [8]:
from functools import reduce  # built in on Python 2, an explicit import on Python 3

old_col_names = eventsFile.columns
new_col_names = ['eventTyp', 'isCertified', 'paymentScheme', 'hoursDriven', 'milesDriven', 'latitude', 'longitude', 'isFoggy', 'isRainy', 'isWindy']
# Rename the columns one at a time by folding withColumnRenamed over the column positions
eventsdata = reduce(lambda df, idx: df.withColumnRenamed(old_col_names[idx], new_col_names[idx]), range(len(old_col_names)), eventsFile)
eventsdata.printSchema()

root
 |-- eventTyp: string (nullable = true)
 |-- isCertified: string (nullable = true)
 |-- paymentScheme: string (nullable = true)
 |-- hoursDriven: string (nullable = true)
 |-- milesDriven: string (nullable = true)
 |-- latitude: string (nullable = true)
 |-- longitude: string (nullable = true)
 |-- isFoggy: string (nullable = true)
 |-- isRainy: string (nullable = true)
 |-- isWindy: string (nullable = true)



#### Type conversion for Columns

In [9]:
data = (eventsdata
        .withColumn("latitude", eventsdata["latitude"].cast("float"))
        .withColumn("longitude", eventsdata["longitude"].cast("float"))
        .withColumn("hoursDriven", eventsdata["hoursDriven"].cast("int"))
        .withColumn("isFoggy", eventsdata["isFoggy"].cast("int"))
        .withColumn("isRainy", eventsdata["isRainy"].cast("int"))
        .withColumn("isWindy", eventsdata["isWindy"].cast("int"))
        .withColumn("milesDriven", eventsdata["milesDriven"].cast("int")))

In [10]:
print(type(data))
# view final schema
data.printSchema()

<class 'pyspark.sql.dataframe.DataFrame'>
root
 |-- eventTyp: string (nullable = true)
 |-- isCertified: string (nullable = true)
 |-- paymentScheme: string (nullable = true)
 |-- hoursDriven: integer (nullable = true)
 |-- milesDriven: integer (nullable = true)
 |-- latitude: float (nullable = true)
 |-- longitude: float (nullable = true)
 |-- isFoggy: integer (nullable = true)
 |-- isRainy: integer (nullable = true)
 |-- isWindy: integer (nullable = true)
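
To make the effect of the casts concrete, here is a plain-Python sketch (not Spark code) that applies the same target types to the first raw row shown earlier. The `SCHEMA` list and `cast_row` helper are hypothetical names introduced for illustration. One difference worth keeping in mind: Python raises `ValueError` on an unparseable value, whereas Spark's `cast` by default yields a null.

```python
# Column names and target types, mirroring the schema printed above
SCHEMA = [
    ("eventTyp", str), ("isCertified", str), ("paymentScheme", str),
    ("hoursDriven", int), ("milesDriven", int),
    ("latitude", float), ("longitude", float),
    ("isFoggy", int), ("isRainy", int), ("isWindy", int),
]


def cast_row(raw_values):
    """Convert a list of strings into a dict of typed values (hypothetical helper)."""
    return {name: caster(value) for (name, caster), value in zip(SCHEMA, raw_values)}


# First row of the events data as it arrives from the CSV reader: all strings
raw = "Normal,N,miles,70,3300,-95.01,36.73,0,1,1".split(",")
row = cast_row(raw)
print(row["hoursDriven"], row["milesDriven"], row["latitude"], row["isWindy"])
```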



#### Register table for Enriched Events

In [11]:
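# registerTempTable is deprecated since Spark 2.0; createOrReplaceTempView is its replacement
data.createOrReplaceTempView("enrichedEvents")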

In [12]:
# Viewing the data
data.show(5)

+--------------+-----------+-------------+-----------+-----------+--------+---------+-------+-------+-------+
|      eventTyp|isCertified|paymentScheme|hoursDriven|milesDriven|latitude|longitude|isFoggy|isRainy|isWindy|
+--------------+-----------+-------------+-----------+-----------+--------+---------+-------+-------+-------+
|        Normal|          N|        miles|         70|       3300|  -95.01|    36.73|      0|      1|      1|
|Lane Departure|          N|        miles|         70|       3300|  -91.99|    37.94|      0|      0|      0|
|Lane Departure|          N|        miles|         70|       3300|  -92.08|    37.81|      0|      1|      1|
|Lane Departure|          N|        miles|         70|       3300|   -95.5|    36.37|      1|      1|      1|
|Lane Departure|          N|        miles|         70|       3300|  -94.23|    37.09|      1|      1|      1|
+--------------+-----------+-------------+-----------+-----------+--------+---------+-------+-------+-------+
only showing top 5 rows

In [13]:
print("shape",data.count(),data.columns)

('shape', 1359, ['eventTyp', 'isCertified', 'paymentScheme', 'hoursDriven', 'milesDriven', 'latitude', 'longitude', 'isFoggy', 'isRainy', 'isWindy'])


### Training Data

In [14]:
# Training Data : from HDFS
trainingFile = sparkSession.read.csv("hdfs://172.26.232.240:8020/tmp/trainingData", header = "false", inferSchema = "false")  # this will load it as Spark DataFrame

In [15]:
print(type(trainingFile), trainingFile.count())

(<class 'pyspark.sql.dataframe.DataFrame'>, 1359)


In [16]:
# columns = ["violation", 'isCertified', 'paymentScheme', 'hoursDriven', 'milesDriven', 'isFoggy', 'isRainy', 'isWindy']
trainingFile.show(5)

+---+-------------------+
|_c0|                _c1|
+---+-------------------+
|  0| 0 0 0.7 0.33 0 0 1|
|  1| 0 0 0.7 0.33 1 1 0|
|  1| 0 0 0.7 0.33 1 1 1|
|  1| 0 0 0.7 0.33 1 0 0|
|  1| 0 0 0.7 0.33 1 0 1|
+---+-------------------+
only showing top 5 rows
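
The CSV reader splits each training line at the comma only, so the space-separated feature values stay packed in `_c1`. A minimal plain-Python sketch of unpacking one line follows; `parse_training_line` is a hypothetical helper, and pairing the values with the names from the commented-out column list above is an assumption, not something the data itself confirms.

```python
# Feature names taken from the commented-out column list (assumed order)
FEATURE_NAMES = ["isCertified", "paymentScheme", "hoursDriven", "milesDriven",
                 "isFoggy", "isRainy", "isWindy"]


def parse_training_line(line):
    """Split 'label, f1 f2 ... fn' into (label, [features]) (hypothetical helper)."""
    label_text, features_text = line.split(",", 1)
    label = int(label_text)
    features = [float(value) for value in features_text.split()]
    return label, features


# One line copied from the tail of the training file
label, features = parse_training_line("1, 0 0 0.45 0.2773 1 1 1")
named = dict(zip(FEATURE_NAMES, features))
print(label, named)
```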



## Third: Exploratory analysis    

In [17]:
#import brunel
#df = eventsRDD1.toPandas()
# Leaving this for later time

In [18]:
#!pip list

## Fourth: Building a classification model for violation prediction

#### Unique truck events

In [19]:
truck_events= list(data.toPandas()['eventTyp'].unique())
truck_events

[u'Normal',
 u'Lane Departure',
 u'Overspeed',
 u'Unsafe following distance',
 u'Unsafe tail distance']

#### Transforming the eventTyp column

*eventTyp -> ifViolated*

- *N - 'Normal'*

- *Y - 'Lane Departure', 'Overspeed', 'Unsafe following distance', 'Unsafe tail distance'*

In [20]:
# transform column eventTyp: 'Normal' becomes 'N', every other event becomes 'Y'
from pyspark.sql.functions import UserDefinedFunction
from pyspark.sql.types import StringType

name = 'eventTyp'
udf = UserDefinedFunction(lambda x: 'N' if x=="Normal" else 'Y', StringType())
data_transformed=data.select(*[udf(column).alias(name) if column == name else column for column in data.columns])
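
The rule inside the UDF can be checked in plain Python before it runs on the cluster. This sketch applies the same expression to the five event types listed earlier; `to_violation_flag` is a hypothetical name. It also shows an edge case that is easy to miss: a missing value is not equal to "Normal", so it would be flagged as a violation.

```python
TRUCK_EVENTS = ["Normal", "Lane Departure", "Overspeed",
                "Unsafe following distance", "Unsafe tail distance"]


def to_violation_flag(event_type):
    """Same rule as the lambda wrapped in the UDF: only 'Normal' maps to 'N'."""
    return "N" if event_type == "Normal" else "Y"


flags = {event: to_violation_flag(event) for event in TRUCK_EVENTS}
print(flags)

# A missing event type falls into the 'Y' branch as well
print(to_violation_flag(None))
```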

In [21]:
data_transformed.printSchema()

root
 |-- eventTyp: string (nullable = true)
 |-- isCertified: string (nullable = true)
 |-- paymentScheme: string (nullable = true)
 |-- hoursDriven: integer (nullable = true)
 |-- milesDriven: integer (nullable = true)
 |-- latitude: float (nullable = true)
 |-- longitude: float (nullable = true)
 |-- isFoggy: integer (nullable = true)
 |-- isRainy: integer (nullable = true)
 |-- isWindy: integer (nullable = true)



In [22]:
violations = list(data_transformed.toPandas()['eventTyp'].unique())
violations

[u'N', u'Y']

In [23]:
data_transformed.show(5)

+--------+-----------+-------------+-----------+-----------+--------+---------+-------+-------+-------+
|eventTyp|isCertified|paymentScheme|hoursDriven|milesDriven|latitude|longitude|isFoggy|isRainy|isWindy|
+--------+-----------+-------------+-----------+-----------+--------+---------+-------+-------+-------+
|       N|          N|        miles|         70|       3300|  -95.01|    36.73|      0|      1|      1|
|       Y|          N|        miles|         70|       3300|  -91.99|    37.94|      0|      0|      0|
|       Y|          N|        miles|         70|       3300|  -92.08|    37.81|      0|      1|      1|
|       Y|          N|        miles|         70|       3300|   -95.5|    36.37|      1|      1|      1|
|       Y|          N|        miles|         70|       3300|  -94.23|    37.09|      1|      1|      1|
+--------+-----------+-------------+-----------+-----------+--------+---------+-------+-------+-------+
only showing top 5 rows



#### Building model > RandomForest Classifier

In [24]:
from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorIndexer, IndexToString
from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler
# Using random forest classification rather than plain logistic regression.
from pyspark.ml.classification import RandomForestClassifier

In [25]:
# Prepare string variables so that they can be used by the random forest algorithm
# StringIndexer encodes a string column of labels to a column of label indices
SI1 = StringIndexer(inputCol='isCertified',outputCol='isCertifiedEncoded')
SI2 = StringIndexer(inputCol='paymentScheme',outputCol='paymentSchemeEncoded')

#encode the Label column
labelIndexer = StringIndexer(inputCol='eventTyp', outputCol='label').fit(data_transformed)

# The Pipelines API requires that input variables are passed in as a single vector column
assembler = VectorAssembler(inputCols=["isCertifiedEncoded", "paymentSchemeEncoded", "hoursDriven", "milesDriven", "latitude", \
                                       "longitude", "isFoggy", "isRainy", "isWindy"], outputCol="features")

In [26]:
# instantiate the algorithm, take the default settings
rf=RandomForestClassifier(labelCol="label", featuresCol="features")

# Convert indexed labels back to original labels.
labelConverter = IndexToString(inputCol="prediction", outputCol="predictedLabel", labels=labelIndexer.labels)

pipeline = Pipeline(stages=[SI1,SI2,labelIndexer, assembler, rf, labelConverter])
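
By default, `StringIndexer` orders labels by descending frequency, so the most frequent string receives index 0.0. The plain-Python sketch below imitates that ordering on a small hypothetical sample. The helper `string_index` is illustrative only, and its alphabetical tie-breaking is an assumption made to keep the example deterministic.

```python
from collections import Counter


def string_index(values):
    """Map each distinct string to a float index, most frequent first (sketch)."""
    counts = Counter(values)
    # Sort by descending count; ties are broken alphabetically in this sketch
    ordered = sorted(counts, key=lambda label: (-counts[label], label))
    return {label: float(index) for index, label in enumerate(ordered)}


sample = ["N", "N", "N", "Y", "Y"]  # hypothetical label column
mapping = string_index(sample)
print(mapping)
```

With this ordering the list of labels is `['N', 'Y']`, which is the kind of list `IndexToString` uses to turn a numeric prediction back into a string.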

In [27]:
# Split data into train and test datasets
train, test = data_transformed.randomSplit([0.8,0.2], seed=6)
train.cache()

DataFrame[eventTyp: string, isCertified: string, paymentScheme: string, hoursDriven: int, milesDriven: int, latitude: float, longitude: float, isFoggy: int, isRainy: int, isWindy: int]

In [28]:
test.cache()

DataFrame[eventTyp: string, isCertified: string, paymentScheme: string, hoursDriven: int, milesDriven: int, latitude: float, longitude: float, isFoggy: int, isRainy: int, isWindy: int]
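
`randomSplit` assigns rows to splits by random sampling, so the 80/20 proportions are approximate and the seed makes the split repeatable. The plain-Python sketch below illustrates that idea only. It is not Spark's algorithm, and `random_split` is a hypothetical helper.

```python
import random


def random_split(rows, weights, seed):
    """Assign each row to exactly one split, with probability proportional to its weight."""
    rng = random.Random(seed)
    total = float(sum(weights))
    # Cumulative upper bounds, e.g. [0.8, 1.0] for weights [0.8, 0.2]
    bounds, running = [], 0.0
    for weight in weights:
        running += weight / total
        bounds.append(running)
    splits = [[] for _ in weights]
    for row in rows:
        draw = rng.random()
        for index, upper in enumerate(bounds):
            # The last split also catches any floating-point shortfall in the bounds
            if draw < upper or index == len(bounds) - 1:
                splits[index].append(row)
                break
    return splits


rows = list(range(1359))  # stand-in for the 1359 event rows
train_rows, test_rows = random_split(rows, [0.8, 0.2], seed=6)
print(len(train_rows), len(test_rows))
```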

In [29]:
train.printSchema()

root
 |-- eventTyp: string (nullable = true)
 |-- isCertified: string (nullable = true)
 |-- paymentScheme: string (nullable = true)
 |-- hoursDriven: integer (nullable = true)
 |-- milesDriven: integer (nullable = true)
 |-- latitude: float (nullable = true)
 |-- longitude: float (nullable = true)
 |-- isFoggy: integer (nullable = true)
 |-- isRainy: integer (nullable = true)
 |-- isWindy: integer (nullable = true)



In [30]:
train.show(5)

+--------+-----------+-------------+-----------+-----------+--------+---------+-------+-------+-------+
|eventTyp|isCertified|paymentScheme|hoursDriven|milesDriven|latitude|longitude|isFoggy|isRainy|isWindy|
+--------+-----------+-------------+-----------+-----------+--------+---------+-------+-------+-------+
|       N|          N|        miles|          0|          0|  -94.59|     39.1|      0|      0|      0|
|       N|          N|        miles|          0|          0|  -94.58|    37.03|      0|      0|      0|
|       N|          N|        miles|          0|          0|  -94.46|    37.16|      1|      1|      1|
|       N|          N|        miles|          0|          0|  -94.38|    38.99|      0|      0|      1|
|       N|          N|        miles|          0|          0|  -91.94|    41.71|      0|      0|      0|
+--------+-----------+-------------+-----------+-----------+--------+---------+-------+-------+-------+
only showing top 5 rows



In [31]:
# Build model. The fitted model from a Pipeline is a PipelineModel, which consists of fitted models and transformers, corresponding to the pipeline stages.
model = pipeline.fit(train)

### Score the test dataset

In [32]:
results = model.transform(test)

In [33]:
type(results)

pyspark.sql.dataframe.DataFrame

In [34]:
results=results.select(results["eventTyp"],results["label"],results["predictedLabel"],results["prediction"],results["probability"])
results.toPandas().head(5)

| | eventTyp | label | predictedLabel | prediction | probability |
|---|---|---|---|---|---|
| 0 | N | 0.0 | N | 0.0 | [0.71386659273, 0.28613340727] |
| 1 | N | 0.0 | N | 0.0 | [0.715791195904, 0.284208804096] |
| 2 | N | 0.0 | Y | 1.0 | [0.37125, 0.62875] |
| 3 | N | 0.0 | N | 0.0 | [0.565553100666, 0.434446899334] |
| 4 | N | 0.0 | N | 0.0 | [0.829561037174, 0.170438962826] |
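
The `prediction` and `predictedLabel` columns follow from `probability`: with default thresholds the classifier picks the class with the highest probability, and `IndexToString` maps that index back to a string. A plain-Python sketch using two probability vectors from the output above; `predict_label` is a hypothetical helper, and the label order `['N', 'Y']` is inferred from the rows shown, where `N` carries label 0.0.

```python
LABELS = ["N", "Y"]  # index 0.0 -> 'N', index 1.0 -> 'Y' (inferred from the output)


def predict_label(probability, labels=LABELS):
    """Return (prediction index, label string) for the most probable class."""
    best_index = max(range(len(probability)), key=lambda i: probability[i])
    return float(best_index), labels[best_index]


row0 = predict_label([0.71386659273, 0.28613340727])
row2 = predict_label([0.37125, 0.62875])
print(row0, row2)
```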


In [35]:
results.printSchema()

root
 |-- eventTyp: string (nullable = true)
 |-- label: double (nullable = true)
 |-- predictedLabel: string (nullable = true)
 |-- prediction: double (nullable = true)
 |-- probability: vector (nullable = true)



### Model evaluation

In [36]:
# Fraction of test rows whose prediction matches the label, i.e. the accuracy
print ('Model accuracy = {:.2f}.'.format(results.filter(results.label == results.prediction).count() / float(results.count())))

Model accuracy = 0.92.
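
The ratio computed above divides the number of correct predictions by the number of all predictions, which is the accuracy. Precision and recall are different quantities. The plain-Python sketch below contrasts the three on a small sample: the first five pairs come from the scored rows shown earlier, and the last three are hypothetical positives added so that every metric is defined.

```python
def confusion_counts(pairs, positive=1.0):
    """Count true/false positives and negatives from (label, prediction) pairs."""
    tp = fp = tn = fn = 0
    for label, prediction in pairs:
        if prediction == positive:
            if label == positive:
                tp += 1
            else:
                fp += 1
        else:
            if label == positive:
                fn += 1
            else:
                tn += 1
    return tp, fp, tn, fn


pairs = [(0.0, 0.0), (0.0, 0.0), (0.0, 1.0), (0.0, 0.0), (0.0, 0.0),  # from the output
         (1.0, 1.0), (1.0, 0.0), (1.0, 1.0)]                          # hypothetical

tp, fp, tn, fn = confusion_counts(pairs)
accuracy = (tp + tn) / float(len(pairs))   # correct predictions over all predictions
precision = tp / float(tp + fp)            # correct among predicted violations
recall = tp / float(tp + fn)               # found among actual violations
print(accuracy, precision, recall)
```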


In [37]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator

# Evaluate model
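# Note: rawPredictionCol points at the hard 0/1 "prediction" column, so the ROC curve is built from class labels rather than scores
evaluator = BinaryClassificationEvaluator(rawPredictionCol="prediction", labelCol="label", metricName="areaUnderROC")
print('Area under ROC curve = {:.2f}.'.format(evaluator.evaluate(results)))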

Area under ROC curve = 0.68.
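
Because the evaluator above reads the hard 0/1 `prediction` column, the resulting curve has a single operating point, and the area reduces to the average of the true positive rate and the true negative rate. The plain-Python sketch below illustrates the difference between scoring with probabilities and scoring with hard labels. All values are hypothetical, and `auc` is a simple pairwise implementation, not Spark's.

```python
def auc(labels, scores):
    """Area under the ROC curve via pairwise comparison of positives and negatives."""
    positives = [s for y, s in zip(labels, scores) if y == 1]
    negatives = [s for y, s in zip(labels, scores) if y == 0]
    wins = 0.0
    for p in positives:
        for n in negatives:
            if p > n:
                wins += 1.0      # positive ranked above negative
            elif p == n:
                wins += 0.5      # ties count as half
    return wins / (len(positives) * len(negatives))


labels = [0, 0, 0, 0, 1, 1]                       # hypothetical ground truth
probabilities = [0.1, 0.2, 0.6, 0.3, 0.4, 0.9]    # hypothetical P(class 1)
hard = [1 if p >= 0.5 else 0 for p in probabilities]

auc_from_scores = auc(labels, probabilities)
auc_from_hard = auc(labels, hard)
print(auc_from_scores, auc_from_hard)
```

In this sample the hard-label value equals (TPR + TNR) / 2 = (0.5 + 0.75) / 2, and it is lower than the score-based value because thresholding discards the ranking information.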


## Fifth: Save Model in ML repository

In [38]:
from repository.mlrepositoryclient import MLRepositoryClient
from repository.mlrepositoryartifact import MLRepositoryArtifact

In [39]:
service_path = 'https://internal-nginx-svc.ibm-private-cloud.svc.cluster.local:12443'
ml_repository_client = MLRepositoryClient()

In [40]:
type(model)

pyspark.ml.pipeline.PipelineModel

### Create the model artifact (abstraction layer)

In [41]:
model_artifact = MLRepositoryArtifact(model, training_data=train, name="Predict_Violations v1.0")

#### Save pipeline and model artifacts to the Machine Learning repository

In [42]:
saved_model = ml_repository_client.models.save(model_artifact)

### Saved model properties

In [43]:
print("modelType: " + saved_model.meta.prop("modelType"))
print("creationTime: " + str(saved_model.meta.prop("creationTime")))
print("modelVersionHref: " + saved_model.meta.prop("modelVersionHref"))
print("label: " + saved_model.meta.prop("label"))

modelType: sparkml-model-2.0
creationTime: 2017-10-20 21:43:35.773000+00:00
modelVersionHref: https://internal-nginx-svc.ibm-private-cloud.svc.cluster.local:12443/v2/artifacts/models/a3cce819-b358-4ab7-9254-00188765d1cd/versions/e33313ff-7e3e-4f22-bd49-787c4ee94e69
label: eventTyp


## Sixth: Deploy and Test model

### Test with UI