<a href="https://colab.research.google.com/github/LeoPVL/Goznak/blob/main/Goznak_ML_Task2.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [21]:
import librosa # for mel-spectrogram estimation
!pip install soundfile
import soundfile # for opening .flac audio
from matplotlib import pyplot as plt
import numpy as np
from google.colab import drive
import tensorflow as tf
import pathlib
import cv2
from tensorflow.keras.layers import Input,Conv2D,MaxPooling2D,UpSampling2D,Cropping2D,concatenate,ZeroPadding2D
from tensorflow.keras import Model



In [22]:
def _floats_feature(value):
    """Wrap an array as a ``tf.train.Feature`` holding a flat list of floats.

    ``value`` must expose ``reshape`` (e.g. a NumPy array); it is flattened
    with ``reshape(-1)`` before being stored in the ``FloatList``.
    """
    flat_values = value.reshape(-1)
    float_list = tf.train.FloatList(value=flat_values)
    return tf.train.Feature(float_list=float_list)

def serialize_example(feature0, feature1):
    """Serialize an (input, target) array pair into ``tf.train.Example`` bytes.

    Args:
        feature0: array stored under the ``'image'`` key.
        feature1: array stored under the ``'target'`` key.

    Returns:
        The serialized ``tf.train.Example`` as returned by
        ``SerializeToString()``.
    """
    features = tf.train.Features(
        feature={
            'image': _floats_feature(feature0),
            'target': _floats_feature(feature1),
        }
    )
    return tf.train.Example(features=features).SerializeToString()

def read_file(file_path):
    """Load a ``.npy`` array from ``file_path`` and return its transpose."""
    return np.load(file_path).T

def get_tf_records(directory,file_name,SIZE = (80,48),records_count = None):
    """Write paired noisy/noise chunks from ``.npy`` files into one TFRecord file.

    ``.npy`` files are discovered recursively under ``directory + 'clean/'``
    and ``directory + 'noisy/'``, sorted, and paired by position in the sorted
    lists. Each pair is loaded with ``read_file`` (which transposes the array),
    the noise is computed as ``noisy - clean``, both arrays are zero-padded
    along axis 1 and cut into chunks ``SIZE[1]`` columns wide. Every chunk pair
    becomes one example: ``'image'`` is the noisy chunk, ``'target'`` the noise
    chunk.

    Args:
        directory: path prefix; it is concatenated directly with ``'clean/'``,
            ``'noisy/'`` and ``file_name``, so it should end with a separator.
        file_name: name of the TFRecord file created inside ``directory``.
        SIZE: ``(rows, chunk_width)`` of one chunk. ``SIZE[0]`` must equal the
            first axis of the transposed arrays, otherwise padding cannot be
            concatenated.
        records_count: maximum number of file pairs to process; ``None``
            processes every file found.

    Note:
        The chunk count is ``length // SIZE[1] + 1``, so an array whose length
        is an exact multiple of ``SIZE[1]`` produces one extra all-zero chunk.
    """
    print('getting tf records ', file_name)
    clean_paths = sorted(pathlib.Path(directory+'clean/').rglob('*.npy'))
    noisy_paths = sorted(pathlib.Path(directory+'noisy/').rglob('*.npy'))

    print('Found ',len(noisy_paths),' records.')
    if records_count is not None:
        clean_paths = clean_paths[:records_count]
        noisy_paths = noisy_paths[:records_count]

    sample_size = SIZE[1]
    with tf.io.TFRecordWriter(directory+file_name) as writer:
        for i in range(len(noisy_paths)):
            if ((i+1)%1000 == 0)and(i!=0):
                print(i,' records recorded')
            noisy = read_file(str(noisy_paths[i]))
            clean = read_file(str(clean_paths[i]))
            noise = noisy - clean
            noisylen = noisy.shape[1]

            samples_num = noisylen//sample_size+1
            # Fixed: the original referenced an undefined lowercase `size`
            # here, which raised NameError on the first file.
            padding = np.zeros((SIZE[0], sample_size*samples_num - noisylen))
            noisy = np.concatenate([noisy,padding],axis = 1)
            noise = np.concatenate([noise,padding],axis = 1)
            for k in range(samples_num):
                start = k*sample_size
                stop = start + sample_size
                writer.write(
                    serialize_example(noisy[:,start:stop], noise[:,start:stop])
                )


In [23]:
def read_tfrecord(example):
    """Parse one serialized example into an ``(image, target)`` tensor pair.

    Both features are decoded as fixed-length ``float32`` tensors whose shape
    is taken from the module-level ``SIZE``.
    """
    feature_spec = {
        key: tf.io.FixedLenFeature([*SIZE], dtype=tf.float32)
        for key in ("image", "target")
    }
    parsed = tf.io.parse_single_example(example, feature_spec)
    return parsed['image'], parsed['target']
def load_dataset(filename):
    """Open a TFRecord file and decode every record with ``read_tfrecord``."""
    return tf.data.TFRecordDataset(filename).map(read_tfrecord)
def arcface_format(image, target):
    """Repackage ``(image, target)`` as ``({'inp': image}, target)``."""
    model_inputs = {'inp': image}
    return model_inputs, target

def get_dataset(filenames,batch_size):
    """Build the input pipeline used by ``model.fit``.

    Records are decoded, wrapped by ``arcface_format``, repeated indefinitely,
    shuffled with a 2048-element buffer and grouped into batches of
    ``batch_size``. Because the dataset repeats forever, callers must bound
    iteration themselves (e.g. ``steps_per_epoch``).
    """
    return (
        load_dataset(filenames)
        .map(arcface_format)
        .repeat()
        .shuffle(2048)
        .batch(batch_size)
    )

In [24]:
def get_crop_shape(target, refer):
    """Compute per-side margins that bring ``target`` down to ``refer``'s size.

    The size difference along axis 1 and axis 2 of the two shapes is split
    into a (before, after) pair; when the difference is odd the extra unit
    goes to the "after" side.

    Returns:
        ``((ch1, ch2), (cw1, cw2))`` - margins for axis 1 and axis 2.

    Raises:
        AssertionError: if ``target`` is smaller than ``refer`` on either axis.
    """
    def _split(diff):
        # Halve the difference; any odd remainder lands on the second side.
        assert (diff >= 0)
        first = diff // 2
        return first, diff - first

    target_shape = target.get_shape()
    refer_shape = refer.get_shape()

    width_margins = _split(target_shape[2] - refer_shape[2])
    height_margins = _split(target_shape[1] - refer_shape[1])

    return height_margins, width_margins

def get_encoder(img_shape):
    """Build the contracting half of the network.

    Five stages of two ReLU convolutions each (64, 96, 128, 256, 512 filters);
    the first four stages are each followed by 2x2 max-pooling.

    Args:
        img_shape: shape passed to the Keras ``Input`` layer.

    Returns:
        ``(conv5, conv4, conv3, conv2, conv1, inp)`` - the output of each
        stage from deepest to shallowest, followed by the input tensor.
    """
    def conv(filters, kernel, name):
        return Conv2D(filters, kernel, activation='relu', padding='same',
                      data_format="channels_last", name=name)

    def pool(name):
        return MaxPooling2D(pool_size=(2, 2), data_format="channels_last", name=name)

    inp = Input(shape=img_shape)

    conv1 = conv(64, (5, 5), 'conv1_1')(inp)
    conv1 = conv(64, (5, 5), 'conv1_2')(conv1)
    pool1 = pool('pool1')(conv1)

    conv2 = conv(96, (3, 3), 'conv2_1')(pool1)
    conv2 = conv(96, (3, 3), 'conv2_2')(conv2)
    pool2 = pool('pool2')(conv2)

    conv3 = conv(128, (3, 3), 'conv3_1')(pool2)
    conv3 = conv(128, (3, 3), 'conv3_2')(conv3)
    pool3 = pool('pool3')(conv3)

    conv4 = conv(256, (3, 3), 'conv4_1')(pool3)
    # conv4_2 is the only layer with a 4x4 kernel; kept as in the original.
    conv4 = conv(256, (4, 4), 'conv4_2')(conv4)
    pool4 = pool('pool4')(conv4)

    conv5 = conv(512, (3, 3), 'conv5_1')(pool4)
    conv5 = conv(512, (3, 3), 'conv5_2')(conv5)

    return conv5,conv4,conv3,conv2,conv1,inp

def get_decoder(convs):
    """Build the expanding half of the network on top of ``get_encoder`` outputs.

    Each of the four stages upsamples the previous result by 2x, center-crops
    the matching encoder tensor to the upsampled size, concatenates the two and
    applies two 3x3 ReLU convolutions. The final feature map is zero-padded
    back to the input's spatial size and projected to one channel with a 1x1
    ``tanh`` convolution.

    Args:
        convs: the tuple ``(conv5, conv4, conv3, conv2, conv1, inputs)``
            returned by ``get_encoder``.

    Returns:
        The single-channel output tensor (layer name ``'out'``).
    """
    conv5,conv4,conv3,conv2,conv1,inputs = convs

    def up_stage(below, skip, filters, names):
        # names = (upsampling, cropping, first conv, second conv) layer names.
        up_name, crop_name, first_conv_name, second_conv_name = names
        upsampled = UpSampling2D(size=(2, 2), data_format="channels_last", name=up_name)(below)
        ch, cw = get_crop_shape(skip, upsampled)
        cropped = Cropping2D(cropping=(ch, cw), data_format="channels_last", name=crop_name)(skip)
        merged = concatenate([upsampled, cropped])
        features = Conv2D(filters, (3, 3), activation='relu', padding='same',
                          data_format="channels_last", name=first_conv_name)(merged)
        features = Conv2D(filters, (3, 3), activation='relu', padding='same',
                          data_format="channels_last", name=second_conv_name)(features)
        return features

    conv6 = up_stage(conv5, conv4, 256, ('up_conv5', 'crop_conv4', 'conv6_1', 'conv6_2'))
    conv7 = up_stage(conv6, conv3, 128, ('up_conv6', 'crop_conv3', 'conv7_1', 'conv7_2'))
    conv8 = up_stage(conv7, conv2, 96, ('up_conv7', 'crop_conv2', 'conv8_1', 'conv8_2'))
    conv9 = up_stage(conv8, conv1, 64, ('up_conv8', 'crop_conv1', 'conv9_1', 'conv9_2'))

    # Pad back whatever spatial size was lost relative to the network input.
    ch, cw = get_crop_shape(inputs, conv9)
    padded = ZeroPadding2D(padding=(ch, cw), data_format="channels_last", name='conv9_3')(conv9)
    return Conv2D(1, (1, 1), activation='tanh', data_format="channels_last", name='out')(padded)
    

def get_unet(img_shape = (200,200,1)):
    """Assemble the encoder and decoder into a single Keras ``Model``.

    Args:
        img_shape: input shape forwarded to ``get_encoder``.

    Returns:
        A ``Model`` mapping the encoder's input tensor to the decoder output.
    """
    encoder_tensors = get_encoder(img_shape)
    decoder_output = get_decoder(encoder_tensors)
    # The input tensor is the last element of the encoder tuple.
    return Model(inputs=encoder_tensors[-1], outputs=decoder_output)

In [25]:
def inference_with_pics(noisy_path,clean_path,path_to_model,size):
    """Run the saved model on one noisy file and plot noisy, clean and result.

    The noisy array is zero-padded along axis 1, split into chunks
    ``size[1]`` columns wide, passed through the model, and the prediction is
    subtracted from the padded noisy array. Three images are shown in order:
    the padded noisy input, the clean reference (padded the same way) and the
    result.

    Args:
        noisy_path: path to the noisy ``.npy`` file.
        clean_path: path to the matching clean ``.npy`` file.
        path_to_model: full path of the saved Keras model.
        size: ``(rows, chunk_width)`` of one model input chunk.
    """
    def _show(label, picture):
        print(label)
        plt.imshow(picture)
        plt.show()

    model = tf.keras.models.load_model(path_to_model)
    chunk_width = size[1]
    noisy = read_file(str(noisy_path))
    clean = read_file(str(clean_path))
    noisylen = noisy.shape[1]

    samples_num = noisylen//chunk_width+1
    padding = np.zeros((size[0], size[1]*samples_num - noisylen))
    noisy = np.concatenate([noisy,padding],axis = 1)

    chunks = [noisy[:, k*chunk_width:(k+1)*chunk_width] for k in range(0, samples_num)]
    X = np.array(chunks).reshape(samples_num, *size, 1)

    pred = model(X)
    # Stitch the per-chunk predictions back together along axis 1.
    pred_noise = np.concatenate([x for x in pred], axis = 1)
    pred_noise = pred_noise.reshape(size[0], size[1]*samples_num)
    result = noisy-pred_noise

    _show('noisy', noisy)
    _show('clean', np.concatenate([clean,padding],axis = 1))
    _show('pred', result)

In [26]:
# clean_path = '/content/drive/MyDrive/train/val/clean/1166/1166_14986_1166-14986-0024.npy'
# noisy_path = '/content/drive/MyDrive/train/val/noisy/1166/1166_14986_1166-14986-0024.npy'
# inference_with_pics(noisy_path,clean_path,TRAIN_PATH+'denoising_model.h5',SIZE)

In [27]:
# clean_path = '/content/drive/MyDrive/train/val/clean/245/245_122647_245-122647-0070.npy'
# noisy_path = '/content/drive/MyDrive/train/val/noisy/245/245_122647_245-122647-0070.npy'
# inference_with_pics(noisy_path,clean_path,TRAIN_PATH+'denoising_model.h5',SIZE)


In [28]:
def train(train_path,VAL_PATH,):
    """Train the U-Net on TFRecord data and save it after every round.

    Reads ``train_path + 'train.tfrec'`` and ``VAL_PATH + 'val.tfrec'``, builds
    the model with input shape ``[*SIZE, 1]`` (``SIZE`` is the module-level
    chunk shape) and optimizes mean squared error with Adam at a learning rate
    of 1e-4. Training runs as 10 rounds of ``fit(epochs=10)``; the model is
    written to ``train_path + 'denoising_model.h5'`` after each round, so an
    interrupted run keeps the latest completed round.

    Args:
        train_path: directory prefix holding ``train.tfrec``; also where the
            model file is saved. Concatenated directly, so it should end with
            a path separator.
        VAL_PATH: directory prefix holding ``val.tfrec``.
    """
    batch_size = 100

    # Run once to build the TFRecord files from the raw .npy data:
    # get_tf_records(train_path,'train.tfrec',SIZE = SIZE)
    # get_tf_records(VAL_PATH,'val.tfrec',SIZE = SIZE)

    train_dataset = get_dataset(train_path + 'train.tfrec',batch_size)
    val_dataset = get_dataset(VAL_PATH + 'val.tfrec',batch_size)

    model = get_unet([*SIZE,1])
    # `lr` is a deprecated alias that newer Keras releases reject;
    # `learning_rate` is the supported argument name.
    opt = tf.keras.optimizers.Adam(learning_rate=1e-4)
    model.compile(optimizer=opt,
                  loss='MeanSquaredError',
                  metrics=['MSE','CosineSimilarity'])

    for _ in range(10):
        # Both datasets repeat forever, so the step counts bound each epoch.
        model.fit(train_dataset,
                  steps_per_epoch = 600,
                  validation_data=val_dataset,
                  validation_steps = 100,
                  epochs = 10)
        model.save(train_path+'denoising_model.h5')

In [29]:
def inference(noisy_file_path,path_to_model):
    """Denoise one ``.npy`` file with the saved model.

    The array is loaded with ``read_file`` (transposed), zero-padded along
    axis 1 and split into chunks ``SIZE[1]`` columns wide (``SIZE`` is the
    module-level chunk shape). The model predicts the noise for every chunk;
    the stitched prediction is subtracted from the padded input, the padding
    is cut off, and the result is transposed back to the file's orientation.

    Args:
        noisy_file_path: path to the noisy ``.npy`` file.
        path_to_model: directory prefix containing ``denoising_model.h5``;
            concatenated directly, so it should end with a path separator.

    Returns:
        The denoised array, with the same shape as the array stored in
        ``noisy_file_path``.
    """
    model = tf.keras.models.load_model(path_to_model+'denoising_model.h5')
    sample_size = SIZE[1]
    noisy = read_file(str(noisy_file_path))
    noisylen = noisy.shape[1]

    samples_num = noisylen//sample_size+1
    padding = np.zeros((SIZE[0], SIZE[1]*samples_num - noisylen))
    noisy = np.concatenate([noisy,padding],axis = 1)

    chunks = [noisy[:, k*sample_size:(k+1)*sample_size] for k in range(samples_num)]
    X = np.array(chunks).reshape(samples_num, *SIZE, 1)

    pred = model(X)
    # Stitch the per-chunk predictions back together along axis 1.
    pred_noise = np.concatenate([x for x in pred], axis = 1)
    pred_noise = pred_noise.reshape(SIZE[0], SIZE[1]*samples_num)

    # Fixed: the model is trained to predict noise (noisy - clean), so the
    # denoised signal is the input minus the prediction. The original
    # returned the predicted noise itself.
    denoised = noisy - pred_noise

    return denoised[:,:noisylen].T

In [None]:
# Mount Google Drive so the dataset and model paths below are reachable.
drive.mount('/content/drive')
# Chunk shape; read as a global by read_tfrecord, train and inference.
SIZE = (80,48)
# NOTE(review): TRAIN_PATH spells the drive root 'My Drive' while VAL_PATH
# uses 'MyDrive' - confirm both resolve on the mounted drive.
TRAIN_PATH = '/content/drive/My Drive/train/train/train/'
VAL_PATH = '/content/drive/MyDrive/train/val/'
# Expects train.tfrec / val.tfrec to already exist under these paths.
train(TRAIN_PATH,VAL_PATH)

# Run the saved model on a single validation file.
noisy_path = VAL_PATH+'noisy/1084/1084_139230_1084-139230-0002.npy'
clean_mel = inference(noisy_path,TRAIN_PATH)

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).
Epoch 1/10