In [None]:
import os
from typing import Optional

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd

In [None]:
%load_ext autoreload
%autoreload 2

In [None]:
%matplotlib widget

In [None]:
data_path = "data/FN_ids_data"
# prefix = "Ben_2024-05-20_14:33:17:628"
prefix = "emulator"
events_path = os.path.join(data_path, f"{prefix}_events.csv")
# sensors_path = os.path.join(data_path, f"{prefix}_sensors_data.csv")

In [None]:
data_path = "data/izhar"
before_path = os.path.join(data_path, "before_cardio.csv")
after_path = os.path.join(data_path, "after_cardio.csv")

paths = {"before": before_path, "after": after_path}

In [None]:
wanted_game_ids = [[0], [1]]

In [None]:
def convert_time_stamps(df: pd.DataFrame):
    df["Timestamp"] = df["Timestamp"].str.replace(
        r"(.*)_(.*):(\d\d\d)$", r"\1 \2.\3", regex=True
    )
    df["Timestamp"] = pd.to_datetime(df["Timestamp"], format="%Y-%m-%d %H:%M:%S.%f")


def compute_game_id(df: pd.DataFrame):
    game_id = 0
    game_ids = np.zeros(len(df), dtype=int)
    game_starts = df.fillna("").query("Details.str.contains('game starts')").index
    for game_start in game_starts:
        game_ids[int(game_start) :] = game_id
        game_id += 1
    df["Game_id"] = game_ids


def compute_scroll_dist(scroll: str):
    distance_x = np.abs(float(scroll.split("X: ")[1].split(" ")[0]))
    distance_y = np.abs(float(scroll.split("Y: ")[1].split(" ")[0]))
    distance = np.sqrt(distance_x * distance_x + distance_y * distance_y)
    return distance_x, distance_y, distance


def compute_fling_vel(fling: str):
    velocity_x = float(fling.split("X: ")[1].split(" ")[0])
    velocity_y = float(fling.split("Y: ")[1].split(" ")[0])
    speed = np.sqrt(velocity_x * velocity_x + velocity_y * velocity_y)
    direction = np.tanh(velocity_y / velocity_x)
    return velocity_x, velocity_y, speed, direction


def compute_fn_response_time(df: pd.DataFrame):

    df["Fruit_id"] = df["Details"].str.extract(r"(\d+)$").astype(int)

    spawn_df = df[df["Details"].str.contains("New")].copy()
    cut_df = df[df["Details"].str.contains("Cut")].copy()

    merge_df = pd.merge(
        spawn_df, cut_df, on=["Game_id", "Fruit_id"], suffixes=("_spawn", "_cut")
    )
    merge_df["Response_time"] = (
        merge_df["Timestamp_cut"] - merge_df["Timestamp_spawn"]
    ).dt.total_seconds()

    df = pd.merge(
        df,
        merge_df[["Game_id", "Fruit_id", "Response_time"]],
        on=["Game_id", "Fruit_id"],
    )

    return df


def extract_data(paths: dict):

    res = {key: {} for key in paths.keys()}

    for i, (key, path) in enumerate(paths.items()):
        df = pd.read_csv(path, header=1)
        convert_time_stamps(df)
        compute_game_id(df)
        df = df.query("Game_id == {}".format(wanted_game_ids[i]))

        touches = (
            df.query("Activity.str.contains('FruitNinja')").reset_index().fillna("")
        )
        res[key]["touches"] = touches

        fruits = df.fillna("").query("Details.str.contains('fruit')").reset_index()
        fruits = compute_fn_response_time(fruits)
        res[key]["fruits"] = fruits

        actions = touches.query("Details.str.contains('Action')")
        actions[["x", "y"]] = actions.apply(
            lambda row: (
                float(row["CoorX"].split(":")[1]),
                float(row["CoorY"].split(":")[1]),
            ),
            axis=1,
            result_type="expand",
        )
        res[key]["actions"] = actions

        scrolls = touches.query("Details.str.contains('Scroll')")
        scrolls[["distance_x", "distance_y", "distance"]] = scrolls.apply(
            lambda row: compute_scroll_dist(row["Details"]),
            axis=1,
            result_type="expand",
        )
        res[key]["scrolls"] = scrolls

        flings = touches.query("Details.str.contains('Fling')")
        flings[["velocity_x", "velocity_y", "speed", "direction"]] = flings.apply(
            lambda row: compute_fling_vel(row["Details"]), axis=1, result_type="expand"
        )
        res[key]["flings"] = flings

        # actions_down = touches.query("Details.str.contains('DOWN')")
        # moves = touches.query("Details.str.contains('MOVE')")
        # actions_up = touches.query("Details.str.contains('UP')")

    return res

In [None]:
data_dict = extract_data(paths)

In [None]:
def plot_hist_stat(arr: np.ndarray, ax: plt.Axes, label: str = "", title: str = ""):
    ax.hist(arr, density=True)
    ax.set_title(title)
    avg = np.mean(arr)
    var = np.std(arr)
    ax.axvline(avg, color="green")
    ax.axvline(avg - var, color="green", linestyle="--")
    ax.axvline(avg + var, color="green", linestyle="--")


def plot_compare_hists(dfs_dict: dict, features: list, nbins: int = 10):
    # fig = plt.figure(constrained_layout=True)
    # subfigs = fig.subfigures(nrows=len(dfs_dict))
    # for irow, (key, df) in enumerate(dfs_dict.items()):
    #     subfigs[irow].suptitle(key)
    #     ax = subfigs[irow].subplots(nrows=1, ncols=len(features), squeeze=False)
    #     for icol, feature in enumerate(features):
    # plot_hist_stat(df[feature[0]][feature[1]], ax[0,icol], title=feature)

    fig, ax = plt.subplots(figsize=(8, 3), ncols=len(features), squeeze=False)
    for icol, feature in enumerate(features):
        ax[0, icol].set_title(f"{feature[0]}, {feature[1]}")
        feat_acc = []
        weights = []
        for _, df in dfs_dict.items():
            f = df[feature[0]][feature[1]]
            feat_acc.append(f)
            weights.append(np.full(f.shape[0], 1 / f.shape[0]))
        ax[0, icol].hist(feat_acc, nbins, weights=weights, label=dfs_dict.keys())
        ax[0, icol].set_xlabel(feature[2])

    fig.tight_layout(rect=[0, 0.1, 1, 0.8])
    plt.legend()

Scrolls - an event after Down action and before Up action, if the coordinates of the two differ. Scroll gets samples every ~120 times a seconds (dynamic, so no way to know exactly). As a result, one scroll may end not in an Up event, but another scroll (can change direction and speed).
Fling - a scroll event that does end up in Up action AND exceeds a certain threshold of speed.


In [None]:
plot_compare_hists(
    data_dict,
    [
        ["scrolls", "distance", "pixels (px)"],
        ["flings", "speed", "px/sec"],
        ["fruits", "Response_time", "sec"],
    ],
    nbins=7,
)

Distribution of scroll's distance. Not accumulated into a single move, so this is per a single 'flick'. 

PD patients are likely to have shorter scrolls, maybe overall longer DOWN to UP time

Demonstration of the data we collect from the screen. Maybe we should show in a similar manner the sensors' data we collect. Direction relative to the screen?

In [None]:
def collect_trajectories(dfs_dict: dict, poly_deg=5):

    trajectories = {key: [] for key in dfs_dict.keys()}
    polyms = {key: [] for key in dfs_dict.keys()}

    for key, df in dfs_dict.items():
        df = df["actions"]
        for _, action in df.iterrows():
            if "DOWN" in action["Details"]:
                down_index = action["index"]
            elif ("UP" in action["Details"]) and (action["index"] > down_index):
                up_index = action["index"]

                trajectory = df.query("@down_index < index <= @up_index")[
                    ["x", "y"]
                ].values
                trajectories[key].append(np.array(trajectory))

                z = np.polyfit(trajectory[:, 0], trajectory[:, 1], poly_deg)
                p = np.poly1d(z)
                polyms[key].append(p(trajectory[:, 0]))

                # m, c = np.linalg.lstsq(np.vstack([trajectory[:,0], np.ones(len(trajectory[:,0]))]).T, trajectory[:,1], rcond=None)[0]
                # polyms[key].append(trajectory[:,0]*m + c)

                down_index = 9999

    return trajectories, polyms


def plot_trajectory(
    trajectory: np.ndarray, idx: int, ax: plt.Axes, polym: Optional[np.ndarray] = None
):
    x, y = trajectory[0][0], trajectory[0][1]
    ax.scatter(x, y, color="green")
    ax.text(x, y, idx)

    x, y = trajectory[-1][0], trajectory[-1][1]
    ax.scatter(x, y, color="red")

    ax.plot(trajectory[:, 0], trajectory[:, 1], color="blue")

    if polym is not None:
        smoothness = np.sqrt(np.sum(np.square(polym - trajectory[:, 1])))
        smoothness = np.std(np.square(polym - trajectory[:, 1]))
        ax.text(x, y, f"{smoothness:.3f}")
        ax.plot(trajectory[:, 0], polym, color="orange")


def plot_compare_trajectories(
    trajectories: dict, num: int = 10, polyms: Optional[dict] = None
):
    _, ax = plt.subplots(ncols=len(trajectories), sharex=True, sharey=True)
    polym = None
    for icol, (key, ele) in enumerate(trajectories.items()):
        for e, trajectory in enumerate(ele):
            if e >= num:
                break
            if polyms is not None:
                polym = polyms[key][e]
            plot_trajectory(trajectory, e, ax[icol], polym)

    plt.tight_layout();

In [None]:
trajectories, polyms = collect_trajectories(data_dict)

In [None]:
plot_compare_trajectories(trajectories, num=10, polyms=polyms)

In [None]:
np.sqrt(np.sum(np.square(np.diff(paths[0], axis=0)), axis=1))

Velocities - speed of movement. From position to position. Speed of change of distance traveled is acceleration?

In [None]:
velocities = []
for index, action in actions.iterrows():
    if "DOWN" in action["Details"]:
        down_index = action["index"]
    if "UP" in action["Details"]:
        up_index = action["index"]
        scroll = scrolls.query("{} <= index <= {}".format(down_index - 1, up_index))
        if len(scroll) > 0:
            scroll_time_delta = scroll["Timestamp"].diff().dt.total_seconds() * 1000
            velocities.append((scroll["distance"] / scroll_time_delta).values)
        break

In [None]:
fig, ax = plt.subplots(ncols=2)
for velocity in velocities:
    ax[0].plot(velocity)
ax[1].hist([np.nanmean(velocity) for velocity in velocities]);