In [None]:
!pip install catboost

In [None]:
import numpy as np
import pandas as pd
import seaborn as sns
import matplotlib.pyplot as plt

import plotly.graph_objs as go
import plotly.express as px

from scipy.stats import moment, skew, kurtosis


from joblib import load

# Reading and filtering data

The first three lines of each dataset file contain information about the sensor rather than measurements, so we skip them when reading.

In [None]:
def skip_fun(x):
    """Row predicate for ``pd.read_csv(skiprows=...)``.

    Returns True for row indices 0, 1 and 2 (so those rows are skipped)
    and False for every other index.
    """
    n_header_rows = 3
    return x in range(n_header_rows)

Load the dataset and rename the columns for convenience

In [None]:
# Location of the raw recording to analyse.
data_path = "/data/fight_file/14.txt"

# Raw column headers (all but the first start with a space) mapped to the
# short names used in the rest of the notebook.
column_names = {
    "Time (s)": "Time",
    " X (m/s2)": "X",
    " Y (m/s2)": "Y",
    " Z (m/s2)": "Z",
    " R (m/s2)": "R",
    " Theta (deg)": "Theta",
    " Phi (deg)": "Phi",
}

# skip_fun drops rows 0-2 of the file before parsing.
data = pd.read_csv(data_path, skiprows=skip_fun).rename(columns=column_names)

Let's look at our data

In [None]:
# (column in `data`, legend label) for every channel drawn against "Time".
channel_labels = [
    ("X", "acc_X"),
    ("Y", "acc_Y"),
    ("Z", "acc_Z"),
    ("R", "acc_full"),
    ("Theta", "Theta"),
    ("Phi", "Phi"),
]

# One line trace per channel, in the order listed above.
data_plot = [
    go.Scatter(x=data["Time"], y=data[column], name=label)
    for column, label in channel_labels
]

layout = dict(
    title="Dependence of acceleration on time",
    xaxis_title="Time (point number)",
    yaxis_title="Acceleration",
    template="plotly",
)

fig = go.Figure(data=data_plot, layout=layout)
fig.show()

In [None]:
# Data filtering using the Kalman filter
# https://habr.com/ru/post/588270/
def _kalman_step(f, prev_estimate, prev_error, q, r):
    """Run one update of the simplified scalar Kalman filter.

    Parameters
    ----------
    f : float
        New raw sample.
    prev_estimate : float
        Filtered value produced by the previous step.
    prev_error : float
        Accumulated estimation error from the previous step.
    q, r : float
        Tuning constants combined with the accumulated error to form the gain.

    Returns
    -------
    tuple
        ``(estimate, error)`` to be fed into the next step.
    """
    # On a large jump, pull the previous estimate part of the way towards
    # the new sample before applying the gain.
    if abs(f - prev_estimate) / 50 > 0.25:
        old_input = f * 0.382 + prev_estimate * 0.618
    else:
        old_input = prev_estimate

    old_error_all = (prev_error**2 + q**2)**(1/2)
    gain = old_error_all**2 / (old_error_all**2 + r**2)
    estimate = old_input + gain * (f - old_input)
    error = ((1 - gain) * old_error_all**2)**(1/2)
    return estimate, error


def kalman(f, q=0.5, r=7):
    """Filter a single sample, remembering state between calls.

    The running estimate and accumulated error are stored as attributes on
    the function itself (initialised to 0 and 1 on first use), so successive
    calls behave as one continuous stream. Use ``normalise_kalman`` to filter
    a whole series with fresh state.

    Returns the filtered value for ``f``.
    """
    if not hasattr(kalman, "Accumulated_Error"):
        kalman.Accumulated_Error = 1
        kalman.kalman_adc_old = 0

    kalman_adc, kalman.Accumulated_Error = _kalman_step(
        f, kalman.kalman_adc_old, kalman.Accumulated_Error, q, r
    )
    kalman.kalman_adc_old = kalman_adc

    return kalman_adc


def normalise_kalman(func, q=0.5, r=7):
    """Filter every sample of the iterable ``func`` and return a list.

    Each call starts from a fresh filter state (estimate 0, error 1), so the
    result depends only on the input series. Previously the state lived on
    ``kalman`` and leaked from one series into the next (and across cell
    re-runs), making the output depend on call history.

    Parameters
    ----------
    func : iterable of float
        Samples to filter, in order.
    q, r : float, optional
        Tuning constants forwarded to each filter step.

    Returns
    -------
    list
        Filtered values, one per input sample.
    """
    estimate, error = 0, 1
    filtered = []
    for sample in func:
        estimate, error = _kalman_step(sample, estimate, error, q, r)
        filtered.append(estimate)
    return filtered

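For data filtering, we combine a 10-point rolling-mean filter (to damp sharp peaks) with a Kalman-style filter (to smooth the data). Note that the code below averages each window (`rolling(10).mean()`); it does not take the median.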

In [None]:
data_filt = data.copy()

signal_cols = ["X", "Y", "Z", "R", "Theta", "Phi"]
warmup_rows = range(9)

for col in signal_cols:
    # Step 1: 10-sample rolling mean. (The notebook text calls this a median
    # filter, but the code averages the window.) The first 9 rows have no
    # full window, so they keep their raw values.
    data_filt[col] = data_filt[[col]].rolling(10).mean()
    data_filt.loc[warmup_rows, [col]] = data.loc[warmup_rows, [col]]

    # Step 2: run the Kalman-style filter over the averaged column.
    data_filt[col] = normalise_kalman(data_filt[col])

# Split the signal into consecutive 120-row windows ("actions"); each window
# is classified separately by the model.
data_filt["action_index"] = data_filt.index // 120

# Finding punches in a signal

In [None]:
# Number of sign changes of the signal around its own mean.
def n_cross(x):
    """Count how often ``x - x.mean()`` changes sign between neighbours.

    A sample exactly equal to the mean has sign 0, so passing through it
    registers as two changes.
    """
    centred = x - x.mean()
    sign_steps = np.diff(np.sign(centred))
    return np.not_equal(sign_steps, 0).sum()

# A function that counts the number of points
# where the signal is at or above the maximum value of the sensor
def n_sensor_limit_max(x, limit=77):
    """Count the samples of ``x`` that are greater than or equal to ``limit``.

    Parameters
    ----------
    x : array-like supporting ``>=`` and ``.sum()`` (numpy array, pandas Series)
        Signal samples.
    limit : number, optional
        Upper threshold; defaults to 77, the value previously hard-coded here.

    Returns
    -------
    int
        Number of samples with ``x >= limit``.
    """
    return (x >= limit).sum()

# A function that counts the number of points
# where the signal is at or below the minimum value of the sensor
def n_sensor_limit_min(x, limit=-77):
    """Count the samples of ``x`` that are less than or equal to ``limit``.

    Parameters
    ----------
    x : array-like supporting ``<=`` and ``.sum()`` (numpy array, pandas Series)
        Signal samples.
    limit : number, optional
        Lower threshold; defaults to -77, the value previously hard-coded here.

    Returns
    -------
    int
        Number of samples with ``x <= limit``.
    """
    return (x <= limit).sum()

Prepare a dataset for a model that looks for punches in the fight signal

In [None]:
# Build the feature table for the first model (punch detection): one row per
# `action_index` window of `data_filt`.
# For each action, we will calculate the statistical features that will be used in the model
# The set of statistical features is slightly different for data from the gyroscope and accelerometer
# So we count them separately
#
# Accelerometer part: 15 statistics for each of X, Y, Z, R (60 values per action).
# NOTE(review): the aggregations are anonymous lambdas, so features are
# identified only by their position; reordering this list changes the layout
# of the model input.
data_feature_acc = data_filt.loc[:, ["X", "Y", "Z", "R", "action_index"]]\
    .groupby("action_index").aggregate([
    lambda x: x.mean(),                 # mean
    lambda x: x.std(),                  # standard deviation
    lambda x: x.max(),                  # maximum
    lambda x: x.min(),                  # minimum
    lambda x: x.max() - x.min(),        # range
    lambda x: np.percentile(x, 25),     # lower quartile
    lambda x: np.percentile(x, 50),     # median
    lambda x: np.percentile(x, 75),     # upper quartile
    lambda x: moment(x, moment=3),      # third moment (scipy.stats.moment)
    lambda x: moment(x, moment=4),      # fourth moment (scipy.stats.moment)
    lambda x: skew(x),                  # skewness
    lambda x: kurtosis(x),              # kurtosis
    lambda x: n_cross(x),               # sign changes around the window mean
    lambda x: n_sensor_limit_max(x),    # samples >= 77
    lambda x: n_sensor_limit_min(x)])   # samples <= -77

# Remove transformation artifacts
# The aggregation yields two-level column labels. Transposing, resetting the
# index and transposing back turns those two levels into rows named
# "level_0"/"level_1", which are dropped, leaving plain positional column labels.
data_feature_acc = data_feature_acc.T.reset_index().T.drop(["level_0", "level_1"])
# Cast every column to float (presumably needed because the round trip above
# mixes label strings with numbers — TODO confirm).
data_feature_acc = data_feature_acc.apply(lambda x: x.astype(float))

# Similarly for the gyroscope
# Theta/Phi part: 13 statistics per channel (same list as above without the
# two `moment` features), 26 values per action.
data_feature_cor = data_filt.loc[:, ["Theta", "Phi", "action_index"]]\
    .groupby("action_index").aggregate([
    lambda x: x.mean(),                 # mean
    lambda x: x.std(),                  # standard deviation
    lambda x: x.max(),                  # maximum
    lambda x: x.min(),                  # minimum
    lambda x: x.max() - x.min(),        # range
    lambda x: np.percentile(x, 25),     # lower quartile
    lambda x: np.percentile(x, 50),     # median
    lambda x: np.percentile(x, 75),     # upper quartile
    lambda x: skew(x),                  # skewness
    lambda x: kurtosis(x),              # kurtosis
    lambda x: n_cross(x),               # sign changes around the window mean
    lambda x: n_sensor_limit_max(x),    # samples >= 77
    lambda x: n_sensor_limit_min(x)])   # samples <= -77

# Same flattening of the column labels and float cast as for the accelerometer part.
data_feature_cor = data_feature_cor.T.reset_index().T.drop(["level_0", "level_1"])
data_feature_cor = data_feature_cor.apply(lambda x: x.astype(float))

# Merge dataset => We received a ready dataset for the first model
# `action_index` is moved from the index into a column so it can serve as the
# join key, then removed so that only feature columns remain.
# NOTE(review): both tables use the same positional column labels, so the merge
# has to disambiguate the overlapping ones (pandas' default suffixes, presumably);
# the saved model presumably expects exactly this layout — verify before changing.
data_feature_acc.reset_index(inplace=True)
data_feature_cor.reset_index(inplace=True)
X_test = pd.merge(data_feature_acc, data_feature_cor, on="action_index")
X_test.drop("action_index", axis=1, inplace=True)

In [None]:
# Load the saved punch-detection model and score every action window.
# NOTE: joblib.load unpickles the file, which can execute arbitrary code;
# only load model files that come from a trusted source.
model = load("/model/punch_detect_catboost.joblib")
model_pred = model.predict(X_test)

# One prediction per window, keyed 0..n-1, then broadcast to every row of
# the filtered signal through its `action_index`.
pred_df = pd.DataFrame(
    {
        "action_index": range(model_pred.shape[0]),
        "model_predict": model_pred,
    }
)
data_filt = data_filt.merge(pred_df, how="outer", on="action_index")

We now have the punches detected in our signal. Let's plot them.

In [None]:
# Full acceleration R against row number, coloured by the detector's output.
# (The Ukrainian title reads "Dependence of full acceleration on time";
# the axes are "Point number (time)" and "Acceleration".)
fig = px.scatter(
    x=data_filt.index,
    y=data_filt["R"],
    color=data_filt["model_predict"],
)
fig.update_layout(
    title="Залежність повного прискорення від часу (color: model_predict_is_punch)",
    xaxis_title="Номер точки(час)",
    yaxis_title="Прискорення",
    template='plotly',
)
fig.show()

# Punch classification

Divide the signal according to the results of the first model into separate punches

In [None]:
# Give every run of consecutive rows that share the same `model_predict`
# value its own id (0, 1, 2, ...): the id increases by one at each row whose
# prediction differs from the row directly above it.
#
# This is done with whole-column operations instead of a Python loop writing
# one cell at a time through `.loc`; it yields the same ids, is much faster
# on long recordings, and does not require the index to be exactly 0..n-1.
prediction = data_filt["model_predict"]

# True where a row differs from its predecessor. The first row has no
# predecessor and must not start a new run.
run_starts = prediction.ne(prediction.shift())
run_starts.iloc[:1] = False

data_filt["action_index2"] = run_starts.cumsum().astype("int64")

# Id of the last run (equals the number of changes seen).
action_ind = int(run_starts.sum())

Extract parts of the signal where there are punches

In [None]:
# Keep only the rows whose window was predicted as class 1 by the first model.
data_punch_filt = data_filt.loc[data_filt["model_predict"].eq(1)]

Prepare a dataset for a model that classifies punches

In [None]:
# Build the feature table for the second model (punch classification): one row
# per `action_index2` group of `data_punch_filt`, i.e. per run of consecutive
# rows that the first model marked as class 1.
# For each action, we will calculate the statistical features that will be used in the model
# The set of statistical features is slightly different for data from the gyroscope and accelerometer
# So we count them separately
#
# Accelerometer part: 15 statistics for each of X, Y, Z, R (60 values per group).
# NOTE(review): the aggregations are anonymous lambdas, so features are
# identified only by their position; reordering this list changes the layout
# of the model input.
data_feature_acc = data_punch_filt.loc[:, ["X", "Y", "Z", "R", "action_index2"]]\
    .groupby("action_index2").aggregate([
    lambda x: x.mean(),                 # mean
    lambda x: x.std(),                  # standard deviation
    lambda x: x.max(),                  # maximum
    lambda x: x.min(),                  # minimum
    lambda x: x.max() - x.min(),        # range
    lambda x: np.percentile(x, 25),     # lower quartile
    lambda x: np.percentile(x, 50),     # median
    lambda x: np.percentile(x, 75),     # upper quartile
    lambda x: moment(x, moment=3),      # third moment (scipy.stats.moment)
    lambda x: moment(x, moment=4),      # fourth moment (scipy.stats.moment)
    lambda x: skew(x),                  # skewness
    lambda x: kurtosis(x),              # kurtosis
    lambda x: n_cross(x),               # sign changes around the group mean
    lambda x: n_sensor_limit_max(x),    # samples >= 77
    lambda x: n_sensor_limit_min(x)])   # samples <= -77

# Remove transformation artifacts
# The aggregation yields two-level column labels. Transposing, resetting the
# index and transposing back turns those two levels into rows named
# "level_0"/"level_1", which are dropped, leaving plain positional column labels.
data_feature_acc = data_feature_acc.T.reset_index().T.drop(["level_0", "level_1"])
# Cast every column to float (presumably needed because the round trip above
# mixes label strings with numbers — TODO confirm).
data_feature_acc = data_feature_acc.apply(lambda x: x.astype(float))

# Similarly for the gyroscope
# Theta/Phi part: 13 statistics per channel (same list as above without the
# two `moment` features), 26 values per group.
data_feature_cor = data_punch_filt.loc[:, ["Theta", "Phi", "action_index2"]]\
    .groupby("action_index2").aggregate([
    lambda x: x.mean(),                 # mean
    lambda x: x.std(),                  # standard deviation
    lambda x: x.max(),                  # maximum
    lambda x: x.min(),                  # minimum
    lambda x: x.max() - x.min(),        # range
    lambda x: np.percentile(x, 25),     # lower quartile
    lambda x: np.percentile(x, 50),     # median
    lambda x: np.percentile(x, 75),     # upper quartile
    lambda x: skew(x),                  # skewness
    lambda x: kurtosis(x),              # kurtosis
    lambda x: n_cross(x),               # sign changes around the group mean
    lambda x: n_sensor_limit_max(x),    # samples >= 77
    lambda x: n_sensor_limit_min(x)])   # samples <= -77

# Same flattening of the column labels and float cast as for the accelerometer part.
data_feature_cor = data_feature_cor.T.reset_index().T.drop(["level_0", "level_1"])
data_feature_cor = data_feature_cor.apply(lambda x: x.astype(float))

# Merge dataset => We received a ready dataset for the second model
# `action_index2` is moved from the index into a column so it can serve as the
# join key, then removed so that only feature columns remain.
# NOTE(review): both tables use the same positional column labels, so the merge
# has to disambiguate the overlapping ones (pandas' default suffixes, presumably);
# the saved model presumably expects exactly this layout — verify before changing.
data_feature_acc.reset_index(inplace=True)
data_feature_cor.reset_index(inplace=True)
X_test2 = pd.merge(data_feature_acc, data_feature_cor, on="action_index2")
X_test2.drop("action_index2", axis=1, inplace=True)

In [None]:
# Load the saved punch-classification model and classify every punch segment.
# NOTE: joblib.load unpickles the file, which can execute arbitrary code;
# only load model files that come from a trusted source.
model2 = load("/model/punch_classification_catboost.joblib")
model_pred2 = model2.predict(X_test2)

# One prediction per punch segment; the label is read from column 0 of the
# two-dimensional prediction array. Segment ids are taken in order of
# appearance in `data_punch_filt`.
punch_segment_ids = data_punch_filt["action_index2"].unique()
pred_df = pd.DataFrame(
    {
        "action_index2": punch_segment_ids,
        "model_predict2": model_pred2[:, 0],
    }
)
data_filt = data_filt.merge(pred_df, how="outer", on="action_index2")

# Rows outside punch segments get no match from the merge; every missing
# value in the frame is replaced with 0.
data_filt = data_filt.fillna(0)

We now have the final result. Let's plot it.

The punches in the signal have been detected and classified.

In [None]:
# Full acceleration R against row number, coloured by the second model's output.
# (The Ukrainian title reads "Dependence of full acceleration on time";
# the axes are "Point number (time)" and "Acceleration".)
fig = px.scatter(
    x=data_filt.index,
    y=data_filt["R"],
    color=data_filt["model_predict2"],
)
fig.update_layout(
    title="Залежність повного прискорення від часу (color: model_predict_punch_type)",
    xaxis_title="Номер точки(час)",
    yaxis_title="Прискорення",
    template='plotly',
)
fig.show()

# Comparison of results

Let's compare the final result with the true values.
True values are taken from the markup file for this file.

In [None]:
# Ground-truth label for every row; rows outside the listed windows stay 0.
data_filt["real_punch_type"] = 0

# `action_index` windows for each punch type, taken from the markup file.
jab_ind = [9, 10, 11, 22, 23, 24, 34, 35, 36, 77, 78, 79, 119, 120, 121]
uppercut_ind = [56, 57, 58, 82, 83, 84, 112, 113, 114, 123, 124, 138, 139, 140]
hook_ind = [47, 48, 49, 59, 60, 61, 93, 94, 95, 126, 127, 135, 136]

# Label codes: 1 = jab, 2 = uppercut, 3 = hook.
for punch_code, window_ids in ((1, jab_ind), (2, uppercut_ind), (3, hook_ind)):
    in_windows = data_filt["action_index"].isin(window_ids)
    data_filt.loc[in_windows, "real_punch_type"] = punch_code

Draw true labels

In [None]:
# Full acceleration R against row number, coloured by the ground-truth label.
# (The Ukrainian title reads "Dependence of full acceleration on time";
# the axes are "Point number (time)" and "Acceleration".)
fig = px.scatter(
    x=data_filt.index,
    y=data_filt["R"],
    color=data_filt["real_punch_type"],
)
fig.update_layout(
    title="Залежність повного прискорення від часу (color: real_punch_type)",
    xaxis_title="Номер точки(час)",
    yaxis_title="Прискорення",
    template='plotly',
)
fig.show()