# Generate Train & Test Data

In [None]:
import pandas as pd
import plotly.graph_objects as go
import plotly.io as pio
import scipy.signal

# Register a custom plotly template ("default_theme") holding the font, title
# placement, default axis titles and figure size used by the figures below.
pio.templates["default_theme"] = go.layout.Template(
    layout=go.Layout(
        font={
            "size": 32,
            "color": "black",
            "family": "Computer Modern",
        },
        # x=0.5 places the title at the horizontal centre of the figure.
        title={"x": 0.5, "font": {"size": 34}},
        xaxis={"title": "Time (s)", "showgrid": True},
        yaxis={"title": "Measurement", "showgrid": True},
        width=800,
        height=500,
    )
)

# Combine plotly's built-in "simple_white" template with the custom one.
pio.templates.default = "simple_white" + "+default_theme"

In [None]:
# Recording to process; swap the comment marker to switch between the test and
# the train file.
full_file_name = "../measurement/processed_data/kanguru_test.csv"
#full_file_name = "../measurement/processed_data/kanguru_train.csv"

# Keep the first two raw lines of the file; pandas is told below to take the
# column names from row index 2.
with open(full_file_name, "r") as f:
    header = [next(f), next(f)]

df = pd.read_csv(full_file_name, header=2).reset_index(drop=True)

# Sampling frequency estimated from the mean spacing of the time column.
mean_sample_period = df["Time(s)"].diff().mean()
sampling_frequency = 1 / mean_sample_period
print("Sampling Frequency: ", sampling_frequency)

df["Heel Button Measured"] = df["Heel Button"]

# Processed channel -> column that keeps its untouched copy.
unfiltered_column_for = {
    "Acceleration X (g)": "Acceleration X (g) unfiltered",
    "Acceleration Y (g)": "Acceleration Y (g) unfiltered",
    "Acceleration Z (g)": "Acceleration Z (g) unfiltered",
    "Angular Momentum X (dps)": "Angular Momentum X (dps) unfiltered",
    "Angular Momentum Y (dps)": "Angular Momentum Y (dps) unfiltered",
    "Angular Momentum Z (dps)": "Angular Momentum Z (dps) unfiltered",
}

# Preserve the raw signals before the channel columns are overwritten.
for channel, unfiltered_column in unfiltered_column_for.items():
    df[unfiltered_column] = df[channel]

# 4th-order Butterworth low-pass, cutoff given in the same unit as
# sampling_frequency.
cutoff_frequency = 100

num, den = scipy.signal.iirfilter(4, Wn=cutoff_frequency, fs=sampling_frequency, btype="low", ftype="butter")

# Filter each channel (causal, single pass) and scale it by its own maximum so
# that the largest filtered value becomes 1.
for channel, unfiltered_column in unfiltered_column_for.items():
    df[channel] = scipy.signal.lfilter(num, den, df[unfiltered_column])
    filtered = df[channel]
    df[channel] = filtered / filtered.max()
# Z Axis must be inverted for kanguru (no inversion is applied in this cell).

# Product of the normalised Z angular channel and the normalised Y acceleration.
df["Low Variance Signal"] = df["Angular Momentum Z (dps)"] * df["Acceleration Y (g)"]

In [None]:
# Draw every column of df, except the time axis itself, as a line over time.
figure = go.Figure()

for column in df.columns:
    if column == "Time(s)":
        continue
    trace = go.Scatter(x=df["Time(s)"], y=df[column], mode="lines", name=column)
    figure.add_trace(trace)

figure.show()

In [None]:
class DataAnalyzer:
    """Rule-based step detection on the processed measurement frame."""

    @staticmethod
    def analytical_step_detection_kanguru(df, threshold_z_start=0.25, threshold_z_zero_crossing=0.1, threshold3=0.0, threshold_time=0.2):
        """Label every sample as flight (0) or stand (1) with a small state machine.

        The samples are visited in row order. The detector reads the columns
        "Time(s)", "Acceleration X (g)", "Acceleration Y (g)" and
        "Angular Momentum Z (dps)"; the Z angular channel is negated before it
        is compared against the thresholds.

        State machine, per sample:

        * A flight phase starts when ``abs(z) > threshold_z_start`` and no
          stand-init wait is running. While in flight the prediction is 0.
        * Inside the flight phase ``zero_crossing`` advances 0 -> 1 once
          ``z < -threshold_z_zero_crossing``, 1 -> 2 once ``z > 0``, and in
          state 2 the flight ends as soon as the X or Y acceleration exceeds
          ``threshold3``. Several of these steps may happen on one sample.
        * After the flight ends, the prediction is 1 and the detector ignores
          new flight triggers until the sample time is greater than the time
          of the first stand sample plus ``threshold_time``.

        Args:
            df: DataFrame holding the four input columns. It is modified in
                place.
            threshold_z_start: Magnitude of the negated Z angular signal that
                starts a flight phase.
            threshold_z_zero_crossing: Magnitude the negated Z angular signal
                must fall below (negative side) before its zero crossing is
                looked for.
            threshold3: Level the X or Y acceleration must exceed to end the
                flight phase.
            threshold_time: Length of the wait after a flight phase, in the
                unit of the "Time(s)" column.

        Returns:
            The same ``df`` with three integer columns added or overwritten:
            "predicted_heel_button" (1 = stand, 0 = flight), and
            "Flight detected" / "Stand init detected", which hold the
            detector's flight and stand-init flags as they were when the
            sample was entered (i.e. as left by the previous sample).
        """
        # Work on plain Python lists: much faster than DataFrame.iterrows()
        # with per-cell df.at writes, and independent of the index labels.
        times = df["Time(s)"].tolist()
        acc_x = df["Acceleration X (g)"].tolist()
        acc_y = df["Acceleration Y (g)"].tolist()
        gyro_z = df["Angular Momentum Z (dps)"].tolist()

        sample_count = len(times)
        predicted = [0] * sample_count
        flight_flags = [0] * sample_count
        stand_init_flags = [0] * sample_count

        zero_crossing = 0
        flight_phase = False
        stand_phase_init = False
        stand_phase_state = 0
        # Only read while stand_phase_state == 1, which is always entered
        # together with a fresh assignment of wait_time.
        wait_time = 0.0

        samples = zip(times, acc_x, acc_y, gyro_z)
        for position, (time, value_x, value_y, raw_z) in enumerate(samples):
            value_z = -raw_z

            # Record the flags before this sample updates them.
            if flight_phase:
                flight_flags[position] = 1
            if stand_phase_init:
                stand_init_flags[position] = 1

            in_flight = (abs(value_z) > threshold_z_start or flight_phase) and not stand_phase_init
            if in_flight:
                # predicted[position] keeps its initial value 0.
                stand_phase_state = 0
                flight_phase = True

                # Deliberately separate ifs (not elif): the state may advance
                # through more than one stage on the same sample.
                if zero_crossing == 0 and value_z < -threshold_z_zero_crossing:
                    zero_crossing = 1
                if zero_crossing == 1 and value_z > 0.0:
                    zero_crossing = 2
                if zero_crossing == 2 and (value_x > threshold3 or value_y > threshold3):
                    flight_phase = False
                    stand_phase_init = True
            else:
                predicted[position] = 1

                # Wait for vibration to end before a new flight may start.
                if stand_phase_init and stand_phase_state == 0:
                    zero_crossing = 0
                    stand_phase_state = 1
                    wait_time = time + threshold_time
                if stand_phase_state == 1 and time > wait_time:
                    stand_phase_state = 2
                    stand_phase_init = False

        # Explicit index and dtype keep the columns aligned and integer-typed
        # even for an empty frame.
        df["predicted_heel_button"] = pd.Series(predicted, index=df.index, dtype="int64")
        df["Flight detected"] = pd.Series(flight_flags, index=df.index, dtype="int64")
        df["Stand init detected"] = pd.Series(stand_init_flags, index=df.index, dtype="int64")

        return df

# Run the kanguru step detector with its default thresholds and rebind df to
# the frame it returns.
df = DataAnalyzer.analytical_step_detection_kanguru(df)

In [None]:
#df = DataAnalyzer.analytical_step_detection(df, threshold=0.02, threshold2=0.2)

# Overlay the detector's state columns, the acceleration and angular channels
# and the predicted heel-button signal on a common time axis.
figure = go.Figure()

# (column, legend label) pairs in drawing order.
# NOTE(review): "Stand detected" is not created by any cell visible here;
# presumably it is already present in the loaded CSV. Confirm.
plotted_columns = [
    ("Stand detected", "Stand detected"),
    ("Flight detected", "Flight detected"),
    ("Stand init detected", "Stand init detected"),
    ("Acceleration X (g)", "Acceleration X (g)"),
    ("Acceleration Y (g)", "Acceleration Y (g)"),
    ("Acceleration Z (g)", "Acceleration Z (g)"),
    ("Angular Momentum X (dps)", "Angular Momentum X (dps)"),
    ("Angular Momentum Y (dps)", "Angular Momentum Y (dps)"),
    ("Angular Momentum Z (dps)", "Angular Momentum Z (dps)"),
    # The prediction is shown under the legend label "Heel Button".
    ("predicted_heel_button", "Heel Button"),
]

for column, label in plotted_columns:
    figure.add_trace(go.Scatter(x=df["Time(s)"], y=df[column], mode="lines", name=label))
#figure.add_trace(go.Scatter(x=df["Time(s)"], y=abs(df["Heel Button"] - df["Stand detected"]), mode="lines", name="Difference"))

figure.update_layout(title=dict(text="Analytical Step Detection"))

figure.show()

In [None]:
# Shift the time axis so that the first sample is at t = 0, then write the
# annotated frame (including its index) to disk.
df["Time(s)"] = df["Time(s)"] - df["Time(s)"].iloc[0]
df.to_csv("../measurement/processed_data/comparison_kang/analytic_test.csv")

# Keep only the samples after t = 0.3 and re-zero the time axis.
# .copy() makes the filtered frame independent of the unfiltered one, so the
# column assignment below does not write into a slice (this is what triggers
# pandas' SettingWithCopyWarning).
df = df[df["Time(s)"] > 0.3].copy()
df["Time(s)"] = df["Time(s)"] - df["Time(s)"].iloc[0]

# Number and share of samples where the prediction equals "Stand detected".
# NOTE(review): "Stand detected" is not created by any cell visible here;
# presumably it is already present in the loaded CSV. Confirm.
equal_values = (df["predicted_heel_button"] == df["Stand detected"]).sum()
print(f"{equal_values} out of {df.shape[0]} entries are equal, which is {equal_values / df.shape[0] * 100:.2f}%")

def count_artifacts(pred_signal, time_vector, threshold_0=0.05, threshold_1=0.15):
    """
    Count the runs of a 0/1 signal that are shorter than a minimum duration.

    The signal is split into maximal runs of consecutive equal values. The
    duration of a run is the time of its last sample minus the time of its
    first sample, so a run consisting of a single sample has duration 0.
    Runs whose value is neither 0 nor 1 are never counted.

    Args:
        pred_signal: Indexable sequence of predicted values (0 or 1).
        time_vector: Indexable sequence of the matching time values.
        threshold_0 (float): A run of 0s shorter than this is an artifact.
        threshold_1 (float): A run of 1s shorter than this is an artifact.

    Returns:
        Tuple (count_zero, count_one): number of short 0 runs and short 1 runs.
    """
    total = len(pred_signal)
    short_zero_runs = 0
    short_one_runs = 0
    run_start = 0

    # pos walks one past the end so that the final run is closed as well.
    for pos in range(1, total + 1):
        if pos < total and pred_signal[pos] == pred_signal[run_start]:
            continue  # still inside the current run

        # The run covers indices run_start .. pos - 1.
        level = pred_signal[run_start]
        span = time_vector[pos - 1] - time_vector[run_start]

        if level == 0:
            if span < threshold_0:
                short_zero_runs += 1
        elif level == 1:
            if span < threshold_1:
                short_one_runs += 1

        run_start = pos

    return short_zero_runs, short_one_runs

# Count the short 0 and short 1 runs of the prediction, using the default
# duration thresholds of count_artifacts.
count_zero, count_one = count_artifacts(
    pred_signal=df["predicted_heel_button"].values,
    time_vector=df["Time(s)"].values,
)
print(f"Number of short 0 segments (artifacts): {count_zero}")
print(f"Number of short 1 segments (artifacts): {count_one}")

# Compact summary line: agreement percentage; short 0 runs; short 1 runs.
print(f" {equal_values / df.shape[0] * 100:.2f}; {count_zero}; {count_one};")