In [4]:
from pyspark.sql import SparkSession
from pyspark.sql.types import *


In [6]:
ss = SparkSession.builder.getOrCreate()
sc = ss.sparkContext

files = 'WISDM/*/*/'
files_pair = sc.wholeTextFiles(files)

# 1. Files number

In [3]:
files_pair.count()

204

# 2. Records number

In [4]:
def filter_none(x):
    if len(x[1]) == 6:
        return True
    else:
        return False

In [5]:
filtered_file_record = files_pair.flatMapValues(lambda x: x.split('\n')).mapValues(lambda x: x.split(",")).filter(filter_none).partitionBy(100).cache()


In [6]:
filtered_file_record.count()

15630426

# 3. Create dataframe from rdd

In [7]:
def toIntSafe(value):
  try:
    return int(value)
  except ValueError:
    return None

def toFloatSafe(value):
  try:
    return float(value)
  except ValueError:
    return None


In [8]:
def assign_columns(x):
    file_name = x[0].split('/')[-1].split('.')[0].split('_')
    record = x[1]
    
    user_id = toIntSafe(record[0])
    sensor_type = file_name[2]
    device_type = file_name[3]
    acticity_code = record[1]
    timestamp = toIntSafe(record[2])
    x_axis = toFloatSafe(record[3])
    y_axis = toFloatSafe(record[4])
    z_axis = toFloatSafe(record[5][:-1])
    
    return user_id, sensor_type, device_type, acticity_code, timestamp, x_axis, y_axis, z_axis
            

In [9]:
itemized_file_record = filtered_file_record.map(assign_columns)


In [10]:
schema = StructType([ StructField("subject_id", IntegerType(), False),
                      StructField("sensor_type", StringType(), False),
                      StructField("device_type", StringType(), False),
                      StructField("acticity_code", StringType(), False),
                      StructField("timestamp", LongType(), False),
                      StructField("x", FloatType(), False),
                      StructField("y", FloatType(), False),
                      StructField("z", FloatType(), False)
                    ])

In [11]:
df_record = ss.createDataFrame(itemized_file_record, schema)

In [12]:
df_record.show(5)

+----------+-----------+-----------+-------------+----------------+----------+-----------+-----------+
|subject_id|sensor_type|device_type|acticity_code|       timestamp|         x|          y|          z|
+----------+-----------+-----------+-------------+----------------+----------+-----------+-----------+
|      1644|      accel|      phone|            A|1819410008602000| 7.4509063| -11.718831|-0.06945557|
|      1644|      accel|      phone|            A|1819410028698000|  5.007986| -11.266172| -0.6682105|
|      1644|      accel|      phone|            A|1819410048826000|-0.9364527| -11.785892| -0.7927515|
|      1644|      accel|      phone|            A|1819410069560000| -6.231841|-12.1379595| -2.1004322|
|      1644|      accel|      phone|            A|1819410089720000| -8.988508| -11.160791|  -3.772156|
+----------+-----------+-----------+-------------+----------------+----------+-----------+-----------+
only showing top 5 rows



In [13]:
df_record.printSchema()

root
 |-- subject_id: integer (nullable = false)
 |-- sensor_type: string (nullable = false)
 |-- device_type: string (nullable = false)
 |-- acticity_code: string (nullable = false)
 |-- timestamp: long (nullable = false)
 |-- x: float (nullable = false)
 |-- y: float (nullable = false)
 |-- z: float (nullable = false)



In [14]:
df_record_distinct = df_record.select('subject_id').distinct()
count = df_record_distinct.count()
df_record_distinct.orderBy("subject_id", ascending = True).show(count)

+----------+
|subject_id|
+----------+
|      1600|
|      1601|
|      1602|
|      1603|
|      1604|
|      1605|
|      1606|
|      1607|
|      1608|
|      1609|
|      1610|
|      1611|
|      1612|
|      1613|
|      1614|
|      1615|
|      1616|
|      1617|
|      1618|
|      1619|
|      1620|
|      1621|
|      1622|
|      1623|
|      1624|
|      1625|
|      1626|
|      1627|
|      1628|
|      1629|
|      1630|
|      1631|
|      1632|
|      1633|
|      1634|
|      1635|
|      1636|
|      1637|
|      1638|
|      1639|
|      1640|
|      1641|
|      1642|
|      1643|
|      1644|
|      1645|
|      1646|
|      1647|
|      1648|
|      1649|
|      1650|
+----------+



In [15]:
df_record.select('sensor_type').distinct().orderBy("sensor_type", ascending = True).show()


+-----------+
|sensor_type|
+-----------+
|      accel|
|       gyro|
+-----------+



In [16]:
df_record.select('acticity_code').distinct().orderBy("acticity_code", ascending = True).show()


+-------------+
|acticity_code|
+-------------+
|            A|
|            B|
|            C|
|            D|
|            E|
|            F|
|            G|
|            H|
|            I|
|            J|
|            K|
|            L|
|            M|
|            O|
|            P|
|            Q|
|            R|
|            S|
+-------------+



In [17]:
subject_id = 1631
activity_code = 'B'
n = 10

In [18]:
from pyspark.sql.functions import asc, desc


In [19]:
df_record.where(f"subject_id == {subject_id} and acticity_code == '{activity_code}'").orderBy(asc("timestamp"), desc("acticity_code")).show(n)


+----------+-----------+-----------+-------------+----------------+----------+----------+----------+
|subject_id|sensor_type|device_type|acticity_code|       timestamp|         x|         y|         z|
+----------+-----------+-----------+-------------+----------------+----------+----------+----------+
|      1631|      accel|      watch|            B|1328285960211589|  16.17358| 7.7905827| -9.755025|
|      1631|       gyro|      watch|            B|1328286009711589|-1.6902926| 0.5162165|-1.0149877|
|      1631|      accel|      watch|            B|1328286009711589| 13.688399|-12.773215|-10.083031|
|      1631|       gyro|      watch|            B|1328286059211589| 1.1283971| 1.0626972|-2.8483078|
|      1631|      accel|      watch|            B|1328286059211589|  9.958233|   3.79466| -8.308928|
|      1631|      accel|      watch|            B|1328286108711589|  9.077167|-6.9050274|-3.9395092|
|      1631|       gyro|      watch|            B|1328286108711589| 1.4809996|0.88373274|-2

# 4. Determine the sign of coordiantes

In [20]:
df_record.withColumn('x_positive', df_record['x']>= 0) \
        .withColumn('y_positive', df_record['y']>= 0) \
        .withColumn('z_positive', df_record['z']>= 0) \
        .select('subject_id', 'sensor_type', 'device_type', 'acticity_code', 'timestamp', 'x_positive', 'y_positive', 'z_positive')\
        .where(f"subject_id == {subject_id} and acticity_code == '{activity_code}'") \
        .orderBy(asc("timestamp"), desc("acticity_code")).show(n)

+----------+-----------+-----------+-------------+----------------+----------+----------+----------+
|subject_id|sensor_type|device_type|acticity_code|       timestamp|x_positive|y_positive|z_positive|
+----------+-----------+-----------+-------------+----------------+----------+----------+----------+
|      1631|      accel|      watch|            B|1328285960211589|      true|      true|     false|
|      1631|      accel|      watch|            B|1328286009711589|      true|     false|     false|
|      1631|       gyro|      watch|            B|1328286009711589|     false|      true|     false|
|      1631|       gyro|      watch|            B|1328286059211589|      true|      true|     false|
|      1631|      accel|      watch|            B|1328286059211589|      true|      true|     false|
|      1631|      accel|      watch|            B|1328286108711589|      true|     false|     false|
|      1631|       gyro|      watch|            B|1328286108711589|      true|      true|  

In [22]:
sc.stop()