In [None]:
import numpy as np
import scipy.signal as signal

def calculate_steering_reversal_rate(theta, fs, theta_min_deg=1.0, fLP=1.0):
    """
    Calculate the steering wheel reversal rate from a steering angle signal.

    Parameters:
    - theta: np.array, raw steering wheel angle signal (in degrees)
    - fs: int, sampling frequency in Hz
    - theta_min_deg: float, minimum angle difference to count as a reversal
    - fLP: float, low-pass filter cutoff frequency in Hz

    Returns:
    - reversal_rate: float, number of reversals per minute
    - total_reversals: int, total number of reversals
    """
    # Step 1: Low-pass filter
    b, a = signal.butter(N=2, Wn=fLP / (fs / 2), btype="low")
    theta_filtered = signal.filtfilt(b, a, theta)

    def find_stationary_points(signal_data):
        diff = np.diff(signal_data)
        stationary = []
        for i in range(1, len(diff)):
            if diff[i - 1] == 0 or np.sign(diff[i - 1]) != np.sign(diff[i]):
                stationary.append(i)
        return stationary

    def count_reversals(signal_data, stationary_points, theta_min):
        reversals = []
        k = 0
        for l in range(1, len(stationary_points)):
            i_k = stationary_points[k]
            i_l = stationary_points[l]
            if signal_data[i_l] - signal_data[i_k] >= theta_min:
                reversals.append((i_k, i_l))
                k = l
            elif signal_data[i_l] < signal_data[i_k]:
                k = l
        return reversals

    # Step 2: Find stationary points
    stationary_up = find_stationary_points(theta_filtered)
    stationary_down = find_stationary_points(-theta_filtered)

    # Step 3: Count reversals
    reversals_up = count_reversals(theta_filtered, stationary_up, theta_min_deg)
    reversals_down = count_reversals(-theta_filtered, stationary_down, theta_min_deg)

    # Step 4: Calculate reversal rate
    total_reversals = len(reversals_up) + len(reversals_down)
    duration_minutes = len(theta) / fs / 60
    reversal_rate = total_reversals / duration_minutes

    return reversal_rate, total_reversals

In [6]:
import numpy as np
np.sign(-10)

np.int64(-1)

In [5]:
import numpy as np
from scipy.signal import butter, filtfilt


def steering_wheel_reversal_rate(
    theta, sampling_rate, gap_size_deg, cutoff_freq_hz=0.6, filter_order=2
):
    """
    Calculate Steering Wheel Reversal Rate (SRR).

    Parameters:
    - theta: np.array, steering wheel angle signal (degrees)
    - sampling_rate: sampling frequency in Hz (samples per second)
    - gap_size_deg: minimum angular gap size in degrees (θ_min)
    - cutoff_freq_hz: low-pass Butterworth filter cutoff frequency (Hz), default=0.6 Hz
    - filter_order: order of the Butterworth filter, default=2

    Returns:
    - srr: steering wheel reversal rate in reversals per minute
    - reversals_count: total number of reversals detected
    """
    # Step 1: Low-pass filter
    nyquist_freq = 0.5 * sampling_rate
    normal_cutoff = cutoff_freq_hz / nyquist_freq
    b, a = butter(filter_order, normal_cutoff, btype="low", analog=False)
    theta_filtered = filtfilt(b, a, theta)

    # Step 2: Find stationary points (zeros or sign changes in first derivative)
    theta_diff = np.diff(theta_filtered)
    sign_diff = np.sign(theta_diff)

    # Apply sign change detection: stationary points where sign changes or derivative is zero
    stationary_indices = []
    # Include first point as possible stationary point
    stationary_indices.append(0)

    for i in range(1, len(sign_diff)):
        # Sign change between i-1 and i?
        if sign_diff[i] != sign_diff[i - 1]:
            stationary_indices.append(i)
    # Also include last point
    stationary_indices.append(len(theta_filtered) - 1)

    stationary_indices = sorted(set(stationary_indices))  # unique and sorted

    # Stationary points values
    e = np.array(stationary_indices)
    theta_sp = theta_filtered[e]

    def count_upward_reversals(theta_vals, gap):
        k = 0
        Nr = 0
        R = []
        N = len(theta_vals)
        for l in range(1, N):
            if theta_vals[l] - theta_vals[k] >= gap:
                Nr += 1
                R.append((k, l))
                k = l
            elif theta_vals[l] < theta_vals[k]:
                k = l
        return Nr, R

    # Step 3: Count upward reversals
    Nr_up, R_up = count_upward_reversals(theta_sp, gap_size_deg)
    # Count downward reversals on negative signal
    Nr_down, R_down = count_upward_reversals(-theta_sp, gap_size_deg)

    # Total reversals
    total_reversals = Nr_up + Nr_down

    # Step 4: Calculate reversal rate (per minute)
    total_duration_min = len(theta) / sampling_rate / 60.0
    srr = total_reversals / total_duration_min if total_duration_min > 0 else 0

    return srr, total_reversals


# Example usage:
if __name__ == "__main__":
    # Sample data: simulated steering wheel angles (degrees)
    theta_example = np.array([0, 1, 2, 1, 0, -1, -2, -1, 0, 1, 0, -1])
    sampling_rate = 1  # 1 Hz sampling (1 sample per second)
    gap_size = 1  # 1 degree gap size

    srr, count = steering_wheel_reversal_rate(theta_example, sampling_rate, gap_size)
    print(f"Steering Wheel Reversal Rate: {srr:.2f} reversals per minute")
    print(f"Total reversals detected: {count}")

ValueError: Digital filter critical frequencies must be 0 < Wn < 1

In [15]:
def stationary_points_fn(theta: np.array) -> np.array:
    """stationary points

    Args:
        theta (np.array): filtered steering angles array

    Returns:
        np.array: stationary points
    """

    # calculate the differnce
    theta_diff = np.diff(theta)
    sign_diff = np.sign(theta_diff)
    sign_diff = np.insert(sign_diff, 0, 0)

    stationary_idx = []

    for i in range(1, len(sign_diff)):
        if sign_diff[i] != sign_diff[i + 1]:
            stationary_idx.append(i)

    return np.array(stationary_idx)

In [16]:
theta_filtered = np.array([1,5,-2,-2,0,1,10])

theta_diff = np.diff(theta_filtered)
sign_diff = np.sign(theta_diff)
sign_diff = np.insert(sign_diff, 0, 0)

# Apply sign change detection: stationary points where sign changes or derivative is zero
stationary_indices = []

for i in range(1, len(sign_diff)):
    # Sign change between i-1 and i?
    if sign_diff[i] != sign_diff[i - 1]:
        stationary_indices.append(i)

In [17]:
stationary_indices

[1, 2, 3, 4]

In [18]:
def count_reversals(signal_data, stationary_points, theta_min):
    reversals = []
    k = 0
    for l in range(1, len(stationary_points)):
        i_k = stationary_points[k]
        i_l = stationary_points[l]
        if signal_data[i_l] - signal_data[i_k] >= theta_min:
            reversals.append((i_k, i_l))
            k = l
        elif signal_data[i_l] < signal_data[i_k]:
            k = l
    return reversals

In [42]:
def stationary_points_fn(theta: np.array) -> np.array:
    """stationary points

    Args:
        theta (np.array): filtered steering angles array

    Returns:
        np.array: stationary points
    """

    # calculate the differnce
    theta_diff = np.diff(theta)
    sign_diff = np.sign(theta_diff)
    sign_diff = np.insert(sign_diff, 0, 0)
    print(sign_diff)
    stationary_idx = []

    for i in range(1, len(sign_diff)-1):
        print(i)
        if sign_diff[i] != sign_diff[i + 1]:
            stationary_idx.append(i)

    return np.array(stationary_idx)


# Sample steering angle signal (length 10)
signal_data = [3, 2, 5, 3, 8, -1, -4]

# Sample stationary points indices (must be sorted and within range)
# For example, detected at indices where the derivative changes sign, including start and end
stationary_points_list = stationary_points_fn(np.array(signal_data))

# Minimum gap size for reversal detection
theta_min = 0.1

[ 0 -1  1 -1  1 -1 -1]
1
2
3
4
5


In [43]:
count_reversals(signal_data, stationary_points_list, theta_min)

[(np.int64(1), np.int64(2)), (np.int64(3), np.int64(4))]

In [44]:
stationary_points_list

array([1, 2, 3, 4])

In [52]:
x = np.array([3,6,3,2,4,0,-1,-2,2])
diff_array = np.diff(x)
upd_diff = np.insert(diff_array, 0, 0)

sign_diff = np.sign(upd_diff)

In [53]:
print(diff_array)
print(upd_diff)

[ 3 -3 -1  2 -4 -1 -1  4]
[ 0  3 -3 -1  2 -4 -1 -1  4]


In [58]:
stationary_idx = []
for i in range(1, len(sign_diff)-1):
    diff = abs(sign_diff[i] - sign_diff[i + 1])
    if diff == 2:
        stationary_idx.append(i)

In [56]:
stationary_idx

[1, 3, 4, 7]

In [59]:
def count_reversals(signal_data, stationary_points, theta_min):
    reversals = []
    k = 0
    for l in range(1, len(stationary_points)):
        i_k = stationary_points[k]
        i_l = stationary_points[l]
        if signal_data[i_l] - signal_data[i_k] >= theta_min:
            reversals.append((i_k, i_l))
            k = l
        elif signal_data[i_l] < signal_data[i_k]:
            k = l
    return reversals

In [65]:
output = count_reversals(x, stationary_idx, 1)
output

[(3, 4)]

In [70]:
import numpy as np


def count_upward_reversals(theta_vals, gap):
    k = 0
    Nr = 0
    R = []
    N = len(theta_vals)
    for l in range(1, N):
        if theta_vals[l] - theta_vals[k] >= gap:
            Nr += 1
            R.append((k, l))
            k = l
        elif theta_vals[l] < theta_vals[k]:
            k = l
    return Nr, R


x = np.array([3, 6, 3, 2, 4, 0, -1, -2, 2])

# Step 1: Find stationary points roughly - where the derivative changes sign or zero
# Calculate discrete derivative
diff_x = np.diff(x)
sign_diff = np.sign(diff_x)

stationary_points = [0]  # include first index

for i in range(1, len(sign_diff)):
    if sign_diff[i] != sign_diff[i - 1]:
        stationary_points.append(i)

stationary_points.append(len(x) - 1)  # include last index

print("Stationary points indices:", stationary_points)
print("Stationary points values:", x[stationary_points])

# Set gap size, e.g., 1 degree as threshold
theta_min = 1

# Calculate upward reversals
reversals_up, up = count_upward_reversals(x, theta_min)
# print("Upward reversals (index pairs):", reversals_up)
# print("Upward reversals (values):", [(x[s], x[e]) for s, e in reversals_up])

# To count downward reversals, repeat on negative signal
neg_x = -x
reversals_down, down = count_upward_reversals(neg_x, theta_min)
# print("Downward reversals (index pairs):", reversals_down)
# print("Downward reversals (values):", [(x[s], x[e]) for s, e in reversals_down])

Stationary points indices: [0, 1, 3, 4, 7, 8]
Stationary points values: [ 3  6  2  4 -2  2]


In [71]:
print(reversals_up, up)

3 [(0, 1), (3, 4), (7, 8)]
