<a href="https://colab.research.google.com/github/CAU2022-CAPSTONE-PACETIME/BreathDetector/blob/BreathClassifier/BreathClassifier.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
%cd /content/drive/MyDrive/ColabNotebooks

/content/drive/MyDrive/ColabNotebooks


In [2]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from imu import *
from BreathDataset import *
from torch.utils.data import DataLoader, random_split
from Sound import *
import torchaudio

BATCH_SIZE = 4 # batch size handed to the training DataLoader
EPOCHS = 20 # number of repetitions (training epochs)

In [3]:
# Show the installed PyTorch version.
print(torch.__version__)

1.12.1+cu113


In [4]:
# MFCC features are reshaped to (batch, 1, 40, 87) inside forward().
# sound --> (batch, 1, 22050)
class BreathClassifier(nn.Module):
  """Small CNN that maps a sound segment to a single sigmoid score.

  The raw input is first passed through ``transformation`` (an MFCC transform
  in this notebook), reshaped to ``(batch, 1, 40, 87)``, sent through two
  3x3 convolution blocks (Conv2d -> BatchNorm2d -> ReLU, 4 channels each) with
  dropout (p=0.4) after each block, flattened, and finally reduced by four
  fully connected layers (13920 -> 500 -> 200 -> 50 -> 1) ending in a sigmoid.

  Args:
    device: stored on the instance as ``self.device``; not used elsewhere in
      this class.
    transformation: callable applied to the input at the start of forward().
  """

  def __init__(self, device, transformation):
    super().__init__()
    self.device = device
    self.transformation_mfcc = transformation

    def conv_block(channels_in, channels_out):
      # 3x3 convolution with padding 1 keeps the 40x87 spatial size unchanged.
      return nn.Sequential(
          nn.Conv2d(
              in_channels = channels_in,
              out_channels = channels_out,
              kernel_size = (3, 3),
              stride = 1,
              padding = 1
          ),
          nn.BatchNorm2d(channels_out),
          nn.ReLU()
      )

    def dense_block(features_in, features_out, activation):
      # Linear layer followed by the given activation module.
      return nn.Sequential(nn.Linear(features_in, features_out), activation)

    # Sub-modules are created in a fixed order (conv1, conv2, fc1..fc4, dropout)
    # so that parameter names and initialisation order stay stable.
    self.conv1 = conv_block(1, 4)
    self.conv2 = conv_block(4, 4)

    self.fc1 = dense_block(4*40*87, 500, nn.ReLU())
    self.fc2 = dense_block(500, 200, nn.ReLU())
    self.fc3 = dense_block(200, 50, nn.ReLU())
    self.fc4 = dense_block(50, 1, nn.Sigmoid())

    self.dropout = nn.Dropout(0.4)

  def forward(self, input):
    """Return a ``(batch, 1)`` tensor of sigmoid scores for ``input``."""
    # High pass Filtering (placeholder: no filtering is implemented here)

    features = self.transformation_mfcc(input)
    features = features.view(-1, 1, 40, 87)

    hidden = self.dropout(self.conv1(features))
    hidden = self.dropout(self.conv2(hidden))

    # Flatten everything except the batch dimension before the dense layers.
    flat = hidden.view(hidden.size(0), -1)

    out = self.fc1(flat)
    out = self.fc2(out)
    out = self.fc3(out)
    out = self.fc4(out)

    return out

In [5]:
def create_data_loader(train_data, batch_size):
  """Wrap ``train_data`` in a shuffling DataLoader that drops the last partial batch."""
  return DataLoader(
      dataset = train_data,
      batch_size = batch_size,
      shuffle = True,
      drop_last = True,
  )

def train_single_epoch(model, data_loader, loss_fn, optimizer, device):
  """Run one optimisation pass over ``data_loader``.

  Every batch is regrouped into 58 chunks of 22050-sample segments and one
  optimizer step is taken per chunk.

  Args:
    model: network to train; it is switched to training mode here.
    data_loader: iterable of ``(input, target)`` tensor pairs.
    loss_fn: callable ``loss_fn(prediction, label)`` returning a scalar tensor.
    optimizer: optimizer holding ``model``'s parameters.
    device: device the segments and labels are moved to before the forward pass.

  Returns:
    The mean loss over all optimizer steps as a float, or ``None`` when
    ``data_loader`` produced no batches.
  """
  # Dropout / BatchNorm must be in training mode even if the caller previously
  # called model.eval() (e.g. when this cell is re-run after an evaluation).
  model.train()

  loss_sum = 0.0
  step_count = 0
  for input, target in data_loader:
    # view() only regroups the flattened data (it does not transpose): the
    # batch becomes 58 chunks of 22050-sample segments. The labels are
    # regrouped the same way, so segment/label pairs keep their flat order.
    # Assumes each batch holds a multiple of 58 segments - TODO confirm
    # against BreathDataset.
    input = input.view(58, -1, 22050)
    target = target.view(58, -1)

    for segment, label in zip(input, target):
      sound = segment.view(-1, 22050).to(device) # sound.shape : (chunk, 22050)
      imu = label.view(-1, 1).to(device)
      prediction = model(sound)
      loss = loss_fn(prediction, imu)

      optimizer.zero_grad()
      loss.backward()
      optimizer.step()

      loss_sum += loss.item()
      step_count += 1

  return loss_sum / step_count if step_count else None

def train(model, data_loader, loss_fn, optimizer, device, epochs):
  """Call train_single_epoch ``epochs`` times, printing a banner around each pass."""
  separator = "------------------------------"
  for epoch_number in range(1, epochs + 1):
    print(f"Epoch {epoch_number}")
    train_single_epoch(model, data_loader, loss_fn, optimizer, device)
    print(separator)
  print("Finished training ")


def test(model, data_loader):
  with torch.no_grad():
    for input, target in data_loader:
      input = input.view(58, -1, 22050) # sound : batch*60x40x44->60*batch*40*44 imu : batch*60 -> 60*batch
      target = target.view(58, -1) 
        #print("input shape : {} target shape {}".format(input.shape, target.shape))
      accuracy = 0
      total = 0
      for i in range(len(input)):
        sound = input[i].view(-1, 1, 22050).to(device)
        imu = target[i].view(-1, 1).to(device)
   
        prediction = 1 if model(sound) >= 0.5 else 0
        total += 1
        accuracy += (prediction == imu).sum().item()
    print("accuracy : {}".format(100*accuracy/total))

In [6]:
if __name__ == "__main__":
  # Build the dataset, split it 80/20, and create the model, loss and optimizer.
  audio_list = ["/content/drive/MyDrive/ColabNotebooks/Data/note9-budslive-sync", "/content/drive/MyDrive/ColabNotebooks/Data/A21s-airpodspro-sync"]
  device = 'cpu'
  dataset = BreathDataset(audio_list, device)
  print("Data length : {} Device : {}".format(len(dataset), device))

  train_ratio = 0.8
  train_data_length = int(train_ratio * len(dataset))
  test_data_length = len(dataset) - train_data_length

  # Seed the split so the same clips land in the test set on every run;
  # an unseeded random_split gives a different partition each time.
  # NOTE(review): a checkpoint trained before this seed was introduced saw a
  # different split, so its test accuracy may include clips it was trained on.
  SPLIT_SEED = 42
  split_generator = torch.Generator().manual_seed(SPLIT_SEED)
  train_dataset, test_dataset = random_split(
      dataset,
      [train_data_length, test_data_length],
      generator = split_generator
  )
  train_data_loader = create_data_loader(train_dataset, BATCH_SIZE)
  test_data_loader = create_data_loader(test_dataset, 1)

  # 40 MFCC coefficients per frame. With hop_length=256 and the default centred
  # STFT, a 22050-sample segment should give 22050 // 256 + 1 = 87 frames,
  # which is the 40x87 shape BreathClassifier.forward reshapes to.
  mfcc_transform = torchaudio.transforms.MFCC(
      sample_rate=44100,
      n_mfcc=40,
      melkwargs={
          "n_fft": 500,
          "hop_length": 256,
          "n_mels" : 40
      },
  )


  cnn = BreathClassifier(device, mfcc_transform).to(device)
  #print(cnn)

  loss_fn = nn.BCELoss()
  optimizer = optim.Adam(cnn.parameters(), lr = 0.001)

  # Training is disabled here; uncomment to retrain and overwrite the checkpoint.
  #train(cnn, train_data_loader, loss_fn, optimizer, device, EPOCHS)
  #cnn.eval()
  #torch.save(cnn.state_dict(), "BreathClassifierVer1.2.pth")

Data length : 85 Device : cpu


In [None]:
# Evaluate `cnn` on the test loader ten times in a row.
N_EVAL_RUNS = 10
for run_index in range(N_EVAL_RUNS):
  test(cnn, test_data_loader)

In [7]:
# Rebuild the network on the CPU, restore the saved weights, then evaluate.
device = 'cpu'

model = BreathClassifier(device, mfcc_transform).to(device)
saved_weights = torch.load("BreathClassifierVer1.2.pth", map_location = device)
model.load_state_dict(saved_weights)
print(model)
model.eval()
for run_index in range(10):
  test(model, test_data_loader)

BreathClassifier(
  (transformation_mfcc): MFCC(
    (amplitude_to_DB): AmplitudeToDB()
    (MelSpectrogram): MelSpectrogram(
      (spectrogram): Spectrogram()
      (mel_scale): MelScale()
    )
  )
  (conv1): Sequential(
    (0): Conv2d(1, 4, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (1): BatchNorm2d(4, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (2): ReLU()
  )
  (conv2): Sequential(
    (0): Conv2d(4, 4, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (1): BatchNorm2d(4, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (2): ReLU()
  )
  (fc1): Sequential(
    (0): Linear(in_features=13920, out_features=500, bias=True)
    (1): ReLU()
  )
  (fc2): Sequential(
    (0): Linear(in_features=500, out_features=200, bias=True)
    (1): ReLU()
  )
  (fc3): Sequential(
    (0): Linear(in_features=200, out_features=50, bias=True)
    (1): ReLU()
  )
  (fc4): Sequential(
    (0): Linear(in_features=50, out_features=1, bias=True)
 

In [None]:
import pandas as pd
import numpy as np
import time

def sound_process(path, idx):
  """Return one 22050-value slice of the "sound" column of a CSV recording.

  The CSV at ``path`` is read, missing values in its "sound" column are
  dropped, and the slice starting at offset ``idx * 44100`` is returned as a
  NumPy array. The slice is shorter than 22050 (possibly empty) when the
  recording ends early.
  """
  sound = np.array(pd.read_csv(path)["sound"].dropna())
  first = idx*44100
  return sound[first:first+22050]

# Rough latency check: run the model N_RUNS times on one segment and report
# the total elapsed time and the mean output.
path = "/content/drive/MyDrive/ColabNotebooks/Data/note9-airpodspro/Data_2022-11-01_18_53_03.csv"
sample= sound_process(path, 5)

N_RUNS = 20

# perf_counter is monotonic and high-resolution, unlike time.time().
start = time.perf_counter()
average = 0
# Inference only: without no_grad every output would carry an autograd graph,
# which inflates the timing and keeps all graphs alive through `average`.
with torch.no_grad():
  for i in range(N_RUNS):
    total = model(torch.Tensor(sample).to(device))
    print(total)
    average += total
end = time.perf_counter()-start  # elapsed seconds for all runs (printing included)

print(end)
print(average/N_RUNS)

In [10]:
# Model test

# Candidate recordings for the check below. Only the last uncommented
# assignment takes effect, so the A21s path is immediately overwritten by the
# zflip-budspro one.
path = "/content/drive/MyDrive/ColabNotebooks/Data/A21s-airpodspro/Data_2022-11-04_00_55_32.csv"
#path = "/content/drive/MyDrive/ColabNotebooks/Data/note9-budslive-sync/Data_2022-11-01_14_47_29.csv"
path = "/content/drive/MyDrive/ColabNotebooks/Data/zflip-budspro/Data_2022-11-15_19_42_56.csv"

def model_check(model, path, device = 'cpu'):
  """Print the model output next to the reference label for each segment of a recording.

  The CSV at ``path`` is read; its "sound" column (missing values dropped) is
  cut into consecutive 22050-sample segments and up to 58 of them are fed to
  ``model``. For every segment one line is printed with the segment's start
  label (``index * 0.5``), the model output and the matching entry of
  ``Imu(data).get_item()``.

  Args:
    model: callable taking a 1-D float tensor of 22050 samples. Its train/eval
      mode is not changed here.
    path: path of the CSV recording.
    device: device the segment tensor is moved to before the forward pass.
  """
  n_segments = 58
  segment_length = 22050

  data = pd.read_csv(path)
  sound = np.array(data['sound'].dropna())
  imu = np.array(Imu(data).get_item())

  print("Model Test")
  # Inference only: no autograd graph is needed for printing predictions.
  with torch.no_grad():
    for s in range(n_segments):
      st = s * segment_length
      sound_data = sound[st:st+segment_length]

      # Stop cleanly instead of failing inside the model's reshape (short
      # final segment) or on imu[s] (fewer labels than segments).
      if len(sound_data) < segment_length or s >= len(imu):
        print("Stopped after {} segments: recording is shorter than {} segments".format(s, n_segments))
        break

      print(s*0.5, " : ", model(torch.Tensor(sound_data).to(device)), end = " ")
      print("Real : ", imu[s])

# Compare the loaded model's outputs with the reference labels for the recording at `path`.
model_check(model, path)

Model Test
0.0  :  tensor([[0.0004]], grad_fn=<SigmoidBackward0>) Real :  0
0.5  :  tensor([[1.]], grad_fn=<SigmoidBackward0>) Real :  1
1.0  :  tensor([[0.1166]], grad_fn=<SigmoidBackward0>) Real :  1
1.5  :  tensor([[0.0002]], grad_fn=<SigmoidBackward0>) Real :  0
2.0  :  tensor([[0.0004]], grad_fn=<SigmoidBackward0>) Real :  0
2.5  :  tensor([[0.0403]], grad_fn=<SigmoidBackward0>) Real :  0
3.0  :  tensor([[0.1148]], grad_fn=<SigmoidBackward0>) Real :  1
3.5  :  tensor([[0.0593]], grad_fn=<SigmoidBackward0>) Real :  1
4.0  :  tensor([[0.0002]], grad_fn=<SigmoidBackward0>) Real :  1
4.5  :  tensor([[0.1756]], grad_fn=<SigmoidBackward0>) Real :  0
5.0  :  tensor([[0.9999]], grad_fn=<SigmoidBackward0>) Real :  0
5.5  :  tensor([[0.0554]], grad_fn=<SigmoidBackward0>) Real :  0
6.0  :  tensor([[0.0215]], grad_fn=<SigmoidBackward0>) Real :  0
6.5  :  tensor([[0.0047]], grad_fn=<SigmoidBackward0>) Real :  1
7.0  :  tensor([[0.0233]], grad_fn=<SigmoidBackward0>) Real :  1
7.5  :  tensor([[0

# Converting the PyTorch model to PyTorch Mobile

In [8]:
from torch.utils.mobile_optimizer import optimize_for_mobile
# Pipeline: TorchScript compilation -> mobile optimisation pass -> save to Drive.
MOBILE_MODEL_PATH = "/content/drive/MyDrive/ColabNotebooks/Data/Model/model1206_2.pt"

scripted_model = torch.jit.script(model)
opt_model = optimize_for_mobile(scripted_model)
torch.jit.save(opt_model, MOBILE_MODEL_PATH)