<a href="https://colab.research.google.com/github/Eunchae-L/Denoising/blob/main/denoising.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
import librosa
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
from torch.nn.parallel import DataParallel
from torch.utils.data import DataLoader, Dataset, TensorDataset

# Location of the training set (.npy file); it is read with np.load in a later cell.
# NOTE(review): the /content/drive prefix presumes Google Drive is already mounted —
# no mount call is visible in this notebook, confirm before a fresh run.
training_dataset_path = '/content/drive/MyDrive/고려대학교/Audio_detection/denoising/Data_Sources/training_dataset1.npy'

In [None]:
class CustomModel(nn.Module):
    """Convolutional denoiser that works on the STFT of a waveform.

    The forward pass computes the STFT of the input, treats its real and
    imaginary parts as a two-channel image, runs that image through nine
    Conv2d -> ReLU -> BatchNorm2d blocks (12-16-20-24-32-24-20-16-12 channels)
    followed by one 129x129 convolution back to two channels, and inverts the
    result with an ISTFT trimmed to the input length.

    Args:
        n_fft: FFT size handed to both ``torch.stft`` and ``torch.istft``.
        hop_length: Hop between successive STFT frames, in samples.
    """

    def __init__(self, n_fft, hop_length):
        super().__init__()

        # Every convolution uses padding == kernel_size // 2, so the
        # (frequency, frame) grid keeps its size through the whole stack.
        # The layer order is kept exactly as before so state_dict keys
        # (conv_blocks.0, conv_blocks.2, ...) still match saved checkpoints.
        self.conv_blocks = nn.Sequential(
            nn.Conv2d(2, 12, kernel_size=13, padding=6), nn.ReLU(), nn.BatchNorm2d(12),
            nn.Conv2d(12, 16, kernel_size=11, padding=5), nn.ReLU(), nn.BatchNorm2d(16),
            nn.Conv2d(16, 20, kernel_size=9, padding=4), nn.ReLU(), nn.BatchNorm2d(20),
            nn.Conv2d(20, 24, kernel_size=7, padding=3), nn.ReLU(), nn.BatchNorm2d(24),
            nn.Conv2d(24, 32, kernel_size=7, padding=3), nn.ReLU(), nn.BatchNorm2d(32),
            nn.Conv2d(32, 24, kernel_size=7, padding=3), nn.ReLU(), nn.BatchNorm2d(24),
            nn.Conv2d(24, 20, kernel_size=9, padding=4), nn.ReLU(), nn.BatchNorm2d(20),
            nn.Conv2d(20, 16, kernel_size=11, padding=5), nn.ReLU(), nn.BatchNorm2d(16),
            nn.Conv2d(16, 12, kernel_size=13, padding=6), nn.ReLU(), nn.BatchNorm2d(12),
        )

        # Final projection back to the two (real, imaginary) channels.
        self.additional_conv = nn.Conv2d(12, 2, kernel_size=129, padding=64)

        self.n_fft = n_fft
        self.hop_length = hop_length

    def forward(self, x):
        """Denoise a batch of waveforms.

        Args:
            x: Real tensor of shape (batch, samples).

        Returns:
            Real tensor of shape (batch, samples) holding the reconstructed
            waveforms.
        """
        # return_complex=False is deprecated in torch.stft; view_as_real on the
        # complex result yields the same (batch, freq, frames, 2) layout.
        # No window is passed, so torch's default applies (documented as an
        # all-ones window) — unchanged from the previous behaviour.
        spec = torch.stft(x, n_fft=self.n_fft, hop_length=self.hop_length, return_complex=True)

        # (batch, freq, frames, 2) -> (batch, 2, freq, frames) for Conv2d.
        features = torch.view_as_real(spec).permute(0, 3, 1, 2)

        features = self.conv_blocks(features)
        features = self.additional_conv(features)

        # Channel 0 is the real part, channel 1 the imaginary part.
        denoised_spec = torch.complex(features[:, 0], features[:, 1])

        # length= trims the inverse transform to the original sample count.
        return torch.istft(
            denoised_spec,
            n_fft=self.n_fft,
            hop_length=self.hop_length,
            length=x.shape[-1],
        )

In [None]:
class SNRMetric:
    """Accumulates per-sample signal-to-noise ratios (in dB) and averages them.

    Args:
        eps: Lower bound applied to both the signal power and the noise power
            before the ratio is taken. It keeps the result finite when the
            clean signal is silent or the reconstruction is exact; without it
            a single such sample turns the running mean into inf/NaN.
    """

    def __init__(self, eps=1e-10):
        self.eps = eps
        self.batch_snr_list = []  # one SNR value per sample seen since the last compute()

    def update(self, clean_audio, denoised_audio):
        """Compute the SNR of every sample in the batch and store it.

        Args:
            clean_audio: Reference tensor; powers are summed along dim 1.
            denoised_audio: Estimate tensor with the same shape as ``clean_audio``.
        """
        # The metric is for reporting only, so no autograd graph is needed.
        with torch.no_grad():
            signal_power = torch.sum(clean_audio ** 2, dim=1).clamp(min=self.eps)
            noise_power = torch.sum((clean_audio - denoised_audio) ** 2, dim=1).clamp(min=self.eps)
            snr = 10 * torch.log10(signal_power / noise_power)
        self.batch_snr_list.extend(snr.cpu().numpy())

    def compute(self):
        """Return the mean of the stored SNR values and clear the store.

        Returns:
            The average SNR in dB, or 0.0 when nothing has been recorded.
        """
        avg_snr = np.mean(self.batch_snr_list) if self.batch_snr_list else 0.0

        # Start a fresh accumulation window for the next call.
        self.batch_snr_list = []

        return avg_snr

In [None]:
# Load the paired training set: index 0 along axis 1 is the noisy waveform,
# index 1 the clean target.
training_dataset = np.load(training_dataset_path)

# The model's STFT needs each example to be a 1-D waveform, so the array must
# be laid out as (num_examples, >=2, num_samples). Fail early with a clear
# message instead of deep inside the first forward pass.
if training_dataset.ndim != 3 or training_dataset.shape[1] < 2:
    raise ValueError(
        f"expected an array shaped (num_examples, 2, num_samples), got {training_dataset.shape}"
    )

# Slice the two channels directly instead of looping over examples in Python,
# and cast to float32 so the inputs match the dtype of the model parameters.
noisy_data = torch.from_numpy(np.ascontiguousarray(training_dataset[:, 0])).float()
clean_data = torch.from_numpy(np.ascontiguousarray(training_dataset[:, 1])).float()

In [None]:
# STFT settings shared by the model's forward and inverse transforms
n_fft, hop_length = 1023, 500

# Compute device: first CUDA GPU when available, otherwise the CPU
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

# Shuffled mini-batches of (noisy, clean) pairs
batch_size = 40
dataset = TensorDataset(noisy_data, clean_data)
data_loader = DataLoader(dataset, shuffle=True, batch_size=batch_size)

# Model, mean-squared-error loss and Adam optimizer
model = CustomModel(n_fft, hop_length)
criterion = nn.MSELoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)

# Spread batches across all visible GPUs when there is more than one
gpu_count = torch.cuda.device_count()
if gpu_count > 1:
    print("Using", gpu_count, "GPUs!")
    model = nn.DataParallel(model)
model = model.to(device)

# SNR is tracked as the training-time quality metric
snr_metric = SNRMetric()

In [None]:
epochs = 1

for epoch in range(epochs):
    model.train()
    for batch_no, (noisy_batch, clean_batch) in enumerate(data_loader):
        noisy_batch = noisy_batch.to(device)
        clean_batch = clean_batch.to(device)

        optimizer.zero_grad()

        # Forward pass; the output already lives on the model's device.
        outputs = model(noisy_batch)

        # Mean-squared error between the reconstruction and the clean target.
        loss = criterion(outputs, clean_batch)

        # The SNR is a reporting metric only: detach so it builds no autograd graph.
        snr_metric.update(clean_batch, outputs.detach())

        print(f'Epoch [{epoch+1}/{epochs}], Batch [{batch_no+1}/{len(data_loader)}], Loss: {loss.item()}, SNR: {snr_metric.compute()}')

        # Backward pass and parameter update.
        loss.backward()
        optimizer.step()


# Save the trained weights. When the model is wrapped in nn.DataParallel its
# state_dict keys carry a 'module.' prefix, which a plain CustomModel cannot
# load — so always save the underlying module's weights.
model_to_save = model.module if isinstance(model, nn.DataParallel) else model
torch.save(model_to_save.state_dict(), '/content/drive/MyDrive/고려대학교/Audio_detection/denoising/R-CED(10 Covn).pth')

In [None]:
# Google Drive에 있는 zip 파일의 경로를 설정
zip_file_path = '/content/drive/MyDrive/고려대학교/Audio_detection/data/open.zip'  # zip 파일의 경로로 변경

# 압축 해제할 경로 설정
extract_path = '/content/drive/MyDrive/고려대학교/Audio_detection/data'  # 압축을 해제할 경로로 변경

# 디렉토리 생성 (존재하지 않는 경우)
import os
os.makedirs(extract_path, exist_ok=True)

# 압축 해제
!unzip {zip_file_path} -d {extract_path}

[1;30;43m스트리밍 출력 내용이 길어서 마지막 5000줄이 삭제되었습니다.[0m
  inflating: /content/drive/MyDrive/고려대학교/Audio_detection/data/train/YGLREVCI.ogg  
  inflating: /content/drive/MyDrive/고려대학교/Audio_detection/data/train/YGLXWFHV.ogg  
  inflating: /content/drive/MyDrive/고려대학교/Audio_detection/data/train/YGMDZXNH.ogg  
  inflating: /content/drive/MyDrive/고려대학교/Audio_detection/data/train/YGNBHIJR.ogg  
  inflating: /content/drive/MyDrive/고려대학교/Audio_detection/data/train/YGNYUVFD.ogg  
  inflating: /content/drive/MyDrive/고려대학교/Audio_detection/data/train/YGOBSHQC.ogg  
  inflating: /content/drive/MyDrive/고려대학교/Audio_detection/data/train/YGOHGPYC.ogg  
  inflating: /content/drive/MyDrive/고려대학교/Audio_detection/data/train/YGOJCZPI.ogg  
  inflating: /content/drive/MyDrive/고려대학교/Audio_detection/data/train/YGOPJRGB.ogg  
  inflating: /content/drive/MyDrive/고려대학교/Audio_detection/data/train/YGOUDQAW.ogg  
  inflating: /content/drive/MyDrive/고려대학교/Au

In [None]:
import os
import torch
import librosa
import soundfile as sf
from tqdm import tqdm

# The CustomModel class definition must already be in scope.

# Choose the device first so the checkpoint can be mapped straight onto it.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

# Load the trained weights. map_location lets a checkpoint that was saved on a
# GPU be loaded on a CPU-only runtime.
model_path = '/content/drive/MyDrive/고려대학교/Audio_detection/denoising/R-CED(10 Covn).pth'
model = CustomModel(n_fft=1023, hop_length=500)
state_dict = torch.load(model_path, map_location=device)

# A checkpoint saved from an nn.DataParallel wrapper prefixes every key with
# 'module.'; strip it so the weights load into the bare model.
state_dict = {
    (key[len('module.'):] if key.startswith('module.') else key): value
    for key, value in state_dict.items()
}
model.load_state_dict(state_dict)
model = model.to(device)
model.eval()

# Input and output folders
input_folder = '/content/drive/MyDrive/고려대학교/Audio_detection/data/test'
output_folder = '/content/drive/MyDrive/고려대학교/Audio_detection/data/denoise/denoise_test'
os.makedirs(output_folder, exist_ok=True)

# Denoise every .ogg file in the input folder
for filename in tqdm(os.listdir(input_folder)):
    if not filename.endswith('.ogg'):
        continue

    input_path = os.path.join(input_folder, filename)
    output_path = os.path.join(output_folder, filename)

    # Load the audio at its native sampling rate
    y, sr = librosa.load(input_path, sr=None)

    # Add a batch dimension; the STFT and ISTFT both happen inside the model
    y_tensor = torch.from_numpy(y).float().unsqueeze(0).to(device)

    with torch.no_grad():
        denoised_output = model(y_tensor)

    # Drop only the batch dimension before converting to NumPy
    denoised_audio = denoised_output.squeeze(0).cpu().numpy()

    # Write the result under the same file name at the same sampling rate
    sf.write(output_path, denoised_audio, sr)

print(f'노이즈 제거된 오디오 파일이 {output_folder}에 저장되었습니다.')

In [None]:
# Fetch the MP-SENet sources from GitHub into ./MP-SENet (relative to the current working directory).
# NOTE(review): nothing in the visible part of this notebook imports or runs the cloned code —
# confirm the clone is still needed.
!git clone https://github.com/yxlu-0102/MP-SENet.git

Cloning into 'MP-SENet'...
remote: Enumerating objects: 797, done.[K
remote: Counting objects: 100% (141/141), done.[K
remote: Compressing objects: 100% (37/37), done.[K
remote: Total 797 (delta 128), reused 104 (delta 104), pack-reused 656[K
Receiving objects: 100% (797/797), 477.72 MiB | 63.89 MiB/s, done.
Resolving deltas: 100% (203/203), done.
Updating files: 100% (247/247), done.
