In [1]:
import os
import pandas as pd
import numpy as np
import time
from scipy.signal import hilbert
from sklearn.model_selection import train_test_split
from sklearn.pipeline import make_pipeline
from sktime.transformations.panel.rocket import MiniRocketMultivariateVariable, MiniRocketMultivariate
from sklearn.linear_model import RidgeClassifierCV
from module import preprocess_bluetooth_signals, my_improved_bayesian_change_point_detection

In [2]:
# Function to load data
def load_data(root_dir):
    """Load every ``.txt`` signal file found under ``root_dir``.

    Each file is expected to hold one numeric sample per line. Blank lines
    (for example a trailing empty line at end of file) are ignored.

    Parameters
    ----------
    root_dir : str
        Directory that is walked recursively.

    Returns
    -------
    pandas.DataFrame
        One row per ``.txt`` file with two columns:
        ``'signal'`` - a ``pd.Series`` of the file's float samples, and
        ``'label'``  - the file's folder path relative to ``root_dir``.
        Rows are ordered by directory, then by file name, so the result is
        the same on every filesystem.

    Raises
    ------
    ValueError
        If a non-blank line cannot be parsed as a float.
    """
    signals = []
    labels = []
    for root, dirs, files in os.walk(root_dir):
        # os.walk yields entries in arbitrary (filesystem-dependent) order.
        # Sorting `dirs` in place fixes the descent order, and sorting
        # `files` fixes the order within a folder, so seeded splits made
        # downstream are reproducible across machines.
        dirs.sort()
        for file in sorted(files):
            if not file.endswith('.txt'):
                continue
            file_path = os.path.join(root, file)
            with open(file_path, 'r') as f:
                # Skip whitespace-only lines: float('') would raise.
                values = np.array(
                    [float(line.strip()) for line in f if line.strip()]
                )
            signals.append(pd.Series(values))
            # Use folder name (relative to the root) as label
            labels.append(os.path.relpath(root, root_dir))

    return pd.DataFrame({'signal': signals, 'label': labels})

In [3]:
# Dataset location: <current working directory>/Bluetooth Datasets/Dataset 5 Gsps
root_directory = os.path.join(os.getcwd(), 'Bluetooth Datasets', 'Dataset 5 Gsps')

In [4]:
# Load the data: every .txt file under root_directory becomes one row of a
# DataFrame with a 'signal' column (pd.Series of samples) and a 'label'
# column (the file's folder path relative to root_directory).
data = load_data(root_directory)

In [5]:
# Run the project-local preprocessing helper over the raw 'signal' column.
# Later cells read 'signal', 'label' and 'analytic_signal' from the result.
# NOTE(review): the meaning of dataset='A' is defined inside `module` and is
# not visible here - confirm it is the right setting for 'Dataset 5 Gsps'.
preprocessed_data = preprocess_bluetooth_signals(data, signal_column='signal', dataset='A')

Preprocessing 2548 signals from dataset A...
Processing signal 1/2548...
Processing signal 1001/2548...
Processing signal 2001/2548...
Preprocessing complete!


In [6]:
# Start the wall-clock timer. It is read by the cell that sets `end_time`,
# so the measurement spans every cell in between (data loading and
# preprocessing above are excluded). When cells are run by hand, idle time
# between cells is included in the reported figure.
start_time = time.perf_counter()

In [7]:
# For each signal, locate the transmission start with the change-point
# detector (run on the analytic signal) and drop every sample before it.
# Only the start index is used; the tail of the signal is kept as-is.
noise_removed = []
signal_pairs = zip(preprocessed_data['analytic_signal'], preprocessed_data['signal'])
for analytic_signal, original_signal in signal_pairs:
    start_idx, end_idx, _, _ = my_improved_bayesian_change_point_detection(
        analytic_signal,
        window_size=600,
        overlap=0.65,
        start_threshold=10,
        end_threshold=2,
    )
    noise_removed.append(original_signal[start_idx:])

# One object-dtype column holding a (variable-length) Series per row.
preprocessed_data['noise_removed'] = noise_removed

In [8]:
# Every signal must have the same number of samples for the classifier, so
# cut all of them down to the length of the shortest noise-removed signal.
noise_removed_signals = preprocessed_data['noise_removed']
min_len = min(series.size for series in noise_removed_signals)

# Keep the first `min_len` samples of each Series and store them as a new column.
truncated = []
for series in noise_removed_signals:
    truncated.append(series.iloc[:min_len])
preprocessed_data['truncated'] = truncated

In [9]:
# Split each truncated signal into the real and imaginary parts of its
# analytic signal (Hilbert transform); these become the two input channels
# of the multivariate classifier.
real_parts = []
imag_parts = []

# Iterate over the column values directly instead of `.loc[i]` with a
# positional counter: `.loc` is label-based, so counting 0..n-1 only works
# when the index is an untouched RangeIndex and would raise KeyError or
# pick the wrong row if preprocessing ever dropped or reordered rows.
for signal in preprocessed_data['truncated']:
    analytic = hilbert(signal.values)
    real_parts.append(pd.Series(np.real(analytic)))
    imag_parts.append(pd.Series(np.imag(analytic)))

# List assignment is positional, matching the iteration order above.
preprocessed_data['real'] = real_parts
preprocessed_data['imaginary'] = imag_parts

In [10]:
# Train/test containers for the per-label split (30 test examples per label).
# Both start out as empty DataFrames and are filled by the split loop.
train_data, test_data = pd.DataFrame(), pd.DataFrame()

In [11]:
# Per-label split: hold out exactly 30 examples of every label for testing
# and keep the rest for training. random_state=0 makes the split repeatable.
# Parts are collected in lists and concatenated once at the end; calling
# pd.concat inside the loop would re-copy the accumulated frame on every
# iteration and concatenate with an empty DataFrame on the first one.
train_parts = []
test_parts = []
for label, group in preprocessed_data[['label', 'real', 'imaginary']].groupby('label'):
    train, test = train_test_split(group, test_size=30, random_state=0)
    train_parts.append(train)
    test_parts.append(test)

# pd.concat raises on an empty list, so fall back to an empty frame when
# there were no groups at all.
train_data = pd.concat(train_parts) if train_parts else pd.DataFrame()
test_data = pd.concat(test_parts) if test_parts else pd.DataFrame()

In [12]:
# Features are the two signal channels; the target is the folder-derived label.
feature_columns = ['real', 'imaginary']

# Label columns as Series (index preserved) ...
y_train_series, y_test_series = train_data['label'], test_data['label']
# ... and as plain arrays for the estimator.
y_train, y_test = y_train_series.values, y_test_series.values

# Two-column frames whose cells each hold one Series.
X_train, X_test = train_data[feature_columns], test_data[feature_columns]

In [13]:
# Classification pipeline: MiniRocket feature transform followed by a ridge
# classifier whose regularisation strength is picked by built-in CV.
minirocket_transform = MiniRocketMultivariate(
    max_dilations_per_kernel=32,
    n_jobs=-1,          # use all available cores
    random_state=42,    # fixed seed for repeatable features
)

# Candidate alphas: 10 values log-spaced between 1e-3 and 1e3.
ridge_classifier = RidgeClassifierCV(alphas=np.logspace(-3, 3, 10))

bluetooth_pipeline = make_pipeline(minirocket_transform, ridge_classifier)

In [14]:
# Fit the pipeline on the training split: each step is fitted in order, so
# the MiniRocket transform is fitted first and its output feeds the ridge
# classifier.
bluetooth_pipeline.fit(X_train, y_train)

In [15]:
# Evaluate the pipeline on the held-out split. For a classifier pipeline
# `score` is the mean accuracy over the test examples; it is printed with
# three decimals.
accuracy = bluetooth_pipeline.score(X_test, y_test)
print(f"Test Accuracy: {accuracy:.3f}")

Test Accuracy: 0.931


In [16]:
# Stop the wall-clock timer started in the `start_time` cell and report the
# elapsed seconds. The interval covers every cell executed in between,
# including any idle time between manual cell runs.
end_time = time.perf_counter()
print(f'Execution time: {end_time - start_time:.6f} seconds')

Execution time: 32.177400 seconds


In [17]:
# Run the fitted pipeline on the test split.
y_pred = bluetooth_pipeline.predict(X_test)

# Side-by-side table of ground truth and prediction, one row per test example.
results = pd.DataFrame({'True Label': y_test, 'Predicted Label': y_pred})

# Keep only the rows where the prediction disagrees with the ground truth.
is_misclassified = results['True Label'].ne(results['Predicted Label'])
incorrect_predictions = results.loc[is_misclassified]

# Show the misclassified examples.
print("Incorrect Predictions:")
print(incorrect_predictions)

Incorrect Predictions:
                                          True Label  \
4                Iphone/5/013409009258565_gamze_uyuk   
6                Iphone/5/013409009258565_gamze_uyuk   
14               Iphone/5/013409009258565_gamze_uyuk   
23               Iphone/5/013409009258565_gamze_uyuk   
138           Iphone/6/354427066558690_mustafa_guclu   
188            Iphone/6s/353308076325778_tugce_ozkan   
204            Iphone/6s/353308076325778_tugce_ozkan   
215           Iphone/6s/355396082974273_melisa_oktem   
217           Iphone/6s/355396082974273_melisa_oktem   
274          LG/G4/352334072148270_mertcan_yurtseven   
282          LG/G4/352334072148270_mertcan_yurtseven   
285          LG/G4/352334072148270_mertcan_yurtseven   
292          LG/G4/352334072148270_mertcan_yurtseven   
293          LG/G4/352334072148270_mertcan_yurtseven   
295          LG/G4/352334072148270_mertcan_yurtseven   
297          LG/G4/352334072148270_mertcan_yurtseven   
299          LG/G4/352334