### WaveNet

We will implement [WaveNet](https://arxiv.org/pdf/1609.03499.pdf), the generative model for raw audio introduced by DeepMind in 2016, before the transformer era.

WaveNet is an autoregressive generative neural network: it models the conditional distribution of each sample given all the samples that came before it in the sequence.

## Core Implementation details:
- Dilated Causal Convolutions
- Quantized softmax
- Gated activation units
- Residual and Skip connections

## Objective:
- Implement core components using torch
- Train and Evaluate
- Build such that logic can be extended to text generation.
- Bonus: Make some music!!

The joint probability of a waveform $x = \{x_1, \dots, x_T\}$ is given by $$p(x) = \prod_{t=1}^{T} p(x_t \mid x_1, \dots, x_{t-1})$$

Thus each sample $x_t$ is conditioned on the samples at all previous timesteps.
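To make the factorization concrete, here is a minimal sketch in plain Python. The function `conditional_prob` is a hypothetical toy model over binary samples (it is not WaveNet); the point is only that the joint log-probability is the sum of the per-step conditional log-probabilities, and that the resulting joint distribution sums to 1.

```python
import itertools
import math

def conditional_prob(x_t, history):
    """Hypothetical toy model: P(x_t = 1 | history) grows with the number of 1s seen so far."""
    p_one = (1 + sum(history)) / (2 + len(history))
    return p_one if x_t == 1 else 1.0 - p_one

def joint_log_prob(sequence):
    """log p(x) = sum over t of log p(x_t | x_1, ..., x_{t-1})."""
    return sum(
        math.log(conditional_prob(x_t, sequence[:t]))
        for t, x_t in enumerate(sequence)
    )

# Sanity check: the joint probabilities of all binary sequences of length 4 add up to 1.
total = sum(
    math.exp(joint_log_prob(seq))
    for seq in itertools.product([0, 1], repeat=4)
)
print(total)
```

Working in log space is the usual choice here, because a product over thousands of audio samples would underflow quickly.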

### Important Notes from paper that will help me write this up

- There are no pooling layers in the network, and the output of the model has the same time dimensionality as the input.
- The model outputs a categorical distribution over the next value $x_t$ with a softmax layer, and it is optimized to maximize the log-likelihood of the data w.r.t. the parameters.
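As a small illustration of the second note, the sketch below computes the negative log-likelihood of a categorical (softmax) output in plain Python. The helper names and the toy logits are assumptions made for this example; the paper uses 256 classes, while 4 are used here to keep the numbers readable.

```python
import math

def log_softmax(logits):
    """Numerically stable log-softmax over a list of scores."""
    m = max(logits)
    log_z = m + math.log(sum(math.exp(v - m) for v in logits))
    return [v - log_z for v in logits]

def negative_log_likelihood(logits_per_step, targets):
    """Average of -log p(x_t) over time: one logit vector and one target class per step."""
    total = 0.0
    for logits, target in zip(logits_per_step, targets):
        total -= log_softmax(logits)[target]
    return total / len(targets)

# Toy example: 3 time steps, 4 classes per step.
logits_per_step = [
    [2.0, 0.0, 0.0, 0.0],
    [0.0, 0.0, 0.0, 0.0],
    [0.0, 1.0, 3.0, 0.0],
]
targets = [0, 1, 2]
loss = negative_log_likelihood(logits_per_step, targets)
print(loss)
```

Minimizing this quantity is the same as maximizing the log-likelihood. Note that there is one prediction per time step, which matches the first note: the output keeps the time dimension of the input.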



In [2]:
!ls

sample_data


In [3]:
### Acquire training data
# !wget -O file1.zip "https://mirg.city.ac.uk/datasets/magnatagatune/mp3.zip.001"
# !wget -O file2.zip "https://mirg.city.ac.uk/datasets/magnatagatune/mp3.zip.002"
# !wget -O file3.zip "https://mirg.city.ac.uk/datasets/magnatagatune/mp3.zip.003"
# !wget "https://mirg.city.ac.uk/datasets/magnatagatune/annotations_final.csv"

# !mkdir audio
# !cat file1.zip file2.zip file3.zip > audio/mp3_all.zip
# !unzip audio/mp3_all.zip
# !mv "annotations_final.csv" "audio/annotations_final.csv"

In [4]:
import os
import pandas as pd
pd.set_option('display.max_rows',200)

In [5]:
df = pd.read_csv('audio/annotations_final.csv',delimiter='\t')

FileNotFoundError: ignored

In [None]:
df['folder'] = df['mp3_path'].apply(lambda x: x.split('/')[0])

In [None]:
#@title music tags

mus_tags =  ['no voice',
 'singer',
 'duet',
 'plucking',
 'hard rock',
 'world',
 'bongos',
 'harpsichord',
 'female singing',
 'clasical',
 'sitar',
 'chorus',
 'female opera',
 'male vocal',
 'vocals',
 'clarinet',
 'heavy',
 'silence',
 'beats',
 'men',
 'woodwind',
 'funky',
 'no strings',
 'chimes',
 'foreign',
 'no piano',
 'horns',
 'classical',
 'female',
 'no voices',
 'soft rock',
 'eerie',
 'spacey',
 'jazz',
 'guitar',
 'quiet',
 'no beat',
 'banjo',
 'electric',
 'solo',
 'violins',
 'folk',
 'female voice',
 'wind',
 'happy',
 'ambient',
 'new age',
 'synth',
 'funk',
 'no singing',
 'middle eastern',
 'trumpet',
 'percussion',
 'drum',
 'airy',
 'voice',
 'repetitive',
 'birds',
 'space',
 'strings',
 'bass',
 'harpsicord',
 'medieval',
 'male voice',
 'girl',
 'keyboard',
 'acoustic',
 'loud',
 'classic',
 'string',
 'drums',
 'electronic',
 'not classical',
 'chanting',
 'no violin',
 'not rock',
 'no guitar',
 'organ',
 'no vocal',
 'talking',
 'choral',
 'weird',
 'opera',
 'soprano',
 'fast',
 'acoustic guitar',
 'electric guitar',
 'male singer',
 'man singing',
 'classical guitar',
 'country',
 'violin',
 'electro',
 'reggae',
 'tribal',
 'dark',
 'male opera',
 'no vocals',
 'irish',
 'electronica',
 'horn',
 'operatic',
 'arabic',
 'lol',
 'low',
 'instrumental',
 'trance',
 'chant',
 'strange',
 'drone',
 'synthesizer',
 'heavy metal',
 'modern',
 'disco',
 'bells',
 'man',
 'deep',
 'fast beat',
 'industrial',
 'hard',
 'harp',
 'no flute',
 'jungle',
 'pop',
 'lute',
 'female vocal',
 'oboe',
 'mellow',
 'orchestral',
 'viola',
 'light',
 'echo',
 'piano',
 'celtic',
 'male vocals',
 'orchestra',
 'eastern',
 'old',
 'flutes',
 'punk',
 'spanish',
 'sad',
 'sax',
 'slow',
 'male',
 'blues',
 'vocal',
 'indian',
 'no singer',
 'scary',
 'india',
 'woman',
 'woman singing',
 'rock',
 'dance',
 'piano solo',
 'guitars',
 'no drums',
 'jazzy',
 'singing',
 'cello',
 'calm',
 'female vocals',
 'voices',
 'different',
 'techno',
 'clapping',
 'house',
 'monks',
 'flute',
 'not opera',
 'not english',
 'oriental',
 'beat',
 'upbeat',
 'soft',
 'noise',
 'choir',
 'female singer',
 'rap',
 'metal',
 'hip hop',
 'quick',
 'water',
 'baroque',
 'women',
 'fiddle',
 'english']

In [None]:
folders = os.listdir('audio')
folders = [x for x in folders if len(x)==1]

In [None]:
import matplotlib.pyplot as plt
# Create a 4x4 grid of bar charts, one per audio folder
fig, axs = plt.subplots(4, 4, figsize=(15, 15))

# Flatten the axis array for easy iteration
axs = axs.flatten()

# Plot the 10 most frequent tags in each folder
for i, folder_name in enumerate(folders):
    ax = axs[i]  # Select the current axis
    data = df[df['folder']==folder_name][mus_tags].sum(axis=0).sort_values(ascending=False)[:10]
    ax.bar(data.index,data.values)  # One bar per tag
    ax.set_title(f"Folder {folder_name}")  # Set title for each subplot
    ax.set_xlabel('Tag')  # Set x-axis label
    ax.set_ylabel('Number of clips')  # Set y-axis label
    ax.tick_params(axis='x', rotation=45)  # Rotate tag names so they stay readable

# Adjust layout and show the plot
plt.tight_layout()
plt.show()

In [None]:
mus_tags

In [None]:
train_tags = ['ambient', 'classical', 'dance', 'electric' ,'electro', 'electronic', 'house','guitar','industrial','piano','jazz', 'jungle', 'metal', 'modern', 'new age', 'techno', 'trance','drums']
df['selected_tag'] = df[mus_tags].apply(lambda row: row[row == 1].index.tolist(), axis=1)
filtered_df = df[df['selected_tag'].apply(lambda x: any(tag in train_tags for tag in x))]

In [None]:
filtered_df.shape[0]

Now that we have picked the data to train and evaluate the model on, we need to decide what it means to evaluate a generative model. We will split the data so that the input matrix X stores fixed-length sequences and Y stores the sample that follows each sequence.

Consider our problem, where we try to predict $x_t$ from $x_1,...,x_{t-1}$.
We have to partition each sound clip into N-millisecond segments. For example, $x_1$ would run from 00:00 to 00:0N.
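The X/Y split described above can be sketched with plain list slicing. The function name `make_training_pairs` and the parameter `context_length` are hypothetical, and the integers stand in for audio samples; this is only one possible way to lay out the data.

```python
def make_training_pairs(samples, context_length):
    """Return (X, Y): each row of X holds `context_length` past samples, Y the sample that follows."""
    X, Y = [], []
    for t in range(context_length, len(samples)):
        X.append(samples[t - context_length:t])  # x_{t-context_length}, ..., x_{t-1}
        Y.append(samples[t])                     # the next sample, x_t
    return X, Y

samples = [10, 11, 12, 13, 14, 15]
X, Y = make_training_pairs(samples, context_length=3)
print(X)  # [[10, 11, 12], [11, 12, 13], [12, 13, 14]]
print(Y)  # [13, 14, 15]
```

Because the network output has the same time dimension as its input, an alternative is to feed a whole clip and use the same clip shifted by one step as the target, which avoids materializing overlapping windows.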

In the paper, the receptive field for TTS was 240 ms. For generation, the authors mention that the receptive field varies; we will experiment with this and see what works.

I'm assuming the causal convolutions for sound waves are 1D, since the order of the samples is inherently preserved.

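Using 465984 // 18 = 25888 samples per second (to be checked against the `sample_rate` returned by `torchaudio.load`):

1 sec = 1000 ms = 25888 samples  
500 ms = 12944 samples  
250 ms = 6472 samples  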
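The window sizes depend entirely on the sample rate, so it can help to compute them from it rather than hard-code them. A minimal sketch follows; the helper `ms_to_samples` is hypothetical, and the 16 kHz rate is an assumption chosen for illustration, not a value taken from this notebook.

```python
def ms_to_samples(duration_ms, sample_rate):
    """Number of samples covering `duration_ms` milliseconds at `sample_rate` Hz."""
    return int(round(duration_ms * sample_rate / 1000))

# Assumed value for illustration; in practice use the sample_rate returned by torchaudio.load.
sample_rate = 16_000

for duration_ms in (1000, 500, 250):
    print(duration_ms, "ms ->", ms_to_samples(duration_ms, sample_rate), "samples")
```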


In [None]:
import torchaudio

In [None]:
file_path = '/content/audio/6/curandero-curandero-05-corriendo_juntos-0-29.mp3'

In [None]:
waveform, sample_rate = torchaudio.load(file_path)

In [None]:
waveform

In [None]:
waveform.shape

In [None]:
465984//18

In [None]:
import torch
import torch.nn as nn

In [None]:
t_waveform = torch.randn((1,15))

In [None]:

t_waveform

In [None]:
x = nn.Conv1d(1,1,5,dilation=2)(t_waveform)
# x = nn.Conv1d(1,1,6472,dilation=2)(x)
# x = nn.Conv1d(1,1,6472,dilation=4)(x)
# x = nn.Conv1d(1,1,6472,dilation=8)(x)

In [None]:
x

In [None]:
waveform.shape

In [None]:
465984-459513

In [6]:
import torch
import torch.nn as nn

# Example input data
batch_size = 1
channels = 3
input_length = 10

# Define a dilated convolutional layer
dilated_conv = nn.Conv1d(in_channels=channels, out_channels=channels, kernel_size=3, dilation=3)

# Create a random input tensor
input_data = torch.randn(batch_size, channels, input_length)

# Apply dilated convolution to the input
output = dilated_conv(input_data)

print("Input shape:", input_data.shape)
print("Output shape:", output.shape)


Input shape: torch.Size([1, 3, 10])
Output shape: torch.Size([1, 3, 4])
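The shrinking output length above follows directly from the output-length formula given in the `torch.nn.Conv1d` documentation. A small sketch of it in plain Python (the function name `conv1d_output_length` is ours, for illustration):

```python
def conv1d_output_length(input_length, kernel_size, dilation=1, padding=0, stride=1):
    """Output length of a 1-D convolution, per the formula in the torch.nn.Conv1d docs."""
    effective_kernel = dilation * (kernel_size - 1) + 1  # span covered by the dilated filter
    return (input_length + 2 * padding - effective_kernel) // stride + 1

print(conv1d_output_length(10, kernel_size=3, dilation=3))  # 4, matching the output shape above
print(conv1d_output_length(15, kernel_size=5, dilation=2))  # 7
```

Without padding, every layer removes `dilation * (kernel_size - 1)` time steps, which is why a causal convolution needs that many zeros of padding to keep the output as long as the input.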


In [None]:
input_data

In [None]:
output

## Blocks to build
- Causal Conv Block
- Dilated Conv Block
- ResidualDilated Conv Block
- TBC

### Causal Conv Block

- Consider an input sample of length N. Construct the input matrix $V \in \mathbb{R}^{N\times2}$, where the first column holds all elements of the input signal and the second column holds the same elements shifted by 1 (the dilation).

- Construct the weight matrix $W$. Since we are building single-layer blocks, the weight matrix has dimension $N \times 2$. Weights can be sampled from $\mathcal{N}(0,1)$.

- Consider a stride of 1. After each stride, update the mask, apply it, and take the element-wise product of the input window and the weight window.

- Padding: prepend $K-1$ zeros for a filter of length $K$, so that the first output depends only on the first sample.

In [None]:
# Question 1: How much padding?
# Consider filter length K and stride 1.
# Before the element at position 1 there must be K-1 zeros.
# When the convolution starts, the only non-zero element sits at the end of the filter, and that is the first float of the original tensor. So the convolution starts from there.

# Question 2: Does padding affect anything about the matrix construction here?
# Not really

In [11]:
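K = 100       # filter length
N = 1024      # number of samples in the signal
stride = 1
signal = torch.randn((1,N))
# Left-pad with K-1 zeros so the first window ends on the first real sample
signal = torch.cat((torch.zeros((1,K-1)),signal),dim=1)
signal = signal.view(-1).unsqueeze(1)   # column vector of shape (N+K-1, 1)
# Second column: the same signal shifted by one step, with a zero filling the last row
signal_dilation_1 = torch.cat((signal[1:],torch.zeros(1,1)))
V = torch.cat((signal,signal_dilation_1),dim=1)   # shape (N+K-1, 2)
# One weight row per padded position; a true convolution would share a single K x 2 kernel across all windows
W = torch.randn((V.shape[0],2))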

In [103]:
start_window_idx = 0
end_window_idx = K
padded_signal_length = len(signal.view(-1))
output_v = torch.zeros(V.shape[0])
# Slide a window of K rows over V, one stride at a time
while end_window_idx <= padded_signal_length:
  V_window = V[start_window_idx:end_window_idx,:]
  W_window = W[start_window_idx:end_window_idx,:]
  VW_window = V_window * W_window          # element-wise product
  VW = VW_window.sum(dim=1,keepdim=True)   # combine the two columns
  output_v[start_window_idx] = VW.sum()    # then sum over the window

  start_window_idx += stride
  end_window_idx += stride

In [60]:
import torch
import torch.nn as nn

class CausalConv1d(nn.Module):
  def __init__(self,in_channels,out_channels,kernel_size,dilation,*args,**kwargs):
    super().__init__()  # register the conv weights as parameters of this module
    self.in_channels = in_channels
    self.out_channels = out_channels
    self.kernel_size = kernel_size
    self.dilation = dilation
    self.pad = (self.kernel_size - 1) * self.dilation
    self.conv1d = nn.Conv1d(self.in_channels,self.out_channels,kernel_size=self.kernel_size,padding=self.pad,dilation=self.dilation,bias=False)

  def forward(self,x):
    # x should be in shape (batch_size, channels, time)
    out = self.conv1d(x)
    # nn.Conv1d pads both sides; drop the trailing `pad` steps so output[t] only sees inputs up to t
    return out[:,:,:-self.pad] if self.pad > 0 else out

In [61]:
class CausalConvBlock(nn.Module):
  def __init__(self,layers,in_channels,out_channels,kernel_size):
    super().__init__()
    self.layers = layers
    self.in_channels = in_channels
    self.out_channels = out_channels
    self.kernel_size = kernel_size
    self.module_list = nn.ModuleList()

    for i in range(layers):
      # only the first layer maps in_channels -> out_channels; later layers keep out_channels
      self.module_list.append(CausalConv1d(in_channels if i == 0 else out_channels,out_channels,kernel_size,dilation=1))

  def forward(self,x):
    # nn.ModuleList is not callable, so apply the layers one after another
    for layer in self.module_list:
      x = layer(x)
    return x
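One way to convince ourselves that a convolution is causal is to perturb "future" samples and check that earlier outputs do not move. The sketch below uses `torch.nn.functional.pad` to pad only on the left, which is an alternative to padding both sides and trimming the tail; the kernel size, dilation and tensor sizes are arbitrary values chosen for the test.

```python
import torch
import torch.nn as nn
import torch.nn.functional as F

torch.manual_seed(0)

kernel_size, dilation = 2, 4
left_pad = (kernel_size - 1) * dilation
conv = nn.Conv1d(1, 1, kernel_size, dilation=dilation, bias=False)

def causal_conv(x):
    # Pad only on the left so output[t] sees x[t - left_pad], ..., x[t] and nothing later.
    return conv(F.pad(x, (left_pad, 0)))

x = torch.randn(1, 1, 16)            # (batch, channels, time)
y = causal_conv(x)

x_future = x.clone()
x_future[:, :, 10:] += 1.0           # perturb the samples from t = 10 onwards
y_future = causal_conv(x_future)

print(y.shape)                                              # same length as the input
print(torch.allclose(y[:, :, :10], y_future[:, :, :10]))    # outputs before t = 10 are unchanged
```

The same check can be pointed at any causal layer: if an output before the perturbed position changes, information is leaking from the future.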

In [69]:
class DilatedConvBlock(nn.Module):
  def __init__(self,in_channels,out_channels,kernel_size,dilation):
    super().__init__()
    self.in_channels = in_channels
    self.out_channels = out_channels
    self.kernel_size = kernel_size
    # a causal convolution with dilation > 1 is the dilated causal convolution from the paper
    self.dilatedconv1d=CausalConv1d(in_channels,out_channels,kernel_size,dilation=dilation)

  def forward(self,x):
    return self.dilatedconv1d(x)

In [None]:
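class ConvGate:
  @staticmethod
  def apply(x):
    # simplified gate: tanh and sigmoid share the same input
    # (the paper feeds them from separate filter and gate convolutions)
    tanh_x = torch.tanh(x)
    sigmoid_x = torch.sigmoid(x)
    return tanh_x * sigmoid_x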
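For comparison, the paper writes the gated activation unit as $z = \tanh(W_f * x) \odot \sigma(W_g * x)$, with a separate filter convolution $W_f$ and gate convolution $W_g$. A minimal sketch of that variant follows; the class name `GatedCausalConv`, the default kernel size of 2 and the tensor sizes are assumptions made for this example.

```python
import torch
import torch.nn as nn
import torch.nn.functional as F

class GatedCausalConv(nn.Module):
    """Sketch of z = tanh(W_f * x) * sigmoid(W_g * x) using causal dilated convolutions."""

    def __init__(self, channels, kernel_size=2, dilation=1):
        super().__init__()
        self.left_pad = (kernel_size - 1) * dilation
        self.filter_conv = nn.Conv1d(channels, channels, kernel_size, dilation=dilation)
        self.gate_conv = nn.Conv1d(channels, channels, kernel_size, dilation=dilation)

    def forward(self, x):
        x = F.pad(x, (self.left_pad, 0))  # left-pad so the output stays causal
        return torch.tanh(self.filter_conv(x)) * torch.sigmoid(self.gate_conv(x))

layer = GatedCausalConv(channels=8, dilation=2)
z = layer(torch.randn(4, 8, 100))         # (batch, channels, time)
print(z.shape)
```

The sigmoid branch acts as a soft switch in (0, 1) that decides how much of the tanh branch passes through, so the output always lies between -1 and 1.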

In [None]:
class DilatedResidualLayer(nn.Module):
  def __init__(self,in_channels,skip_channels,kernel_size,dilation):
    super().__init__()
    self.dilated_conv_block = DilatedConvBlock(in_channels,in_channels,kernel_size,dilation)
    self.pointwise_conv_residual = nn.Conv1d(in_channels,in_channels,kernel_size=1,bias=True)
    self.pointwise_conv_skip = nn.Conv1d(in_channels,skip_channels,kernel_size=1,bias=True)

  def forward(self,x):
    dilated_convolved_input = self.dilated_conv_block.forward(x)
    # gate the output of the dilated convolution, not the raw input
    gated_input = ConvGate.apply(dilated_convolved_input)
    pre_skip = self.pointwise_conv_residual(gated_input)
    residual_input = pre_skip + x
    skip_result = self.pointwise_conv_skip(gated_input)
    # pass the residual sum (not the untouched input) on to the next layer
    return residual_input, skip_result

In [None]:
class DilatedResidualBlock(nn.Module):
  def __init__(self,in_channels,skip_channels,kernel_size):
    super().__init__()
    self.in_channels = in_channels
    self.skip_channels = skip_channels
    self.kernel_size = kernel_size
    self.layers = [2**i for i in range(10)]
    # build the layers once so their weights persist between forward passes
    self.residual_layers = nn.ModuleList([DilatedResidualLayer(in_channels,skip_channels,kernel_size,dilation) for dilation in self.layers])

  def forward(self,x):
    skip_results = []
    for dilated_residual_layer in self.residual_layers:
      x, skip_result = dilated_residual_layer.forward(x)
      skip_results.append(skip_result)
    # the paper sums the skip connections before the output layers
    return torch.stack(skip_results).sum(dim=0)

In [3]:
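import math

class DenseBlock(nn.Module):
  def __init__(self,in_channels,skip_channels,kernel_size):
    super().__init__()
    self.in_channels = in_channels
    self.skip_channels = skip_channels
    # build the 1x1 convolutions once so their weights are trainable
    self.conv1 = nn.Conv1d(in_channels,skip_channels,kernel_size=1,bias=True)
    self.conv2 = nn.Conv1d(skip_channels,skip_channels,kernel_size=1,bias=True)

  def quantized_softmax(self,x,mu):
      # mu-law companding: sign(x) * ln(1 + mu*|x|) / ln(1 + mu)
      prob = torch.log1p(mu*torch.abs(x))/math.log(1+mu)
      return torch.sign(x) * prob

  def forward(self,x):
    x = torch.relu(x)
    x = self.conv1(x)
    x = torch.relu(x)
    x = self.conv2(x)
    # NOTE: the paper applies mu-law companding to the raw audio and ends the network with a softmax over the 256 bins
    output = self.quantized_softmax(x,mu=255)
    return output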
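The formula inside `quantized_softmax` is the $\mu$-law companding transform from the paper, $f(x_t) = \mathrm{sign}(x_t)\,\frac{\ln(1+\mu|x_t|)}{\ln(1+\mu)}$ with $\mu = 255$. In the paper it is applied to the raw audio, which is then quantized to 256 values that serve as the classes of the softmax. A minimal sketch of that encode/decode round trip in plain Python; the function names and the rounding scheme are choices made for this example.

```python
import math

def mu_law_encode(x, mu=255):
    """Map a sample x in [-1, 1] to an integer bin in [0, mu]."""
    x = max(-1.0, min(1.0, x))  # clip to the valid range
    companded = math.copysign(math.log1p(mu * abs(x)) / math.log1p(mu), x)
    return int(round((companded + 1) / 2 * mu))

def mu_law_decode(index, mu=255):
    """Approximate inverse: map a bin index back to a sample in [-1, 1]."""
    companded = 2 * index / mu - 1
    return math.copysign(math.expm1(abs(companded) * math.log1p(mu)) / mu, companded)

for sample in (-1.0, -0.1, 0.0, 0.1, 1.0):
    index = mu_law_encode(sample)
    print(sample, index, round(mu_law_decode(index), 4))
```

The bins are spaced more finely near zero than near the extremes, so quiet parts of the signal lose less precision than a uniform 8-bit quantization would cost them.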

In [None]:
'''
                                              Residual Block
                        -------------------------------------------------------------------
                        |                         ------->tanh--------                    |
                        |                         |                   |                   |
x --> CausalConvBlock --|----> DilatedConvBlock---|                   |--->1x1conv------->Add
                                                  |                   |       |
                                                  ------->sigmoid-----        |
                                                                             Add
                                                                             |||
                                                                             |||
                                                                        torch.stack() -> Dense
'''

We can now pass the input through one residual block with fixed dilation. The authors of the paper mention that dilation should be applied as follows: `[1,2,4,8,...,512,1,2,...,512,...]`.
Thus one residual block would have 10 layers, and we would most probably stack more than one block.

[1, 2, 4, 8, 16, 32, 64, 128, 256, 512]

In [1]:
2 ** 10

1024
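The value 1024 is also the receptive field of one such block, assuming a filter size of 2 (an assumption here, chosen to match the paper's figures): each layer extends the receptive field by `(kernel_size - 1) * dilation` samples. A short sketch with a hypothetical helper `receptive_field`:

```python
def receptive_field(dilations, kernel_size=2):
    """Number of input samples that can influence one output of stacked dilated causal convolutions."""
    return 1 + sum((kernel_size - 1) * d for d in dilations)

one_block = [2 ** i for i in range(10)]            # 1, 2, 4, ..., 512
print(len(one_block), receptive_field(one_block))  # 10 layers, 1024 samples
print(receptive_field(one_block * 3))              # three stacked blocks: 3070 samples
```

Stacking blocks grows the receptive field linearly in the number of blocks, while the doubling dilations inside a block grow it exponentially in the number of layers.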

In [72]:
torch.tanh

<function torch._VariableFunctionsClass.tanh>

In [None]:
class ResidualDilatedConvBlock:
  pass  # placeholder: body not written yet

In [70]:
DilatedConvBlock(in_channels=2)

TypeError: ignored