In [1]:
%load_ext autoreload
%autoreload 2

In [1]:
from fastai2.vision.all import *
#import librosa.core
#import librosa.display
import torchaudio
import torchaudio.transforms

In [2]:
# Fetch the macaque recordings archive with fastai's `untar_data`; `path` is the
# extracted dataset folder (see the output below).
path = untar_data('https://storage.googleapis.com/ml-animal-sounds-datasets/macaques_24414Hz.zip')
path

Path('/Users/florian/.fastai/data/macaques_24414Hz')

In [3]:
class TensorAudio(TensorBase):
    "Waveform tensor that carries its sample rate (`sr`) and a `mode` tag."

    @classmethod
    def create(cls, o, norm=True):
        "Load `o` with `torchaudio.load` and wrap the samples in a `TensorAudio`."
        waveform, sample_rate = torchaudio.load(o, normalization=norm)
        audio = cls(waveform)
        audio.sr, audio.mode = sample_rate, 'raw'
        return audio

    # Disabled waveform preview, kept for reference. It needs librosa, whose
    # imports are commented out at the top of the notebook:
    #
    # def show(self, ctx=None):
    #     if self.mode == 'raw':
    #         print(self.shape)
    #         librosa.display.waveplot(np.asarray(self.squeeze()), sr=self.sr)

class AudioFixLength(Transform):
    "Truncate or zero-pad a `TensorAudio` to exactly `length` seconds."

    def __init__(self, length=0.0):
        # `length` is in seconds; values <= 0 turn the transform into a no-op.
        self.length = length

    def encodes(self, o: TensorAudio):
        """Return `o` with `int(o.sr * length)` samples along its last axis.

        Longer clips keep their first samples. Shorter clips are padded with
        zeros on both sides; how the padding is divided between the start and
        the end is drawn at random on every call.
        """
        if self.length <= 0.0:
            return o
        n_samples = int(o.sr * self.length)
        # Measure the time axis directly: `len(o.squeeze())` only equals the
        # sample count for single-channel audio.
        n_current = o.shape[-1]
        if n_samples < n_current:
            return torch.split(o, n_samples, dim=-1)[0]
        n_pad = n_samples - n_current
        # F.pad takes plain Python ints, so convert the random draw explicitly.
        n_pre = int((torch.rand(1) * n_pad).item())
        n_post = n_pad - n_pre
        return F.pad(input=o, pad=(n_pre, n_post), mode='constant', value=0)
    
class AudioResample(Transform):
    "Resample a `TensorAudio` to `target_sr` Hz when its sample rate differs."

    def __init__(self, target_sr=0, device='cpu'):
        # target_sr <= 0 (the default) disables resampling, in the same way
        # that `length=0.0` disables the length-based audio transforms.
        self.target_sr = target_sr
        self.device = device

    def encodes(self, o: TensorAudio):
        """Return `o` resampled to `target_sr`, or `o` itself when nothing needs doing.

        The result is a new `TensorAudio` whose `sr` is `target_sr`.
        """
        # Nothing to do when disabled or already at the requested rate.
        if self.target_sr <= 0 or self.target_sr == o.sr:
            return o
        resample = torchaudio.transforms.Resample(orig_freq=o.sr, new_freq=self.target_sr).to(self.device)
        resampled = TensorAudio(resample(o))
        resampled.sr = self.target_sr
        # Carry the mode tag over so the new tensor has the same attributes
        # as one built by `TensorAudio.create`.
        resampled.mode = getattr(o, 'mode', 'raw')
        return resampled
    
'''    
class AudioNormalize(Transform):
    def __init__(self, mean, std, device='cpu'):
        self.mean = mean
        self.std = std
        self.device = device
        
    def encodes(self, o: TensorAudio):
        return ((o - self.mean) / self.std).to(self.device)
'''    
    
class AudioRandomCrop(RandTransform):
    "Randomly crop a `TensorAudio` to `length` seconds with probability `p`."

    def __init__(self, p=1.0, length=0.0):
        super().__init__(p=p)
        # Crop length in seconds; values <= 0 disable cropping.
        self.length = length

    def encodes(self, o: TensorAudio):
        """Return a randomly positioned window of `int(o.sr * length)` samples.

        Clips that are not longer than the requested window are returned
        unchanged.
        """
        # Read the instance setting; a bare `length` would resolve to whatever
        # notebook-level variable of that name happens to exist.
        if self.length <= 0.0:
            return o
        n_samples = int(o.sr * self.length)
        n_total = o.shape[-1]
        if n_samples >= n_total:
            return o
        n_cut = n_total - n_samples
        # Plain int offset so slicing does not rely on tensor-as-index support.
        n_pre = int((n_cut * torch.rand(1)).item())
        return o[..., n_pre:n_pre + n_samples]
    

class AudioAddNoise(RandTransform):
    "Randomly add noise with probability `p`"

    def __init__(self, p=0.5, device='cpu'):
        super().__init__(p=p)
        self.device = device

    def encodes(self, o: TensorAudio):
        "Add normally distributed noise scaled by a random fraction of `torch.max(o)`."
        # Amplitude: a random factor below 0.001 times the largest sample value.
        scale = 0.001 * torch.rand(1) * torch.max(o)
        scale = scale.to(self.device)
        # Standard-normal noise with the shape of `o`, moved to `self.device`.
        noise = torch.empty(o.shape).normal_()
        noise = noise.to(self.device)
        return o + scale * noise
    

class AudioToTensor(Transform):
    "Pass a `TensorAudio` through `tensor(...)` and cast it to float."

    def encodes(self, o: TensorAudio):
        "Return `tensor(o)` as a float tensor."
        return tensor(o).float()

def AudioBlock(length=0.0):
    """Build the `TransformBlock` used for audio inputs.

    Items are created with `TensorAudio.create` and batches go through
    `AudioToTensor`. `length` is accepted but not used by this function.
    """
    block_kwargs = dict(type_tfms=TensorAudio.create, batch_tfms=AudioToTensor)
    return TransformBlock(**block_kwargs)

In [4]:
# Clip length in seconds; the audio transforms multiply it by the sample rate.
length = 0.5
# Use the first CUDA device when one is available, otherwise the CPU.
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
# Data pipeline: audio inputs with categorical targets taken from `parent_label`,
# split by `RandomSplitter` with a fixed seed.
dblocks = DataBlock(blocks = (AudioBlock,CategoryBlock),
                 get_items=get_files, 
                 splitter=RandomSplitter(seed=42),
                 get_y=parent_label,
                 #item_tfms=[AudioFixLength(length=length)],
                 # Per item: random crop first, then AudioFixLength with the
                 # same `length`.
                 item_tfms=[AudioRandomCrop(length=length),
                            AudioFixLength(length=length),
                            #AudioResample(target_sr=(24414//2))
                           ],
                 # Per batch: noise augmentation, told which device to use.
                 batch_tfms=[AudioAddNoise(device=device)]
                 )

# Build the DataLoaders from the dataset folder with a batch size of 128.
dls=dblocks.dataloaders(path, bs=128)

In [5]:
# Sanity check: pull one batch; the output below shows an (audio, labels) pair.
dls.one_batch()

(TensorAudio([[[ 0.0000,  0.0000,  0.0000,  ...,  0.0000,  0.0000,  0.0000]],
 
         [[ 0.0000,  0.0000,  0.0000,  ...,  0.0000,  0.0000,  0.0000]],
 
         [[ 0.0000,  0.0000,  0.0000,  ...,  0.0000,  0.0000,  0.0000]],
 
         ...,
 
         [[ 0.0000,  0.0000,  0.0000,  ...,  0.0000,  0.0000,  0.0000]],
 
         [[ 0.0000,  0.0000,  0.0000,  ...,  0.0000,  0.0000,  0.0000]],
 
         [[ 0.0363,  0.0484,  0.0604,  ..., -0.0080,  0.0003,  0.0103]]]),
 TensorCategory([1, 0, 5, 5, 2, 2, 6, 0, 5, 6, 5, 4, 7, 3, 0, 3, 2, 0, 0, 7, 6, 5, 6, 2,
         6, 6, 4, 2, 2, 3, 7, 1, 2, 5, 0, 0, 2, 2, 1, 0, 5, 4, 3, 0, 2, 4, 6, 3,
         6, 0, 4, 2, 6, 3, 6, 6, 2, 6, 3, 5, 6, 6, 2, 6, 4, 2, 0, 3, 0, 5, 7, 7,
         6, 5, 0, 6, 4, 6, 6, 5, 6, 3, 1, 3, 3, 6, 3, 7, 7, 1, 2, 4, 7, 7, 3, 0,
         1, 0, 4, 0, 2, 2, 4, 0, 3, 6, 3, 3, 6, 6, 4, 5, 5, 6, 0, 5, 2, 5, 4, 4,
         5, 2, 0, 6, 6, 0, 4, 4]))

In [6]:
# https://www.kaggle.com/readilen/resnet-for-mnist-with-pytorch

def conv1xk(in_channels, out_channels, kernel_size=3, stride=1):
    """Create a bias-free `nn.Conv1d` padded by `kernel_size // 2` on each side."""
    conv_kwargs = dict(kernel_size=kernel_size, stride=stride,
                       padding=kernel_size // 2, bias=False)
    return nn.Conv1d(in_channels, out_channels, **conv_kwargs)

# Residual block
class ResidualBlock(nn.Module):
    """1-D residual block: two conv/batch-norm stages plus a skip connection.

    Args:
        in_channels: channels of the incoming feature map.
        out_channels: channels produced by both convolutions.
        kernel_size: kernel width passed to `conv1xk` for both convolutions.
        stride: stride of the first convolution (the second always uses 1).
        downsample: optional module applied to the block input before it is
            added to the main path; its output is summed with the main path,
            so the two shapes have to match.
    """

    def __init__(self, in_channels, out_channels, kernel_size=3, stride=1, downsample=None):
        super().__init__()
        self.conv1 = conv1xk(in_channels, out_channels, kernel_size, stride)
        self.bn1 = nn.BatchNorm1d(out_channels)
        self.relu = nn.ReLU(inplace=True)
        self.conv2 = conv1xk(out_channels, out_channels, kernel_size)
        self.bn2 = nn.BatchNorm1d(out_channels)
        self.downsample = downsample

    def forward(self, x):
        "Return `relu(bn2(conv2(relu(bn1(conv1(x))))) + shortcut(x))`."
        out = self.relu(self.bn1(self.conv1(x)))
        out = self.bn2(self.conv2(out))
        # Compare against None explicitly: truthiness of a container module
        # such as nn.Sequential is decided by its length, not its presence.
        residual = x if self.downsample is None else self.downsample(x)
        out += residual
        return self.relu(out)
    
# ResNet
class ResNet(nn.Module):
    """1-D ResNet classifier for single-channel input.

    A 3-wide stem convolution (1 -> 16 channels) is followed by six residual
    stages (16, 32, 64, 128, 256 and 512 channels), adaptive average pooling
    to length 1, and two linear layers (512 -> 48 -> `num_classes`).

    Args:
        block: residual block class, called as
            `block(in_channels, out_channels, kernel_size, stride, downsample)`.
        layers: number of blocks per stage. `layers[0]` sizes both of the
            first two stages and `layers[1]`..`layers[4]` size the remaining
            four, so five entries are read.
        num_classes: size of the output layer.
        kernel_size: kernel width for the first block of each stage and for
            its shortcut convolution.
        stride: stride of stages 2-6 (stage 1 keeps stride 1).
    """

    def __init__(self, block, layers, num_classes=10, kernel_size=3, stride=2):
        super().__init__()
        # Running channel count, updated by make_layer as stages are added.
        self.in_channels = 16
        self.conv = conv1xk(1, 16)
        self.bn = nn.BatchNorm1d(16)
        self.relu = nn.ReLU(inplace=True)
        self.layer1 = self.make_layer(block, 16, layers[0], kernel_size)
        self.layer2 = self.make_layer(block, 32, layers[0], kernel_size, stride)
        self.layer3 = self.make_layer(block, 64, layers[1], kernel_size, stride)
        self.layer4 = self.make_layer(block, 128, layers[2], kernel_size, stride)
        self.layer5 = self.make_layer(block, 256, layers[3], kernel_size, stride)
        self.layer6 = self.make_layer(block, 512, layers[4], kernel_size, stride)
        # Adaptive pooling (instead of a fixed AvgPool1d(8)) handles different
        # input lengths.
        self.avg_pool = nn.AdaptiveAvgPool1d(1)
        self.fc1 = nn.Linear(512, 48)
        self.fc2 = nn.Linear(48, num_classes)
        self.flatten = nn.Flatten()

    def make_layer(self, block, out_channels, blocks, kernel_size=3, stride=1):
        """Build one stage of `blocks` residual blocks ending at `out_channels`.

        Only the first block receives `kernel_size`, `stride` and, when the
        stride or channel count changes, a conv + batch-norm shortcut. The
        remaining blocks are created as `block(out_channels, out_channels)`
        and therefore use the block's own defaults.
        """
        downsample = None
        if (stride != 1) or (self.in_channels != out_channels):
            downsample = nn.Sequential(
                conv1xk(self.in_channels, out_channels, kernel_size=kernel_size, stride=stride),
                nn.BatchNorm1d(out_channels))
        stage = [block(self.in_channels, out_channels, kernel_size, stride, downsample)]
        self.in_channels = out_channels
        stage.extend(block(out_channels, out_channels) for _ in range(1, blocks))
        return nn.Sequential(*stage)

    def forward(self, x):
        "Run stem, residual stages, pooling and the two linear layers on `x`."
        # Stem: conv -> batch norm -> ReLU.
        x = self.relu(self.bn(self.conv(x)))
        for stage in (self.layer1, self.layer2, self.layer3,
                      self.layer4, self.layer5, self.layer6):
            x = stage(x)
        x = self.avg_pool(x)
        # Drop the length-1 axis left by the pooling before the linear layers.
        x = self.flatten(x)
        x = self.fc1(x)
        return self.fc2(x)
    
def init_cnn_1d(m):
    """Recursively zero biases and Kaiming-initialise Conv1d/Linear weights in `m`."""
    bias = getattr(m, 'bias', None)
    if bias is not None:
        nn.init.constant_(bias, 0)
    if isinstance(m, (nn.Conv1d, nn.Linear)):
        nn.init.kaiming_normal_(m.weight)
    # Descend into sub-modules in registration order.
    for child in m.children():
        init_cnn_1d(child)
    
class ResNetSeq(nn.Sequential):
    """1-D ResNet built as `nn.Sequential(body, head)`.

    `self[0]` (body) holds the stem (conv, batch norm, ReLU) followed by six
    residual stages with 16, 32, 64, 128, 256 and 512 channels. `self[1]`
    (head) holds adaptive average pooling, a flatten, and two linear layers
    (512 -> 48 -> `num_classes`). All weights are re-initialised with
    `init_cnn_1d` at the end of construction.

    Args:
        block: residual block class, called as
            `block(in_channels, out_channels, kernel_size, stride, downsample)`.
        layers: number of blocks per stage. `layers[0]` sizes both of the
            first two stages and `layers[1]`..`layers[4]` size the remaining
            four, so five entries are read.
        num_classes: size of the output layer.
        kernel_size: kernel width for the first block of each stage and for
            its shortcut convolution.
        stride: stride of stages 2-6 (stage 1 keeps stride 1).
    """

    def __init__(self, block, layers, num_classes=10, kernel_size=3, stride=2):
        # Running channel count read and updated by make_layer. It is set
        # before super().__init__() because the sub-modules have to exist
        # before they can be handed to the nn.Sequential constructor.
        self.in_channels = 16
        stem = [conv1xk(1, 16), nn.BatchNorm1d(16), nn.ReLU(inplace=True)]
        stages = [
            self.make_layer(block, 16, layers[0], kernel_size),
            self.make_layer(block, 32, layers[0], kernel_size, stride),
            self.make_layer(block, 64, layers[1], kernel_size, stride),
            self.make_layer(block, 128, layers[2], kernel_size, stride),
            self.make_layer(block, 256, layers[3], kernel_size, stride),
            self.make_layer(block, 512, layers[4], kernel_size, stride),
        ]
        head = [
            # AdaptiveAvgPool1d(1) replaces nn.AvgPool1d(8) to handle
            # different input sizes.
            nn.AdaptiveAvgPool1d(1),
            # The pooling leaves a trailing length-1 axis, (batch, 512, 1);
            # flatten it so the linear layer receives 512 features.
            nn.Flatten(),
            nn.Linear(512, 48),
            nn.Linear(48, num_classes),
        ]

        super().__init__(nn.Sequential(*stem, *stages), nn.Sequential(*head))
        init_cnn_1d(self)

    def make_layer(self, block, out_channels, blocks, kernel_size=3, stride=1):
        """Build one stage of `blocks` residual blocks ending at `out_channels`.

        Only the first block receives `kernel_size`, `stride` and, when the
        stride or channel count changes, a conv + batch-norm shortcut. The
        remaining blocks are created as `block(out_channels, out_channels)`
        and therefore use the block's own defaults.
        """
        downsample = None
        if (stride != 1) or (self.in_channels != out_channels):
            downsample = nn.Sequential(
                conv1xk(self.in_channels, out_channels, kernel_size=kernel_size, stride=stride),
                nn.BatchNorm1d(out_channels))
        stage = [block(self.in_channels, out_channels, kernel_size, stride, downsample)]
        self.in_channels = out_channels
        stage.extend(block(out_channels, out_channels) for _ in range(1, blocks))
        return nn.Sequential(*stage)
        

In [7]:
# Convolution settings handed to ResNetSeq, which forwards them to its
# make_layer calls.
kernel_size = 15
stride = 4

# Constructor arguments for ResNetSeq; "layers" gives the number of residual
# blocks per stage.
net_args = {
    "block": ResidualBlock,
    "layers": [2, 2, 2, 2, 2, 2],
    "kernel_size": kernel_size,
    "stride": stride,
    "num_classes": 8
}
resseq = ResNetSeq(**net_args)

def splitter(m):
    "Split `m` into three parameter groups: `m[0][:6]`, `m[0][6:]` and `m[1]`."
    body, head = m[0], m[1]
    groups = L(body[:6], body[6:], head)
    return groups.map(params)

In [11]:
# Inspect the last module of the last sub-sequence (the output layer shown below).
resseq[-1][-1]

Linear(in_features=48, out_features=8, bias=True)

In [30]:
# Learner over `dls` and `resseq` with cross-entropy loss and an accuracy metric;
# `splitter` supplies the parameter groups and ShowGraphCallback is attached.
learn = Learner(dls, resseq, loss_func=F.cross_entropy, metrics=accuracy, splitter=splitter, cbs=ShowGraphCallback())

In [18]:
# Run the learning-rate finder (the saved output below ends in a KeyboardInterrupt).
learn.lr_find()

KeyboardInterrupt: 

In [None]:
# Unfreeze the learner, then call fit_one_cycle with 5 epochs and a rate of 3e-4.
learn.unfreeze()
learn.fit_one_cycle(5, 3e-4)