In [1]:
# Initial Configuration
import findspark 
findspark.init()

from pyspark.context import SparkContext
from pyspark.sql.session import SparkSession
sc = SparkContext('local') # we are hosting clusters locally.
spark = SparkSession(sc) # create an interface to the spark session.

In [2]:
# Create Schema
from pyspark.sql.types import StructType, StructField, IntegerType

schema = StructType([StructField("x",IntegerType(),True),
                     StructField("y",IntegerType(),True),
                     StructField("z",IntegerType(),True)])

In [3]:
import os

file_list = os.listdir('HMP_dataset/Climb_stairs')
#file_list

In [6]:
# Read all the files from the folder and enqueue them in a pyspark dataframe
df = None

from pyspark.sql.functions import lit

for file in file_list:
        
    
        temp_df = spark.read.option('header','false').option('delimiter',' ').csv('HMP_Dataset/' + 'Climb_stairs' + '/' + file, schema = schema)
        temp_df = temp_df.withColumn('Class',lit("Climb_stairs"))
        temp_df = temp_df.withColumn('Source',lit(file))
        
        if df is None:
            df = temp_df
        else:
            df = df.union(temp_df)
    

In [7]:
df.printSchema()

root
 |-- x: integer (nullable = true)
 |-- y: integer (nullable = true)
 |-- z: integer (nullable = true)
 |-- Class: string (nullable = false)
 |-- Source: string (nullable = false)



In [8]:
df.show()

+---+---+---+------------+--------------------+
|  x|  y|  z|       Class|              Source|
+---+---+---+------------+--------------------+
|  5| 39| 34|Climb_stairs|Accelerometer-201...|
|  2| 41| 34|Climb_stairs|Accelerometer-201...|
|  5| 39| 34|Climb_stairs|Accelerometer-201...|
| 12| 38| 34|Climb_stairs|Accelerometer-201...|
|  9| 38| 30|Climb_stairs|Accelerometer-201...|
| 10| 36| 29|Climb_stairs|Accelerometer-201...|
| 10| 36| 30|Climb_stairs|Accelerometer-201...|
| 12| 36| 30|Climb_stairs|Accelerometer-201...|
| 16| 36| 29|Climb_stairs|Accelerometer-201...|
| 16| 37| 30|Climb_stairs|Accelerometer-201...|
| 16| 36| 30|Climb_stairs|Accelerometer-201...|
| 17| 37| 30|Climb_stairs|Accelerometer-201...|
| 18| 37| 30|Climb_stairs|Accelerometer-201...|
| 16| 37| 31|Climb_stairs|Accelerometer-201...|
| 16| 36| 31|Climb_stairs|Accelerometer-201...|
| 15| 39| 31|Climb_stairs|Accelerometer-201...|
| 14| 39| 32|Climb_stairs|Accelerometer-201...|
| 14| 40| 32|Climb_stairs|Accelerometer-

In [9]:
# Create a SQL table of PySpark dataframe
df.createOrReplaceTempView('df')
df_energy = spark.sql("""
select sqrt(sum(x*x)+sum(y*y)+sum(z*z)) as label, class from df group by class
""")      
df_energy.createOrReplaceTempView('df_energy')          

In [10]:
df_join = spark.sql('select * from df inner join df_energy on df.class=df_energy.class')

In [11]:
df_join.show()

+---+---+---+------------+--------------------+------------------+------------+
|  x|  y|  z|       Class|              Source|             label|       Class|
+---+---+---+------------+--------------------+------------------+------------+
|  5| 39| 34|Climb_stairs|Accelerometer-201...|11082.626493751379|Climb_stairs|
|  2| 41| 34|Climb_stairs|Accelerometer-201...|11082.626493751379|Climb_stairs|
|  5| 39| 34|Climb_stairs|Accelerometer-201...|11082.626493751379|Climb_stairs|
| 12| 38| 34|Climb_stairs|Accelerometer-201...|11082.626493751379|Climb_stairs|
|  9| 38| 30|Climb_stairs|Accelerometer-201...|11082.626493751379|Climb_stairs|
| 10| 36| 29|Climb_stairs|Accelerometer-201...|11082.626493751379|Climb_stairs|
| 10| 36| 30|Climb_stairs|Accelerometer-201...|11082.626493751379|Climb_stairs|
| 12| 36| 30|Climb_stairs|Accelerometer-201...|11082.626493751379|Climb_stairs|
| 16| 36| 29|Climb_stairs|Accelerometer-201...|11082.626493751379|Climb_stairs|
| 16| 37| 30|Climb_stairs|Accelerometer-

In [12]:
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.feature import Normalizer


vectorAssembler = VectorAssembler(inputCols=["x","y","z"],
                                  outputCol="features")
normalizer = Normalizer(inputCol="features", outputCol="features_norm", p=1.0)

In [13]:
from pyspark.ml.regression import LinearRegression

lr = LinearRegression(maxIter=10, regParam=0.3, elasticNetParam=0.8)

In [14]:
from pyspark.ml import Pipeline
pipeline = Pipeline(stages=[vectorAssembler, normalizer,lr])


In [15]:
model = pipeline.fit(df_join)

In [16]:
prediction = model.transform(df_join)

In [17]:
prediction.show()

+---+---+---+------------+--------------------+------------------+------------+----------------+--------------------+------------------+
|  x|  y|  z|       Class|              Source|             label|       Class|        features|       features_norm|        prediction|
+---+---+---+------------+--------------------+------------------+------------+----------------+--------------------+------------------+
|  5| 39| 34|Climb_stairs|Accelerometer-201...|11082.626493751379|Climb_stairs| [5.0,39.0,34.0]|[0.06410256410256...|11082.626493751399|
|  2| 41| 34|Climb_stairs|Accelerometer-201...|11082.626493751379|Climb_stairs| [2.0,41.0,34.0]|[0.02597402597402...|11082.626493751399|
|  5| 39| 34|Climb_stairs|Accelerometer-201...|11082.626493751379|Climb_stairs| [5.0,39.0,34.0]|[0.06410256410256...|11082.626493751399|
| 12| 38| 34|Climb_stairs|Accelerometer-201...|11082.626493751379|Climb_stairs|[12.0,38.0,34.0]|[0.14285714285714...|11082.626493751399|
|  9| 38| 30|Climb_stairs|Accelerometer-2

In [18]:
model.stages[2].summary.r2

-inf