In [None]:
import os
import pandas as pd
import pyspark
# Local Spark session using all cores. getOrCreate() makes this cell safe to
# re-run: constructing a second SparkContext directly raises
# "Cannot run multiple SparkContexts at once".
conf = pyspark.SparkConf().setMaster("local[*]").setAppName("HMP")
spark = pyspark.sql.SparkSession.builder.config(conf=conf).getOrCreate()
# Keep `sc` available for cells that use the low-level SparkContext API.
sc = spark.sparkContext

If the `DataSet` folder does not exist yet, clone the HMP dataset repository into it:

In [None]:
%%bash
# Fetch the HMP dataset into DataSet/ only when that directory is absent,
# so re-running this cell does nothing once the data is present.
[ -d "DataSet" ] || git clone "https://github.com/wchill/HMP_Dataset.git" "DataSet"

In [None]:
def Data_File_Path_Iterator(root="DataSet"):
    """Yield ``(file_path, class_name)`` for every data file in the dataset.

    Each non-hidden sub-directory of ``root`` is treated as one class; every
    regular, non-hidden file inside it is yielded with that directory's name
    as its class label.

    Args:
        root: Dataset directory. Defaults to ``"DataSet"``, where the clone
            cell places the repository.

    Yields:
        Tuples ``(file_path, class_name)`` with
        ``file_path == os.path.join(root, class_name, file_name)``.
    """
    # Sorted listings give a deterministic order; os.listdir order is arbitrary.
    class_names = sorted(
        name for name in os.listdir(root)
        if not name.startswith('.') and os.path.isdir(os.path.join(root, name))
    )
    for class_name in class_names:
        class_dir = os.path.join(root, class_name)
        for file_name in sorted(os.listdir(class_dir)):
            file_path = os.path.join(class_dir, file_name)
            # Skip hidden files (e.g. .DS_Store) and nested directories, which
            # would otherwise be read as data and labelled with this class.
            if file_name.startswith('.') or not os.path.isfile(file_path):
                continue
            yield file_path, class_name

In [None]:
from pyspark.sql.types import StructField,StructType,IntegerType
# Explicit schema for the space-delimited data files: columns X, Y, Z,
# each a nullable integer, in that order.
Schema = StructType([
    StructField(axis, IntegerType(), True)
    for axis in ("X", "Y", "Z")
])


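`Data_Frame.cache()` marks the data frame for caching (default storage level `MEMORY_AND_DISK`), so later queries can reuse it instead of re-reading the files. Caching is lazy: the data is only materialized the first time an action runs on the frame.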

In [None]:
from collections import defaultdict
from functools import reduce

from pyspark.sql.functions import lit

# Group file paths by class so each class is read with ONE multi-path CSV scan.
# Reading every file separately and unioning them one by one builds a plan with
# hundreds of unions, which is slow to plan and can overflow the stack.
Class_Files = defaultdict(list)
for file_path, Class in Data_File_Path_Iterator():
    Class_Files[Class].append(file_path)

if not Class_Files:
    raise FileNotFoundError(
        "No data files found under 'DataSet/' - did the clone cell run?")

# One frame per class: columns X, Y, Z from the files plus a constant Class label.
Class_Frames = [
    spark.read.option("delimiter", " ").csv(paths, schema=Schema)
         .withColumn('Class', lit(Class))
    for Class, paths in sorted(Class_Files.items())
]
# All frames share the same schema, so a positional union is safe.
Data_Frame = reduce(lambda left, right: left.union(right), Class_Frames)

# Expose the raw data to SQL as "HMP" (createOrReplaceTempView replaces the
# deprecated registerTempTable and is safe to re-run).
Data_Frame.createOrReplaceTempView("HMP")
# Marks the frame for caching; it is materialized on the first action.
Data_Frame.cache()

In [None]:
from pyspark import ml as ML  # kept for any later cells that reference ML
from pyspark.ml import Pipeline
from pyspark.ml.feature import Normalizer, OneHotEncoder, StringIndexer, VectorAssembler

# Class (string) -> Label (numeric index) -> One_Hot_Label (one-hot vector).
Class_Label = StringIndexer(inputCol="Class", outputCol="Label")
Class_One_Hot_Label = OneHotEncoder(inputCol="Label", outputCol="One_Hot_Label")
# X, Y, Z -> single vector column -> the same vector scaled to unit L1 norm (p=1.0).
XYZ_t0_Vector = VectorAssembler(inputCols=["X", "Y", "Z"], outputCol="Input_Vector_Unprocessed")
Normalized_Input = Normalizer(inputCol="Input_Vector_Unprocessed",
                              outputCol="Input_Vector_Normalized", p=1.0)

# Start from the raw "HMP" temp view rather than `Data_Frame`: this cell rebinds
# `Data_Frame` to the transformed frame, so on a re-run fitting on it would fail
# because the "Label" output column already exists.
Raw_Data_Frame = spark.table("HMP")
PipeLine = Pipeline(stages=[Class_Label, Class_One_Hot_Label,
                            XYZ_t0_Vector, Normalized_Input]).fit(Raw_Data_Frame)
Data_Frame = PipeLine.transform(Raw_Data_Frame)

In [None]:
# Persist the transformed data. mode("overwrite") keeps the notebook re-runnable:
# the default save mode ("errorifexists") fails once DataSet.parquet exists.
Data_Frame.write.mode("overwrite").parquet("DataSet.parquet")