In [9]:
import torch
import numpy as np
import pandas as pd
import torch.nn as nn
import torch.optim as optim
import matplotlib.pyplot as plt
from torch.utils.data import DataLoader, TensorDataset
from sklearn.metrics import mean_absolute_error
from sklearn.metrics import mean_squared_error
from scipy.signal import stft
import matplotlib.pyplot as plt
from neurodsp.utils import create_times

from sklearn.model_selection import train_test_split
from sklearn.preprocessing import MinMaxScaler
from sklearn.preprocessing import StandardScaler

device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
print(device)

# 샘플 데이터 생성
fpath = "../../data/DNN_data/v3/"

x_data = np.load(fpath + "data_with_artifact_test" + ".npy")
y_data = np.load(fpath + "data_signal" + ".npy")

print(x_data.shape)
print(y_data.shape)

cuda:0
(1000, 4000)
(1000, 4000)


In [10]:
fs = 2000
num_signals = 1000
n_seconds = 2
times = create_times(n_seconds, fs)

fft_win_time = 0.5  # 윈도우 길이 (초)
n_fft = int(fft_win_time * fs)  # FFT 윈도우 크기, 1000 샘플
fft_overlap_time = int(fft_win_time / 2 * fs)  # 오버랩 크기, 500 샘플, 50%
freqs = torch.fft.rfftfreq(n_fft, 1 / fs)

In [11]:
# stft
x_stft = []
def compute_stft(data, fs, fft_win_time):
    n_fft = int(fft_win_time * fs)
    fft_overlap_time = int(fft_win_time / 2 * fs)
    
    stft_results = []
    for sample in data:
        # torch.tensor로 변환할 때 dtype=torch.float32로 명시적으로 지정
        sample_tensor = torch.tensor(sample, dtype=torch.float32)
        Zxx = torch.stft(sample_tensor, n_fft, hop_length=n_fft - fft_overlap_time, return_complex=True)
        real_part = Zxx.real.unsqueeze(-1)  # 마지막 차원에 차원 추가 (frequency_bins, time_frames, 1)
        imag_part = Zxx.imag.unsqueeze(-1)  # 마지막 차원에 차원 추가 (frequency_bins, time_frames, 1)
        complex_tensor = torch.cat((real_part, imag_part), dim=-1)  # 마지막 차원을 기준으로 복소수 텐서 생성 (frequency_bins, time_frames, 2)
        # stft_results.append(Zxx)
        stft_results.append(complex_tensor)

    # 넘파이 배열로 변환
    stft_results_np = np.array(stft_results)
    
    return stft_results_np

x_stft = compute_stft(x_data, fs, fft_win_time)
y_stft = compute_stft(y_data, fs, fft_win_time)

print(f"x_data shape: {x_data.shape}")
print(f"x_stft shape: {x_stft.shape}")
print(f"x_data[0] shape: {x_data[0].shape}")
print(f"x_stft[0] shape: {x_stft[0].shape}")

print(f"y_data shape: {y_data.shape}")
print(f"y_stft shape: {y_stft.shape}")
print(f"y_data[0] shape: {y_data[0].shape}")
print(f"y_stft[0] shape: {y_stft[0].shape}")

print(f"x_stft: {x_stft[0][0]}")
print(f"y_stft: {y_stft[0][0][0]}")

# 각 요소는 (frequency_bins, time_frames, 2)의 복소수(실수, 허수 성분) 텐서입니다.
# x_data[0] shape: (4000,)
# x_stft[0] shape: (1000, 501, 9)

x_data shape: (1000, 4000)
x_stft shape: (1000, 501, 9, 2)
x_data[0] shape: (4000,)
x_stft[0] shape: (501, 9, 2)
y_data shape: (1000, 4000)
y_stft shape: (1000, 501, 9, 2)
y_data[0] shape: (4000,)
y_stft[0] shape: (501, 9, 2)
x_stft: [[  505.7691      0.     ]
 [ 1306.8297      0.     ]
 [ 1027.0393      0.     ]
 [ -143.7783      0.     ]
 [  157.88266     0.     ]
 [  435.17474     0.     ]
 [  282.8852      0.     ]
 [ -930.9418      0.     ]
 [-2116.931       0.     ]]
y_stft: [-3.4719515  0.       ]


In [12]:
# # power spectrum
# power_spectrum = np.abs(Zxx) ** 2  # 파워 스펙트럼 계산


In [13]:
# scaler = StandardScaler()
# scaler = MinMaxScaler()

# X = scaler.fit_transform(x_stft.T)
# Y = (y_stft.T-scaler.mean_)/np.sqrt(scaler.var_) # X와 동일한 평균, 표준 편차로 scaling

# X = X.T
# Y = Y.T

# print(X.shape)
# print(Y.shape) 

In [14]:
# Data preprocessing
X_train, X_test, y_train, y_test = train_test_split(x_stft, y_stft, test_size=0.2, random_state=42)

print("X_train shape: {}, y_train shape: {}".format(X_train.shape, y_train.shape))
print("X_test shape: {}, y_test shape: {}".format(X_test.shape, y_test.shape))

train_dataset = TensorDataset(torch.tensor(X_train, dtype=torch.float32), torch.tensor(y_train, dtype=torch.float32))
test_dataset = TensorDataset(torch.tensor(X_test, dtype=torch.float32), torch.tensor(y_test, dtype=torch.float32))

train_loader = DataLoader(dataset=train_dataset, batch_size=32, shuffle=True)
test_loader = DataLoader(dataset=test_dataset, batch_size=32, shuffle=False)

X_train shape: (800, 501, 9, 2), y_train shape: (800, 501, 9, 2)
X_test shape: (200, 501, 9, 2), y_test shape: (200, 501, 9, 2)


In [15]:
# 모델 정의
class SimpleCNN(nn.Module):
    def __init__(self):
        super(SimpleCNN, self).__init__()
        self.conv1 = nn.Conv2d(in_channels=2, out_channels=8, kernel_size=(3, 3), stride=1, padding=1)
        self.conv2 = nn.Conv2d(in_channels=8, out_channels=16, kernel_size=(3, 3), stride=1, padding=1)
        self.gelu = nn.GELU()

    def forward(self, x):
        x = self.conv1(x)
        x = self.relu(x)
        x = self.conv2(x)
        x = self.gelu(x)
        return x

model = SimpleCNN().to(device)
print(model)

SimpleCNN(
  (conv1): Conv2d(2, 8, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
  (conv2): Conv2d(8, 2, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
  (gelu): GELU(approximate='none')
)


In [16]:
# 모델 훈련
learning_rate = 0.01
criterion = nn.MSELoss()
optimizer = optim.Adam(model.parameters(), lr=learning_rate)

num_epochs = 200

for epoch in range(num_epochs):
    for batch in train_loader:   
        x = batch[0] # x : B x 1 x T
        y = batch[1] # y : B x T
        x, y = x.to(device), y.to(device)
        
        y_pred = model(x) # y_pred : B x T
        loss = criterion(y_pred,y)

        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

    if (epoch+1)%5 == 0:
        print(f'Epoch {epoch+1}/{num_epochs}, Loss: {loss.item()}')

RuntimeError: Given groups=1, weight of size [8, 2, 3, 3], expected input[32, 501, 9, 2] to have 2 channels, but got 501 channels instead

In [None]:
# # 모델 평가
# Contaminated = torch.tensor([])
# Clean = torch.tensor([])
# SACed = torch.tensor([])

# model.eval()
# with torch.no_grad():
#     for batch in test_loader:
#         x = batch[0] # x : 
#         y = batch[1] # y : 
#         x, y = x.to(device), y.to(device)
        
#         y_pred = model(x) # y_pred :
        
#         Contaminated = torch.cat((Contaminated, x.squeeze().cpu()), 0)
#         SACed = torch.cat((SACed, y_pred.cpu()), 0)
#         Clean = torch.cat((Clean, y.cpu()), 0)

# val_loss = criterion(Clean, SACed)
# print(f'Validation Loss: {val_loss.item()}')    

In [None]:
# # 결과
# print(f"Mean Absolute Error: {mean_absolute_error(Clean_signal, SACed_signal)}")
# print(f"Mean Squared Error: {mean_squared_error(Clean_signal, SACed_signal)}")