In [None]:
%%bash
export version=`python --version |awk '{print $2}' |awk -F"." '{print $1$2}'`

echo $version

if [ $version == '36' ] || [ $version == '37' ]; then
    echo 'Starting installation...'
    pip3 install pyspark==2.4.8 wget==3.2 pyspark2pmml==0.5.1 > install.log 2> install.log
    if [ $? == 0 ]; then
        echo 'Please <<RESTART YOUR KERNEL>> (Kernel->Restart Kernel and Clear All Outputs)'
    else
        echo 'Installation failed, please check log:'
        cat install.log
    fi
elif [ $version == '38' ] || [ $version == '39' ]; then
    pip3 install pyspark==3.1.2 wget==3.2 pyspark2pmml==0.5.1 > install.log 2> install.log
    if [ $? == 0 ]; then
        echo 'Please <<RESTART YOUR KERNEL>> (Kernel->Restart Kernel and Clear All Outputs)'
    else
        echo 'Installation failed, please check log:'
        cat install.log
    fi
else
    echo 'Currently only python 3.6, 3.7 , 3.8 and 3.9 are supported, in case you need a different version please open an issue at https://github.com/IBM/claimed/issues'
    exit -1
fi

In [67]:
from pyspark import SparkContext, SparkConf, SQLContext
import os
import numpy as np
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark2pmml import PMMLBuilder
from pyspark.ml.feature import StringIndexer
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.evaluation import BinaryClassificationEvaluator as bin_eval
from pyspark.sql.types import FloatType
from pyspark.sql.types import DoubleType
from pyspark.ml.feature import MinMaxScaler
import logging
import shutil
import site
import sys
import wget
import re

In [19]:
data_parquet = os.environ.get('data_parquet', 'data.parquet')
data_csv = os.environ.get('data_csv', 'data.csv')
master = os.environ.get('master', "local[*]")
data_dir = os.environ.get('data_dir', '/resources/labs/BD0231EN/claimed/data/')

In [20]:
conf = SparkConf().setMaster(master)
conf.set("spark.jars", 'jpmml-sparkml-executable-1.5.12.jar')

sc = SparkContext.getOrCreate(conf)
sqlContext = SQLContext(sc)
spark = sqlContext.sparkSession

In [21]:
#verify spark session
spark

In [22]:
#read parquet file
parquet_file = spark.read.parquet(data_dir + data_parquet)

22/11/07 07:14:55 INFO spark.SparkContext: Starting job: parquet at NativeMethodAccessorImpl.java:0
22/11/07 07:14:55 INFO scheduler.DAGScheduler: Got job 0 (parquet at NativeMethodAccessorImpl.java:0) with 1 output partitions
22/11/07 07:14:55 INFO scheduler.DAGScheduler: Final stage: ResultStage 0 (parquet at NativeMethodAccessorImpl.java:0)
22/11/07 07:14:55 INFO scheduler.DAGScheduler: Parents of final stage: List()
22/11/07 07:14:55 INFO scheduler.DAGScheduler: Missing parents: List()
22/11/07 07:14:55 INFO scheduler.DAGScheduler: Submitting ResultStage 0 (MapPartitionsRDD[1] at parquet at NativeMethodAccessorImpl.java:0), which has no missing parents
22/11/07 07:14:55 INFO memory.MemoryStore: Block broadcast_0 stored as values in memory (estimated size 104.9 KB, free 366.2 MB)
22/11/07 07:14:55 INFO memory.MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 38.0 KB, free 366.2 MB)
22/11/07 07:14:55 INFO storage.BlockManagerInfo: Added broadcast_0_piece

In [23]:
data_csv = 'random_Forest.csv'
skip = False
if os.path.exists(data_dir + data_csv):
    skip = True

In [24]:
#Convert parquet to csv
if not skip:
    if os.path.exists(data_dir + data_csv):
        shutil.rmtree(data_dir + data_csv)
    parquet_file.coalesce(1).write.option("header", "true").csv(data_dir + data_csv)
    file = glob.glob(data_dir + data_csv + '/part-*')
    shutil.move(file[0], data_dir + data_csv + '.tmp')
    shutil.rmtree(data_dir + data_csv)
    shutil.move(data_dir + data_csv + '.tmp', data_dir + data_csv)

In [56]:
#read csv into a dataframe
df = spark.read.csv(data_dir + data_csv, header=True)
indexer = StringIndexer(inputCol="class", outputCol="class_indexed")
indexed = indexer.fit(df).transform(df)
indexed = indexed.withColumn("x", indexed.x.cast(DoubleType()))
indexed = indexed.withColumn("y", indexed.y.cast(DoubleType()))
indexed = indexed.withColumn("z", indexed.z.cast(DoubleType()))
indexed.show()

22/11/07 08:15:00 INFO datasources.FileSourceStrategy: Pruning directories with: 
22/11/07 08:15:00 INFO datasources.FileSourceStrategy: Post-Scan Filters: (length(trim(value#484, None)) > 0)
22/11/07 08:15:00 INFO datasources.FileSourceStrategy: Output Data Schema: struct<value: string>
22/11/07 08:15:00 INFO execution.FileSourceScanExec: Pushed Filters: 
22/11/07 08:15:00 INFO memory.MemoryStore: Block broadcast_57 stored as values in memory (estimated size 394.7 KB, free 365.0 MB)
22/11/07 08:15:00 INFO memory.MemoryStore: Block broadcast_57_piece0 stored as bytes in memory (estimated size 36.4 KB, free 365.0 MB)
22/11/07 08:15:00 INFO storage.BlockManagerInfo: Added broadcast_57_piece0 in memory on jupyterlab-u12rishabhsi:34389 (size: 36.4 KB, free: 366.2 MB)
22/11/07 08:15:00 INFO spark.SparkContext: Created broadcast 57 from csv at NativeMethodAccessorImpl.java:0
22/11/07 08:15:00 INFO execution.FileSourceScanExec: Planning scan with bin packing, max size: 4471191 bytes, open cos

+----+----+----+--------------------+--------+-------------+
|   x|   y|   z|              source|   class|class_indexed|
+----+----+----+--------------------+--------+-------------+
|33.0|36.0|51.0|Accelerometer-201...|Eat_meat|          5.0|
|33.0|36.0|51.0|Accelerometer-201...|Eat_meat|          5.0|
|33.0|35.0|53.0|Accelerometer-201...|Eat_meat|          5.0|
|31.0|37.0|52.0|Accelerometer-201...|Eat_meat|          5.0|
|32.0|36.0|52.0|Accelerometer-201...|Eat_meat|          5.0|
|32.0|36.0|51.0|Accelerometer-201...|Eat_meat|          5.0|
|32.0|36.0|51.0|Accelerometer-201...|Eat_meat|          5.0|
|33.0|36.0|53.0|Accelerometer-201...|Eat_meat|          5.0|
|33.0|35.0|52.0|Accelerometer-201...|Eat_meat|          5.0|
|33.0|36.0|52.0|Accelerometer-201...|Eat_meat|          5.0|
|32.0|35.0|53.0|Accelerometer-201...|Eat_meat|          5.0|
|33.0|36.0|52.0|Accelerometer-201...|Eat_meat|          5.0|
|32.0|38.0|53.0|Accelerometer-201...|Eat_meat|          5.0|
|32.0|37.0|52.0|Accelero

22/11/07 08:15:03 INFO execution.FileSourceScanExec: Planning scan with bin packing, max size: 4471191 bytes, open cost is considered as scanning 4194304 bytes.
22/11/07 08:15:03 INFO spark.SparkContext: Starting job: showString at NativeMethodAccessorImpl.java:0
22/11/07 08:15:03 INFO scheduler.DAGScheduler: Got job 24 (showString at NativeMethodAccessorImpl.java:0) with 1 output partitions
22/11/07 08:15:03 INFO scheduler.DAGScheduler: Final stage: ResultStage 28 (showString at NativeMethodAccessorImpl.java:0)
22/11/07 08:15:03 INFO scheduler.DAGScheduler: Parents of final stage: List()
22/11/07 08:15:03 INFO scheduler.DAGScheduler: Missing parents: List()
22/11/07 08:15:03 INFO scheduler.DAGScheduler: Submitting ResultStage 28 (MapPartitionsRDD[173] at showString at NativeMethodAccessorImpl.java:0), which has no missing parents
22/11/07 08:15:03 INFO memory.MemoryStore: Block broadcast_64 stored as values in memory (estimated size 19.0 KB, free 365.4 MB)
22/11/07 08:15:03 INFO memor

In [57]:
assembler = VectorAssembler(inputCols=['x', 'y', 'z'], outputCol='features')

In [58]:
output = assembler.transform(indexed)

In [62]:
df_model = output.select("features", "class_indexed")
df_model.show()

22/11/07 08:15:50 INFO spark.ContextCleaner: Cleaned accumulator 882
22/11/07 08:15:50 INFO spark.ContextCleaner: Cleaned accumulator 839
22/11/07 08:15:50 INFO spark.ContextCleaner: Cleaned accumulator 921
22/11/07 08:15:50 INFO spark.ContextCleaner: Cleaned accumulator 896
22/11/07 08:15:50 INFO spark.ContextCleaner: Cleaned accumulator 926
22/11/07 08:15:50 INFO spark.ContextCleaner: Cleaned accumulator 910
22/11/07 08:15:50 INFO spark.ContextCleaner: Cleaned accumulator 871
22/11/07 08:15:50 INFO spark.ContextCleaner: Cleaned accumulator 924
22/11/07 08:15:50 INFO spark.ContextCleaner: Cleaned accumulator 937
22/11/07 08:15:50 INFO spark.ContextCleaner: Cleaned accumulator 875
22/11/07 08:15:50 INFO spark.ContextCleaner: Cleaned accumulator 929
22/11/07 08:15:50 INFO spark.ContextCleaner: Cleaned accumulator 838
22/11/07 08:15:50 INFO spark.ContextCleaner: Cleaned accumulator 873
22/11/07 08:15:50 INFO storage.BlockManagerInfo: Removed broadcast_65_piece0 on jupyterlab-u12rishabhsi

+----------------+-------------+
|        features|class_indexed|
+----------------+-------------+
|[33.0,36.0,51.0]|          5.0|
|[33.0,36.0,51.0]|          5.0|
|[33.0,35.0,53.0]|          5.0|
|[31.0,37.0,52.0]|          5.0|
|[32.0,36.0,52.0]|          5.0|
|[32.0,36.0,51.0]|          5.0|
|[32.0,36.0,51.0]|          5.0|
|[33.0,36.0,53.0]|          5.0|
|[33.0,35.0,52.0]|          5.0|
|[33.0,36.0,52.0]|          5.0|
|[32.0,35.0,53.0]|          5.0|
|[33.0,36.0,52.0]|          5.0|
|[32.0,38.0,53.0]|          5.0|
|[32.0,37.0,52.0]|          5.0|
|[33.0,35.0,52.0]|          5.0|
|[32.0,36.0,53.0]|          5.0|
|[32.0,36.0,53.0]|          5.0|
|[32.0,36.0,52.0]|          5.0|
|[34.0,36.0,52.0]|          5.0|
|[33.0,36.0,52.0]|          5.0|
+----------------+-------------+
only showing top 20 rows



In [63]:
#RandomSplitting 80/20 with seed as 1
train, test = df_model.randomSplit([0.8, 0.2], seed=1)

In [74]:
#Training a Random Forest Model
#Using numTrees as 10 and maxDepth as 5
rf = RandomForestClassifier(labelCol="class_indexed", numTrees=10, maxDepth=5, seed=1).fit(train)

22/11/07 08:59:04 INFO util.Instrumentation: [6b18c417] Stage class: RandomForestClassifier
22/11/07 08:59:04 INFO util.Instrumentation: [6b18c417] Stage uid: RandomForestClassifier_10b7edad84ac
22/11/07 08:59:04 INFO datasources.FileSourceStrategy: Pruning directories with: 
22/11/07 08:59:04 INFO datasources.FileSourceStrategy: Post-Scan Filters: 
22/11/07 08:59:04 INFO datasources.FileSourceStrategy: Output Data Schema: struct<x: string, y: string, z: string, source: string, class: string ... 3 more fields>
22/11/07 08:59:04 INFO execution.FileSourceScanExec: Pushed Filters: 
22/11/07 08:59:04 INFO memory.MemoryStore: Block broadcast_170 stored as values in memory (estimated size 394.7 KB, free 356.8 MB)
22/11/07 08:59:04 INFO memory.MemoryStore: Block broadcast_170_piece0 stored as bytes in memory (estimated size 36.4 KB, free 356.8 MB)
22/11/07 08:59:04 INFO storage.BlockManagerInfo: Added broadcast_170_piece0 in memory on jupyterlab-u12rishabhsi:34389 (size: 36.4 KB, free: 364.3 

In [75]:
rf_pred = rf.transform(test)

In [76]:
rf_accuracy = bin_eval(labelCol="class_indexed").evaluate(rf_pred)
print("Accuracy: ", rf_accuracy)

22/11/07 08:59:34 INFO datasources.FileSourceStrategy: Pruning directories with: 
22/11/07 08:59:34 INFO datasources.FileSourceStrategy: Post-Scan Filters: 
22/11/07 08:59:34 INFO datasources.FileSourceStrategy: Output Data Schema: struct<x: string, y: string, z: string, source: string, class: string ... 3 more fields>
22/11/07 08:59:34 INFO execution.FileSourceScanExec: Pushed Filters: 
22/11/07 08:59:34 INFO memory.MemoryStore: Block broadcast_192 stored as values in memory (estimated size 394.7 KB, free 364.9 MB)
22/11/07 08:59:34 INFO memory.MemoryStore: Block broadcast_192_piece0 stored as bytes in memory (estimated size 36.4 KB, free 364.9 MB)
22/11/07 08:59:34 INFO storage.BlockManagerInfo: Added broadcast_192_piece0 in memory on jupyterlab-u12rishabhsi:34389 (size: 36.4 KB, free: 366.1 MB)
22/11/07 08:59:34 INFO spark.SparkContext: Created broadcast 192 from rdd at BinaryClassificationEvaluator.scala:81
22/11/07 08:59:34 INFO execution.FileSourceScanExec: Planning scan with bin

Accuracy:  0.5185190464568421


22/11/07 08:59:40 INFO executor.Executor: Finished task 1.0 in stage 140.0 (TID 743). 1372 bytes result sent to driver
22/11/07 08:59:40 INFO scheduler.TaskSetManager: Finished task 3.0 in stage 140.0 (TID 745) in 160 ms on localhost (executor driver) (7/10)
22/11/07 08:59:40 INFO scheduler.TaskSetManager: Finished task 1.0 in stage 140.0 (TID 743) in 180 ms on localhost (executor driver) (8/10)
22/11/07 08:59:40 INFO storage.ShuffleBlockFetcherIterator: Getting 8 non-empty blocks including 8 local blocks and 0 remote blocks
22/11/07 08:59:40 INFO storage.ShuffleBlockFetcherIterator: Started 0 remote fetches in 21 ms
22/11/07 08:59:40 INFO memory.MemoryStore: Block rdd_394_6 stored as values in memory (estimated size 7.5 KB, free 364.6 MB)
22/11/07 08:59:40 INFO storage.BlockManagerInfo: Added rdd_394_6 in memory on jupyterlab-u12rishabhsi:34389 (size: 7.5 KB, free: 366.0 MB)
22/11/07 08:59:40 INFO executor.Executor: Finished task 7.0 in stage 140.0 (TID 749). 1415 bytes result sent to

In [77]:
#Using numTrees as 20 and maxDepth as 7
rf = RandomForestClassifier(labelCol="class_indexed", numTrees=20, maxDepth=7, seed=1).fit(train)

22/11/07 09:00:01 INFO util.Instrumentation: [7fea7f43] Stage class: RandomForestClassifier
22/11/07 09:00:01 INFO util.Instrumentation: [7fea7f43] Stage uid: RandomForestClassifier_884223ebc959
22/11/07 09:00:02 INFO datasources.FileSourceStrategy: Pruning directories with: 
22/11/07 09:00:02 INFO datasources.FileSourceStrategy: Post-Scan Filters: 
22/11/07 09:00:02 INFO datasources.FileSourceStrategy: Output Data Schema: struct<x: string, y: string, z: string, source: string, class: string ... 3 more fields>
22/11/07 09:00:02 INFO execution.FileSourceScanExec: Pushed Filters: 
22/11/07 09:00:02 INFO memory.MemoryStore: Block broadcast_199 stored as values in memory (estimated size 394.7 KB, free 364.2 MB)
22/11/07 09:00:02 INFO memory.MemoryStore: Block broadcast_199_piece0 stored as bytes in memory (estimated size 36.4 KB, free 364.2 MB)
22/11/07 09:00:02 INFO storage.BlockManagerInfo: Added broadcast_199_piece0 in memory on jupyterlab-u12rishabhsi:34389 (size: 36.4 KB, free: 366.0 

In [78]:
rf_pred2 = rf.transform(test)
rf_accuracy2 = bin_eval(labelCol="class_indexed").evaluate(rf_pred2)
print("Accuracy: ", rf_accuracy2)

22/11/07 09:00:47 INFO datasources.FileSourceStrategy: Pruning directories with: 
22/11/07 09:00:47 INFO datasources.FileSourceStrategy: Post-Scan Filters: 
22/11/07 09:00:47 INFO datasources.FileSourceStrategy: Output Data Schema: struct<x: string, y: string, z: string, source: string, class: string ... 3 more fields>
22/11/07 09:00:47 INFO execution.FileSourceScanExec: Pushed Filters: 
22/11/07 09:00:47 INFO memory.MemoryStore: Block broadcast_227 stored as values in memory (estimated size 394.7 KB, free 360.2 MB)
22/11/07 09:00:47 INFO memory.MemoryStore: Block broadcast_227_piece0 stored as bytes in memory (estimated size 36.4 KB, free 360.1 MB)
22/11/07 09:00:47 INFO storage.BlockManagerInfo: Added broadcast_227_piece0 in memory on jupyterlab-u12rishabhsi:34389 (size: 36.4 KB, free: 365.2 MB)
22/11/07 09:00:47 INFO spark.SparkContext: Created broadcast 227 from rdd at BinaryClassificationEvaluator.scala:81
22/11/07 09:00:47 INFO execution.FileSourceScanExec: Planning scan with bin

Accuracy:  0.5021613432323992


22/11/07 09:00:54 INFO scheduler.TaskSetManager: Finished task 3.0 in stage 170.0 (TID 929) in 196 ms on localhost (executor driver) (4/10)
22/11/07 09:00:54 INFO scheduler.TaskSetManager: Finished task 1.0 in stage 170.0 (TID 927) in 199 ms on localhost (executor driver) (5/10)
22/11/07 09:00:54 INFO memory.MemoryStore: Block rdd_451_4 stored as values in memory (estimated size 41.0 KB, free 359.1 MB)
22/11/07 09:00:54 INFO storage.BlockManagerInfo: Added rdd_451_4 in memory on jupyterlab-u12rishabhsi:34389 (size: 41.0 KB, free: 364.7 MB)
22/11/07 09:00:54 INFO scheduler.TaskSetManager: Finished task 4.0 in stage 170.0 (TID 930) in 256 ms on localhost (executor driver) (6/10)
22/11/07 09:00:54 INFO memory.MemoryStore: Block rdd_451_1 stored as values in memory (estimated size 33.2 KB, free 359.0 MB)
22/11/07 09:00:54 INFO memory.MemoryStore: Block rdd_451_6 stored as values in memory (estimated size 33.2 KB, free 359.0 MB)
22/11/07 09:00:54 INFO storage.BlockManagerInfo: Added rdd_451