In [9]:
import os
import sys

import numpy as np
import pandas as pd

from scipy.stats import moment, skew, kurtosis

from joblib import load


def skip_fun(x):
    return x in (0, 1, 2)


file_name = os.listdir("test_file")[-1]
data = pd.read_csv("test_file/" + file_name, skiprows=skip_fun)
data.rename(columns={"Time (s)": "Time",
                     " X (m/s2)": "X",
                     " Y (m/s2)": "Y",
                     " Z (m/s2)": "Z",
                     " R (m/s2)": "R",
                     " Theta (deg)": "Theta",
                     " Phi (deg)": "Phi", }, inplace=True)

data["frec"] = 1 / data["Time"].diff()

errors = []

if any(data["frec"].apply(lambda x: np.abs(x - data["frec"].mean()) > 1.5)):
    errors.append("Не стабільна частота")

if ((data.loc[:, ["X", 'Y', "Z"]].max()).max() > 80) | ((data.loc[:, ["X", 'Y', "Z"]].min()).min() < -80):
    errors.append("Досягнут ліміт датчика по одній осі")

if data["R"].max() > 150:
    errors.append("Аномальні прискорення")

if data.shape[0] < 150:
    errors.append("Можливо ваш запис був пошкоджений, він надто короткий")

if not (5 < data["R"].mean() < 20):
    errors.append("Аномальне середнє прискорення")

if errors:
    file_report_error = open("Error report/Error report {}".format(file_name), "w")

    original_stdout = sys.stdout
    sys.stdout = file_report_error

    print("Звіт помилок {}".format(file_name[:-4]), "", sep="\n")

    errors = "\n".join(errors)
    print("У вашому файлі наявні такі помолки:", "", errors, sep="\n")

    sys.stdout = original_stdout
    file_report_error.close()

data_filt = data.copy()

for i in ["X", "Y", "Z", "R", "Theta", "Phi"]:
    data_filt[i] = data_filt[[i]].rolling(5).mean()
    data_filt.loc[range(4), [i]] = data.loc[range(4), [i]]

data_filt["win_ind"] = data_filt.index // 25

data_win_25 = data_filt.loc[:, ["R", "win_ind"]]\
    .groupby("win_ind")\
    .aggregate(lambda x: int((x.max() > 13) or (x.min() < 7)))\
    .rename(columns={"R": "Active"})\
    .reset_index()

data_filt = pd.merge(data_filt, data_win_25, how='outer', on='win_ind')

data_filt["action_index"] = 0
action_ind = 0
for i in range(1, data_filt.shape[0]):
    if data_filt.loc[i, "Active"] != data_filt.loc[i - 1, "Active"]:
        action_ind += 1
    data_filt.loc[i, "action_index"] = action_ind

def n_cross(x):
    x_del = x - x.mean()
    return (np.diff(np.sign(x_del)) != 0).sum()

data_feature_acc = data_filt.loc[:, ["X", "Y", "Z", "R", "action_index"]]\
    .groupby("action_index").aggregate([
    lambda x: x.mean(),
    lambda x: x.std(),
    lambda x: x.max(),
    lambda x: x.min(),
    lambda x: x.std() / x.mean(),
    lambda x: np.percentile(x, 25),
    lambda x: np.percentile(x, 50),
    lambda x: np.percentile(x, 75),
    lambda x: moment(x, moment=3),
    lambda x: moment(x, moment=4),
    lambda x: skew(x),
    lambda x: kurtosis(x),
    lambda x: n_cross(x)])

data_feature_acc = data_feature_acc.T.reset_index().T.drop(["level_0", "level_1"])
data_feature_acc = data_feature_acc.apply(lambda x: x.astype(float))

data_feature_cor = data_filt.loc[:, ["Theta", "Phi", "action_index"]]\
    .groupby("action_index").aggregate([
    lambda x: x.mean(),
    lambda x: x.std(),
    lambda x: x.max(),
    lambda x: x.min(),
    lambda x: x.std() / x.mean(),
    lambda x: np.percentile(x, 25),
    lambda x: np.percentile(x, 50),
    lambda x: np.percentile(x, 75)])

data_feature_cor = data_feature_cor.T.reset_index().T.drop(["level_0", "level_1"])
data_feature_cor = data_feature_cor.apply(lambda x: x.astype(float))

data_feature_acc.reset_index(inplace=True)
data_feature_cor.reset_index(inplace=True)
data_feature = pd.merge(data_feature_acc, data_feature_cor, on="action_index")
data_feature.drop("action_index", axis=1, inplace=True)

Sts = load("models/sts_model_28.joblib")
data_feature = Sts.transform(data_feature)

model = load("models/cat_model_28.joblib")
model_pred = model.predict(data_feature)
pred_df = pd.DataFrame({"action_index": range(model_pred.shape[0]),
                        "model_predict": model_pred[:, 0]})
data_filt = pd.merge(data_filt, pred_df, how="outer", on="action_index")

time_0, time_1, time_2, time_3 = 0.0, 0.0, 0.0, 0.0
ex_ind = 0

file_report_fit = open("Fitness report/Fitness report {}".format(file_name), "w")

original_stdout = sys.stdout
sys.stdout = file_report_fit

print("Звіт тренування {}".format(file_name[:-4]), "", sep="\n")

for i in data_filt["action_index"].unique():
    data_i = data_filt[data_filt["action_index"] == i].reset_index()
    dt = data_i.loc[data_i.shape[0] - 1, "Time"] - data_i.loc[0, "Time"]
    if data_i.loc[0, "model_predict"] == 0:
        time_0 += dt
        continue
    elif data_i.loc[0, "model_predict"] == 1:
        ex_ind += 1
        time_1 += dt
        print("Ваша {} вправа махи руками, вона тривала: {:.4} секунд".format(ex_ind, dt))
        continue
    elif data_i.loc[0, "model_predict"] == 2:
        ex_ind += 1
        time_2 += dt
        print("Ваша {} вправа присідання, вона тривала: {:.4} секунд".format(ex_ind, dt))
        continue
    else:
        ex_ind += 1
        time_3 += dt
        print("Ваша {} вправа удари руками, вона тривала: {:.4} секунд".format(ex_ind, dt))

print("", "Результат:", "", sep="\n")
print("Ви виконували махи руками: {:.4} секунд".format(time_1))
print("Ви виконували присідання: {:.4} секунд".format(time_2))
print("Ви виконували удари руками: {:.4} секунд".format(time_3))
print("Ви відпочивали: {:.4} секунд".format(time_0))

all_good = True
bad = []
if time_1 < 12:
    bad.append("махи руками")
    all_good = False
if time_2 < 12:
    bad.append("присідання")
    all_good = False
if time_3 < 12:
    bad.append("удари руками")
    all_good = False
bad = ", ".join(bad)

if all_good:
    print("", "Ви гарно впорались з розминкою, приємного тренування!", sep="\n")
else:
    print("", "Ваша розминка ще не закінчена, вам потрібно краще зробити {}.".format(bad), sep="\n")

sys.stdout = original_stdout
file_report_fit.close()

