In [1]:
import matplotlib.pyplot as plt
import os, glob
import numpy as np
from scipy.fft import fft, fftfreq
import scipy.signal
import pywt
from scipy.signal import butter, lfilter
import json
from scipy.stats import kurtosis, skew


In [5]:
def read_file(file_path):

    with open(file_path, 'r') as file:
        data = json.load(file)
        piezo_data = data.get('piezo', [])
        piezo_set = np.array(piezo_data)
        mic_data = data.get('mic', [])
        mic_set = np.array(mic_data)

        denoised_piezo_data = wavelet_denoise(piezo_set, wavelet='db2', level=2) # 这里使用了Daubechies小波（'db1'），也就是Haar小波，分解层数为1。
        denoised_mic_data = wavelet_denoise(mic_set, wavelet='db2', level=2)
        return denoised_piezo_data, denoised_mic_data
    # return piezo_set, mic_set

def signal_visualization(piezo, mic, filename):
    plt.figure()
    plt.plot(piezo, linestyle='-', label='piezo Data') #可视化一个拳头敲击周期的动作
    plt.plot(mic, linestyle='-', label='mic Data')
    plt.title(f'Sensor Data Over Time for {filename}')
    plt.xlabel('Time (arbitrary units)')
    plt.ylabel('Sensor Value')
    # plt.ylim(0,6000)
    plt.legend()
    plt.grid(True)
    plt.show()

def fft_visualization(signal, fft_result, filename):

    fs = 6000
    
    frequencies = np.fft.fftfreq(len(fft_result), 1/fs)
    magnitude = np.abs(fft_result)

    plt.figure(figsize=(14, 7))
    plt.plot(frequencies, magnitude)
    plt.title(f'FFT of {filename}')
    plt.xlabel('Frequency (Hz)')
    plt.ylabel('Magnitude')
    plt.grid(True)
    plt.xlim(0, fs / 2)  # Limit x-axis to positive frequencies only up to Nyquist frequency
    plt.show()

    return frequencies, magnitude

def fft_features(fft_results):
    # Assume fft_results is an array of FFT coefficients
    magnitude_spectrum = np.abs(fft_results)
    print(magnitude_spectrum)
    
    # Spectral Centroid： Indicates the "center of mass" of the spectrum, giving an idea of the "brightness" of a sound.
    spectral_centroid = np.sum(magnitude_spectrum * np.arange(len(magnitude_spectrum))) / np.sum(magnitude_spectrum)
    
    # Spectral Rolloff：The frequency below which a certain percentage of the total spectral energy (commonly 85% or 95%) is contained, which helps in differentiating between harmonic content and noise.
    spectral_rolloff_threshold = 0.85 * np.sum(magnitude_spectrum)
    cumulative_sum = np.cumsum(magnitude_spectrum)
    spectral_rolloff = np.where(cumulative_sum >= spectral_rolloff_threshold)[0][0]
    
    # Spectral Flux：Measures the amount of local spectral change between successive frames, useful for detecting events.
    spectral_flux = np.sum((np.diff(magnitude_spectrum) ** 2))
    
    # Total Spectral Energy：Sum of squares of the FFT coefficients can serve as a measure of the overall signal energy.
    total_spectral_energy = np.sum(magnitude_spectrum ** 2)
    
    return {
        'spectral_centroid': spectral_centroid,
        'spectral_rolloff': spectral_rolloff,
        'spectral_flux': spectral_flux,
        'total_spectral_energy': total_spectral_energy
    }

def wavelet_features(signal):
    wavelet_transform = scipy.signal.cwt(signal, scipy.signal.ricker, widths=np.arange(1, 31))
    wavelet_energy = np.sum(wavelet_transform**2, axis=0)
    
    features = {
        "wavelet_energy": wavelet_energy
    }
    return features


def wavelet_denoise(data, wavelet, level):
    # 小波变换
    coeff = pywt.wavedec(data, wavelet, mode='per', level=level)
    
    # 计算阈值
    sigma = np.median(np.abs(coeff[-1])) / 0.6745
    threshold = sigma * np.sqrt(2 * np.log(len(data)))
    
    # 阈值处理
    coeff[1:] = [pywt.threshold(i, value=threshold, mode='soft') for i in coeff[1:]]
    
    # 重构信号
    reconstructed_signal = pywt.waverec(coeff, wavelet, mode='per')
    return reconstructed_signal

def butter_bandpass_filter(data, lowcut, highcut, fs, order=5):
    nyquist = 0.5 * fs
    low = lowcut / nyquist
    high = highcut / nyquist
    b, a = scipy.signal.butter(order, [low, high], btype='band')
    y = scipy.signal.lfilter(b, a, data)
    return y

def butter_bandstop_filter(data, lowcut, highcut, fs, order=5):
    nyquist = 0.5 * fs
    low = lowcut / nyquist
    high = highcut / nyquist
    b, a = scipy.signal.butter(order, [low, high], btype='bandstop')
    y = scipy.signal.lfilter(b, a, data)
    return y


def load_data():
    features, labels = [], []
    # count = 0
    for file in glob.glob("../data/combined_new/*/*.json"):
        file_name = os.path.basename(file)
        label = os.path.dirname(file).split("/")[3]
        # print(file_name,label)
        piezo_signal, mic_signal = read_file(file)
        # piezo_signal_centered = piezo_signal - np.mean(piezo_signal)
        # mic_signal_centered = mic_signal - np.mean(mic_signal)
        
        piezo_fft_result = np.fft.fft(piezo_signal)
        mic_fft_result = np.fft.fft(mic_signal)
        # print(wavelet_features(piezo_signal))
        # print(fft_features(piezo_fft_result))

        piezo_spectral_feature = fft_features(piezo_fft_result)
        piezo_spectral_feature = np.array(list(piezo_spectral_feature.values()))
        piezo_wavelet_feature = wavelet_features(piezo_signal)['wavelet_energy']
        # piezo_feature = np.concatenate((piezo_spectral_feature, piezo_wavelet_feature,np.abs(piezo_fft_result)))
        piezo_feature = np.abs(piezo_fft_result)
        # print(piezo_spectrial_feature_values)

        mic_spectral_feature = fft_features(mic_fft_result)
        mic_spectral_feature = np.array(list(mic_spectral_feature.values()))
        mic_wavelet_feature = wavelet_features(mic_signal)['wavelet_energy']
        # mic_feature = np.concatenate((mic_spectral_feature, mic_wavelet_feature,np.abs(piezo_fft_result)))
        mic_feature = np.abs(mic_fft_result)

        combined_feature = np.concatenate((piezo_feature, mic_feature))

        features.append(combined_feature)
        labels.append(label)
    # print(features)
    return features, labels

In [3]:
# for file in glob.glob("../data/combined_new/water-bottle/*.json"):
#     file_name = os.path.basename(file)
#     piezo_signal, mic_signal = read_file(file)
#     signal_visualization(piezo_signal, mic_signal, file_name)

In [6]:
from sklearn.preprocessing import StandardScaler

# Assuming load_data() loads your features and labels
features, labels = load_data()

# Now padded_features should be convertible to a numpy array
features_array = np.array(features)
labels_array = np.array(labels)

# Proceed with scaling or other ML preparations
scaler = StandardScaler()
features_scaled = scaler.fit_transform(features_array)

ValueError: Expected 2D array, got 1D array instead:
array=[].
Reshape your data either using array.reshape(-1, 1) if your data has a single feature or array.reshape(1, -1) if it contains a single sample.

In [7]:
from sklearn.model_selection import train_test_split
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import classification_report

# Split the dataset
X_train, X_test, y_train, y_test = train_test_split(features_scaled, labels, test_size=0.25, random_state=42)

# Train a model
model = RandomForestClassifier(n_estimators=1000, random_state=42)
model.fit(X_train, y_train)

# Predict and evaluate the model
predictions = model.predict(X_test)
print(classification_report(y_test, predictions))

# # Output the predictions and their corresponding true labels
# for i, (pred, label) in enumerate(zip(predictions, y_test)):
#     print(f"Sample {i}: Prediction = {pred}, True Label = {label}")

print(f'Support RandomForestClassifier\'s accuracy on training set is {100*model.score(X_train, y_train):.2f}%')
print(f'Support RandomForestClassifier\'s accuracy on test set is {100*model.score(X_test, y_test):.2f}%\n')


NameError: name 'features_scaled' is not defined

In [8]:
from sklearn.metrics import accuracy_score

features=[]
labels=[]
for file in glob.glob("data/5-23-gestures/*.json"):
    with open(file, 'r') as f:
        data = json.load(f)
        for d in data:
            features.append(d)
            labels.append(os.path.basename(file).split('.')[0])

# Now padded_features should be convertible to a numpy array
features_array = np.array(features)
labels_array = np.array(labels)

# Proceed with scaling or other ML preparations
scaler = StandardScaler()
features_scaled = scaler.fit_transform(features_array)

# Split the dataset
X_train, X_test, y_train, y_test = train_test_split(features_array, labels, test_size=0.25, random_state=42)

# Train a model
model = RandomForestClassifier(n_estimators=1000, random_state=42)
model.fit(X_train, y_train)


# 预测并打印概率
def predict_with_threshold(model, X, threshold=0.6):
    probas = model.predict_proba(X)
    max_probas = np.max(probas, axis=1)
    predictions = model.predict(X)
    results = []
    for pred, max_proba, proba in zip(predictions, max_probas, probas):
        if max_proba >= threshold:
            result = (pred, max_proba, proba)
        else:
            result = ("None", max_proba, proba)
        results.append(result)
        print(f"Prediction: {result[0]}, Max Proba: {result[1]:.4f}, All Probas: {result[2]}")
    return [result[0] for result in results]

# 设置置信度阈值
threshold = 0.6
# 对测试集进行预测
y_pred = predict_with_threshold(model, X_test, threshold=threshold)


# 计算准确度（排除返回 "None" 的情况）
filtered_y_test = [y for y, pred in zip(y_test, y_pred) if pred != "None"]
filtered_y_pred = [pred for pred in y_pred if pred != "None"]
accuracy = accuracy_score(filtered_y_test, filtered_y_pred)
print(f'Accuracy: {accuracy}')

# 输出预测结果
print(y_pred)


Prediction: None, Max Proba: 0.5930, All Probas: [0.205 0.593 0.202]
Prediction: None, Max Proba: 0.5890, All Probas: [0.178 0.233 0.589]
Prediction: None, Max Proba: 0.5790, All Probas: [0.115 0.579 0.306]
Prediction: Tap, Max Proba: 0.6630, All Probas: [0.22  0.117 0.663]
Prediction: Single-Finger-Tap, Max Proba: 0.7910, All Probas: [0.181 0.791 0.028]
Prediction: None, Max Proba: 0.4950, All Probas: [0.495 0.196 0.309]
Prediction: Tap, Max Proba: 0.6440, All Probas: [0.23  0.126 0.644]
Prediction: None, Max Proba: 0.4840, All Probas: [0.208 0.308 0.484]
Prediction: Tap, Max Proba: 0.6350, All Probas: [0.111 0.254 0.635]
Prediction: Fist, Max Proba: 0.7760, All Probas: [0.776 0.126 0.098]
Prediction: None, Max Proba: 0.5730, All Probas: [0.212 0.573 0.215]
Prediction: None, Max Proba: 0.4110, All Probas: [0.383 0.206 0.411]
Prediction: None, Max Proba: 0.5650, All Probas: [0.182 0.565 0.253]
Prediction: None, Max Proba: 0.5820, All Probas: [0.361 0.582 0.057]
Prediction: Fist, Max Pr