In [1]:
import keras.layers
from keras import layers
from keras.losses import categorical_crossentropy,binary_crossentropy
from keras.optimizers import RMSprop, rmsprop
import numpy as np
import pandas as pd
import PIL
from PIL import ImageOps
import math
import matplotlib.pyplot as plt
import cv2
import io
import keras_preprocessing.image as kp
from keras.utils import plot_model

Using TensorFlow backend.


In [2]:
# Dataset locations. These are absolute Windows paths on drive D:, so they must
# be adjusted before the notebook is run on another machine.
# The path_labels_* CSV files are handed to the generator classes in later cells.
# NOTE(review): the path_directory_* values are not referenced in the visible
# cells -- confirm whether they are still needed.
path_labels_training = r"D:\PolyPhonic\data\training\labels2.csv"
path_directory_training = r"D:\PolyPhonic\data\training\images"
path_directory_test = r"D:\PolyPhonic\data\test\images"
path_directory_valid = r"D:\PolyPhonic\data\validation\images"
path_labels_valid = r"D:\PolyPhonic\data\validation\labels.csv"
path_labels_test = r"D:\PolyPhonic\data\test\labels.csv"


# Make generator yield batches of (img_timestep,label)

In [3]:
class CustomGenerator(keras.utils.Sequence):
    """Keras ``Sequence`` that yields batches of (grayscale images, labels).

    The CSV file is read with pandas. Column 0 of every row is used as the
    path of an image file; the remaining columns form that image's label
    vector.
    """

    def __init__(self, data, batch_size, target_size=(100, 100)):
        """
        Args:
            data: path of the labels CSV (read with cp1252 encoding).
            batch_size: number of images per batch.
            target_size: size every image is resized to when it is loaded;
                passed straight to ``load_img``. Defaults to (100, 100).
        """
        super().__init__()
        df = pd.read_csv(data, encoding='cp1252').to_numpy()
        self.x = [row[0] for row in df]
        self.y = [row[1:] for row in df]
        self.indices = len(self.x)  # total number of samples (name kept for existing callers)
        self.batch_size = batch_size
        self.target_size = target_size

    def __len__(self):
        """Return the number of batches in the sequence (last one may be short)."""
        return math.ceil(len(self.x) / (self.batch_size))

    def _load_image(self, img_path):
        """Load one image as a grayscale array scaled by 1/255."""
        img = kp.load_img(img_path,
                          color_mode='grayscale',
                          target_size=self.target_size,
                          interpolation='nearest')
        return kp.img_to_array(img) * (1. / 255)

    def __getitem__(self, idx):
        """Return batch number ``idx`` as a tuple ``(images, labels)``.

        Args:
            idx: index of the batch.

        Returns:
            ``images``: array stacking one loaded image per row of the batch.
            ``labels``: float32 array with one label vector per image.

        Raises:
            ValueError: if a label column cannot be converted to float.
        """
        start = idx * self.batch_size
        stop = start + self.batch_size

        images = np.array([self._load_image(img_path) for img_path in self.x[start:stop]])
        # The rows come from a mixed string/number frame, so they are
        # object-dtype arrays; cast explicitly so the backend receives a
        # numeric tensor.
        # NOTE(review): assumes every label column is numeric -- confirm against the CSV.
        labels = np.asarray(self.y[start:stop], dtype=np.float32)
        return images, labels


In [6]:
class CustomGeneratorTemporal(keras.utils.Sequence):
    """Keras ``Sequence`` that yields batches of image *sequences* and labels.

    The CSV file is read with pandas. Column 0 of every row is used as the
    path of an image file; the remaining columns form that image's label
    vector. Consecutive rows are grouped into sequences of ``sequences``
    frames, and ``batch_size`` such sequences make up one batch. Trailing rows
    that do not fill a whole sequence are never yielded.
    """

    def __init__(self, data, batch_size, sequences, target_size=(100, 100)):
        """
        Args:
            data: path of the labels CSV (read with cp1252 encoding).
            batch_size: number of sequences per batch.
            sequences: number of consecutive frames in one sequence.
            target_size: size every image is resized to when it is loaded;
                passed straight to ``load_img``. Defaults to (100, 100).

        Raises:
            ValueError: if ``batch_size`` or ``sequences`` is not positive.
        """
        super().__init__()
        if batch_size <= 0 or sequences <= 0:
            raise ValueError("batch_size and sequences must be positive")
        df = pd.read_csv(data, encoding='cp1252').to_numpy()
        self.x = [row[0] for row in df]
        self.y = [row[1:] for row in df]
        self.indices = len(self.x)  # total number of frames (name kept for existing callers)
        self.batch_size = batch_size
        self.sequences = sequences
        self.target_size = target_size

    def _num_sequences(self):
        """Return how many complete sequences the data contains."""
        return len(self.x) // self.sequences

    def __len__(self):
        """Return the number of batches; every batch holds only complete sequences."""
        # Use the instance attribute: the original read a notebook-level
        # global called `sequences`, which breaks on a fresh kernel.
        return math.ceil(self._num_sequences() / self.batch_size)

    def _load_image(self, img_path):
        """Load one frame as a grayscale array scaled by 1/255."""
        img = kp.load_img(img_path,
                          color_mode='grayscale',
                          target_size=self.target_size,
                          interpolation='nearest')
        return kp.img_to_array(img) * (1. / 255)

    def __getitem__(self, idx):
        """Return batch number ``idx`` as a tuple ``(frames, labels)``.

        Args:
            idx: index of the batch, ``0 <= idx < len(self)``.

        Returns:
            ``frames``: array whose first two axes are (sequence in batch,
            frame in sequence), followed by the axes of one loaded image.
            ``labels``: float32 array with the same two leading axes, followed
            by the label vector of each frame.

        Raises:
            IndexError: if ``idx`` is outside ``range(len(self))``.
            ValueError: if a label column cannot be converted to float.
        """
        if not 0 <= idx < len(self):
            raise IndexError("batch index {} out of range".format(idx))

        first_seq = idx * self.batch_size
        # The final batch may contain fewer than batch_size sequences, but
        # every sequence in it is complete.
        last_seq = min(first_seq + self.batch_size, self._num_sequences())
        n_seq = last_seq - first_seq
        start = first_seq * self.sequences
        stop = last_seq * self.sequences

        frames = np.array([self._load_image(img_path) for img_path in self.x[start:stop]])
        # The rows come from a mixed string/number frame, so they are
        # object-dtype arrays; cast explicitly so the backend receives a
        # numeric tensor.
        # NOTE(review): assumes every label column is numeric -- confirm against the CSV.
        labels = np.asarray(self.y[start:stop], dtype=np.float32)

        # Fold the flat run of frames into (sequence, frame, ...).
        frames = frames.reshape((n_seq, self.sequences) + frames.shape[1:])
        labels = labels.reshape((n_seq, self.sequences) + labels.shape[1:])
        return frames, labels


* There seems to be a problem with the images saved on disk: they are saved as grayscale but still have 3 channels.

# Creating a functional NN (called "functional" because each layer is treated as a function)


In [7]:
class customCallBack(keras.callbacks.Callback):
    """Stop training as soon as validation accuracy reaches a threshold."""

    def __init__(self, validationAcc):
        """
        Args:
            validationAcc: validation accuracy at or above which training stops.
        """
        super().__init__()
        self.validationAcc = validationAcc

    def on_epoch_end(self, batch, logs=None):
        """Check the validation accuracy logged for the epoch that just ended.

        Args:
            batch: first positional argument supplied by Keras to this hook
                (the parameter name is kept as in the original code).
            logs: dict of metrics for the epoch; may be None.
        """
        logs = logs or {}
        # The metric key depends on the Keras version ("val_acc" in older
        # releases, "val_accuracy" in newer ones); it is absent altogether
        # when no validation data is used. Look it up without raising.
        val_acc = logs.get("val_acc", logs.get("val_accuracy"))
        if val_acc is not None and val_acc >= self.validationAcc:
            self.model.stop_training = True
            


In [None]:
# Bare expression: evaluates the attribute so the notebook shows it as the cell output.
keras.metrics.accuracy

In [10]:
batch_size = 16   # sequences per batch
sequences = 8     # frames per sequence


callback_accuracy = customCallBack(validationAcc=0.85)

datagenTrain = CustomGeneratorTemporal(path_labels_training,batch_size=batch_size,sequences=sequences)
datagenValid = CustomGeneratorTemporal(path_labels_valid,batch_size=batch_size,sequences=sequences)

# One generator batch consumes batch_size * sequences CSV rows, so the number
# of complete batches is the row count divided by that product. Dividing by
# batch_size alone (as before) asks for 8x more steps than there are batches.
training_steps = datagenTrain.indices // (batch_size * sequences)
validation_steps = datagenValid.indices // (batch_size * sequences)


# Functional model: a CNN applied to every frame via TimeDistributed, followed
# by a bidirectional recurrent layer over the frame axis and a per-frame
# sigmoid classifier with 150 outputs.
input_tensor = layers.Input(shape=(sequences,100,100,1))
CNN_1 = layers.TimeDistributed(layers.Conv2D(64,kernel_size=(3,3),activation='relu'))(input_tensor)
MAX_POOLING1 = layers.TimeDistributed(layers.MaxPooling2D(pool_size=(2,2)))(CNN_1)
CNN_2 = layers.TimeDistributed(layers.Conv2D(64,kernel_size=(3,3),activation='relu'))(MAX_POOLING1)
MAX_POOLING2 = layers.TimeDistributed(layers.MaxPooling2D(pool_size=(2,2)))(CNN_2)
CNN_3 = layers.TimeDistributed(layers.Conv2D(128,kernel_size=(3,3),activation='relu'))(MAX_POOLING2)
FLATTEN_LAYER = layers.TimeDistributed(layers.Flatten())(CNN_3)
# Despite its name, GRU_1 wraps a SimpleRNN layer (name kept for compatibility).
GRU_1 = layers.Bidirectional(layers.SimpleRNN(128,return_sequences=True,input_shape=(sequences,-1),recurrent_dropout=0.5))(FLATTEN_LAYER)
OUTPUT_TENSOR = layers.TimeDistributed(layers.Dense(150,activation='sigmoid'))(GRU_1)
model = keras.models.Model(input_tensor,OUTPUT_TENSOR)   ## aggregates layers between input_tensor and OUTPUT_TENSOR
model.summary()



model.compile(loss=binary_crossentropy,metrics=['accuracy'],optimizer= RMSprop())


try:
    history = model.fit(x=datagenTrain,
                        steps_per_epoch=training_steps,
                        epochs=1,
                        validation_data=datagenValid,
                        validation_steps=validation_steps,
                        callbacks=[callback_accuracy])
except IndexError as err:
    # The generator may raise IndexError when asked for a batch it cannot
    # fill; report the actual error instead of the exception class.
    model.stop_training = True
    print(err)
  

Model: "model_3"
_________________________________________________________________
Layer (type)                 Output Shape              Param #   
input_3 (InputLayer)         (None, 8, 100, 100, 1)    0         
_________________________________________________________________
time_distributed_15 (TimeDis (None, 8, 98, 98, 64)     640       
_________________________________________________________________
time_distributed_16 (TimeDis (None, 8, 49, 49, 64)     0         
_________________________________________________________________
time_distributed_17 (TimeDis (None, 8, 47, 47, 64)     36928     
_________________________________________________________________
time_distributed_18 (TimeDis (None, 8, 23, 23, 64)     0         
_________________________________________________________________
time_distributed_19 (TimeDis (None, 8, 21, 21, 128)    73856     
_________________________________________________________________
time_distributed_20 (TimeDis (None, 8, 56448)          0   

KeyboardInterrupt: 

In [None]:
# Build the test generator, take its first batch and run the model on it.
datagenTest = CustomGeneratorTemporal(
    path_labels_test,
    batch_size=8,
    sequences=8,
)
batch_x, batch_y = datagenTest[0]
pred_y = model.predict_on_batch(batch_x)


In [None]:
# Every beat is supposed to contain at least one note predicted with a
# confidence of 1; currently the highest confidence the model outputs is only
# about 0.15.
for pred in pred_y[6]:
    highest_confidence = pred.max()
    print(highest_confidence)

In [51]:
def prediction_confidence(nHighestPred, batch, sequence, predictions=None):
    """Locate the highest-scoring outputs of one predicted timestep.

    Args:
        nHighestPred: how many of the largest scores to look up.
        batch: index along the first axis of the prediction array.
        sequence: index along the second axis of the prediction array.
        predictions: array of predictions to inspect. When omitted, the
            notebook-level ``pred_y`` is used, as in the original code.

    Returns:
        A list with one ``np.where`` result per selected score, ordered from
        the highest score downwards. Each result holds the positions whose
        score equals that value, so tied scores report every tied position.
    """
    if predictions is None:
        predictions = pred_y  # falls back to the array produced by the prediction cell

    scores = predictions[batch][sequence]
    top_values = sorted(scores.tolist(), reverse=True)[:nHighestPred]
    return [np.where(scores == value) for value in top_values]

# Position of the single highest score for batch index 0, sequence index 1.
prediction_confidence(1,0,1)

[(array([0], dtype=int64),)]

The problem seems to be that `model.fit` expects the generator to yield `(x, y)` tuples in which `x` is a batch of `batch_size` samples, whereas in our case each element of `x` is not a single sample but a sequence of samples.


For time series, the generator needs to yield a tuple of (batch_size[samples[sequences]], batch_size[labels[sequences]]),
whereas a normal ImageDataGenerator yields a tuple of (batch_size[samples], batch_size[labels]).

To pass batches of frames to a CNN, it seems that a TimeDistributed layer needs to be wrapped around the CNN.

* Find an alternative way to save matplotlib figures to disk, as the matplotlib `savefig()` method handles grayscale incorrectly,
* because saved grayscale images need to have the shape (100, 100, 1).

* Solution 2: save the images as RGB on disk and convert them to grayscale when building each batch.

* Solution: use Keras' built-in preprocessing tools from now on.

* The output of the Dense layer also has to be 3D, namely of shape (batch_size, timesteps, preds).

* Around 700 beat images are missing from the training set.

In [None]:
def findMissingGp3():
    
    for file in scan
    
    