# Spark Train Logistic Regression


Train Logistic Regression classifier with Apache SparkML

In [1]:
import warnings
warnings.filterwarnings('ignore')
from pyspark import SparkContext, SparkConf, SQLContext
import os
from pyspark.ml.classification import LogisticRegression
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark2pmml import PMMLBuilder
from pyspark.ml.feature import StringIndexer
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.feature import MinMaxScaler
import logging
logging.disable(level = 'CRITICAL')
import shutil
import site
import sys
import wget
import re

In [2]:
data_parquet = os.environ.get('data_parquet',
                              'data.parquet')  # input file name (parquet)
master = os.environ.get('master',
                        "local[*]")  # URL to Spark master
model_target = os.environ.get('model_target',
                              "model.xml")  # model output file name
data_dir = os.environ.get('data_dir',
                          'data/')  # temporary directory for data
input_columns = os.environ.get('input_columns',
                               '["x", "y", "z"]')  # input columns to consider

In [3]:
parameters = list(
    map(lambda s: re.sub('$', '"', s),
        map(
            lambda s: s.replace('=', '="'),
            filter(
                lambda s: s.find('=') > -1 and bool(re.match(r'[A-Za-z0-9_]*=[.\/A-Za-z0-9]*', s)),
                sys.argv
            )
    )))

for parameter in parameters:
    logging.warning('Parameter: ' + parameter)
    exec(parameter)

In [4]:
conf = SparkConf().setMaster(master)
#if sys.version[0:3] == '3.6' or sys.version[0:3] == '3.7':
conf.set("spark.jars", "Private/main/pmml-sparkml-example-executable-2.4.0.jar")

sc = SparkContext.getOrCreate(conf)
sqlContext = SQLContext(sc)
spark = sqlContext.sparkSession

23/07/10 12:01:20 WARN Utils: Your hostname, davis-VirtualBox resolves to a loopback address: 127.0.1.1; using 10.0.2.15 instead (on interface enp0s3)
23/07/10 12:01:20 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
23/07/10 12:01:21 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
23/07/10 12:01:21 WARN DependencyUtils: Local jar /home/davis/.local/lib/python3.10/site-packages/pyspark/jars/japmml-sparkml-example-executable-2.4.0.jar does not exist, skipping.
23/07/10 12:01:22 INFO SparkContext: Running Spark version 3.4.1
23/07/10 12:01:22 INFO ResourceUtils: No custom resources configured for spark.driver.
23/07/10 12:01:22 INFO SparkContext: Submitted application: pyspark-shell
23/07/10 12:01:22 INFO ResourceProfile: Default ResourceProfile created, executor resources: Map(cores -> name: cores, amount: 1, script: , vendor: , memory -> name: memory, amount: 1024, script: , vendor: , off

In [5]:
df = spark.read.parquet(data_dir + data_parquet)

23/07/10 12:01:24 INFO SharedState: Setting hive.metastore.warehouse.dir ('null') to the value of spark.sql.warehouse.dir.
23/07/10 12:01:25 INFO SharedState: Warehouse path is 'file:/home/davis/Private/main/spark-warehouse'.
23/07/10 12:01:27 INFO InMemoryFileIndex: It took 104 ms to list leaf files for 1 paths.
23/07/10 12:01:28 INFO SparkContext: Starting job: parquet at NativeMethodAccessorImpl.java:0
23/07/10 12:01:28 INFO DAGScheduler: Got job 0 (parquet at NativeMethodAccessorImpl.java:0) with 1 output partitions
23/07/10 12:01:28 INFO DAGScheduler: Final stage: ResultStage 0 (parquet at NativeMethodAccessorImpl.java:0)
23/07/10 12:01:28 INFO DAGScheduler: Parents of final stage: List()
23/07/10 12:01:28 INFO DAGScheduler: Missing parents: List()
23/07/10 12:01:28 INFO DAGScheduler: Submitting ResultStage 0 (MapPartitionsRDD[1] at parquet at NativeMethodAccessorImpl.java:0), which has no missing parents
23/07/10 12:01:28 INFO MemoryStore: Block broadcast_0 stored as values in me

In [6]:
# register a corresponding query table
df.createOrReplaceTempView('df')

In [7]:
from pyspark.sql.types import DoubleType
df = df.withColumn("x", df.x.cast(DoubleType()))
df = df.withColumn("y", df.y.cast(DoubleType()))
df = df.withColumn("z", df.z.cast(DoubleType()))

In [8]:
splits = df.randomSplit([0.8, 0.2])
df_train = splits[0]
df_test = splits[1]

In [9]:
indexer = StringIndexer(inputCol="class", outputCol="label")

vectorAssembler = VectorAssembler(inputCols=eval(input_columns),
                                  outputCol="features")

normalizer = MinMaxScaler(inputCol="features", outputCol="features_norm")

In [47]:
#lr = LogisticRegression(maxIter=10, regParam=0.3, elasticNetParam=0.8)
lr = LogisticRegression(maxIter=1000, regParam=0.05, elasticNetParam=0.6)

In [48]:
pipeline = Pipeline(stages=[indexer, vectorAssembler, normalizer, lr])

In [49]:
model = pipeline.fit(df_train)

23/07/10 12:13:09 INFO FileSourceStrategy: Pushed Filters: 
23/07/10 12:13:09 INFO FileSourceStrategy: Post-Scan Filters: 
23/07/10 12:13:09 INFO MemoryStore: Block broadcast_407 stored as values in memory (estimated size 201.8 KiB, free 432.5 MiB)
23/07/10 12:13:09 INFO MemoryStore: Block broadcast_407_piece0 stored as bytes in memory (estimated size 35.1 KiB, free 432.4 MiB)
23/07/10 12:13:09 INFO BlockManagerInfo: Added broadcast_407_piece0 in memory on 10.0.2.15:46735 (size: 35.1 KiB, free: 434.1 MiB)
23/07/10 12:13:09 INFO SparkContext: Created broadcast 407 from collect at StringIndexer.scala:204
23/07/10 12:13:09 INFO FileSourceScanExec: Planning scan with bin packing, max size: 4194304 bytes, open cost is considered as scanning 4194304 bytes.
23/07/10 12:13:09 INFO DAGScheduler: Registering RDD 497 (collect at StringIndexer.scala:204) as input to shuffle 18
23/07/10 12:13:09 INFO DAGScheduler: Got map stage job 195 (collect at StringIndexer.scala:204) with 1 output partitions
2

In [50]:
prediction = model.transform(df_test)
df_test.show(5)

23/07/10 12:13:20 INFO FileSourceStrategy: Pushed Filters: 
23/07/10 12:13:20 INFO FileSourceStrategy: Post-Scan Filters: 
23/07/10 12:13:20 INFO MemoryStore: Block broadcast_465 stored as values in memory (estimated size 201.8 KiB, free 432.6 MiB)
23/07/10 12:13:20 INFO MemoryStore: Block broadcast_465_piece0 stored as bytes in memory (estimated size 35.1 KiB, free 432.6 MiB)
23/07/10 12:13:20 INFO BlockManagerInfo: Added broadcast_465_piece0 in memory on 10.0.2.15:46735 (size: 35.1 KiB, free: 434.1 MiB)
23/07/10 12:13:20 INFO SparkContext: Created broadcast 465 from showString at NativeMethodAccessorImpl.java:0
23/07/10 12:13:20 INFO FileSourceScanExec: Planning scan with bin packing, max size: 4194304 bytes, open cost is considered as scanning 4194304 bytes.
23/07/10 12:13:20 INFO SparkContext: Starting job: showString at NativeMethodAccessorImpl.java:0
23/07/10 12:13:20 INFO DAGScheduler: Got job 223 (showString at NativeMethodAccessorImpl.java:0) with 1 output partitions
23/07/10 

+---+----+----+--------------------+--------------+
|  x|   y|   z|              source|         class|
+---+----+----+--------------------+--------------+
|0.0|24.0|35.0|Accelerometer-201...| Sitdown_chair|
|0.0|26.0|42.0|Accelerometer-201...|   Brush_teeth|
|0.0|29.0|25.0|Accelerometer-201...|  Climb_stairs|
|0.0|29.0|32.0|Accelerometer-201...|Descend_stairs|
|0.0|29.0|46.0|Accelerometer-201...|   Brush_teeth|
+---+----+----+--------------------+--------------+
only showing top 5 rows



23/07/10 12:13:21 INFO Executor: Finished task 0.0 in stage 243.0 (TID 229). 2663 bytes result sent to driver
23/07/10 12:13:21 INFO TaskSetManager: Finished task 0.0 in stage 243.0 (TID 229) in 656 ms on 10.0.2.15 (executor driver) (1/1)
23/07/10 12:13:21 INFO TaskSchedulerImpl: Removed TaskSet 243.0, whose tasks have all completed, from pool 
23/07/10 12:13:21 INFO DAGScheduler: ResultStage 243 (showString at NativeMethodAccessorImpl.java:0) finished in 0.660 s
23/07/10 12:13:21 INFO DAGScheduler: Job 223 is finished. Cancelling potential speculative or zombie tasks for this job
23/07/10 12:13:21 INFO TaskSchedulerImpl: Killing all running tasks in stage 243: Stage finished
23/07/10 12:13:21 INFO DAGScheduler: Job 223 finished: showString at NativeMethodAccessorImpl.java:0, took 0.662525 s
                                                                                

In [51]:
prediction.show(5)

23/07/10 12:13:21 INFO FileSourceStrategy: Pushed Filters: 
23/07/10 12:13:21 INFO FileSourceStrategy: Post-Scan Filters: 
23/07/10 12:13:21 INFO MemoryStore: Block broadcast_467 stored as values in memory (estimated size 201.8 KiB, free 433.0 MiB)
23/07/10 12:13:21 INFO MemoryStore: Block broadcast_467_piece0 stored as bytes in memory (estimated size 35.1 KiB, free 433.0 MiB)
23/07/10 12:13:21 INFO BlockManagerInfo: Added broadcast_467_piece0 in memory on 10.0.2.15:46735 (size: 35.1 KiB, free: 434.2 MiB)
23/07/10 12:13:21 INFO SparkContext: Created broadcast 467 from showString at NativeMethodAccessorImpl.java:0
23/07/10 12:13:21 INFO FileSourceScanExec: Planning scan with bin packing, max size: 4194304 bytes, open cost is considered as scanning 4194304 bytes.
23/07/10 12:13:21 INFO SparkContext: Starting job: showString at NativeMethodAccessorImpl.java:0
23/07/10 12:13:21 INFO DAGScheduler: Got job 224 (showString at NativeMethodAccessorImpl.java:0) with 1 output partitions
23/07/10 

+---+----+----+--------------------+--------------+-----+---------------+--------------------+--------------------+--------------------+----------+
|  x|   y|   z|              source|         class|label|       features|       features_norm|       rawPrediction|         probability|prediction|
+---+----+----+--------------------+--------------+-----+---------------+--------------------+--------------------+--------------------+----------+
|0.0|24.0|35.0|Accelerometer-201...| Sitdown_chair|  8.0|[0.0,24.0,35.0]|[0.0,0.3809523809...|[3.15713533103328...|[0.55706803819486...|       0.0|
|0.0|26.0|42.0|Accelerometer-201...|   Brush_teeth|  6.0|[0.0,26.0,42.0]|[0.0,0.4126984126...|[2.87880414855864...|[0.48285900679348...|       0.0|
|0.0|29.0|25.0|Accelerometer-201...|  Climb_stairs|  4.0|[0.0,29.0,25.0]|[0.0,0.4603174603...|[3.55475130599705...|[0.63946982831892...|       0.0|
|0.0|29.0|32.0|Accelerometer-201...|Descend_stairs| 10.0|[0.0,29.0,32.0]|[0.0,0.4603174603...|[3.27642012352241.

23/07/10 12:13:22 INFO Executor: Finished task 0.0 in stage 244.0 (TID 230). 5455 bytes result sent to driver
23/07/10 12:13:22 INFO TaskSetManager: Finished task 0.0 in stage 244.0 (TID 230) in 905 ms on 10.0.2.15 (executor driver) (1/1)
23/07/10 12:13:22 INFO TaskSchedulerImpl: Removed TaskSet 244.0, whose tasks have all completed, from pool 
23/07/10 12:13:22 INFO DAGScheduler: ResultStage 244 (showString at NativeMethodAccessorImpl.java:0) finished in 0.910 s
23/07/10 12:13:22 INFO DAGScheduler: Job 224 is finished. Cancelling potential speculative or zombie tasks for this job
23/07/10 12:13:22 INFO TaskSchedulerImpl: Killing all running tasks in stage 244: Stage finished
23/07/10 12:13:22 INFO DAGScheduler: Job 224 finished: showString at NativeMethodAccessorImpl.java:0, took 0.912765 s
                                                                                

In [52]:
binEval = MulticlassClassificationEvaluator(). \
    setMetricName("accuracy"). \
    setPredictionCol("prediction"). \
    setLabelCol("label")

binEval.evaluate(prediction)

23/07/10 12:13:22 INFO FileSourceStrategy: Pushed Filters: 
23/07/10 12:13:22 INFO FileSourceStrategy: Post-Scan Filters: 
23/07/10 12:13:22 INFO MemoryStore: Block broadcast_469 stored as values in memory (estimated size 201.8 KiB, free 432.9 MiB)
23/07/10 12:13:22 INFO MemoryStore: Block broadcast_469_piece0 stored as bytes in memory (estimated size 35.1 KiB, free 432.9 MiB)
23/07/10 12:13:22 INFO BlockManagerInfo: Added broadcast_469_piece0 in memory on 10.0.2.15:46735 (size: 35.1 KiB, free: 434.2 MiB)
23/07/10 12:13:22 INFO SparkContext: Created broadcast 469 from rdd at MulticlassClassificationEvaluator.scala:191
23/07/10 12:13:22 INFO FileSourceScanExec: Planning scan with bin packing, max size: 4194304 bytes, open cost is considered as scanning 4194304 bytes.
23/07/10 12:13:22 INFO SparkContext: Starting job: collectAsMap at MulticlassMetrics.scala:61
23/07/10 12:13:22 INFO DAGScheduler: Registering RDD 571 (map at MulticlassMetrics.scala:52) as input to shuffle 20
23/07/10 12:1

0.3342259713016893

23/07/10 12:31:24 INFO BlockManagerInfo: Removed broadcast_470_piece0 on 10.0.2.15:46735 in memory (size: 34.5 KiB, free: 434.2 MiB)
23/07/10 12:31:24 INFO BlockManagerInfo: Removed broadcast_469_piece0 on 10.0.2.15:46735 in memory (size: 35.1 KiB, free: 434.3 MiB)
23/07/10 12:31:24 INFO BlockManagerInfo: Removed broadcast_471_piece0 on 10.0.2.15:46735 in memory (size: 2.8 KiB, free: 434.3 MiB)
23/07/10 12:39:51 WARN HeartbeatReceiver: Removing executor driver with no recent heartbeats: 294184 ms exceeds timeout 120000 ms
23/07/10 12:39:51 WARN SparkContext: Killing executors is not supported by current scheduler.
23/07/10 12:39:51 INFO Executor: Told to re-register on heartbeat
23/07/10 12:39:51 INFO BlockManager: BlockManager BlockManagerId(driver, 10.0.2.15, 46735, None) re-registering with master
23/07/10 12:39:51 INFO BlockManagerMaster: Registering BlockManager BlockManagerId(driver, 10.0.2.15, 46735, None)
23/07/10 12:39:51 ERROR Inbox: Ignoring error
org.apache.spark.SparkExcep

In [None]:
pmmlBuilder = PMMLBuilder(sc, df_train, model)
pmmlBuilder.buildFile(data_dir + model_target)