#Analyzing physical activity monitor data

TODO intro

## Data

TODO

To convert from sas transport file format with extension`.xpt` to CSV we can use `xport` module from python PyPI packages:
```bash
pip install xport
```
and use the `xport` module as a command-line tool to convert an XPT file to CSV file:

```bash
python -m xport paxraw_d.xpt > paxraw_d.csv
```

## Reading the data

In [ ]:
val spark = sparkSession

spark: org.apache.spark.sql.SparkSession = org.apache.spark.sql.SparkSession@3abdda32


In [ ]:
import org.apache.spark.sql.types._
import org.apache.spark.sql.functions._

import org.apache.spark.sql.types._
import org.apache.spark.sql.functions._


In [ ]:
val PaxSchema = StructType(
    StructField("SEQN", FloatType, false) ::
    StructField("PAXSTAT", FloatType, false) ::
    StructField("PAXCAL", FloatType, false) ::
    StructField("PAXDAYSAS", FloatType, false) ::
    StructField("PAXN", FloatType, false) ::
    StructField("PAXHOUR", FloatType, false) ::
    StructField("PAXMINUT", FloatType, false) ::
    StructField("PAXINTEN", FloatType, false) ::
    StructField("PAXSTEP", StringType, true) :: Nil
)

PaxSchema: org.apache.spark.sql.types.StructType = StructType(StructField(SEQN,FloatType,false), StructField(PAXSTAT,FloatType,false), StructField(PAXCAL,FloatType,false), StructField(PAXDAYSAS,FloatType,false), StructField(PAXN,FloatType,false), StructField(PAXHOUR,FloatType,false), StructField(PAXMINUT,FloatType,false), StructField(PAXINTEN,FloatType,false), StructField(PAXSTEP,StringType,true))


TODO note on `nan` values in `PAXSTEP` column

In [ ]:
val PaxDF = spark.read
  .format("csv")
  .schema(PaxSchema)
  .option("header", true)
  .load("./notebooks/spark-notebooks-gallery/gallery/physical-activity-monitor/data/paxraw_d.csv")
  .withColumn("PAXSTEP", when($"PAXSTEP" === "nan", null).otherwise($"PAXSTEP".cast(DoubleType)))
  .select($"SEQN".cast(IntegerType),
          $"PAXSTAT".cast(IntegerType),
          $"PAXCAL".cast(IntegerType),
          $"PAXDAYSAS".cast(IntegerType),
          $"PAXN".cast(IntegerType),
          $"PAXHOUR".cast(IntegerType),
          $"PAXMINUT".cast(IntegerType),
          $"PAXINTEN".cast(IntegerType),
          $"PAXSTEP".cast(IntegerType))
  .filter(!isnull($"PAXSTEP"))

PaxDF: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [SEQN: int, PAXSTAT: int ... 7 more fields]


In [ ]:
PaxDF.write.format("parquet").save("./notebooks/spark-notebooks-gallery/gallery/physical-activity-monitor/data/paxraw_d.parquet")

In [ ]:
val PaxDF = spark.read
  .format("parquet")
  .load("./notebooks/spark-notebooks-gallery/gallery/physical-activity-monitor/data/paxraw_d.parquet")
  .withColumn("datetime", concat($"PAXDAYSAS", lit(".01.2005 "), $"PAXHOUR", lit(":"), $"PAXMINUT"))
  .withColumn("time", unix_timestamp($"datetime", "d.MM.yyyy HH:mm"))
  .withColumn("datetime", from_unixtime($"time"))

PaxDF: org.apache.spark.sql.DataFrame = [SEQN: int, PAXSTAT: int ... 9 more fields]


Let's take a closer look at the data.

In [ ]:
PaxDF.describe("PAXSTEP").show

+-------+------------------+
|summary|           PAXSTEP|
+-------+------------------+
|  count|          74874033|
|   mean|22.429516051312476|
| stddev| 708.6893633662772|
|    min|                 0|
|    max|             32767|
+-------+------------------+



`PAXSTEP` - is the step count per minute recorded by the physical activity monitor.
The values like over `500` steps per minutes seems unreliable to me.
Let's excluse those devices that have recorded more than `500` steps per minute.

In [ ]:
PaxDF.where($"PAXSTEP" > 500).select($"SEQN").distinct.count

res4: Long = 166


In [ ]:
val broadcastedBlackList = spark.sparkContext.broadcast(
  PaxDF.where($"PAXSTEP" > 500).select($"SEQN").distinct
  .collect.map(_(0).asInstanceOf[Int]).toSet
)

broadcastedBlackList: org.apache.spark.broadcast.Broadcast[scala.collection.immutable.Set[Int]] = Broadcast(8)


In [ ]:
def inBlacklistUDF = udf((seqNum: Int) => {
  broadcastedBlackList.value.contains(seqNum)
})

inBlacklistUDF: org.apache.spark.sql.expressions.UserDefinedFunction


In [ ]:
val PaxUnreliable = PaxDF.where(inBlacklistUDF($"SEQN"))

PaxUnreliable: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [SEQN: int, PAXSTAT: int ... 9 more fields]


In [ ]:
val PaxReliable = PaxDF.where(!inBlacklistUDF($"SEQN"))

PaxReliable: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [SEQN: int, PAXSTAT: int ... 9 more fields]


In [ ]:
val seqNums = PaxReliable.select($"SEQN").distinct
  .sample(false, 0.01)
  .limit(5)
  .collect
  .map(_(0).asInstanceOf[Int]).toList

seqNums: List[Int] = List(35982, 37152, 34805, 37472, 31769)


TODO show for both Reliable and Unreliable data.

In [ ]:
CustomPlotlyChart(PaxReliable.where($"SEQN" === seqNums(1)),
                  layout="""{title: 'Physical activity monitor data', 
                           yaxis: {title: 'Device Step Count per 1 min'},
                           showlegend: false}""",
                  dataOptions="""{
                    colorscale: 'Electric',
                    autocolorscale: true
                  }""",
                  dataSources="""{
                    x: 'datetime',
                    y: 'PAXSTEP'
                  }""",
                 maxPoints=2000)

res49: notebook.front.widgets.charts.CustomPlotlyChart[org.apache.spark.sql.Dataset[org.apache.spark.sql.Row]] = <CustomPlotlyChart widget>


Let's see on steps count distribution

TODO show for both Reliable and Unreliable data.

In [ ]:
CustomPlotlyChart(PaxReliable.sample(withReplacement=false, 0.001),
                  layout="""{title: 'Steps count distribution', 
                             yaxis: {type: 'log'},
                             xaxis: {title: 'Step Count per minute'},
                             bargap: 0.02}""",
                  dataOptions="{type: 'histogram', opacity: 0.7}",
                  dataSources="{x: 'PAXSTEP'}",
                  maxPoints=5000)

res9: notebook.front.widgets.charts.CustomPlotlyChart[org.apache.spark.sql.Dataset[org.apache.spark.sql.Row]] = <CustomPlotlyChart widget>


## Bucketize

In [ ]:
import org.apache.spark.ml.feature.Bucketizer

val splits = Array(0, 1, 4, 10, 30, 40, 60, 80, 100, 120, Double.PositiveInfinity)

val bucketizer = new Bucketizer()
  .setInputCol("PAXSTEP")
  .setOutputCol("activityLevel")
  .setSplits(splits)

import org.apache.spark.ml.feature.Bucketizer
splits: Array[Double] = Array(0.0, 1.0, 4.0, 10.0, 30.0, 40.0, 60.0, 80.0, 100.0, 120.0, Infinity)
bucketizer: org.apache.spark.ml.feature.Bucketizer = bucketizer_2e76cd2fb8b6


In [ ]:
val bucketedPax = bucketizer
  .transform(PaxReliable.withColumn("PAXSTEP", $"PAXSTEP".cast(DoubleType)))
  .withColumn("activityLevel", $"activityLevel".cast(IntegerType))

bucketedPax: org.apache.spark.sql.DataFrame = [SEQN: int, PAXSTAT: int ... 10 more fields]


In [ ]:
bucketedPax.where($"activityLevel" > 0).limit(10).show

+-----+-------+------+---------+----+-------+--------+--------+-------+-------------------+----------+-------------+
| SEQN|PAXSTAT|PAXCAL|PAXDAYSAS|PAXN|PAXHOUR|PAXMINUT|PAXINTEN|PAXSTEP|           datetime|      time|activityLevel|
+-----+-------+------+---------+----+-------+--------+--------+-------+-------------------+----------+-------------+
|35660|      1|     1|        2| 395|      6|      34|    1088|    6.0|2005-01-02 06:34:00|1104636840|            2|
|35660|      1|     1|        2| 396|      6|      35|     786|   25.0|2005-01-02 06:35:00|1104636900|            3|
|35660|      1|     1|        2| 397|      6|      36|     786|   15.0|2005-01-02 06:36:00|1104636960|            3|
|35660|      1|     1|        2| 399|      6|      38|     107|   12.0|2005-01-02 06:38:00|1104637080|            3|
|35660|      1|     1|        2| 400|      6|      39|     374|   37.0|2005-01-02 06:39:00|1104637140|            4|
|35660|      1|     1|        2| 401|      6|      40|     328| 

## Building a Transition Matrix

In [ ]:
import org.apache.spark.sql.expressions.Window

import org.apache.spark.sql.expressions.Window


In [ ]:
val customers = sc.parallelize(List(("Alice", Array(Array(25.0, 12.0), Array(43.0, 50.00))),
                                    ("Bob",  Array(Array(27.00, 13.0), Array(12.0, 25.0))))).
                               toDF("name", "amountSpent")

customers: org.apache.spark.sql.DataFrame = [name: string, amountSpent: array<array<double>>]


In [ ]:
customers.schema

res46: org.apache.spark.sql.types.StructType = StructType(StructField(name,StringType,true), StructField(amountSpent,ArrayType(ArrayType(DoubleType,false),true),true))


1. use `Window Function` to collect previous minute activity

In [ ]:
val windowSpec = Window.partitionBy("SEQN").orderBy("time")


windowSpec: org.apache.spark.sql.expressions.WindowSpec = org.apache.spark.sql.expressions.WindowSpec@2d093132


In [ ]:
PaxSmall.show(4)

+-----+-------+------+---------+----+-------+--------+--------+-------+-------------------+----------+-------------+
| SEQN|PAXSTAT|PAXCAL|PAXDAYSAS|PAXN|PAXHOUR|PAXMINUT|PAXINTEN|PAXSTEP|           datetime|      time|activityLevel|
+-----+-------+------+---------+----+-------+--------+--------+-------+-------------------+----------+-------------+
|35982|      1|     1|        7|   1|      0|       0|       0|    0.0|2005-01-07 00:00:00|1105045200|            0|
|35982|      1|     1|        7|   2|      0|       1|       0|    0.0|2005-01-07 00:01:00|1105045260|            0|
|35982|      1|     1|        7|   3|      0|       2|       0|    0.0|2005-01-07 00:02:00|1105045320|            0|
|35982|      1|     1|        7|   4|      0|       3|       0|    0.0|2005-01-07 00:03:00|1105045380|            0|
+-----+-------+------+---------+----+-------+--------+--------+-------+-------------------+----------+-------------+
only showing top 4 rows



In [ ]:
val df = PaxSmall
  .select($"SEQN", $"activityLevel", $"time")
  .withColumn("previousMinuteActivity", lag("activityLevel", 1).over(windowSpec))
  .withColumn("previousMinuteActivity", when(isnull($"previousMinuteActivity"), -1).otherwise($"previousMinuteActivity"))

df: org.apache.spark.sql.DataFrame = [SEQN: int, activityLevel: int ... 2 more fields]


In [ ]:
def initTransitionMatrix = udf{ (currentActivityLevel: Int, previousActivityLevel: Int, size: Int) => {
  val W = Array.fill(size, size)(0.0)
  if (previousActivityLevel >= 0)
    W.updated(currentActivityLevel, W(currentActivityLevel).updated(previousActivityLevel, 1.0))
  else
    W
}}

initTransitionMatrix: org.apache.spark.sql.expressions.UserDefinedFunction


In [ ]:
val dfW = df.withColumn("W", initTransitionMatrix($"activityLevel", $"previousMinuteActivity", lit(10)))

dfW: org.apache.spark.sql.DataFrame = [SEQN: int, activityLevel: int ... 3 more fields]


In [ ]:
dfW.show(5)

+-----+-------------+----------+----------------------+--------------------+
| SEQN|activityLevel|      time|previousMinuteActivity|                   W|
+-----+-------------+----------+----------------------+--------------------+
|35982|            0|1104526800|                    -1|[WrappedArray(0.0...|
|35982|            0|1104526860|                     0|[WrappedArray(1.0...|
|35982|            0|1104526920|                     0|[WrappedArray(1.0...|
|35982|            0|1104526980|                     0|[WrappedArray(1.0...|
|35982|            0|1104527040|                     0|[WrappedArray(1.0...|
+-----+-------------+----------+----------------------+--------------------+
only showing top 5 rows



In [ ]:
val arr = Array.fill(4,4)(0.0)

arr: Array[Array[Double]] = Array(Array(0.0, 0.0, 0.0, 0.0), Array(0.0, 0.0, 0.0, 0.0), Array(0.0, 0.0, 0.0, 0.0), Array(0.0, 0.0, 0.0, 0.0))


In [ ]:
arr.updated(1, arr(1).updated(2, -1))

res84: Array[Array[_ >: Double <: AnyVal]] = Array(Array(0.0, 0.0, 0.0, 0.0), Array(0.0, 0.0, -1, 0.0), Array(0.0, 0.0, 0.0, 0.0), Array(0.0, 0.0, 0.0, 0.0))
