In [20]:
import numpy as np
from scipy.signal import butter, filtfilt, iirnotch, hilbert
import scipy.signal as signal
import pandas as pd
import glob
import os
from scipy.io import loadmat
import matplotlib.pyplot as plt

In [21]:
class LowPassFilter:
    def __init__(self, order=2, fc_bp=4, freq=1000):
        nyq = 0.5 * freq
        low = fc_bp / nyq
        self.b, self.a = butter(order, low, btype='lowpass', output='ba')
    
    def filtfilt(self, x):
        return filtfilt(self.b, self.a, x)

class HighPassFilter:
    def __init__(self, order=2, fc_bp=30, freq=1000):
        nyq = 0.5 * freq
        high = fc_bp / nyq
        self.b, self.a = butter(order, high, btype='highpass', output='ba')
    
    def filtfilt(self, x):
        return filtfilt(self.b, self.a, x)
    
class NotchFilter:
    def __init__(self, f0=60, freq=1000):
        Q = 30.0  # Quality factor
        self.b, self.a = iirnotch(f0, Q, freq)
    
    def filtfilt(self, x):
        return filtfilt(self.b, self.a, x)

In [22]:
class EMGProcessor:
    def __init__(self, sampling_rate=1000, highpass_order=4, highpass_fc=30,
                 lowpass_order=4, lowpass_fc=6, notch_f0=60):
        self.h_filter = HighPassFilter(order=highpass_order, fc_bp=highpass_fc, freq=sampling_rate)
        self.l_filter = LowPassFilter(order=lowpass_order, fc_bp=lowpass_fc, freq=sampling_rate)
        # self.n_filter = NotchFilter(f0=notch_f0, freq=sampling_rate)

    def process(self, data):
        filted_emg = self.h_filter.filtfilt(data)
        rect_emg = np.abs(filted_emg)
        envelope = self.l_filter.filtfilt(rect_emg)
        return envelope
    
    def normalize_data(self, data):
        min_val = np.min(data)
        max_val = np.max(data)
        if max_val == min_val:
            return data
        return (data - min_val) / (max_val - min_val)

In [23]:
filename = 'data/path1_01/sorted_subset_data.csv'
data = np.loadtxt(filename, delimiter=',', skiprows=1, usecols=range(8))
print(data)

[[-4426.43 -1751.92  -748.94 ... -1810.84   -97.54  -709.96]
 [-4424.5  -1771.42  -745.33 ... -1811.     -95.66  -710.11]
 [-4423.54 -1810.57  -744.3  ... -1811.26   -94.37  -709.88]
 ...
 [-4476.3  -1371.54  -776.09 ... -1594.37   -40.48  -853.51]
 [-4482.13 -1356.6   -775.54 ... -1594.98   -42.17  -852.5 ]
 [-4488.04 -1343.23  -774.66 ... -1594.15   -35.25  -849.23]]


In [24]:
data.shape

(19375, 8)

In [62]:
data = data.T

processor = EMGProcessor()
for i in range(8):
    data[i] = processor.process(data[i])
    data[i] = processor.normalize_data(data[i])

fs = 1000  # 原始採樣率為 1000 Hz
target_fs = 30  # 目標採樣率為 30 Hz

# 計算重採樣的比例
resample_ratio = target_fs / float(fs)
new_length = int(np.ceil(data.shape[1] * resample_ratio))


# 為每個欄位進行重新採樣，結果轉置為 (8, new_length)
resampled_data = np.zeros((data.shape[0], new_length))

for i in range(data.shape[0]):
    # 對每個通道進行重新採樣
    resampled_data[i, :] = signal.resample(data[:, i], new_length)

print(resampled_data.shape)

(8, 582)


In [63]:
resampled_data = resampled_data.T

# 標籤
labels = ["LGL", "RGL", "LTA", "RTA", "LBF", "RBF", "LRF", "RRF"]

# 輸出檔案名稱
output_filename = 'resampled_data.sto'

# 寫入 .sto 檔案
with open(output_filename, 'w') as f:
    # 寫入標頭
    f.write("Coordinates\n")
    f.write("version=1\n")
    f.write("nRows={}\n".format(resampled_data.shape[0]))
    f.write("nColumns={}\n".format(resampled_data.shape[1] + 1))  # 加上時間列
    f.write("inDegrees=no\n")
    f.write("endheader\n")
    
    # 寫入標籤（時間和各個數據列標籤）
    f.write("time\t" + "\t".join(labels) + "\n")
    
    # 寫入數據部分
    for i, row in enumerate(resampled_data):
        time = i / 30.0  # 假設 30 Hz 的目標頻率，生成時間列
        row_str = "\t".join([f"{x:.6f}" for x in row])
        f.write(f"{time:.6f}\t{row_str}\n")

print(f"數據已保存到 {output_filename}")

數據已保存到 resampled_data.sto
