In [1]:
import numpy as np
import plotly.graph_objects as go
import polars as pl
import pyEDM
from plotly.subplots import make_subplots
from scipy.signal import find_peaks

In [2]:
# Data source: MotionSense dataset (https://github.com/mmalekzadeh/motion-sense)

In [3]:
# Recording analysed by the cells that follow.
path = "../data/motions/jog_16/sub_2.csv"

# Read the CSV, then discard the column whose header is the empty string.
df = (
    pl.read_csv(path)
    .drop("")
)

In [4]:
# Display the loaded frame as this cell's output.
df

attitude.roll,attitude.pitch,attitude.yaw,gravity.x,gravity.y,gravity.z,rotationRate.x,rotationRate.y,rotationRate.z,userAcceleration.x,userAcceleration.y,userAcceleration.z
f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64
1.683416,-1.309282,1.183437,0.256888,0.966,0.029054,2.590787,-3.295517,1.143418,0.064874,-0.846249,-0.171922
1.429139,-1.281262,1.005889,0.282632,0.958377,-0.040307,2.427872,-4.584084,1.446238,0.239188,-0.36185,0.069375
1.203411,-1.236084,0.851848,0.306565,0.944505,-0.117984,3.114478,-3.246325,1.706761,0.207503,0.522841,0.315403
1.050246,-1.172212,0.725235,0.336698,0.921611,-0.193027,3.702307,-1.078448,1.985827,0.277697,1.411168,-0.007291
0.925612,-1.101603,0.597556,0.361271,0.891933,-0.271904,4.287506,0.271694,0.83771,0.377606,3.610844,-0.071937
…,…,…,…,…,…,…,…,…,…,…,…
0.895589,-0.454595,0.241023,0.7013,0.439099,-0.561577,1.692241,-1.680805,2.804636,1.380762,-0.105709,0.319496
0.849137,-0.417162,0.26215,0.68633,0.405167,-0.603977,-0.336049,-3.268461,1.466839,0.305888,-0.106018,0.404926
0.78901,-0.422614,0.292139,0.64722,0.410146,-0.642561,-1.587874,-3.709327,0.382961,-0.522265,-0.127004,0.750487
0.730418,-0.449609,0.316982,0.600874,0.434614,-0.670865,-1.731372,-3.141262,-0.359047,-1.193541,-0.47009,1.08833


In [5]:
# Group the column names by sensor, i.e. by the text before the first ".".
# `sensors` maps e.g. "gravity" -> ["gravity.x", "gravity.y", "gravity.z"].
sensors = {}
for name in df.columns:
    sensors.setdefault(name.split(".")[0], []).append(name)

# One figure per sensor, with one stacked subplot per component.
sample_index = np.arange(0, len(df))
for sensor, members in sensors.items():
    fig = make_subplots(
        rows=len(members),
        cols=1,
        # Subplot titles are the component part of the name (after the ".").
        subplot_titles=[member.split(".")[1] for member in members],
    )

    for row, member in enumerate(members, start=1):
        trace = go.Scatter(
            name=member,
            x=sample_index,
            y=df[member],
            mode="lines",
            showlegend=False,
        )
        fig.add_trace(trace, row=row, col=1)

    fig.update_layout(title_text=sensor)
    fig.show()

In [6]:
# Lag, in samples, between successive delay coordinates.
tau = 2

# Point k of the embedding is (s[k], s[k + tau], s[k + 2 * tau]): the second and
# third coordinates look FORWARD in time relative to the first one, so the axes
# are labelled x(t), x(t+tau), x(t+2*tau).
series = df["userAcceleration.x"]
n_points = len(series) - 2 * tau

layout = go.Layout(
    title="Lagged embedding of userAcceleration.x",
    scene=dict(
        xaxis=dict(title="x(t)"),
        yaxis=dict(title="x(t+tau)"),
        zaxis=dict(title="x(t+2*tau)"),
    ),
    width=600,
    height=600,
)
data = [
    go.Scatter3d(
        x=series[: -2 * tau],
        y=series[tau:-tau],
        z=series[2 * tau :],
        mode="lines",
        # Colour by sample index; one colour value per plotted point so the
        # colour array has the same length as the coordinate arrays.
        line=dict(color=np.arange(0, n_points), colorscale="Viridis"),
        hovertemplate="x(t): %{x}<br>x(t+tau): %{y}<br>x(t+2*tau): %{z}<extra></extra>",
    )
]

fig = go.Figure(data=data, layout=layout)
fig.show()

In [7]:
tau = 2

x = df["userAcceleration.x"].to_numpy()

# Local maxima of the signal whose height is at least 0.
peaks, _ = find_peaks(x, height=0)

# Keep only the peaks p with tau <= p < len(x) - 2 * tau, so that
# p + tau and p + 2 * tau are still valid indices into x.
in_range = (peaks >= tau) & (peaks < len(x) - 2 * tau)
peaks = peaks[in_range]

layout = go.Layout(
    title="userAcceleration.x",
    xaxis={"title": "t"},
    yaxis={"title": "x"},
)

signal_trace = go.Scatter(
    name="userAcceleration.x",
    x=np.arange(0, len(x)),
    y=x,
    mode="lines",
    showlegend=False,
)
peak_trace = go.Scatter(
    name="peaks",
    x=peaks,
    y=x[peaks],
    mode="markers",
    marker={"color": "red", "size": 3},
    showlegend=False,
)
data = [signal_trace, peak_trace]

fig = go.Figure(data=data, layout=layout)
fig.show()

In [8]:
# Point k of the embedding is (x[k], x[k + tau], x[k + 2 * tau]); the peak markers
# below use the same forward lags, so the axes are labelled with t+tau / t+2*tau.
layout = go.Layout(
    title="Lagged embedding of userAcceleration.x",
    scene=dict(
        xaxis=dict(title="x(t)"),
        yaxis=dict(title="x(t+tau)"),
        zaxis=dict(title="x(t+2*tau)"),
    ),
    width=600,
    height=600,
)

# Colour each sample by its position between two consecutive peaks: the value
# ramps 0 -> 1 over the first half of the interval and 1 -> 0 over the second.
# Samples before the first peak and from the last peak onwards stay at 0.
colors = np.zeros(len(x))
for start, end in zip(peaks[:-1], peaks[1:]):
    rising = (end - start) // 2
    falling = (end - start) - rising
    colors[start:end] = np.concatenate(
        [np.linspace(0, 1, rising), np.linspace(1, 0, falling)]
    )

data = [
    go.Scatter3d(
        name="lagged embedding",
        x=x[: -2 * tau],
        y=x[tau:-tau],
        z=x[2 * tau :],
        mode="lines",
        # Trim the colours to the number of embedded points.
        line=dict(color=colors[: -2 * tau], colorscale="Viridis"),
    ),
    go.Scatter3d(
        name="peaks",
        x=x[peaks],
        y=x[peaks + tau],
        z=x[peaks + 2 * tau],
        mode="markers",
        marker=dict(color="red", size=3),
    ),
]

fig = go.Figure(data=data, layout=layout)
fig.show()

In [9]:
# Embed every peak p as the point (x[p], x[p + tau], x[p + 2 * tau]);
# X has one row per coordinate and one column per peak.
X = np.array([x[peaks + shift] for shift in (0, tau, 2 * tau)])
# Euclidean norm of each embedded peak, i.e. its distance from the origin.
d = np.linalg.norm(X, axis=0)

layout = go.Layout(
    title="distances",
    xaxis={"title": "t"},
)

signal_trace = go.Scatter(
    name="userAcceleration.x",
    x=np.arange(0, len(x)),
    y=x,
    mode="lines",
    showlegend=False,
)
peak_trace = go.Scatter(
    name="peaks",
    x=peaks,
    y=x[peaks],
    mode="markers",
    marker={"color": "red", "size": 3},
    showlegend=False,
)
# Distance curve, drawn at the time positions of the peaks.
distance_trace = go.Scatter(
    x=peaks,
    y=d,
    mode="lines",
    line={"color": "red"},
    showlegend=False,
)
data = [signal_trace, peak_trace, distance_trace]

fig = go.Figure(data=data, layout=layout)
fig.show()

In [10]:
# Local maxima of the distance curve `d` (one value per entry of `peaks`).
peaks_of_peaks, _ = find_peaks(d, height=0, distance=tau)

# NOTE: `peaks` is narrowed in place here while `d` keeps its old length, so
# this cell is not idempotent; re-run the peak detection and distance cells
# before executing it a second time.
peaks = peaks[peaks_of_peaks]

layout = go.Layout(
    title="userAcceleration.x",
    xaxis={"title": "t"},
    yaxis={"title": "x"},
)

signal_trace = go.Scatter(
    x=np.arange(0, len(x)),
    y=x,
    mode="lines",
    showlegend=False,
)
kept_peaks_trace = go.Scatter(
    x=peaks,
    y=x[peaks],
    mode="markers",
    marker={"color": "red", "size": 3},
    showlegend=False,
)
data = [signal_trace, kept_peaks_trace]

fig = go.Figure(data=data, layout=layout)
fig.show()

In [11]:
# Point k of the embedding is (x[k], x[k + tau], x[k + 2 * tau]); the peak markers
# below use the same forward lags, so the axes are labelled with t+tau / t+2*tau.
layout = go.Layout(
    title="Lagged embedding of userAcceleration.x",
    scene=dict(
        xaxis=dict(title="x(t)"),
        yaxis=dict(title="x(t+tau)"),
        zaxis=dict(title="x(t+2*tau)"),
    ),
    width=600,
    height=600,
)

# Colour each sample by its position between two consecutive entries of `peaks`:
# the value ramps 0 -> 1 over the first half of the interval and 1 -> 0 over the
# second. Samples outside the first/last peak stay at 0.
colors = np.zeros(len(x))
for start, end in zip(peaks[:-1], peaks[1:]):
    rising = (end - start) // 2
    falling = (end - start) - rising
    colors[start:end] = np.concatenate(
        [np.linspace(0, 1, rising), np.linspace(1, 0, falling)]
    )

data = [
    go.Scatter3d(
        name="lagged embedding",
        x=x[: -2 * tau],
        y=x[tau:-tau],
        z=x[2 * tau :],
        mode="lines",
        # Trim the colours to the number of embedded points.
        line=dict(color=colors[: -2 * tau], colorscale="Viridis"),
    ),
    go.Scatter3d(
        name="peaks",
        x=x[peaks],
        y=x[peaks + tau],
        z=x[peaks + 2 * tau],
        mode="markers",
        marker=dict(color="red", size=3),
    ),
]

fig = go.Figure(data=data, layout=layout)
fig.show()

In [12]:
# Sample index at which the recording is split into an early and a late part.
split = 1400

# Each part is embedded on its own, so no embedded point mixes samples from
# both sides of the split.
series = df["userAcceleration.x"]
early, late = series[:split], series[split:]

# Point k of each embedding is (s[k], s[k + tau], s[k + 2 * tau]), hence the
# forward-lag axis labels.
layout = go.Layout(
    title="Lagged embedding of userAcceleration.x",
    scene=dict(
        xaxis=dict(title="x(t)"),
        yaxis=dict(title="x(t+tau)"),
        zaxis=dict(title="x(t+2*tau)"),
    ),
    width=600,
    height=600,
)

data = [
    go.Scatter3d(
        name=f"lagged embedding (t < {split})",
        x=early[: -2 * tau],
        y=early[tau:-tau],
        z=early[2 * tau :],
        mode="lines",
        # With mode="lines" the colour must be set on `line`; `marker.color`
        # does not affect the drawn line.
        line=dict(color="blue"),
    ),
    go.Scatter3d(
        name=f"lagged embedding (t >= {split})",
        x=late[: -2 * tau],
        y=late[tau:-tau],
        z=late[2 * tau :],
        mode="lines",
        line=dict(color="red"),
    ),
]

fig = go.Figure(data=data, layout=layout)
fig.show()

In [13]:
# Plot the three userAcceleration components against each other (no lagging).
layout = go.Layout(
    title="Embedding of userAcceleration",
    scene={
        "xaxis": {"title": "x"},
        "yaxis": {"title": "y"},
        "zaxis": {"title": "z"},
    },
    width=600,
    height=600,
)

trajectory = go.Scatter3d(
    name="userAcceleration",
    x=df["userAcceleration.x"],
    y=df["userAcceleration.y"],
    z=df["userAcceleration.z"],
    mode="lines",
)
data = [trajectory]

fig = go.Figure(data=data, layout=layout)
fig.show()

In [14]:
# Lag, in samples, applied to the y component.
tau = 2

# Point k is (ax[k], ay[k], ay[k + tau]): the third coordinate is the y
# component tau samples LATER, so the z axis is labelled y(t+tau).
layout = go.Layout(
    title="Multivariate lagged embedding of userAcceleration.(x, y)",
    scene=dict(
        xaxis=dict(title="x(t)"),
        yaxis=dict(title="y(t)"),
        zaxis=dict(title="y(t+tau)"),
    ),
    width=600,
    height=600,
)
data = [
    go.Scatter3d(
        name="lagged embedding",
        x=df["userAcceleration.x"][:-tau],
        y=df["userAcceleration.y"][:-tau],
        z=df["userAcceleration.y"][tau:],
        mode="lines",
    )
]

fig = go.Figure(data=data, layout=layout)
fig.show()

In [15]:
# Per-sample Euclidean length of the gravity vector: square every component,
# sum across the gravity columns of each row, take the square root.
gravity_components = df[sensors["gravity"]].to_numpy()
gravity = np.sqrt(np.power(gravity_components, 2).sum(axis=1))

layout = go.Layout(
    title="Magnitude of gravity",
    xaxis={"title": "t"},
    yaxis={"title": "g"},
    width=900,
    height=500,
)
magnitude_trace = go.Scatter(
    x=np.arange(0, len(gravity)),
    y=gravity,
    mode="lines",
)
data = [magnitude_trace]

fig = go.Figure(data=data, layout=layout)
fig.show()

In [16]:
# Per-sample Euclidean length of the userAcceleration vector: square every
# component, sum across the userAcceleration columns of each row, take the root.
acceleration_components = df[sensors["userAcceleration"]].to_numpy()
acceleration = np.sqrt(np.power(acceleration_components, 2).sum(axis=1))

layout = go.Layout(
    title="Magnitude of acceleration",
    xaxis={"title": "t"},
    yaxis={"title": "a"},
    width=900,
    height=500,
)
magnitude_trace = go.Scatter(
    x=np.arange(0, len(acceleration)),
    y=acceleration,
    mode="lines",
)
data = [magnitude_trace]

fig = go.Figure(data=data, layout=layout)
fig.show()

In [17]:
# Lag, in samples, between successive delay coordinates.
tau = 2

# Point k of the embedding is (a[k], a[k + tau], a[k + 2 * tau]) where `a` is the
# acceleration magnitude; the later coordinates look forward in time, so the
# axes are labelled x(t), x(t+tau), x(t+2*tau).
layout = go.Layout(
    title="Lagged embedding of acceleration",
    scene=dict(
        xaxis=dict(title="x(t)"),
        yaxis=dict(title="x(t+tau)"),
        zaxis=dict(title="x(t+2*tau)"),
    ),
    width=600,
    height=600,
)


data = [
    go.Scatter3d(
        x=acceleration[: -2 * tau],
        y=acceleration[tau:-tau],
        z=acceleration[2 * tau :],
        mode="lines",
        hovertemplate="x(t): %{x}<br>x(t+tau): %{y}<br>x(t+2*tau): %{z}<extra></extra>",
    )
]

fig = go.Figure(data=data, layout=layout)
fig.show()

In [18]:
def autocorrelation(x, max_lag):
    """Return the autocorrelation of ``x`` for lags ``0 .. max_lag - 1``.

    The lag-0 entry is the constant 1. Every other entry is the off-diagonal
    element of ``np.corrcoef`` between ``x`` with its last ``lag`` samples
    removed and ``x`` with its first ``lag`` samples removed.

    Args:
        x: One-dimensional sliceable sequence of samples.
        max_lag: Exclusive upper bound on the lag; values below 2 yield just
            the lag-0 entry.

    Returns:
        A NumPy array with one coefficient per lag, starting at lag 0.
    """
    coefficients = [1]
    for shift in range(1, max_lag):
        leading, trailing = x[:-shift], x[shift:]
        coefficients.append(np.corrcoef(leading, trailing)[0, 1])
    return np.array(coefficients)


def plot_rho_Tp_tau(df, columns: list[str], E: int, maxTp: int, minTau: int):
    """Plot, per column, the ``rho`` surface over (Tp, tau) next to |autocorrelation|.

    For every column and every ``tau`` in ``-1, -2, ..., minTau`` this calls
    ``pyEDM.PredictInterval`` with the column as both ``columns`` and ``target``
    and stacks the returned ``rho`` values into one row of a surface. A second,
    half-transparent surface shows the absolute autocorrelation of the column
    at lag ``Tp`` (identical for every tau) as a reference.

    Args:
        df: Frame passed to pyEDM as ``dataFrame``; it is indexed with
            ``df[column].values`` and ``df.shape``, i.e. pandas-style.
        columns: Column names; one 3-D subplot is drawn per entry.
        E: Embedding dimension forwarded to ``pyEDM.PredictInterval``.
        maxTp: Largest prediction interval; the Tp axis spans ``1 .. maxTp``.
        minTau: Most negative lag to evaluate; must be negative.

    Raises:
        AssertionError: If ``minTau`` is not negative (the tau range would be
            empty).
    """
    assert minTau < 0

    # Rows 1 .. N//2 form the library, rows N//2+1 .. N the prediction set.
    PredictIntervalKwargs = {
        "dataFrame": df,
        "lib": [1, df.shape[0] // 2],
        "pred": [df.shape[0] // 2 + 1, df.shape[0]],
        "maxTp": maxTp,
        "E": E,
        "showPlot": False,
    }

    tau_range = range(-1, minTau - 1, -1)
    Tp_range = range(1, maxTp + 1)
    # The (Tp, tau) grid is the same for every column; separate names keep the
    # loop variable `tau` from being overwritten by the grid.
    Tp_grid, tau_grid = np.meshgrid(Tp_range, tau_range)

    fig = make_subplots(
        rows=1,
        cols=len(columns),
        subplot_titles=columns,
        horizontal_spacing=0.01,
        specs=[[{"type": "surface"} for _ in range(len(columns))]],
    )

    for i, column in enumerate(columns):
        rho = []
        for tau in tau_range:
            result = pyEDM.PredictInterval(
                columns=column, target=column, tau=tau, **PredictIntervalKwargs
            )
            rho.append(result["rho"].values)
        rho = np.array(rho)

        # autocorrelation() returns lags 0 .. max_lag - 1. Ask for one extra lag
        # and drop lag 0 so that entry j belongs to Tp_range[j] (lags 1 .. maxTp)
        # instead of being shifted by one along the Tp axis.
        autocorr = autocorrelation(df[column].values, max_lag=maxTp + 1)[1:]
        autocorr = np.repeat(autocorr.reshape(1, -1), len(tau_range), axis=0)

        fig.add_trace(
            go.Surface(
                name="rho",
                x=Tp_grid,
                y=tau_grid,
                z=rho,
                scene=f"scene{i+1}",
                hovertemplate="Tp: %{x}<br>tau: %{y}<br>rho: %{z}<extra></extra>",
                showscale=False,
            ),
            row=1,
            col=i + 1,
        )
        fig.add_trace(
            go.Surface(
                name="autocorrelation",
                x=Tp_grid,
                y=tau_grid,
                z=np.abs(autocorr),
                opacity=0.5,
                scene=f"scene{i+1}",
                hovertemplate="Tp: %{x}<br>rho: %{z}<extra></extra>",
                showscale=False,
            ),
            row=1,
            col=i + 1,
        )

    fig.update_scenes(
        xaxis_title_text="Tp",
        yaxis_title_text="tau",
        zaxis_title_text="rho",
        zaxis_range=[-1, 1],
    )

    fig.update_layout(
        height=400, width=len(columns) * 400, margin=dict(l=20, r=20, t=30, b=30)
    )
    fig.show()


# rho over Tp = 1..15 and tau = -1..-5 for each userAcceleration column, at E = 5.
plot_rho_Tp_tau(
    df.to_pandas(),
    sensors["userAcceleration"],
    E=5,
    maxTp=15,
    minTau=-5,
)

In [19]:
def plot_rho_E_tau(df, columns: list[str], maxE: int, Tp: int, minTau: int):
    """Plot, per column, the ``rho`` surface over (E, tau).

    For every column and every ``tau`` in ``-1, -2, ..., minTau`` this calls
    ``pyEDM.EmbedDimension`` with the column as both ``columns`` and ``target``
    and uses the returned ``rho`` values as one row of the surface
    (presumably one value per E in ``1 .. maxE`` -- confirm against pyEDM).

    Args:
        df: Frame passed to pyEDM as ``dataFrame``; only ``df.shape`` is read
            here.
        columns: Column names; one 3-D subplot is drawn per entry.
        maxE: Largest embedding dimension; the E axis spans ``1 .. maxE``.
        Tp: Prediction interval forwarded to ``pyEDM.EmbedDimension``.
        minTau: Most negative lag to evaluate; must be negative.

    Raises:
        AssertionError: If ``minTau`` is not negative.
    """
    assert minTau < 0

    n_rows = df.shape[0]
    # Rows 1 .. N//2 form the library, rows N//2+1 .. N the prediction set.
    shared_kwargs = dict(
        dataFrame=df,
        lib=[1, n_rows // 2],
        pred=[n_rows // 2 + 1, n_rows],
        maxE=maxE,
        Tp=Tp,
        showPlot=False,
    )

    lags = range(-1, minTau - 1, -1)
    dimensions = range(1, maxE + 1)
    # The (E, tau) grid does not depend on the column.
    E_grid, tau_grid = np.meshgrid(dimensions, lags)

    n_panels = len(columns)
    fig = make_subplots(
        rows=1,
        cols=n_panels,
        subplot_titles=columns,
        horizontal_spacing=0.01,
        specs=[[{"type": "surface"} for _ in range(n_panels)]],
    )

    for panel, column in enumerate(columns, start=1):
        # One row of rho values per lag.
        skill = np.array(
            [
                pyEDM.EmbedDimension(
                    columns=column, target=column, tau=lag, **shared_kwargs
                )["rho"].values
                for lag in lags
            ]
        )

        surface = go.Surface(
            x=E_grid,
            y=tau_grid,
            z=skill,
            scene=f"scene{panel}",
            hovertemplate="E: %{x}<br>tau: %{y}<br>rho: %{z}<extra></extra>",
            showscale=False,
        )
        fig.add_trace(surface, row=1, col=panel)

    fig.update_scenes(
        xaxis_title_text="E",
        yaxis_title_text="tau",
        zaxis_title_text="rho",
        zaxis_range=[-1, 1],
    )
    fig.update_layout(
        height=400,
        width=n_panels * 400,
        margin={"l": 20, "r": 20, "t": 30, "b": 30},
    )
    fig.show()


# rho over E = 1..10 and tau = -1..-10 for each userAcceleration column, at Tp = 4.
plot_rho_E_tau(
    df.to_pandas(),
    sensors["userAcceleration"],
    maxE=10,
    Tp=4,
    minTau=-10,
)

In [20]:
# Parameters used by the Simplex prediction cell below.
E = 5  # embedding dimension
Tp = 4  # prediction interval
tau = -1  # embedding lag

In [21]:
n_rows = df.shape[0]
half = n_rows // 2

# Rows 1 .. N//2 form the library, rows N//2+1 .. N the prediction set.
SimplexKwargs = dict(
    dataFrame=df.to_pandas(),
    lib=[1, half],
    pred=[half + 1, n_rows],
    E=E,
    Tp=Tp,
    tau=tau,
    showPlot=False,
)

targets = sensors["userAcceleration"]
fig = make_subplots(
    rows=len(targets),
    cols=1,
    subplot_titles=targets,
    horizontal_spacing=0.01,
)

for row, target in enumerate(targets, start=1):
    # Every target is predicted from the embedding of userAcceleration.x only.
    result = pyEDM.Simplex(
        columns="userAcceleration.x", target=target, **SimplexKwargs
    )

    actual = go.Scatter(
        name="actual",
        x=np.arange(0, len(df)),
        y=df[target],
        mode="lines",
        showlegend=False,
    )
    fig.add_trace(actual, row=row, col=1)

    # Drop the first and last Tp rows of the prediction column and draw the
    # remainder over sample indices half + Tp .. n_rows - 1.
    predicted = go.Scatter(
        name="prediction",
        x=np.arange(half + Tp, n_rows),
        y=result["Predictions"][Tp:-Tp],  # type: ignore
        mode="lines",
        line={"color": "red", "dash": "dot"},
        showlegend=False,
    )
    fig.add_trace(predicted, row=row, col=1)

fig.update_layout(hoversubplots="axis", hovermode="x")

fig.show()

In [22]:
# One jogging and one walking recording; the paths name the same subject file.
jog_path = "../data/motions/jog_9/sub_2.csv"
walk_path = "../data/motions/wlk_8/sub_2.csv"

# As with the main frame, drop the column whose header is the empty string.
df_jog = pl.read_csv(jog_path).drop("")
df_walk = pl.read_csv(walk_path).drop("")

print(df_jog.shape, df_walk.shape)

(4966, 12) (5614, 12)


In [23]:
# Lag, in samples, between successive delay coordinates.
tau = 1

# Point k of each embedding is (s[k], s[k + tau], s[k + 2 * tau]): the later
# coordinates look forward in time, so axes and hover text use t+tau / t+2*tau.
hover = "x(t): %{x}<br>x(t+tau): %{y}<br>x(t+2*tau): %{z}<extra></extra>"

layout = go.Layout(
    title="Lagged embedding of userAcceleration.x",
    scene=dict(
        xaxis=dict(title="x(t)"),
        yaxis=dict(title="x(t+tau)"),
        zaxis=dict(title="x(t+2*tau)"),
    ),
    width=600,
    height=600,
)

data = [
    go.Scatter3d(
        name="jog",
        x=df_jog["userAcceleration.x"][: -2 * tau],
        y=df_jog["userAcceleration.x"][tau:-tau],
        z=df_jog["userAcceleration.x"][2 * tau :],
        mode="lines",
        # Semi-transparent colours so the two overlapping trajectories stay visible.
        line=dict(color="rgba(0, 0, 255, 0.4)"),
        hovertemplate=hover,
    ),
    go.Scatter3d(
        name="walk",
        x=df_walk["userAcceleration.x"][: -2 * tau],
        y=df_walk["userAcceleration.x"][tau:-tau],
        z=df_walk["userAcceleration.x"][2 * tau :],
        mode="lines",
        line=dict(color="rgba(255, 0, 0, 0.4)"),
        hovertemplate=hover,
    ),
]


fig = go.Figure(data=data, layout=layout)
fig.show()

In [24]:
# userAcceleration.x of the jog recording (top) and walk recording (bottom)
# on a shared x axis.
fig = make_subplots(
    rows=2,
    cols=1,
    subplot_titles=["jog", "walk"],
    shared_xaxes=True,
)

for row, frame in enumerate((df_jog, df_walk), start=1):
    series_trace = go.Scatter(
        x=np.arange(0, len(frame)),
        y=frame["userAcceleration.x"],
        mode="lines",
    )
    fig.add_trace(series_trace, row=row, col=1)

fig.update_layout(hoversubplots="axis", hovermode="x")
fig.show()