<a href="https://colab.research.google.com/github/atjessehill/Thesis-Notebooks/blob/main/Process_Audio.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
import os
from os import path
import numpy as np
import librosa
import librosa.display
import matplotlib.pyplot as plt
from google.colab import drive
import json
from sklearn import preprocessing
import glob
drive.mount('/content/drive')
from skimage.io import imsave, imread


Mounted at /content/drive


In [None]:
"""
A collection of functions and snippets for extracting audio features
1. MFCC feature extraction, adapted from "The Sound of AI" YouTube channel
2. Extracting and saving melspectrogram, and slicing songs into 30 second clips
   before extracting melspectrogram
"""

In [None]:
# Relative root of the data tree; presumably resolved from /content, where
# Google Drive is mounted -- confirm the working directory.
BASE = 'drive/My Drive'
# The sub-paths below are joined onto BASE with os.path.join before use.
SONG_SAMPLE_PATH = 'Thesis/Samples'
DATA_SAVE_PATH = 'Thesis/InputData'
SONG_FULL_PATH = 'Thesis/Samples/FULL_TRACKS'

In [None]:
def rescale_feature(x):
  """Min-max scale `x` with a freshly fitted sklearn MinMaxScaler.

  The scaler is fitted on `x` itself and immediately applied to it, so the
  result depends only on this one input.
  """
  return preprocessing.MinMaxScaler().fit_transform(x)

def load_splits():
  """Return the parsed contents of '10-fold-splits.json' under DATA_SAVE_PATH."""
  splits_file = os.path.join(BASE, DATA_SAVE_PATH, '10-fold-splits.json')
  with open(splits_file, 'r') as handle:
    return json.load(handle)

def scale_minmax(X, min=0.0, max=1.0):
  """Linearly rescale array `X` so its values span [min, max].

  Args:
    X: numpy array to rescale.
    min: value the smallest element of `X` is mapped to.
    max: value the largest element of `X` is mapped to.

  Returns:
    A float array of the same shape as `X`. When every element of `X` is
    equal there is no range to stretch, so an array filled with `min` is
    returned instead of dividing by zero and producing NaNs.
  """
  lowest = X.min()
  span = X.max() - lowest
  if span == 0:
    # Constant input: (X - lowest) / span would be 0/0 -> NaN.
    return np.full(X.shape, min, dtype=float)
  X_std = (X - lowest) / span
  return X_std * (max - min) + min

def save_mfcc(json_file, n_mfcc=20, n_fft=2048, hop_length=512, rescale=False, debug=False):
  """Extract MFCCs for every sample clip and store them in one JSON file.

  Every '*.mp3' under BASE/SONG_SAMPLE_PATH/<class> for the classes 'noDJ'
  and 'yesDJ' is loaded at 22050 Hz, converted to MFCCs and appended to the
  output together with its class index. Clips whose MFCC matrix does not
  have the same shape as the first clip processed are reported and skipped.

  Args:
    json_file: file name (under BASE/DATA_SAVE_PATH) to write the result to.
    n_mfcc: number of MFCC coefficients, passed to librosa.
    n_fft: FFT window size, passed to librosa.
    hop_length: hop length, passed to librosa.
    rescale: when True, each clip's MFCC matrix goes through rescale_feature.
    debug: when True, stop after three files per class, return the collected
      dictionary and write nothing to disk.

  Returns:
    The collected dictionary when `debug` is True, otherwise None.
  """
  json_save_file = os.path.join(BASE, DATA_SAVE_PATH, json_file)

  data = {
      # NOTE(review): purpose of this entry is not evident from the code;
      # kept so the output schema is unchanged.
      'data': 13,
      'mapping': {
          'noDJ': 0,
          'yesDJ': 1
      },
      'mfcc': [],
      'labels': []
  }

  paths = ['noDJ', 'yesDJ']
  # Reference shape, fixed by the first clip seen across BOTH classes so that
  # every stored matrix has identical dimensions.
  shape = None
  for i, p in enumerate(paths):
    pattern = os.path.join(BASE, SONG_SAMPLE_PATH, p, "*.mp3")
    for j, file in enumerate(glob.glob(pattern)):
      if j % 25 == 0:
        print("j{} i{} Loading{}".format(j, i, file))
      signal, sr = librosa.load(file, sr=22050)
      # The audio must be passed as y=...; recent librosa releases reject it
      # as a positional argument.
      mfcc = librosa.feature.mfcc(y=signal, sr=sr, n_mfcc=n_mfcc, n_fft=n_fft, hop_length=hop_length)
      mfcc = mfcc.T
      if rescale:
        mfcc = rescale_feature(mfcc)
      if shape is None:
        print("Shape is {}".format(mfcc.shape))
        shape = mfcc.shape
      if mfcc.shape == shape:
        data['mfcc'].append(mfcc.tolist())
        data['labels'].append(i)
      else:
        print("Coudln't save {} with shape {} on file {}".format(file, mfcc.shape, os.path.basename(file)))

      if debug and j == 2:
        break

  # Both steps run once, after all classes have been processed.
  if debug:
    return data

  with open(json_save_file, 'w') as fp:
    json.dump(data, fp, indent=4)

In [None]:
def load_data(path):
  """Load MFCCs and labels from a JSON file in the 'All' data folder.

  `path` is the file name below BASE/DATA_SAVE_PATH/All. The 'mfcc' and
  'labels' entries are returned as numpy arrays, in that order.
  """
  json_path = os.path.join(BASE, DATA_SAVE_PATH, 'All', path)
  with open(json_path, 'r') as handle:
    payload = json.load(handle)

  return np.array(payload['mfcc']), np.array(payload['labels'])

def load_data_list(path):
  """Load MFCCs and labels from a JSON file as plain Python lists.

  `path` is the file name below BASE/DATA_SAVE_PATH. The 'mfcc' and 'labels'
  entries are returned exactly as parsed, in that order.
  """
  json_path = os.path.join(BASE, DATA_SAVE_PATH, path)
  with open(json_path, 'r') as handle:
    payload = json.load(handle)

  return payload['mfcc'], payload['labels']

def setup_mfcc():
  """Run save_mfcc (with rescaling) for each configured parameter combination.

  Only combinations whose FFT size equals the hop length are processed; any
  other combination is reported as skipped.
  """
  mfcc_counts = [20]
  fft_sizes = [1024]
  hop_sizes = [1024]

  for count in mfcc_counts:
    for fft_size in fft_sizes:
      for hop in hop_sizes:
        out_name = "MFCC_{}_nfft_{}_hl_{}.json".format(count, fft_size, hop)
        if fft_size == hop:
          print("Calling {}".format(out_name))
          save_mfcc(out_name, n_mfcc=count, n_fft=fft_size, hop_length=hop, rescale=True)
          continue
        print(f"Skipping {out_name}")

In [None]:
# Build smaller MFCC files by keeping only the first N coefficients of the saved 20-coefficient data

def twenty_to_fewer_mfcc():
  """Derive 13..19-coefficient MFCC files from the saved 20-coefficient file.

  For each (n_fft, hop_length) combination the reference file
  'MFCC_20_nfft_<nfft>_hl_<hop>.json' is loaded from BASE/DATA_SAVE_PATH and,
  for every target coefficient count, each frame is truncated to its first
  N values and written to 'MFCC_<N>_nfft_<nfft>_hl_<hop>.json'.

  Combinations whose reference file does not exist are skipped with a
  message, so nothing is derived from missing or stale data.
  """
  n_mfcc = [13, 14, 15, 16, 17, 18, 19]
  n_fft = [1024]
  hop_length = [1024]
  for nfft in n_fft:
    for hoplength in hop_length:
      if nfft == 512 and hoplength == 512:
        continue

      twenty_ref = f'MFCC_20_nfft_{nfft}_hl_{hoplength}.json'
      if not os.path.exists(os.path.join(BASE, DATA_SAVE_PATH, twenty_ref)):
        # Without the reference there is nothing to truncate.
        print(f"Skipping {twenty_ref}: reference file not found")
        continue

      print(f"Loading {twenty_ref}")
      x, y = load_data_list(twenty_ref)

      for nmfcc in n_mfcc:
        json_save_file = os.path.join(BASE, DATA_SAVE_PATH, f'MFCC_{nmfcc}_nfft_{nfft}_hl_{hoplength}.json')
        data = {
            'mapping': {
                'noDJ': 0,
                'yesDJ': 1
            },
            # x is nested as clip -> frame -> coefficients; keep the first
            # nmfcc coefficients of every frame.
            'mfcc': [[frame[:nmfcc] for frame in clip] for clip in x],
            'labels': y
        }

        with open(json_save_file, 'w') as fp:
          json.dump(data, fp, indent=4)
        print("Created "+json_save_file)


In [None]:
# Extract and save a mel spectrogram image array for every 30-second clip,
# for each combination of window size and hop length.
# NOTE(review): the output file name does not encode nfft/hl, so with more
# than one combination the later ones are skipped by the "already exists"
# check below.
n_fft = [1024]
hop_length = [512]
paths = ['noDJ', 'yesDJ']
for nfft in n_fft:
  for hl in hop_length:
    print(f"Starting {nfft} and {hl}")
    for j, file in enumerate(glob.glob(os.path.join(BASE, SONG_SAMPLE_PATH, "30_sec", "*.mp3"))):
      url = file.split('/')[-1]
      # Expects names of the form '<song>.mp3<clip suffix>.mp3' (the form
      # slice_audio writes); any other count of '.mp3' raises ValueError.
      url, clip, _ = url.split('.mp3')
      out = url+'_spec'+clip+'.npy'
      if j % 25 == 0:
        # There is no class loop in this cell, so only the file index is
        # reported (the old message referenced an undefined `i`).
        print("j{} Loading{}".format(j, file))
      file_output = os.path.join(BASE, DATA_SAVE_PATH, "Spectrograms_30sec", out)
      if os.path.exists(file_output):
        # Already extracted on an earlier run.
        continue
      y, sr = librosa.load(file, sr=22050)
      s = librosa.feature.melspectrogram(y=y, sr=sr, n_fft=nfft, hop_length=hl)
      # Small offset keeps log() finite where the spectrogram is zero.
      mels = np.log(s + 1e-9)
      img = scale_minmax(mels, 0.0, 255.0).astype(np.uint8)
      # Reverse the first axis and invert the intensities before saving.
      img = np.flip(img, axis=0)
      img = 255-img
      np.save(file_output, img)
      if j % 25 == 0:
        print(f"Finished {j}")

In [None]:
# Extract and save the MFCCs of every clip under SONG_SAMPLE_PATH/<class> as
# one .npy file per clip, for each combination of MFCC count, window size and
# hop length.

n_mfcc = [20]
n_fft = [1024]
hop_length = [128, 256, 512, 1024]
paths = ['noDJ', 'yesDJ']
rescale = True
for nmfcc in n_mfcc:
  for nfft in n_fft:
    for hl in hop_length:
      print(f"Starting {nfft} and {hl}")
      # Reference shape for this configuration; fixed by the first clip and
      # shared by both classes so all saved arrays have equal dimensions.
      shape = None
      for i, p in enumerate(paths):
        for j, file in enumerate(glob.glob(os.path.join(BASE, SONG_SAMPLE_PATH, p, "*.mp3"))):
          url = file.split('/')[-1]
          out = url+'_MFCC_'+str(nmfcc)+'_'+str(nfft)+'_'+str(hl)+'.npy'
          file_output = os.path.join(BASE, DATA_SAVE_PATH, "MFCC", out)
          if j % 25 == 0:
            print("j{} i{} Loading{}".format(j, i, file))
          signal, sr = librosa.load(file, sr=22050)

          # The audio must be passed as y=...; recent librosa releases reject
          # it as a positional argument.
          mfcc = librosa.feature.mfcc(y=signal, sr=sr, n_mfcc=nmfcc, n_fft=nfft, hop_length=hl)
          mfcc = mfcc.T
          if rescale:
            mfcc = rescale_feature(mfcc)
          if shape is None:
            print("Shape is {}".format(mfcc.shape))
            shape = mfcc.shape
          if mfcc.shape == shape:
            np.save(file_output, mfcc)
          else:
            print("Could not save", file_output)
          print(j)

In [None]:
# Slice each song into 30 second chunks, ignoring the end

def slice_audio():
  """Cut every full track into consecutive 30-second mp3 clips.

  Each '*.mp3' under BASE/Thesis/Samples/FULL_TRACKS/<class> (classes 'noDJ'
  and 'yesDJ') is split into whole 30-second pieces; the leftover shorter
  than 30 seconds at the end is dropped. Clips are exported to
  BASE/Thesis/Samples/30_sec as '<original file name>_clip_<index>.mp3'.
  """
  clip_ms = 30 * 1000  # clip length in milliseconds
  paths = ['noDJ', 'yesDJ']

  for i, p in enumerate(paths):
    pattern = os.path.join(BASE, 'Thesis', 'Samples', 'FULL_TRACKS', p, "*.mp3")
    for j, file in enumerate(glob.glob(pattern)):
      if j % 25 == 0:
        print("Starting ", i, j, file)

      mp3_name = file.split('/')[-1]
      # NOTE(review): AudioSegment is not imported in the visible cells;
      # presumably pydub.AudioSegment -- confirm and import before running.
      song = AudioSegment.from_mp3(file)
      # Floor division counts only whole clips, so the trailing remainder is
      # never sliced.
      slices = len(song) // clip_ms
      for s in range(slices):
        extract = song[s * clip_ms:(s + 1) * clip_ms]
        if len(extract) != clip_ms:
          print(f"Errored at {file} clip {s} --> {len(extract)}")
        else:
          file_name = mp3_name+f'_clip_{s}.mp3'
          file_name = os.path.join(BASE, 'Thesis', 'Samples', '30_sec', file_name)
          print(file_name)
          extract.export(file_name, format='mp3')

In [None]:
# Record, for each of the folds in `splits`, which songs (and which labels)
# fall into the train and test partitions, then save the list as JSON.

url_splits = []
for i, shuffle_index in enumerate(splits):
  # Entries at shuffle_index form the test set; everything else is training.
  x_test = np.take(urls_np, shuffle_index)
  y_test = np.take(result_np, shuffle_index)
  x_train = np.delete(urls_np, shuffle_index)
  y_train = np.delete(result_np, shuffle_index)

  fold_data = {
      'train': x_train.tolist(),
      'test': x_test.tolist(),
      'ytrain': y_train.tolist(),
      'ytest': y_test.tolist(),
  }
  url_splits.append(fold_data)


json_save_file = os.path.join(BASE, DATA_SAVE_PATH, '10-fold-splits.json')

with open(json_save_file, 'w') as fp:
  json.dump(url_splits, fp, indent=4)

In [None]:
# Old essentia code, TODO: reimport essentia

def get_low_level_features(json_file):
  """Extract essentia MusicExtractor descriptors and save them as JSON.

  For the first three '*.mp3' files of each class ('noDJ', 'yesDJ') under
  BASE/SONG_FULL_PATH, every descriptor returned by MusicExtractor is
  appended to a list keyed by the descriptor name, alongside the file name
  and class index. The result is written to BASE/DATA_SAVE_PATH/<json_file>.

  Args:
    json_file: name of the output file below BASE/DATA_SAVE_PATH.
  """
  json_save_file = os.path.join(BASE, DATA_SAVE_PATH, json_file)

  data = {
      'mapping': {
          'noDJ': 0,
          'yesDJ': 1
      },
      'name': [],
      'labels': []
  }

  paths = ['noDJ', 'yesDJ']
  for i, p in enumerate(paths):
    for j, file in enumerate(glob.glob(os.path.join(BASE, SONG_FULL_PATH, p, "*.mp3"))):
      # NOTE(review): `es` is not imported in the visible cells; presumably
      # essentia.standard -- confirm and import before running.
      features, feature_frames = es.MusicExtractor(lowlevelStats=['mean', 'stdev'],
                                                   rhythmStats=['mean', 'stdev'],
                                                   tonalStats=['mean', 'stdev'])(file)

      for feature_name in features.descriptorNames():
        value = features[feature_name]
        # Presumably some descriptors are numpy arrays/scalars, which json
        # cannot serialise; convert anything exposing tolist().
        if hasattr(value, 'tolist'):
          value = value.tolist()
        data.setdefault(feature_name, []).append(value)
      data['name'].append(file.split('/')[-1])
      data['labels'].append(i)
      print("Extracted features for ", file)
      # Only the first three files of each class are processed.
      if j == 2:
        break

  with open(json_save_file, 'w') as f:
    # json.dump writes to the file object; json.dumps only returns a string
    # and rejects a second positional argument.
    json.dump(data, f, indent=4)

# Run the extraction; 'outfile' is used verbatim as the output file name
# under BASE/DATA_SAVE_PATH.
get_low_level_features('outfile')