## Setup in Local Machine

In [None]:
#Install
%pip install worker
%pip install pandas
%pip install numpy
%pip install networkx
%pip install matplotlib
%pip install music21
%pip install musescore
%pip install tslearn
%pip install sklearn
%pip install multiprocessing
%pip install pyvis

In [None]:
# Import
import worker
import pandas as pd
import numpy as np
import networkx as nx
import matplotlib.pyplot as plt

from music21 import *
from music21 import converter, corpus, environment, note, chord
from tslearn.metrics import dtw
from multiprocessing import Pool, Manager, cpu_count
from sklearn.neighbors import kneighbors_graph
from sklearn.metrics import pairwise_distances
from networkx.algorithms.cuts import conductance
from pyvis.network import Network
from IPython.core.display import display, HTML

In [None]:
# Path
# Registers the same MuseScore 4 executable under two music21 environment settings:
# 'musicxmlPath' and 'musescoreDirectPNGPath'.
# NOTE(review): this is an absolute Windows install location — adjust it per machine.
env = environment.Environment()
env['musicxmlPath'] = 'C:\\Program Files\\MuseScore 4\\bin\\MuseScore4.exe' # Change if needed
env['musescoreDirectPNGPath'] = 'C:\\Program Files\\MuseScore 4\\bin\\MuseScore4.exe' # Change if needed

## Process Sample Music

#### Functions

In [None]:
# midi to score array function (keeps every element, i.e. all data needed for score visualization)
def midi_to_sarr(midi_parsed):
    """Flatten all parts of a parsed score into a single list of elements.

    Args:
        midi_parsed: object exposing ``parts``; each part must provide ``flatten()``.

    Returns:
        list: every element yielded by ``part.flatten()``, part after part, in order.
    """
    return [item for voice in midi_parsed.parts for item in voice.flatten()]

In [None]:
# sarr to narr and nmat function (keeps only notes, rests and chords and tabulates them)
def sarr_to_nmat_and_narr(score_array):
  """Split a score array into a note matrix and the matching note array.

  Each kept element contributes one row ``[offset, quarterLength, pitch]``:

  * chords use the MIDI number of ``element.root()``;
  * rests use pitch ``0``;
  * anything else is kept only if it exposes ``element.pitch.midi``.

  Elements without a ``pitch`` attribute are skipped.

  Args:
    score_array: iterable of score elements (as produced by ``midi_to_sarr``).

  Returns:
    tuple: ``(nmat, narr)`` where ``nmat`` is a DataFrame with columns
    ``onset_beats``, ``duration_beats`` and ``midi_pitch`` and ``narr`` is the
    list of kept elements, row-aligned with ``nmat``.
  """
  narr = []
  nmat = pd.DataFrame(columns=['onset_beats', 'duration_beats', 'midi_pitch'])

  for element in score_array:
    if isinstance(element, chord.Chord):
      row = [element.offset, element.duration.quarterLength, element.root().midi]
    elif isinstance(element, note.Rest):
      row = [element.offset, element.duration.quarterLength, 0]
    else:
      try:
        row = [element.offset, element.duration.quarterLength, element.pitch.midi]
      except AttributeError:
        # No pitch on this element: drop it. Only AttributeError is treated as
        # "not a note"; any other failure is a real error and must surface
        # instead of being silently discarded.
        continue
    # Rows are appended one at a time so the resulting column dtypes stay
    # exactly what row-wise enlargement produces.
    nmat.loc[len(nmat)] = row
    narr.append(element)
  return nmat, narr

#### Usage

In [None]:
# parse the midi and keep score title
midi_file = 'bach_846.mid' # Hardcoded; to handle multiple songs, write a function that iterates through a folder
midi_parsed = converter.parse(midi_file)
# The slice drops the last four characters of the file name (".mid" for the hardcoded file).
score_title = midi_file[:-4] # Temporary: the title could not be extracted from the score, so the file name is used instead

In [None]:
# flatten the parsed midi into the array of elements that makes up the score
sarr = midi_to_sarr(midi_parsed) # this midi was cleaned

# dump sarr to a text file: element count first, then one element per line
with open("sarr_output.txt", "w") as f:
    f.write(f"Number of elements: {len(sarr)}\n\n")
    f.writelines(f"{element}\n" for element in sarr)

In [None]:
# convert score array into a note array and note matrix
nmat, narr = sarr_to_nmat_and_narr(sarr)

# output nmat to a readable .txt file
with open("nmat_output.txt", "w") as f:
    f.write(f"Number of notes: {len(nmat)}\n\n")
    # Iterating a DataFrame yields its column labels. The loop variable is
    # deliberately NOT called `note`: at cell level that would rebind the music21
    # `note` module that the functions in this notebook look up when called.
    for column_name in nmat:
        f.write(f"{column_name}\n")
    f.write("\n")
    # Write the rows through the handle that is already open. Passing the file
    # name here would reopen and truncate the same file while `f` still holds
    # unflushed text, corrupting the output.
    np.savetxt(f, nmat, delimiter=',', fmt='%s')

# output narr to a readable .txt file
with open("narr_output.txt", "w") as f:
    f.write(f"Number of notes: {len(narr)}\n\n")
    for narr_element in narr:
        f.write(f"{narr_element}\n")

## Implication-Realization Ruleset, Assignment and Score Visualization

#### Functions

In [None]:
# IR symbol calculation function
def calculate_ir_symbol(interval1, interval2, threshold=5):
    """Classify a pair of consecutive melodic intervals as an IR symbol.

    The sign of ``interval1 * interval2`` tells whether the two intervals move
    in the same direction (> 0), in opposite directions (< 0) or whether at
    least one of them is a repetition (== 0). ``threshold`` separates "small"
    from "large" intervals and interval differences.

    Args:
        interval1: first interval (signed, in semitones).
        interval2: second interval (signed, in semitones).
        threshold: size limit used by every rule (default 5).

    Returns:
        str | None: one of 'P', 'D', 'IP', 'ID', 'VP', 'R', 'IR', 'VR';
        ``None`` only if no rule matches (e.g. NaN input).
    """
    direction = interval1 * interval2
    size1, size2 = abs(interval1), abs(interval2)
    span = abs(interval2 - interval1)    # signed difference between the intervals
    size_change = abs(size2 - size1)     # difference between their magnitudes

    # IR1: P (Process)
    if direction > 0 and span < threshold:
        return 'P'
    # IR2: D (Duplication)
    elif interval1 == interval2 == 0:
        return 'D'
    # IR3: IP (Intervallic Process)
    elif direction < 0 and -threshold <= size2 - size1 <= threshold and size2 != size1:
        return 'IP'
    # IR4: ID (Intervallic Duplication)
    elif direction < 0 and size2 == size1:
        return 'ID'
    # IR5: VP (Vector Process)
    elif direction > 0 and span >= threshold and size1 <= threshold:
        return 'VP'
    # IR6: R (Reversal)
    elif direction < 0 and size_change >= threshold and size1 >= threshold:
        return 'R'
    # IR7: IR (Intervallic Reversal)
    elif direction > 0 and size_change >= threshold and size1 >= threshold:
        return 'IR'
    # IR8: VR (Vector Reversal)
    elif direction < 0 and span >= threshold and size1 <= threshold:
        return 'VR'
    # Exactly one interval is a repetition. These rules now honour `threshold`
    # instead of a hard-coded 5, so a custom threshold applies to every rule.
    elif interval2 == 0:
        return 'R' if size1 > threshold else 'IP'
    elif interval1 == 0:
        return 'VR' if size2 > threshold else 'P'
    return None

In [None]:
# assign IR symbol function (original; modified)
def assign_ir_symbols(score_array):
    """Group consecutive notes/chords and tag every element with an IR symbol.

    Notes and chords are collected into groups of up to three. A full group is
    labelled by ``calculate_ir_symbol`` from its two successive pitch
    differences (``pitch.ps`` for notes, ``root().ps`` for chords). A group cut
    short by a rest, by any other element, or by the end of the input is
    labelled 'd' (two members) or 'M' (one member). Rests are emitted as
    ``(rest, 'rest', 'black')`` right after the group they interrupt.

    Args:
        score_array: iterable of score elements.

    Returns:
        list: ``(element, symbol, color)`` tuples in input order.
    """
    palette = {
        'P': 'blue',        # IR1: P (Process)
        'D': 'green',       # IR2: D (Duplication)
        'IP': 'red',        # IR3: IP (Intervallic Process)
        'ID': 'orange',     # IR4: ID (Intervallic Duplication)
        'VP': 'purple',     # IR5: VP (Vector Process)
        'R': 'cyan',        # IR6: R (Reversal)
        'IR': 'magenta',    # IR7: IR (Intervallic Reversal)
        'VR': 'yellow',     # IR8: VR (Vector Reversal)
        'M': 'pink',        # IR9: M (Monad)
        'd': 'lime',        # IR10: d (Dyad)
    }

    labelled = []
    pending = []       # elements of the group currently being built
    pending_ps = []    # their pitch-space values, index-aligned with `pending`

    def flush():
        """Label the pending group (if any) and start a new one."""
        size = len(pending)
        if size in (1, 2, 3):
            if size == 3:
                first_step = pending_ps[1] - pending_ps[0]
                second_step = pending_ps[2] - pending_ps[1]
                tag = calculate_ir_symbol(first_step, second_step)
                shade = palette.get(tag, 'black')  # black for symbols without a color
            else:
                tag = 'd' if size == 2 else 'M'
                shade = palette[tag]
            labelled.extend([(member, tag, shade) for member in pending])
        pending.clear()
        pending_ps.clear()

    for item in score_array:
        if isinstance(item, note.Note):
            ps = item.pitch.ps
        elif isinstance(item, chord.Chord):
            ps = item.root().ps
        else:
            # Anything that is not a note or chord closes the current group;
            # rests are additionally kept so they still show up in the visualization.
            flush()
            if isinstance(item, note.Rest):
                labelled.append((item, 'rest', 'black'))
            continue

        pending.append(item)
        pending_ps.append(ps)
        if len(pending) == 3:
            flush()

    # Label whatever is left at the end of the input
    flush()
    return labelled

In [None]:
# Score visualization function
def visualize_notes_with_symbols(notes_with_symbols):
    """Color and label each element with its IR symbol, then show the score.

    Every ``(element, symbol, color)`` tuple is printed; the element's
    ``style.color`` and ``lyric`` are set in place and the element is appended
    to one new ``stream.Part``. That part is wrapped in a new ``stream.Score``
    on which ``show()`` is called.

    Args:
        notes_with_symbols: iterable of ``(element, symbol, color)`` tuples,
            e.g. the output of ``assign_ir_symbols``.
    """
    score = stream.Score()
    labeled_part = stream.Part()
    for element, ir_symbol, ir_color in notes_with_symbols:
        print(element, ir_symbol, ir_color)
        element.style.color = ir_color
        element.lyric = ir_symbol
        labeled_part.append(element)
    score.append(labeled_part)
    score.show()

#### Usage

In [None]:
# TODO: this cell is still to be revised (work in progress)

# Assign an IR symbol and color to every element of the note array
test_ir = assign_ir_symbols(narr)

# output test_ir to a readable .txt file
with open("test_ir_output.txt", "w") as f:
    f.write(f"Number of notes: {len(test_ir)}\n\n")
    # The loop variable must not be called `note`: at cell level that would
    # rebind the music21 `note` module, and every later call to
    # assign_ir_symbols / sarr_to_nmat_and_narr would then fail.
    for labeled_element in test_ir:
        f.write(f"{labeled_element}\n")

# NOTE: visualization is known to error on polyphonic input
visualize_notes_with_symbols(test_ir)

## Gestalt Based Segmentation (Functions)

In [None]:
# onset function
def get_onset(notematrix: pd.DataFrame, timetype='beat') -> pd.Series:
  """Return the onset column of a note matrix.

  Args:
    notematrix: note matrix holding an ``onset_beats`` and/or ``onset_sec`` column.
    timetype: ``'beat'`` (default) selects ``onset_beats``, ``'sec'`` selects
      ``onset_sec``.

  Returns:
    pd.Series: the selected onset column.

  Raises:
    ValueError: if ``timetype`` is neither ``'beat'`` nor ``'sec'``.
    KeyError: if the selected column is missing from ``notematrix``.
  """
  if timetype == 'beat':
    return notematrix['onset_beats']
  elif timetype == 'sec':
    return notematrix['onset_sec']
  else:
    # The exception has to be raised; merely constructing it made the function
    # return None for an invalid timetype.
    raise ValueError(f"Invalid timetype: {timetype}. Choices are only 'beat' and 'sec'")

In [None]:
# duration function
def get_duration(notematrix: pd.DataFrame, timetype='beat') -> pd.Series:
  """Return the duration column of a note matrix.

  Args:
    notematrix: note matrix holding a ``duration_beats`` and/or ``duration_sec`` column.
    timetype: ``'beat'`` (default) selects ``duration_beats``, ``'sec'`` selects
      ``duration_sec``.

  Returns:
    pd.Series: the selected duration column.

  Raises:
    ValueError: if ``timetype`` is neither ``'beat'`` nor ``'sec'``.
    KeyError: if the selected column is missing from ``notematrix``.
  """
  if timetype == 'beat':
    return notematrix['duration_beats']
  elif timetype == 'sec':
    return notematrix['duration_sec']
  else:
    # The exception has to be raised; merely constructing it made the function
    # return None for an invalid timetype.
    raise ValueError(f"Invalid timetype: {timetype}. Choices are only 'beat' and 'sec'")

In [None]:
# clang boundary calculation function
def calculate_clang_boundaries(notematrix: pd.DataFrame):
  """Return the row labels at which a clang boundary is detected.

  For every row a distance is computed as
  ``2 * (onset difference to the previous row + duration of the next row)
  + |pitch difference to the previous row|``. A row is reported when its
  distance is strictly smaller than the distance of both neighbouring rows.
  The first and last rows can never qualify because their distance is NaN.

  NOTE(review): Tenney/Polansky-style segmentation usually places boundaries
  at local maxima of the distance; this code selects local minima — confirm
  that this is intended.

  Args:
    notematrix: note matrix with ``onset_beats``, ``duration_beats`` and
      ``midi_pitch`` columns.

  Returns:
    list: index labels of the boundary rows.
  """
  onset_step = get_onset(notematrix).diff()
  upcoming_duration = get_duration(notematrix).shift(-1)
  pitch_step = notematrix['midi_pitch'].diff().abs()
  distance = 2 * (onset_step + upcoming_duration) + pitch_step

  below_next = distance.shift(-1) > distance
  below_previous = distance.shift(1) > distance
  is_boundary = below_next & below_previous
  return is_boundary.index[is_boundary].tolist()

In [None]:
# segment boundary calculation function
def calculate_segment_boundaries(notematrix, clind):
    """Pick the clang boundaries that also act as segment boundaries.

    The clang boundaries split the rows into clangs: clang ``k`` spans
    positions ``starts[k]`` to ``ends[k]`` inclusive, so neighbouring clangs
    share their boundary row. A distance between each pair of consecutive
    clangs is built from their mean pitches, onsets and durations; a boundary
    is kept when its distance is strictly greater than both neighbouring
    distances (the first and last distance are never candidates).

    Args:
        notematrix: note matrix with ``onset_beats``, ``duration_beats`` and
            ``midi_pitch`` columns.
        clind: list of clang boundary positions (used with ``iloc``).

    Returns:
        list: the entries of ``clind`` selected as segment boundaries.
    """
    starts = [0] + clind
    ends = clind + [len(notematrix) - 1]
    pitch_means = [
        notematrix.iloc[lo:hi + 1]['midi_pitch'].mean()
        for lo, hi in zip(starts, ends)
    ]

    distances = []
    for k in range(1, len(starts)):
        head = notematrix.iloc[starts[k]]
        head_onset = head['onset_beats']
        prev_end_onset = notematrix.iloc[ends[k - 1]]['onset_beats']
        head_duration = head['duration_beats']
        prev_head_duration = notematrix.iloc[starts[k - 1]]['duration_beats']
        pitch_gap = abs(pitch_means[k] - pitch_means[k - 1])
        # Terms are summed strictly left to right, in this order.
        distances.append(
            pitch_gap + head_onset - prev_end_onset
            + head_duration + prev_head_duration
            + 2 * (head_onset - prev_end_onset)
        )

    return [
        clind[k]
        for k in range(1, len(distances) - 1)
        if distances[k] > distances[k - 1] and distances[k] > distances[k + 1]
    ]

In [None]:
# gestalt segmentation function
def segmentgestalt(notematrix):
    """Cut a note matrix into consecutive segments.

    Clang boundaries are computed first, segment boundaries are derived from
    them, and the matrix is sliced so that each boundary row is the last row
    of its segment.

    Args:
        notematrix: note matrix (DataFrame).

    Returns:
        list | None: the segments as DataFrame slices in order (always at
        least one), or ``None`` when ``notematrix`` is empty.
    """
    if notematrix.empty:
        return None

    # TODO: decide whether IR assignment should be hooked in at this point
    clang_positions = calculate_clang_boundaries(notematrix)
    boundary_positions = calculate_segment_boundaries(notematrix, clang_positions)

    # Each segment starts right after the previous boundary row
    cut_points = [0] + [position + 1 for position in boundary_positions]
    pieces = [
        notematrix.iloc[begin:stop]
        for begin, stop in zip(cut_points, cut_points[1:])
    ]
    pieces.append(notematrix.iloc[cut_points[-1]:])
    return pieces

## Get Clang Boundaries and Segments

In [None]:
# show clang boundaries
# (the list of nmat row labels returned by calculate_clang_boundaries)
clang_boundaries = calculate_clang_boundaries(nmat)
print("Clang Boundaries:", clang_boundaries)

In [None]:
# Get segments
segments = segmentgestalt(nmat)

# Store viz related properties (score title, color), labelling the segment
labeled_segments = []
assigned_color = 'red' # HARDCODED temporary since multiple scores are not implemented yet
# color_list = ['blue', 'green', 'red', 'orange', 'purple', 'cyan', 'magenta', 'yellow', 'pink', 'lime']
label = (score_title, assigned_color)
for segment in segments:
    labeled_segments.append((label, segment))


# output to a readable .txt file
with open("labeled_segments_output.txt", "w") as f:
    f.write(f"Number of segments: {len(labeled_segments)}\n\n")
    # Loop names are chosen so that nothing is clobbered at kernel level: the
    # builtin `tuple` stays intact and `label` / `segment` above keep their values.
    for idx, (segment_label, segment_rows) in enumerate(labeled_segments):
        segment_title, segment_color = segment_label
        f.write(f"{segment_title} Segment {idx+1} ({segment_color}):\n")
        f.write(f"{segment_rows}\n")
        f.write("--------------------------------------------\n\n")

## DTW Distance Using TSLearn

In [None]:
def segments_to_distance_matrix(segments: list[pd.DataFrame], cores=None):
    """Compute the symmetric pairwise distance matrix of the segments in parallel.

    Every unordered pair ``(i, j)`` with ``i < j`` is sent to
    ``worker.calculate_distance`` as ``(i, j, segments[i], segments[j])``; each
    result is unpacked as ``(i, j, distance, message)``. The distance is stored
    at both ``[i, j]`` and ``[j, i]`` and the message is printed.

    Args:
        segments: list of segment DataFrames.
        cores: number of worker processes; ``None`` (default) lets
            ``multiprocessing.Pool`` pick its own default.

    Returns:
        np.ndarray | None: ``(n, n)`` matrix with a zero diagonal, or ``None``
        when the defining module is not running as ``__main__``.

    Raises:
        ValueError: if ``cores`` exceeds ``cpu_count()``.
    """
    # Guard kept from the original: the computation only runs when this code
    # lives in the main module (which is the case inside a notebook kernel).
    if __name__ != '__main__':
        return None

    if cores is not None and cores > cpu_count():
        raise ValueError(f"You don't have enough cores! Please specify a value within your system's number of cores. \n Core Count: {cpu_count()}")

    num_segments = len(segments)
    distance_matrix = np.zeros((num_segments, num_segments))

    # One task per unordered pair of segments
    args_list = [
        (i, j, segments[i], segments[j])
        for i in range(num_segments)
        for j in range(i + 1, num_segments)
    ]

    # `processes=cores` is what makes the `cores` argument take effect;
    # previously it was validated and then ignored.
    with Pool(processes=cores) as pool:
        results = pool.map(worker.calculate_distance, args_list)

    messages = []
    for i, j, distance, message in results:
        distance_matrix[i, j] = distance
        distance_matrix[j, i] = distance  # Reflect along the diagonal
        messages.append(message)

    # Messages are printed after the matrix is filled, in task order
    for message in messages:
        print(message)

    return distance_matrix

In [None]:
# segments to distance matrix
# (len(dist_mat) is the number of rows, i.e. one per segment)
dist_mat = segments_to_distance_matrix(segments)
print(f"there are {len(dist_mat)} elements in dist mat")

## Building the KNN Graph


In [None]:
# building KNN Graph
k = 3
distance_matrix = dist_mat
# dist_mat already holds pairwise distances, so scikit-learn must be told not to
# treat its rows as feature vectors (the default metric would compute Euclidean
# distances between rows instead of using the DTW distances).
knn_graph = kneighbors_graph(distance_matrix, n_neighbors=k, mode='connectivity', metric='precomputed')

G = nx.from_scipy_sparse_array(knn_graph)

# Detect if the graph is disjoint
if not nx.is_connected(G):
    print("The KNN graph is disjoint. Ensuring connectivity...")

    # Calculate the connected components
    components = list(nx.connected_components(G))

    # Chain the components together: component i is linked to component i + 1
    for i in range(len(components) - 1):
        min_dist = np.inf
        closest_pair = None
        for node1 in components[i]:
            for node2 in components[i + 1]:
                dist = distance_matrix[node1, node2]
                if dist < min_dist:
                    min_dist = dist
                    closest_pair = (node1, node2)

        # Add an edge between the closest pair of nodes from different components
        G.add_edge(closest_pair[0], closest_pair[1])

# Plot the final connected graph
pos = nx.spring_layout(G, seed=42, iterations=50)
pos_dict = {i: pos[i] for i in range(len(pos))}
nx.draw(G, node_size=50, pos=pos_dict)
plt.title('Bach Prelude in C')
plt.show()

In [None]:
# distance matrix to knn graph function
def distance_matrix_to_knn_graph(k: int, distance_matrix: np.array, graph_title: str,
                                 seed: int, iterations: int):
  knn_graph = kneighbors_graph(distance_matrix, n_neighbors=k, mode='connectivity')

  G = nx.from_scipy_sparse_array(knn_graph)

  # Detect if the graph is disjoint
  if not nx.is_connected(G):
      print("The KNN graph is disjoint. Ensuring connectivity...")

      # Calculate the connected components
      components = list(nx.connected_components(G))

      # Connect the components
      for i in range(len(components) - 1):
          min_dist = np.inf
          closest_pair = None
          for node1 in components[i]:
              for node2 in components[i + 1]:
                  dist = distance_matrix[node1, node2]
                  if dist < min_dist:
                      min_dist = dist
                      closest_pair = (node1, node2)

          # Add an edge between the closest pair of nodes from different components
          G.add_edge(closest_pair[0], closest_pair[1])

  # Plot the final connected graph
  pos = nx.spring_layout(G, seed=seed, iterations=iterations)
  nx.draw(G, node_size=50, pos=pos)
  plt.title(graph_title + f" (K={k})")
  plt.show()

In [None]:
# show graph
# Arguments: k=3, the distance matrix, plot title, layout seed=42, layout iterations=50
distance_matrix_to_knn_graph(3, dist_mat, "Bach Prelude in C", 42, 50)

### Currently Trying to Put The Segment Data Into The Node so we can analyze grouped segments

In [None]:
# segments to graph function
def segments_to_graph(k: int, segments: list[pd.DataFrame], labeled_segments, cores=None):
  """Build a connected k-NN graph whose nodes carry their labeled segment.

  Args:
    k: number of neighbours per node.
    segments: list of segment DataFrames; node ``i`` corresponds to ``segments[i]``.
    labeled_segments: sequence aligned with ``segments``; entry ``i`` is stored
      on node ``i`` under the ``'segment'`` attribute.
    cores: forwarded to ``segments_to_distance_matrix``.

  Returns:
    tuple: ``(G, distance_matrix)`` — the NetworkX graph and the pairwise
    distance matrix it was built from.
  """
  # Convert segments to a distance matrix
  distance_matrix = segments_to_distance_matrix(segments, cores=cores)

  # Compute the k-NN graph. The matrix already contains pairwise distances,
  # hence metric='precomputed'; without it scikit-learn treats every row as a
  # feature vector and measures distances between rows.
  knn_graph = kneighbors_graph(distance_matrix, n_neighbors=k, mode='connectivity',
                               metric='precomputed')

  # Convert the k-NN graph to a NetworkX graph
  G = nx.from_scipy_sparse_array(knn_graph)

  # Add segment data as attributes to each node
  for i in range(len(segments)):
    G.nodes[i]['segment'] = labeled_segments[i]

  # Detect if the graph is disjoint
  if not nx.is_connected(G):
    print("The KNN graph is disjoint. Ensuring connectivity...")

    # Calculate the connected components
    components = list(nx.connected_components(G))

    # Chain the components together: component i is linked to component i + 1
    for i in range(len(components) - 1):
      min_dist = np.inf
      closest_pair = None
      for node1 in components[i]:
        for node2 in components[i + 1]:
          dist = distance_matrix[node1, node2]
          if dist < min_dist:
            min_dist = dist
            closest_pair = (node1, node2)

      # Add an edge between the closest pair of nodes from different components
      G.add_edge(closest_pair[0], closest_pair[1])

  return G, distance_matrix

In [None]:
# segments to graph
# (k=5; returns the graph together with the distance matrix it was built from)
graph, distance_matrix = segments_to_graph(5, segments, labeled_segments)

## Trying to find ways to validate "Graph Identity"

Besides the average DTW distance, I'm trying to see whether I can build something like a graph "silhouette score".

Basically get the communities in the graph then calculate the following:

* homogeneity: intra-cluster distance

* heterogeneity: inter-cluster distance

* "Graph Silhouette Score": $\frac{\text{Heterogeneity} - \text{Homogeneity}}{\max(\text{Heterogeneity}, \text{Homogeneity})}$

Also might take a look at clustering coefficients

In [None]:
# graph metrics function
def graph_metrics(graph: nx.classes.graph.Graph, distance_matrix: np.ndarray,
                  seed: int):
  """Print summary metrics for a segment graph and its distance matrix.

  Reported values:

  * average distance — mean over every entry of ``distance_matrix``
    (the diagonal is included);
  * average clustering coefficient of ``graph``;
  * average silhouette score over all nodes, using Louvain communities as
    clusters: per node ``(heterogeneity - homogeneity) / max(heterogeneity,
    homogeneity)``, where homogeneity is the mean distance to the other
    members of its community and heterogeneity is the smallest mean distance
    to any other community;
  * average conductance of the communities.

  Args:
    graph: the graph to analyse; node ids must index ``distance_matrix``.
    distance_matrix: square matrix of pairwise distances.
    seed: random seed for the Louvain community detection.

  Returns:
    None. The metrics are printed.
  """
  avg_dtw_distance = distance_matrix.mean()
  avg_clustering_coef = nx.average_clustering(graph)

  # `seed` has to be passed by keyword: the second positional parameter of
  # louvain_communities is `weight`, so a positional seed was never used as a seed.
  communities = nx.community.louvain_communities(graph, seed=seed)

  silhouette_scores = []
  for cluster in communities:
    for i in cluster:
      peers = list(cluster - {i})
      if not peers:
        # A singleton community has no intra-cluster distance; score it 0
        # (the usual silhouette convention) instead of producing NaN.
        silhouette_scores.append(0.0)
        continue
      homogeneity = np.mean(distance_matrix[i, peers])
      other_cluster_distances = [
        np.mean(distance_matrix[i, list(other_cluster)])
        for other_cluster in communities
        if other_cluster != cluster
      ]
      heterogeneity = min(other_cluster_distances) if other_cluster_distances else homogeneity
      scale = max(heterogeneity, homogeneity)
      # All distances zero: the score is defined as 0 rather than 0/0
      silhouette_score = (heterogeneity - homogeneity) / scale if scale > 0 else 0.0
      silhouette_scores.append(silhouette_score)

  # Average silhouette score for all nodes
  average_silhouette_score = np.mean(silhouette_scores)

  conductance_scores = []
  for cluster in communities:
    if len(cluster) == len(graph):
      # The community is the whole graph: its complement is empty and the
      # conductance would divide by zero, so it is left out.
      continue
    conductance_scores.append(conductance(graph, cluster))

  average_conductance = np.mean(conductance_scores) if conductance_scores else float('nan')

  print("Average DTW Distance:", avg_dtw_distance)
  print("Average Clustering Coefficient:", avg_clustering_coef)
  print("Average Silhouette Score", average_silhouette_score)
  print("Average Conductance:", average_conductance)

In [None]:
# print the graph metrics, the node count, and the labeled segment stored on each node
graph_metrics(graph, distance_matrix, 42)
print(f"Graph Length: {len(graph)}")

for node_id, node_attrs in graph.nodes(data=True):
  print(f"Node {node_id} segment data:")
  print(node_attrs['segment'])

In [None]:
# plot graph function
def plot_graph(graph: nx.classes.graph.Graph,
               seed: int,
               iterations: int,
               title: str,
               node_size: int):
  """Draw a segment graph, coloring each node with its segment's assigned color.

  Every node must carry a ``'segment'`` attribute shaped like
  ``((score_title, color), segment_matrix)``.

  Args:
    graph: the graph to draw.
    seed: seed forwarded to ``nx.spring_layout``.
    iterations: iteration count forwarded to ``nx.spring_layout``.
    title: plot title.
    node_size: node size forwarded to ``nx.draw``.

  Returns:
    None. The graph is drawn with matplotlib.
  """
  pos = nx.spring_layout(graph, seed=seed, iterations=iterations)

  # Collect one color per node, in the graph's node order. The unpacked names
  # are kept distinct from the `title` parameter so the plot title is not
  # overwritten by the segment's score title.
  node_colors = []
  for _node_id, node_attrs in graph.nodes(data=True):
    (_score_title, segment_color), _segment_matrix = node_attrs['segment']
    node_colors.append(segment_color)

  # Draw the graph that was passed in, not a notebook-level global
  nx.draw(graph, node_size=node_size, node_color=node_colors, pos=pos)
  plt.title(title)
  plt.show()

In [None]:
# show graph
plot_graph(graph, 3, 50, "Bach Prelude in C (K=5)", 50)

In [None]:
# Export an interactive pyvis view of the graph.
# Work on a copy: `graph` keeps its (label, DataFrame) node payload, so this cell
# can be re-run and cells that read graph.nodes[...]['segment'] keep working.
export_graph = graph.copy()
for node_id, node_attrs in export_graph.nodes(data=True):
  # Each node stores ((score_title, color), segment DataFrame); only the
  # DataFrame has to_json, so the tuple is unpacked first.
  (segment_title, segment_color), segment_frame = node_attrs['segment']
  node_attrs['segment'] = segment_frame.to_json(default_handler=str)
  # Standard pyvis node options: hover text and node color
  node_attrs['title'] = f"{segment_title} (segment {node_id})"
  node_attrs['color'] = segment_color

nt = Network('1000px', '1000px', notebook=True, cdn_resources = 'remote')
nt.from_nx(export_graph)
nt.show('sample_graph.html')   ### Still need to figure out the labels
display(HTML('sample_graph.html'))