[source](https://pub.towardsai.net/big-data-pipelines-with-sparkml-8207c86fc995)

In [2]:
from pyspark.sql import SparkSession
# Reuse the active Spark session if there is one; otherwise start one
# registered under the application name 'HMP'.
spark = (
    SparkSession.builder
    .appName('HMP')
    .getOrCreate()
)

In [3]:
# Evaluate the session object as the cell's last expression so the notebook
# displays its representation.
spark

In [6]:
from pyspark.sql.types import StructType, StructField, IntegerType
import os

# The three columns x, y and z share the same definition (nullable integer),
# so the fields are generated from the column names instead of being
# written out one by one.
schema = StructType([
    StructField(axis_name, IntegerType(), True)
    for axis_name in ('x', 'y', 'z')
])

In [7]:
# List every entry directly under the dataset root. As the output shows, the
# listing mixes the activity folders with unrelated entries (text files,
# scripts, hidden folders).
file_list = os.listdir('data/HMP_Dataset')
file_list

['Eat_soup',
 'Descend_stairs',
 'Getup_bed',
 'Climb_stairs',
 'Sitdown_chair',
 'Standup_chair',
 'Eat_meat',
 'MANUAL.txt',
 'final.py',
 'impdata.py',
 'README.txt',
 'Use_telephone',
 'Drink_glass',
 'Brush_teeth',
 'Pour_water',
 'Comb_hair',
 '.git',
 'Liedown_bed',
 'Walk',
 '.idea']

In [8]:
# Keep only the activity folders from the listing.
# Filtering on the name (e.g. "contains an underscore") is fragile: 'Walk' has
# no underscore and would need a special case, while any stray file with an
# underscore in its name would be kept by mistake.
# Instead, keep every entry that is a directory and is not hidden
# (hidden folders such as '.git' and '.idea' start with a dot).
# The original order of file_list is preserved.

file_list_filtered = [
    entry for entry in file_list
    if not entry.startswith('.')
    and os.path.isdir(os.path.join('data/HMP_Dataset', entry))
]
file_list_filtered

['Eat_soup',
 'Descend_stairs',
 'Getup_bed',
 'Climb_stairs',
 'Sitdown_chair',
 'Standup_chair',
 'Eat_meat',
 'Use_telephone',
 'Drink_glass',
 'Brush_teeth',
 'Pour_water',
 'Comb_hair',
 'Liedown_bed',
 'Walk']

In [9]:
# All activity folders are now in one list, so we can walk through them and
# stack the contents of every file into a single Spark data frame.
# df starts out as None and is replaced by the frame of the first file read.
df = None
# tqdm draws a progress bar over the outer (per-folder) loop
from tqdm import tqdm

from pyspark.sql.functions import lit
# lit() is a function that turns a Python value into a constant column, which
# lets us attach the same string to every row of a data frame.

for category in tqdm(file_list_filtered):
    category_dir = 'data/HMP_Dataset/' + category
    # List the entries inside this activity folder
    data_files = os.listdir(category_dir)
    for data_file in data_files:
        # Read one space-delimited, header-less file using the schema defined
        # above, then tag every row with the folder name ('class') and the
        # name of the file it came from ('source').
        temp_df = (
            spark.read
            .option('header','false')
            .option('delimiter',' ')
            .csv(category_dir + '/' + data_file, schema=schema)
            .withColumn('class', lit(category))
            .withColumn('source', lit(data_file))
        )
        # The first frame seeds df; every later frame is appended row-wise
        df = temp_df if df is None else df.union(temp_df)

100%|██████████| 14/14 [00:41<00:00,  2.94s/it]


In [13]:
df.printSchema()

root
 |-- x: integer (nullable = true)
 |-- y: integer (nullable = true)
 |-- z: integer (nullable = true)
 |-- class: string (nullable = false)
 |-- source: string (nullable = false)



In [14]:
df.show()

+---+---+---+--------+--------------------+
|  x|  y|  z|   class|              source|
+---+---+---+--------+--------------------+
| 36| 37| 51|Eat_soup|Accelerometer-201...|
| 36| 37| 51|Eat_soup|Accelerometer-201...|
| 35| 38| 53|Eat_soup|Accelerometer-201...|
| 36| 39| 52|Eat_soup|Accelerometer-201...|
| 36| 38| 51|Eat_soup|Accelerometer-201...|
| 35| 37| 51|Eat_soup|Accelerometer-201...|
| 36| 38| 52|Eat_soup|Accelerometer-201...|
| 36| 37| 52|Eat_soup|Accelerometer-201...|
| 36| 38| 51|Eat_soup|Accelerometer-201...|
| 37| 38| 51|Eat_soup|Accelerometer-201...|
| 35| 38| 51|Eat_soup|Accelerometer-201...|
| 36| 38| 52|Eat_soup|Accelerometer-201...|
| 36| 38| 52|Eat_soup|Accelerometer-201...|
| 37| 39| 51|Eat_soup|Accelerometer-201...|
| 36| 39| 51|Eat_soup|Accelerometer-201...|
| 35| 38| 52|Eat_soup|Accelerometer-201...|
| 34| 38| 52|Eat_soup|Accelerometer-201...|
| 35| 39| 52|Eat_soup|Accelerometer-201...|
| 35| 38| 52|Eat_soup|Accelerometer-201...|
| 37| 38| 52|Eat_soup|Accelerome

# Data Transformation

In [15]:

from pyspark.ml.feature import StringIndexer

# Map each distinct label in 'class' to a numeric index stored in 'classIndex'.
indexer = StringIndexer(inputCol='class', outputCol='classIndex')
# fit() learns the label-to-index mapping; transform() then returns a new
# data frame with the extra column, leaving df itself unchanged.
indexer_model = indexer.fit(df)
indexed = indexer_model.transform(df)

# Preview the first 10 rows
indexed.show(10)

+---+---+---+--------+--------------------+----------+
|  x|  y|  z|   class|              source|classIndex|
+---+---+---+--------+--------------------+----------+
| 36| 37| 51|Eat_soup|Accelerometer-201...|      13.0|
| 36| 37| 51|Eat_soup|Accelerometer-201...|      13.0|
| 35| 38| 53|Eat_soup|Accelerometer-201...|      13.0|
| 36| 39| 52|Eat_soup|Accelerometer-201...|      13.0|
| 36| 38| 51|Eat_soup|Accelerometer-201...|      13.0|
| 35| 37| 51|Eat_soup|Accelerometer-201...|      13.0|
| 36| 38| 52|Eat_soup|Accelerometer-201...|      13.0|
| 36| 37| 52|Eat_soup|Accelerometer-201...|      13.0|
| 36| 38| 51|Eat_soup|Accelerometer-201...|      13.0|
| 37| 38| 51|Eat_soup|Accelerometer-201...|      13.0|
+---+---+---+--------+--------------------+----------+
only showing top 10 rows



# One-Hot Encoding

In [18]:
from pyspark.ml.feature import OneHotEncoder

# The OneHotEncoder is fitted first: fit() returns a model, and that model
# performs the actual encoding in transform().
# dropLast=False keeps a vector slot for every category, including the last.
encoder = OneHotEncoder(inputCol='classIndex', outputCol='categoryVec', dropLast=False)
ohe = encoder.fit(indexed)  # fitted encoder model (not a data frame)
encoded = ohe.transform(indexed)  # new data frame with 'categoryVec' added

# Show 10 rows without truncating long cell values
encoded.show(10, truncate=False)

+---+---+---+--------+-------------------------------------------------+----------+---------------+
|x  |y  |z  |class   |source                                           |classIndex|categoryVec    |
+---+---+---+--------+-------------------------------------------------+----------+---------------+
|36 |37 |51 |Eat_soup|Accelerometer-2011-03-24-13-33-22-eat_soup-f1.txt|13.0      |(14,[13],[1.0])|
|36 |37 |51 |Eat_soup|Accelerometer-2011-03-24-13-33-22-eat_soup-f1.txt|13.0      |(14,[13],[1.0])|
|35 |38 |53 |Eat_soup|Accelerometer-2011-03-24-13-33-22-eat_soup-f1.txt|13.0      |(14,[13],[1.0])|
|36 |39 |52 |Eat_soup|Accelerometer-2011-03-24-13-33-22-eat_soup-f1.txt|13.0      |(14,[13],[1.0])|
|36 |38 |51 |Eat_soup|Accelerometer-2011-03-24-13-33-22-eat_soup-f1.txt|13.0      |(14,[13],[1.0])|
|35 |37 |51 |Eat_soup|Accelerometer-2011-03-24-13-33-22-eat_soup-f1.txt|13.0      |(14,[13],[1.0])|
|36 |38 |52 |Eat_soup|Accelerometer-2011-03-24-13-33-22-eat_soup-f1.txt|13.0      |(14,[13],[1.0])|


# VectorAssembler

In [19]:
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import VectorAssembler
# VectorAssembler packs several plain columns into one vector column.
feature_cols = ['x','y','z']
vectorAssembler = VectorAssembler(inputCols=feature_cols, outputCol='features')

# Applying it to the encoded frame returns a new data frame with the
# 'features' column appended.
features_vectorized = vectorAssembler.transform(encoded)

# Preview the first 10 rows without truncating long cell values
features_vectorized.show(10, truncate=False)

+---+---+---+--------+-------------------------------------------------+----------+---------------+----------------+
|x  |y  |z  |class   |source                                           |classIndex|categoryVec    |features        |
+---+---+---+--------+-------------------------------------------------+----------+---------------+----------------+
|36 |37 |51 |Eat_soup|Accelerometer-2011-03-24-13-33-22-eat_soup-f1.txt|13.0      |(14,[13],[1.0])|[36.0,37.0,51.0]|
|36 |37 |51 |Eat_soup|Accelerometer-2011-03-24-13-33-22-eat_soup-f1.txt|13.0      |(14,[13],[1.0])|[36.0,37.0,51.0]|
|35 |38 |53 |Eat_soup|Accelerometer-2011-03-24-13-33-22-eat_soup-f1.txt|13.0      |(14,[13],[1.0])|[35.0,38.0,53.0]|
|36 |39 |52 |Eat_soup|Accelerometer-2011-03-24-13-33-22-eat_soup-f1.txt|13.0      |(14,[13],[1.0])|[36.0,39.0,52.0]|
|36 |38 |51 |Eat_soup|Accelerometer-2011-03-24-13-33-22-eat_soup-f1.txt|13.0      |(14,[13],[1.0])|[36.0,38.0,51.0]|
|35 |37 |51 |Eat_soup|Accelerometer-2011-03-24-13-33-22-eat_soup

# Normalizing The Dataset

In [20]:
from pyspark.ml.feature import Normalizer
# p=1.0 selects the L1 (Manhattan) norm: each feature vector is divided by the
# sum of the absolute values of its components.
normalizer = Normalizer(p=1.0, inputCol='features', outputCol='features_norm')
# transform() returns a new data frame with 'features_norm' appended.
normalized_data = normalizer.transform(features_vectorized)

normalized_data.show(n=10)

+---+---+---+--------+--------------------+----------+---------------+----------------+--------------------+
|  x|  y|  z|   class|              source|classIndex|    categoryVec|        features|       features_norm|
+---+---+---+--------+--------------------+----------+---------------+----------------+--------------------+
| 36| 37| 51|Eat_soup|Accelerometer-201...|      13.0|(14,[13],[1.0])|[36.0,37.0,51.0]|[0.29032258064516...|
| 36| 37| 51|Eat_soup|Accelerometer-201...|      13.0|(14,[13],[1.0])|[36.0,37.0,51.0]|[0.29032258064516...|
| 35| 38| 53|Eat_soup|Accelerometer-201...|      13.0|(14,[13],[1.0])|[35.0,38.0,53.0]|[0.27777777777777...|
| 36| 39| 52|Eat_soup|Accelerometer-201...|      13.0|(14,[13],[1.0])|[36.0,39.0,52.0]|[0.28346456692913...|
| 36| 38| 51|Eat_soup|Accelerometer-201...|      13.0|(14,[13],[1.0])|[36.0,38.0,51.0]| [0.288,0.304,0.408]|
| 35| 37| 51|Eat_soup|Accelerometer-201...|      13.0|(14,[13],[1.0])|[35.0,37.0,51.0]|[0.28455284552845...|
| 36| 38| 52|Eat_so

# Creating The Pipeline

In [25]:
from pyspark.ml import Pipeline
# Chain the four stages in the same order in which they were applied one by
# one in the cells above.
pipeline_stages = [indexer, encoder, vectorAssembler, normalizer]
pipeline = Pipeline(stages=pipeline_stages)

In [26]:
# Fit the pipeline on the raw data frame; the result is a fitted pipeline model.
data_model = pipeline.fit(df)
# Run the raw frame through all stages in a single call.
pipelined_data = data_model.transform(df)
# Preview the first 10 rows
pipelined_data.show(10)

+---+---+---+--------+--------------------+----------+---------------+----------------+--------------------+
|  x|  y|  z|   class|              source|classIndex|    categoryVec|        features|       features_norm|
+---+---+---+--------+--------------------+----------+---------------+----------------+--------------------+
| 36| 37| 51|Eat_soup|Accelerometer-201...|      13.0|(14,[13],[1.0])|[36.0,37.0,51.0]|[0.29032258064516...|
| 36| 37| 51|Eat_soup|Accelerometer-201...|      13.0|(14,[13],[1.0])|[36.0,37.0,51.0]|[0.29032258064516...|
| 35| 38| 53|Eat_soup|Accelerometer-201...|      13.0|(14,[13],[1.0])|[35.0,38.0,53.0]|[0.27777777777777...|
| 36| 39| 52|Eat_soup|Accelerometer-201...|      13.0|(14,[13],[1.0])|[36.0,39.0,52.0]|[0.28346456692913...|
| 36| 38| 51|Eat_soup|Accelerometer-201...|      13.0|(14,[13],[1.0])|[36.0,38.0,51.0]| [0.288,0.304,0.408]|
| 35| 37| 51|Eat_soup|Accelerometer-201...|      13.0|(14,[13],[1.0])|[35.0,37.0,51.0]|[0.28455284552845...|
| 36| 38| 52|Eat_so

In [27]:
# Columns that served only as pipeline inputs or intermediate results
cols_to_drop = ['x','y','z','class','source','classIndex','features']

# Keep every remaining column, in the order it appears in pipelined_data
excluded = set(cols_to_drop)
selected_cols = [name for name in pipelined_data.columns if name not in excluded]

# The training frame holds only the categoryVec and features_norm columns
df_train = pipelined_data.select(*selected_cols)

# Preview the first 10 rows of the training frame
df_train.show(10)

+---------------+--------------------+
|    categoryVec|       features_norm|
+---------------+--------------------+
|(14,[13],[1.0])|[0.29032258064516...|
|(14,[13],[1.0])|[0.29032258064516...|
|(14,[13],[1.0])|[0.27777777777777...|
|(14,[13],[1.0])|[0.28346456692913...|
|(14,[13],[1.0])| [0.288,0.304,0.408]|
|(14,[13],[1.0])|[0.28455284552845...|
|(14,[13],[1.0])|[0.28571428571428...|
|(14,[13],[1.0])| [0.288,0.296,0.416]|
|(14,[13],[1.0])| [0.288,0.304,0.408]|
|(14,[13],[1.0])|[0.29365079365079...|
+---------------+--------------------+
only showing top 10 rows

