<a href="https://colab.research.google.com/github/EricEricEricJin/ECE539-Group-Project/blob/master/pred_series_mlp_preprocess.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

Aims to develop an MLP that takes the prediction series as input and outputs the class (speech or singing).

```
[M samples] [N samples]
           |
   marker -|
```
Input shape: (M+N),  
Output shape: (2), categorical

In [25]:
from google.colab import drive
# Mount Google Drive at /content/drive; DIR_NAME used by later cells lives under this mount point.
drive.mount("/content/drive")

import numpy as np
import matplotlib.pyplot as plt
from sklearn.model_selection import train_test_split
import tensorflow as tf
import tensorflow.keras as keras

import os
import random

Mounted at /content/drive


In [26]:
# - Use model to predict
# - Align with marker
# - Sample M before and N after
# - Sample (M+N) inside large-enough intervals !!NEED TO COVER CEC!!
# - Save samples in .npy format

In [27]:
SAMPLE_RATE = 22050  # audio samples per second

# Window sizes around a marker, expressed in audio samples (minutes * 60 s * SAMPLE_RATE).
MM = int(0.5*60*SAMPLE_RATE) # 0.5min, assume all intervals between songs are larger than 0.5min
NN = int(1.5*60*SAMPLE_RATE) # 1.5min, assume all songs are longer than 1.5min
# Project root on the mounted Drive; the trailing slash matters because paths are built by concatenation.
DIR_NAME = "/content/drive/My Drive/Colab Notebooks/ECE539_Project/"

# Length of one classifier input chunk in audio samples (132300 = 6 s at SAMPLE_RATE).
CHUNK_SAMPLE = 661500 // 5

In [28]:
# Restore the saved Keras model (its directory name encodes the chunk length)
# and print a layer-by-layer overview.
model = keras.models.load_model(f"{DIR_NAME}ms_clf_weights_N={CHUNK_SAMPLE}")
model.summary()

Model: "sequential"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 resizing (Resizing)         (None, 512, 64, 1)        0         
                                                                 
 normalization (Normalizati  (None, 512, 64, 1)        3         
 on)                                                             
                                                                 
 conv2d (Conv2D)             (None, 510, 62, 32)       320       
                                                                 
 conv2d_1 (Conv2D)           (None, 508, 60, 64)       18496     
                                                                 
 max_pooling2d (MaxPooling2  (None, 254, 30, 64)       0         
 D)                                                              
                                                                 
 dropout (Dropout)           (None, 254, 30, 64)       0

In [29]:
def evaluate_stream(BV, STEP_SIZE):
  """Run the global ``model`` over one whole stream and load its marker times.

  Reads ``xxm_mixed/{BV}.wav`` and ``xxm_mixed/{BV}.csv`` under ``DIR_NAME``,
  cuts the waveform into windows of ``CHUNK_SAMPLE`` samples taken every
  ``STEP_SIZE`` samples, turns each window into a magnitude spectrogram and
  predicts on all windows in one batch.

  Args:
    BV: File stem shared by the .wav and the .csv file.
    STEP_SIZE: Hop between consecutive windows, in audio samples (must be > 0).

  Returns:
    marker: 1-D array with one value per CSV row, computed as
      ``col0 * 3600 + col1 * 60 + col2`` (presumably hours/minutes/seconds
      converted to seconds -- confirm against the CSV files).
    pred_list: Output of ``model.predict`` with one row per window.

  Raises:
    ValueError: If ``STEP_SIZE`` is not positive or the audio is shorter than
      one window of ``CHUNK_SAMPLE`` samples.
  """
  if STEP_SIZE <= 0:
    raise ValueError(f"STEP_SIZE must be positive, got {STEP_SIZE}")

  # Load audio file and parse marker file
  wav_filepath = DIR_NAME + f"xxm_mixed/{BV}.wav"
  audio_binary = tf.io.read_file(wav_filepath)
  # ndmin=2 keeps a single-row CSV two-dimensional, so the column indexing
  # below also works for a stream with exactly one marker.
  marker = np.loadtxt(
    DIR_NAME + f"xxm_mixed/{BV}.csv",
    delimiter=',',
    ndmin=2)
  marker = marker[:,0] * 3600 + marker[:,1] * 60 + marker[:,2]

  # Decode audio and transform to spectrograms
  audio, _ = tf.audio.decode_wav(audio_binary)
  # NOTE(review): squeeze only yields a 1-D waveform for single-channel audio;
  # presumably all inputs are mono -- confirm.
  waveform = tf.squeeze(audio)

  def get_spectrogram(waveform):
    spectrogram = tf.signal.stft(
      waveform, frame_length=255, frame_step=128)
    # removes the phase (we only care about the magnitude)
    spectrogram = tf.abs(spectrogram)
    return spectrogram

  # Number of complete windows that fit into the waveform.
  n_chunks = (len(waveform) - CHUNK_SAMPLE) // STEP_SIZE + 1
  if n_chunks < 1:
    raise ValueError(
      f"{wav_filepath} has {len(waveform)} samples, fewer than one chunk "
      f"of {CHUNK_SAMPLE} samples")

  # One spectrogram per window, with a trailing channel axis, stacked into a batch.
  model_input = tf.stack([
    tf.expand_dims(
      get_spectrogram(waveform[i * STEP_SIZE : i * STEP_SIZE + CHUNK_SAMPLE]), -1)
    for i in range(n_chunks)])

  # Use model to predict
  pred_list = model.predict(model_input)
  return marker, pred_list

In [30]:
BV_list = ["BV1bT411S7ck",
           "BV15z4y1M7ee",
           "BV1WN41167kE"]
STEP_SIZE = CHUNK_SAMPLE // 2

# Every singing sample spans 60 s before to 90 s after its marker.
SECONDS_BEFORE_MARKER = 60
SECONDS_AFTER_MARKER = 90
# Window length in prediction steps (one prediction every STEP_SIZE audio samples).
# Computed once so that every saved sample has exactly the same length.
input_size = int((SECONDS_BEFORE_MARKER + SECONDS_AFTER_MARKER) * SAMPLE_RATE / STEP_SIZE)

# Dedicated seeded generator: the speech selection is reproducible on re-run
# and the global `random` state is left untouched.
speech_rng = random.Random(539)

for BV in BV_list:
  marker, pred_list = evaluate_stream(BV, STEP_SIZE)
  # Signed score per window: column 1 minus column 0 of the model output.
  pred_1D = pred_list[:,1] - pred_list[:,0]
  n_pred = len(pred_1D)

  # Marker times -> indices into pred_1D. floor() (not int()) so that a window
  # starting before the stream gets a negative index and is dropped below
  # instead of being silently shifted to index 0.
  # Assumes markers are sorted ascending -- TODO confirm against the CSV files.
  marker = np.asarray(marker)
  start_idx = np.floor((marker - SECONDS_BEFORE_MARKER) * SAMPLE_RATE / STEP_SIZE).astype(int)
  end_idx = start_idx + input_size

  print("start idx", start_idx)
  print("end idx", end_idx)
  print("intervals", end_idx-start_idx)
  print("input size", input_size)

  # singings: keep only windows that lie completely inside the prediction
  # series; a partial window would produce a ragged array.
  in_range = (start_idx >= 0) & (end_idx <= n_pred)
  if not in_range.all():
    print(BV, "dropping", int((~in_range).sum()), "marker(s) whose window leaves the stream")
  singings = np.array(
    [pred_1D[s:e] for s, e in zip(start_idx[in_range], end_idx[in_range])],
    dtype=pred_1D.dtype).reshape(-1, input_size)

  # speeches: tile every gap between the end of one singing window and the
  # start of the next (and the tail after the last one) with non-overlapping
  # windows. Gaps use ALL markers, including dropped ones, so no singing
  # region leaks into the speech pool.
  gap_starts = np.clip(end_idx, 0, n_pred)
  gap_ends = np.clip(np.append(start_idx[1:], n_pred), 0, n_pred)
  speeches = []
  for gap_start, gap_end in zip(gap_starts, gap_ends):
    # A negative gap (overlapping windows) gives an empty range.
    for j in range((gap_end - gap_start) // input_size):
      speeches.append(pred_1D[gap_start + j*input_size : gap_start + (j+1)*input_size])

  # Balance the classes: as many speech windows as singing windows, capped by
  # what is available (random.sample raises if asked for more than exist).
  n_speech = min(len(singings), len(speeches))
  if n_speech < len(singings):
    print(BV, "only", len(speeches), "speech windows available for", len(singings), "singing windows")
  speeches = np.array(
    speech_rng.sample(speeches, n_speech),
    dtype=pred_1D.dtype).reshape(-1, input_size)

  print(BV, "singing", singings.shape, "speech", speeches.shape)

  # save to drive (create the target folders on first run)
  os.makedirs(DIR_NAME + "pred_series/singing/", exist_ok=True)
  os.makedirs(DIR_NAME + "pred_series/speech/", exist_ok=True)
  np.save(DIR_NAME + "pred_series/singing/" + f"{BV}.npy", singings)
  np.save(DIR_NAME + "pred_series/speech/" + f"{BV}.npy", speeches)


start idx [ 367  447  589  746  883 1212 1263 1355 1449 1564 1625 1685 1791 1840
 1949 2030 2165 2240 2355 2416]
end idx [ 417  497  639  796  933 1262 1313 1405 1499 1614 1675 1735 1841 1890
 1999 2080 2215 2290 2405 2466]
intervals [50 50 50 50 50 50 50 50 50 50 50 50 50 50 50 50 50 50 50 50]
input size 50
BV1bT411S7ck singing (20, 50) speech (20, 50)
start idx [ 140  486  573  686  795 1100 1184 1260 1349 1447 1555 1636 1730 1856
 1981 2144 2237 2344 2444]
end idx [ 190  536  623  736  845 1150 1234 1310 1399 1497 1605 1686 1780 1906
 2031 2194 2287 2394 2494]
intervals [50 50 50 50 50 50 50 50 50 50 50 50 50 50 50 50 50 50 50]
input size 50
BV15z4y1M7ee singing (19, 50) speech (19, 50)
start idx [ 343  449  583  761  856  961 1079 1188 1312 1476 1557 1694 1799 1864
 2024 2611 2731 2879 3075]
end idx [ 393  499  633  811  906 1011 1129 1238 1362 1526 1607 1744 1849 1914
 2074 2661 2781 2929 3125]
intervals [50 50 50 50 50 50 50 50 50 50 50 50 50 50 50 50 50 50 50]
input size 50
BV1W

In [31]:
# Flush pending writes (the .npy files saved above) to Google Drive and unmount it.
drive.flush_and_unmount()