## Process Labelled Data

This code is used to extract file names of each emotion (happy,sad,suspenseful)

The music files are stored in seperate directories for each emotion and the code below reads those files.

In [None]:
import os
dir = '*/labelled data/happy'
arr = os.listdir(dir)
print(len(arr))
print(arr)

happy
---------

[2161, 2572, 2374, 1873, 2201, 2629, 2417, 1876, 2416, 2576, 2562, 2248, 1812, 1813, 2507, 1756, 2075, 1790, 1829, 1752, 1817, 2502, 2516, 1751, 2080, 2334, 1749, 1763, 2083, 2487, 1819, 2509, 2247, 2119, 1835, 1759, 2480, 2292, 2537, 1728, 1933, 2195, 2618, 2156, 1932, 2234, 1739, 2620, 2379, 2147, 1923, 2596, 2186, 2178, 2568]


sad
-----------
[2161, 2572, 2374, 1873, 2201, 2629, 2417, 1876, 2416, 2576, 2562, 2248, 1812, 1813, 2507, 1756, 2075, 1790, 1829, 1752, 1817, 2502, 2516, 1751, 2080, 2334, 1749, 1763, 2083, 2487, 1819, 2509, 2247, 2119, 1835, 1759, 2480, 2292, 2537, 1728, 1933, 2195, 2618, 2156, 1932, 2234, 1739, 2620, 2379, 2147, 1923, 2596, 2186, 2178, 2568]


suspense
-------------

[2203, 2202, 2214, 1735, 2228, 1734, 2573, 2239, 1730, 2166, 2628, 2238, 2159, 2207, 2506, 2473, 2114, 1757, 2104, 2307, 1791, 2477, 2112, 1760, 2678, 1775, 1777, 2478, 1766, 2131, 2285, 2244, 2127, 2222, 2140, 2632, 1919, 2237, 2619, 2180, 2382, 2432, 2591, 2208, 2218, 2227, 2582, 2151, 2622, 2232]

In [None]:
#!pip install requirements.txt 

In [1]:
#set absolute file paths variables
datapath = '*/SentimentNet/data/'
trainpath = '*/SentimentNet/' 
experimentspath = '*/SentimentNet/experiments'
testfile = '/test_X.npy'

In [None]:
import numpy as np #For matrix processing of pianorolls and spectrograms
import librosa.output
import librosa  #To read audio files
from intervaltree import Interval,IntervalTree # The labels are stored in intervaltree data structure for texturenet 
from scipy import fft 
import pickle
import h5py   #Data format type
import sys

class hyperparams(object):
    def __init__(self):
        self.sr = 44100 # Sampling rate.
        self.n_fft = 2048 # fft points (samples)
        self.stride = 256 # 256 samples hop between windows    
        self.wps = 44100 // 256 # ~86 windows/second
        
        self.sentiment = { 
                            'happy': [2161, 2572, 2374, 1873, 2201, 2629, 2417, 1876, 2416, 2576, 2562, 2248, 1812, 1813, 2507, 1756, 2075, 1790, 1829, 1752, 1817, 2502, 2516, 1751, 2080, 2334, 1749, 1763, 2083, 2487, 1819, 2509, 2247, 2119, 1835, 1759, 2480, 2292, 2537, 1728, 1933, 2195, 2618, 2156, 1932, 2234, 1739, 2620, 2379, 2147, 1923, 2596, 2186, 2178, 2568],
                    
                            'sad': [2161, 2572, 2374, 1873, 2201, 2629, 2417, 1876, 2416, 2576, 2562, 2248, 1812, 1813, 2507, 1756, 2075, 1790, 1829, 1752, 1817, 2502, 2516, 1751, 2080, 2334, 1749, 1763, 2083, 2487, 1819, 2509, 2247, 2119, 1835, 1759, 2480, 2292, 2537, 1728, 1933, 2195, 2618, 2156, 1932, 2234, 1739, 2620, 2379, 2147, 1923, 2596, 2186, 2178, 2568],
            
                            'suspenseful': [2203, 2202, 2214, 1735, 2228, 1734, 2573, 2239, 1730, 2166, 2628, 2238, 2159, 2207, 2506, 2473, 2114, 1757, 2104, 2307, 1791, 2477, 2112, 1760, 2678, 1775, 1777, 2478, 1766, 2131, 2285, 2244, 2127, 2222, 2140, 2632, 1919, 2237, 2619, 2180, 2382, 2432, 2591, 2208, 2218, 2227, 2582, 2151, 2622, 2232]
                            }
    
    #setting windows per sec for each emotion 
        self.hop_sentiment = {'happy': self.wps, 'sad': int(self.wps * 0.5), 'suspenseful': int(self.wps*0.25)} 
                      
        
hp = hyperparams()


def get_data(): 
    '''
    Extract the desired solo data from the dataset.
    Default: Process emotions of labelled songs
    
    '''
    #Read musicNet data files into dataset 
    dataset = np.load(open(datapath+'musicnet.npz','rb'), allow_pickle=True, encoding = 'latin1')
    
    #Preprocess musicNet data files and write values into .hdf5 file below, 
    train_data = h5py.File(trainpath+'train_sentimentMusic.hdf5', 'w') 

    #Process each sentiment and store score and audio features
    for sent in hp.sentiment:
        print ('------ Processing ' + sent + ' ------')
        score = []
        audio = []
        for song in hp.sentiment[sent]: 
            a,b = dataset[str(song)] 
            score.append(a) # zero array
            audio.append(b) #csv attributes

        spec_list, score_list, sentiment_list = process_data(score,audio,sent) 
        
        #For each sentiment, create sepcification list, pianoroll and sentiment label and store in intervaltree
        train_data.create_dataset(sent + "_spec", data=spec_list)
        train_data.create_dataset(sent + "_pianoroll", data=score_list)
        train_data.create_dataset(sent + "_onoff", data=sentiment_list)  

def process_data(X, Y, sent):
    '''
    Data Pre-processing
        
    Score: 
        Generate pianoroll from interval tree data structure
    
    Audio: 
        Convert waveform into power spectrogram
    '''
    #Function to read audio spectrogram representation using librosa fourier transform 
    def process_spectrum(X, step, hop):
        audio = X[i][(step * hop * hp.stride): (step * hop * hp.stride) + ((hp.wps*5 - 1)* hp.stride)] 
        spec = librosa.stft(audio, n_fft= hp.n_fft, hop_length = hp.stride)
        #taking log of spectrogram
        magnitude = np.log1p(np.abs(spec)**2)
        return magnitude

    def process_score(Y, step, hop):
        score = np.zeros((hp.wps*5, 128))  
        onset = np.zeros(score.shape)    
        offset = np.zeros(score.shape) 

        for window in range(score.shape[0]):
            
            #For score, set all notes to 1 if they are played at this window timestep 
            labels = Y[i][(step * hop + window) * hp.stride] 
            for label in labels: 
                score[window,label.data[1]] = 1 
        
            #For onset/offset, set onset to 1 and offset to -1 
            if window != 0:
                onset[window][np.setdiff1d(score[window].nonzero(), score[window-1].nonzero())] = 1
                offset[window][np.setdiff1d(score[window-1].nonzero(), score[window].nonzero())] = -1                    
            else:
                onset[window][score[window].nonzero()] = 1
        
        onset += offset 
        return score, onset
    
    spec_list=[]
    score_list=[]
    sentiment_list=[]
    num_songs = len(X)
    hop = hp.hop_sentiment[sent]
    for i in range(num_songs):
        song_length = len(X[i])
        num_spec = (song_length) // (hop * hp.stride) 
        print ('{} song {} has {} windows'.format(sent, i, num_spec))

        for step in range(num_spec - 30):
            if step % 50 == 0:
                print ('{} steps of {} song {} has been done'.format(step,sent,i))        
            spec_list.append(process_spectrum(X,step,hop))
            score, onoff = process_score(Y,step,hop)
            score_list.append(score)
            sentiment_list.append(onoff)

    return np.array(spec_list), np.array(score_list), np.array(sentiment_list) #return from process_data


def main():  
    get_data()
   

if __name__ == "__main__":
    main()

------ Processing happy ------
happy song 0 has 397 windows
0 steps of happy song 0 has been done
50 steps of happy song 0 has been done
100 steps of happy song 0 has been done
150 steps of happy song 0 has been done
200 steps of happy song 0 has been done
250 steps of happy song 0 has been done
300 steps of happy song 0 has been done
350 steps of happy song 0 has been done
happy song 1 has 201 windows
0 steps of happy song 1 has been done
50 steps of happy song 1 has been done
100 steps of happy song 1 has been done
150 steps of happy song 1 has been done
happy song 2 has 216 windows
0 steps of happy song 2 has been done
50 steps of happy song 2 has been done
100 steps of happy song 2 has been done
150 steps of happy song 2 has been done
happy song 3 has 367 windows
0 steps of happy song 3 has been done
50 steps of happy song 3 has been done
100 steps of happy song 3 has been done
150 steps of happy song 3 has been done
200 steps of happy song 3 has been done
250 steps of happy song 3

550 steps of suspenseful song 1 has been done
600 steps of suspenseful song 1 has been done
650 steps of suspenseful song 1 has been done
suspenseful song 2 has 430 windows
0 steps of suspenseful song 2 has been done
50 steps of suspenseful song 2 has been done
100 steps of suspenseful song 2 has been done
150 steps of suspenseful song 2 has been done
200 steps of suspenseful song 2 has been done
250 steps of suspenseful song 2 has been done
300 steps of suspenseful song 2 has been done
350 steps of suspenseful song 2 has been done
suspenseful song 3 has 2864 windows
0 steps of suspenseful song 3 has been done
50 steps of suspenseful song 3 has been done
100 steps of suspenseful song 3 has been done
150 steps of suspenseful song 3 has been done
200 steps of suspenseful song 3 has been done
250 steps of suspenseful song 3 has been done
300 steps of suspenseful song 3 has been done
350 steps of suspenseful song 3 has been done
400 steps of suspenseful song 3 has been done
450 steps of su

250 steps of suspenseful song 9 has been done
300 steps of suspenseful song 9 has been done
350 steps of suspenseful song 9 has been done
400 steps of suspenseful song 9 has been done
450 steps of suspenseful song 9 has been done
500 steps of suspenseful song 9 has been done
550 steps of suspenseful song 9 has been done
600 steps of suspenseful song 9 has been done
650 steps of suspenseful song 9 has been done
700 steps of suspenseful song 9 has been done
750 steps of suspenseful song 9 has been done
800 steps of suspenseful song 9 has been done
850 steps of suspenseful song 9 has been done
900 steps of suspenseful song 9 has been done
950 steps of suspenseful song 9 has been done
1000 steps of suspenseful song 9 has been done
1050 steps of suspenseful song 9 has been done
1100 steps of suspenseful song 9 has been done
1150 steps of suspenseful song 9 has been done
1200 steps of suspenseful song 9 has been done
1250 steps of suspenseful song 9 has been done
1300 steps of suspenseful so

## Train Model

In [2]:
import torch
torch.cuda.is_available()

True

In [4]:
import torch
import torch.nn as nn
from torch.nn import init
import torch.nn.functional as F
import numpy as np
import torch.optim as optim
from sklearn.model_selection import train_test_split
import torch.utils.data as utils
import h5py 
import sys
import os
import os.path
from os import path
import json
from model import SentimentNet
#cuda = torch.device("cuda")
cuda = torch.device("cpu")

class hyperparams(object):
    def __init__(self):
        #self.sentiment = 'happy'
        self.train_epoch = 300 #change as per required
        self.test_freq = 10 #test after every 10 iterations of training the model
        self.exp_name = 'sentiment_exp_300'
        
        self.iter_train_loss = []
        self.iter_test_loss = []
        self.loss_history = [] #save loss values
        self.test_loss_history = []
        self.best_loss = 1e10 
        self.best_epoch = 0

def Process_Data(sent, exp_dir):
    #process data as per sent(emotion)
    dataset = h5py.File(trainpath+'train_sentimentMusic.hdf5','r')  #hyperparameter tuning
    score = dataset['{}_pianoroll'.format(sent)][:] #converting music to it's symbolic representation
    spec = dataset['{}_spec'.format(sent)][:]
    onoff = dataset['{}_onoff'.format(sent)][:]
    score = np.concatenate((score, onoff),axis = -1)
    score = np.transpose(score,(0,2,1))

    X_train, X_test, Y_train, Y_test = train_test_split(score, spec, test_size=0.2) #20% test set
    test_data_dir = os.path.join(exp_dir,'test_data') 
    test_data_dir += '_'+sent
    
    if (not(path.exists(test_data_dir))):
        os.makedirs(test_data_dir)
        np.save(os.path.join(test_data_dir, "test_X.npy"), X_test)
        np.save(os.path.join(test_data_dir, "test_Y.npy"), Y_test)
        
    train_dataset = utils.TensorDataset(torch.Tensor(X_train, device=cuda), torch.Tensor(Y_train, device=cuda))
    #change batch_size and verify loss
    train_loader = utils.DataLoader(train_dataset, batch_size=5, shuffle=True)
    test_dataset = utils.TensorDataset(torch.Tensor(X_test, device=cuda), torch.Tensor(Y_test,device=cuda))
    test_loader = utils.DataLoader(test_dataset, batch_size=5, shuffle=True) 
    
    return train_loader, test_loader

def train(model, epoch, train_loader, optimizer,iter_train_loss):
    model.train()
    train_loss = 0
    for batch_idx, (data, target) in enumerate(train_loader):        
        optimizer.zero_grad()
        split = torch.split(data, 128, dim=1)
        y_pred = model(split[0].cuda(),split[1].cuda())
        loss_function = nn.MSELoss()
        loss = loss_function(y_pred, target.cuda())
        loss.backward()
        iter_train_loss.append(loss.item())
        train_loss += loss
        optimizer.step()    
         
        #if batch_idx % 2 == 0:
         #   print ('Train Epoch: {} [{}/{} ({:.0f}%)]\t Loss: {:.6f}'.format(epoch, batch_idx * len(data), len(train_loader.dataset), 100. * batch_idx/len(train_loader), loss.item()/len(data)))

    print('====> Epoch: {} Average loss: {:.4f}'.format(epoch, train_loss/ len(train_loader.dataset)))
    return train_loss/ len(train_loader.dataset)

def test(model, epoch, test_loader, scheduler, iter_test_loss):
    with torch.no_grad():
        model.eval()
        test_loss = 0
        for idx, (data, target) in enumerate(test_loader):
            split = torch.split(data,128,dim = 1)
            y_pred = model(split[0].cuda(),split[1].cuda())
            loss_function = nn.MSELoss() 
            loss = loss_function(y_pred,target.cuda())    
            iter_test_loss.append(loss.item())
            test_loss += loss    
        test_loss/= len(test_loader.dataset)
        scheduler.step(test_loss)
        print ('====> Test set loss: {:.4f}'.format(test_loss))
        return test_loss


def main():    
    hp = hyperparams()

    try:
        exp_root = os.path.join(os.path.abspath(trainpath),'experiments')
        os.makedirs(exp_root)
    except FileExistsError:
        pass
    
    exp_dir = os.path.join(exp_root, hp.exp_name)
    #check if already exists
    if (not(path.exists(exp_dir))):
        os.makedirs(exp_dir)

    model = SentimentNet()
    model.cuda()
    optimizer = optim.Adam(model.parameters(), lr=1e-3)
    model.zero_grad()
    optimizer.zero_grad()
    scheduler = optim.lr_scheduler.ReduceLROnPlateau(optimizer, 'min')
    #train_loader, test_loader = Process_Data(hp.sentiment, exp_dir)
    
    s = input("Enter type of emotion to generate (happy, sad, suspenseful) : ")
    
    if(s not in ['happy','sad','suspenseful']):
        print('Enter valid emotion')
    else:
        train_loader, test_loader = Process_Data(s, exp_dir)
    
        for epoch in range(hp.train_epoch):
            loss = train(model, epoch, train_loader, optimizer,hp.iter_train_loss)
            hp.loss_history.append(loss.item())
            if epoch % hp.test_freq == 0:
                test_loss = test(model, epoch, test_loader, scheduler, hp.iter_test_loss)
                hp.test_loss_history.append(test_loss.item())
                if test_loss < hp.best_loss:         
                    torch.save({'epoch': epoch + 1, 'state_dict': model.state_dict(), 'optimizer' : optimizer.state_dict()}, os.path.join(exp_dir, 'checkpoint-{}.tar'.format(str(epoch + 1 ))))
                    hp.best_loss = test_loss.item()    
                    hp.best_epoch = epoch + 1
                    
                    hpath = exp_dir + '/test_data_'+s
                    with open(os.path.join(hpath,'hyperparams.json'), 'w') as outfile:
                        json.dump(hp.__dict__, outfile)
       
if __name__ == "__main__":
    main()

Enter type of emotion to generate (happy, sad, suspenseful) : happy
====> Epoch: 0 Average loss: 0.1273
====> Test set loss: 0.0795
====> Epoch: 1 Average loss: 0.0669
====> Epoch: 2 Average loss: 0.0480
====> Epoch: 3 Average loss: 0.0397
====> Epoch: 4 Average loss: 0.0355
====> Epoch: 5 Average loss: 0.0331
====> Epoch: 6 Average loss: 0.0346
====> Epoch: 7 Average loss: 0.0343
====> Epoch: 8 Average loss: 0.0281
====> Epoch: 9 Average loss: 0.0269
====> Epoch: 10 Average loss: 0.0258
====> Test set loss: 0.0272
====> Epoch: 11 Average loss: 0.0255
====> Epoch: 12 Average loss: 0.0254
====> Epoch: 13 Average loss: 0.0244
====> Epoch: 14 Average loss: 0.0244
====> Epoch: 15 Average loss: 0.0273
====> Epoch: 16 Average loss: 0.0246
====> Epoch: 17 Average loss: 0.0235
====> Epoch: 18 Average loss: 0.0234
====> Epoch: 19 Average loss: 0.0236
====> Epoch: 20 Average loss: 0.0232
====> Test set loss: 0.0248
====> Epoch: 21 Average loss: 0.0230
====> Epoch: 22 Average loss: 0.0208
====> E

## Model

In [8]:
import torch
import torch.nn as nn
from torch.nn import init
import torch.nn.functional as F
import numpy as np
import torch.optim as optim
from sklearn.model_selection import train_test_split
import torch.utils.data as utils
import sys
import pickle as pkl

#cuda = torch.device("cuda") #if available
cuda = torch.device("cpu")

def conv1x3(in_channels, out_channels, stride=1, padding=1, bias=True,groups=1):
    return nn.Conv1d(
        in_channels,
        out_channels,
        kernel_size=3,
        stride=stride,
        padding=padding,
        bias=bias,
        groups=groups)

def upconv1x2(in_channels, out_channels, kernel):
    return nn.ConvTranspose1d(
        in_channels,
        out_channels,
        kernel_size=kernel,
        stride=2,
        padding=1
        )

class DownConv(nn.Module):
    def __init__(self, in_channels, out_channels, block_id, pooling = True):
        super(DownConv,self).__init__()
        self.in_channels = in_channels 
        self.out_channels = out_channels 
        self.pooling = pooling            
        self.activation = nn.LeakyReLU(0.01)
        self.conv1 = conv1x3(self.in_channels, self.out_channels) 
        self.conv1_BN = nn.InstanceNorm1d(self.out_channels)
        self.conv2 = conv1x3(self.out_channels, self.out_channels) 
        self.conv2_BN = nn.InstanceNorm1d(self.out_channels)  
        self.pool = nn.MaxPool1d(kernel_size=2, stride=2)
    def forward(self,x):
        x = self.activation(self.conv1_BN(self.conv1(x)))
        x = self.activation(self.conv1_BN(self.conv2(x)))
        before_pool = x
        if self.pooling:
            x = self.pool(x)
        return x, before_pool

class UpConv(nn.Module):
    def __init__(self, in_channels, out_channels, skip_channels, cond_channels, block_id, activation = nn.LeakyReLU(0.01), upconv_kernel=2):
        super(UpConv, self).__init__()
        self.skip_channels = skip_channels  
        self.in_channels = in_channels 
        self.out_channels = out_channels
        self.cond_channels = cond_channels
        self.activation = activation
        self.upconv = upconv1x2(self.in_channels, self.out_channels,kernel=upconv_kernel)
        self.upconv_BN = nn.InstanceNorm1d(self.out_channels) 
        self.conv1 = conv1x3( self.skip_channels + self.out_channels, self.out_channels)   
        self.conv1_BN = nn.InstanceNorm1d(self.out_channels)
        self.conv2 = conv1x3(self.out_channels + self.cond_channels, self.out_channels) 
        self.conv2_BN = nn.InstanceNorm1d(self.out_channels)

    def crop_and_concat(self, upsampled, bypass):
        c = (bypass.size()[2] - upsampled.size()[2]) // 2
        bypass = F.pad(bypass, (-c, -c))
        if bypass.shape[2] > upsampled.shape[2]:
            bypass =  F.pad(bypass, (0, -(bypass.shape[2] - upsampled.shape[2])))  
        else:
            bypass =  F.pad(bypass, ((0, bypass.shape[2] - upsampled.shape[2]) ))
        return torch.cat((upsampled, bypass), 1)
 
    def forward(self, res, dec, cond):
        x = self.activation(self.upconv_BN(self.upconv(dec)))
        x = self.crop_and_concat(x, res)
        x = self.activation(self.conv1_BN(self.conv1(x)))

        if self.cond_channels:
            x = self.crop_and_concat(x, cond)

        x = self.conv2(x)
        x = self.activation(self.conv2_BN(x))
        return x   

class Onset_Offset_Encoder(nn.Module):
    def __init__(self, depth = 3, start_channels = 128):
        super(Onset_Offset_Encoder, self).__init__()
        self.start_channels = start_channels
        self.depth = depth
        self.down_convs = [] 
        self.construct_layers()    
        self.down_convs = nn.ModuleList(self.down_convs)
        self.reset_params()
    def construct_layers(self):
        for i in range(self.depth):
            ins = self.start_channels if i == 0 else outs
            outs = self.start_channels * (2 ** (i+1))
            pooling = True if i < self.depth else False
            DC = DownConv(ins, outs, pooling=pooling, block_id = i + 9)
            self.down_convs.append(DC)
    @staticmethod
    def weight_init(m):
        if isinstance(m, nn.Conv1d):
            init.xavier_normal_(m.weight)
            init.constant_(m.bias, 0)
    def reset_params(self):
        for i, m in enumerate(self.modules()):
            self.weight_init(m)
    def forward(self, x):
        condition_tensors = []
        for i, module in enumerate(self.down_convs):
            x,_ = module(x)
            if (i > self.depth - 3):
                condition_tensors.append(x)
        return condition_tensors

class MBRBlock(nn.Module):
    def __init__(self, in_channels, num_of_band):
        super(MBRBlock, self).__init__()
        self.in_dim = in_channels
        self.num_of_band = num_of_band
        self.conv_list1 = []
        self.bn_list1 = []
        self.conv_list2 = []
        self.bn_list2 = []
        self.activation = nn.LeakyReLU(0.01)
        self.band_dim = self.in_dim // self.num_of_band
        for i in range(self.num_of_band):
            self.conv_list1.append(nn.Conv1d(in_channels = self.band_dim, out_channels = self.band_dim, kernel_size = 3, padding = 1))
        for i in range(self.num_of_band):
            self.conv_list2.append(nn.Conv1d(in_channels = self.band_dim, out_channels = self.band_dim, kernel_size = 3, padding = 1))
        for i in range(self.num_of_band):
            self.bn_list1.append(nn.InstanceNorm1d(self.band_dim))
        for i in range(self.num_of_band):  
            self.bn_list2.append(nn.InstanceNorm1d(self.band_dim))  
        self.conv_list1 = nn.ModuleList(self.conv_list1)
        self.conv_list2 = nn.ModuleList(self.conv_list2)        
        self.bn_list1 = nn.ModuleList(self.bn_list1)
        self.bn_list2 = nn.ModuleList(self.bn_list2)

    def forward(self,x):
        bands = torch.chunk(x, self.num_of_band, dim = 1)
        for i in range(len(bands)):
            t = self.activation(self.bn_list1[i](self.conv_list1[i](bands[i])))
            t = self.bn_list2[i](self.conv_list2[i](t))
            torch.add(bands[i],1,t)
        x = torch.add(x,1,torch.cat(bands, dim = 1))
        return x 
     
class SentimentNet(nn.Module):
    def __init__(self, depth = 5,start_channels = 128):
        super(SentimentNet, self).__init__()
        self.depth = depth
        self.start_channels = start_channels  
        self.construct_layers()
        self.reset_params()               
        
    #@staticmethod  
    def construct_layers(self):
        self.down_convs = []
        self.up_convs = []
        for i in range(self.depth):
            ins = self.start_channels if i == 0 else outs
            outs = self.start_channels * (2 ** (i+1))
            pooling = True if i < self.depth-1 else False
            DC = DownConv(ins, outs, pooling=pooling, block_id=i)
            self.down_convs.append(DC)  
        self.up_convs.append(UpConv(4096,2048,2048, 1024, block_id = 5, upconv_kernel=6))
        self.up_convs.append(UpConv(2048,1024,1024, 512, block_id = 6, upconv_kernel=4))
        self.up_convs.append(UpConv(1024,1024,512,0,block_id= 7, upconv_kernel=3))
        self.up_convs.append(UpConv(1024,1024,256,0, block_id = 8))
        self.down_convs = nn.ModuleList(self.down_convs)
        self.up_convs = nn.ModuleList(self.up_convs)
        self.MBRBlock1 = MBRBlock(1024,2) 
        self.MBRBlock2 = MBRBlock(1024,4)
        self.MBRBlock3 = MBRBlock(1024,8)
        self.MBRBlock4 = MBRBlock(1024,16)
        self.lastconv = nn.ConvTranspose1d(1024,1025,kernel_size=3, stride=1, padding=1)
        self.lrelu = nn.LeakyReLU(0.01)
        self.onset_offset_encoder = Onset_Offset_Encoder()
        
    @staticmethod  
    def weight_init(m):
        if isinstance(m, nn.Conv1d):
            init.xavier_normal_(m.weight)
            init.constant_(m.bias, 0)
        if isinstance(m, nn.ConvTranspose1d):
            init.xavier_normal_(m.weight)
            init.constant_(m.bias, 0)


    def reset_params(self):
        for i, m in enumerate(self.modules()):
            self.weight_init(m)
    
    def forward(self, x, cond):
        encoder_layer_outputs = []
        for i, module in enumerate(self.down_convs):
            x, before_pool = module(x)
            encoder_layer_outputs.append(before_pool)

        Onoff_Conditions = self.onset_offset_encoder(cond)   

        for i, module in enumerate(self.up_convs):
            before_pool = encoder_layer_outputs[-(i+2)]
            if i < self.onset_offset_encoder.depth - 1:
                x = module(before_pool, x, Onoff_Conditions[i-1])            
            else:
                x = module(before_pool, x, None)
                
        x = self.MBRBlock1(x)
        x = self.MBRBlock2(x)
        x = self.MBRBlock3(x)
        x = self.MBRBlock4(x)
        x = self.lrelu(self.lastconv(x)) 
        return x

## Model inference

In [10]:
import torch
import pretty_midi
import numpy as np
import h5py
import pickle
import torch.nn as nn
import torch.utils.data as utils
import json
import os
from model import SentimentNet
import librosa
from tqdm import tqdm
import sys

class AudioSynthesizer():
    def __init__(self, checkpoint, exp_dir, data_source, inputsentiment):
        self.sentiment = inputsentiment
        self.exp_dir = exp_dir
        self.checkpoint = torch.load(os.path.join(exp_dir,checkpoint))
        self.sample_rate = 44100
        self.wps = 44100//256
        self.data_source = data_source
                
    def get_test_midi(self):
        exppath = os.path.join(self.exp_dir,'test_data')
        exppath += '_'+ self.sentiment
        
        X = np.load(exppath + testfile)
        
        rand = np.random.randint(len(X),size=)1 ## output file
        score = [X[i] for i in rand]
        return torch.Tensor(score).cuda()

    def process_custom_midi(self, midi_filename):
        midi_dir = os.path.join('*/SentimentNet/data','midi')
        midi = pretty_midi.PrettyMIDI(os.path.join(midi_dir,'goldsaucer_happy'))
        pianoroll = midi.get_piano_roll(fs=self.wps).T
        pianoroll[pianoroll.nonzero()] = 1
        onoff = np.zeros(pianoroll.shape) 
        
        for i in range(pianoroll.shape[0]):
            if i == 0:
                onoff[i][pianoroll[i].nonzero()] = 1
            else:
                onoff[i][np.setdiff1d(pianoroll[i-1].nonzero(), pianoroll[i].nonzero())] = -1
                onoff[i][np.setdiff1d(pianoroll[i].nonzero(), pianoroll[i-1].nonzero())] = 1 
        
        return pianoroll, onoff


    def inference(self):
        model = SentimentNet()
        #model.cpu() #CUDA cout of memory control
        model.cuda()
        model.load_state_dict(self.checkpoint['state_dict'])

        if self.data_source == 'TEST_DATA':
            score = self.get_test_midi()
            score, onoff = torch.split(score, 128, dim=1)
        else:
            score, onoff = self.process_custom_midi(self.data_source)
                   
        print ('Generating output wav file......')

        with torch.no_grad():
            model.eval()    
            test_results = model(score, onoff)
            test_results = test_results.cpu().numpy()
 
        output_dir = self.create_output_dir()

        for i in range(len(test_results)):
            audio = self.griffinlim(test_results[i], audio_id = i+1)
            librosa.output.write_wav(os.path.join(output_dir,'output-{}.wav'.format(i+1)), audio, self.sample_rate)
    
    def create_output_dir(self):
        success = False
        dir_id = 1
        while not success:
            try:
                audio_out_dir = os.path.join(self.exp_dir,'audio_output_{}_{}'.format(self.sentiment,dir_id))
                #audio_out_dir = os.path.join(self.exp_dir,'audio_output_{}'.format(dir_id))
                os.makedirs(audio_out_dir)
                success = True
            except FileExistsError:
                dir_id += 1
        return audio_out_dir

    def griffinlim(self, spectrogram, audio_id, n_iter = 300, window = 'hann', n_fft = 2048, hop_length = 256, verbose = False):
        
        print ('Synthesizing audio {}'.format(audio_id))

        if hop_length == -1:
            hop_length = n_fft // 4
            spectrogram[0:5] = 0 #audio representaion of music

        spectrogram[150:] = 0
        angles = np.exp(2j * np.pi * np.random.rand(*spectrogram.shape))

        t = tqdm(range(n_iter), ncols=100, mininterval=2.0, disable=not verbose)
        for i in t:
            full = np.abs(spectrogram).astype(np.complex) * angles
            inverse = librosa.istft(full, hop_length = hop_length, window = window)
            rebuilt = librosa.stft(inverse, n_fft = n_fft, hop_length = hop_length, window = window)
            angles = np.exp(1j * np.angle(rebuilt))

            if verbose:
                diff = np.abs(spectrogram) - np.abs(rebuilt)
                t.set_postfix(loss=np.linalg.norm(diff, 'fro'))

        full = np.abs(spectrogram).astype(np.complex) * angles
        inverse = librosa.istft(full, hop_length = hop_length, window = window)

        return inverse


def main():
    exp_dir = '*/SentimentNet/experiments/sentiment_exp_300'
    data_source = 'TEST_DATA' # test with testing data or customized data 

    s = input("Enter type of emotion to generate (happy, sad, suspense) : ")
    if(s not in ['happy','sad','suspenseful']):
        print('Enter valid emotion')
    else:
        #hp = hyperparams()
        #hp.sentiment = s
        hpath = exp_dir + '/test_data_'+s
        with open(os.path.join(hpath,'hyperparams.json'), 'r') as hpfile:
            hp = json.load(hpfile)
            
        checkpoints = 'checkpoint-{}.tar'.format(hp['best_epoch'])
        AudioSynth = AudioSynthesizer(checkpoints, exp_dir, data_source,s) 
        AudioSynth.inference()


if __name__ == "__main__":
    main()

Enter type of emotion to generate (happy, sad, suspense) : happy
Generating output wav file......
Synthesizing audio 1
Synthesizing audio 2
Synthesizing audio 3
