In [1]:
import os
import pandas as pd
from sklearn.model_selection import train_test_split
import numpy as np
import librosa
import librosa.display
import matplotlib.pyplot as plt
from scipy.signal import butter, lfilter
from scipy.io import wavfile
import torch
from torch import nn
from torch.utils.data import Dataset, DataLoader
from skimage.transform import resize
from torchvision import transforms
import torchaudio.transforms as ta_transforms
import math
import torchaudio
import time
os.environ['CUDA_LAUNCH_BLOCKING'] = "1"

In [2]:
PATH=os.getenv("HOME")+"/aiffel/ECG_data/physionet.org/files/circor-heart-sound/1.0.3/training_data"
SAMPLE_RATE = 4000
HOP_LENGTH = 40

In [3]:

# txt파일 불러오기
file_list = os.listdir(PATH)
txt_list = [os.path.join(PATH, file) for file in file_list if file.endswith(".txt")]

# 환자 아이디를 훈련 데이터셋과 테스트 데이터셋으로 나눔
train_patient_txt, test_patient_txt = train_test_split(txt_list, test_size=0.9, random_state=42)

# 결과 출력
print("Train Patient IDs:", train_patient_txt[:3])
print("Test Patient IDs:", test_patient_txt[:3])

Train Patient IDs: ['/aiffel/aiffel/ECG_data/physionet.org/files/circor-heart-sound/1.0.3/training_data/84706.txt', '/aiffel/aiffel/ECG_data/physionet.org/files/circor-heart-sound/1.0.3/training_data/85262.txt', '/aiffel/aiffel/ECG_data/physionet.org/files/circor-heart-sound/1.0.3/training_data/84699.txt']
Test Patient IDs: ['/aiffel/aiffel/ECG_data/physionet.org/files/circor-heart-sound/1.0.3/training_data/50826.txt', '/aiffel/aiffel/ECG_data/physionet.org/files/circor-heart-sound/1.0.3/training_data/50671.txt', '/aiffel/aiffel/ECG_data/physionet.org/files/circor-heart-sound/1.0.3/training_data/85174.txt']


In [4]:
class Biquad:

  # pretend enumeration
    LOWPASS, HIGHPASS, BANDPASS, NOTCH, PEAK, LOWSHELF, HIGHSHELF = range(7)

    def __init__(self,typ, freq, srate, Q, dbGain = 0):
        types = {
            Biquad.LOWPASS : self.lowpass,
            Biquad.HIGHPASS : self.highpass,
            Biquad.BANDPASS : self.bandpass,
            Biquad.NOTCH : self.notch,
            Biquad.PEAK : self.peak,
            Biquad.LOWSHELF : self.lowshelf,
            Biquad.HIGHSHELF : self.highshelf
          }
        assert typ in types
        freq = float(freq)
        self.srate = float(srate)
        Q = float(Q)
        dbGain = float(dbGain)
        self.a0 = self.a1 = self.a2 = 0
        self.b0 = self.b1 = self.b2 = 0
        self.x1 = self.x2 = 0
        self.y1 = self.y2 = 0
        # only used for peaking and shelving filter types
        A = math.pow(10, dbGain / 40)
        omega = 2 * math.pi * freq / self.srate
        sn = math.sin(omega)
        cs = math.cos(omega)
        alpha = sn / (2*Q)
        beta = math.sqrt(A + A)
        types[typ](A,omega,sn,cs,alpha,beta)
        # prescale constants
        self.b0 /= self.a0
        self.b1 /= self.a0
        self.b2 /= self.a0
        self.a1 /= self.a0
        self.a2 /= self.a0

    def lowpass(self,A,omega,sn,cs,alpha,beta):
        self.b0 = (1 - cs) /2
        self.b1 = 1 - cs
        self.b2 = (1 - cs) /2
        self.a0 = 1 + alpha
        self.a1 = -2 * cs
        self.a2 = 1 - alpha

    def highpass(self,A,omega,sn,cs,alpha,beta):
        self.b0 = (1 + cs) /2
        self.b1 = -(1 + cs)
        self.b2 = (1 + cs) /2
        self.a0 = 1 + alpha
        self.a1 = -2 * cs
        self.a2 = 1 - alpha

    def bandpass(self,A,omega,sn,cs,alpha,beta):
        self.b0 = alpha
        self.b1 = 0
        self.b2 = -alpha
        self.a0 = 1 + alpha
        self.a1 = -2 * cs
        self.a2 = 1 - alpha

    def notch(self,A,omega,sn,cs,alpha,beta):
        self.b0 = 1
        self.b1 = -2 * cs
        self.b2 = 1
        self.a0 = 1 + alpha
        self.a1 = -2 * cs
        self.a2 = 1 - alpha

    def peak(self,A,omega,sn,cs,alpha,beta):
        self.b0 = 1 + (alpha * A)
        self.b1 = -2 * cs
        self.b2 = 1 - (alpha * A)
        self.a0 = 1 + (alpha /A)
        self.a1 = -2 * cs
        self.a2 = 1 - (alpha /A)

    def lowshelf(self,A,omega,sn,cs,alpha,beta):
        self.b0 = A * ((A + 1) - (A - 1) * cs + beta * sn)
        self.b1 = 2 * A * ((A - 1) - (A + 1) * cs)
        self.b2 = A * ((A + 1) - (A - 1) * cs - beta * sn)
        self.a0 = (A + 1) + (A - 1) * cs + beta * sn
        self.a1 = -2 * ((A - 1) + (A + 1) * cs)
        self.a2 = (A + 1) + (A - 1) * cs - beta * sn

    def highshelf(self,A,omega,sn,cs,alpha,beta):
        self.b0 = A * ((A + 1) + (A - 1) * cs + beta * sn)
        self.b1 = -2 * A * ((A - 1) + (A + 1) * cs)
        self.b2 = A * ((A + 1) + (A - 1) * cs - beta * sn)
        self.a0 = (A + 1) - (A - 1) * cs + beta * sn
        self.a1 = 2 * ((A - 1) - (A + 1) * cs)
        self.a2 = (A + 1) - (A - 1) * cs - beta * sn

  # perform filtering function
    def __call__(self,x):
        y = self.b0 * x + self.b1 * self.x1 + self.b2 * self.x2 - self.a1 * self.y1 - self.a2 * self.y2
        self.x2, self.x1 = self.x1, x
        self.y2, self.y1 = self.y1, y
        
        return y

  # provide a static result for a given frequency f
    def result(self,f):
        phi = (math.sin(math.pi * f * 2/(2.0 * self.srate)))**2
        return ((self.b0+self.b1+self.b2)**2 - \
        4*(self.b0*self.b1 + 4*self.b0*self.b2 + \
        self.b1*self.b2)*phi + 16*self.b0*self.b2*phi*phi) / \
        ((1+self.a1+self.a2)**2 - 4*(self.a1 + \
        4*self.a2 + self.a1*self.a2)*phi + 16*self.a2*phi*phi)

    def log_result(self,f):
        try:
            r = 10 * math.log10(self.result(f))
        except:
            r = -200
        return r

  # return computed constants
    def constants(self):
        return self.b0,self.b1,self.b2,self.a1,self.a2

In [5]:
class CustomDataset(Dataset):
    def __init__(self, path, txt_list,
                 target_size=(40, 2500),
                 th=25,
                 resizing=False,
                 filtering=False,
                 filter_hz=500):
        self.path = path
        self.txt_list = txt_list
        self.target_size = target_size
        self.th = int(th * SAMPLE_RATE / HOP_LENGTH)
        self.resizing = resizing
        self.filtering = filtering
        self.filter_hz = filter_hz

        self.get_file_list()
        
        self.delete_list=[]
        self.x = self.get_mel_spectrogram()
        self.y = self.get_label()
        self.delete_data()
    
    def __len__(self):
        return len(self.x)

    def __getitem__(self, idx):
        return self.x[idx], self.y[idx]

    def get_file_list(self):
        self.heas = []
        self.wavs = []
        self.tsvs = []

        for path_txt in self.txt_list:
            with open(path_txt, "r") as f:
                P_id, n, sr = f.readline().split()
                for _ in range(int(n)):
                    _, hea, wav, tsv = f.readline().split()
                    self.heas.append(hea)
                    self.wavs.append(wav)
                    self.tsvs.append(tsv)
        self.heas.sort()
        self.wavs.sort()
        self.tsvs.sort()

    # torchaudio로 필터링 적용
    def apply_filter_torchaudio(self, audio, cutoff_hz):
        biquad = Biquad(typ=Biquad.LOWPASS, freq=cutoff_hz, srate=SAMPLE_RATE, Q=0.707)
        b0, b1, b2, a1, a2 = biquad.constants()
        b0 = torch.tensor(b0)
        b1 = torch.tensor(b1)
        b2 = torch.tensor(b2)
        a0 = 1
        a1 = torch.tensor(a1)
        a2 = torch.tensor(a2)
        filtered_audio = torchaudio.functional.biquad(audio, b0, b1, b2, a0, a1, a2)
        return filtered_audio

    def padding(self, spec, target_length, padding_value=0):
        pad_width = target_length - spec.shape[-1]
        padded_spec = torch.nn.functional.pad(spec, (0, pad_width, 0, 0), "constant", 0)
        return padded_spec

    def resize_spectrogram(self, spec, new_shape):
        resized_spec = transforms.functional.resize(img=spec, size=new_shape, antialias=None)
        return resized_spec

    # # 나누기 전 이미지에 대한 정규화를 위해 사용
    # def normalize_spectrogram(self, wav, spec):
    #     wav_max = np.max(np.array(wav))
    #     spec_max = np.max(np.array(spec))
    #     norm_max = spec_max / wav_max
    #     print(norm_max)
    #     normalized = spec / norm_max
    #     return normalized

    # 나누어진 개별 이미지의 최대값을 이용한 정규화
    def normalize_spectrogram2(self, spec):
        spec_max = np.max(np.array(spec))
        spec_min = np.min(np.array(spec))
        normalized = (spec-spec_min) / (spec_max-spec_min)
        return normalized

    # # torchvision을 사용한 정규화
    # def normalize_spectrogram3(self, spec):
    #     normalized = transforms.Normalize((0), (0.5))(spec)
    #     return normalized

    def get_mel_spectrogram(self):
        audio_list = []
        self.scale_list = []
        self.iter_list = []

        for path_wav in self.wavs:
            path = os.path.join(PATH, path_wav)

            # Torchaudio 이용하여 파일 로드
            x = torchaudio.load(path)[0]
            # Filtering
            if self.filtering is True:
                cutoff_frequency = self.filter_hz
                x = self.apply_filter_torchaudio(x, cutoff_frequency)
            ms = ta_transforms.MelSpectrogram(sample_rate=SAMPLE_RATE,
                                        n_fft=128,
                                        win_length=100,
                                        n_mels=40,
                                        hop_length=HOP_LENGTH)(x)

            # 스펙트로그램의 형태를 유지하면서 torchaudio로 불러왔을 때와 같이 0~1로 정규화
            # ms = self.normalize_spectrogram(x, ms)

            # 원본 wav의 길이가 th보다 길다면 Slicing
            if ms.shape[-1] > self.th:
                scale = 1
                num_splits = ms.shape[-1] // self.th    # wav길이 == th의 배수
                if ms.shape[-1] % self.th != 0: # wav길이 != th의 배수
                    num_splits += 1
                self.iter_list.append(num_splits)

                for i in range(num_splits):
                    start_idx = i * self.th
                    end_idx = (i + 1) * self.th
                    split = ms[..., start_idx:end_idx]

                    # th보다 길이가 짧다면
                    if split.shape[-1] < self.th:
                        # Resizing
                        if self.resizing is True:
                            scale = self.th / split.shape[-1]
                            target_shape = (split.shape[-2], self.th)
                            split = self.resize_spectrogram(split, target_shape)
                        # Padding
                        else:
                            split = self.padding(split, self.th)
                    # 최종 Resizing
                    resized = self.resize_spectrogram(split, self.target_size)
                    resized = self.normalize_spectrogram2(resized)  # 정규화
                    audio_list.append(resized)
                    if self.resizing is True:
                        self.scale_list.append(scale)

            # 원본 wav의 길이가 th보다 짧거나 같다면
            else:
                self.iter_list.append(1)
                scale = 1
                # th보다 짧다면
                if ms.shape[-1] < self.th:
                    # Resizing
                    if self.resizing is True:
                        scale = self.th / ms.shape[-1]
                        target_shape = (ms.shape[-2], self.th)
                        ms = self.resize_spectrogram(ms, target_shape)
                    # Padding
                    else:
                        ms = self.padding(ms, self.th)
                # 최종 resizing
                ms = self.resize_spectrogram(ms, self.target_size)
                ms = self.normalize_spectrogram2(ms)    # 정규화
                audio_list.append(ms)
                if self.resizing is True:
                    self.scale_list.append(scale)
        return torch.stack(audio_list)

    def get_label(self):
        labels = []
        idx=0
        for i, path_tsv in enumerate(self.tsvs):
            path = os.path.join(PATH, path_tsv)
            tsv_data = pd.read_csv(path, sep='\t', header=None)
            iter = self.iter_list[i]
            continuous = False
            next_end = 0
            next_class = 0
            for _iter in range(iter):
                label = []
                if self.resizing is True:
                    scale = self.scale_list[sum(self.iter_list[:i]) + _iter]
                for _, tsv_row in tsv_data.iterrows():
                    # 이전 구간에서 이어진다면 tsv_row[0] = 0
                    if continuous is True:
                        tsv_row[0] = 0
                        tsv_row[1] = next_end
                        tsv_row[2] = next_class
                        continuous = False
                        # resize를 하였다면 라벨 값도 스케일링
                        if self.resizing is True:
                            tsv_row[0] *= scale
                            tsv_row[1] *= scale

                    # 이전 구간에서 이어지지 않는다면 새로 데이터 가져오기
                    elif tsv_row[2] in [1, 3]:
                        # 구간 불러와서 sr값 곱하고 hop_legth로 나누기
                        tsv_row[0] = tsv_row[0] * SAMPLE_RATE / HOP_LENGTH - (_iter * self.th)
                        tsv_row[1] = tsv_row[1] * SAMPLE_RATE / HOP_LENGTH - (_iter * self.th)
                        tsv_row[2] = 0 if tsv_row[2] == 1 else 1    # S1=0, S2=1 
                        # 시작점이 th 이상이라면, 이전 구간 이미지의 라벨이라면 continue
                        if tsv_row[0] >= self.th or tsv_row[0] < 0:
                            continue
                        # th보다 길이가 짧다면(나뉜 이미지의 가장 마지막 이미지라면)
                        if _iter == iter - 1:
                            # resize를 하였다면 라벨 값도 스케일링
                            if self.resizing is True:
                                tsv_row[0] *= scale
                                tsv_row[1] *= scale

                        # th보다 길이가 길다면 Slicing
                        elif tsv_row[1] >= self.th:
                            next_end = tsv_row[1] - self.th
                            next_class = tsv_row[2]
                            tsv_row[1] = self.th - 1
                            continuous = True
                            # 최종 resize한 값 으로 보간
                            tsv_row[0] *= self.target_size[1] / self.th
                            tsv_row[1] *= self.target_size[1] / self.th
                            
                            # S1=1, S2=2 
                            label.append([tsv_row[0] / self.target_size[0], 0 / self.target_size[0],
                                  tsv_row[1] / self.target_size[0], self.target_size[0] / self.target_size[0],
                                  int(tsv_row[2])+1])# xmin, ymin, xmax, ymax, cls
                            break

                    # 이전 값에서 이어지지 않으면서 S1, S2가 아닌 경우 continue
                    else: continue

                    # 최종 resize한 값 으로 보간
                    tsv_row[0] *= self.target_size[1] / self.th
                    tsv_row[1] *= self.target_size[1] / self.th
                    
                    label.append([tsv_row[0] / self.target_size[0], 0 / self.target_size[0],
                                  tsv_row[1] / self.target_size[0], self.target_size[0] / self.target_size[0],
                                  int(tsv_row[2])+1])# xmin, ymin, xmax, ymax, cls
                    
                    #label.append((int(tsv_row[2]), tsv_row[0], tsv_row[1]))
                if(len(label)==0):
                    self.delete_list.append(idx)
                idx+=1
                labels.append(label)      
        return labels
    
    def delete_data(self):
        delete_count=0
        for i in self.delete_list:
            del self.y[i-delete_count]
            delete_count+=1
        
        self.x = self.x[[i for i in range(self.x.size(0)) if i not in self.delete_list]]
        
        

In [6]:
dataset = CustomDataset(PATH, train_patient_txt, target_size=(300, 300), th=5, resizing=True, filtering=True)

In [7]:
len(dataset.delete_list)
dataset.delete_list
print(len(dataset.y),len(dataset.x))
dataset.x.size(0)

1049 1049


1049

# data loader 생성

In [8]:
from torch.utils.data import DataLoader

def my_collate_fn(batch):

    targets = []
    imgs = []
    for sample in batch:
        imgs.append(sample[0])  # sample[0]은 화상 gt
        targets.append(torch.FloatTensor(sample[1]))  # sample[1]은 어노테이션 gt

    imgs = torch.stack(imgs, dim=0)
    return imgs, targets

train_dataloader = DataLoader(dataset, batch_size=32, shuffle=True, collate_fn=my_collate_fn)

In [9]:
print(len(train_dataloader))
for images, labels in train_dataloader:
    print(images.size())
    print(labels[0].size())
    break

33
torch.Size([32, 1, 300, 300])
torch.Size([20, 5])


# model 생성

In [10]:
from torchvision import models
import torch
device = torch.device("cuda")

backbone = models.mobilenet_v3_large(pretrained=True)

#for param in backbone.parameters():
#    param.requires_grad = False

In [11]:
for name, module in backbone.named_children():
    print(name)

for n, x in enumerate(backbone.features.children()):
    if n==0:
        seq=x.children()
        seq=next(seq)
        
        prev_weight = seq.weight
        new_weight = prev_weight[:, :1, :, :]
        seq.weight = nn.Parameter(new_weight)
        seq.in_channels = 1
    if n==13:
        seq=x.children()
        seq=next(seq)
        print(seq[0])

features
avgpool
classifier
ConvBNActivation(
  (0): Conv2d(112, 672, kernel_size=(1, 1), stride=(1, 1), bias=False)
  (1): BatchNorm2d(672, eps=0.001, momentum=0.01, affine=True, track_running_stats=True)
  (2): Hardswish()
)


In [12]:
import torch
from torch import nn
from torch.utils.data import DataLoader
from torchvision import datasets, transforms

class SSD(nn.Module):
    def __init__(self, backbone, n_class=3, default_box_n=[4,6,6,6,4,4], state = "Train"):
        
        super().__init__()
        
        self.input_layer = nn.Conv2d(1, 16, kernel_size=(3, 3), stride=(2, 2), padding=(1, 1))
        
        self.n_class = n_class
        self.default_box_n = default_box_n
        self.state = state
        if self.state != "Train":
            self.softmax = nn.Softmax(dim=-1)
            
        #가중치 초기화 인자
        self.rescale_factors = nn.Parameter(torch.FloatTensor(1, 112, 1, 1))  # there are 512 channels in conv4_3_feats
        nn.init.constant_(self.rescale_factors, 20)
        
        #backbone
        
        self.backbone_layer = nn.Sequential(
            backbone.features,
        )
        
        #extra layer
        self.extra_layer_1 = self.extra_layers(960,512,4)
        self.extra_layer_2 = self.extra_layers(512,256,4)
        self.extra_layer_3 = self.extra_layers(256,256,2)
        self.extra_layer_4 = self.extra_layers(256,128,2)
        
        self.extra_layers = [self.extra_layer_1, 
                            self.extra_layer_2, 
                            self.extra_layer_3, 
                            self.extra_layer_4]
        
        
        #detection output
        
        self.cls_layers = nn.ModuleList([
                                        nn.Conv2d(672, default_box_n[0]*(self.n_class), kernel_size=(3, 3), stride=(1, 1), padding=(1, 1)),
                                        nn.Conv2d(960, default_box_n[1]*(self.n_class), kernel_size=(3, 3), stride=(1, 1), padding=(1, 1)),
                                        nn.Conv2d(512, default_box_n[2]*(self.n_class), kernel_size=(3, 3), stride=(1, 1), padding=(1, 1)),
                                        nn.Conv2d(256, default_box_n[3]*(self.n_class), kernel_size=(3, 3), stride=(1, 1), padding=(1, 1)),
                                        nn.Conv2d(256, default_box_n[4]*(self.n_class), kernel_size=(3, 3), stride=(1, 1), padding=(1, 1)),
                                        nn.Conv2d(128, default_box_n[5]*(self.n_class), kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
                                        ])
        
        self.loc_layers = nn.ModuleList([
                                        nn.Conv2d(672, default_box_n[0]*4, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1)),
                                        nn.Conv2d(960, default_box_n[1]*4, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1)),
                                        nn.Conv2d(512, default_box_n[2]*4, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1)),
                                        nn.Conv2d(256, default_box_n[3]*4, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1)),
                                        nn.Conv2d(256, default_box_n[4]*4, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1)),
                                        nn.Conv2d(128, default_box_n[5]*4, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
                                        ])
        
        
    
    def init_conv2d(self):
        #가중치 초기화 함수
        
        #모델학습이 순조롭지않다면 향후 추가 예정
        return
    
    def extra_layers(self, input_size, output_size, div):
        layer = nn.Sequential(
            #conv2D 해상도낮추기
            nn.Conv2d(input_size, output_size, kernel_size=(1, 1), stride=(1, 1), bias=False),
            nn.BatchNorm2d(output_size, eps=0.001, momentum=0.01, affine=True, track_running_stats=True),
            nn.Hardswish(),
            
            #Inverted Residual (mobilev2 + squeeze)
            
            #depthwise 
            # kernel size 5 고려해보기
            nn.Conv2d(output_size, output_size, kernel_size=(3, 3), stride=(2, 2), padding=(1, 1), groups=output_size, bias=False),
            nn.BatchNorm2d(output_size, eps=0.001, momentum=0.01, affine=True, track_running_stats=True),
            nn.Hardswish(),
            
            #SqueezeExcitation
            nn.Conv2d(output_size, output_size//div, kernel_size=(1, 1), stride=(1, 1)),
            nn.ReLU(inplace=True),
            nn.Conv2d(output_size//div, output_size, kernel_size=(1, 1), stride=(1, 1)),
            
            #Point-wise
            nn.Conv2d(output_size, output_size, kernel_size=(1, 1), stride=(1, 1)),
            nn.Hardswish(),
            nn.Identity()
            
        ) 
        return layer
    
    def forward(self, x):
        f_maps=[]
        for n, layer in enumerate(backbone.features):#12 16
            if n==13: 
                #L2 norm
                #norm = x.pow(2).sum(dim=1, keepdim=True).sqrt()     # (N, 1, 19, 19)
                #featureMap_1 = x / norm                                  # (N, 112, 19, 19)
                #featureMap_1= conv4_3 * self.rescale_factors            # (N, 512, 19, 19)
                
                #13계층 bottleneck까지만
                seq_layer = next(layer.children())
                for seq_n in range(4):
                    x = seq_layer[seq_n](x)
                    if seq_n==0:
                        f_maps.append(x)
                continue
                
            x = layer(x)
            #print("size : {0}, number = {1}".format(x.size(), n))
            if n==16:
                f_maps.append(x) 
                
        for extra_layer in self.extra_layers:
            x = extra_layer(x)
            
            f_maps.append(x)
            
        cls = []
        loc = []
        for f_map, cls_layer, loc_layer in zip(f_maps,self. cls_layers, self.loc_layers):
            output_cls = cls_layer(f_map)
            output_loc = loc_layer(f_map)
            
            cls.append(output_cls.permute(0, 2, 3, 1).contiguous())
            loc.append(output_loc.permute(0, 2, 3, 1).contiguous())
            #cls.append(output_cls)
            #loc.append(output_loc)
            
        cls = torch.cat([o.view(o.size(0), -1) for o in cls], 1)
        loc = torch.cat([o.view(o.size(0), -1) for o in loc], 1)

        loc = loc.view(loc.size(0), -1, 4)
        if self.state == "Train":
            cls = cls.view(cls.size(0), -1, self.n_class)
        else:
            cls = self.softmax(cls.view(cls.size(0), -1, self.n_class))
            
            
        return cls, loc

# default box, loss 함수 기타 기능 구현

In [13]:
# -*- coding: utf-8 -*-
import torch


def point_form(boxes):
    return torch.cat((boxes[:, :2] - boxes[:, 2:]/2,     # xmin, ymin
                     boxes[:, :2] + boxes[:, 2:]/2), 1)  # xmax, ymax


def center_size(boxes):
    return torch.cat((boxes[:, 2:] + boxes[:, :2])/2,  # cx, cy
                     boxes[:, 2:] - boxes[:, :2], 1)  # w, h


def intersect(box_a, box_b):
    A = box_a.size(0)
    B = box_b.size(0)
    max_xy = torch.min(box_a[:, 2:].unsqueeze(1).expand(A, B, 2),
                       box_b[:, 2:].unsqueeze(0).expand(A, B, 2))
    min_xy = torch.max(box_a[:, :2].unsqueeze(1).expand(A, B, 2),
                       box_b[:, :2].unsqueeze(0).expand(A, B, 2))
    inter = torch.clamp((max_xy - min_xy), min=0)
    return inter[:, :, 0] * inter[:, :, 1]


def jaccard(box_a, box_b):# iou
    inter = intersect(box_a, box_b)
    area_a = ((box_a[:, 2]-box_a[:, 0]) *
              (box_a[:, 3]-box_a[:, 1])).unsqueeze(1).expand_as(inter)  # [A,B]
    area_b = ((box_b[:, 2]-box_b[:, 0]) *
              (box_b[:, 3]-box_b[:, 1])).unsqueeze(0).expand_as(inter)  # [A,B]
    union = area_a + area_b - inter
    return inter / union  # [A,B]


def match(threshold, truths, priors, variances, labels, loc_t, conf_t, idx):
    # jaccard index
    # defuatl box와 truths box iou 계산
    overlaps = jaccard(
        truths,
        point_form(priors)
    )
    # (Bipartite Matching)
    # [1,num_objects] best prior for each ground truth
    best_prior_overlap, best_prior_idx = overlaps.max(1, keepdim=True)
    # [1,num_priors] best ground truth for each prior
    best_truth_overlap, best_truth_idx = overlaps.max(0, keepdim=True)
    best_truth_idx.squeeze_(0)
    best_truth_overlap.squeeze_(0)
    best_prior_idx.squeeze_(1)
    best_prior_overlap.squeeze_(1)
    best_truth_overlap.index_fill_(0, best_prior_idx, 2)  # ensure best prior
    # TODO refactor: index  best_prior_idx with long tensor
    # ensure every gt matches with its prior of max overlap
    for j in range(best_prior_idx.size(0)):
        best_truth_idx[best_prior_idx[j]] = j
    matches = truths[best_truth_idx]          # Shape: [num_priors,4]
    conf = labels[best_truth_idx]#+1         # Shape: [num_priors]
    conf[best_truth_overlap < threshold] = 0  # label as background
    loc = encode(matches, priors, variances)
    loc_t[idx] = loc    # [num_priors,4] encoded offsets to learn
    conf_t[idx] = conf  # [num_priors] top class label for each prior
    


def encode(matched, priors, variances):
    # dist b/t match center and prior's center
    g_cxcy = (matched[:, :2] + matched[:, 2:])/2 - priors[:, :2]
    # encode variance
    g_cxcy /= (variances[0] * priors[:, 2:])
    # match wh / prior wh
    g_wh = (matched[:, 2:] - matched[:, :2]) / priors[:, 2:]
    g_wh = torch.log(g_wh) / variances[1]
    # return target for smooth_l1_loss
    return torch.cat([g_cxcy, g_wh], 1)  # [num_priors,4]


# Adapted from https://github.com/Hakuyume/chainer-ssd
def decode(loc, priors, variances):
    boxes = torch.cat((
        priors[:, :2] + loc[:, :2] * variances[0] * priors[:, 2:],
        priors[:, 2:] * torch.exp(loc[:, 2:] * variances[1])), 1)
    boxes[:, :2] -= boxes[:, 2:] / 2
    boxes[:, 2:] += boxes[:, :2]
    return boxes


def log_sum_exp(x):
    x_max = x.data.max()
    return torch.log(torch.sum(torch.exp(x-x_max), 1, keepdim=True)) + x_max


def nms(boxes, scores, overlap=0.5, top_k=200):
    keep = scores.new(scores.size(0)).zero_().long()
    if boxes.numel() == 0:
        return keep
    x1 = boxes[:, 0]
    y1 = boxes[:, 1]
    x2 = boxes[:, 2]
    y2 = boxes[:, 3]
    area = torch.mul(x2 - x1, y2 - y1)
    v, idx = scores.sort(0)  # sort in ascending order
    # I = I[v >= 0.01]
    idx = idx[-top_k:]  # indices of the top-k largest vals
    xx1 = boxes.new()
    yy1 = boxes.new()
    xx2 = boxes.new()
    yy2 = boxes.new()
    w = boxes.new()
    h = boxes.new()

    # keep = torch.Tensor()
    count = 0
    while idx.numel() > 0:
        i = idx[-1]  # index of current largest val
        # keep.append(i)
        keep[count] = i
        count += 1
        if idx.size(0) == 1:
            break
        idx = idx[:-1]  # remove kept element from view
        # load bboxes of next highest vals
        torch.index_select(x1, 0, idx, out=xx1)
        torch.index_select(y1, 0, idx, out=yy1)
        torch.index_select(x2, 0, idx, out=xx2)
        torch.index_select(y2, 0, idx, out=yy2)
        # store element-wise max with next highest score
        xx1 = torch.clamp(xx1, min=x1[i])
        yy1 = torch.clamp(yy1, min=y1[i])
        xx2 = torch.clamp(xx2, max=x2[i])
        yy2 = torch.clamp(yy2, max=y2[i])
        w.resize_as_(xx2)
        h.resize_as_(yy2)
        w = xx2 - xx1
        h = yy2 - yy1
        # check sizes of xx1 and xx2.. after each iteration
        w = torch.clamp(w, min=0.0)
        h = torch.clamp(h, min=0.0)
        inter = w*h
        # IoU = i / (area(a) + area(b) - i)
        rem_areas = torch.index_select(area, 0, idx)  # load remaining areas)
        union = (rem_areas - inter) + area[i]
        IoU = inter/union  # store result in iou
        # keep only elements with an IoU <= overlap
        idx = idx[IoU.le(overlap)]
    return keep, count

In [14]:
import torch.nn.functional as F

class MultiBoxLoss(nn.Module):
    """SSD의 손실함수 클래스 """

    def __init__(self, thresh=0.5, neg_pos=3, device='cpu'):
        super(MultiBoxLoss, self).__init__()
        self.jaccard_thresh = thresh  # 0.5 match 함수의 jaccard 계수의 임계치
        self.negpos_ratio = neg_pos  # 3:1 Hard Negative Mining의 음과 양 비율
        self.device = device  # 계산 device(CPU | GPU)

    def forward(self, predictions, targets, dboxs):
        """
        파라미터 설명
        ----------
        predictions :모델의 예측값 cls와 loc
        cls는 batch dbox의 개수, 클래스 개수로 이루어짐
        loc은 batch dbox의 개수, 4
        
        targets : [num_batch, 객체개수, 5]
            5는 라벨 정보[xmin, ymin, xmax, ymax, label_ind]

        """

        conf_data, loc_data = predictions
        dbox_list = dboxs

        num_batch = loc_data.size(0)  # 배치 크기
        num_dbox = loc_data.size(1)  # DBox의 수 
        num_classes = conf_data.size(2)  # 클래스 수

        # 손실 계산에 사용할 것을 저장하는 변수 작성
        # conf_t_label：각 DBox에 가장 가까운 정답 BBox의 라벨을 저장 
        # loc_t: 각 DBox에 가장 가까운 정답 BBox의 위치 정보 저장 
        conf_t_label = torch.LongTensor(num_batch, num_dbox).to(self.device)
        
        #conf_t_label.fill_(0)#테스트용도
        loc_t = torch.Tensor(num_batch, num_dbox, 4).to(self.device)

        # loc_t와 conf_t_label에 
        # DBox와 정답 어노테이션 targets를 amtch한 결과 덮어쓰기
        for idx in range(num_batch):  # 미니 배치 루프

            truths = targets[idx][:, :-1].to(self.device)
            labels = targets[idx][:, -1].to(self.device)
            dbox = dbox_list.to(self.device)

            # match 함수를 실행하여 loc_t와 conf_t_label 내용 갱신
            # loc_t: 각 DBox에 가장 가까운 정답 BBox 위치 정보가 덮어써짐.
            # conf_t_label：각 DBox에 가장 가까운 정답 BBox 라벨이 덮어써짐.
            # 단, 가장 가까운 BBox와 iou가 0.5보다 작은 경우,
            # 정답 BBox의 라벨 conf_t_label은 배경 클래스 0으로 한다.
            variance = [0.1, 0.2]
            
            # 라벨을 dbox에 대한 offset으로 변환
            match(self.jaccard_thresh, truths, dbox,
                  variance, labels, loc_t, conf_t_label, idx)

        #물체를 발견한 offset만 손실 계산
        pos_mask = conf_t_label > 0  

        # pos_mask를 loc_data 크기로 변형
        pos_idx = pos_mask.unsqueeze(pos_mask.dim()).expand_as(loc_data)

        # Positive DBox의 loc_data와 offset loc_t 취득
        #print(loc_data.size())
        #print(pos_mask.size())
        loc_p = loc_data[pos_idx].view(-1, 4).to(device)
        loc_t = loc_t[pos_idx].view(-1, 4)

        # 물체를 발견한 Positive DBox의 오프셋 정보 loc_t의 손실(오차)를 계산
        loss_l = F.smooth_l1_loss(loc_p, loc_t, reduction='sum')

        # ----------
        # 클래스 예측의 손실 : loss_c를 계산
        # 교차 엔트로피 오차 함수로 손실 계산. 단 배경 클래스가 정답인 DBox가 압도적으로 많으므로,
        # Hard Negative Mining을 실시하여 물체 발견 DBox 및 배경 클래스 DBox의 비율이 1:3이 되도록 한다.
        # 배경 클래스 DBox로 예상한 것 중 손실이 적은 것은 클래스 예측 손실에서 제외
        # ----------
        batch_conf = conf_data.view(-1, num_classes)

        # 클래스 예측의 손실함수 계산(reduction='none'으로 하여 합을 취하지 않고 차원 보존)
        loss_c = F.cross_entropy(
            batch_conf, conf_t_label.view(-1), reduction='none')
        
        
        #------------이 아래 부분은 이해 못함 

        # -----------------
        # Negative DBox중 Hard Negative Mining으로 
        # 추출하는 것을 구하는 마스크 작성
        # -----------------

        # 물체를 발견한 Positive DBox의 손실을 0으로 한다.
        # (주의) 물체는 label이 1 이상, 라벨 0은 배경을 의미
        num_pos = pos_mask.long().sum(1, keepdim=True)  # 미니 배치별 물체 클래스 예측 수
        loss_c = loss_c.view(num_batch, -1)  # torch.Size([num_batch, 8732])
        loss_c[pos_mask] = 0  # 물체를 발견한 DBox는 손실 0으로 한다.

        # Hard Negative Mining
        # 각 DBox 손실의 크기 loss_c 순위 idx_rank를 구함
        _, loss_idx = loss_c.sort(1, descending=True)
        _, idx_rank = loss_idx.sort(1)

        #  (주의) 구현된 코드는 특수하며 직관적이지 않음.
        # 위 두 줄의 요점은 각 DBox에 대해 손실 크기가 몇 번째인지의 정보를 
        # idx_rank 변수로 빠르게 얻는 코드이다.
        
        # DBox의 손실 값이 큰 쪽부터 내림차순으로 정렬하여, 
        # DBox의 내림차순의 index를 loss_idx에 저장한다.
        # 손실 크기 loss_c의 순위 idx_rank를 구한다.
        # 내림차순이 된 배열 index인 loss_idx를 0부터 8732까지 오름차순으로 다시 정렬하기 위하여
        # 몇 번째 loss_idx의 인덱스를 취할 지 나타내는 것이 idx_rank이다.
        # 예를 들면 idx_rank 요소의 0번째 = idx_rank[0]을 구하는 것은 loss_idx의 값이 0인 요소,
        # 즉 loss_idx[?] =0은 원래 loss_c의 요소 0번째는 내림차순으로 정렬된 loss_idx의 
        # 몇 번째입니까? 를구하는 것이 되어 결과적으로, 
        # ? = idx_rank[0]은 loss_c의 요소 0번째가 내림차순으로 몇 번째인지 나타냄

        # 배경 DBox의 수 num_neg를 구한다. HardNegative Mining으로 
        # 물체 발견 DBox으 ㅣ수 num_pos의 세 배 (self.negpos_ratio 배)로 한다.
        # DBox의 수를 초과한 경우에는 DBox의 수를 상한으로 한다.
        num_neg = torch.clamp(num_pos*self.negpos_ratio, max=num_dbox)

        # idx_rank에 각 DBox의 손실 크기가 위에서부터 몇 번째인지 저장되었다.
        # 배경 DBox의 수 num_neg보다 순위가 낮은(손실이 큰) DBox를 취하는 마스크 작성
        # torch.Size([num_batch, 8732])
        neg_mask = idx_rank < (num_neg).expand_as(idx_rank)

        # -----------------
        # (종료) 지금부터 Negative DBox 중 Hard Negative Mining으로 추출할 것을 구하는 마스크를 작성
        # -----------------

        # 마스크 모양을 고쳐 conf_data에 맞춘다
        # pos_idx_mask는 Positive DBox의 conf를 꺼내는 마스크이다.
        # neg_idx_mask는 Hard Negative Mining으로 추출한 Negative DBox의 conf를 꺼내는 마스크이다.
        # pos_mask：torch.Size([num_batch, 8732])
        # --> pos_idx_mask：torch.Size([num_batch, 8732, 21])
        pos_idx_mask = pos_mask.unsqueeze(2).expand_as(conf_data)
        neg_idx_mask = neg_mask.unsqueeze(2).expand_as(conf_data)

        # conf_data에서 pos와 neg만 꺼내서 conf_hnm으로 한다. 
        # 형태는 torch.Size([num_pos+num_neg, 21])
        conf_hnm = conf_data[(pos_idx_mask+neg_idx_mask).gt(0)
                             ].view(-1, num_classes)
        # gt는 greater than (>)의 약칭. mask가 1인 index를 꺼낸다
        # pos_idx_mask+neg_idx_mask는 덧셈이지만 index로 mask를 정리할 뿐임.
        # pos이든 neg이든 마스크가 1인 것을 더해 하나의 리스트로 만들어 이를 gt로 췯그한다.

        # 마찬가지로 지도 데이터인 conf_t_label에서 pos와 neg만 꺼내, conf_t_label_hnm 으로 
        # torch.Size([pos+neg]) 형태가 된다
        conf_t_label_hnm = conf_t_label[(pos_mask+neg_mask).gt(0)]

        # confidence의 손실함수 계산(요소의 합계=sum을 구함)
        loss_c = F.cross_entropy(conf_hnm, conf_t_label_hnm, reduction='sum')

        # 물체를 발견한 BBox의 수 N (전체 미니 배치의 합계) 으로 손실을 나눈다.
        N = num_pos.sum()
        loss_l /= N
        loss_c /= N

        return loss_l, loss_c

In [15]:
import math
from math import sqrt
from itertools import product as product

class default:
    
    def __init__(self, image_size=300, feature_maps=[19, 10, 5, 3, 2, 1], min_sizes=[]):
        super(default, self).__init__()
        self.image_size = 300
        # number of priors for feature map location (either 4 or 6)
        self.feature_maps = feature_maps
        self.min_sizes = [16, 30, 60, 100, 150, 300]  #0.2, 0.34, 0.48, 0.62, 0.76, 0.9?
        self.max_sizes = [30, 60, 100, 150, 300, 300]
        self.steps = [19,10,5,3,2,1] # 이미지 그리드로 나눈 개수
        self.aspect_ratios = [[2], [2, 3], [2, 3], [2, 3], [2], [2]]
        self.clip = True

    def forward(self):
        mean = []
        for k, f in enumerate(self.feature_maps):
            for i, j in product(range(f), repeat=2):
                f_k = self.steps[k] # 그리드 개수
                # default 박스 중점
                cx = (j + 0.5) / f_k
                cy = (i + 0.5) / f_k
                # aspect_ratio: 1
                # default 박스 공식이 아닌 임의로 설정 그리드 크기를 기준으로 나눔
                s_k = self.min_sizes[k]/self.image_size
                mean += [cx, cy, s_k, 1] # s_k

                # aspect_ratio: 1
                # s_k*s_k+1
                s_k_prime = sqrt(s_k * (self.max_sizes[k]/self.image_size))
                
                #print(cx,cy,s_k_prime,self.max_sizes[k]/self.image_size)
                mean += [cx, cy, s_k_prime, 1] #s_k

                # rest of aspect ratios
                for ar in self.aspect_ratios[k]:
                    mean += [cx, cy, s_k*sqrt(ar), 1] # s_k/sqrt(ar)
                    mean += [cx, cy, s_k/sqrt(ar), 1]
        # back to torch land
        output = torch.Tensor(mean).view(-1, 4)
        if self.clip:
            output.clamp_(max=1, min=0)
        return output
    def cxcy_to_xy(self, tensor_default_box):
        return tensor_default_box*self.image_size
    
    def xy_to_cxcy(self, tensor_default_box):
        return tensor_default_box/self.image_size

In [16]:
from torch import optim
def train_step(model, Data_loader, epoch_num, lr=0.0000001):
    d=default()
    tensor_d = d.forward()
    optimizer = optim.SGD(model.parameters(), lr=lr)
    loss_func = MultiBoxLoss(device=device)
    
    epoch_train_loss = 0.0 # 에포크 손실 합
    #epoch_val_loss = 0.0
    
    
    for epoch in range(epoch_num):
        epoch_start = time.time()
        print("Epoch : {0} / {1}".format(epoch+1,epoch_num))
        
        #targets=np.array([[[1,2,3,4,5],[1,2,3,4,5]],[[1,2,3,4,5],[1,2,3,4,5]]])
        for idx, data in enumerate(Data_loader):
            iter_start = time.time()
            #img=torch.zeros((32,3,300,300),dtype=torch.float)

            images = data[0].to(device)

            labels = [label.to(device) for label in data[1]]
            optimizer.zero_grad()#이전 값 들에 대한 가중치 기울기 초기화
            
            with torch.set_grad_enabled(True):
                cls, loc = model(images)
                tensor_d.to(device)
                loss_l, loss_c = loss_func((cls, loc), labels, tensor_d)
                
                loss = loss_l + loss_c
                loss.backward()
                optimizer.step() # 파라미터 갱신
            if(idx % 10 == 0):
                iter_end = time.time()
                print("Current Batch {0} / {1} | Cls Loss : {2:.3f}, Loc Loss : {3:.3f}, Total Loss : {4:.3f} | 10 iter time {5:.4f}: "
                      .format(idx, len(Data_loader), loss_l.item(), loss_c.item(), loss.item(), iter_end - iter_start))

                
                iter_start =time.time()
                
            epoch_train_loss+=loss.item()
                
        print("Epoch : {0} / {1} of Total Loss : Total Loss : {2:.3f}".format(epoch+1, epoch_num, epoch_train_loss))
        print("-----------------------------------------------")
        epoch_train_loss=0
        
        torch.save(model, 
                './objectdetection_model/ssd300_' + str(epoch+1) + '.pth')
        torch.save(model.state_dict(), 
                './objectdetection_model/ssd300_weight_' + str(epoch+1) + '.pth')

In [17]:
model = SSD(backbone, n_class=3)
model = model.to(device)

In [18]:
train_step(model, train_dataloader, epoch_num = 10)

Epoch : 1 / 10
Current Batch 0 / 33 | Cls Loss : nan, Loc Loss : 7.699, Total Loss : nan | 10 iter time 0.4890: 
Current Batch 3 / 33 | Cls Loss : nan, Loc Loss : nan, Total Loss : nan | 10 iter time 0.3060: 
Current Batch 6 / 33 | Cls Loss : nan, Loc Loss : nan, Total Loss : nan | 10 iter time 0.3059: 
Current Batch 9 / 33 | Cls Loss : nan, Loc Loss : nan, Total Loss : nan | 10 iter time 0.3068: 
Current Batch 12 / 33 | Cls Loss : nan, Loc Loss : nan, Total Loss : nan | 10 iter time 0.3063: 
Current Batch 15 / 33 | Cls Loss : nan, Loc Loss : nan, Total Loss : nan | 10 iter time 0.3062: 
Current Batch 18 / 33 | Cls Loss : nan, Loc Loss : nan, Total Loss : nan | 10 iter time 0.3070: 
Current Batch 21 / 33 | Cls Loss : nan, Loc Loss : nan, Total Loss : nan | 10 iter time 0.3175: 
Current Batch 24 / 33 | Cls Loss : nan, Loc Loss : nan, Total Loss : nan | 10 iter time 0.3053: 
Current Batch 27 / 33 | Cls Loss : nan, Loc Loss : nan, Total Loss : nan | 10 iter time 0.3086: 
Current Batch 30 

KeyboardInterrupt: 

In [None]:
for idx, data in enumerate(train_dataloader):
    print(idx)
    print(data[1][0][:,-1])
    break

In [None]:
len(train_dataloader)
print(os.getcwd())