# Bar Crawl: Detecting Heavy Drinking - Data Visualisation
#### Accelerometer and transdermal alcohol content data from a college bar crawl. Used to predict heavy drinking episodes via mobile data
[Source: http://ceur-ws.org/Vol-2429/paper6.pdf](http://ceur-ws.org/Vol-2429/paper6.pdf)

#### Features: Three-axis time series accelerometer data
#### Target: Time series transdermal alcohol content (TAC) data (real-time measure of intoxication)
The study decomposed each time series into 10 second windows and performed binary classification to predict if windows corresponded to an intoxicated participant (TAC >= 0.08) or sober participant (TAC < 0.08). The study tested several models and achieved a test accuracy of 77.5% with a random forest.

In [1]:
import numpy as np

# Since my computer has limited resources (24GB RAM, 8 cores), I must aggregate the data to accelerate the notebook
AGGREGATE = 50

In [63]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import input_file_name, regexp_replace, collect_list, row_number, floor, avg, lit, coalesce, first, last, array
from pyspark.sql.functions import min, max
from pyspark.sql.functions import when, col
from pyspark.sql.window import Window
from pyspark.sql import functions as F

from pyspark.ml.feature import VectorAssembler
from pyspark.ml.linalg import Vectors

In [3]:
# Initialise SparkSession
spark = SparkSession.builder.appName("heavy_drinking_project").config("spark.hadop.fs.defaultFS","hdfs://localhost:9000").getOrCreate()

25/02/05 12:24:10 WARN Utils: Your hostname, pop-os resolves to a loopback address: 127.0.0.1; using 192.168.1.62 instead (on interface wlp61s0)
25/02/05 12:24:10 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/02/05 12:24:10 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [4]:
# Load data from HDFS
accelerometer_data = spark.read.csv("hdfs://localhost:9000/heavy_drinking_project/all_accelerometer_data_pids_13.csv", header=True, inferSchema=True)
# Display first 5 rows of data
#accelerometer_data.show(5)

                                                                                

In [5]:
# Aggregation - Groupby by 50 rows, avg on values, partition by pid
window_spec = Window.partitionBy("pid").orderBy("time")
accelerometer_data = accelerometer_data.withColumn("row_num", row_number().over(window_spec))
accelerometer_data = accelerometer_data.withColumn("group_num", floor((col("row_num") - 1) / AGGREGATE))
accelerometer_data = accelerometer_data.groupBy("pid", "group_num").agg(
    F.avg("x").alias("x"),
    F.avg("y").alias("y"),
    F.avg("z").alias("z"),
    F.min("time").alias("time")
)
# Delete group_num column
accelerometer_data = accelerometer_data.drop("group_num")

In [6]:
# I have also selected just one of the pid's for acceleration purposes - but since I am using partition, they never get mixed
#accelerometer_data = accelerometer_data.filter((accelerometer_data["pid"] == "SA0297") | (accelerometer_data["pid"] == "BK7610"))
accelerometer_data = accelerometer_data.filter((accelerometer_data["pid"] == "SA0297"))

In [7]:
#accelerometer_data.describe().show()

In [8]:
# Number of rows where time in 0
# accelerometer_data.filter(accelerometer_data["time"] == 0).count()
# Drop rows where time is 0
accelerometer_data = accelerometer_data.filter(accelerometer_data["time"] != 0)

In [9]:
#accelerometer_data.select("pid").distinct().count()

What can we observe first:
- Here the time is measured in milliseconds
- They have different starting and ending times, might occure shorter, or longer breaks

In [10]:
# Min-max values of time / pid
acc_mean_max = accelerometer_data.groupBy("pid").agg(
    min("time").alias("min_value"),
    max("time").alias("max_value")
)
acc_mean_max_value = acc_mean_max.collect()

                                                                                

In [11]:
#for row in acc_mean_max_value:
#    print("pid: ", row.pid, "min_value: ", row.min_value, "max_value: ", row.max_value)

In [12]:
# Display statistics of data
# accelerometer_data.describe().show()

Let"s examine the TAC data (the labels) as well

In [13]:
# Load data from HDFS
tac_data = spark.read.csv("hdfs://localhost:9000/heavy_drinking_project/clean_tac", header=True, inferSchema=True)
# Display first 5 rows of data
#tac_data.show(5)

Készítsünk plotot, hogy hogyan változott a TAC, és mikor voltak a mérések

In [14]:
tac_data = tac_data.withColumn("timestamp", (col("timestamp").cast("long") * 1000))

In [15]:
tac_data=tac_data.withColumn("pid", input_file_name())
# Clean up the source_file column: remove prefix and suffix
tac_data = tac_data.withColumn(
    "pid",
    regexp_replace(tac_data["pid"], r"hdfs://localhost:9000/heavy_drinking_project/clean_tac/|\_clean_TAC.csv", "")
)
# tac_data.show(5)

In [16]:
tac_data = tac_data.withColumn(
    "label",
    when(col("TAC_Reading") >= 0.08, 1).otherwise(0)
)
# tac_data.show(5)

In [17]:
#tac_data.describe().show()

In [18]:
tac_data = tac_data.filter(tac_data["pid"] == "SA0297")

In [19]:
# Rename column timestamp to time
tac_data = tac_data.withColumnRenamed("timestamp", "time")
# Join accelerometer and tac data by time and pid
joined_data = accelerometer_data.join(tac_data, ["time", "pid"], "inner")

In [20]:
#joined_data.show(5)

In [21]:
# Select time and pid from tac_data, where time is not in accelerometer_data
missing_data = tac_data.join(accelerometer_data, ["time", "pid"], "left_anti")

In [22]:
#missing_data.describe().show()

In [23]:
# Min-max values of time / pid
missing_mean_max = missing_data.groupBy("pid").agg(
    min("time").alias("min_value"),
    max("time").alias("max_value")
)
missing_mean_max_value = missing_mean_max.collect()
# Show min-max values of time / pid
for row in missing_mean_max_value:
    print("pid: ", row.pid, "min_value: ", row.min_value, "max_value: ", row.max_value)

                                                                                

pid:  SA0297 min_value:  1493716723000 max_value:  1493806094000


In [24]:
# For every pid, delete records from missing_data where missing_row.pid==acc_row.pid, missing_df[time] is less than acc_row.min_value or greater than acc_row.max_value
filtered_missing_data = spark.createDataFrame([], missing_data.schema)
for row in acc_mean_max_value:
    filtered = missing_data.filter((missing_data["pid"] == row.pid) & (missing_data["time"] >= row.min_value) & (missing_data["time"] <= row.max_value))
    filtered_missing_data = filtered_missing_data.union(filtered)

In [25]:
# Min-max values of time / pid
filtered_min_max = filtered_missing_data.groupBy("pid").agg(
    min("time").alias("min_value"),
    max("time").alias("max_value")
)
filtered_min_max_value = filtered_min_max.collect()

                                                                                

In [26]:
for row in filtered_min_max_value:
    print("pid: ", row.pid, "min_value: ", row.min_value, "max_value: ", row.max_value)

pid:  SA0297 min_value:  1493734448000 max_value:  1493806094000


In [27]:
missing_data = filtered_missing_data

In [28]:
# Add columns x, y, z to missing_data with null values
missing_data = missing_data \
    .withColumn("id", lit(0).cast("int")) \
    .withColumn("x", lit(None).cast("double")) \
    .withColumn("y", lit(None).cast("double")) \
    .withColumn("z", lit(None).cast("double"))
# missing_data.show(5)

In [29]:
accelerometer_data = accelerometer_data \
    .withColumn("TAC_Reading", lit(None).cast("double")) \
    .withColumn("label", lit(None).cast("int"))

In [30]:
# Add missing_data to accelerometer_data, the order of columns might differ
missing_data = missing_data.select(accelerometer_data.columns)
accelerometer_data = accelerometer_data.union(missing_data)

In [31]:
# Update row number
window_spec = Window.partitionBy("pid").orderBy("time")
accelerometer_data = accelerometer_data.withColumn("id", row_number().over(window_spec))

In [32]:
#missing_data.describe().show()

In [33]:
#accelerometer_data.describe().show()

In [34]:
# Sort data by pid and time
accelerometer_data = accelerometer_data.orderBy("pid", "time")

Partitiók használata

In [35]:
# Define a window for previous and next records
window_spec = Window.partitionBy("pid").orderBy("time").rowsBetween(-1, 1)

In [36]:
# If null, then avg of col
accelerometer_data = accelerometer_data.withColumn(
    "x",
    coalesce(accelerometer_data["x"], avg("x").over(window_spec))
).withColumn(
    "y",
    coalesce(accelerometer_data["y"], avg("y").over(window_spec))
).withColumn(
    "z",
    coalesce(accelerometer_data["z"], avg("z").over(window_spec))
)

In [37]:
#accelerometer_data.describe().show()

In [38]:
# Drop rows where x, y, z is null
accelerometer_data = accelerometer_data.filter(accelerometer_data["x"].isNotNull() & accelerometer_data["y"].isNotNull() & accelerometer_data["z"].isNotNull())

In [39]:
# Add new row, with mintime-1 and maxtime+1
for row in acc_mean_max_value:
    new_row = spark.createDataFrame([(row.pid, 0.0, 0.0, 0.0, row.min_value - 1, 0.0, 0, 0), (row.pid, 0.0, 0.0, 0.0, row.max_value + 1, 0.0,0, accelerometer_data.filter(accelerometer_data["pid"] == row.pid).count())], accelerometer_data.schema)
    accelerometer_data = accelerometer_data.union(new_row)

                                                                                

In [40]:
window_spec_prev = Window.partitionBy("pid").orderBy("time").rowsBetween(Window.unboundedPreceding, Window.currentRow)
window_spec_next = Window.partitionBy("pid").orderBy("time").rowsBetween(Window.currentRow, Window.unboundedFollowing)

In [41]:
accelerometer_data = accelerometer_data.withColumn("prev_tac", last("TAC_Reading", ignorenulls=True).over(window_spec_prev))
accelerometer_data = accelerometer_data.withColumn("prev_index", last(when(col("TAC_Reading").isNotNull(), col("id")), ignorenulls=True).over(window_spec_prev))
accelerometer_data = accelerometer_data.withColumn("next_tac", first("TAC_Reading", ignorenulls=True).over(window_spec_next))
accelerometer_data = accelerometer_data.withColumn("next_index", first(when(col("TAC_Reading").isNotNull(), col("id")), ignorenulls=True).over(window_spec_next))

In [42]:
accelerometer_data = accelerometer_data.withColumn("count_since_prev", col("id") - col("prev_index"))
accelerometer_data = accelerometer_data.withColumn("count_until_next", col("next_index") - col("id"))

accelerometer_data = accelerometer_data.withColumn("pos_ratio", col("count_since_prev") / (col("count_since_prev") + col("count_until_next")))

In [43]:
accelerometer_data = accelerometer_data.withColumn("TAC_Reading", when(col("TAC_Reading").isNotNull(), col("TAC_Reading")).otherwise(col("prev_tac") + (col("next_tac") - col("prev_tac")) * col("pos_ratio")))

In [44]:
# Drop columns for calculating interpolated_tac
accelerometer_data = accelerometer_data.drop("prev_tac", "prev_index", "next_tac", "next_index", "count_since_prev", "count_until_next", "pos_ratio")

In [45]:
#accelerometer_data.filter(accelerometer_data["time"] >= "1493734446000").show(5)

In [46]:
# Label data
accelerometer_data = accelerometer_data.withColumn("label", when(col("TAC_Reading") >= 0.08, 1).otherwise(0))

muszály osztani az időt 25/50-el, mivel ez így nem fog lefutni

In [47]:
# Save data to HDFS
#accelerometer_data.write.csv("hdfs://localhost:9000/heavy_drinking_project/accelerometer_data.csv", header=True)

# Coding

In [48]:
accelerometer_data.show(20)

[Stage 118:>                                                        (0 + 1) / 1]

+------+--------------------+--------------------+--------------------+-------------+--------------------+-----+---+
|   pid|                   x|                   y|                   z|         time|         TAC_Reading|label| id|
+------+--------------------+--------------------+--------------------+-------------+--------------------+-----+---+
|SA0297|                 0.0|                 0.0|                 0.0|1493733882408|                 0.0|    0|  0|
|SA0297|-0.02039999999999999|-0.00200199999999...|-0.00549400000000...|1493733882409|1.498731297848257E-4|    0|  1|
|SA0297|8.480000000000006E-4|-0.00220600000000...|-0.00357400000000...|1493733884715|2.997462595696514E-4|    0|  2|
|SA0297|0.002445999999999999|              0.0052|-0.00558799999999...|1493733886976|4.496193893544771E-4|    0|  3|
|SA0297|-5.32000000000000...|           -0.001226|-0.00104200000000...|1493733889237|5.994925191393028E-4|    0|  4|
|SA0297|             -7.8E-4|-9.40000000000000...|           -0.

                                                                                

In [94]:
dataset = accelerometer_data

In [95]:
dataset.count()

                                                                                

19292

In [96]:
window_spec = Window.partitionBy("pid").orderBy("time").rowsBetween(-12, 12)

In [97]:
assembler = VectorAssembler(
    inputCols=["x", "y", "z"],
    outputCol="coordinates"
)

In [98]:
dataset = assembler.transform(dataset)

In [99]:
dataset = dataset.withColumn(
    "feature", 
    collect_list("coordinates").over(window_spec)
)

In [100]:
dataset = dataset.filter(F.size(dataset.feature) == 25)

In [101]:
from pyspark.ml.linalg import VectorUDT


def merge_vectors(vectors):
    if vectors is None:
        return None
    merged_array = []
    for v in vectors:
        merged_array.extend(v.toArray())
    return Vectors.dense(merged_array)

merged_vectors = F.udf(merge_vectors, VectorUDT())

In [102]:
dataset = dataset.withColumn("flat_feature", merged_vectors("feature"))

In [103]:
dataset.show(5)

[Stage 608:>                                                        (0 + 1) / 1]

+------+--------------------+--------------------+--------------------+-------------+--------------------+-----+---+--------------------+--------------------+--------------------+
|   pid|                   x|                   y|                   z|         time|         TAC_Reading|label| id|         coordinates|             feature|        flat_feature|
+------+--------------------+--------------------+--------------------+-------------+--------------------+-----+---+--------------------+--------------------+--------------------+
|SA0297|-1.89999999999999...|5.799999999999999E-4|-0.00328199999999...|1493733907408|0.001798477557417...|    0| 12|[-1.8999999999999...|[(3,[],[]), [-0.0...|[0.0,0.0,0.0,-0.0...|
|SA0297|2.919999999999999E-4|-2.65999999999999...|-0.00311199999999...|1493733909668|0.001948350687202...|    0| 13|[2.91999999999999...|[[-0.020399999999...|[-0.0203999999999...|
|SA0297|             4.74E-4|-1.19999999999999...|-0.00288400000000...|1493733911924| 0.002098223816

                                                                                

In [104]:
train, test_val = dataset.randomSplit([0.7, 0.3], seed=42)

In [105]:
test, val = test_val.randomSplit([0.5, 0.5], seed=42)

In [106]:
from pyspark.ml.classification import RandomForestClassifier
rf = RandomForestClassifier(labelCol="label", featuresCol="flat_feature", numTrees=50)
model = rf.fit(train)

                                                                                

In [107]:
pred = model.transform(test)

In [108]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(pred)
print(f"Random Forest accuracy: {accuracy}")


[Stage 936:>                                                        (0 + 1) / 1]

Random Forest accuracy: 0.8455704929334712


                                                                                