# Spark Train Logistic Regression


Train Logistic Regression classifier with Apache SparkML

In [3]:
%%bash
export version=`python --version |awk '{print $2}' |awk -F"." '{print $1$2}'`

echo $version

if [ $version == '36' ] || [ $version == '37' ]; then
    echo 'Starting installation...'
    pip3 install pyspark==2.4.8 wget==3.2 pyspark2pmml==0.5.1 > install.log 2> install.log
    if [ $? == 0 ]; then
        echo 'Please <<RESTART YOUR KERNEL>> (Kernel->Restart Kernel and Clear All Outputs)'
    else
        echo 'Installation failed, please check log:'
        cat install.log
    fi
elif [ $version == '38' ] || [ $version == '39' ]; then
    pip3 install pyspark==3.1.2 wget==3.2 pyspark2pmml==0.5.1 > install.log 2> install.log
    if [ $? == 0 ]; then
        echo 'Please <<RESTART YOUR KERNEL>> (Kernel->Restart Kernel and Clear All Outputs)'
    else
        echo 'Installation failed, please check log:'
        cat install.log
    fi
else
    echo 'Currently only python 3.6, 3.7 , 3.8 and 3.9 are supported, in case you need a different version please open an issue at https://github.com/IBM/claimed/issues'
    exit -1
fi

37
Starting installation...
Please <<RESTART YOUR KERNEL>> (Kernel->Restart Kernel and Clear All Outputs)


In [4]:
from pyspark import SparkContext, SparkConf, SQLContext
import os
from pyspark.ml.classification import LogisticRegression
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark2pmml import PMMLBuilder
from pyspark.ml.feature import StringIndexer
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.feature import MinMaxScaler
import logging
import shutil
import site
import sys
import wget
import re

In [5]:
if sys.version[0:3] == '3.9':
    url = ('https://github.com/jpmml/jpmml-sparkml/releases/download/1.7.2/'
           'jpmml-sparkml-executable-1.7.2.jar')
    wget.download(url)
    shutil.copy('jpmml-sparkml-executable-1.7.2.jar',
                site.getsitepackages()[0] + '/pyspark/jars/')
elif sys.version[0:3] == '3.8':
    url = ('https://github.com/jpmml/jpmml-sparkml/releases/download/1.7.2/'
           'jpmml-sparkml-executable-1.7.2.jar')
    wget.download(url)
    shutil.copy('jpmml-sparkml-executable-1.7.2.jar',
                site.getsitepackages()[0] + '/pyspark/jars/')
elif sys.version[0:3] == '3.7':
    url = ('https://github.com/jpmml/jpmml-sparkml/releases/download/1.5.12/'
           'jpmml-sparkml-executable-1.5.12.jar')
    wget.download(url)
elif sys.version[0:3] == '3.6':
    url = ('https://github.com/jpmml/jpmml-sparkml/releases/download/1.5.12/'
           'jpmml-sparkml-executable-1.5.12.jar')
    wget.download(url)
else:
    raise Exception('Currently only python 3.6 , 3.7, 3,8 and 3.9 is supported, in case '
                    'you need a different version please open an issue at '
                    'https://github.com/IBM/claimed/issues')

In [6]:
# Task 4

In [7]:
data_parquet = os.environ.get('data_parquet',
                              'data.parquet')  # input file name (parquet)
master = os.environ.get('master',
                        "local[*]")  # URL to Spark master
model_target = os.environ.get('model_target',
                              "model.xml")  # model output file name
data_dir = os.environ.get('data_dir',
                          '../../data/')  # temporary directory for data
input_columns = os.environ.get('input_columns',
                               '["x", "y", "z"]')  # input columns to consider

In [8]:
parameters = list(
    map(lambda s: re.sub('$', '"', s),
        map(
            lambda s: s.replace('=', '="'),
            filter(
                lambda s: s.find('=') > -1 and bool(re.match(r'[A-Za-z0-9_]*=[.\/A-Za-z0-9]*', s)),
                sys.argv
            )
    )))

for parameter in parameters:
    logging.warning('Parameter: ' + parameter)
    exec(parameter)

In [9]:
conf = SparkConf().setMaster(master)
#if sys.version[0:3] == '3.6' or sys.version[0:3] == '3.7':
conf.set("spark.jars", 'jpmml-sparkml-executable-1.5.12.jar')

sc = SparkContext.getOrCreate(conf)
sqlContext = SQLContext(sc)
spark = sqlContext.sparkSession

22/07/14 12:46:06 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


In [10]:
df = spark.read.parquet(data_dir + data_parquet)

                                                                                

In [11]:
# register a corresponding query table
df.createOrReplaceTempView('df')

In [12]:
from pyspark.sql.types import DoubleType
df = df.withColumn("x", df.x.cast(DoubleType()))
df = df.withColumn("y", df.y.cast(DoubleType()))
df = df.withColumn("z", df.z.cast(DoubleType()))

In [13]:
splits = df.randomSplit([0.8, 0.2])
df_train = splits[0]
df_test = splits[1]

In [14]:
indexer = StringIndexer(inputCol="class", outputCol="label")

vectorAssembler = VectorAssembler(inputCols=eval(input_columns),
                                  outputCol="features")

normalizer = MinMaxScaler(inputCol="features", outputCol="features_norm")

In [17]:
lr = LogisticRegression(maxIter=10, regParam=0.3, elasticNetParam=0.8)

In [18]:
pipeline = Pipeline(stages=[indexer, vectorAssembler, normalizer, lr])

In [19]:
model = pipeline.fit(df_train)

22/07/14 12:47:36 WARN netlib.BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeSystemBLAS
22/07/14 12:47:36 WARN netlib.BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeRefBLAS
                                                                                

In [20]:
prediction = model.transform(df_train)

In [21]:
binEval = MulticlassClassificationEvaluator(). \
    setMetricName("accuracy"). \
    setPredictionCol("prediction"). \
    setLabelCol("label")

binEval.evaluate(prediction)

                                                                                

0.2064989224439531

In [22]:
pmmlBuilder = PMMLBuilder(sc, df_train, model)
pmmlBuilder.buildFile(data_dir + model_target)

'/resources/labs/BD0231EN/claimed/component-library/train/../../data/model.xml'

In [None]:
# Task 8 HyperParameter Tuning

In [18]:
maxIter = [10, 100, 1000]
regParam = [0.01, 0.5, 2.0]
elasticNetParam = [0.0, 0.5, 1.0]

In [None]:
maxPred = 0

for i in range(len(maxIter)):
    for j in range(len(regParam)):
        for k in range(len(elasticNetParam)):
            lr = LogisticRegression(maxIter=maxIter[i], regParam=regParam[j], elasticNetParam=elasticNetParam[k])
            pipeline = Pipeline(stages=[indexer, vectorAssembler, normalizer, lr])
            model = pipeline.fit(df_train)
            prediction = model.transform(df_train)
            binEval = MulticlassClassificationEvaluator(). \
                setMetricName("accuracy"). \
                setPredictionCol("prediction"). \
                setLabelCol("label")
            pred = binEval.evaluate(prediction)
            if pred > maxPred:
                maxPred = pred
            print(maxIter[i], regParam[j], elasticNetParam[k], pred)

print(maxPred)



10 0.01 0.0 0.3281784103538924




10 0.01 0.5 0.33383072181053136


                                                                                

10 0.01 1.0 0.3172097346297162


                                                                                

10 0.5 0.0 0.22113163810648967


                                                                                

10 0.5 0.5 0.20640875254410007


                                                                                

10 0.5 1.0 0.20640875254410007


                                                                                

10 2.0 0.0 0.20640875254410007


                                                                                

10 2.0 0.5 0.20640875254410007


                                                                                

10 2.0 1.0 0.20640875254410007


                                                                                

100 0.01 0.0 0.3475065719668868


                                                                                

100 0.01 0.5 0.34441585782714956


                                                                                

100 0.01 1.0 0.3418486613904294


                                                                                

100 0.5 0.0 0.2409637205031369


                                                                                

100 0.5 0.5 0.20640875254410007


                                                                                

100 0.5 1.0 0.20640875254410007


                                                                                

100 2.0 0.0 0.20640875254410007


                                                                                

100 2.0 0.5 0.20640875254410007


                                                                                

100 2.0 1.0 0.20640875254410007


                                                                                

1000 0.01 0.0 0.34709783622014617


                                                                                

In [None]:
# Task 9 Resample data splits

In [25]:
splits = df.randomSplit([0.7, 0.3])
df_train = splits[0]
df_test = splits[1]
lr = LogisticRegression(maxIter=10, regParam=0.3, elasticNetParam=0.8)
pipeline = Pipeline(stages=[indexer, vectorAssembler, normalizer, lr])
model = pipeline.fit(df_train)
prediction = model.transform(df_train)
binEval = MulticlassClassificationEvaluator(). \
    setMetricName("accuracy"). \
    setPredictionCol("prediction"). \
    setLabelCol("label")

binEval.evaluate(prediction)

                                                                                

0.20660937729780882

In [23]:
splits = df.randomSplit([0.9, 0.1])
df_train = splits[0]
df_test = splits[1]
lr = LogisticRegression(maxIter=10, regParam=0.3, elasticNetParam=0.8)
pipeline = Pipeline(stages=[indexer, vectorAssembler, normalizer, lr])
model = pipeline.fit(df_train)
prediction = model.transform(df_train)
binEval = MulticlassClassificationEvaluator(). \
                setMetricName("accuracy"). \
                setPredictionCol("prediction"). \
                setLabelCol("label")
binEval.evaluate(prediction)

                                                                                

0.2066463347894009