IMPORT DEPENDENCIES AND LIBS

In [23]:
import os
import random
import itertools
import numpy as np
import pandas as pd

import matplotlib.pyplot as plt
from sklearn.utils import shuffle
from sklearn.model_selection import train_test_split
from sklearn.metrics import f1_score, precision_score, recall_score, accuracy_score

In [6]:
def process_folder(folder_path, label):
    """
    Process all .csv files in the given folder and compile their data into a DataFrame.

    Each CSV is expected to provide an 'Expression' column (rows of interest are
    tagged 'xBuffer', 'yBuffer' or 'zBuffer') and a 'Value' column. For every file
    the three axes are paired up row-by-row and truncated to the shortest axis.

    Parameters:
    - folder_path: Path to the folder containing the files.
    - label: The label to assign to all data from this folder (e.g., 1 for valid, 0 for invalid).

    Returns:
    - A DataFrame with columns 'x', 'y', 'z', 'label' containing the rows of all
      processed files, in filename order. If the folder contains no .csv files,
      an empty DataFrame with those four columns is returned.
    """
    columns = ['x', 'y', 'z', 'label']
    all_data = []  # One structured DataFrame per processed file

    # os.listdir returns entries in arbitrary order; sort so that the row order of
    # the result (and any fixed-size windows cut from it later) is reproducible.
    for filename in sorted(os.listdir(folder_path)):
        file_path = os.path.join(folder_path, filename)

        # Only regular files with a .csv extension are processed
        if not (os.path.isfile(file_path) and file_path.endswith('.csv')):
            continue

        data = pd.read_csv(file_path)

        # One frame per axis; resetting the index makes the three 'Value'
        # series align positionally when they are combined below.
        x_data = data[data['Expression'] == 'xBuffer'].reset_index(drop=True)
        y_data = data[data['Expression'] == 'yBuffer'].reset_index(drop=True)
        z_data = data[data['Expression'] == 'zBuffer'].reset_index(drop=True)

        # Truncate to the shortest axis so every row has all three values
        min_length = min(len(x_data), len(y_data), len(z_data))
        structured_df = pd.DataFrame({
            'x': x_data['Value'].head(min_length),
            'y': y_data['Value'].head(min_length),
            'z': z_data['Value'].head(min_length),
            'label': label
        })

        all_data.append(structured_df)

    # pd.concat raises ValueError on an empty list; return an empty, correctly
    # shaped frame instead so downstream code sees "no data" rather than a crash.
    if not all_data:
        return pd.DataFrame(columns=columns)

    return pd.concat(all_data, ignore_index=True)

In [3]:

# Read a single valid-knock recording to inspect the raw export layout
valid_data_path = 'data/valid/valid1.csv'
valid_data = pd.read_csv(valid_data_path)

# Preview the first five rows
valid_data.head(n=5)

Unnamed: 0,Level,Expression,Value,Location,Refresh,Access
0,0,extractionBuffer,,0x20005EDC,Off,private
1,1,[0],,0x20005EDC,Off,public
2,2,xBuffer,9.0,0x20005EDC,Off,public
3,2,yBuffer,116.0,0x20005EDE,Off,public
4,2,zBuffer,54.0,0x20005EE0,Off,public


In [4]:

# Pull out the rows for each axis; the index is reset so the three 'Value'
# series pair up by position when combined below
expression = valid_data['Expression']
x_data = valid_data[expression.eq('xBuffer')].reset_index(drop=True)
y_data = valid_data[expression.eq('yBuffer')].reset_index(drop=True)
z_data = valid_data[expression.eq('zBuffer')].reset_index(drop=True)

# The shortest axis bounds how many complete (x, y, z) rows can be formed
min_length = min(map(len, (x_data, y_data, z_data)))

# One row per sample, all rows labelled 1 (valid knock)
structured_df_aligned = pd.DataFrame({
    'x': x_data['Value'].iloc[:min_length],
    'y': y_data['Value'].iloc[:min_length],
    'z': z_data['Value'].iloc[:min_length],
    'label': 1
})

structured_df_aligned.head(n=5)

Unnamed: 0,x,y,z,label
0,9.0,116.0,54.0,1
1,2.0,-20.0,99.0,1
2,100.0,111.0,-22.0,1
3,33.0,76.0,34.0,1
4,15.0,-25.0,-24.0,1


In [13]:
def structure_sequences(df, label, seq_len=90):
    """
    Cut the DataFrame into consecutive, non-overlapping windows of `seq_len` rows
    and turn each window into a single flattened observation.

    Every row of `df` contributes its 'x', 'y' and 'z' values, so one window of
    `seq_len` rows flattens to `seq_len * 3` values, ordered row by row
    (x0, y0, z0, x1, y1, z1, ...). With the default of 90 rows that is a
    (270,) array. Trailing rows that do not fill a complete window are dropped.

    Parameters:
    - df: DataFrame containing at least the columns 'x', 'y', 'z'.
    - label: The label for these sequences (1 for valid, 0 for invalid).
    - seq_len: Number of rows per sequence (default 90).

    Returns:
    - A list of tuples (sequence, label), where each sequence is a 1-D array
      of shape (seq_len * 3,).

    Raises:
    - ValueError: if seq_len is not positive.
    """
    if seq_len <= 0:
        raise ValueError("seq_len must be a positive integer")

    # Select the feature columns once instead of re-slicing the frame per window
    xyz = df[['x', 'y', 'z']].to_numpy()
    num_sequences = len(xyz) // seq_len

    sequences = []
    for i in range(num_sequences):
        start_idx = i * seq_len
        window = xyz[start_idx:start_idx + seq_len]
        # flatten() copies, so each sequence is independent of `xyz`.
        # For models that consume sequence data, `window` itself is the
        # un-flattened (seq_len, 3) form.
        sequences.append((window.flatten(), label))

    return sequences

In [17]:
folder_path_valid = 'data/valid'
folder_path_invalid = 'data/invalid'
valid_data = process_folder(folder_path_valid, label=1)
invalid_data = process_folder(folder_path_invalid, label=0)

# Cut each combined frame into fixed-size sequences.
# NOTE(review): process_folder concatenates the rows of every file, and
# structure_sequences cuts fixed-size windows from that combined frame. The
# windows only correspond to "one file per sequence" if every file contributes
# exactly one window's worth of rows; otherwise windows straddle file
# boundaries. Verify against the recordings.
valid_sequences = structure_sequences(valid_data, 1)
invalid_sequences = structure_sequences(invalid_data, 0)

all_sequences = valid_sequences + invalid_sequences
# Shuffle with a dedicated, seeded generator so the sample order is reproducible
# across runs and does not depend on (or disturb) the global `random` state.
random.Random(42).shuffle(all_sequences)

In [18]:
# Unpack the (sequence, label) pairs into a feature matrix X and a label vector y
X = np.array([features for features, _ in all_sequences])
y = np.array([target for _, target in all_sequences])

In [24]:
# Hold out 20% of the sequences for validation; the fixed random_state keeps the split repeatable
X_train, X_val, y_train, y_val = train_test_split(
    X,
    y,
    test_size=0.2,
    random_state=42,
)

In [22]:
def calculate_f1_score(y_true, y_pred):
    """
    Compute the F1 score of a set of predictions.

    Thin wrapper around scikit-learn's `f1_score`, called with its default settings.

    Parameters:
    - y_true: Array-like of true labels.
    - y_pred: Array-like of predicted labels.

    Returns:
    - The F1 score as returned by `f1_score(y_true, y_pred)`.
    """
    return f1_score(y_true, y_pred)

In [21]:
# Candidate values for each tunable parameter. The grid search evaluates the
# Cartesian product of these lists (408,240 combinations as defined here).
parameter_grid = {
    'KLOPFALGO_EnableThr_s16': [2800, 3000, 3200, 3400, 3600, 3800, 4000],
    'KLOPFALGO_HfaLatchedCounterThr_u8': [10, 15, 20, 25, 30],
    'KLOPFALGO_VelLatchedCounterThr_u8': [10, 15, 20, 25],
    'KLOPFALGO_HfaVeryHighCounterThr_u8': [3, 5, 7],
    'KLOPFALGO_VelVeryHighCounterThr_u8': [3, 5, 7],
    'KLOPFALGO_HfaThrZLatched_s16': [7500, 8500, 9500],
    'KLOPFALGO_VelThrZLatched_s32': [8000, 9000, 10000],
    'KLOPFALGO_WinkelThrXLatched_s16': [6, 8, 10],
    'KLOPFALGO_WinkelThrYLatched_s16': [5000, 6000, 7000],
    'KLOPFALGO_WinkelThrZLatched_s16': [2900, 3200, 3500, 3800],
}

In [20]:
def evaluate_parameters(params, X_train, y_train, X_val, y_val):
    """
    Score one parameter combination for the knock detection algorithm.

    PLACEHOLDER: the real evaluation is not implemented yet. The inputs are
    currently ignored and a constant score is returned, so any search built on
    this function cannot distinguish between parameter sets.

    Parameters:
    - params: Dict mapping parameter names to the values to evaluate.
    - X_train, y_train: Training sequences and labels.
    - X_val, y_val: Validation sequences and labels.

    Returns:
    - The F1 score for `params` (currently always the placeholder 0.0).
    """
    # TODO: implement the following:
    # 1. Run the knock detection algorithm with the given params on X_train.
    # 2. Predict knock events on X_val.
    # 3. Calculate and return the F1 score comparing predictions to y_val.
    # Until then, return an explicitly defined placeholder (the original
    # returned an undefined name, which raised NameError on the first call).
    simulated_f1_score = 0.0
    return simulated_f1_score

In [None]:
# Exhaustive grid search: evaluate every combination in parameter_grid and keep
# the one with the highest F1 score. Ties keep the earliest combination (strict
# '>'); best_params stays empty if no combination scores above 0.
best_score = 0
best_params = {}

for values in itertools.product(*parameter_grid.values()):
    params = dict(zip(parameter_grid.keys(), values))

    # Named `score`, not `f1_score`: assigning to `f1_score` here would rebind the
    # function imported from sklearn.metrics to a number and break
    # calculate_f1_score for the rest of the session.
    score = evaluate_parameters(params, X_train, y_train, X_val, y_val)

    if score > best_score:
        best_score = score
        best_params = params

print("Best Parameters:", best_params)
print("Best F1 Score:", best_score)