In [1]:
import pandas as pd
from pathlib import Path
Catholic_file = 'breathdata/augment_train.csv'
df = pd.read_csv(Catholic_file)
df

Unnamed: 0,filename,category,class
0,0001-1.wav,wheezing,1
1,0001-2.wav,healthy,0
2,0001-3.wav,wheezing,1
3,0001-4.wav,wheezing,1
4,0002-1.wav,healthy,0
...,...,...,...
1465,minus_0614-3.wav,wheezing,1
1466,minus_0614-4.wav,wheezing,1
1467,minus_0615-1.wav,wheezing,1
1468,minus_0615-2.wav,wheezing,1


In [2]:
df['relative_path'] = '/' + df['filename'].astype(str)
df = df[['relative_path', 'class']]
df.head()

Unnamed: 0,relative_path,class
0,/0001-1.wav,1
1,/0001-2.wav,0
2,/0001-3.wav,1
3,/0001-4.wav,1
4,/0002-1.wav,0


In [3]:
data_path = 'breathdata/train'

In [4]:
import math, random
import torch
import torchaudio
from torchaudio import transforms
from IPython.display import Audio

class Breath_sound_Util():
  
  def open(audio_file):
    sig, sr = torchaudio.load(audio_file)
    sig = torchaudio.functional.gain(sig, gain_db=5.0)
    sig = torchaudio.functional.lowpass_biquad(sig, sr, cutoff_freq=3000)
    sig = torchaudio.functional.highpass_biquad(sig, sr, cutoff_freq=2000)
    
    return (sig, sr)

  def resample(aud, newsr):
    sig, sr = aud
    
    if (sr == newsr):
     return aud

    num_channels = sig.shape[0]
    resig = torchaudio.transforms.Resample(sr, newsr)(sig[:1,:])
    if (num_channels > 1):
      retwo = torchaudio.transforms.Resample(sr, newsr)(sig[1:,:])
      resig = torch.cat([resig, retwo])

    return ((resig, newsr))
  

  def pad(aud, max_ms):
    sig, sr = aud
    num_rows, sig_len = sig.shape
    max_len = sr//1000 * max_ms
   
    if (sig_len > max_len):
      sig = sig[:,:max_len]
 
    elif (sig_len < max_len):

      repeated = []
      repeated.append(sig)
      required_len = max_len - sig_len

      while required_len > sig_len : 
        repeated.append(sig)
        require_len -= sig_len
      repeated.append(sig[:, :required_len])
 
      sig = torch.cat(repeated, 1)

    return (sig, sr)


## 형 여기 time shift도 빼는게 좋지않으까요?? 아 이미 되잇는거니가 2번하게대지??네ㅇㅋ
  def spectro_gram(aud, n_mels=64, n_fft=1024, hop_len=None):
    sig,sr = aud
    top_db = 80
    
    spec = transforms.MelSpectrogram(sr, n_fft=n_fft, hop_length=hop_len, n_mels=n_mels)(sig)
    spec = transforms.AmplitudeToDB(top_db=top_db)(spec)
    return (spec)

  def spectro_augment(spec, max_mask_pct=0.1, n_freq_masks=1, n_time_masks=1):
    _, n_mels, n_steps = spec.shape
    mask_value = spec.mean()
    aug_spec = spec

    freq_mask_param = max_mask_pct * n_mels
    for _ in range(n_freq_masks):
      aug_spec = transforms.FrequencyMasking(freq_mask_param)(aug_spec, mask_value)

    time_mask_param = max_mask_pct * n_steps
    for _ in range(n_time_masks):
      aug_spec = transforms.TimeMasking(time_mask_param)(aug_spec, mask_value)

    return aug_spec
    print(aug_spec)

In [5]:
from torch.utils.data import DataLoader, Dataset, random_split
import torchaudio
import torchvision

class breathDS(Dataset):
    
  def __init__(self, df, data_path):
    self.df = df
    self.data_path = str(data_path)
    self.duration = 4000
    self.sr = 48000
            
  def __len__(self):
    return len(self.df)    
    
  def __getitem__(self, idx):
    audio_file = self.data_path + self.df.loc[idx, 'relative_path']
    class_id = self.df.loc[idx, 'class']
    aud = Breath_sound_Util.open(audio_file)
    reaud = Breath_sound_Util.resample(aud, self.sr)
    dur_aud = Breath_sound_Util.pad(reaud, self.duration)
    sgram = Breath_sound_Util.spectro_gram(dur_aud, n_mels=64, n_fft=1024, hop_len=None)
    aug_sgram = Breath_sound_Util.spectro_augment(sgram, max_mask_pct=0.1, n_freq_masks=2, n_time_masks=2)

    
    return aug_sgram, class_id

In [6]:
from torch.utils.data import random_split

brds = breathDS(df, data_path)

In [7]:
train_dl = torch.utils.data.DataLoader(brds, batch_size=32, shuffle=True)

In [8]:
import torch.nn as nn
import torch.nn.functional as F
from torch.nn import init

# --------------------------------------------------------------
# 호흡음의 Healthy, Wheezing을 판단하는 Binary Classification Model
# --------------------------------------------------------------

class WhoWheezing(nn.Module):

    def __init__(self):
        super().__init__()
        conv_layers = []
        self.conv1 = nn.Conv2d(1, 8, kernel_size=(3, 3), stride=(2, 2), padding=(2, 2))
        self.relu1 = nn.ReLU()
        self.bn1 = nn.BatchNorm2d(8)
        init.kaiming_normal_(self.conv1.weight, a=0.2)
        self.conv1.bias.data.zero_()
        conv_layers += [self.conv1, self.relu1, self.bn1]

        self.conv2 = nn.Conv2d(8, 16, kernel_size=(3, 3), stride=(2, 2), padding=(1, 1))
        self.relu2 = nn.ReLU()
        self.bn2 = nn.BatchNorm2d(16)
        init.kaiming_normal_(self.conv1.weight, a=0.2)
        self.conv2.bias.data.zero_()
        conv_layers += [self.conv2, self.relu2, self.bn2]

        self.conv3 = nn.Conv2d(16, 32, kernel_size=(3, 3), stride=(2, 2), padding=(1, 1))
        self.relu3 = nn.ReLU()
        self.bn3 = nn.BatchNorm2d(32)
        init.kaiming_normal_(self.conv1.weight, a=0.2)
        self.conv3.bias.data.zero_()
        conv_layers += [self.conv3, self.relu3, self.bn3]

        self.conv4 = nn.Conv2d(32, 64, kernel_size=(3, 3), stride=(2, 2), padding=(1, 1))
        self.relu4 = nn.ReLU()
        self.bn4 = nn.BatchNorm2d(64)
        init.kaiming_normal_(self.conv1.weight, a=0.2)
        self.conv4.bias.data.zero_()
        conv_layers += [self.conv4, self.relu4, self.bn4]

        self.ap = nn.AdaptiveAvgPool2d(output_size=1)
        self.lin = nn.Linear(in_features=64, out_features=2)
        self.sigmoid = nn.Sigmoid()

        self.conv = nn.Sequential(*conv_layers)
 
    def forward(self, x):
        x = self.conv(x)
        x = self.ap(x)
        x = x.view(x.shape[0], -1)
        x = self.lin(x)
        x = self.sigmoid(x)
        return x

Model1 = WhoWheezing()
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
Model1 = Model1.to(device)
next(Model1.parameters()).device

device(type='cuda', index=0)

In [9]:
from torch.autograd import Variable

In [None]:
from sklearn.metrics import confusion_matrix
import numpy as np
import sklearn.metrics as metrics
from tqdm import tqdm

def mixup_data(x, y, alpha=1.0, use_cuda=True):
    '''Returns mixed inputs, pairs of targets, and lambda'''
    if alpha > 0:
        lam = np.random.beta(alpha, alpha)
    else:
        lam = 1

    batch_size = x.size()[0]
    if use_cuda:
        index = torch.randperm(batch_size).cuda()
    else:
        index = torch.randperm(batch_size)

    mixed_x = lam * x + (1 - lam) * x[index, :]
    y_a, y_b = y, y[index]
    return mixed_x, y_a, y_b, lam


def mixup_criterion(criterion, pred, y_a, y_b, lam):
    return lam * criterion(pred, y_a) + (1 - lam) * criterion(pred, y_b)


use_cuda = torch.cuda.is_available()
alpha = 1.0


def training(model, train_dl, num_epochs):
  criterion = nn.CrossEntropyLoss()
  optimizer = torch.optim.Adam(model.parameters(),lr=0.01)
  scheduler = torch.optim.lr_scheduler.OneCycleLR(optimizer, max_lr=0.01,
                                                steps_per_epoch=int(len(train_dl)),
                                                epochs=num_epochs,
                                                anneal_strategy='linear')

  for epoch in tqdm(range(num_epochs)):
    running_loss = 0.0
    correct_prediction = 0
    total_prediction = 0

    for i, data in enumerate(train_dl):
        inputs, labels = data[0].to(device), data[1].to(device)
        inputs_m, inputs_s = inputs.mean(), inputs.std()
        inputs = (inputs - inputs_m) / inputs_s
        inputs, targets_a, targets_b, lam = mixup_data(inputs, labels,
                                                       alpha, use_cuda)
        inputs, targets_a, targets_b = map(Variable, (inputs,
                                                      targets_a, targets_b))
        optimizer.zero_grad()
        outputs = model(inputs)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()
        scheduler.step()
        running_loss += loss.item()
        _, prediction = torch.max(outputs,1)
        correct_prediction += (prediction == labels).sum().item()
        total_prediction += prediction.shape[0]
        

    num_batches = len(train_dl)
    avg_loss = running_loss / num_batches
    acc = correct_prediction/total_prediction
    print(f'Epoch: {epoch}, Loss: {avg_loss:.4f}, Accuracy: {acc:.4f}')

  print('Finished Training')
    
num_epochs=100
training(Model1, train_dl, num_epochs)

  1%|          | 1/100 [00:34<57:10, 34.65s/it]

Epoch: 0, Loss: 0.6546, Accuracy: 0.6204


  2%|▏         | 2/100 [01:09<56:39, 34.69s/it]

Epoch: 1, Loss: 0.6130, Accuracy: 0.7082


  3%|▎         | 3/100 [01:44<56:04, 34.69s/it]

Epoch: 2, Loss: 0.6001, Accuracy: 0.7068


  4%|▍         | 4/100 [02:18<55:16, 34.54s/it]

Epoch: 3, Loss: 0.5569, Accuracy: 0.7517


  5%|▌         | 5/100 [02:52<54:29, 34.41s/it]

Epoch: 4, Loss: 0.5657, Accuracy: 0.7340


  6%|▌         | 6/100 [03:27<54:03, 34.50s/it]

Epoch: 5, Loss: 0.5791, Accuracy: 0.7116


  7%|▋         | 7/100 [04:01<53:33, 34.55s/it]

Epoch: 6, Loss: 0.5363, Accuracy: 0.7707


  8%|▊         | 8/100 [04:36<52:54, 34.51s/it]

Epoch: 7, Loss: 0.5654, Accuracy: 0.7190


  9%|▉         | 9/100 [05:11<52:36, 34.68s/it]

Epoch: 8, Loss: 0.5732, Accuracy: 0.7197


 10%|█         | 10/100 [05:46<52:07, 34.75s/it]

Epoch: 9, Loss: 0.5484, Accuracy: 0.7429


## TEST

In [None]:
Catholic_file1 = 'breathdata/test.csv'
df1 = pd.read_csv(Catholic_file1)
df1

In [None]:
df1['relative_path'] = '/' + df1['filename'].astype(str)
df1 = df1[['relative_path', 'class']]
df1.head()

In [None]:
data_path = 'breathdata/test'

In [None]:
class Breath_sound_Util1():
  
  def open(audio_file):
    sig, sr = torchaudio.load(audio_file)
    
    return (sig, sr)

  def resample(aud, newsr):
    sig, sr = aud
    
    if (sr == newsr):
     return aud

    num_channels = sig.shape[0]
    resig = torchaudio.transforms.Resample(sr, newsr)(sig[:1,:])
    if (num_channels > 1):
      retwo = torchaudio.transforms.Resample(sr, newsr)(sig[1:,:])
      resig = torch.cat([resig, retwo])

    return ((resig, newsr))
  

  def pad(aud, max_ms):
    sig, sr = aud
    num_rows, sig_len = sig.shape
    max_len = sr//1000 * max_ms
   
    if (sig_len > max_len):
      sig = sig[:,:max_len]
 
    elif (sig_len < max_len):

      repeated = []
      repeated.append(sig)
      required_len = max_len - sig_len

      while required_len > sig_len : 
        repeated.append(sig)
        require_len -= sig_len
      repeated.append(sig[:, :required_len])
 
      sig = torch.cat(repeated, 1)

    return (sig, sr)

  def spectro_gram(aud, n_mels=64, n_fft=1024, hop_len=None):
    sig,sr = aud
    top_db = 80
    
    spec = transforms.MelSpectrogram(sr, n_fft=n_fft, hop_length=hop_len, n_mels=n_mels)(sig)
    spec = transforms.AmplitudeToDB(top_db=top_db)(spec)
    return (spec)

#이상한애들 타임시프트랑 / 마스크랑 / 이상한애들 다뺌


In [None]:
class breathDS1(Dataset):
    
  def __init__(self, df1, data_path):
    self.df1 = df1
    self.data_path = str(data_path)
    self.duration = 4000
    self.sr = 48000
            
  def __len__(self):
    return len(self.df1)    
    
  def __getitem__(self, idx):
    audio_file = self.data_path + self.df1.loc[idx, 'relative_path']
    class_id = self.df1.loc[idx, 'class']
    aud = Breath_sound_Util1.open(audio_file)
    reaud = Breath_sound_Util1.resample(aud, self.sr)
    dur_aud = Breath_sound_Util1.pad(reaud, self.duration)
    sgram = Breath_sound_Util1.spectro_gram(dur_aud, n_mels=64, n_fft=1024, hop_len=None)

    
    return sgram, class_id

In [None]:
brds1 = breathDS1(df1, data_path)

In [None]:
val_dl = torch.utils.data.DataLoader(brds1, batch_size=32, shuffle=True)

In [None]:
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score, classification_report
from sklearn.metrics import roc_curve, roc_auc_score

def inference (Model1, val_dl):
  correct_prediction = 0
  total_prediction = 0
  TP = 0 
  TN = 0
  FN = 0
  FP = 0
  a = []
  b = []
  with torch.no_grad():
    for i, data in enumerate(val_dl):
      
      inputs, labels = data[0].to(device), data[1].to(device)
      cpu_labels = data[1]
      inputs_m, inputs_s = inputs.mean(), inputs.std()
      inputs = (inputs - inputs_m) / inputs_s
      outputs = Model1(inputs)
 
      _, prediction = torch.max(outputs,1)
      
      cpu_prediction = prediction.to('cpu')
      correct_prediction += (prediction == labels).sum().item()
      total_prediction += prediction.shape[0]
          

      TP += ((prediction == 1) & (labels == 1)).sum()
# TN predict and label are both 0
      TN += ((prediction == 0) & (labels == 0)).sum()
# FN    predict 0 label 1
      FN += ((prediction == 0) & (labels == 1)).sum()
# FP    predict 1 label 0
      FP += ((prediction == 1) & (labels == 0)).sum()

      c = cpu_labels.tolist()
      a.extend(c)

      d = cpu_prediction.tolist()
      b.extend(d)
    
    # TP : 휘징인데 휘징이라고 판단(잘함)
    # FP : 헬시인데 휘징이라고 판단(못함)
    # TN : 헬시인데 헬시라고 판단(잘함)
    # FN : 휘징인데 헬시라고 판단(못함)
    
    # POSITIVE : 1(휘징)이라고 판단
    # NEGATIVE : 0(헬시)라고 판단
    # TRUE : 정답값 1(휘징)
    # FALSE : 정답값 0(헬시)
    
    # PRECISION : 휘징이라고 판단한 것 중 진짜 휘징
    # RECALL : 휘징인데 휘징이라고 잘 판단
    

  precision = TP / (TP + FP)
  recall = TP / (TP + FN)
  F1 = 2 * recall * precision / (recall + precision)

  acc = correct_prediction/total_prediction
  print(f'Accuracy: {acc:.4f}, Total items: {total_prediction}')
  print(f'precision: {precision:.4f}, F1: {F1:.4f}')
  print(f'recall: {recall:.4f}')
  target_names = ['healthy', 'wheezing']
  print(classification_report(a, b, target_names = target_names))
  print("AUC:{}".format(roc_auc_score(a, b)))

inference(Model1, val_dl)