**Load PySpark: structured data using 'df' for HPC**

In [2]:
pip install pyspark




In [5]:
from pyspark.sql import SparkSession

# Create a Spark session
spark = SparkSession.builder \
    .appName("SparkML") \
    .getOrCreate()

# Test Spark session
df = spark.createDataFrame([(1, "foo"), (2, "bar")], ["id", "value"]) #[rows],[header]
df.show()

+---+-----+
| id|value|
+---+-----+
|  1|  foo|
|  2|  bar|
+---+-----+



**Clone HMP Dataset from GitHub:**

In [6]:
! git clone https://github.com/wchill/HMP_Dataset.git #use https

Cloning into 'HMP_Dataset'...
remote: Enumerating objects: 865, done.[K
remote: Total 865 (delta 0), reused 0 (delta 0), pack-reused 865[K
Receiving objects: 100% (865/865), 1010.96 KiB | 12.04 MiB/s, done.


In [7]:
ls HMP_Dataset/ #check files in HMP folder

[0m[01;34mBrush_teeth[0m/   [01;34mDescend_stairs[0m/  [01;34mEat_soup[0m/   impdata.py    [01;34mPour_water[0m/     [01;34mStandup_chair[0m/
[01;34mClimb_stairs[0m/  [01;34mDrink_glass[0m/     final.py    [01;34mLiedown_bed[0m/  README.txt      [01;34mUse_telephone[0m/
[01;34mComb_hair[0m/     [01;34mEat_meat[0m/        [01;34mGetup_bed[0m/  MANUAL.txt    [01;34mSitdown_chair[0m/  [01;34mWalk[0m/


In [8]:
ls HMP_Dataset/Brush_teeth #check files in specific folders

Accelerometer-2011-04-11-13-28-18-brush_teeth-f1.txt
Accelerometer-2011-04-11-13-29-54-brush_teeth-f1.txt
Accelerometer-2011-05-30-08-35-11-brush_teeth-f1.txt
Accelerometer-2011-05-30-09-36-50-brush_teeth-f1.txt
Accelerometer-2011-05-30-10-34-16-brush_teeth-m1.txt
Accelerometer-2011-05-30-21-10-57-brush_teeth-f1.txt
Accelerometer-2011-05-30-21-55-04-brush_teeth-m2.txt
Accelerometer-2011-05-31-15-16-47-brush_teeth-f1.txt
Accelerometer-2011-06-02-10-42-22-brush_teeth-f1.txt
Accelerometer-2011-06-02-10-45-50-brush_teeth-f1.txt
Accelerometer-2011-06-06-10-45-27-brush_teeth-f1.txt
Accelerometer-2011-06-06-10-48-05-brush_teeth-f1.txt


In [11]:
# check accelerometer data: 'x' 'y' 'z'
cat HMP_Dataset/Brush_teeth/Accelerometer-2011-04-11-13-28-18-brush_teeth-f1.txt | head -n 5

22 49 35
22 49 35
22 52 35
22 52 35
21 52 34


**Recursively traverse through each folders and create Apache 'df' from .txt & create 'union of all the files:**

In [13]:
#define schema for 'df' -> StructType: defines schema for 'df' (names & dtype of col); StructField: define cols (name, type)
#IntegerType: dtype of col
from pyspark.sql.types import StructType, StructField, IntegerType
schema=StructType([
    StructField("x",IntegerType(), True),
    StructField("y",IntegerType(), True),
    StructField("z",IntegerType(), True),
]) #True: each column can have 'null-values'

**Filter files: not containing underscores**

In [14]:
import os
file_list=os.listdir('HMP_Dataset')
file_list

['Sitdown_chair',
 'Liedown_bed',
 'Pour_water',
 'README.txt',
 'Use_telephone',
 'Eat_soup',
 'Walk',
 'impdata.py',
 'Climb_stairs',
 'final.py',
 'Getup_bed',
 'Brush_teeth',
 'Descend_stairs',
 'Standup_chair',
 'MANUAL.txt',
 'Eat_meat',
 '.idea',
 'Comb_hair',
 'Drink_glass',
 '.git']

In [15]:
#filtered folder lists
file_list_filtered=[s for s in file_list if '_' in s]
file_list_filtered

['Sitdown_chair',
 'Liedown_bed',
 'Pour_water',
 'Use_telephone',
 'Eat_soup',
 'Climb_stairs',
 'Getup_bed',
 'Brush_teeth',
 'Descend_stairs',
 'Standup_chair',
 'Eat_meat',
 'Comb_hair',
 'Drink_glass']

In [None]:
#check file 'type' in each folders
for folder in file_list_filtered:
  file=os.path.join('HMP_Dataset', folder)
  try:
    files=os.listdir(file)
    print(files)
  except FileNotFoundError:
    print('File not found')

**Create Data Frame:**

In [24]:
#lit: create new col in df where all the row values are same for that particular df
from pyspark.sql.functions import lit
#iterate over the 'file_list_filtered'
df=None #empty df to append
for category in file_list_filtered: #category: folders
  data_files=os.listdir('HMP_Dataset/'+category) #.txt files in each folders
  for data_file in data_files: # data files in each folder (each .txt files)
    #create temporary df: format -> header, delimiterm,
    temp_df=spark.read.option("header", "false").option("delimiter", " ").csv("HMP_Dataset/"+category+'/'+data_file,schema=schema)

    temp_df=temp_df.withColumn("class", lit(category)) #append category (folder_name) -> as class
    temp_df=temp_df.withColumn("source", lit(data_file)) #append data_file (file_name) -> as source
    #append df vertically
    if df is None:
      df=temp_df
    else:
      df=df.union(temp_df) #append vertically

In [25]:
#objective: predict 'class' from 'x', 'y' & 'z' (accelerometer data)
df.show() # x, y, z: accelerometer data; class: category; Accelerometer: source

+---+---+---+-------------+--------------------+
|  x|  y|  z|        class|              source|
+---+---+---+-------------+--------------------+
| 20| 41| 47|Sitdown_chair|Accelerometer-201...|
| 20| 41| 48|Sitdown_chair|Accelerometer-201...|
| 20| 57| 33|Sitdown_chair|Accelerometer-201...|
| 21| 41| 47|Sitdown_chair|Accelerometer-201...|
| 20| 40| 47|Sitdown_chair|Accelerometer-201...|
| 22| 40| 47|Sitdown_chair|Accelerometer-201...|
| 22| 40| 47|Sitdown_chair|Accelerometer-201...|
| 20| 41| 47|Sitdown_chair|Accelerometer-201...|
| 21| 41| 47|Sitdown_chair|Accelerometer-201...|
| 21| 41| 46|Sitdown_chair|Accelerometer-201...|
| 21| 42| 47|Sitdown_chair|Accelerometer-201...|
| 21| 42| 48|Sitdown_chair|Accelerometer-201...|
| 20| 42| 47|Sitdown_chair|Accelerometer-201...|
| 21| 41| 47|Sitdown_chair|Accelerometer-201...|
| 20| 41| 47|Sitdown_chair|Accelerometer-201...|
| 21| 41| 47|Sitdown_chair|Accelerometer-201...|
| 22| 41| 47|Sitdown_chair|Accelerometer-201...|
| 22| 41| 48|Sitdown

**Transform Data:**

In [26]:
#1. StringIndexer:
#convert categorical class to numerical unique value for ML algorithm:
from pyspark.ml.feature import StringIndexer
indexer=StringIndexer(inputCol='class', outputCol='classIndex')
indexed=indexer.fit(df).transform(df)

indexed.show() #classIndex -> numerical equivalent of categorical column : 'class'

+---+---+---+-------------+--------------------+----------+
|  x|  y|  z|        class|              source|classIndex|
+---+---+---+-------------+--------------------+----------+
| 20| 41| 47|Sitdown_chair|Accelerometer-201...|       7.0|
| 20| 41| 48|Sitdown_chair|Accelerometer-201...|       7.0|
| 20| 57| 33|Sitdown_chair|Accelerometer-201...|       7.0|
| 21| 41| 47|Sitdown_chair|Accelerometer-201...|       7.0|
| 20| 40| 47|Sitdown_chair|Accelerometer-201...|       7.0|
| 22| 40| 47|Sitdown_chair|Accelerometer-201...|       7.0|
| 22| 40| 47|Sitdown_chair|Accelerometer-201...|       7.0|
| 20| 41| 47|Sitdown_chair|Accelerometer-201...|       7.0|
| 21| 41| 47|Sitdown_chair|Accelerometer-201...|       7.0|
| 21| 41| 46|Sitdown_chair|Accelerometer-201...|       7.0|
| 21| 42| 47|Sitdown_chair|Accelerometer-201...|       7.0|
| 21| 42| 48|Sitdown_chair|Accelerometer-201...|       7.0|
| 20| 42| 47|Sitdown_chair|Accelerometer-201...|       7.0|
| 21| 41| 47|Sitdown_chair|Accelerometer

In [41]:
# 2. One-Hot-Encoding: transform 'classIndex' to OHE encoded vector
from pyspark.ml.feature import OneHotEncoder
encoder=OneHotEncoder(inputCol='classIndex',outputCol='classCategoryVec')
encoded=encoder.fit(indexed).transform(indexed)
encoded.show() #classCategoryVec: Apache Spark representation of 'sparseVector':e.g., (12,[7],[1.0]) ->12: elements, [7]: position, [1.0]: value

+---+---+---+-------------+--------------------+----------+----------------+
|  x|  y|  z|        class|              source|classIndex|classCategoryVec|
+---+---+---+-------------+--------------------+----------+----------------+
| 20| 41| 47|Sitdown_chair|Accelerometer-201...|       7.0|  (12,[7],[1.0])|
| 20| 41| 48|Sitdown_chair|Accelerometer-201...|       7.0|  (12,[7],[1.0])|
| 20| 57| 33|Sitdown_chair|Accelerometer-201...|       7.0|  (12,[7],[1.0])|
| 21| 41| 47|Sitdown_chair|Accelerometer-201...|       7.0|  (12,[7],[1.0])|
| 20| 40| 47|Sitdown_chair|Accelerometer-201...|       7.0|  (12,[7],[1.0])|
| 22| 40| 47|Sitdown_chair|Accelerometer-201...|       7.0|  (12,[7],[1.0])|
| 22| 40| 47|Sitdown_chair|Accelerometer-201...|       7.0|  (12,[7],[1.0])|
| 20| 41| 47|Sitdown_chair|Accelerometer-201...|       7.0|  (12,[7],[1.0])|
| 21| 41| 47|Sitdown_chair|Accelerometer-201...|       7.0|  (12,[7],[1.0])|
| 21| 41| 46|Sitdown_chair|Accelerometer-201...|       7.0|  (12,[7],[1.0])|

In [43]:
#3. VectorAssembler: assemble multiple feature columns into a feature vector -> a requirement for ML algorithms in Spark.
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import VectorAssembler
#features: vector fed to ML algorithm
vectorAssembler=VectorAssembler(inputCols=['x','y','z'], outputCol='features')

features_vectorized=vectorAssembler.transform(encoded)
features_vectorized.show() #features column is fed into ML model

+---+---+---+-------------+--------------------+----------+----------------+----------------+
|  x|  y|  z|        class|              source|classIndex|classCategoryVec|        features|
+---+---+---+-------------+--------------------+----------+----------------+----------------+
| 20| 41| 47|Sitdown_chair|Accelerometer-201...|       7.0|  (12,[7],[1.0])|[20.0,41.0,47.0]|
| 20| 41| 48|Sitdown_chair|Accelerometer-201...|       7.0|  (12,[7],[1.0])|[20.0,41.0,48.0]|
| 20| 57| 33|Sitdown_chair|Accelerometer-201...|       7.0|  (12,[7],[1.0])|[20.0,57.0,33.0]|
| 21| 41| 47|Sitdown_chair|Accelerometer-201...|       7.0|  (12,[7],[1.0])|[21.0,41.0,47.0]|
| 20| 40| 47|Sitdown_chair|Accelerometer-201...|       7.0|  (12,[7],[1.0])|[20.0,40.0,47.0]|
| 22| 40| 47|Sitdown_chair|Accelerometer-201...|       7.0|  (12,[7],[1.0])|[22.0,40.0,47.0]|
| 22| 40| 47|Sitdown_chair|Accelerometer-201...|       7.0|  (12,[7],[1.0])|[22.0,40.0,47.0]|
| 20| 41| 47|Sitdown_chair|Accelerometer-201...|       7.0| 

In [46]:
#4. Normalizing data: not particular important as 'x','y' & 'z' have same value range
from pyspark.ml.feature import Normalizer
normalizer=Normalizer(inputCol='features', outputCol='features_norm', p=1.0) #p=1 ->L1 normalizer (Manhattan Norm)
normalized_data=normalizer.transform(features_vectorized)

normalized_data.show()

+---+---+---+-------------+--------------------+----------+----------------+----------------+--------------------+
|  x|  y|  z|        class|              source|classIndex|classCategoryVec|        features|       features_norm|
+---+---+---+-------------+--------------------+----------+----------------+----------------+--------------------+
| 20| 41| 47|Sitdown_chair|Accelerometer-201...|       7.0|  (12,[7],[1.0])|[20.0,41.0,47.0]|[0.18518518518518...|
| 20| 41| 48|Sitdown_chair|Accelerometer-201...|       7.0|  (12,[7],[1.0])|[20.0,41.0,48.0]|[0.18348623853211...|
| 20| 57| 33|Sitdown_chair|Accelerometer-201...|       7.0|  (12,[7],[1.0])|[20.0,57.0,33.0]|[0.18181818181818...|
| 21| 41| 47|Sitdown_chair|Accelerometer-201...|       7.0|  (12,[7],[1.0])|[21.0,41.0,47.0]|[0.19266055045871...|
| 20| 40| 47|Sitdown_chair|Accelerometer-201...|       7.0|  (12,[7],[1.0])|[20.0,40.0,47.0]|[0.18691588785046...|
| 22| 40| 47|Sitdown_chair|Accelerometer-201...|       7.0|  (12,[7],[1.0])|[22.

**ML Pipeline:**

In [50]:
from pyspark.ml import Pipeline
pipeline=Pipeline(stages=[indexer,encoder,vectorAssembler,normalizer])

ml_model=pipeline.fit(df)

ml_model=ml_model.transform(df)

ml_model.show()


+---+---+---+-------------+--------------------+----------+----------------+----------------+--------------------+
|  x|  y|  z|        class|              source|classIndex|classCategoryVec|        features|       features_norm|
+---+---+---+-------------+--------------------+----------+----------------+----------------+--------------------+
| 20| 41| 47|Sitdown_chair|Accelerometer-201...|       7.0|  (12,[7],[1.0])|[20.0,41.0,47.0]|[0.18518518518518...|
| 20| 41| 48|Sitdown_chair|Accelerometer-201...|       7.0|  (12,[7],[1.0])|[20.0,41.0,48.0]|[0.18348623853211...|
| 20| 57| 33|Sitdown_chair|Accelerometer-201...|       7.0|  (12,[7],[1.0])|[20.0,57.0,33.0]|[0.18181818181818...|
| 21| 41| 47|Sitdown_chair|Accelerometer-201...|       7.0|  (12,[7],[1.0])|[21.0,41.0,47.0]|[0.19266055045871...|
| 20| 40| 47|Sitdown_chair|Accelerometer-201...|       7.0|  (12,[7],[1.0])|[20.0,40.0,47.0]|[0.18691588785046...|
| 22| 40| 47|Sitdown_chair|Accelerometer-201...|       7.0|  (12,[7],[1.0])|[22.

In [54]:
#filter columns:
df_train=ml_model.drop('x').drop('y').drop('z').drop('class').drop('source').drop('features').drop('classIndex')

In [55]:
df_train.show()

+----------------+--------------------+
|classCategoryVec|       features_norm|
+----------------+--------------------+
|  (12,[7],[1.0])|[0.18518518518518...|
|  (12,[7],[1.0])|[0.18348623853211...|
|  (12,[7],[1.0])|[0.18181818181818...|
|  (12,[7],[1.0])|[0.19266055045871...|
|  (12,[7],[1.0])|[0.18691588785046...|
|  (12,[7],[1.0])|[0.20183486238532...|
|  (12,[7],[1.0])|[0.20183486238532...|
|  (12,[7],[1.0])|[0.18518518518518...|
|  (12,[7],[1.0])|[0.19266055045871...|
|  (12,[7],[1.0])|[0.19444444444444...|
|  (12,[7],[1.0])|[0.19090909090909...|
|  (12,[7],[1.0])|[0.18918918918918...|
|  (12,[7],[1.0])|[0.18348623853211...|
|  (12,[7],[1.0])|[0.19266055045871...|
|  (12,[7],[1.0])|[0.18518518518518...|
|  (12,[7],[1.0])|[0.19266055045871...|
|  (12,[7],[1.0])|[0.2,0.3727272727...|
|  (12,[7],[1.0])|[0.19819819819819...|
|  (12,[7],[1.0])|[0.19819819819819...|
|  (12,[7],[1.0])|[0.19444444444444...|
+----------------+--------------------+
only showing top 20 rows



In [57]:
spark.stop()