In [1]:
%cd ..

/home/philipp/ai-audio-enhancer


In [2]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import torchaudio as T
import torchaudio.transforms as TT
import pandas as pd
import numpy as np
from matplotlib import pyplot as plt
import seaborn
from moviepy.editor import *
import wandb
import seaborn as sns
import librosa

  from .autonotebook import tqdm as notebook_tqdm


Convert mp4 to mp3

In [3]:
def convert(in_path, out_path):
    os.system(f'ffmpeg -i {in_path} -c copy {out_path}')

convert('tmp/test.mp4', 'tmp/test.aac')

ffmpeg version 4.3 Copyright (c) 2000-2020 the FFmpeg developers
  built with gcc 7.3.0 (crosstool-NG 1.23.0.449-a04d0)
  configuration: --prefix=/home/philipp/anaconda3/envs/audio --cc=/opt/conda/conda-bld/ffmpeg_1597178665428/_build_env/bin/x86_64-conda_cos6-linux-gnu-cc --disable-doc --disable-openssl --enable-avresample --enable-gnutls --enable-hardcoded-tables --enable-libfreetype --enable-libopenh264 --enable-pic --enable-pthreads --enable-shared --disable-static --enable-version3 --enable-zlib --enable-libmp3lame
  libavutil      56. 51.100 / 56. 51.100
  libavcodec     58. 91.100 / 58. 91.100
  libavformat    58. 45.100 / 58. 45.100
  libavdevice    58. 10.100 / 58. 10.100
  libavfilter     7. 85.100 /  7. 85.100
  libavresample   4.  0.  0 /  4.  0.  0
  libswscale      5.  7.100 /  5.  7.100
  libswresample   3.  7.100 /  3.  7.100
Input #0, mov,mp4,m4a,3gp,3g2,mj2, from 'tmp/test.mp4':
  Metadata:
    major_brand     : dash
    minor_version   : 0
    compatible_brands: iso6

In [12]:
audio, sr = librosa.load('tmp/test.aac', mono=True)
audio

array([0., 0., 0., ..., 0., 0., 0.], dtype=float32)

In [None]:
from torch_receptive_field import receptive_field

model = nn.Sequential(
    nn.Conv2d(1, 1, kernel_size=7, stride=2, padding=3),
    nn.Conv2d(1, 1, kernel_size=7, stride=1, padding=6, dilation=2),
    nn.Conv2d(1, 1, kernel_size=7, stride=2, padding=3),
    nn.Conv2d(1, 1, kernel_size=7, stride=1, padding=6, dilation=2),
    nn.Conv2d(1, 1, kernel_size=7, stride=2, padding=3),
    nn.Conv2d(1, 1, kernel_size=7, stride=1, padding=6, dilation=2),
    nn.Conv2d(1, 1, kernel_size=7, stride=2, padding=3),
    nn.Conv2d(1, 1, kernel_size=7, stride=1, padding=6, dilation=2),
    nn.Conv2d(1, 1, kernel_size=7, stride=2, padding=3),
    nn.Conv2d(1, 1, kernel_size=7, stride=1, padding=6, dilation=2),
    nn.Conv2d(1, 1, kernel_size=7, stride=2, padding=3),
    nn.Conv2d(1, 1, kernel_size=7, stride=1, padding=6, dilation=2),
    nn.Conv2d(1, 1, kernel_size=7, stride=2, padding=3),
    nn.Conv2d(1, 1, kernel_size=7, stride=1, padding=6, dilation=2),
    nn.Conv2d(1, 1, kernel_size=7, stride=2, padding=3),
    nn.Conv2d(1, 1, kernel_size=7, stride=1, padding=6, dilation=2),
    nn.Conv2d(1, 1, kernel_size=7, stride=2, padding=3),
    nn.Conv2d(1, 1, kernel_size=7, stride=1, padding=6, dilation=2),
    nn.Conv2d(1, 1, kernel_size=7, stride=2, padding=3),
    nn.Conv2d(1, 1, kernel_size=7, stride=1, padding=6, dilation=2),
    nn.Conv2d(1, 1, kernel_size=7, stride=2, padding=3),
    nn.Conv2d(1, 1, kernel_size=7, stride=1, padding=6, dilation=2),
    nn.Conv2d(1, 1, kernel_size=7, stride=2, padding=3),
    nn.Conv2d(1, 1, kernel_size=7, stride=1, padding=6, dilation=2),
)

size = 44100 * 5
receptive_field(model.cuda(), input_size=(1, size, 1))

In [None]:
def receptive_field(kernel_size, num_layers, dilation_cycle):
    return (kernel_size - 1) * sum(dilation_cycle[i % len(dilation_cycle)] for i in range(num_layers)) + 1

receptive_field(7, 30, [2**i for i in range(0, 9+1)])

Diffusion Models:
x -> x1, x2, xT ~ N(0, I)
Train to reverse noise: x(t) -> x(t-1)

Idea:
Given input clip x, instead of reversing the process directly, add noise to it and then try to reverse the noise.
Hypothesis: The output will not sound like the original but instead more like an improved version of the original but of course more different.
When we add a lot of noise, the output will sound completely different than the original. 
In order to keep the original information, use features from another encoder (for example trained on contrasting cover songs).

Diffusion Model TODOs:
- Dataset
- DataLoader
- Colate function
- Model
- Training loop
- Inference

Memory consumption:
- perform checkpointing

Prototype Requirements:
    - Denoise 5s segments
    - Sampling rate: 44100

Model architecture:
- DiffWave:
    - Each layer has the full output resolution
    - Uses exponential dilation factors to have a receptive field that spans the entire input
    - Problem: consumes a lot of memory
    - Possible solution: trade compute for memory by using checkpointing -> too slow
- U-Net WaveNet:
    - Downsamples the sequence to reduce memory footprint and increase performance
    - Problem: the ear is very sensitive to errors in the high frequencies which are troublesome during the upscaling operations
-> Final decision: 
    - Use U-Net because WaveNet is either way too memory demanding or way too slow when using checkpointing
    - Also, we can try to optimize the hell out of the U-Net architecture (skip connections, attention, etc.)

In [22]:
class DiffusionEmbedding(nn.Module):
    def __init__(self, num_diffusion_steps, num_channels):
        super().__init__()
        self.dim_encoding = int(4 * np.ceil(np.log(num_diffusion_steps)))
        self.dim_embedding = num_channels
        self.register_buffer('encoding', self._build_encoding(num_diffusion_steps), persistent=False)
        self.projection1 = nn.Linear(self.dim_encoding, self.dim_embedding)
        self.projection2 = nn.Linear(self.dim_embedding, self.dim_embedding)

    def forward(self, diffusion_step):
        if diffusion_step.dtype in [torch.int32, torch.int64]:
            x = self.encoding[diffusion_step]
        else:
            x = self._lerp_encoding(diffusion_step)
        x = F.leaky_relu(self.projection1(x)) + x
        x = F.leaky_relu(self.projection2(x)) + x
        return x

    def _lerp_encoding(self, t):
        low_idx = torch.floor(t).long()
        high_idx = torch.ceil(t).long()
        low = self.encoding[low_idx]
        high = self.encoding[high_idx]
        return low + (high - low) * (t - low_idx)

    def _build_encoding(self, num_diffusion_steps):
        steps = torch.arange(num_diffusion_steps).unsqueeze(1)
        dims = torch.arange(self.dim_encoding).unsqueeze(0)
        encoding = 2. * np.pi * steps * np.exp(-np.log(2) * dims)
        encoding = torch.cat([torch.sin(encoding), torch.cos(encoding)], dim=1)
        return encoding


class DownsamplingBlock(nn.Module):
    def __init__(self, in_channels, out_channels):
        super(DownsamplingBlock, self).__init__()
        self.conv = nn.Sequential(
            nn.Conv1d(in_channels, out_channels, kernel_size=7, stride=2, padding=3),
            nn.LeakyReLU(inplace=True),
            nn.Conv1d(out_channels, out_channels, kernel_size=7, stride=1, padding=6, dilation=2),
            nn.LeakyReLU(inplace=True))
    
    def forward(self, x):
        return self.conv(x)


class UpsamplingBlock(nn.Module):
    def __init__(self, in_channels, out_channels):
        super(UpsamplingBlock, self).__init__()
        self.conv = nn.Sequential(
            nn.Conv1d(in_channels, out_channels, kernel_size=7, stride=1, padding=3),
            nn.LeakyReLU(inplace=True),
            nn.Conv1d(out_channels, out_channels, kernel_size=7, stride=1, padding=6, dilation=2),
            nn.LeakyReLU(inplace=True))
    
    def forward(self, x, skip):
        x = F.upsample(x, size=skip.shape[2], mode='linear')
        x = torch.cat((x, skip), dim=1)
        x = self.conv(x)
        return x


class DiffusionModel(nn.Module):
    def __init__(self, num_diffusion_steps):
        super(DiffusionModel, self).__init__()
        self.diffusion_embedding = DiffusionEmbedding(
            num_diffusion_steps=num_diffusion_steps,
            num_channels=32)
        self.in_proj = nn.Conv1d(1, 64, 1)
        self.out_proj = nn.Conv1d(64, 1, 1)
        self.down = nn.ModuleList([
            DownsamplingBlock(64, 64),
            DownsamplingBlock(64, 64),
            DownsamplingBlock(64, 64),
            DownsamplingBlock(64, 64),
            DownsamplingBlock(64, 128),
            DownsamplingBlock(128, 128),
            DownsamplingBlock(128, 128),
            DownsamplingBlock(128, 128),
            DownsamplingBlock(128, 256),
            DownsamplingBlock(128, 256),
            DownsamplingBlock(256, 256),
            DownsamplingBlock(256, 256),
        ])
        self.up = nn.ModuleList([
            UpsamplingBlock(256, 256),
            UpsamplingBlock(256, 256),
            UpsamplingBlock(256, 256),
            UpsamplingBlock(256, 128),
            UpsamplingBlock(128, 128),
            UpsamplingBlock(128, 128),
            UpsamplingBlock(128, 128),
            UpsamplingBlock(128, 64),
            UpsamplingBlock(64, 64),
            UpsamplingBlock(64, 64),
            UpsamplingBlock(64, 64),
            UpsamplingBlock(64, 64),
        ])
        self.init_weights()
    
    def init_weights(self):
        for m in self.modules():
            if isinstance(m, nn.Conv2d):
                nn.init.kaiming_uniform_(m.weight)
            elif isinstance(m, nn.Linear):
                nn.init.xavier_uniform_(m.weight)
            elif isinstance(m, nn.BatchNorm1d):
                nn.init.constant_(m.weight.data, 1)
                nn.init.constant_(m.bias.data, 0)
    
    def forward(self, audio, diffusion_step):
        embedding = self.diffusion_embedding(diffusion_step)
        x = audio.unsqueeze(1) # channel dimension
        x = self.in_proj(x)

        skip_connections = []
        for layer in self.down:
            embedding_padded = torch.zeros(x.shape[1], device=x.device)
            embedding_padded[:len(embedding)] = embedding
            embedding_padded = embedding_padded.view(1, -1, 1)
            x = x + embedding_padded
            x = layer(x)
            skip_connections.append(x)

        for i, layer in enumerate(self.up):
            skip = skip_connections[len(skip_connections) - i - 1]
            x = layer(x, skip)

        x = self.out_proj(x)
        return x


In [23]:
model = DiffusionModel(num_diffusion_steps=10).cuda()
sum([param.nelement() for param in model.parameters()])

AttributeError: 'DiffusionModel' object has no attribute 'weight'