<a href="https://colab.research.google.com/github/ShakhovaP/musical-chord-recognition/blob/main/create_dataset.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
from google.colab import drive
drive.mount('/content/gdrive')

Drive already mounted at /content/gdrive; to attempt to forcibly remount, call drive.mount("/content/gdrive", force_remount=True).


In [2]:
%cd /content/gdrive/MyDrive/GrKI_project/

/content/gdrive/MyDrive/GrKI_project


In [3]:
%ls

chord_transcript.py       create_dataset.py  GnR_test.wav
cnn_chord_recognition.py  [0m[01;34mdata[0m/              test_chord_list.lab


In [4]:
# from google.colab import files
# files.upload()

# PREPROCESSING DATA 
-----------------
##### Analysing and simplifying chords in chord transcription files.

In [5]:
import csv
import numpy as np
import os
import re

In [6]:
CHORDS_DIR = 'data/data_transcripts'

#### Used functions

In [7]:
def _read_file(song_path):
    """Reads file with chord transcriptions of the song

        :param song_path (string): path to the song transcription file
        :return chord_transcription (list): ChordStartTime, ChordEndTime and ChordLabel for each chord 
    """
    try:
        with open(song_path, "r") as csv_file:
            try:
                data = csv.reader(csv_file, delimiter=" ")
                chord_transcription = [[float(row[0]), float(row[1]), str(row[2])] for row in data]
                return chord_transcription
            except:
                print(f"Problem in processing {os.path.basename(song_path)}.\n"
                        "Data format is incorrect!")
                # [print(row[0]) for row in data]   
    except OSError:
        print(f"Can't open {os.path.basename(song_path)}")

In [8]:
def count_unique_values(data):
    """Counts how many times each value of data was used

        :param data (list):
        :return (dictionary): dictionary where keys are all values used in data and values are the numbers of times they were used
    """
    data = np.array(data).reshape(-1)
    # res = {}
    values, counts = np.unique(data, return_counts=True)  
    return dict(zip(values, counts))

In [9]:
def get_chord_transcripts(dirname):
    """Reads and saves chord transcriptions of each song from the directory

        :param dirname (string): name of the directory with transcription files
        :return transcriptions (dictionary): dictionary where keys are song transcription file names and 
                                                              values are [ [ChordStartTime, ChordEndTime, Chord], ...] of each song
    """
    transcriptions = {}
    for filename in os.scandir(dirname):
        if filename.is_file():
            t_chords = _read_file(filename)
            transcriptions[os.path.basename(filename)] = t_chords
    return transcriptions

In [10]:
def _get_column(matrix, i):
    """Returns needed column of given matrix

        :param matrix (list): given matrix
        :param i (int): column index

        :return (list): list of values of column with index i in matrix
    """
    return [row[i] for row in matrix]

In [11]:
def count_chord_types(song_transcripts_arr):
    """Counts how many times each chord was used in input dataset

        :param song_transcripts_arr (list): list of chord transcriptions
        :return chord_counts (dictionary): dictionary where keys are chord labels and values are the numbers of times they were used
    """
    all_used_chords = []
    for st in song_transcripts_arr:
        all_used_chords += _get_column(st, 2)
    chord_counts = count_unique_values(all_used_chords)
    return chord_counts

In [12]:
def _simplify_chord(chord):
    """Replaces complicated chord with simple analog

        :param chord (string):  input chord
        :return base (string): simplified chord
    """
    base = re.search("^[A-GN][#b]?", chord).group()   # find the base note in chord label)
    n = ["A", "B", "C", "D", "E", "F", "G"]
    if re.search(".*min.*|^[A-G][b#]?m.*", chord):    # if the chord is minor
        base += "m"                                   # add "m" to the base note
    
    if "#" in base:
        n.reverse()
        index = n.index(base[0]) - 1
        base += re.sub("[A-G]#", "|" + n[index] + "b", base)
    elif "b" in base:
        index = n.index(base[0]) - 1
        base = re.sub("[A-G]b", n[index] + "#", base) + "|" + base

    return base

In [13]:
def simplify_song(chord_transcript):
    """Replace all complicated chords with simple analogs

        :param song_transcript (list): list of [ChordStartTime, ChordEndTime, ChordLabel]
        :return 
    """
    for row in chord_transcript:
        row[2] = _simplify_chord(row[2])

____________
#### Preprocessing process

Getting a chord transcriptions for every song in directory

In [14]:
chord_transcriptions = get_chord_transcripts(CHORDS_DIR)
chord_transcriptions['11_-_In_My_Life.lab']

[[0.0, 0.440395, 'N'],
 [0.440395, 2.470042, 'A'],
 [2.470042, 5.045102, 'E'],
 [5.045102, 7.142059, 'A'],
 [7.142059, 9.607823, 'E'],
 [9.607823, 10.75721, 'A'],
 [10.75721, 11.929818, 'E'],
 [11.929818, 12.823786, 'F#:min'],
 [12.823786, 14.275034, 'A:7'],
 [14.275034, 15.436031, 'D'],
 [15.436031, 16.585419, 'D:min'],
 [16.585419, 18.895804, 'A'],
 [18.895804, 20.068412, 'A'],
 [20.068412, 21.24102, 'E'],
 [21.24102, 22.076938, 'F#:min'],
 [22.076938, 23.539795, 'A:7'],
 [23.539795, 24.730593, 'D'],
 [24.730593, 25.908231, 'D:min'],
 [25.908231, 28.230226, 'A'],
 [28.230226, 30.552222, 'F#:min'],
 [30.552222, 32.885827, 'D'],
 [32.885827, 35.172993, 'G'],
 [35.172993, 37.483378, 'A'],
 [37.483378, 39.816984, 'F#:min'],
 [39.816984, 42.127369, 'B'],
 [42.127369, 44.402925, 'D:min'],
 [44.402925, 46.7017, 'A'],
 [46.7017, 49.058526, 'A'],
 [49.058526, 51.415351, 'E'],
 [51.415351, 52.576349, 'A'],
 [52.576349, 53.748956, 'E'],
 [53.748956, 54.596485, 'F#:min'],
 [54.596485, 56.059342,

Count and print all chords and how many times they had been used

In [15]:
print('ALL USED CHORDS')
chord_types = count_chord_types(chord_transcriptions.values())
print("\nNumber of chord classes: ", len(list(chord_types.keys())))
print('\n')
[(f'{key}: {value} ( {round(value*100/sum(chord_types.values()), 2)}% )') for key, value in chord_types.items()]

ALL USED CHORDS

Number of chord classes:  908




['A: 1764 ( 4.41% )',
 'A#:7: 2 ( 0.01% )',
 'A#:hdim7: 2 ( 0.01% )',
 'A#:maj(*3): 32 ( 0.08% )',
 'A#:maj(1,*3,*5): 2 ( 0.01% )',
 'A#:min: 47 ( 0.12% )',
 'A#:min(9): 3 ( 0.01% )',
 'A#:min7: 8 ( 0.02% )',
 'A#:min7(b5): 3 ( 0.01% )',
 'A#:sus4: 1 ( 0.0% )',
 'A#:sus4(b7): 2 ( 0.01% )',
 'A/2: 10 ( 0.03% )',
 'A/3: 79 ( 0.2% )',
 'A/4: 2 ( 0.01% )',
 'A/5: 31 ( 0.08% )',
 'A/6: 1 ( 0.0% )',
 'A/7: 2 ( 0.01% )',
 'A/9: 2 ( 0.01% )',
 'A/b6: 1 ( 0.0% )',
 'A/b7: 14 ( 0.04% )',
 'A:(1): 11 ( 0.03% )',
 'A:(1,2,4): 1 ( 0.0% )',
 'A:(1,2,4,5): 44 ( 0.11% )',
 'A:(1,3,b5)/b5: 1 ( 0.0% )',
 'A:(1,5): 9 ( 0.02% )',
 'A:(1,b3,b5,6): 4 ( 0.01% )',
 'A:7: 236 ( 0.59% )',
 'A:7(#9): 7 ( 0.02% )',
 'A:7(*5,13): 1 ( 0.0% )',
 'A:7(13): 4 ( 0.01% )',
 'A:7(b9): 3 ( 0.01% )',
 'A:7/3: 11 ( 0.03% )',
 'A:7/5: 6 ( 0.02% )',
 'A:7/b7: 28 ( 0.07% )',
 'A:9: 10 ( 0.03% )',
 'A:9(11): 1 ( 0.0% )',
 'A:9/b7: 2 ( 0.01% )',
 'A:aug: 24 ( 0.06% )',
 'A:aug/#5: 2 ( 0.01% )',
 'A:dim: 3 ( 0.01% )',
 'A:dim/b3:

Simplify complicated chords: replace them with one of the list of 24 base chords

In [16]:
[simplify_song(transcript) for transcript in chord_transcriptions.values()]

print('\nSIMPLIFIED CHORDS:\n')
simplified_chord_types = count_chord_types(chord_transcriptions.values())
[print(f'{key}: {value} ( {round(value*100/sum(simplified_chord_types.values()), 2)}% )') 
                                            for key, value in simplified_chord_types.items()]

chord_classes = list(simplified_chord_types.keys())


SIMPLIFIED CHORDS:

A: 3887 ( 9.72% )
A#m|Bbm: 500 ( 1.25% )
A#|Bb: 1597 ( 3.99% )
Am: 1455 ( 3.64% )
B: 1922 ( 4.81% )
B#|Cb: 107 ( 0.27% )
Bm: 804 ( 2.01% )
C: 3402 ( 8.51% )
C#m|Dbm: 702 ( 1.76% )
C#|Db: 1400 ( 3.5% )
Cm: 610 ( 1.53% )
D: 4333 ( 10.84% )
D#m|Ebm: 334 ( 0.84% )
D#|Eb: 1276 ( 3.19% )
Dm: 818 ( 2.05% )
E: 2720 ( 6.8% )
E#m|Fbm: 9 ( 0.02% )
E#|Fb: 7 ( 0.02% )
Em: 1231 ( 3.08% )
F: 1928 ( 4.82% )
F#m|Gbm: 811 ( 2.03% )
F#|Gb: 1531 ( 3.83% )
Fm: 581 ( 1.45% )
G: 4530 ( 11.33% )
G#m|Abm: 611 ( 1.53% )
G#|Ab: 1528 ( 3.82% )
Gm: 508 ( 1.27% )
N: 834 ( 2.09% )


# CREATING DATASETS FOR DNN MODELS 

-----------------
##### Creating 2 datasets:
#####      1. using signal after stft-processing.
#####      1. using signal without any processing.



In [17]:
import json
import librosa
import numpy as np
import os

In [18]:
# AUDIO_DIR = '/content/gdrive/MyDrive/data/data_wav'
# AUDIO_TEST = '/content/gdrive/MyDrive/data/audio_test'
# JSON_PATH = '/content/gdrive/MyDrive/alldata-stft.json'
# filename = "05-Beat_It.wav"

AUDIO_DIR = 'data/data_wav'
AUDIO_TEST = 'data/audio_test'
JSON_PATH = 'data/dataset-1.json'

In [19]:
SAMPLE_RATE = 22050
N_FFT = 2048
HOP_LENGTH = 512
WINDOW_SIZE = 0.3


#### Transcription functions

In [20]:
def calc_duration(transcription):
    """Calculates song duration in transcription file

        :param transcription (list): list of [ChordStartTime, ChordEndTime, ChordLable] for each chord 
        :return (float): ChordEndTime of the last chord in transcription file
    """
    return transcription[-1][1]

In [21]:
def find_transcript(filename, transcriptions):
    """Finds transcription for song name

        :param filename (string): name of the song
        :return (list): transcription for song 
    """
    key = filename[:-4] + ".lab" # replace ".wav" with ".lab"
    return transcriptions[key]

In [22]:
def framing(transcript, frame_size):
    """Cuts song transcription into frames

        :param transcript (list): 
        :param frame_size (float): size of frame in seconds

        :return windowed_chords (list): list of chord labels
    """
    windowed_chords = []
    delta = 0
    for row in transcript:
        chord_duration = row[1] - row[0]
        chord = row[2]

        chord_duration += delta
        windows_pro_chord = int((chord_duration ) // frame_size)
        windowed_chords += [chord]*windows_pro_chord
        delta = chord_duration % frame_size
    return windowed_chords

In [23]:
def get_chord_class_number(chord, mapping):
    """Is used to replace chord label(string) with int

        :param chord (string): chord label
        :param mapping (list): list of chord labels 

        :return (int): index of input chord label in mapping
    """
    return mapping.index(chord)

#### Audio functions

In [24]:
def load_audio(filename, sample_rate, transcript):
    """Load audiofile

        :param filename (np.array): audiofile
        :param sample_rate (int): sample rate of audiofile
        :param transcript (float): 

        :return signal (np.array): array of samples
    """
    tr_duration = calc_duration(transcript)
    print('Transcript duration: ', tr_duration)
    signal, sr = librosa.load(filename, sr=sample_rate, duration=tr_duration)
    audio_duration = librosa.get_duration(signal, sr)
    print('Audio duration: ', audio_duration)
    return signal

In [25]:
def cut_into_frames(signal, frame_duration, sample_rate):
    """Cuts audiofile into small frames

        :param signal (np.array): array of samples
        :param frame_duration (float): choosen duration of each frame (in seconds)
        :param sample_rate (int): number of samples per second in audio signal

        :return cut_signal (np.array): array of equal-sized frames where each frame is array of samples
    """
    frame_length = frame_duration * sample_rate
    number_of_frames = int(len(signal) // frame_length)
    cut_signal_length = int(number_of_frames * frame_length)
    cut_signal = np.array(signal[:cut_signal_length]).reshape(number_of_frames, -1)
    return cut_signal

In [26]:
def calculate_STFTs(cut_signal, n_fft, hop_length): 
    """Calculates STFTs for each frame of input signal

        :param signal (np.array): audiofile
        :param sample_rate (int): sample rate of audiofile
        :param n_fft (int): 
        :param hop_length (int): 

        :return stft (np.array): array of STFT-matrix for each frame of input signal
    """
    stfts = [librosa.stft(s,  
                          n_fft=n_fft, 
                          hop_length=hop_length, 
                          ).T.tolist() for s in cut_signal]
    
  
    return stfts

#### Dataset functions

In [27]:
def save_json(json_path, data):
    """Saves dataset into JSON-file

        :param json_path (string): name of the file where the data will be saved
        :param data (dictionary): data that will be saved
            
        :return
    """
    with open(json_path, "w") as f:
        json.dump(data, f, indent=4)
    print('\nJSON saved!\n')

In [28]:
def create_dataset(transcripts, chord_classes, audiodir, sample_rate, window_size, n_fft, hop_length):
    """Saves dataset into JSON-file

        :param transcripts (list): list of chord transcription lists of each song
        :param chord_classes (list): list of all possible chord classes  
        :param audiodir (string): name of the directory with songs audio files
        :param sample_rate (int): number of samples per second in audio signal
        :param window_size (float): duration of each frame (in which the input signal will be cut)
        :param n_fft (float): 
        :param hop_length (float): 

        :return dataset (dictionary): dictionary with fields 
                                            "mapping" (the list of chord classes), 
                                            "STFTs" (the list of STFT matrixes of each frame for each song), 
                                            "signal" (the list of cut frames from the original signal without processing)
                                            "chords" (the list of framed chords (not names of the chords but indexes from "mapping"-field)
                                                      for each song)
    """
    
    dataset = {
        "mapping": [],
        # "STFTs": [],
        "signal": [],
        "chords": [],
    }

    dataset["mapping"] = chord_classes

    count = 1
    for _file in os.scandir(audiodir):
        if count == 26: break
        filename = os.path.basename(_file)
        print(f'\n {count} Processing {filename}')
        count += 1
        if _file.is_file():
            transcript = find_transcript(filename, transcripts)
            signal = load_audio(_file, sample_rate, transcript)

            for row in transcript:
                # row[2] = get_classification_vector(row[2], chord_classes)
                row[2] = get_chord_class_number(row[2], chord_classes)

            w_transcript = framing(transcript, window_size)
            w_signal = cut_into_frames(signal, window_size, sample_rate)
            while len(w_transcript) != len(w_signal):
                print(len(w_transcript), '!=', len(w_signal), 
                    'Number of transcript windows is not equal to the number of audio windows.')
                l = min(len(w_transcript), len(w_signal))
                w_transcript = w_transcript[:l-1]
                w_signal = w_signal[:l-1]
            
            # stfts = calculate_STFTs(w_signal, n_fft, hop_length)

            dataset["chords"] += w_transcript
            # dataset["STFTs"] += stfts
            dataset["signal"] += w_signal.tolist()
    return dataset

#### Dataset creation

In [29]:
chords_dataset = create_dataset(
    transcripts=chord_transcriptions,
    chord_classes=chord_classes,
    audiodir=AUDIO_DIR,
    sample_rate=SAMPLE_RATE,
    window_size=WINDOW_SIZE,
    n_fft=N_FFT,
    hop_length=HOP_LENGTH  
)


 1 Processing 01_-_A_Hard_Day's_Night.wav
Transcript duration:  152.555102
Audio duration:  152.0065306122449
508 != 506 Number of transcript windows is not equal to the number of audio windows.

 2 Processing 01_-_Come_Together.wav
Transcript duration:  260.649796
Audio duration:  260.02285714285716
868 != 866 Number of transcript windows is not equal to the number of audio windows.

 3 Processing 01_-_Drive_My_Car.wav
Transcript duration:  150.282449
Audio duration:  150.0212244897959

 4 Processing 01_-_Help!.wav
Transcript duration:  141.087347
Audio duration:  141.00897959183675

 5 Processing 01_-_I_Saw_Her_Standing_There.wav
Transcript duration:  175.804082
Audio duration:  175.0204081632653
586 != 583 Number of transcript windows is not equal to the number of audio windows.

 6 Processing 01_-_It_Won't_Be_Long.wav
Transcript duration:  133.746939
Audio duration:  133.01551020408164
445 != 443 Number of transcript windows is not equal to the number of audio windows.

 7 Process

Save data into JSON

In [30]:
save_json(json_path=JSON_PATH, data=chords_dataset)


JSON saved!

