# Data engineering

In [1]:
from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession
import os

In [2]:
# Start (or reuse) a local Spark context and session, both pinned to the
# "local[8]" master.
sc = SparkContext.getOrCreate(
    SparkConf().setMaster("local[8]")
)
spark = (
    SparkSession.builder
    .master("local[8]")
    .getOrCreate()
)

In [9]:
from pyspark.sql.types import StructType, StructField, IntegerType, StringType
# Explicit schema for the CSV: four integer columns followed by the string
# label column `class`.
schema = StructType(
    [StructField(int_col, IntegerType()) for int_col in ('index', 'X', 'Y', 'Z')]
    + [StructField('class', StringType())]
)

In [11]:
# Load the CSV using the explicit `schema` instead of letting Spark infer types.
# NOTE(review): the preview further down shows `index` as null on the first
# row — confirm whether the file's first index value is blank or unparseable.
df = spark.read.schema(schema).csv(
    'Datasets/166050_378273_bundle_archive/ActivityAccelerometer/data.csv'
)

In [12]:
from pyspark.ml.feature import StringIndexer

In [13]:
# Encode the string label column `class` as a numeric index in `indexedClass`.
indexer = StringIndexer(
    inputCol='class',
    outputCol='indexedClass',
)
# fit() learns the label-to-index mapping from df; transform() appends the column.
indexed = indexer.fit(df).transform(df)

In [14]:
# Preview the indexed rows to check the new `indexedClass` column.
indexed.show()

+-----+----+----+----+-------+------------+
|index|   X|   Y|   Z|  class|indexedClass|
+-----+----+----+----+-------+------------+
| null|1866|2390|2282|walking|         0.0|
|    1|1861|2367|2278|walking|         0.0|
|    2|1837|2387|2317|walking|         0.0|
|    3|1819|2409|2331|walking|         0.0|
|    4|1784|2407|2280|walking|         0.0|
|    5|1811|2414|2267|walking|         0.0|
|    6|1821|2411|2282|walking|         0.0|
|    7|1820|2395|2265|walking|         0.0|
|    8|1854|2369|2263|walking|         0.0|
|    9|1885|2375|2284|walking|         0.0|
|   10|1885|2375|2295|walking|         0.0|
|   11|1879|2377|2272|walking|         0.0|
|   12|1879|2392|2243|walking|         0.0|
|   13|1874|2398|2280|walking|         0.0|
|   14|1868|2417|2292|walking|         0.0|
|   15|1903|2414|2282|walking|         0.0|
|   16|1907|2406|2286|walking|         0.0|
|   17|1929|2401|2269|walking|         0.0|
|   18|1941|2395|2302|walking|         0.0|
|   19|1963|2391|2294|walking|  

In [15]:
from pyspark.ml.feature import OneHotEncoder

In [16]:
# One-hot encode the numeric label index into a vector column named `vectors`.
encoder = OneHotEncoder(
    inputCol='indexedClass',
    outputCol='vectors',
)
# The encoder is fitted on the indexed data before it is applied to it.
encoded = encoder.fit(indexed).transform(indexed)

In [17]:
# Preview the encoded rows to check the new `vectors` column.
encoded.show()

+-----+----+----+----+-------+------------+-------------+
|index|   X|   Y|   Z|  class|indexedClass|      vectors|
+-----+----+----+----+-------+------------+-------------+
| null|1866|2390|2282|walking|         0.0|(7,[0],[1.0])|
|    1|1861|2367|2278|walking|         0.0|(7,[0],[1.0])|
|    2|1837|2387|2317|walking|         0.0|(7,[0],[1.0])|
|    3|1819|2409|2331|walking|         0.0|(7,[0],[1.0])|
|    4|1784|2407|2280|walking|         0.0|(7,[0],[1.0])|
|    5|1811|2414|2267|walking|         0.0|(7,[0],[1.0])|
|    6|1821|2411|2282|walking|         0.0|(7,[0],[1.0])|
|    7|1820|2395|2265|walking|         0.0|(7,[0],[1.0])|
|    8|1854|2369|2263|walking|         0.0|(7,[0],[1.0])|
|    9|1885|2375|2284|walking|         0.0|(7,[0],[1.0])|
|   10|1885|2375|2295|walking|         0.0|(7,[0],[1.0])|
|   11|1879|2377|2272|walking|         0.0|(7,[0],[1.0])|
|   12|1879|2392|2243|walking|         0.0|(7,[0],[1.0])|
|   13|1874|2398|2280|walking|         0.0|(7,[0],[1.0])|
|   14|1868|24

In [18]:
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import VectorAssembler

In [19]:
# Combine the X, Y and Z columns into a single vector column `features`.
vecAss = VectorAssembler(
    inputCols=['X', 'Y', 'Z'],
    outputCol='features',
)
vectorized = vecAss.transform(encoded)

In [20]:
# Preview the assembled rows to check the new `features` column.
vectorized.show()

+-----+----+----+----+-------+------------+-------------+--------------------+
|index|   X|   Y|   Z|  class|indexedClass|      vectors|            features|
+-----+----+----+----+-------+------------+-------------+--------------------+
| null|1866|2390|2282|walking|         0.0|(7,[0],[1.0])|[1866.0,2390.0,22...|
|    1|1861|2367|2278|walking|         0.0|(7,[0],[1.0])|[1861.0,2367.0,22...|
|    2|1837|2387|2317|walking|         0.0|(7,[0],[1.0])|[1837.0,2387.0,23...|
|    3|1819|2409|2331|walking|         0.0|(7,[0],[1.0])|[1819.0,2409.0,23...|
|    4|1784|2407|2280|walking|         0.0|(7,[0],[1.0])|[1784.0,2407.0,22...|
|    5|1811|2414|2267|walking|         0.0|(7,[0],[1.0])|[1811.0,2414.0,22...|
|    6|1821|2411|2282|walking|         0.0|(7,[0],[1.0])|[1821.0,2411.0,22...|
|    7|1820|2395|2265|walking|         0.0|(7,[0],[1.0])|[1820.0,2395.0,22...|
|    8|1854|2369|2263|walking|         0.0|(7,[0],[1.0])|[1854.0,2369.0,22...|
|    9|1885|2375|2284|walking|         0.0|(7,[0],[1

In [21]:
from pyspark.ml.feature import Normalizer

In [22]:
# p=1.0 selects the L1 norm: each `features` vector is divided by the sum of
# the absolute values of its components, and the result is stored in
# `normFeatures`.
normaliser = Normalizer(
    inputCol='features',
    outputCol='normFeatures',
    p=1.0,
)
normalised = normaliser.transform(vectorized)

In [23]:
# Preview the normalised rows to check the new `normFeatures` column.
normalised.show()

+-----+----+----+----+-------+------------+-------------+--------------------+--------------------+
|index|   X|   Y|   Z|  class|indexedClass|      vectors|            features|        normFeatures|
+-----+----+----+----+-------+------------+-------------+--------------------+--------------------+
| null|1866|2390|2282|walking|         0.0|(7,[0],[1.0])|[1866.0,2390.0,22...|[0.28540838176812...|
|    1|1861|2367|2278|walking|         0.0|(7,[0],[1.0])|[1861.0,2367.0,22...|[0.28604365201352...|
|    2|1837|2387|2317|walking|         0.0|(7,[0],[1.0])|[1837.0,2387.0,23...|[0.28084390765937...|
|    3|1819|2409|2331|walking|         0.0|(7,[0],[1.0])|[1819.0,2409.0,23...|[0.27732886110687...|
|    4|1784|2407|2280|walking|         0.0|(7,[0],[1.0])|[1784.0,2407.0,22...|[0.27569154690156...|
|    5|1811|2414|2267|walking|         0.0|(7,[0],[1.0])|[1811.0,2414.0,22...|[0.27895871842267...|
|    6|1821|2411|2282|walking|         0.0|(7,[0],[1.0])|[1821.0,2411.0,22...|[0.27955173472520...|


In [24]:
from pyspark.ml import Pipeline
# Bundle the four feature stages (index -> one-hot -> assemble -> normalise)
# so they can be fitted and applied together.
pipeline = Pipeline(
    stages=[indexer, encoder, vecAss, normaliser],
)

In [25]:
# Fit the pipeline's estimator stages on the raw DataFrame.
model = pipeline.fit(df)

# Apply the fitted pipeline to the same data it was fitted on.
# NOTE: despite its name, `prediction` holds engineered feature columns only;
# none of the pipeline stages is a predictive model.
prediction = model.transform(df)

In [26]:
# Preview the pipeline output; it should match the step-by-step result above.
prediction.show()

+-----+----+----+----+-------+------------+-------------+--------------------+--------------------+
|index|   X|   Y|   Z|  class|indexedClass|      vectors|            features|        normFeatures|
+-----+----+----+----+-------+------------+-------------+--------------------+--------------------+
| null|1866|2390|2282|walking|         0.0|(7,[0],[1.0])|[1866.0,2390.0,22...|[0.28540838176812...|
|    1|1861|2367|2278|walking|         0.0|(7,[0],[1.0])|[1861.0,2367.0,22...|[0.28604365201352...|
|    2|1837|2387|2317|walking|         0.0|(7,[0],[1.0])|[1837.0,2387.0,23...|[0.28084390765937...|
|    3|1819|2409|2331|walking|         0.0|(7,[0],[1.0])|[1819.0,2409.0,23...|[0.27732886110687...|
|    4|1784|2407|2280|walking|         0.0|(7,[0],[1.0])|[1784.0,2407.0,22...|[0.27569154690156...|
|    5|1811|2414|2267|walking|         0.0|(7,[0],[1.0])|[1811.0,2414.0,22...|[0.27895871842267...|
|    6|1821|2411|2282|walking|         0.0|(7,[0],[1.0])|[1821.0,2411.0,22...|[0.27955173472520...|


In [39]:
# Keep only `vectors` and `normFeatures` by dropping every other column.
# DataFrame.drop() accepts several column names, so a single call replaces the
# chain of seven separate drop() calls.
df_train = prediction.drop(
    'X', 'Y', 'Z', 'class', 'index', 'features', 'indexedClass'
)

In [40]:
# Preview the reduced DataFrame; only `vectors` and `normFeatures` should remain.
df_train.show()

+-------------+--------------------+
|      vectors|        normFeatures|
+-------------+--------------------+
|(7,[0],[1.0])|[0.28540838176812...|
|(7,[0],[1.0])|[0.28604365201352...|
|(7,[0],[1.0])|[0.28084390765937...|
|(7,[0],[1.0])|[0.27732886110687...|
|(7,[0],[1.0])|[0.27569154690156...|
|(7,[0],[1.0])|[0.27895871842267...|
|(7,[0],[1.0])|[0.27955173472520...|
|(7,[0],[1.0])|[0.28086419753086...|
|(7,[0],[1.0])|[0.28584643848288...|
|(7,[0],[1.0])|[0.28805012224938...|
|(7,[0],[1.0])|[0.28756674294431...|
|(7,[0],[1.0])|[0.28783700980392...|
|(7,[0],[1.0])|[0.28845563401903...|
|(7,[0],[1.0])|[0.28601953601953...|
|(7,[0],[1.0])|[0.28402006994070...|
|(7,[0],[1.0])|[0.28837702682224...|
|(7,[0],[1.0])|[0.28898317926958...|
|(7,[0],[1.0])|[0.29231701772995...|
|(7,[0],[1.0])|[0.29240735161193...|
|(7,[0],[1.0])|[0.29527677496991...|
+-------------+--------------------+
only showing top 20 rows



In [66]:
# Start a second branch of work from the raw DataFrame. This binds a new name
# to the same DataFrame object; no data is copied.
datium = df

In [67]:
# Replace nulls in the numeric columns with 0. The result is derived from `df`
# directly rather than from `datium` itself, so re-running this cell always
# yields the same DataFrame regardless of what later cells did to `datium`.
# NOTE(review): fill(0) also zero-fills any nulls in X/Y/Z, which would skew
# the per-class averages computed later — confirm those columns contain none.
datium = df.na.fill(0)

In [69]:
# Preview the filled rows; the first row's `index` should now be 0, not null.
datium.show()

+-----+----+----+----+-------+
|index|   X|   Y|   Z|  class|
+-----+----+----+----+-------+
|    0|1866|2390|2282|walking|
|    1|1861|2367|2278|walking|
|    2|1837|2387|2317|walking|
|    3|1819|2409|2331|walking|
|    4|1784|2407|2280|walking|
|    5|1811|2414|2267|walking|
|    6|1821|2411|2282|walking|
|    7|1820|2395|2265|walking|
|    8|1854|2369|2263|walking|
|    9|1885|2375|2284|walking|
|   10|1885|2375|2295|walking|
|   11|1879|2377|2272|walking|
|   12|1879|2392|2243|walking|
|   13|1874|2398|2280|walking|
|   14|1868|2417|2292|walking|
|   15|1903|2414|2282|walking|
|   16|1907|2406|2286|walking|
|   17|1929|2401|2269|walking|
|   18|1941|2395|2302|walking|
|   19|1963|2391|2294|walking|
+-----+----+----+----+-------+
only showing top 20 rows



In [70]:
# Remove the `index` column so that it is not included in the aggregation.
datium = datium.drop('index')

In [75]:
# Group the rows by their `class` label; no aggregation is computed yet.
da = datium.groupBy('class')

In [86]:
# Per-class mean of each axis. The columns are named explicitly so the result
# always has exactly avg(X), avg(Y) and avg(Z) — the columns the downstream
# VectorAssembler expects — instead of relying on `index` having been dropped
# in an earlier cell (a no-argument avg() averages every numeric column).
da_mean = da.avg('X', 'Y', 'Z')

In [88]:
# Combine the three per-class mean columns into a single `features` vector.
vectAss = VectorAssembler(
    inputCols=['avg(X)', 'avg(Y)', 'avg(Z)'],
    outputCol='features',
)
verized = vectAss.transform(da_mean)

In [96]:
# Reuse the Normalizer configured earlier in the notebook; it reads `features`
# and writes `normFeatures`.
normali = normaliser.transform(verized)

In [97]:
# Show the per-class means with their raw and normalised feature vectors.
normali.show()

+--------+------------------+------------------+------------------+--------------------+--------------------+
|   class|            avg(X)|            avg(Y)|            avg(Z)|            features|        normFeatures|
+--------+------------------+------------------+------------------+--------------------+--------------------+
| jumping|         2069.1112|2533.0573714285715| 2012.870742857143|[2069.1112,2533.0...|[0.31278894979982...|
| resting|            1999.0|            2532.0|            1996.0|[1999.0,2532.0,19...|[0.30626627853531...|
|climbing|2069.0233009708736|2540.2747001713306|2030.6288977727013|[2069.02330097087...|[0.31160332522772...|
| walking| 2100.151996124031|2528.1886046511627|2011.7688953488373|[2100.15199612403...|[0.31628273560096...|
|    bath|          2056.015|          2541.729|          2067.626|[2056.015,2541.72...|[0.30846224590682...|
| running| 2120.096153846154| 2491.517948717949|1915.2346153846154|[2120.09615384615...|[0.32482691808313...|
|  stairs|