In [1]:
%load_ext autoreload
%autoreload 2

In [2]:
from pyspark.sql import SparkSession, functions as F


def get_spark():
    return SparkSession.builder.config("spark.local.dir", "/mnt/data/tmp").getOrCreate()


spark = get_spark()
spark

Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/09/29 23:34:34 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
25/09/29 23:34:34 WARN SparkConf: Note that spark.local.dir will be overridden by the value set by the cluster manager (via SPARK_LOCAL_DIRS in standalone/kubernetes and LOCAL_DIRS in YARN).
25/09/29 23:34:35 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


In [3]:
df = spark.read.csv(
    "/home/anthony/scratch/ai4animals/sensor_eda/AcTBeCalf.csv",
    header=True,
    inferSchema=True,
).cache()
df.show()

[Stage 2:>                                                          (0 + 1) / 1]

+--------------------+------+--------+--------+---------+--------------------+-----+
|            dateTime|calfId|    accX|    accY|     accZ|           behaviour|segId|
+--------------------+------+--------+--------+---------+--------------------+-----+
|2022-02-24 00:08:...|  1306|  0.8125|0.390625|  0.28125|oral_manipulation...|    0|
|2022-02-24 00:08:...|  1306|  0.6875|   0.375|    0.375|oral_manipulation...|    0|
|2022-02-24 00:08:...|  1306|   0.625|0.296875| 0.265625|oral_manipulation...|    0|
|2022-02-24 00:08:...|  1306|0.703125| 0.21875|  0.21875|oral_manipulation...|    0|
|2022-02-24 00:08:...|  1306|0.734375| 0.21875|     0.25|oral_manipulation...|    0|
|2022-02-24 00:08:...|  1306|    0.75|0.234375|     0.25|oral_manipulation...|    0|
|2022-02-24 00:08:...|  1306|  0.9375|  0.3125| 0.171875|oral_manipulation...|    0|
|2022-02-24 00:08:...|  1306|0.890625|0.328125|     0.25|oral_manipulation...|    0|
|2022-02-24 00:08:...|  1306|0.703125|   0.375|  0.15625|oral_man

                                                                                

In [31]:
from pyspark.sql import Window


def update_behaviour(label):
    if label in ["drinking_milk", "lying", "running", "standing"]:
        return label
    else:
        return "other"


# take the first few sequences just for testing
test = (
    df.where("calfId = 1306")
    .withColumn("behaviour", F.udf(update_behaviour, "string")("behaviour"))
    .where(F.col("behaviour") != "other")
    .orderBy("dateTime")
)
# keep the first few
seg = (
    test.select("calfId", "segId")
    .distinct()
    .withColumn(
        "rank", F.row_number().over(Window.partitionBy("calfId").orderBy("segId"))
    )
)
test = (
    test.join(seg.where("rank <= 20").select("calfId", "segId"), on=["calfId", "segId"])
    .orderBy("dateTime")
)
test.select("segId", "behaviour").distinct().groupBy("behaviour").count().show()

                                                                                

+---------+-----+
|behaviour|count|
+---------+-----+
| standing|    8|
|  running|   11|
|    lying|    1|
+---------+-----+



In [33]:
import matplotlib.pyplot as plt
pdf = test.select("dateTime", "accX", "accY", "accZ", "behaviour").toPandas()
display(pdf.head())
display(pdf.shape)
pdf.plot(x="dateTime", y=["accX", "accY", "accZ"])

                                                                                

Unnamed: 0,dateTime,accX,accY,accZ,behaviour
0,2022-02-06 17:31:44.261902,1.0,-0.328125,-0.09375,standing
1,2022-02-06 17:31:44.301903,1.0,-0.3125,-0.0625,standing
2,2022-02-06 17:31:44.340903,0.9375,-0.359375,-0.046875,standing
3,2022-02-06 17:31:44.381904,0.890625,-0.359375,-0.03125,standing
4,2022-02-06 17:31:44.421904,0.890625,-0.34375,-0.078125,standing


(20700, 5)

<Axes: xlabel='dateTime'>

In [28]:
import numpy as np

X = pdf[["accX", "accY", "accZ"]].to_numpy()
display(X.shape)

# now write as txt to temporary file
infile = "/home/anthony/scratch/ai4animals/sensor_eda/test_ticc.txt"
np.savetxt(infile, X, delimiter=",")

! head -n5 {infile}

(20700, 3)

1.000000000000000000e+00,-3.281250000000000000e-01,-9.375000000000000000e-02
1.000000000000000000e+00,-3.125000000000000000e-01,-6.250000000000000000e-02
9.375000000000000000e-01,-3.593750000000000000e-01,-4.687500000000000000e-02
8.906250000000000000e-01,-3.593750000000000000e-01,-3.125000000000000000e-02
8.906250000000000000e-01,-3.437500000000000000e-01,-7.812500000000000000e-02


In [None]:
# now lets run TICC
from ticc.TICC_solver import TICC

# 25 hz, so window size of 100 is 4 seconds
ticc = TICC(
    window_size=100,
    number_of_clusters=3,
    lambda_parameter=11e-2,
    beta=600,
    maxIters=100,
    threshold=2e-5,
    write_out_file=False,
    prefix_string="output_folder/",
    num_proc=4,
)
(cluster_assignment, cluster_MRFs) = ticc.fit(input_file=infile)

type(cluster_assignment), type(cluster_MRFs)