# Clone Claimed from Github

In [1]:
%%bash
rm -Rf component-library
rm -Rf claimed
rm -Rf claimed-component-library
rm -Rf data
rm -Rf HMP_Dataset
git clone https://github.com/WazirRohiman/claimed-component-library.git

Cloning into 'claimed-component-library'...
Checking out files: 100% (434/434), done.


# Python Version Check

In [2]:
%%bash
export version=`python --version |awk '{print $2}' |awk -F"." '{print $1$2}'`

echo $version

if [ $version == '36' ] || [ $version == '37' ]; then
    echo 'Starting installation...'
    pip3 install pyspark==2.4.8 wget==3.2 > install.log 2> install.log
    if [ $? == 0 ]; then
        echo 'Please <<RESTART YOUR KERNEL>> (Kernel->Restart Kernel and Clear All Outputs)'
    else
        echo 'Installation failed, please check log:'
        cat install.log
    fi
elif [ $version == '38' ] || [ $version == '39' ]; then
    pip3 install pyspark==3.1.2 wget==3.2 > install.log 2> install.log
    if [ $? == 0 ]; then
        echo 'Please <<RESTART YOUR KERNEL>> (Kernel->Restart Kernel and Clear All Outputs)'
    else
        echo 'Installation failed, please check log:'
        cat install.log
    fi
else
    echo 'Currently only python 3.6, 3.7 , 3.8 and 3.9 are supported, in case you need a different version please open an issue at https://github.com/IBM/claimed/issues'
    exit -1
fi

37
Starting installation...
Please <<RESTART YOUR KERNEL>> (Kernel->Restart Kernel and Clear All Outputs)


# Python imports

In [36]:
import fnmatch
import os
import pandas as pd
from pathlib import Path
from pyspark import SparkConf
from pyspark import SparkContext
from pyspark import SQLContext
from pyspark.sql.functions import lit
from pyspark.sql import SparkSession
from pyspark.sql.types import IntegerType
from pyspark.sql.types import StructField
from pyspark.sql.types import StructType
import random
import re
import shutil
import sys
import logging
import glob
import site
import wget

from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.feature import StringIndexer
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.feature import MinMaxScaler

from pyspark.sql.types import DoubleType


Exception ignored in: <function JavaWrapper.__del__ at 0x7f9d58c46560>
Traceback (most recent call last):
  File "/home/jupyterlab/conda/envs/python/lib/python3.7/site-packages/pyspark/ml/wrapper.py", line 40, in __del__
    if SparkContext._active_spark_context and self._java_obj is not None:
AttributeError: 'VectorAssembler' object has no attribute '_java_obj'


# Input HMP data

## Setting paths and file names

In [5]:
# path and file name for output (default: data.csv)
data_csv = os.environ.get('data_csv', 'data.csv')

# url of master (default: local mode)
master = os.environ.get('master', "local[*]")

# temporal data storage for local execution
data_dir = os.environ.get('data_dir', '../../data/')

# sample on input data to increase processing speed 0..1 (default: 1.0)
sample = os.environ.get('sample', '1.0')

# destination path and parquet file name (default: data.parquet)
output_data_parquet = os.environ.get('output_data_parquet', 'data.parquet')

# data_parquet path and parquet file name (default: data.parquet)
data_parquet = os.environ.get('data_parquet', 'data.parquet')

# target condensed parquet file (default: data_condensed.parquet)
data_parquet_condensed = os.environ.get('data_parquet', 'data_condensed.parquet')

# target condensed parquet file (default: data_condensed.parquet)
data_csv_condensed = os.environ.get('data_csv', 'data_condensed.csv')

#for random forest
model_target = os.environ.get('model_target', "model.xml")  # model output file name

input_columns = os.environ.get('input_columns', '["x", "y", "z"]')  # input columns to consider


Parameters for HMP input

In [6]:
# override parameters received from a potential call using %run magic
parameters = list(
    map(
        lambda s: re.sub('$', '"', s),
        map(
            lambda s: s.replace('=', '="'),
            filter(
                lambda s: s.find('=') > -1,
                sys.argv
            )
        )
    )
)

for parameter in parameters:
    exec(parameter)

# cast parameters to appropriate type
sample = float(sample)

# Create Spark context

In [7]:
sc = SparkContext \
    .getOrCreate(SparkConf() \
    .setMaster(master))

spark = SparkSession \
    .builder \
    .getOrCreate()

23/03/14 05:24:45 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


Pull the data in raw format from the source (github)

In [8]:
!rm -Rf HMP_Dataset
!git clone https://github.com/wchill/HMP_Dataset

Cloning into 'HMP_Dataset'...
remote: Enumerating objects: 865, done.[K
remote: Total 865 (delta 0), reused 0 (delta 0), pack-reused 865[K
Receiving objects: 100% (865/865), 1010.96 KiB | 267.00 KiB/s, done.
Checking out files: 100% (848/848), done.


In [9]:
schema = StructType([
    StructField("x", IntegerType(), True),
    StructField("y", IntegerType(), True),
    StructField("z", IntegerType(), True)])

This step takes a while, it parses through all files and folders and creates a temporary dataframe for each file which gets appended to an overall data-frame "df". In addition, a column called "class" is added to allow for straightforward usage in Spark afterwards in a supervised machine learning scenario for example.

In [10]:
d = 'HMP_Dataset/'

# filter list for all folders containing data (folders that don't start with .)
file_list_filtered = [s for s in os.listdir(d)
                      if os.path.isdir(os.path.join(d, s)) & ~fnmatch.fnmatch(s, '.*')]

# create pandas data frame for all the data

df = None

for category in file_list_filtered:
    data_files = os.listdir('HMP_Dataset/' + category)

    # create a temporary pandas data frame for each data file
    for data_file in data_files:
        if sample < 1.0:
            if random.random() > sample:
                print('Skipping: ' + data_file)
                continue
        print(data_file)
        temp_df = spark.read. \
            option("header", "false"). \
            option("delimiter", " "). \
            csv('HMP_Dataset/' + category + '/' + data_file, schema=schema)

        # create a column called "source" storing the current CSV file
        temp_df = temp_df.withColumn("source", lit(data_file))

        # create a column called "class" storing the current data folder
        temp_df = temp_df.withColumn("class", lit(category))

        if df is None:
            df = temp_df
        else:
            df = df.union(temp_df)

Accelerometer-2011-04-11-13-28-18-brush_teeth-f1.txt
Accelerometer-2011-04-11-13-29-54-brush_teeth-f1.txt
Accelerometer-2011-05-30-08-35-11-brush_teeth-f1.txt
Accelerometer-2011-05-30-09-36-50-brush_teeth-f1.txt
Accelerometer-2011-05-30-10-34-16-brush_teeth-m1.txt
Accelerometer-2011-05-30-21-10-57-brush_teeth-f1.txt
Accelerometer-2011-05-30-21-55-04-brush_teeth-m2.txt
Accelerometer-2011-05-31-15-16-47-brush_teeth-f1.txt
Accelerometer-2011-06-02-10-42-22-brush_teeth-f1.txt
Accelerometer-2011-06-02-10-45-50-brush_teeth-f1.txt
Accelerometer-2011-06-06-10-45-27-brush_teeth-f1.txt
Accelerometer-2011-06-06-10-48-05-brush_teeth-f1.txt
Accelerometer-2011-03-24-10-24-39-climb_stairs-f1.txt
Accelerometer-2011-03-24-10-25-44-climb_stairs-f1.txt
Accelerometer-2011-03-29-09-55-46-climb_stairs-f1.txt
Accelerometer-2011-04-05-18-21-22-climb_stairs-f1.txt
Accelerometer-2011-04-05-18-32-29-climb_stairs-f1.txt
Accelerometer-2011-04-11-11-44-35-climb_stairs-f1.txt
Accelerometer-2011-04-11-11-57-50-climb_

Write dataframe to file in CSV format

In [11]:
if Path(data_dir + data_csv).exists():
    shutil.rmtree(data_dir + data_csv)
    
df.write.option("header", "true").csv(data_dir + data_csv)

                                                                                

# Convert CSV files to Parquet using Spark

In [12]:
parameters = list(
    map(lambda s: re.sub('$', '"', s),
        map(
            lambda s: s.replace('=', '="'),
            filter(
                lambda s: s.find('=') > -1 and bool(re.match(r'[A-Za-z0-9_]*=[.\/A-Za-z0-9]*', s)),
                sys.argv
            )
    )))

for parameter in parameters:
    logging.warning('Parameter: ' + parameter)
    exec(parameter)

In [13]:
skip = False

if os.path.exists(data_dir + output_data_parquet):
    skip = True

In [14]:
if not skip:
    df = spark.read.option('header', 'true').csv(data_dir + data_csv)

In [15]:
if not skip:
    df.write.parquet(data_dir + output_data_parquet)

23/03/14 05:45:05 WARN hadoop.MemoryManager: Total allocation exceeds 95.00% (983,695,348 bytes) of heap memory
Scaling row group sizes to 91.61% for 8 writers
23/03/14 05:45:19 WARN hadoop.MemoryManager: Total allocation exceeds 95.00% (983,695,348 bytes) of heap memory
Scaling row group sizes to 91.61% for 8 writers
23/03/14 05:45:21 WARN hadoop.MemoryManager: Total allocation exceeds 95.00% (983,695,348 bytes) of heap memory
Scaling row group sizes to 91.61% for 8 writers


# Condense Parquet files using Spark

Condenses a partitioned folder containing PARQUET files into a single PARQUET file by re-partitioning it to
one and extracting the single PARQUET file

In [16]:
# override parameters received from a potential call using %run magic
parameters = list(
    map(
        lambda s: re.sub('$', '"', s),
        map(
            lambda s: s.replace('=', '="'),
            filter(
                lambda s: s.find('=') > -1,
                sys.argv
            )
        )
    )
)

for parameter in parameters:
    exec(parameter)

In [17]:
df = spark.read.parquet(data_dir + data_parquet)

df = df.repartition(1)

df.write.parquet(data_dir + data_parquet_condensed)

                                                                                

Move the condensed Parquet file outside of the .parquet folder

In [18]:
!mv {data_dir}/{data_parquet_condensed}/`ls {data_dir}/{data_parquet_condensed} |grep .parquet` {data_dir}/{data_parquet_condensed}_tmp

In [19]:
!rm -Rf  {data_dir}/{data_parquet_condensed}

In [20]:
!mv {data_dir}/{data_parquet_condensed}_tmp {data_dir}/{data_parquet_condensed}

# Convert condensed parquet file to CSV file with header using Spark

In [21]:
skip = False
if os.path.exists(data_dir + data_csv_condensed):
    skip = True

In [22]:
if not skip:
    df = spark.read.parquet(data_dir + data_parquet_condensed)

In [23]:
if not skip:
    if os.path.exists(data_dir + data_csv_condensed):
        shutil.rmtree(data_dir + data_csv_condensed)
    df.coalesce(1).write.option("header", "true").csv(data_dir + data_csv_condensed)
    file = glob.glob(data_dir + data_csv_condensed + '/part-*')
    shutil.move(file[0], data_dir + data_csv_condensed + '.tmp')
    shutil.rmtree(data_dir + data_csv_condensed)
    shutil.move(data_dir + data_csv_condensed + '.tmp', data_dir + data_csv_condensed)

                                                                                

# Spark Train Random Forest Classifier Model

Train Random Forest Classifier Model with Apache SparkML

In [24]:
parameters = list(
    map(lambda s: re.sub('$', '"', s),
        map(
            lambda s: s.replace('=', '="'),
            filter(
                lambda s: s.find('=') > -1 and bool(re.match(r'[A-Za-z0-9_]*=[.\/A-Za-z0-9]*', s)),
                sys.argv
            )
    )))

for parameter in parameters:
    logging.warning('Parameter: ' + parameter)
    exec(parameter)

In [25]:
conf = SparkConf().setMaster(master)
#if sys.version[0:3] == '3.6' or sys.version[0:3] == '3.7':
conf.set("spark.jars", 'jpmml-sparkml-executable-1.5.12.jar')

sc = SparkContext.getOrCreate(conf)
sqlContext = SQLContext(sc)
spark = sqlContext.sparkSession

In [26]:
df = spark.read.format("csv").option("header", "true").load(os.path.join(data_dir, data_csv_condensed))
df.limit(5).collect()

[Row(x='33', y='36', z='51', source='Accelerometer-2011-03-24-13-21-39-eat_meat-f1.txt', class='Eat_meat'),
 Row(x='33', y='36', z='51', source='Accelerometer-2011-03-24-13-21-39-eat_meat-f1.txt', class='Eat_meat'),
 Row(x='33', y='35', z='53', source='Accelerometer-2011-03-24-13-21-39-eat_meat-f1.txt', class='Eat_meat'),
 Row(x='31', y='37', z='52', source='Accelerometer-2011-03-24-13-21-39-eat_meat-f1.txt', class='Eat_meat'),
 Row(x='32', y='36', z='52', source='Accelerometer-2011-03-24-13-21-39-eat_meat-f1.txt', class='Eat_meat')]

In [27]:
from pyspark.sql.types import DoubleType
df = df.withColumn("x", df["x"].cast("double"))
df = df.withColumn("y", df["y"].cast("double"))
df = df.withColumn("z", df["z"].cast("double"))

In [28]:
splits = df.randomSplit([0.8, 0.2])
df_train = splits[0]
df_test = splits[1]

Use different hyperparameters and report the best performing hyperparameter combinations

Setting up the indexer, vector assembler and normalizer

In [31]:
indexer = StringIndexer(inputCol="class", outputCol="label")

vectorAssembler = VectorAssembler(inputCols=eval('["x", "y", "z"]'), outputCol="features")

normalizer = MinMaxScaler(inputCol="features", outputCol="features_norm")

Check if index is correct

In [32]:
pipeline = Pipeline(stages=[indexer])

df_ind = pipeline.fit(df_train).transform(df_train)
df_ind.limit(5).collect()
df_ind.sort('label').select('class','label').distinct().collect()

                                                                                

[Row(class='Walk', label=0.0),
 Row(class='Getup_bed', label=1.0),
 Row(class='Drink_glass', label=2.0),
 Row(class='Pour_water', label=3.0),
 Row(class='Climb_stairs', label=4.0),
 Row(class='Eat_meat', label=5.0),
 Row(class='Brush_teeth', label=6.0),
 Row(class='Standup_chair', label=7.0),
 Row(class='Sitdown_chair', label=8.0),
 Row(class='Comb_hair', label=9.0),
 Row(class='Descend_stairs', label=10.0),
 Row(class='Use_telephone', label=11.0),
 Row(class='Liedown_bed', label=12.0),
 Row(class='Eat_soup', label=13.0)]

Setting up evaluator

In [33]:
binEval = MulticlassClassificationEvaluator(). \
    setMetricName("accuracy"). \
    setPredictionCol("prediction"). \
    setLabelCol("label")

In [34]:
iterations = []
'''
for n_trees in treeCounts:
    for depth in maxDepth:
        rf = RandomForestClassifier(labelCol="label", numTrees=n_trees, maxDepth=depth, predictionCol="prediction")
        
        pipeling = Pipeline(stages=[indexer, vectorAssembler, normalizer, rf])
        
        model = pipeline.fit(df_train)
        prediction = model.transform(df_test)
        accuracy = binEval.evaluate(prediction)
        iterations.append((n_trees, depth, accuracy))
        
'''

for curNTrees in [10, 20]:
    for curMaxDepth in [5, 7]:
        rf = RandomForestClassifier(numTrees=curNTrees, maxDepth=curMaxDepth, seed=1)
        
        pipeline = Pipeline(stages=[indexer, vectorAssembler, normalizer, rf])

        model = pipeline.fit(df_train)

        prediction = model.transform(df_train)

        binEval = MulticlassClassificationEvaluator().setMetricName("accuracy").setPredictionCol("prediction").setLabelCol("label")

        curAccuracy = binEval.evaluate(prediction)
        
        iterations.append((curNTrees, curMaxDepth, curAccuracy))

                                                                                

In [37]:
df = pd.DataFrame(list(iterations))

In [38]:
df.rename(columns={0: 'Number of Trees', 1: 'Maximum Depth', 2:'Accuracy'}, inplace=True)
df.index.names= ['Combination']
df

Unnamed: 0_level_0,Number of Trees,Maximum Depth,Accuracy
Combination,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1
0,10,5,0.439178
1,10,7,0.464036
2,20,5,0.44368
3,20,7,0.466432


Parameters producing the maximum accuracy

In [39]:
df.loc[df['Accuracy'].idxmax()]

Number of Trees    20.000000
Maximum Depth       7.000000
Accuracy            0.466432
Name: 3, dtype: float64