In [1]:
import numpy as np
import sys, os
import _pickle as pickle
import matplotlib.pyplot as plt

import plotly.graph_objects as go
import plotly.express as ex
from plotly.subplots import make_subplots
from cytoolz.itertoolz import concat, sliding_window
import scipy.signal as signal
from scipy.optimize import minimize


sys.path.append("../")
import func as F

# Test local motion phases

In [4]:
def extract_info(data):
    """
    Collect the per-frame contact flags and velocities of an animation clip.

    :param data: mapping with a "frames" entry; every frame is a sequence of joint
                 records exposing "contact" and "velocity"
    :return: tuple (contacts, velocities); both are stacked frame by frame and then
             transposed, so frames run along the last axis
    """
    contact_rows, velocity_rows = [], []
    for frame in data["frames"]:
        flags = [joint["contact"] for joint in frame]
        speeds = [joint["velocity"] for joint in frame]
        contact_rows.append(np.asarray(flags).astype(np.float32))
        # Per-joint velocity vectors are laid end to end: one flat row per frame.
        velocity_rows.append(np.concatenate(speeds))
    return np.vstack(contact_rows).T, np.vstack(velocity_rows).T

def normalise(block_fn:np.ndarray, window_size_half:int=30) -> np.ndarray:
    """
    Window-normalise a block of contact signals and low-pass filter the result.

    Takes an ndarray of shape(n_joints, n_frames), where 1 = contact and 0 otherwise.
    Every frame ts is standardised with the mean / std of the frames in
    [ts - window_size_half, ts + window_size_half), clipped to the sequence. Each row is
    then smoothed with a 3rd-order low-pass Butterworth filter.

    :param block_fn: contact signals, one row per joint
    :param window_size_half: number of frames the window reaches back from the current frame
    :return: filtered, normalised signals with the same shape as block_fn
    """
    n_frames = block_fn.shape[1]
    standardised = np.zeros_like(block_fn, dtype=np.float32)

    for frame in range(n_frames):
        start = max(frame - window_size_half, 0)
        stop = min(frame + window_size_half, n_frames)
        neighbourhood = np.take(block_fn, np.arange(start, stop), axis=1)
        centre = neighbourhood.mean(axis=1)
        spread = neighbourhood.std(axis=1)
        # A flat window has zero spread; divide by 1 instead so the frame maps to 0.
        spread[spread == 0] = 1
        standardised[:, frame] = (block_fn[:, frame] - centre) / spread

    low_pass = signal.butter(3, .1, "low", analog=False, output="sos")
    return signal.sosfilt(low_pass, standardised)

def calc_local_motion_phase(block_fn:np.ndarray):
    """
    Takes the contact info and approximates the local motion phase, as described in
    [Local Motion Phases for Learning Multi-Contact Character Movements].

    :param block_fn: contact signals, one row per joint
    :return: tuple (normalised signals, per-row list of the results returned by fit)
    """
    normalised = normalise(block_fn)
    fitted = []
    for joint_curve in normalised:
        fitted.append(fit(joint_curve))
    return normalised, fitted

def infer(t, params):
    """
    Evaluate the sinusoid a * sin(f * t - s) + b.

    :param t: time value(s), scalar or array
    :param params: indexable holding (a, f, s, b) at positions 0..3
    :return: sinusoid value(s) at t
    """
    amplitude, frequency = params[0], params[1]
    shift, offset = params[2], params[3]
    angle = frequency * t - shift
    return amplitude * np.sin(angle) + offset

def func(x, *args):
    """
    Objective for the sinusoid fit: root-mean-square error over one window.

    :param x: sinusoid parameters passed on to infer
    :param args: args[0] is the array of frame indices, args[1] the target values at those frames
    :return: scalar RMSE between infer(window, x) and the target values
    """
    window, row_fn = args[0], args[1]
    n_samples = float(len(window))
    residual = infer(window, x) - row_fn
    squared_error = np.sum(residual**2)
    return np.sqrt(squared_error / n_samples)

def fit(row_fn, window_size_half:int=120, initial_guess:float=0.001):
    """
    Fit one sinusoid per frame to a single (normalised) contact curve.

    For every frame ts the parameters consumed by infer (amplitude, frequency, phase
    shift, offset) are optimised with BFGS against the samples in
    [ts - window_size_half, ts + window_size_half), clipped to the sequence. Every frame
    starts from the same initial guess; fits are independent of each other.

    :param row_fn: 1-D ndarray with the curve of one joint
    :param window_size_half: number of frames the fitting window reaches back from the
                             current frame (default 120, the previously hard-coded value)
    :param initial_guess: start value used for all four parameters (default 0.001)
    :return: list with one scipy OptimizeResult per frame
    """
    n_frames = len(row_fn)
    # All frames share one start vector, so there is no need for a (n_frames, 4) matrix.
    x0 = np.full(4, initial_guess, dtype=np.float64)

    results = []
    for ts in range(n_frames):
        low = max(ts - window_size_half, 0)
        high = min(ts + window_size_half, n_frames)
        window = np.arange(low, high)
        # Hand the optimiser its own copy so the shared start vector can never be altered.
        res = minimize(func, x0.copy(), args=(window, row_fn[window]), method="BFGS")
        results.append(res)
    return results

def plot_contact(original, normalised, optimised, local_phase, amp):
    """
    Plot the contact pipeline for all joints in a four-row figure.

    Rows: original signal, normalised signal, optimised curve (with the amplitude drawn
    as bars in the same row) and the two local phase components.

    :param original: 2-D array, one row per joint
    :param normalised: per-joint normalised signals
    :param optimised: per-joint fitted curves
    :param local_phase: pair of per-joint series (components 0 and 1)
    :param amp: per-joint amplitudes
    :return: the plotly figure
    """
    titles = ["Original", "Normalised", "Optimised", "Local phase vector"]
    fig = make_subplots(rows=4, cols=1, subplot_titles=titles)
    frames = np.arange(original.shape[1])

    for joint in range(len(original)):
        label = "J"+str(joint)
        for row, series in ((1, original), (2, normalised), (3, optimised)):
            fig.add_trace(go.Scatter(x=frames, y=series[joint], name=label), row=row, col=1)
        # Amplitude bars share the "Optimised" row.
        fig.add_trace(ex.bar(x=frames, y=amp[joint]).data[0], row=3, col=1)
        for component in (0, 1):
            fig.add_trace(
                go.Scatter(x=frames, y=local_phase[component][joint], name=label),
                row=4, col=1,
            )

    fig.update_layout(title="Contact info", width=800, height=800)
    fig.update_xaxes(title_text="frames")
    return fig
def plot_contact_single(original, normalised, optimised, local_phase, amp, velocity, i=14):
    """
    Plot the contact pipeline of a single joint in a five-row figure.

    Rows: original signal, normalised signal, optimised curve (with the amplitude drawn
    as bars in the same row), the two local phase components and the velocity magnitude.

    :param original: 1-D series of the joint
    :param normalised: normalised series of the joint
    :param optimised: fitted curve of the joint
    :param local_phase: pair of series (components 0 and 1)
    :param amp: amplitude series of the joint
    :param velocity: velocity magnitude series of the joint
    :param i: joint index, only used for the trace labels
    :return: the plotly figure
    """
    titles = ["Original", "Normalised", "Optimised", "Local phase vector", "Velocity Magnitude"]
    fig = make_subplots(rows=5, cols=1, subplot_titles=titles)
    frames = np.arange(len(original))
    label = "J"+str(i)

    def add_line(values, row):
        # One labelled line trace in the requested subplot row.
        fig.add_trace(go.Scatter(x=frames, y=values, name=label), row=row, col=1)

    add_line(original, 1)
    add_line(normalised, 2)
    add_line(optimised, 3)
    # Amplitude bars share the "Optimised" row.
    fig.add_trace(ex.bar(x=frames, y=amp).data[0], row=3, col=1)
    add_line(local_phase[0], 4)
    add_line(local_phase[1], 4)
    add_line(velocity, 5)

    fig.update_layout(title="Contact info", width=800, height=800)
    fig.update_xaxes(title_text="frames")
    return fig

def plot_local_phase(results):
    """
    Show a 2x2 overview of the fitted sinusoid parameters of one joint.

    Panels: the sinusoid built from the mean parameters, the standardised amplitude,
    the phase shift and the frequency, each plotted over the frame index.

    :param results: sequence of optimisation results; parameters are read from .x[0..3]
    """
    fig = make_subplots(rows=2, cols=2)
    x = np.arange(len(results))

    def column(k):
        # k-th fitted parameter of every frame as a float32 vector.
        return np.asarray([res.x[k] for res in results], dtype=np.float32)

    amp, freq, phase, bias = column(0), column(1), column(2), column(3)

    # Curve of the averaged parameters; computed before the amplitude is standardised.
    y = infer(x, [amp.mean(), freq.mean(), phase.mean(), bias.mean()])
    amp = (amp-amp.mean()) / amp.std()

    panels = (
        (y, "Y", 1, 1),
        (amp, "amplitude", 1, 2),
        (phase, "phase", 2, 1),
        (freq, "freq", 2, 2),
    )
    for values, label, row, col in panels:
        fig.add_trace(go.Scatter(x=x, y=values, name=label), row=row, col=col)

    fig.show()

def calc_local_phase_vec(results, velocity):
    """
    Build the 2-D local phase vectors from the per-frame sinusoid fits.

    :param results: per joint, a sequence with one fit result per frame; the parameters
                    are read from .x as (amplitude, frequency, phase shift, ...)
    :param velocity: array of shape (3 * n_joints, n_frames) as returned by extract_info,
                     i.e. rows 3*j .. 3*j+2 hold the velocity components of joint j
    :return: tuple (P, amp, velocity) where P is a list with the two phase-vector
             components, amp the fitted amplitudes and velocity the per-joint velocity
             magnitudes; every array has shape (n_joints, n_frames)
    """
    n_joints = len(results)
    # Wrapped phase of every frame's own fit, evaluated at that frame.
    phi = np.asarray([
        [(r.x[1] * frame - r.x[2]) % (2*np.pi) for frame, r in enumerate(res)]
        for res in results
    ])
    amp = np.asarray([[r.x[0] for r in res] for res in results])

    # Group the three component rows of each joint: (3 * n_joints, n_frames) ->
    # (n_joints, 3, n_frames). The previous reshape(-1, 3, n_joints).T produced the same
    # shape but mixed components and frames, because it is a flat re-read, not a regroup.
    velocity = np.reshape(np.asarray(velocity), (n_joints, 3, -1))
    velocity = np.sqrt(np.sum(velocity**2, axis=1))

    # NOTE(review): a zero phase increment between consecutive frames yields inf here.
    Phi = (1 / np.diff(phi, prepend=0, axis=1)) * velocity
    P = [Phi * amp * np.sin(phi), np.abs(Phi) * amp * np.cos(phi)]
    return P, amp, velocity



NameError: name 'np' is not defined

In [183]:
# Load data
data_path = "../../data/"
# Index of the animation sequence inspected in both archives. The old comment called
# this the "first" sequence, but the code has always read entry 10.
sequence_index = 10
data1 = F.load(data_path+"TWO_ROT_R2-default-Two.pbz2")
data2 = F.load(data_path+"TWO_ROT_R1-default-Two.pbz2")
# NOTE: unpickling can execute arbitrary code; only load archives from a trusted source.
d1 = pickle.loads(data1[sequence_index])
d2 = pickle.loads(data2[sequence_index])

In [2]:
# Extract the contact flags and velocities of both sequences (nothing is plotted here)
cInfo1, vInfo1= extract_info(d1)
cInfo2, vInfo2 = extract_info(d2)



NameError: name 'd1' is not defined

In [3]:
# Normalise the contact signals and fit a sinusoid per joint and frame.
# The pipeline receives copies, so cInfo1 / cInfo2 stay available unchanged for plotting.
cNormalised1, cOptimised1 = calc_local_motion_phase(np.copy(cInfo1))
cNormalised2, cOptimised2 = calc_local_motion_phase(np.copy(cInfo2))


NameError: name 'calc_local_motion_phase' is not defined

In [253]:
# Local phase vectors, fitted amplitudes and velocity magnitudes for both sequences
cLocalPhase1, cAmp1, cVel1 = calc_local_phase_vec(cOptimised1, vInfo1)
cLocalPhase2, cAmp2, cVel2 = calc_local_phase_vec(cOptimised2, vInfo2)

In [254]:
# Evaluate each frame's own fitted sinusoid at that frame, per joint
cOpt1 = [
    [infer(frame, res.x) for frame, res in enumerate(joint_fits)]
    for joint_fits in cOptimised1
]
cOpt2 = [
    [infer(frame, res.x) for frame, res in enumerate(joint_fits)]
    for joint_fits in cOptimised2
]

In [255]:
# Per-joint series of sequence 1, joint 14
cO_14, cN_14, cOpt_14, cAmp_14, cVel_14 = (
    cInfo1[14], cNormalised1[14], cOpt1[14], cAmp1[14], cVel1[14]
)
cLocal_14 = [cLocalPhase1[component][14] for component in (0, 1)]

# Per-joint series of sequence 1, joint 20
cO_20, cN_20, cOpt_20, cAmp_20, cVel_20 = (
    cInfo1[20], cNormalised1[20], cOpt1[20], cAmp1[20], cVel1[20]
)
cLocal_20 = [cLocalPhase1[component][20] for component in (0, 1)]


In [259]:
# Contact pipeline of joint 14 (series sliced from cInfo1 and its derived arrays)
plot_contact_single(cO_14, cN_14, cOpt_14, cLocal_14, cAmp_14, cVel_14,  i=14)

In [260]:
# Contact pipeline of joint 20 (series sliced from cInfo1 and its derived arrays)
plot_contact_single(cO_20, cN_20, cOpt_20, cLocal_20, cAmp_20, cVel_20, i=20)



In [265]:
# Per-joint series of sequence 2, joint 6
cO_6, cN_6, cOpt_6, cAmp_6, cVel_6 = (
    cInfo2[6], cNormalised2[6], cOpt2[6], cAmp2[6], cVel2[6]
)
cLocal_6 = [cLocalPhase2[component][6] for component in (0, 1)]

# Per-joint series of sequence 2, joint 10
cO_10, cN_10, cOpt_10, cAmp_10, cVel_10 = (
    cInfo2[10], cNormalised2[10], cOpt2[10], cAmp2[10], cVel2[10]
)
cLocal_10 = [cLocalPhase2[component][10] for component in (0, 1)]

In [266]:
# Contact pipeline of joint 6 (series sliced from cInfo2 and its derived arrays)
plot_contact_single(cO_6, cN_6, cOpt_6, cLocal_6, cAmp_6, cVel_6, i=6)

In [267]:
# Contact pipeline of joint 10 (series sliced from cInfo2 and its derived arrays)
plot_contact_single(cO_10, cN_10, cOpt_10, cLocal_10, cAmp_10, cVel_10, i=10)


In [None]:
# Fit one sinusoid to a whole normalised curve (no sliding window)
def func2(x0, *args):
    """
    Root-mean-square error between the sinusoid described by x0 and a target curve.

    :param x0: sinusoid parameters as consumed by infer
    :param args: optional; args[0] is the target curve. Falls back to the notebook-level
                 cN_14 when omitted, so existing func2(x0) calls keep working.
    :return: scalar RMSE over all frames of the curve
    """
    curve = args[0] if args else cN_14
    # Take the time axis from the curve instead of assuming exactly 120 frames.
    t = np.arange(len(curve))
    residual = infer(t, x0) - curve
    # Square -> mean -> sqrt, the same objective as func. Taking the sqrt element-wise
    # before averaging (as before) collapses to a mean absolute error.
    return np.sqrt(np.mean(residual**2))

param = [.01,.01,.01,.01]
results = minimize(func2, param, args=(cN_14,), method="BFGS", options={"disp":True})

In [None]:
# Optimiser report of the single-curve fit (parameters are in results.x)
print(results)

In [None]:
# Frame axis; assumes the clip has exactly 120 frames — TODO confirm
x = np.arange(120)
# Wrapped phase of the fitted sinusoid at every frame
phi = (results.x[1] * x - results.x[2]) % (2*np.pi)
# NOTE(review): the fit in `results` targets cN_14 (taken from cInfo1) while the
# velocities below come from vInfo2 — confirm that mixing the two sequences is intended.
velocity = np.reshape(vInfo2, (-1, 3, 120)).T
# Euclidean norm over the axis of length 3
velocity = np.sqrt(np.sum(velocity**2, axis=1))
print(velocity.shape)
print(phi.shape)
print(velocity.T[10])
# Disabled experiment: zero out magnitudes above 4
# velocity.T[10][velocity.T[10] > 4] = 0

In [None]:
# Scale the velocity magnitudes in velocity.T[10] by the inverse phase increment.
# NOTE(review): a zero increment between consecutive frames yields inf here.
Phi = (1 / np.diff(phi, prepend=0)) * velocity.T[10]
# One fit covers the whole curve, so its single amplitude results.x[0] is used for all frames.
P = [Phi * results.x[0] * np.sin(phi), np.abs(Phi) * results.x[0] * np.cos(phi)]

In [None]:
# Sine component of the phase vector over the frames
fig = ex.line(x=x, y=P[0])
fig.show()

In [None]:
# Cosine component of the phase vector over the frames
fig = ex.line(x=x, y=P[1])
fig.show()

In [None]:
# Sum of both phase-vector components over the frames
fig = ex.line(x=x, y=P[0]+P[1])
fig.show()

In [None]:
# Sinusoid described by the optimised parameters of the single-curve fit
param_opt2 = results.x
y = infer(x, param_opt2)

fig = ex.line(x=x, y=y)
fig.show()

In [None]:
# NOTE(review): exact repeat of the earlier P[0] plot cell — candidate for removal
fig = ex.line(x=x, y=P[0])
fig.show()

In [None]:
# NOTE(review): exact repeat of the earlier P[1] plot cell — candidate for removal
fig = ex.line(x=x, y=P[1])
fig.show()

In [None]:
# NOTE(review): exact repeat of the earlier P[0]+P[1] plot cell — candidate for removal
fig = ex.line(x=x, y=P[0]+P[1])
fig.show()

In [None]:
# NOTE(review): exact repeat of the earlier fitted-curve plot cell — candidate for removal
param_opt2 = results.x
y = infer(x, param_opt2)

fig = ex.line(x=x, y=y)
fig.show()