# Better acceleration detector

Here we keep a running history of the acceleration values.
We use this history to figure out the direction of gravity (g_hat).
Once we know the direction of gravity we can figure out the direction of our current acceleration (a) that is in the direction of gravity (v_g).
By subtracting off this component, we leave only the componet of our current acceleration that is perpendicular to gravity - the acceleration caused by rowing.

In [1]:
import math

In [2]:
x = [0, 0, 1, 2, 1, 0, 2, 1, 5, 6, 3, 2, 1]
y = [0, 1, 2, 1, 0, 2, 1, 1, 3, 3, 2, 1, 0]
z = [9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9]

In [3]:
def vector_mean(vector_list):
    """ Average a list of 3-vectors

    [(x1, y1, z1), ..., (xn, yn, zn)] -> (xm, ym, zm)
    """
    vlen = len(vector_list)
    mv = tuple(
        sum(v[ii] for v in vector_list) / vlen for ii in range(3)
    )
    return mv

def vector_multiply(s, v):
    """ Multiply a vector by a scalar
    """
    return tuple(s * vi for vi in v)

def vector_subtract(va, vb):
    """ Vector version of a - b
    """
    return tuple(a - b for a, b in zip(va, vb))

def dot(xv, yv):
    """ Dot product: xv.yv
    """
    return sum(x*y for x, y in zip(xv, yv))

def norm(v):
    """ Euclidean norm: sqrt(v.v)
    """
    mv = math.sqrt(dot(v, v))
    return tuple(vi/mv for vi in v)

In [4]:
hv = []
for ii in range(8):
    hv.append((x[ii], y[ii], z[ii]))
print(hv)

mv = vector_mean(hv)
print(mv)

nv = norm(mv)
print(nv)

[(0, 0, 9), (0, 1, 9), (1, 2, 9), (2, 1, 9), (1, 0, 9), (0, 2, 9), (2, 1, 9), (1, 1, 9)]
(0.875, 1.0, 9.0)
(0.09617961926411278, 0.10991956487327174, 0.9892760838594457)


In [5]:
def GravityRemover(history=10):
    """ Track and remove the average component of N acceleration vectors

    Internally track the last N accleration vectors.
    Compute the average acceleration vector from these.
    Then subtract that from the current acceleration vector.
    And return this reduced acceleration vector.
    """
    hv = []
    def _call(v):

        if len(hv) < 1:
            v_reduced = (0, 0, 0)
        else:
            # Compute the direction of average acceleration
            a_hat = norm(vector_mean(hv))
            # Remove the component parallel to average accleration
            v_parallel = vector_multiply(dot(v, a_hat), a_hat)
            v_reduced = vector_subtract(v, v_parallel)
        # Update the history elements with the new vector
        hv.append(v)
        if len(hv) > history:
            hv.pop(0)
        return v_reduced

    return _call

In [6]:
gr = GravityRemover()
for xi, yi, zi in zip(x, y, z):
    vr = gr((xi, yi, zi))
    print(dot(vr, vr))

0
1.0
3.2430769230769227
2.774018944519621
1.0545041635124905
2.0787554691298005
1.7681940700808625
0.020226936359151463
20.63545403058335
24.101963082332265
1.6955916473317856
0.35443901329998867
3.903320202588632


# Thresholder

Now that we are sensitive to basically the perpendicular direction (not gravity) we can make a thresholder.
This is a basic thresholder that only triggers if the last N samples are of the other state.

In [7]:
# import operator
# This would be the obvious choice, but is not present in my circuitpython

In [8]:
values = [5, 1, 1, 1, 1, 5, 5, 5, 5, 1, 5, 1, 1, 1, 5, 1, 1, 1]
rising = [1, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0]
fallin = [0, 1, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0]

In [9]:
def HistoryThresholder(threshold=10, history=10, mode="rising"):
    """ Pulse when something exceeds our threshold
    But maintain a history with hystersis so we don't accidentally trigger.
    """
    if mode == "rising":
        comparitor = lambda x, y: x > y
    elif mode == "falling":
        comparitor = lambda x, y: x < y
    else:
        raise ValueError("mode can be 'rising' or 'falling'")

    previous = []
    def _call(value):
        condition_met = comparitor(value, threshold)
        if condition_met and not any(previous):
            trigger = True
        else:
            trigger = False
        previous.append(condition_met)
        if len(previous) > history:
            previous.pop(0)
        return trigger
        
    return _call

In [10]:
# Test rising edge case
ht = HistoryThresholder(threshold=2, history=3)
result = [ht(v) for v in values]
want = [bool(n) for n in rising]
print(result)
print(want)
assert want == result

[True, False, False, False, False, True, False, False, False, False, False, False, False, False, True, False, False, False]
[True, False, False, False, False, True, False, False, False, False, False, False, False, False, True, False, False, False]


In [11]:
# Test falling edge case
ht = HistoryThresholder(threshold=2, history=3, mode="falling")
result = [ht(v) for v in values]
want = [bool(n) for n in fallin]
print(result)
print(want)
assert want == result

[False, True, False, False, False, False, False, False, False, True, False, False, False, False, False, False, False, False]
[False, True, False, False, False, False, False, False, False, True, False, False, False, False, False, False, False, False]


# Stroke Rate

Now that we have a stroke identifier, let's work on rate.

In [12]:
import time

In [13]:
def StrokeRate(history=3, forget=120):
    """ Compute stroke rate
    Inputs:
        history = remember this many strokes
        forget = forget strokes older than this time
    """
    # A single value becomes a local variable
    stroke_rate = [0]
    stroke_times = []
    def _call(stroke):
        currently = time.monotonic()
        # Drop old measurements
        if stroke_times:
            if (currently - stroke_times[0]) > forget:
                stroke_times.pop(0)
        # Add new measurements
        if stroke:
            stroke_times.append(currently)
            if len(stroke_times) > history:
                stroke_times.pop(0)
        # Update our rate
        strokes = len(stroke_times)
        if strokes > 1:
            delta_t = stroke_times[-1] - stroke_times[0]
            # strokes / delta_t [s] * 60 [s]/minute
            stroke_rate[0] = round(60 * strokes / delta_t, 1)
        else:
            stroke_rate[0] = 0
        return stroke_rate[0]

    return _call

In [14]:
values = [
            1, 1, 5, 1, 1, 5, 1, 1, 5, 1, 1, 1, 1,
            5,   1,   1,   1,   1,   5, 1, 1, 1, 1, 5, # 11 points = 11 s
            1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1,
         ]
expect = [
            0, 0, 0, 0, 0, 40,40,40,30,30,30,30,30,
            22.5,22.5,22.5,22.5,22.5,18,18,18,18,18,18,
            18,24,24,24,24,24,24,0 ,0 ,0 ,0, 0, 0, 0, 0,
         ]

In [15]:
ht = HistoryThresholder(threshold=2, history=2)
sr = StrokeRate(history=3, forget=12)

rates = []
for ii, (v, e) in enumerate(zip(values, expect)):
    s = ht(v)
    r = sr(s)
    rates.append(r)
    print(f"got::wanted {r}::{e} ({100*(ii+1)//len(values)}%)")
    time.sleep(1)

assert rates == expect
print("pass")

got::wanted 0::0 (2%)
got::wanted 0::0 (5%)
got::wanted 0::0 (7%)
got::wanted 0::0 (10%)
got::wanted 0::0 (12%)
got::wanted 40.0::40 (15%)
got::wanted 40.0::40 (17%)
got::wanted 40.0::40 (20%)
got::wanted 30.0::30 (23%)
got::wanted 30.0::30 (25%)
got::wanted 30.0::30 (28%)
got::wanted 30.0::30 (30%)
got::wanted 30.0::30 (33%)
got::wanted 22.5::22.5 (35%)
got::wanted 22.5::22.5 (38%)
got::wanted 22.5::22.5 (41%)
got::wanted 22.5::22.5 (43%)
got::wanted 22.5::22.5 (46%)
got::wanted 18.0::18 (48%)
got::wanted 18.0::18 (51%)
got::wanted 18.0::18 (53%)
got::wanted 24.0::18 (56%)
got::wanted 24.0::18 (58%)
got::wanted 18.0::18 (61%)
got::wanted 18.0::18 (64%)
got::wanted 24.0::24 (66%)
got::wanted 24.0::24 (69%)
got::wanted 24.0::24 (71%)
got::wanted 24.0::24 (74%)
got::wanted 24.0::24 (76%)
got::wanted 0::24 (79%)
got::wanted 0::0 (82%)
got::wanted 0::0 (84%)
got::wanted 0::0 (87%)
got::wanted 0::0 (89%)
got::wanted 0::0 (92%)
got::wanted 0::0 (94%)
got::wanted 0::0 (97%)
got::wanted 0::0 (

AssertionError: 