# ETL and Machine Learning

In this lab I'll build an Apache Spark machine learning application as an end-to-end use case, covering data acquisition, transformation, model training, and deployment.

Objectives
After reading this, you will have seen, and hopefully be able to do, the following:

- Pull-in data from the HMP dataset <a href="https://github.com/wchill/HMP_Dataset">here</a>
- Create a Spark data frame from the raw data
- Store this to parquet (in Cloud Object Store)
- Read it again (from Cloud Object Store)
- Train an ML model on that data set
- Deploy this model to Watson Machine Learning

## 1. Pull-in data from the HMP dataset 

Now it's time to explore the data <a href="https://github.com/wchill/HMP_Dataset">here</a>. Take a moment to familiarize yourself with it: understanding the data makes the code in the following steps easier to follow.
Let's pull the data in. The raw source is on GitHub; the cell below reads a Parquet copy of it from Cloud Object Store.
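For orientation, here is a minimal sketch of how one raw recording could be turned into rows with the `x`, `y`, `z` and `class` columns used later. It assumes each raw file holds one whitespace-separated `x y z` triple per line and that the folder name serves as the class label. The helper `parse_hmp_file`, the `source` column, and the sample values are hypothetical and only for illustration.

```python
from typing import Iterable, List, Tuple

# One row: x, y, z accelerometer readings, activity class, source file name
Row = Tuple[int, int, int, str, str]


def parse_hmp_file(lines: Iterable[str], activity: str, source: str) -> List[Row]:
    """Turn the lines of one raw recording into (x, y, z, class, source) rows."""
    rows: List[Row] = []
    for line in lines:
        parts = line.split()
        if len(parts) != 3:
            continue  # skip blank or malformed lines
        x, y, z = (int(p) for p in parts)
        rows.append((x, y, z, activity, source))
    return rows


# Made-up sample lines standing in for the contents of one file
sample = ["22 49 35", "22 49 35", "", "22 52 35"]
rows = parse_hmp_file(sample, "Brush_teeth", "recording-1.txt")
print(rows[0])
```

Rows in this shape can be collected into a pandas DataFrame and passed to `spark.createDataFrame`, which is what the next cell does with the Parquet copy.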

In [3]:
import findspark
findspark.init()

from pyspark import SparkConf
from pyspark import SparkContext
from pyspark.sql import SparkSession
conf = SparkConf().setAppName("SparkApp_ETL_ML").setMaster("local[*]")
sc = SparkContext.getOrCreate(conf)
spark = SparkSession.builder.getOrCreate()

import pandas as pd
df=pd.read_parquet("https://s3.eu-de.cloud-object-storage.appdomain.cloud/cloud-object-storage-yy-cos-standard-js4/data.parquet")

sdf = spark.createDataFrame(df)

from pyspark.sql.types import DoubleType
sdf = sdf.withColumn("x", sdf.x.cast(DoubleType()))
sdf = sdf.withColumn("y", sdf.y.cast(DoubleType()))
sdf = sdf.withColumn("z", sdf.z.cast(DoubleType()))

from pyspark.ml.feature import StringIndexer
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.feature import MinMaxScaler
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

input_columns = ["x", "y", "z"]  # input columns to consider
train, test = sdf.randomSplit([0.8, 0.2], seed=1)
indexer = StringIndexer(inputCol="class", outputCol="label")
vectorAssembler = VectorAssembler(inputCols=input_columns, outputCol="features")
normalizer = MinMaxScaler(inputCol="features", outputCol="features_norm")
pipeline = Pipeline(stages=[indexer, vectorAssembler, normalizer])
binEval = MulticlassClassificationEvaluator().setMetricName("accuracy").setPredictionCol("prediction"). \
    setLabelCol("label")
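# Fit the preprocessing stages on the training split only, then apply the
# same fitted stages to both splits so labels and scaling stay consistent.
pipelineModel = pipeline.fit(train)
df_train = pipelineModel.transform(train)
df_test = pipelineModel.transform(test)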

from pyspark.ml.classification import RandomForestClassifier

rf = RandomForestClassifier(featuresCol='features_norm', labelCol='label', maxDepth=20, numTrees=7, seed=1)
rfModel = rf.fit(df_train)

from pyspark2pmml import PMMLBuilder
model_target = "HMP_frModel.xml" 

pmmlBuilder = PMMLBuilder(sc, df_train, rfModel)

Collecting pyspark
  Downloading pyspark-3.2.0.tar.gz (281.3 MB)
Collecting py4j==0.10.9.2
  Downloading py4j-0.10.9.2-py2.py3-none-any.whl (198 kB)
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... done
  Created wheel for pyspark: filename=pyspark-3.2.0-py2.py3-none-any.whl size=281805912 sha256=ff6aa6dfb729a8007bfc4d584b0bd125dccee78b79820147a37fdfc3262d0441
  Stored in dir
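A note on the preprocessing stages: `StringIndexer` learns its label-to-index mapping when it is fitted, by default ordering labels by descending frequency. The plain-Python sketch below imitates that ordering to show why the mapping is best learned once, on the training split, and then reused. The helper `fit_label_index` and the label counts are made up for illustration; this is not Spark's implementation.

```python
from collections import Counter
from typing import Dict, Iterable


def fit_label_index(labels: Iterable[str]) -> Dict[str, int]:
    """Map each label to an index: most frequent first, ties alphabetical."""
    counts = Counter(labels)
    ordered = sorted(counts, key=lambda label: (-counts[label], label))
    return {label: index for index, label in enumerate(ordered)}


# Made-up splits in which the class frequencies differ
train_labels = ["Walk"] * 5 + ["Climb_stairs"] * 3 + ["Brush_teeth"] * 2
test_labels = ["Brush_teeth"] * 3 + ["Walk"] * 1 + ["Climb_stairs"] * 1

index_from_train = fit_label_index(train_labels)
index_from_test = fit_label_index(test_labels)

print(index_from_train)  # {'Walk': 0, 'Climb_stairs': 1, 'Brush_teeth': 2}
print(index_from_test)   # {'Brush_teeth': 0, 'Climb_stairs': 1, 'Walk': 2}
```

Here "Walk" maps to 0 when fitted on one split and to 2 when fitted on the other, so a classifier trained with the first mapping would be scored against the wrong labels. The same reasoning applies to the minimum and maximum learned by `MinMaxScaler`.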

In [14]:
!pip3 install pyspark2pmml==0.5.1



In [12]:
from pyspark2pmml import PMMLBuilder
model_target = "HMP_frModel.xml"       # model output file name

In [18]:
spark

In [23]:
!pip3 install wget

Processing /home/mbg/.cache/pip/wheels/bd/a8/c3/3cf2c14a1837a4e04bd98631724e81f33f462d86a1d895fae0/wget-3.2-py3-none-any.whl
Installing collected packages: wget
Successfully installed wget-3.2


In [34]:
# import os
# import shutil
# import site
# import wget
# url = ('https://github.com/jpmml/jpmml-sparkml/releases/download/1.7.2/'
#            'jpmml-sparkml-executable-1.7.2.jar')
# wget.download(url)
# # shutil.copy('jpmml-sparkml-executable-1.7.2.jar', site.getsitepackages()[0] + '/pyspark/jars/')
# # shutil.copy does not expand '~', so expand it explicitly
# shutil.copy('jpmml-sparkml-executable-1.7.2.jar',
#             os.path.expanduser('~/.local/lib/python3.8/site-packages/pyspark/jars'))


In [13]:
pmmlBuilder = PMMLBuilder(sc, df_train, rfModel)

Exception in thread "Thread-4" java.lang.ExceptionInInitializerError
	at java.base/java.lang.Class.forName0(Native Method)
	at java.base/java.lang.Class.forName(Class.java:398)
	at py4j.reflection.CurrentThreadClassLoadingStrategy.classForName(CurrentThreadClassLoadingStrategy.java:40)
	at py4j.reflection.ReflectionUtil.classForName(ReflectionUtil.java:51)
	at py4j.reflection.TypeUtil.forName(TypeUtil.java:243)
	at py4j.commands.ReflectionCommand.getUnknownMember(ReflectionCommand.java:175)
	at py4j.commands.ReflectionCommand.execute(ReflectionCommand.java:87)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.lang.IllegalArgumentException: Expected Apache Spark ML version 3.1, got version 3.2 (3.2.0)
	at org.jpmml.sparkml.ConverterFactory.checkVersion(ConverterFactory.java:114)
	at org.jpmml.sparkml.PMMLBuilder.init(PMML

Py4JError: org.jpmml.sparkml.PMMLBuilder does not exist in the JVM
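The `Caused by` line is the useful part of this trace: the JPMML-SparkML converter on the classpath expects Spark ML 3.1, while the installed pyspark is 3.2.0. The `Py4JError` most likely follows from that same failed class initialization. A small guard like the one below, run before building the `PMMLBuilder`, would surface the mismatch earlier. The helpers `major_minor` and `converter_matches` are hypothetical names, and the expected version is taken from the error message.

```python
from typing import Tuple


def major_minor(version: str) -> Tuple[int, int]:
    """Extract (major, minor) from a version string such as '3.2.0' or '3.1'."""
    major, minor = version.split(".")[:2]
    return int(major), int(minor)


def converter_matches(installed_spark: str, expected_spark: str) -> bool:
    """True when the installed Spark has the major.minor the converter expects."""
    return major_minor(installed_spark) == major_minor(expected_spark)


print(converter_matches("3.2.0", "3.1"))  # False: the situation in the trace above
print(converter_matches("3.1.2", "3.1"))  # True
```

With a live session, `spark.version` can be passed as the first argument. The two ways out are to install a pyspark 3.1.x release or to use a JPMML-SparkML JAR built for Spark 3.2; which release that is should be checked in the JPMML-SparkML README.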

In [None]:
pmmlBuilder.buildFile(model_target)
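Once the export succeeds, the resulting file is plain XML, so a quick sanity check is possible with the standard library. The sketch below summarizes the root element and its direct children. The helper names are hypothetical, and the `example` string is a hand-written stand-in, not real exporter output and not a schema-valid PMML document.

```python
import xml.etree.ElementTree as ET
from typing import Dict


def local_name(tag: str) -> str:
    """Strip the '{namespace}' prefix that ElementTree puts on tag names."""
    return tag.rsplit("}", 1)[-1]


def summarize_pmml(xml_text: str) -> Dict[str, object]:
    """Return the root tag, its version attribute, and its child tag names."""
    root = ET.fromstring(xml_text)
    return {
        "root": local_name(root.tag),
        "version": root.get("version"),
        "children": [local_name(child.tag) for child in root],
    }


# Hand-written stand-in for an exported file
example = """<PMML xmlns="http://www.dmg.org/PMML-4_4" version="4.4">
  <Header/>
  <DataDictionary/>
  <MiningModel functionName="classification"/>
</PMML>"""

summary = summarize_pmml(example)
print(summary)
```

For the real file, `ET.parse(model_target).getroot()` can replace `ET.fromstring(xml_text)`.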