# Load Csv Files (Input, Target)

In [7]:
import pandas as pd

# example files

input_file = pd.read_csv('train_data0307.csv')  
target_file = pd.read_csv('target_data0307.csv')

# Functions

In [34]:
import statistics

########################################################################################################################################
# About Octaves Functions


########################################################################################################################################
# About Octaves Functions

########################################################################################################################################
# About Dynamic, Dynamic Change Consistency Functions
def distribute_dynamic(mean_velocity):
    """
    Return Velocity to Dynamic
    """
    dynamic = ''
    if 0 <= mean_velocity < 36:
        dynamic = 'ppp'
    elif 36 <= mean_velocity < 49:
        dynamic = 'pp'
    elif 49 <= mean_velocity < 62:
        dynamic = 'p'
    elif 62 <= mean_velocity < 75:
        dynamic = 'mp'
    elif 75 <= mean_velocity < 88:
        dynamic = 'mf'
    elif 88 <= mean_velocity < 101:
        dynamic = 'f'
    elif 101 <= mean_velocity < 114:
        dynamic = 'ff'
    elif 114 <= mean_velocity <= 127:
        dynamic = 'fff'
    
    return dynamic


def add_new_dynamic_to_dataframe(input_file, target_file):

    input_file_note_status = [[{'note': x}, {'status': False}, {'velocity' : 0}] for x in range(128)]
    target_file_note_status = [[{'note': x}, {'status': False}, {'velocity' : 0}] for x in range(128)]

    input_dynamic_list = []
    target_dynamic_list = []
    
    for index in range(len(input_file)):

            target_temp_data = []

            notes_input = eval(input_file.at[index, 'note']) 
            msg_type_input = eval(input_file.at[index, 'msg_type']) 
            velocity = eval(input_file.at[index, 'velocity'])

            temp = list(zip(notes_input, msg_type_input, velocity)) 

            for data in temp:
                if data[1] == 'note_on':
                    input_file_note_status[data[0]][1]['status'] = True
                    input_file_note_status[data[0]][2]['velocity'] = data[2]
                elif data[1] == 'note_off':
                    input_file_note_status[data[0]][1]['status'] = False
                    input_file_note_status[data[0]][2]['velocity'] = data[2]

            velocity_list = []

            for on_data in input_file_note_status:
                if on_data[1]['status'] == True:
                    velocity_list.append(({'note': on_data[0]['note']}, {'velocity' : on_data[2]['velocity']}))

            max_velocity = 0 

            for velocity in velocity_list:
                
                if max_velocity < velocity[1]['velocity']:
                    max_velocity = velocity[1]['velocity']
            dynamic = distribute_dynamic(max_velocity)
            input_dynamic_list.append(dynamic)
            
    
    
    for index in range(len(target_file)):


            notes_input = eval(target_file.at[index, 'note'])
            msg_type_input = eval(target_file.at[index, 'msg_type'])
            velocity = eval(target_file.at[index, 'velocity'])

            temp = list(zip(notes_input, msg_type_input, velocity)) 

            
            for data in temp:
                if data[1] == 'note_on':
                    target_file_note_status[data[0]][1]['status'] = True
                    target_file_note_status[data[0]][2]['velocity'] = data[2]
                elif data[1] == 'note_off':
                    target_file_note_status[data[0]][1]['status'] = False
                    target_file_note_status[data[0]][2]['velocity'] = data[2]

            velocity_list = []

            for on_data in target_file_note_status:
                if on_data[1]['status'] == True:
                    velocity_list.append(({'note': on_data[0]['note']}, {'velocity' : on_data[2]['velocity']}))

            max_velocity = 0 

            for velocity in velocity_list:
                
                if max_velocity < velocity[1]['velocity']:
                    max_velocity = velocity[1]['velocity']
            dynamic = distribute_dynamic(max_velocity)
            target_dynamic_list.append(dynamic)


    target_file.insert(loc=5, column='new_dynamic', value=target_dynamic_list)
    input_file.insert(loc=5, column='new_dynamic', value=input_dynamic_list)
    

def calculate_dynamic_change_consistency(df_input, df_target):
    
    try:
        add_new_dynamic_to_dataframe(input_file, target_file)
    except:
        pass
    
    dynamic_series_input = df_input['new_dynamic']
    dynamic_series_target = df_target['new_dynamic']
    
    dynamic_states_input = zip(dynamic_series_input, dynamic_series_input.shift(-1))
    dynamic_states_target = zip(dynamic_series_target, dynamic_series_target.shift(-1))
    

    dynamic_counter_input = pd.Series(dynamic_states_input).value_counts().to_dict()
    dynamic_counter_target = pd.Series(dynamic_states_target).value_counts().to_dict()
    
    
    
    common_patterns = {state: min(dynamic_counter_input.get(state, 0), dynamic_counter_target.get(state, 0)) for state in set(dynamic_counter_input) | set(dynamic_counter_target)}
    total_common_patterns = sum(common_patterns.values())
    total_input_patterns = sum(dynamic_counter_input.values())

    
    dynamic_change_consistency_percentage = min(100, (total_common_patterns / total_input_patterns) * 100)

    return f'{dynamic_change_consistency_percentage:.2f}%'


def calculate_similarity_dynamic(input_df, target_df):
    
    try:
        add_new_dynamic_to_dataframe(input_file, target_file)
    except:
        pass
    
    input_counts = input_df['dynamic'].value_counts().to_dict()
    target_counts = target_df['dynamic'].value_counts().to_dict()
    

    common_keys = set(input_counts) & set(target_counts)


    total_possible_difference = sum(max(input_counts.get(key, 0), target_counts.get(key, 0)) for key in common_keys)
    actual_difference = sum(abs(input_counts.get(key, 0) - target_counts.get(key, 0)) for key in common_keys)


    similarity_percentage = ((total_possible_difference - actual_difference) / total_possible_difference) * 100
    return f'{similarity_percentage:.2f}%'


########################################################################################################################################
# About Note Accuracy Functions
def evaluate_note_accuracy(input_file, target_file, threshold=1):
    """
    threshold = "sec threshold"
    input_file = pandas Dataframe Data (train)
    target_file = pandas Dataframe Data (target)
    """
    
    
    target_temp_data = [] 
    target_file_note_status = [[{'note': x}, {'status': False}] for x in range(128)]
    input_file_note_status = [[{'note': x}, {'status': False}] for x in range(128)]

    
    total_score = 0 
    input_data_len = len(input_file)


    for index in range(len(input_file)):

            target_file_note_status = [[{'note': x}, {'status': False}] for x in range(128)]
            input_file_note_status = [[{'note': x}, {'status': False}] for x in range(128)]
            target_temp_data = []

            notes_input = eval(input_file.at[index, 'note']) 
            msg_type_input = eval(input_file.at[index, 'msg_type']) 

            for i in range(1+threshold):
                if i == 0:
                    notes_target = eval(target_file.at[index, 'note']) 
                    msg_type_target = eval(target_file.at[index, 'msg_type'])
                    target_temp_data.append(list(zip(notes_target, msg_type_target)))
                else:
                    try:
                        notes_target = eval(target_file.at[index-i, 'note'])
                        msg_type_target = eval(target_file.at[index-i, 'msg_type'])
                        target_temp_data.append(list(zip(notes_target, msg_type_target)))
                    except:
                        pass
                    notes_target = eval(target_file.at[index+i, 'note']) 
                    msg_type_target = eval(target_file.at[index+i, 'msg_type'])
                    target_temp_data.append(list(zip(notes_target, msg_type_target)))

            temp = list(zip(notes_input, msg_type_input)) 


            
            for data in temp:
                if data[1] == 'note_on':
                    input_file_note_status[data[0]][1]['status'] = True
                elif data[1] == 'note_off':
                    input_file_note_status[data[0]][1]['status'] = False

            
            for target_data in target_temp_data:
                target_file_note_status = [[{'note': x}, {'status': False}] for x in range(128)] 
                for data in target_data:
                    if data[1] == 'note_on':
                        target_file_note_status[data[0]][1]['status'] = True
                    elif data[1] == 'note_off':
                        target_file_note_status[data[0]][1]['status'] = False
            


                
                isPerfect = True

                for i in range(len(input_file_note_status)):
                    if input_file_note_status[i][1]['status'] != target_file_note_status[i][1]['status']:
                        isPerfect = False
                        break

                if isPerfect:
                    total_score += 1
                    break
    return int(total_score / input_data_len * 100)


########################################################################################################################################
# About Accent Functions
def add_new_velocity_to_dataframe(input_file, target_file):
    """
    add Dynamic Symbols 
    - Reflect Only "note_on" notes
    """
    input_file_note_status = [[{'note': x}, {'status': False}, {'velocity' : 0}] for x in range(128)]
    target_file_note_status = [[{'note': x}, {'status': False}, {'velocity' : 0}] for x in range(128)]

    input_velocity_list = []
    target_velocity_list = []
    
    
    for index in range(len(input_file)):

            target_temp_data = []

            notes_input = eval(input_file.at[index, 'note']) 
            msg_type_input = eval(input_file.at[index, 'msg_type']) 
            velocity = eval(input_file.at[index, 'velocity'])

            temp = list(zip(notes_input, msg_type_input, velocity)) 

            for data in temp:
                if data[1] == 'note_on':
                    input_file_note_status[data[0]][1]['status'] = True
                    input_file_note_status[data[0]][2]['velocity'] = data[2]
                elif data[1] == 'note_off':
                    input_file_note_status[data[0]][1]['status'] = False
                    input_file_note_status[data[0]][2]['velocity'] = data[2]

            velocity_list = []

            for on_data in input_file_note_status:
                if on_data[1]['status'] == True:
                    velocity_list.append(on_data[2]['velocity'])

            input_velocity_list.append(velocity_list)
            
    
    
    for index in range(len(target_file)):


            notes_input = eval(target_file.at[index, 'note'])
            msg_type_input = eval(target_file.at[index, 'msg_type'])
            velocity = eval(target_file.at[index, 'velocity'])

            temp = list(zip(notes_input, msg_type_input, velocity)) 

            
            for data in temp:
                if data[1] == 'note_on':
                    target_file_note_status[data[0]][1]['status'] = True
                    target_file_note_status[data[0]][2]['velocity'] = data[2]
                elif data[1] == 'note_off':
                    target_file_note_status[data[0]][1]['status'] = False
                    target_file_note_status[data[0]][2]['velocity'] = data[2]

            velocity_list = []

            for on_data in target_file_note_status:
                if on_data[1]['status'] == True:
                    velocity_list.append(on_data[2]['velocity'])

            target_velocity_list.append(velocity_list)


    target_file.insert(loc=5, column='new_velocity', value=target_velocity_list)
    input_file.insert(loc=5, column='new_velocity', value=input_velocity_list)
    
    
def calculate_dynamic_accent(dataframe):
    """
    Return Velocity to Accent.
    """
    
    accent_list = [0]
    
    for i in range(1, len(dataframe)):
        
        if len(dataframe.at[i-1, 'new_velocity']) != 0 and len(dataframe.at[i, 'new_velocity']) !=0:
            prev_velocity = sum(dataframe.at[i-1, 'new_velocity']) / len(dataframe.at[i-1, 'new_velocity'])
            mean_velocity = sum(dataframe.at[i, 'new_velocity']) / len(dataframe.at[i, 'new_velocity'])
        else:
            prev_velocity = 0
            mean_velocity = 0
        
        
        
        accent = 1 if 76 <= mean_velocity <= 127 and mean_velocity > 1.2 * prev_avg_velo else 0
        accent_list.append(accent)
    return accent_list



def calculate_accent_accuracy(input_data, target_data):
    

    try:
        add_new_velocity_to_dataframe(input_data, target_data)
        input_accent_list = calculate_dynamic_accent(input_data)
        target_accent_list = calculate_dynamic_accent(target_data)
        input_file.insert(loc=6, column='new_accent', value=input_accent_list)
        target_file.insert(loc=6, column='new_accent', value=target_accent_list)
    except:
        pass
    
    accent_score = 0

    input_time_range = []
    target_time_range = []
    
    
    start = 0
    end = 0
    isAccent = False

    for i in range(len(input_data)):
        # accent on
        if input_data['accent'][i] == 1 and isAccent == False:
            start = input_data['sec'][i]
            isAccent = True

        if input_data['accent'][i] == 0 and isAccent == True:
            end = input_data['sec'][i]
            isAccent = False
            input_time_range.append([start])
            start = 0
            end = 0
    
    start = 0
    end = 0
    isAccent = False

    for i in range(len(target_data)):
        # accent on
        if target_data['accent'][i] == 1 and isAccent == False:
            start = target_data['sec'][i]
            isAccent = True

        if target_data['accent'][i] == 0 and isAccent == True:
            end = target_data['sec'][i]
            isAccent = False
            target_time_range.append([start])
            start = 0
            end = 0

    for i in input_time_range:
        if i in target_time_range:
            accent_score += 1
    if len(target_time_range) == 0 and len(input_time_range) == 0:
        return 'The Score have no Accents Info'
    elif len(target_time_range) == 0:
        return f'{0}%'
    elif len(input_time_range) == 0:
        return f'{0}%'
    else:
        accent_score = int(accent_score  / len(target_time_range) * 100)
        return f'{accent_score:.2f}%'

########################################################################################################################################
# About Octaves Functions
def note_to_octave(note_number):
    """
    Return note number to octave range

    Parameters:
    - note_number: MIDI Note Number (0 ~ 127)

    Returns:
    - octave: Octave Number
    """
    octave = (note_number // 12) - 1
    return octave


def add_new_octave_to_dataframe(input_file, target_file):
    """
    Add new_octave Column to Csv files and Return Dataframe
    
    """
    input_file_note_status = [[{'note': x}, {'status': False}, {'velocity' : 0}] for x in range(128)]
    target_file_note_status = [[{'note': x}, {'status': False}, {'velocity' : 0}] for x in range(128)]

    input_octave_list = []
    target_octave_list = []
    
    for index in range(len(input_file)):

            target_temp_data = []

            notes_input = eval(input_file.at[index, 'note']) 
            msg_type_input = eval(input_file.at[index, 'msg_type']) 

            temp = list(zip(notes_input, msg_type_input)) 

            # Piano Status
            for data in temp:
                if data[1] == 'note_on':
                    input_file_note_status[data[0]][1]['status'] = True
                elif data[1] == 'note_off':
                    input_file_note_status[data[0]][1]['status'] = False

            note_list = []

            for on_data in input_file_note_status:
                if on_data[1]['status'] == True:
                    note_list.append(on_data[0]['note'])
            
            octave_list = []
            
            for note in note_list:
                octave_list.append(note_to_octave(note))
                
            octave_list = set(octave_list)
            
            input_octave_list.append(list(octave_list))
    
    
    for index in range(len(target_file)):

            target_temp_data = []

            notes_target = eval(target_file.at[index, 'note'])
            msg_type_target = eval(target_file.at[index, 'msg_type'])

            temp = list(zip(notes_target, msg_type_target)) 

            # Piano Status
            for data in temp:
                if data[1] == 'note_on':
                    target_file_note_status[data[0]][1]['status'] = True
                elif data[1] == 'note_off':
                    target_file_note_status[data[0]][1]['status'] = False

            note_list = []

            for on_data in target_file_note_status:
                if on_data[1]['status'] == True:
                    note_list.append(on_data[0]['note'])
            
            octave_list = []
            
            for note in note_list:
                octave_list.append(note_to_octave(note))
                
            octave_list = set(octave_list)
            
            target_octave_list.append(list(octave_list))

    try:
        input_file.insert(loc=5, column='new_octave', value=input_octave_list)
        target_file.insert(loc=5, column='new_octave', value=target_octave_list)
    except:
        pass
    
def calculate_octave_similarity(input_file, target_file, threshold=1):
    """
    octave Assessment Algorithm
    """
    
    add_new_octave_to_dataframe(input_file, target_file)
    
    input_data_len = len(input_file)
    
    total_correct_octave_cnt = 0
    for index in range(len(input_file)):
        
        if index - threshold < 0:
            for i in range(0, threshold, 1):
                if target_file.at[index, 'new_octave'] == input_file.at[index+i, 'new_octave']:
                    total_correct_octave_cnt+=1
                    break
        else:
            for i in range(-threshold, threshold, 1):
                if target_file.at[index, 'new_octave'] == input_file.at[index+i, 'new_octave']:
                    total_correct_octave_cnt+=1
                    break

    
    return total_correct_octave_cnt / input_data_len * 100

# Score Test

In [36]:
octave_score = calculate_octave_similarity(input_file, target_file, threshold=1)
print(f'octave Accuracy: {int(octave_score)}%')
accent_accuracy = calculate_accent_accuracy(input_file, target_file)
print(f'Accent Accuracy: {accent_accuracy}')
note_accuracy = evaluate_note_accuracy(input_file, target_file, threshold=1)
print(f'Note Accuracy: {note_accuracy}%' )
dynamic_change_consistency_score = calculate_dynamic_change_consistency(input_file, target_file)
print(f'Dynamic Change Consistency Score: {dynamic_change_consistency_score}')
dynamic_score = calculate_similarity_dynamic(input_file, target_file)
print(f'Dynamic Score: {dynamic_score}')

octave Accuracy: 75%
Accent Accuracy: The Score have no Accents Info
Note Accuracy: 91%
Dynamic Change Consistency Score: 32.26%
Dynamic Score: 7.34%
