In [None]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import scipy.signal as signal
import os
import math

sr = 200 # Sample rate of the AX3s, in samples per second (multiplied by 60 below to get samples per minute)

### Load CSVs, remove gravity, and calculate QoM

In [None]:
def get_qom(x: float, y: float, z: float) -> float:
    """
    Return the QoM value for one sample.

    The result is the sum of the absolute values of the three axis readings,
    added in x, y, z order.
    """
    abs_x, abs_y, abs_z = abs(x), abs(y), abs(z)
    return abs_x + abs_y + abs_z

In [None]:
# Directory that is scanned for AX3 CSV exports
csv_directory = "."

dir_contents = os.listdir(csv_directory)
# os.listdir returns entries in arbitrary order; sort so every run processes
# (and later prompts for) the files in the same order.
csv_filenames = sorted(file for file in dir_contents if file.endswith(".csv"))

# Maps the first five characters of a CSV filename (the sensor ID) to a location code
id_code_mappings = {
    "14893" : "1",
    "14940" : "2",
    "17384" : "3",
}

# location code -> dataframe with columns "ts" and "qom"
dataframes_dict = {}
columns = ["ts", "x", "y", "z"]
axis_columns = ["x", "y", "z"]

for csv_filename in csv_filenames:
    # Get sensor ID and location code
    sensor_id = csv_filename[:5]
    try:
        location_code = id_code_mappings[sensor_id]
    except KeyError:
        # An unmapped file is treated as a fatal configuration error
        print(f" ERROR unknown sensor ID: {sensor_id}")
        raise

    print(f"Loading file: {csv_filename} (location_code: {location_code})")

    # Load CSV (the column names are supplied here, so the file is read as header-less)
    csv_path = os.path.join(csv_directory, csv_filename)
    dataframe = pd.read_csv(csv_path, names=columns)

    # Remove gravity
    print("  Subtracting column means to remove gravity...")
    dataframe[axis_columns] = dataframe[axis_columns] - dataframe[axis_columns].mean()
    print("  Subtracted column means.")

    # Get QoM for the whole recording in one column-wise operation; abs() and +
    # work element-wise on pandas Series, so no per-row Python loop is needed.
    print("  Adding QoM to dataframe...")
    dataframe["qom"] = get_qom(dataframe["x"], dataframe["y"], dataframe["z"])
    dataframe = dataframe.drop(columns=axis_columns) # Drop XYZ axes to save RAM
    dataframes_dict[location_code] = dataframe
    print("  Added QoM to dataframe.")

    # Plot QoM
    print("  Plotting QoM...")
    plt.figure(figsize=(6,2))
    plt.plot(dataframe["qom"], lw=0.5)
    plt.title(f"{csv_filename} (location code: {location_code})")
    plt.xlabel("Sample #")
    plt.ylabel("QoM")
    plt.ylim(0,15)
    plt.show()

    print("------------")

# Find sync cues

In [None]:
def is_start_sync_cue_option(peaks: list, idx: int, sr: int) -> bool:
    """
    Determine whether the peak at ``idx`` could be the first of the three sync-cue peaks.

    The peak qualifies when the gap to the next peak is strictly between 0.5 and
    1.5 seconds and the gap from that peak to the one after it is strictly
    between 0.5 and 2 seconds.

    Parameters
    ----------
    peaks : sequence of int
        Peak positions in samples, in ascending order.
    idx : int
        Index into ``peaks`` of the candidate first peak.
    sr : int
        Sample rate in samples per second, used to convert seconds to samples.

    Returns
    -------
    bool
        True if ``peaks[idx:idx+3]`` has the spacing described above; False
        otherwise, including when fewer than two peaks follow ``idx``.
    """
    # Not enough following peaks to form a set of three
    if idx + 2 >= len(peaks):
        return False

    first_gap = peaks[idx + 1] - peaks[idx]
    second_gap = peaks[idx + 2] - peaks[idx + 1]

    # NOTE(review): the upper bounds differ (1.5 s vs 2 s); this is kept as the
    # code had it - confirm whether both gaps were meant to use 1.5 s.
    first_ok = sr * 0.5 < first_gap < sr * 1.5
    second_ok = sr * 0.5 < second_gap < sr * 2
    return bool(first_ok and second_ok)

def create_participant_code(location: str, rnd: int) -> str:
    """Build a participant code by joining the location and the round with a hyphen."""
    code_template = "{}-{}"
    return code_template.format(location, rnd)

def plot_sync_cue_option(
    peak_list: np.ndarray, 
    dataframe: pd.DataFrame, 
    color: str,
    sr: int,
    is_only: bool = False, 
    n_options: int = 0
) -> None:
    """
    Plot the QoM trace around one candidate set of sync-cue peaks.

    The plot spans two seconds of samples before the first peak to two seconds
    after the last one, with a dashed vertical line at each peak.

    Parameters
    ----------
    peak_list : np.ndarray
        Peak positions as row positions in ``dataframe``.
    dataframe : pd.DataFrame
        Recording with a "qom" column.
    color : str
        Matplotlib colour for the QoM trace.
    sr : int
        Sample rate in samples per second; sets the padding around the peaks
        (400 samples at 200 Hz).
    is_only : bool
        If True, the title reads "Single option"; otherwise it is numbered.
    n_options : int
        Option number shown in the title when ``is_only`` is False.

    Notes
    -----
    Reads the module-level ``FIGSIZE`` constant, which must be defined before
    this function is called.
    """
    padding = int(2 * sr)

    # Clamp at 0: a negative start would slice from the END of the series and
    # produce an empty or wrong plot for cues near the start of the recording.
    plot_start = max(0, int(peak_list[0]) - padding)
    plot_stop = int(peak_list[-1]) + padding

    # Plot figure
    plt.figure(figsize=FIGSIZE)
    plt.plot(dataframe["qom"].iloc[plot_start:plot_stop], c=color, lw=0.5)
    for peak in peak_list:
        plt.axvline(peak, ls="--", c="black", lw=1, zorder=3)
    
    # Title
    if is_only:
        plt.title(f"Single option: {peak_list}")
    else:
        plt.title(f"Option {n_options}: {peak_list}")
    
    # Formatting and show
    plt.ylim(0, 15)
    plt.show()

### SET PARAMETERS: start times of rounds, round length, window size

In [None]:
# Times since the start of the recording at which each round started, in minutes.
# One entry per round; the sync cue is searched for in a window centred on each time.
round_start_times = [2.5, 3.7, 4.4]
round_len_mins = 0.5 # The length of each round in minutes, measured from the sync cue
round_start_tolerance = 0.25 # Minutes after the sync cue at which the round is taken to start
round_stop_tolerance = 0.25 # Minutes before sync cue + round_len_mins at which the round is taken to stop
# NOTE(review): the stop index is computed as cue + round_len_mins - round_stop_tolerance and the
# start index as cue + round_start_tolerance, so the two tolerances together must be smaller than
# round_len_mins for the trimmed round to contain any samples - confirm these values.

# The size of the window, centred on each round start time, in which to look for the sync cue
# I.e., 1 minute = 30 secs before, 30 secs after
window_size_mins = 1

### Locate sync cues per location per round

In [None]:
# Init dataframe to hold data about round windows (one row per location per accepted round)
round_windows_df = pd.DataFrame(
    columns=[
        "loc", 
        "round", 
        "participant_code", 
        "window_start_idx", 
        "window_stop_idx", 
        "ts_start", 
        "ts_stop"
    ]
)

# Constants
SAMP_PER_MIN = sr*60
FIGSIZE=(4,1) # Figure size read by plot_sync_cue_option
ROUND_LEN_SAMPLES = int(round_len_mins * SAMP_PER_MIN) # Number of samples i.e., rows per round

# Determine colours for plotting each round
round_colours = ["blue", "darkred", "darkorange", "darkgreen", "purple", "orange", "violet"]
round_labels = ["A", "B", "C", "D", "E", "F", "G"]

# Iterate over each AX3 dataframe
for location_code, dataframe in dataframes_dict.items():
    
    print(f"----- LOCATION CODE {location_code} -----\n")
    
    # Plot QoM of entire file at location
    print(f"Plotting QoM for location {location_code}.")
    plt.figure(figsize=(8,2))
    plt.plot(dataframe["qom"], lw=0.5)
    plt.grid(c="lightgrey", alpha=0.25)
    plt.ylim(0,15)
    plt.title(f"QoM for location {location_code}")
    plt.show()
    
    last_idx = len(dataframe.index) - 1
    
    # Iterate over the start times for each round
    for i, round_start_time in enumerate(round_start_times):
        
        # Get round label
        round_no = round_labels[i]
        
        # Define window boundaries, clamped to the extent of the recording
        window_centre = round_start_time * SAMP_PER_MIN
        half_window = 0.5 * window_size_mins * SAMP_PER_MIN
        window_start = max(0, int(window_centre - half_window))
        window_end = min(last_idx, int(window_centre + half_window))
        
        print(f"  LOCATION: {location_code} ROUND {round_no}")
        
        # The recording stops before this window begins (or is empty)
        if window_start >= window_end:
            print("  Recording ends before this round's sync cue window, skipping.")
            print("  ------")
            continue
        
        print(f"  Sync cue window: {window_start}-{window_end}")
        print(f"  (from {dataframe.loc[window_start, 'ts']} to {dataframe.loc[window_end, 'ts']})")
        
        # Plot QoM of window
        window_qom = dataframe["qom"].iloc[window_start:window_end]
        plt.figure(figsize=(6,3))
        plt.plot(window_qom, lw=0.5)
        plt.grid(c="lightgrey", alpha=0.25)
        plt.title(f"Round {round_no} sync cue window")
        plt.show()
        
        height_entry = input(f"  Enter the height on the Y axis above which to look for peaks: ")
        print("  ------")
        try:
            height = float(height_entry)
        except ValueError:
            print(f"  Invalid entry: {height_entry}\tSkipping.")
            continue
        
        # Find peaks in window; returned positions are relative to window_start
        peaks = signal.find_peaks(
            window_qom, 
            height=height,
            threshold=0.1,
            distance=sr*0.5)[0]
        
        # If less than three peaks, assume no participant in range
        if len(peaks) < 3:
            print(f"  Less than three peaks found with height {height}, skipping.")
            # Without this, a stale peak_list from a previous round would be recorded
            continue
        
        # If only 3 peaks, plot them
        elif len(peaks) == 3:            
            
            # Add offset to shift peaks to location within dataset
            peak_list = peaks + window_start
            
            # Plot figure
            plot_sync_cue_option(peak_list, dataframe, round_colours[i], sr, is_only=True)
            
            # User chooses whether to accept the option
            accept = input("  One option found. Is this the sync cue? Enter Y/N: ")
            answer = accept.strip().lower()
            
            # If accept single option
            if answer.startswith("y"):
                print(f"  Accepted option.")  
            # If decline single option
            elif answer.startswith("n"):
                print(f"  Incorrect option, skipping.")
                continue
            # If invalid response (including an empty entry)
            else:
                print(f"  Invalid entry: {accept}\tSkipping.")
                continue
        
        # If more than 3 peaks in window, iterate over to find three the right distance apart
        else:
            print("    SELECT FROM NUMBERED OPTIONS")
            sync_cue_options = []
            n_options = 0
            
            # Loop through peaks to find options for start cues
            for j, peak in enumerate(peaks):
                # A candidate needs two following peaks, so the last valid start is len(peaks)-3
                if j < len(peaks)-2 and is_start_sync_cue_option(peaks, j, sr):
                    
                    # Make array of points and add offset for window start
                    peak_list = peaks[j:j+3] + window_start
                    sync_cue_options.append(peak_list)
                    n_options += 1 # Increment counter for number of options
                    
                    # Plot figure
                    plot_sync_cue_option(peak_list, dataframe, round_colours[i], sr, is_only=False, n_options=n_options)
            
            # Nothing to choose from
            if n_options == 0:
                print("    No candidate sync cues found, skipping.")
                continue
            
            # Get user selection for cue window
            print(f"    Select the window containing the sync cue for Round {round_no}.")
            sel_entry = input(f"    If the sync cue is not present or there was no player for this round, enter -1: ")
            try:
                sel = int(sel_entry)
            except ValueError:
                print(f"    Invalid entry: {sel_entry}\tSkipping.")
                continue
            
            # Cue not in presented options
            if sel == -1:
                print("    No valid sync cue among options.")
                continue
            
            # User selects invalid number i.e., out of range
            if not 1 <= sel <= n_options:
                print(f"    Selection {sel} out of range, skipping...")
                continue
            
            # Valid selection: subtract one to get index from friendly value
            peak_list = sync_cue_options[sel - 1]
            print(f"    Selected: {peak_list}")
        
        # Index of the first sync-cue peak, and of the end of the round (clamped to the recording)
        sync_cue_idx = int(peak_list[0])
        round_stop_idx = int(sync_cue_idx + ROUND_LEN_SAMPLES - (round_stop_tolerance * SAMP_PER_MIN))
        round_stop_idx = min(round_stop_idx, last_idx)
        
        # NOTE(review): the recorded window starts at the first sync-cue peak, matching
        # ts_start. round_start_tolerance is not applied to the start of the window;
        # confirm whether it should start at
        # sync_cue_idx + round_start_tolerance * SAMP_PER_MIN instead.
        
        # Update dataframe for round
        new_row = {
            "loc": location_code,
            "round": round_no,
            "participant_code": create_participant_code(location_code, round_no),
            "window_start_idx": sync_cue_idx,
            "window_stop_idx": round_stop_idx,
            "ts_start": dataframe.loc[sync_cue_idx, "ts"],
            "ts_stop": dataframe.loc[round_stop_idx, "ts"],
        }
        participant_idx = len(round_windows_df)
        for column_name, value in new_row.items():
            round_windows_df.loc[participant_idx, column_name] = value
        print("Added round to dataframe of rounds.") 
        print("  ------")
        
    print("============")

print("Found bounds for each round for all locations. Printing results...")
round_windows_df

# Calculate scores per participant

In [None]:
n_participants = len(round_windows_df.index)

# Collect one record per participant and build the dataframe once, so that
# "score" ends up as a numeric column rather than object dtype.
score_records = []

for i in range(n_participants):
    # Get participant location, round no, window boundaries
    participant_data = round_windows_df.iloc[i]
    participant_code = participant_data["participant_code"]
    location_code = participant_data["loc"]
    # The window bounds are read back from an object-dtype frame; cast them
    # to plain ints before using them as positional slice bounds.
    window_start = int(participant_data["window_start_idx"])
    window_stop = int(participant_data["window_stop_idx"])
    
    # Get participant raw data
    participant_raw_data = dataframes_dict[location_code].iloc[window_start:window_stop]
    
    # Calculate mean QoM (NaN if the window contains no samples)
    participant_score = participant_raw_data["qom"].mean()
    
    score_records.append({"participant_code": participant_code, "score": participant_score})

scores_df = pd.DataFrame(score_records, columns=["participant_code", "score"])

print(f"# of participants: {scores_df.shape[0]}")
scores_df

## Order scores and show winner

In [None]:
# Order participants by score, highest mean QoM first (NaN scores sort last)
# NOTE(review): the plot title refers to a "Standstill" championship, yet the
# highest mean QoM is ranked first and reported as the winner - confirm whether
# the ranking should be ascending instead.
scores_df = scores_df.sort_values("score", ascending=False).reset_index(drop=True)

podium_colours = ["gold", "silver", "peru"]
podium_labels = ["WINNER", "2ND PL", "3RD PL"]

# Plot scores
plt.figure(figsize=(8,6))
for i, row in scores_df.iterrows():
    # The top three bars get podium colours, the rest share one colour
    color = podium_colours[i] if i < len(podium_colours) else "crimson"
    plt.bar(row["participant_code"], row["score"], color=color)
plt.xlabel("Participant code")
plt.ylabel("Score [mean QoM]")
plt.xticks(rotation=45)
plt.title("Norwegian Championship of Standstill 2024 - Scores")
plt.show()

# zip stops at the shorter input, so fewer than three participants no longer raises a KeyError
for label, participant_code in zip(podium_labels, scores_df["participant_code"]):
    print(f"{label}:\t{participant_code}")
if scores_df.empty:
    print("No participants were scored.")