# Fibre-sensing classification

## Analiza eksploracyjna

In [None]:
import pandas as pd
import seaborn as sns
import numpy as np
import matplotlib.pyplot as plt
from sklearn.preprocessing import StandardScaler
from sklearn.model_selection import train_test_split
from sklearn.neighbors import KNeighborsClassifier
from scipy.fft import rfft, rfftfreq
from scipy.signal import find_peaks
from scipy.stats import kurtosis, skew
from xgboost import XGBClassifier
from sklearn.metrics import accuracy_score, balanced_accuracy_score
from dataclasses import dataclass
from utils import show_histogram, combine_features_and_labels, print_classification_report, show_confusion_matrix
from sklearn.utils.class_weight import compute_sample_weight
from sklearn.model_selection import RandomizedSearchCV

### Wczytanie zbioru danych z etykietami

In [None]:
# Load the labelled training table; its 'label' column is used as the target below.
train_dataset = pd.read_csv('train_dataset.csv')

# Preview the first rows.
train_dataset.head()

### Sprawdzenie klas występujących w zbiorze treningowym

In [None]:
# Human-readable event name for every integer class id (0-8).
class_mapping = {
    0: 'car',
    1: 'construction works',
    2: 'running',
    3: 'going over the fence',
    4: 'cable manipulation',
    5: 'walking',
    6: 'opening/closing manhole',
    7: 'longboard',
    8: 'regular (background noise)'
}

# Distinct label values that actually occur in the training table.
train_dataset['label'].unique()

In [None]:
class_mapping_df = pd.DataFrame(list(class_mapping.items()), columns=['class_id', 'class_name'])

class_mapping_df

In [None]:
# Bar chart with the number of training rows per class label.
sns.set_theme(style="whitegrid")
plt.figure(figsize=(8, 5))
# hue mirrors x so every class bar gets its own palette colour; the legend is turned off.
ax = sns.countplot(data=train_dataset, x='label', palette='viridis', hue='label', legend=False)

plt.title('Liczność poszczególnych klas w zbiorze treningowym')
plt.xlabel('Klasa')
plt.ylabel('Liczba wystąpień')

plt.show()

### Wczytanie zbioru danych z cechami

In [None]:
train_features = np.load('train_features.npy')

In [None]:
print(train_features.shape)
print(len(train_dataset))

In [None]:
train_features[0]

In [None]:
zero_count = np.count_nonzero(train_features == 0)
zero_count

In [None]:
train_features.min(), train_features.max()

In [None]:
# Line plot of the first row; the axis labels treat the index as time and the value as strain.
plt.figure(figsize=(10, 5))
plt.plot(train_features[0])
plt.title('Feature values for first sample')
plt.xlabel('Time')
plt.ylabel('Strain value')
plt.grid()
plt.show()

In [None]:
row_means = train_features.mean(axis=1)

In [None]:
show_histogram(
    data=row_means, 
    title='Histogram of row means (all samples)', 
    xlabel='Mean feature value', 
    ylabel='Count'
)

In [None]:
row_mins = train_features.min(axis=1)

In [None]:
show_histogram(
    data=row_mins, 
    title='Histogram of row mins (all samples)', 
    xlabel='Min feature value', 
    ylabel='Count'
)

In [None]:
row_maxes = train_features.max(axis=1)

In [None]:
show_histogram(
    data=row_maxes, 
    title='Histogram of row maxes (all samples)', 
    xlabel='Max feature value', 
    ylabel='Count'
)

In [None]:
row_stds = train_features.std(axis=1)

In [None]:
show_histogram(
    data=row_stds, 
    title='Histogram of row stds (all samples)', 
    xlabel='Std feature value', 
    ylabel='Count'
)

## Transformacje danych

### Stworzenie zbioru treningowego dla podstawowego modelu

#### Wybór podstawowych agregatów

In [None]:
# Baseline feature table: one row per sample, built from the four per-row
# aggregates (mean, min, max, std) computed in the cells above.
basemodel_train_features_df = pd.DataFrame({
    'mean': row_means,
    'min': row_mins,
    'max': row_maxes,
    'std': row_stds,
})

basemodel_train_features_df.head()

In [None]:
train_labels_df = train_dataset[['label']].copy()
train_labels_df.head()

#### Dodanie etykiet

In [None]:
# Merge the baseline features with the labels via the project helper.
# The result is expected to contain a 'label' column (it is dropped/selected by the split below).
basemodel_train_data_df = combine_features_and_labels(basemodel_train_features_df, train_labels_df)
basemodel_train_data_df.head()

#### Standaryzacja atrybutów

In [None]:
# Separate the baseline feature columns from the target column.
features_df = basemodel_train_data_df.drop('label', axis=1)
base_labels = basemodel_train_data_df['label']

# Stratified 80/20 train/validation split with a fixed seed.
X_train, X_val, y_train, y_val = train_test_split(
    features_df,
    base_labels,
    test_size=0.2,
    random_state=42,
    stratify=base_labels,
)

# Fit the scaler on the training part only and reuse it for validation,
# wrapping the resulting arrays back into DataFrames with the original column names.
scaler = StandardScaler()
base_X_train = pd.DataFrame(scaler.fit_transform(X_train), columns=X_train.columns)
base_X_val = pd.DataFrame(scaler.transform(X_val), columns=X_val.columns)

base_X_train.head()

### Stworzenie zbioru treningowego przetransformowanego do dziedziny częstotliwości

In [None]:
train_features_df = pd.DataFrame(train_features)

train_features_df.head()

In [None]:
# Explore the frequency spectrum of a single training sample.
fs = 20000  # sampling rate in Hz used to build the frequency axis
sample_signal = train_features[2]
# Derive the window length from the signal itself so the Hann window always
# matches it (a hard-coded length fails for any other sample size).
N = len(sample_signal)

# Remove the DC offset, then taper with a Hann window before transforming.
clean_signal = sample_signal - np.mean(sample_signal)
windowed_signal = clean_signal * np.hanning(N)

fft_vals = np.abs(rfft(windowed_signal))
xf = rfftfreq(N, d=1/fs)

# Scale the magnitudes by 2/N (the Hann window's gain is not compensated).
fft_vals = (2.0 / N) * fft_vals

# Keep peaks that reach at least 20% of the spectrum maximum and are at least 20 bins apart.
peaks, properties = find_peaks(fft_vals, height=np.max(fft_vals)*0.2, distance=20)

plt.figure(figsize=(14, 6))
plt.plot(xf, fft_vals, color='royalblue', label='Signal Strength')
plt.plot(xf[peaks], fft_vals[peaks], "x", color='red', markersize=10, label='Significant Peaks')

# Label every detected peak with its frequency.
for peak in peaks:
    plt.annotate(f"{xf[peak]:.1f} Hz",
                 (xf[peak], fft_vals[peak]),
                 textcoords="offset points",
                 xytext=(0,10),
                 ha='center',
                 fontsize=9,
                 fontweight='bold')

plt.title(f"Frequency Spectrum (Sampling Rate: {fs} Hz)", fontsize=14)
plt.xlabel("Frequency (Hz)", fontsize=12)
plt.ylabel("Amplitude", fontsize=12)
plt.grid(alpha=0.3)
plt.legend()

plt.show()

print(f"Top Significant Frequencies at fs={fs}Hz:")
for p in peaks:
    print(f"- Frequency: {xf[p]:.2f} Hz | Magnitude: {fft_vals[p]:.2e}")

In [None]:
properties['peak_heights']

In [None]:
len(fft_vals), len(xf)

In [None]:
def calculate_fft_features(signal, fs=20000):
    """Compute the Hann-windowed magnitude spectrum of a signal and its strongest peak.

    Args:
        signal: 1-D NumPy array of samples.
        fs: Sampling rate in Hz used to build the frequency axis.

    Returns:
        Tuple ``(fft_vals, xf, top_freq)``: the spectrum magnitudes scaled by
        2/N, the matching frequency bins in Hz, and the frequency of the
        tallest detected peak (``0`` when no peak is detected).
    """
    n_samples = len(signal)

    # Remove the DC offset, then taper with a Hann window before transforming.
    tapered = (signal - np.mean(signal)) * np.hanning(n_samples)
    fft_vals = (2.0 / n_samples) * np.abs(rfft(tapered))
    xf = rfftfreq(n_samples, d=1/fs)

    # Peaks must reach 20% of the spectrum maximum and be at least 20 bins apart.
    peak_bins, peak_info = find_peaks(fft_vals, height=np.max(fft_vals)*0.2, distance=20)
    if len(peak_bins) == 0:
        return fft_vals, xf, 0

    strongest_bin = peak_bins[np.argmax(peak_info['peak_heights'])]
    return fft_vals, xf, xf[strongest_bin]

In [None]:
# Frequency of the strongest spectral peak for every training sample.
# Only the third return value is needed; a comprehension avoids rebinding the
# notebook-level `fft_vals` / `xf` of the example spectrum on every iteration.
freqs = [calculate_fft_features(sample)[2] for sample in train_features]

# Distinct peak frequencies and the number of samples sharing each of them.
unique, counts = np.unique(freqs, return_counts=True)

In [None]:
# Pair every peak frequency with its sample count, most common first.
freq_counts = sorted(zip(unique, counts), key=lambda x: x[1], reverse=True)

# Number of distinct peak frequencies shared by more than 5 samples.
frequent_count = sum(1 for _, v in freq_counts if v > 5)
print(frequent_count)

# NOTE(review): the count above uses a threshold of 5 while the listing and
# plot below use 10 — confirm which threshold is intended.
filtered_freqs = [(k, v) for k, v in freq_counts if v > 10]
sorted_filtered_freqs = sorted(filtered_freqs, key=lambda x: x[0], reverse=True)

for k, v in sorted_filtered_freqs:
    print(f"Frequency: {k:.2f} Hz | Count: {v}")

# Bar chart: peak frequency (x) against the number of samples with that peak (y).
plt.figure(figsize=(10, 5))
plt.bar(
    [f[0] for f in sorted_filtered_freqs],
    [f[1] for f in sorted_filtered_freqs],
    width=1.0,
    color='royalblue'
)


In [None]:
# Zoom in on peak frequencies below 60 Hz that occur in more than 5 samples.
filter_freq_below_60 = [(k, v) for k, v in freq_counts if k < 60 and v > 5]

plt.figure(figsize=(10, 5))
plt.bar(
    [f[0] for f in filter_freq_below_60], 
    [f[1] for f in filter_freq_below_60], 
    width=0.5, 
    color='seagreen'
)

In [None]:
def get_extended_features(spectrum_row):
    """Build the extended feature dictionary for one raw sample.

    Despite its name, ``spectrum_row`` is the raw signal: it is handed to
    ``calculate_fft_features`` to obtain the spectrum and is also used
    directly for the time-domain features.

    Args:
        spectrum_row: 1-D NumPy array with the samples of one signal.

    Returns:
        dict mapping feature name to a scalar value: band sums, spectral
        centroid/spread, dominant frequency and its amplitude, band ratios,
        RMS, kurtosis, skewness and zero-crossing rate (in that key order).
    """
    magnitudes, bin_freqs, _ = calculate_fft_features(spectrum_row)
    raw_signal = spectrum_row
    eps = 1e-9  # keeps the divisions below away from a zero denominator

    def band_sum(lo_hz, hi_hz):
        # Sum of spectrum magnitudes over bins with lo_hz <= f < hi_hz.
        return np.sum(magnitudes[(bin_freqs >= lo_hz) & (bin_freqs < hi_hz)])

    # Frequency band sums: 0-20 Hz, 20-40 Hz and 40-60 Hz.
    low_band = band_sum(0, 20)
    mid_band = band_sum(20, 40)
    high_band = band_sum(40, 60)

    # Spectral shape: magnitude-weighted mean frequency and spread around it.
    spectrum_total = np.sum(magnitudes) + eps
    centroid = np.sum(bin_freqs * magnitudes) / spectrum_total
    spread = np.sqrt(np.sum(((bin_freqs - centroid)**2) * magnitudes) / spectrum_total)

    # Strongest spectral bin.
    strongest_bin = np.argmax(magnitudes)

    # Sign changes between consecutive samples of the raw signal.
    sign_steps = np.diff(np.sign(raw_signal))

    return {
        'low_freq_sum': low_band,
        'mid_freq_sum': mid_band,
        'high_freq_sum': high_band,
        'spectral_centroid': centroid,
        'spectral_spread': spread,
        'dominant_freq': bin_freqs[strongest_bin],
        'peak_amplitude': magnitudes[strongest_bin],
        'low_to_total_ratio': low_band / spectrum_total,
        'low_to_mid_ratio': low_band / (mid_band + eps),
        'rms': np.sqrt(np.mean(raw_signal**2)),
        'time_kurtosis': kurtosis(raw_signal),
        'time_skew': skew(raw_signal),
        'zcr': np.sum(sign_steps != 0) / len(raw_signal),
    }

In [None]:
# Extract the extended feature dictionary for every training sample and
# stack the dictionaries into one table (one row per sample).
feature_list = [get_extended_features(row) for row in train_features]
df_features = pd.DataFrame(feature_list)

In [None]:
df_features.head()

In [None]:
# Attach the labels to the extended feature table.
extended_train_data_df = combine_features_and_labels(df_features, train_dataset['label'])
extended_labels = extended_train_data_df['label']

# Stratified 80/20 train/validation split with the same seed as the baseline split.
X_train, X_val, y_train, y_val = train_test_split(
    extended_train_data_df.drop('label', axis=1),
    extended_labels,
    test_size=0.2,
    random_state=42,
    stratify=extended_labels,
)

# Fit the scaler on the training part only; it is reused later for validation and test data.
extended_scaler = StandardScaler()
extended_X_train = pd.DataFrame(extended_scaler.fit_transform(X_train), columns=X_train.columns)
extended_X_val = pd.DataFrame(extended_scaler.transform(X_val), columns=X_val.columns)

In [None]:
extended_X_train.head()

In [None]:
@dataclass(frozen=True)
class Dataset:
    """Immutable pairing of a feature table with its label vector."""
    X: pd.DataFrame  # feature matrix
    y: pd.Series  # target labels

@dataclass
class DataSplits:
    """Groups the train, validation and optional test datasets of one feature set."""
    train: Dataset
    val: Dataset
    test: Dataset = None  # optional; defaults to None

# Baseline feature set (mean/min/max/std aggregates).
# NOTE(review): y_train / y_val at this point are the ones produced by the most
# recent (extended-feature) split. They line up with the baseline rows only
# because both splits use the same random_state, test_size and stratify labels —
# confirm that combine_features_and_labels keeps the row order in both cases.
base_train_set = Dataset(X=base_X_train, y=y_train)
base_val_set = Dataset(X=base_X_val, y=y_val)
base_data = DataSplits(train=base_train_set, val=base_val_set)

# Extended feature set (spectral and time-domain features).
extended_train_set = Dataset(X=extended_X_train, y=y_train)
extended_val_set = Dataset(X=extended_X_val, y=y_val)
data = DataSplits(train=extended_train_set, val=extended_val_set)

## Trening modeli ML

### Model 1 (KNN)

Dane treningowe: podstawowe agregaty wyliczone na surowych danych

* średnia
* min
* max 
* odchylenie standardowe

#### Trening i ewaluacja podstawowego modelu KNN

In [None]:
class KNNClassifierModel:
    """Thin wrapper around scikit-learn's KNeighborsClassifier."""

    def __init__(self, n_neighbors=5):
        """Create the underlying classifier with the given number of neighbours."""
        estimator = KNeighborsClassifier(n_neighbors=n_neighbors)
        self.model = estimator

    def train(self, X, y):
        """Fit the wrapped classifier on features ``X`` and labels ``y``."""
        self.model.fit(X, y)

    def predict(self, X):
        """Return the wrapped classifier's predictions for ``X``."""
        predicted = self.model.predict(X)
        return predicted

In [None]:
# Train a 5-nearest-neighbours model on the scaled baseline features and
# predict the validation split.
knn_classifier = KNNClassifierModel(n_neighbors=5)
knn_classifier.train(base_data.train.X, base_data.train.y)
y_pred_knn_base = knn_classifier.predict(base_data.val.X)

In [None]:
print(f"Validation Accuracy (KNN base): {accuracy_score(base_data.val.y, y_pred_knn_base):.2f}")
print(f"Validation Balanced Accuracy (KNN base): {balanced_accuracy_score(base_data.val.y, y_pred_knn_base):.2f}")

In [None]:
print_classification_report(base_data.val.y, y_pred_knn_base, class_mapping)

In [None]:
show_confusion_matrix(base_data.val.y, y_pred_knn_base, class_mapping)

### Model 2 (KNN)

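Dane treningowe: cechy z dziedziny częstotliwości (sumy w pasmach, centroid i rozrzut widma, dominująca częstotliwość) uzupełnione o cechy z dziedziny czasu (RMS, kurtoza, skośność, ZCR)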

In [None]:
# Same 5-nearest-neighbours model, now trained on the scaled extended features.
knn_classifier_extended = KNNClassifierModel(n_neighbors=5)
knn_classifier_extended.train(data.train.X, data.train.y)
y_pred_knn_extended = knn_classifier_extended.predict(data.val.X)

In [None]:
print(f"Validation Accuracy (KNN extended): {accuracy_score(data.val.y, y_pred_knn_extended):.2f}")
print(f"Validation Balanced Accuracy (KNN extended): {balanced_accuracy_score(data.val.y, y_pred_knn_extended):.2f}")

In [None]:
print_classification_report(data.val.y, y_pred_knn_extended, class_mapping)

In [None]:
show_confusion_matrix(data.val.y, y_pred_knn_extended, class_mapping)

### Model 3 (XGBoost)

Predykcja

In [None]:
# Baseline XGBoost model on the extended features.
# 'mlogloss' is the multi-class log-loss metric; the target has nine classes
# (see class_mapping), so the binary 'logloss' metric does not apply here.
xgb_classifier = XGBClassifier(n_estimators=100, learning_rate=0.1, random_state=42, eval_metric='mlogloss')
xgb_classifier.fit(data.train.X, data.train.y)
y_pred_xgb = xgb_classifier.predict(data.val.X)

Wyniki

In [None]:
print(f"Validation Accuracy (XGBoost): {accuracy_score(data.val.y, y_pred_xgb):.2f}")
print(f"Validation Balanced Accuracy (XGBoost): {balanced_accuracy_score(data.val.y, y_pred_xgb):.2f}")

In [None]:
print_classification_report(data.val.y, y_pred_xgb, class_mapping)

## Tuning XGBoost

### Zwiększenie wag dla mniej licznych klas

In [None]:
# One weight per training row, computed with class_weight='balanced' from the
# training labels so that rarer classes receive larger weights.
sample_weights = compute_sample_weight(
    class_weight='balanced',
    y=data.train.y
)

# Prints the weight array.
print(f"Przykładowe wagi: {sample_weights}")

### Tuning hiperparametrów

In [None]:
# Discrete search space sampled by the randomized search.
param_dist = {
    'n_estimators': [100, 200, 300, 400, 500],
    'learning_rate': [0.01, 0.05, 0.1, 0.2],
    'max_depth': [3, 5, 7, 9],
    'min_child_weight': [1, 3, 5],
    'subsample': [0.7, 0.8, 0.9, 1.0],
    'colsample_bytree': [0.6, 0.8, 1.0],
}

# Base estimator configured for the 9-class problem.
xgb_clf = XGBClassifier(
    objective='multi:softmax',
    num_class=9,
    random_state=42,
    eval_metric='mlogloss'
)

# 20 sampled parameter combinations, 3-fold cross-validation, scored by
# balanced accuracy, evaluated with 16 parallel jobs.
random_search = RandomizedSearchCV(
    estimator=xgb_clf,
    param_distributions=param_dist,
    n_iter=20,
    scoring='balanced_accuracy',
    cv=3,
    verbose=1,
    random_state=42,
    n_jobs=16
)

print("Rozpoczynam tuning...")
# NOTE(review): X_train / y_train are the extended-feature split taken before
# StandardScaler was applied, while the models elsewhere are fitted on
# data.train.X (scaled). sample_weights was computed from data.train.y, which
# is the same y_train object as long as the cells are run in order.
random_search.fit(
    X_train, 
    y_train, 
    sample_weight=sample_weights
)

print(f"\nNajlepsze parametry: {random_search.best_params_}")
print(f"Najlepszy wynik balanced_accuracy: {random_search.best_score_:.4f}")

### Model z najlepszymi hiperparametrami

In [None]:
# Hyper-parameters selected by the randomized search.
best_params = random_search.best_params_

xgb_tuned = XGBClassifier(
    **best_params,
    objective='multi:softmax',
    num_class=9,
    eval_metric='mlogloss',
    random_state=42,
    n_jobs=-1
)

# Fit with the same balanced sample weights that were used during the search;
# without them the final model would be trained under a different objective
# than the one the hyper-parameters were selected for.
xgb_tuned.fit(data.train.X, data.train.y, sample_weight=sample_weights)
y_pred_xgb_tuned = xgb_tuned.predict(data.val.X)
print_classification_report(data.val.y, y_pred_xgb_tuned, class_mapping)

In [None]:
print(f"Validation Balanced Accuracy (XGBoost) after tunning: {balanced_accuracy_score(data.val.y, y_pred_xgb_tuned):.2f}")

## Predykcja na danych testowych

### Wczytanie zbioru danych z cechami

In [None]:
# Load the test feature array and show its shape.
test_features = np.load('test_features.npy')
test_features.shape

### Zastosowanie tych samych transformacji co dla danych treningowych

In [None]:
# Extract the same extended features for every test sample.
feature_list = [get_extended_features(row) for row in test_features]
X_test = pd.DataFrame(feature_list)

# Reuse the scaler fitted on the training split (transform only, no re-fit).
extended_X_test = extended_scaler.transform(X_test)
extended_X_test = pd.DataFrame(extended_X_test, columns=X_test.columns)

extended_X_test.head()

### Predykcja etykiet

In [None]:
# Predict test labels with each model trained on the extended features.
extended_knn_y_pred = knn_classifier_extended.predict(extended_X_test)
xgb_y_pred = xgb_classifier.predict(extended_X_test)
xgb_tuned_y_pred = xgb_tuned.predict(extended_X_test)

### Zapis wyniku w wymaganym formacie

In [None]:
def save_predictions_to_csv(predictions, filename):
    """Write predictions to a CSV file with ``data_id`` and ``label`` columns.

    Args:
        predictions: Sequence of predicted labels; ``data_id`` is the
            zero-based position of each prediction.
        filename: Destination path of the CSV file.
    """
    row_ids = np.arange(len(predictions))
    table = pd.DataFrame({'data_id': row_ids, 'label': predictions})
    # The DataFrame index is not written; data_id already identifies the rows.
    table.to_csv(filename, index=False)

In [None]:
# Write one submission file per model.
save_predictions_to_csv(extended_knn_y_pred, 'submission_knn.csv')
save_predictions_to_csv(xgb_y_pred, 'submission_xgb.csv')
save_predictions_to_csv(xgb_tuned_y_pred, 'submission_xgb_tuned.csv')