In [1]:
# FUNCTIONS

# for MIR-QBSH query
def read_semitone_from_MIR_query(dir_query):
    """Load the ``semitone`` column of every ``.csv`` file in ``dir_query``.

    Files are visited in sorted name order so that repeated runs produce the
    same result ordering (``os.listdir`` alone gives no ordering guarantee).

    Args:
        dir_query: Directory containing the query CSV files.

    Returns:
        Three parallel lists ``(list_query_name, df_query, truth)``:
        the file names with ``".csv"`` removed, one flat NumPy array of
        semitone values per file, and the ground-truth label per file.
        The label is the third ``"-"``-separated field of the file name.
    """
    list_query_name = []
    df_query = []
    truth = []

    for file in sorted(os.listdir(dir_query)):
        if not file.endswith(".csv"):
            continue
        list_query_name.append(file.replace(".csv", ""))
        semitones = pd.read_csv(os.path.join(dir_query, file), usecols=["semitone"])
        df_query.append(semitones.to_numpy().flatten())
        # NOTE(review): the split is done on the full file name, so the label
        # keeps the extension if it happens to be the last field - confirm
        # the naming scheme always has a fourth field.
        truth.append(file.split("-")[2])

    return list_query_name, df_query, truth

# for MIR-QBSH manually labelled query
def read_semitone_from_MIR_manual_query(dir_query):
    """Load every manually labelled ``.pv`` query file in ``dir_query``.

    Files are visited in sorted name order so that repeated runs produce the
    same result ordering (``os.listdir`` alone gives no ordering guarantee).

    Args:
        dir_query: Directory containing the ``.pv`` files.

    Returns:
        Three parallel lists ``(list_query_name, df_query, truth)``:
        the file names with ``".pv"`` removed, one list of rounded non-zero
        values per file, and the ground-truth label per file (third
        ``"-"``-separated field of the name without extension).
    """
    list_query_name = []
    df_query = []
    truth = []

    for file in sorted(os.listdir(dir_query)):
        if not file.endswith(".pv"):
            continue
        name = file.replace(".pv", "")
        list_query_name.append(name)
        # NOTE(review): read_csv treats the first line as a header by default,
        # so the first sample of a header-less .pv file is not part of the
        # data - confirm this is intended.
        values = pd.read_csv(os.path.join(dir_query, file)).to_numpy().flatten()
        # zero entries are dropped (presumably unvoiced frames - TODO confirm)
        df_query.append([round(value) for value in values if value != 0])
        truth.append(name.split("-")[2])

    return list_query_name, df_query, truth

# for IOACAS_QBH query
def read_semitone_from_IOACAS_query(dir_query):
    """Load every IOACAS query CSV in ``dir_query`` with its ground truth.

    The ground truth comes from ``query_truth.list`` inside ``dir_query``: a
    tab-separated file without header whose first column is a wav path of the
    form ``<folder>\\<name>.wav`` and whose second column is the label. A
    query file is matched to its row through the part of its file name that
    precedes the first ``"-"``.

    Files are visited in sorted name order so that repeated runs produce the
    same result ordering (``os.listdir`` alone gives no ordering guarantee).

    Args:
        dir_query: Directory containing the CSV files and the truth list.

    Returns:
        Three parallel lists ``(list_query_name, df_query, truth)``:
        the file names with ``".csv"`` removed, one flat NumPy array of the
        ``semitone`` column per file, and the label (as ``str``) per file.

    Raises:
        ValueError: If a query file has no entry in the truth list.
    """
    list_query_name = []
    df_query = []
    truth = []

    truth_file = os.path.join(dir_query, "query_truth.list")

    # converters force every column to str so labels keep their leading zeros
    truth_df = pd.read_csv(truth_file, header=None, sep="\t", converters={i: str for i in range(100)})
    all_truth = truth_df[1].values.astype(str).flatten()

    # wav name -> row number; setdefault keeps the first row for duplicates,
    # which is what a list.index() lookup would have returned
    row_of_wav = {}
    for row, wav in enumerate(truth_df[0]):
        row_of_wav.setdefault(wav.split("\\")[1].replace(".wav", ""), row)

    for file in sorted(os.listdir(dir_query)):
        if not file.endswith(".csv"):
            continue
        list_query_name.append(file.replace(".csv", ""))
        semitones = pd.read_csv(os.path.join(dir_query, file), usecols=["semitone"])
        df_query.append(semitones.to_numpy().flatten())

        wav_name = file.split("-")[0]
        if wav_name not in row_of_wav:
            raise ValueError(f"{wav_name!r} is not in {truth_file}")
        truth.append(str(all_truth[row_of_wav[wav_name]]))

    return list_query_name, df_query, truth

# for MIR-QBSH and IOACAS-QBH database
def read_note_from_midi(dir_midi):
    """Load the ``note_index`` column of every ``.csv`` file in ``dir_midi``.

    Files are visited in sorted name order. The position of a MIDI entry
    decides how ties are broken when candidates are ranked later on, so a
    fixed order keeps results reproducible (``os.listdir`` alone gives no
    ordering guarantee).

    Args:
        dir_midi: Directory containing the converted MIDI CSV files.

    Returns:
        Two parallel lists ``(list_dir_midi, df_midi)``: the file names with
        ``".mid.csv"`` removed and one flat NumPy array of note indices per
        file.
    """
    list_dir_midi = []
    df_midi = []

    for file in sorted(os.listdir(dir_midi)):
        if not file.endswith(".csv"):
            continue
        list_dir_midi.append(file.replace(".mid.csv", ""))
        notes = pd.read_csv(os.path.join(dir_midi, file), usecols=["note_index"])
        df_midi.append(notes.to_numpy().flatten())

    return list_dir_midi, df_midi

# calculate relative distance
def calc_rel_dis(df):
    """Return the relative distances between neighbouring values of ``df``.

    Each distance is ``float(df[k + 1] - df[k])``; the resulting list is then
    passed through ``remove_consecutive`` so that runs of equal distances are
    collapsed to one entry.
    """
    steps = [float(following - current) for current, following in zip(df, df[1:])]
    return remove_consecutive(steps)

# remove consecutive distance in list of relative distance 
def remove_consecutive(list):
    """Collapse each run of equal neighbouring elements to one element, in place.

    The argument itself is modified and also returned, so existing callers
    that rely on either the mutation or the return value keep working. For a
    run of equal elements the last one of the run is the element that stays.

    The work is done in a single pass instead of deleting items one by one,
    which made long inputs quadratic.

    Args:
        list: Mutable list to deduplicate (the name shadows the builtin but
            is kept so keyword callers do not break).

    Returns:
        The same list object, now without equal neighbours.
    """
    # an element survives when it differs from its successor ...
    survivors = [current for current, following in zip(list, list[1:]) if not current == following]
    # ... and the final element always survives
    survivors += list[-1:]
    list[:] = survivors
    return list

# arrange MIDI names by similarity, most similar first
def get_all_rank(ls, names=None):
    """Return the MIDI names ordered from most to least similar.

    The previous implementation overwrote each picked score with 0 and then
    searched for the maximum again. Entries whose score was 0 were therefore
    never returned (earlier names were repeated instead), and the caller's
    list was destroyed. A stable descending sort gives the same order for
    positive scores, keeps every name exactly once and leaves ``ls`` intact.

    Args:
        ls: Similarity scores, one per MIDI entry.
        names: Names matching ``ls`` position by position. Defaults to the
            global ``list_dir_midi``.

    Returns:
        A new list of names sorted by descending score; equal scores keep
        their original relative order.
    """
    if names is None:
        names = list_dir_midi
    # reverse=True keeps the sort stable, so ties stay in input order
    order = sorted(range(len(ls)), key=ls.__getitem__, reverse=True)
    return [names[position] for position in order]

# get rank from ground truth
def get_rank(rank, truth):
    """Return the 1-based position of ``truth`` inside the list ``rank``.

    Raises:
        ValueError: If ``truth`` does not occur in ``rank``.
    """
    zero_based = rank.index(truth)
    return zero_based + 1

# get top ten rank
def get_top_ten(rank):
    """Return the first ten entries of ``rank`` (fewer if it is shorter)."""
    limit = 10
    return rank[:limit]

# get inverted index from all midi
def get_inverted_index_midi(dis_midi, list_dir_midi):
    """Build the 4-gram, 3-gram and 2-gram inverted indexes of all MIDI files.

    The n-grams count notes, so an n-gram term consists of n-1 relative
    distances. For a MIDI whose relative distances are ``+1 +1 +4``:

    * 2-gram terms are the single distances ``+1``, ``+1`` and ``+4``
    * 3-gram terms are the pairs ``(+1, +1)`` and ``(+1, +4)``
    * the 4-gram term is the triple ``(+1, +1, +4)``

    Args:
        dis_midi: One sequence of relative distances per MIDI file.
        list_dir_midi: Name of each MIDI file, parallel to ``dis_midi``;
            used as the document id in the indexes.

    Returns:
        ``(RP4G, RP3G, RP2G)`` as ``hashedindex.HashedIndex`` objects,
        e.g. ``RP3G.get_documents((2.0, 0.0))``.
    """
    import hashedindex

    index_4g = hashedindex.HashedIndex()
    index_3g = hashedindex.HashedIndex()
    index_2g = hashedindex.HashedIndex()

    # one pass over the MIDI files feeds all three indexes
    for position, steps in enumerate(dis_midi):
        for step in steps:
            index_2g.add_term_occurrence(step, list_dir_midi[position])

        for pair in zip(steps, steps[1:]):
            index_3g.add_term_occurrence(pair, list_dir_midi[position])

        for triple in zip(steps, steps[1:], steps[2:]):
            index_4g.add_term_occurrence(triple, list_dir_midi[position])

    return index_4g, index_3g, index_2g

# get lists containing 4-grams, 3-grams, and 2-grams from each query
def get_ngrams_query(dis_query):
    """Split one query's relative distances into 4-, 3- and 2-gram terms.

    Returns:
        ``(RP4G, RP3G, RP2G)``: the list of distance triples, the list of
        distance pairs, and ``dis_query`` itself (each single distance is a
        2-gram term, so no copy is made).
    """
    triples = list(zip(dis_query, dis_query[1:], dis_query[2:]))
    pairs = list(zip(dis_query, dis_query[1:]))
    return triples, pairs, dis_query

from collections import Counter

# get Counter containing number of query appearances in inverted index
def get_counter_grams(index, query):
    """Sum, per document, the occurrences of every query term in ``index``.

    Args:
        index: Object whose ``get_documents(term)`` returns a per-document
            count mapping and raises ``IndexError`` for an unknown term.
        query: Iterable of terms to look up.

    Returns:
        A ``Counter`` mapping each document to its accumulated count; terms
        that are missing from the index contribute nothing.
    """
    totals = Counter()
    for term in query:
        try:
            matches = index.get_documents(term)
        except IndexError:
            # term does not occur in the index
            continue
        totals += matches
    return totals

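# get the rank of the ground truth among documents ordered by n-gram match count (sys.maxsize when it never matched)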
def get_rank_from_counter_grams(res_grams, truth):
    """Return the rank of ``truth`` among the documents counted in ``res_grams``.

    Documents are ordered by descending count (``Counter.most_common``).
    ``sys.maxsize`` is returned when ``truth`` is not among them.
    """
    ordered_documents = [document for document, _count in res_grams.most_common()]
    try:
        return get_rank(ordered_documents, truth)
    except ValueError:
        # truth shares no n-gram with the query
        return sys.maxsize

# get rank with relative pitch 4-grams (RP4G), 3-grams (RP3G) and 2-grams (RP2G)
# Single-slot cache holding (dis_midi, list_dir_midi, (index4G, index3G, index2G)).
_RPG_INDEX_CACHE = []


def _get_cached_rpg_indexes():
    """Return the inverted indexes for the current MIDI globals, building them once."""
    if _RPG_INDEX_CACHE:
        cached_dis, cached_names, cached_indexes = _RPG_INDEX_CACHE[0]
        # identity check: re-running the loading cell rebinds both globals to
        # new lists, which invalidates the cache automatically
        if cached_dis is dis_midi and cached_names is list_dir_midi:
            return cached_indexes
    indexes = get_inverted_index_midi(dis_midi, list_dir_midi)
    _RPG_INDEX_CACHE[:] = [(dis_midi, list_dir_midi, indexes)]
    return indexes


def get_rank_with_rpg(dis_query, truth):
    """Rank ``truth`` with relative-pitch 4-grams, 3-grams and 2-grams.

    The inverted indexes depend only on the MIDI database (globals
    ``dis_midi`` and ``list_dir_midi``), not on the query, so they are built
    on the first call and reused afterwards instead of being rebuilt for
    every query.

    NOTE: the cache is keyed on the identity of the two globals. Mutating
    ``dis_midi`` or ``list_dir_midi`` in place after the first call is not
    detected; rebind them to new lists instead.

    Args:
        dis_query: Relative distances of one query.
        truth: Name of the MIDI file the query should match.

    Returns:
        ``[rank_4grams, rank_3grams, rank_2grams]``; an entry is
        ``sys.maxsize`` when the truth shares no n-gram of that size with
        the query.
    """
    index4G, index3G, index2G = _get_cached_rpg_indexes()
    query4G, query3G, query2G = get_ngrams_query(dis_query)

    res_rank = []
    for index, grams in ((index4G, query4G), (index3G, query3G), (index2G, query2G)):
        matches = get_counter_grams(index, grams)
        res_rank.append(get_rank_from_counter_grams(matches, truth))

    return res_rank

# get rank with Mode Normalised Frequency using edit distance method
def get_rank_with_mnf(df_query, truth):
    """Rank ``truth`` using edit distance on Mode Normalised Frequency strings.

    The query (hypothesis) and every entry of the global ``df_midi``
    (references) are converted with ``convert_df_to_MNF``; each reference is
    scored against the query with ``edit_distance.SequenceMatcher.ratio`` and
    the MIDI names are ordered by that score.

    Args:
        df_query: Semitone values of one query.
        truth: Name of the MIDI file the query should match.

    Returns:
        The 1-based rank of ``truth``.

    Raises:
        ValueError: Propagated from ``get_rank`` when ``truth`` is not in the
            ranking (and from the MNF conversion).
        IndexError: Propagated from the MNF conversion of an empty sequence.
    """
    hyp = convert_df_to_MNF(df_query)
    references = [convert_df_to_MNF(notes) for notes in df_midi]

    list_ratio = [
        edit_distance.SequenceMatcher(a=ref, b=hyp).ratio()
        for ref in references
    ]

    # get_top_ten(ranking) gives the ten best candidates when debugging
    ranking = get_all_rank(list_ratio)
    return get_rank(ranking, truth)

# get best rank from both nGrams and edit distance method
def get_rank_with_unified_algorithm(dis_query, df_query, truth):
    """Return the best (smallest) rank found by the n-gram and MNF methods.

    Args:
        dis_query: Relative distances of one query (n-gram method).
        df_query: Semitone values of the same query (MNF method).
        truth: Name of the MIDI file the query should match.
    """
    candidate_ranks = get_rank_with_rpg(dis_query, truth)

    # comment the try statement below if not using MNF for faster retrieval
    try:
        mnf_rank = get_rank_with_mnf(df_query, truth)
    except (ValueError, IndexError):
        # MNF produced no usable rank; rely on the n-gram ranks only
        pass
    else:
        candidate_ranks.append(mnf_rank)

    return min(candidate_ranks)

# convert one dataframe containing semitone to Mode Normalised Frequency (MNF)
def convert_df_to_MNF(df):
    """Convert a semitone sequence to Mode Normalised Frequency characters.

    Every value is shifted by the same offset so that the most frequent value
    (the mode) becomes chr(78), i.e. ``'N'``; each shifted value is turned
    into one character, and runs of equal characters are collapsed with
    ``remove_consecutive``.

    Raises:
        IndexError: If ``df`` is empty (there is no mode).
    """
    from collections import Counter

    mode = Counter(df).most_common(1)[0][0]
    adder = int(78 - mode)  # mode marked as char 'N' which is 78 in ascii

    symbols = [chr(int(value + adder)) for value in df]
    return remove_consecutive(symbols)

# count MRR from a list of rank from some queries
def count_MRR(list_rank):
    """Compute the mean reciprocal rank of ``list_rank``.

    Ranks equal to ``sys.maxsize`` (truth not found) are left out of both the
    sum and the query count.

    Returns:
        ``(mrr, counter)``: the MRR rounded to two decimals and the number of
        ranks that were counted.

    Raises:
        ZeroDivisionError: If no rank was counted.
    """
    reciprocal_sum = 0
    counter = 0
    for rank in list_rank:
        if rank == sys.maxsize:
            continue
        reciprocal_sum += 1 / rank
        counter += 1
    return round(reciprocal_sum / counter, 2), counter

# count top 3 hit ratios from list rank
def _count_top_k_ratio(list_rank, k):
    """Return (share of counted ranks that are <= k, number of counted ranks)."""
    hits = 0
    counter = 0
    for rank in list_rank:
        # sys.maxsize marks a query whose truth was never found; skip it
        if rank == sys.maxsize:
            continue
        counter += 1
        if rank <= k:
            hits += 1
    return round(hits / counter, 2), counter


def count_top_3_ratio(list_rank):
    """Return the top-3 hit ratio (two decimals) and the number of counted ranks.

    Raises:
        ZeroDivisionError: If every rank is ``sys.maxsize`` or the list is empty.
    """
    return _count_top_k_ratio(list_rank, 3)


# count top 5 hit ratios from list rank
def count_top_5_ratio(list_rank):
    """Return the top-5 hit ratio (two decimals) and the number of counted ranks.

    Raises:
        ZeroDivisionError: If every rank is ``sys.maxsize`` or the list is empty.
    """
    return _count_top_k_ratio(list_rank, 5)


# count top 10 hit ratios from list rank
def count_top_10_ratio(list_rank):
    """Return the top-10 hit ratio (two decimals) and the number of counted ranks.

    Raises:
        ZeroDivisionError: If every rank is ``sys.maxsize`` or the list is empty.
    """
    return _count_top_k_ratio(list_rank, 10)

def print_result_to_file(process_name):
    """Write the captured cell output ``cap.stdout`` to a text file.

    The file is created inside the global ``folder_hasil`` and is named
    ``"<process_name> <current date and time>.txt"``; colons in the timestamp
    are replaced by ``-``.
    """
    from datetime import datetime

    stamp = str(datetime.now()).replace(":", "-")
    filename = os.path.join(folder_hasil, process_name + ' ' + stamp + ".txt")
    with open(filename, 'w') as f:
        f.write(cap.stdout)

def process_query():
    """Rank every loaded query against the MIDI database and print a summary.

    Reads the globals ``dir_query``, ``dir_midi``, ``dis_query``,
    ``df_query``, ``truth`` and ``list_query_name``; appends one entry per
    query to the globals ``list_rank`` and ``list_time``.

    Returns:
        The process name ``"<query folder> with <midi folder>"``, which is
        later used as part of the result file name.
    """
    # Use the folder names themselves instead of stripping a hard-coded
    # "<folder_data>\\" prefix: that only worked with Windows separators and
    # otherwise left a path separator inside the result file name.
    query_label = os.path.basename(os.path.normpath(dir_query))
    midi_label = os.path.basename(os.path.normpath(dir_midi))
    process_name = query_label + " with " + midi_label
    print("Start process", process_name)

    total = len(dis_query)
    for i in range(total):
        print("Processing...", i + 1, "/", total)

        start_time = time.time()
        rankTruth = get_rank_with_unified_algorithm(dis_query[i], df_query[i], truth[i])
        list_time.append(round(time.time() - start_time, 3))

        list_rank.append(rankTruth)

        print("query", list_query_name[i], "truth is on rank", rankTruth)

    print("Finish process", process_name)

    try:
        mrr, counter = count_MRR(list_rank)
        print("MRR:", mrr, "from", counter, "queries")

        hit_ratio, counter = count_top_3_ratio(list_rank)
        print("Top 3 ratio:", hit_ratio, "from", counter, "queries" )

        hit_ratio, counter = count_top_5_ratio(list_rank)
        print("Top 5 ratio:", hit_ratio, "from", counter, "queries" )

        hit_ratio, counter = count_top_10_ratio(list_rank)
        print("Top 10 ratio:", hit_ratio, "from", counter, "queries" )

        print("Avg time:", round(sum(list_time)/len(list_time), 3))

    except ZeroDivisionError:
        # raised by the metric helpers when no query produced a usable rank
        print("No result to show")
        print("list rank", list_rank)
        print("list time", list_time)

    return process_name

In [2]:
%%capture cap --no-stderr
# Read the MIDI database and the query set, then run the retrieval experiment.
# Everything printed by this cell is captured into `cap` by the %%capture
# magic on the cell's first line; print_result_to_file() writes cap.stdout
# to disk afterwards.

import os
import pandas as pd
import edit_distance
import time
import sys

# root folder that contains the query and MIDI sub-folders
folder_data = "Data query dan midi"
# folder in which print_result_to_file() stores the captured output
folder_hasil = "Hasil eksperimen"

if not os.path.exists(folder_hasil):
    os.mkdir(folder_hasil)

# Select the query set by leaving exactly one dir_query line uncommented.

# test query
# dir_query = os.path.join(folder_data, "Test query IOACAS_QBH")
# dir_query = os.path.join(folder_data, "Test query IOACAS_QBH_DNN-LSTM")
# dir_query = os.path.join(folder_data, "Test query MIR-QBSH")
# dir_query = os.path.join(folder_data, "Test query MIR-QBSH_DNN-LSTM")
# dir_query = os.path.join(folder_data, "Test query MIR-QBSH_manual label")

# all query
# dir_query = os.path.join(folder_data, "Query_IOACAS_QBH")
dir_query = os.path.join(folder_data, "Query_IOACAS_QBH_DNN-LSTM")
# dir_query = os.path.join(folder_data, "Query_MIR_QBSH")
# dir_query = os.path.join(folder_data, "Query_MIR_QBSH_DNN-LSTM")
# dir_query = os.path.join(folder_data, "Query_MIR_QBSH_manual label")

# all midi (select the database the same way)
# dir_midi = os.path.join(folder_data, "Database midi MIR-QBSH")
dir_midi = os.path.join(folder_data, "Database midi IOACAS-QBH")


# read and insert semitone from query to list of dataframe
# the reader is chosen from the folder name: "manual" -> .pv files,
# "IOACAS" -> CSV files plus query_truth.list, anything else -> MIR-QBSH CSV
if "manual" in dir_query:
    list_query_name, df_query, truth = read_semitone_from_MIR_manual_query(dir_query)
elif "IOACAS" in dir_query:
    list_query_name, df_query, truth = read_semitone_from_IOACAS_query(dir_query)
else:
    list_query_name, df_query, truth = read_semitone_from_MIR_query(dir_query)

# read and insert note_index or semitone from midi to list of dataframe
list_dir_midi, df_midi = read_note_from_midi(dir_midi)

# calculate relative distance in all query
dis_query = []
for query in df_query:
    dis_query.append(calc_rel_dis(query))

# calculate relative distance in all midi
dis_midi = []
for midi in df_midi:
    dis_midi.append(calc_rel_dis(midi))

# ref = dis_midi
# hyp = dis_query

# result accumulators; process_query() appends one rank and one elapsed time
# (seconds) per query
list_rank = []
list_time = []

# test match with query data
process_name = process_query()

KeyboardInterrupt: 

In [3]:
# Save the output captured by the previous cell. This needs `cap` and
# `process_name` from that cell; the NameError recorded below shows that `cap`
# was missing after the previous run was interrupted.
print_result_to_file(process_name)

NameError: name 'cap' is not defined

In [None]:
%%capture cap --no-stderr
# Read the MIDI database and the query set, then run the retrieval experiment.
# Everything printed by this cell is captured into `cap` by the %%capture
# magic on the cell's first line; print_result_to_file() writes cap.stdout
# to disk afterwards.

import os
import pandas as pd
import edit_distance
import time
import sys

# root folder that contains the query and MIDI sub-folders
folder_data = "Data query dan midi"
# folder in which print_result_to_file() stores the captured output
folder_hasil = "Hasil eksperimen"

if not os.path.exists(folder_hasil):
    os.mkdir(folder_hasil)

# Select the query set by leaving exactly one dir_query line uncommented.

# test query
# dir_query = os.path.join(folder_data, "Test query IOACAS_QBH")
# dir_query = os.path.join(folder_data, "Test query IOACAS_QBH_DNN-LSTM")
# dir_query = os.path.join(folder_data, "Test query MIR-QBSH")
# dir_query = os.path.join(folder_data, "Test query MIR-QBSH_DNN-LSTM")
# dir_query = os.path.join(folder_data, "Test query MIR-QBSH_manual label")

# all query
dir_query = os.path.join(folder_data, "Query_IOACAS_QBH")
# dir_query = os.path.join(folder_data, "Query_IOACAS_QBH_DNN-LSTM")
# dir_query = os.path.join(folder_data, "Query_MIR_QBSH")
# dir_query = os.path.join(folder_data, "Query_MIR_QBSH_DNN-LSTM")
# dir_query = os.path.join(folder_data, "Query_MIR_QBSH_manual label")

# all midi (select the database the same way)
# dir_midi = os.path.join(folder_data, "Database midi MIR-QBSH")
dir_midi = os.path.join(folder_data, "Database midi IOACAS-QBH")


# read and insert semitone from query to list of dataframe
# the reader is chosen from the folder name: "manual" -> .pv files,
# "IOACAS" -> CSV files plus query_truth.list, anything else -> MIR-QBSH CSV
if "manual" in dir_query:
    list_query_name, df_query, truth = read_semitone_from_MIR_manual_query(dir_query)
elif "IOACAS" in dir_query:
    list_query_name, df_query, truth = read_semitone_from_IOACAS_query(dir_query)
else:
    list_query_name, df_query, truth = read_semitone_from_MIR_query(dir_query)

# read and insert note_index or semitone from midi to list of dataframe
list_dir_midi, df_midi = read_note_from_midi(dir_midi)

# calculate relative distance in all query
dis_query = []
for query in df_query:
    dis_query.append(calc_rel_dis(query))

# calculate relative distance in all midi
dis_midi = []
for midi in df_midi:
    dis_midi.append(calc_rel_dis(midi))

# ref = dis_midi
# hyp = dis_query

# result accumulators; process_query() appends one rank and one elapsed time
# (seconds) per query
list_rank = []
list_time = []

# test match with query data
process_name = process_query()

In [None]:
# Save the output captured by the previous cell; requires `cap` and
# `process_name`, which that cell defines.
print_result_to_file(process_name)