In [None]:
import numpy as np
import pandas as pd
import torch
import math
import torch.nn as nn
import matplotlib.pyplot as plt
import seaborn as sns
from scipy.stats import entropy
from scipy.stats import f_oneway
from statsmodels.stats.multicomp import pairwise_tukeyhsd

## Smoothness experiment

In this notebook, we want to explore how smooth is the interaction with the descriptors to pattern models we have been working on. We will define a set of movements within the descriptor space and we will analyze how smooth the transitions are. We will also provide a Pure Data patch that will allow us to listen to the movements.

---

What we currently have:
- We have defined the following 5 descriptors:
    - `onset_count`
    - `start`
    - `center`
    - `syncopation`
    - `balance`
- We have trained a model that can generate 16 step patterns given values for the 5 descriptors.
- We have a Pure Data patch to interactively explore the descriptor space and listen to the generated patterns.

---

The experiment will be as follows:
- We will generate 3000 movements, each movement will consist of 32 interpolations between a point A and a point B.
- Point A will be randomly generated inside the 5 dimensional descriptor space.
- Point B will be at a certain distance from point A, also randomly generated and within the 5 dimensional descriptor space.
- For each movement, we will only consider changing two descriptors at a time (if each descriptor is a knob, it is natural to only move 2 knobs at a time)
- We define 3 different type of movements, each corresponding to a different distance between the two points.
    - `small` -> `sqrt(2) * 0.25`
    - `medium` -> `sqrt(2) * 0.5`
    - `large` -> `sqrt(2) * 0.75`
- For each set of descriptors within a movement (32 interpolations), we will run the model and compute the corresponding 16 step pattern.
- At this point, we will analyze 2 things in parallel:
    - KL divergence between the distribution of the two patterns in each interpolation step.
    - Generate descriptors given the two patterns in each interpolation step and compute the euclidean distance between the 2 sets of descriptors.
- For each two analyses, we will make a plot where the X axis will be the interpolation step and the Y axis will be the KL divergence or the euclidean distance respectively.
- We will also make a boxplot for each interpolation step, showing the distribution of the KL divergence or the euclidean distance.

### 1. Define experiment parameters

In [None]:
d = ['onset_count', 'start', 'center', 'syncopation', 'balance']
n_iterations = 3000  # number of movements
n_interpolation = 32  # number of steps
n_descriptors = 5
max_distance = np.sqrt(2)
movement_types = {
    "small": max_distance * 0.25,
    "medium": max_distance * 0.5,
    "large": max_distance * 0.75,
}
models = ["d2p_p_err", "d2p_d_err"]
model_paths = [f"../d2p_model/models/{m}.pth" for m in models]
n_per_category = n_iterations // 3  # ensure balanced distribution of movement types

### 2. Generate experiment data

In [None]:
def generate_random_point():
    """Generates a random point in the 5D space with values between 0 and 1."""
    A = np.random.rand(n_descriptors)
    if A[1] > A[2]:
        A[1], A[2] = A[2], A[1]  # swap start (A[1]) and center (A[2]) if start is greater than center
    return A

def generate_b_point(A, descriptors_moved, distance):
    """Generates point B at a specific distance from A in the space defined by descriptors_moved.
    If the generated point goes out of bounds, a new direction is generated until a valid point is found.
    If a valid point is not found after several attempts, None is returned."""
    max_attempts = 100
    for _ in range(max_attempts):
        direction = np.random.randn(2)
        direction /= np.linalg.norm(direction)  # normalize direction
        delta = direction * distance
        B = A.copy()
        B[descriptors_moved] += delta

        if np.all((B >= 0) & (B <= 1)):
            return B  # make sure all elements of B are within bounds [0,1]
    
    return None  # if a valid point was not found after max_attempts, return None

def compute_euclidean_distance(p1, p2):
    """Compute Euclidean distance between two points."""
    return np.linalg.norm(p1 - p2)

def run_experiment():
    """Runs the experiment and returns a DataFrame with the results."""
    results = []
    
    movement_categories = ["small"] * n_per_category + ["medium"] * n_per_category + ["large"] * n_per_category
    np.random.shuffle(movement_categories)  # shuffle the order to randomize sampling
    
    for movement_type in movement_categories:
        B = None
        while B is None:
            distance = movement_types[movement_type]
            descriptors_moved = np.random.choice(n_descriptors, 2, replace=False)
            
            A = generate_random_point()
            B = generate_b_point(A, descriptors_moved, distance)
            if B is not None and B[1] > B[2]:
                B = None  # if the center always >= start constraint is not met, generate a new B

        # interpolate between A and B
        steps = np.linspace(0, 1, n_interpolation)[:, np.newaxis]
        trajectory = (1 - steps) * A + steps * B
        
        results.append({
            "distance_category": movement_type,
            "descriptors_moved": descriptors_moved.tolist(),
            # "A": A.tolist(),
            # "B": B.tolist(),
            "trajectory": trajectory.tolist(),
        })
    
    return pd.DataFrame(results)

# run and save results
df_results = run_experiment()
df_results.to_csv('movements.csv', index=False)

# make sure everything is correct
df_results = pd.read_csv('movements.csv')
df_results.head()

### 3. Compute all patterns

In [None]:
class Multiclass(nn.Module):
    def __init__(self, input_dim):
        super().__init__()
        self.hidden = nn.Linear(input_dim, 16)
        self.act = nn.ReLU()
        self.hidden1 = nn.Linear(16, 32)
        self.act1 = nn.ReLU()
        self.hidden2 = nn.Linear(32, 64)
        self.act2 = nn.ReLU()
        self.hidden3 = nn.Linear(64, 32)
        self.act3 = nn.ReLU()
        self.output = nn.Linear(32, 16)

    def forward(self, x):
        x = self.act(self.hidden(x))
        x = self.act1(self.hidden1(x))
        x = self.act2(self.hidden2(x))
        x = self.act3(self.hidden3(x))
        x = self.output(x)
        return x

# Define model-architecture mapping
model_architectures = {
    "d2p_p_err": Multiclass,
    "d2p_d_err": Multiclass,
    # Add different architectures if needed
}

def load_model(model_name, input_dim):
    model_class = model_architectures[model_name]
    model = model_class(input_dim)
    model.to("cpu")
    model.load_state_dict(torch.load(f"../d2p_model/models/{model_name}.pth", map_location="cpu"))
    model.eval()
    return model

models = {name: load_model(name, n_descriptors) for name in model_architectures}

def compute_pattern(descriptors, model_name):
    model = models[model_name]
    descriptors_tensor = torch.tensor(np.array([descriptors]), dtype=torch.float32).to("cpu")
    with torch.no_grad():
        prediction = model(descriptors_tensor)
        prediction = torch.sigmoid(prediction)
    prediction = [int(value * 127) for value in prediction.tolist()[0]]
    return prediction

df_results[[f"patterns_{model_name}" for model_name in models]] = df_results.apply(
    lambda row: pd.Series({f"patterns_{model_name}": [compute_pattern(np.array(step, dtype=np.float32), model_name) for step in eval(row['trajectory'])] for model_name in models}),
    axis=1
)

df_results.head()

### 4. Convert all patterns to descriptors

In [None]:
def v_density(v_list):
  if v_list == []*len(v_list): #check if empty
    return 0
  v_list = np.array(v_list) #convert to numpy

  # normalize if necessary
  if np.greater(np.max(v_list), 1): #if we have numbers greater than 1
    v_list = v_list/127

  return np.mean(v_list)

def v_onset_count(v_list):
  if v_list == []*len(v_list): #check if empty
    return 0
  return (np.array(v_list) > 0).sum()

def v_syncopation(v_list):
  v_list = np.array(v_list) #convert to numpy
  if np.count_nonzero(v_list ==0) == len(v_list) or len(v_list) == 0: #check if empty
    return 0, [0]*len(v_list)

  # normalize if necessary
  if np.greater(np.max(v_list), 1): #if we have numbers greater than 1
    v_list = v_list/127

  # iterate over pattern and find if next velocity is smaller.
  # that note should give a value as it is either a pulse reinforcement
  # or a syncopation. The degree is given by the diference in velocity.
  mw = [5,1,2,1,3,1,2,1,4,1,2,1,3,1,2,1] # metrical weights
  sync_list = np.zeros([len(v_list)])
  for i,x in enumerate (v_list):
    vel_diff = x-v_list[(i+1)%len(v_list)]
    if vel_diff > 0 and mw[i]<mw[(i+1)%len(v_list)]: #signifficant note & syncopation
      mw_diff = mw[(i+1)%len(v_list)]-mw[i]
      sync_list [i] = vel_diff * mw_diff
  return sum (sync_list)/15, sync_list/4 # max sync at any step is 4

def v_metricality (v_list):
  # How metrical is the current vlist
  # only observe notes on higher metrical levels than next step silence
  v_list = np.array(v_list) #convert to numpy
  if np.count_nonzero(v_list ==0) == len(v_list) or len(v_list) == 0: #check if empty
    return 0, [0]*len(v_list)
  # normalize if necessary
  if np.greater(np.max(v_list), 1): #if we have numbers greater than 1
    v_list = v_list/127
  mw = [5,1,2,1,3,1,2,1,4,1,2,1,3,1,2,1] # metrical weights
  metricality_list = np.zeros([len(v_list)])
  for i,x in enumerate(v_list):
    vel_diff = x-v_list[(i+1)%len(v_list)]
    if vel_diff > 0 and mw[i]>mw[(i+1)%len(v_list)]: #signifficant note & pulse reinforcement
      mw_diff = mw[i] - mw[(i+1)%len(v_list)]
      metricality_list[i] = vel_diff * mw_diff

  return sum (metricality_list)/15, metricality_list/4 # max metr at any step is 4

def v_syness(v_list):
  # compare the syncopation and the number of onsets
  # syness is higher when syncopation is high and density is small
  # NOTE: no need to divide by v_density as syncopation has
  # already taken velocity into account
  if v_list == [] or v_list == [0]*len(v_list): #check if empty
    return 0
  return (v_syncopation(v_list)[0] / v_onset_count(v_list)) / 0.26666666666666666 # normalized for max syness

def v_meterness(v_list):
  # compare the metricality and the number of onsets
  # meterness is higher when metricality is high and density is small
  # NOTE: no need to normalize by v_denisty as metricality has been normalized
  if v_list == [] or v_list == [0]*len(v_list): #check if empty
    return 0
  return v_metricality(v_list)[0]/v_onset_count(v_list)/ 0.26666666666666666 #normalize for max meterness

def v_evenness(v_list):
    # how well distributed are the D onsets of a pattern
    # if they are compared to a perfect D sided polygon
    # input patterns are phase-corrected to start always at step 0
    # i.e. if we have 4 onsets in a 16 step pattern, what is the distance of onsets
    # o1, o2, o3, o4 to positions 0 4 8 and 12
    # here we will use a simple algorithm that does not involve DFT computation
    # evenness is well described in [Milne and Dean, 2016] but this implementation is much simpler

    d = v_onset_count(v_list)
    if d <=1:
      return 0

    iso_angle_16 = 2 * math.pi / 16 # angle of 1/16th
    first_onset_step = [i for i, x in enumerate(v_list) if x != 0][0]
    first_onset_angle = first_onset_step * iso_angle_16
    iso_angle = 2 * math.pi / d
    # ideal angles
    iso_pattern_radians = [x * iso_angle for x in range(d)]
    # actual distances
    pattern_radians = [i * iso_angle_16 for i, x in enumerate(v_list) if x != 0]
    cosines = [
        abs(math.cos(x - pattern_radians[i] + first_onset_angle))
        for i, x in enumerate(iso_pattern_radians)
    ]
    return sum(cosines) / d

def v_balance(v_list):
    # balance is described in [Milne and Herff, 2020] as:
    # "a quantification of the proximity of that rhythm's
    # “centre of mass” (the mean position of the points)
    # to the centre of the unit circle."
    d = v_onset_count(v_list)
    if d <= 1:
        return 0

    center = np.array([0, 0])
    iso_angle_16 = 2 * math.pi / 16
    X = [math.cos(i * iso_angle_16) for i, x in enumerate(v_list) if x != 0]
    Y = [math.sin(i * iso_angle_16) for i, x in enumerate(v_list) if x != 0]
    matrix = np.array([X, Y])
    matrix_sum = matrix.sum(axis=1)
    magnitude = np.linalg.norm(matrix_sum - center) / d
    return 1 - magnitude

def v_start(v_list):
  # output the starting step of the pattern
  if v_onset_count(v_list) == 0:
    return 0
  s = 0
  while v_list[s] == 0:
    s += 1
  return s / (len(v_list))

prediction_d = ["density", "onset_count", "syncopation", "metricality", "syness", "meterness", "evenness", "balance", "start"]

def describe_pattern(pattern):
    return [
        v_density(pattern),
        v_onset_count(pattern),
        v_syncopation(pattern)[0],
        v_metricality(pattern)[0],
        v_syness(pattern),
        v_meterness(pattern),
        v_evenness(pattern),
        v_balance(pattern),
        v_start(pattern),
    ]

for model_name in models:
    pattern_col = f"patterns_{model_name}"
    descriptor_col = f"descriptors_{model_name}"
    df_results[descriptor_col] = df_results[pattern_col].apply(lambda patterns: [describe_pattern(pattern) for pattern in patterns])

df_results.head()


### 5. Execute KL divergence and euclidean distance analysis

In [None]:
# Function to compute KL divergence (avoiding zero issues)
def kl_divergence(p, q):
    p = np.array(p) + 1e-10  # Small offset to avoid division by zero
    q = np.array(q) + 1e-10
    return entropy(p, q)

# Function to compute Euclidean distance
def euclidean_distance(d1, d2):
    return np.linalg.norm(np.array(d1) - np.array(d2))

In [None]:
sns.set_theme(style="whitegrid")

movement_types = ["small", "medium", "large"]
kl_means = {model: {} for model in models}
euc_means = {model: {} for model in models}

kl_palette = ["royalblue", "darkorange", "seagreen"]
euc_palette = ["crimson", "purple", "saddlebrown"]

for model in models:
    pattern_col = f"patterns_{model}"
    descriptor_col = f"descriptors_{model}"

    for movement_type in movement_types:
        df = df_results[df_results["distance_category"] == movement_type]
        kl_all, euc_all = [], []
        
        for _, row in df.iterrows():
            pattern_steps = row[pattern_col]
            descriptor_steps = row[descriptor_col]
            
            if len(pattern_steps) == n_interpolation and len(descriptor_steps) == n_interpolation:
                kl_divs = [kl_divergence(pattern_steps[i], pattern_steps[i+1]) for i in range(n_interpolation - 1)]
                euc_dists = [euclidean_distance(descriptor_steps[i], descriptor_steps[i+1]) for i in range(n_interpolation - 1)]
                
                kl_all.append(np.cumsum(kl_divs))  # Accumulate KL divergence
                euc_all.append(np.cumsum(euc_dists))  # Accumulate Euc. distance
            
        kl_means[model][movement_type] = np.mean(kl_all, axis=0)
        euc_means[model][movement_type] = np.mean(euc_all, axis=0)

# Create dataframes for plotting
kl_dfs = {model: pd.DataFrame(kl_means[model]) for model in models}
euc_dfs = {model: pd.DataFrame(euc_means[model]) for model in models}

for model in models:
    kl_dfs[model]["Interpolation Step"] = np.arange(n_interpolation - 1)
    euc_dfs[model]["Interpolation Step"] = np.arange(n_interpolation - 1)

# Determine global y-axis limits
max_kl = max(df.drop(columns="Interpolation Step").max().max() for df in kl_dfs.values())
max_euc = max(df.drop(columns="Interpolation Step").max().max() for df in euc_dfs.values())

# Create subplots with two columns and four rows
fig, axes = plt.subplots(4, 2, figsize=(16/1.5, 18/1.5))  # 4 rows, 2 columns

model_name = {
    "d2p_p_err": "Pattern-based eval.",
    "d2p_d_err": "Descriptor-based eval.",
}

for i, model in enumerate(models):
    # KL Div. Line Plot (First Column)
    kl_melted = kl_dfs[model].melt(id_vars="Interpolation Step", var_name="Movement Type", value_name="KL Div.")
    sns.lineplot(data=kl_melted, x="Interpolation Step", y="KL Div.", hue="Movement Type", palette=kl_palette, marker="o", ax=axes[0, i])
    axes[0, i].set_title(f"Acc. KL Div. ({model_name[model]})", fontsize=14)
    axes[0, i].set_xlabel("Interpolation Step")
    axes[0, i].set_ylabel("Acc. KL Div.")
    axes[0, i].grid(True, linestyle="--", linewidth=0.5)
    axes[0, i].legend(fontsize=8, loc='upper left')
    axes[0, i].set_ylim(0, max_kl)  # Set common y-axis limit

    # KL Div. Box Plot (Second Column)
    sns.boxplot(data=kl_dfs[model].drop(columns="Interpolation Step"), palette=kl_palette, ax=axes[1, i])
    axes[1, i].set_xlabel("Movement Type")
    axes[1, i].set_ylabel("Acc. KL Div.")
    axes[1, i].set_title(f"KL Divergence Dist. ({model_name[model]})", fontsize=14)
    axes[1, i].grid(True, linestyle="--", linewidth=0.5)
    axes[1, i].set_ylim(0, max_kl)  # Set common y-axis limit

    # Euc. Dist. Line Plot (Third Column)
    euc_melted = euc_dfs[model].melt(id_vars="Interpolation Step", var_name="Movement Type", value_name="Euc. Dist.")
    sns.lineplot(data=euc_melted, x="Interpolation Step", y="Euc. Dist.", hue="Movement Type", palette=euc_palette, marker="o", ax=axes[2, i])
    axes[2, i].set_title(f"Acc. Euc. Dist. ({model_name[model]})", fontsize=14)
    axes[2, i].set_xlabel("Interpolation Step")
    axes[2, i].set_ylabel("Acc. Euc. Dist.")
    axes[2, i].grid(True, linestyle="--", linewidth=0.5)
    axes[2, i].legend(fontsize=8, loc='upper left')
    axes[2, i].set_ylim(0, max_euc)  # Set common y-axis limit

    # Euc. Dist. Box Plot (Fourth Column)
    sns.boxplot(data=euc_dfs[model].drop(columns="Interpolation Step"), palette=euc_palette, ax=axes[3, i])
    axes[3, i].set_xlabel("Movement Type")
    axes[3, i].set_ylabel("Acc. Euc. Dist.")
    axes[3, i].set_title(f"Euc. Dist. ({model_name[model]})", fontsize=14)
    axes[3, i].grid(True, linestyle="--", linewidth=0.5)
    axes[3, i].set_ylim(0, max_euc)  # Set common y-axis limit

# fig.suptitle("Comparison of Acc. KL Div. and Euc. Dist. Across Models and Movement Types", fontsize=16)
plt.tight_layout(rect=[0, 0, 1, 0.98])
plt.show()

### 6. ANOVA analysis

In [None]:
from scipy.stats import f_oneway
from statsmodels.stats.multicomp import pairwise_tukeyhsd

# Loop through the models
for model in models:
    # Get the relevant columns for each model's KL Divergence and Euclidean Distance
    kl_df_model = kl_dfs[model]
    euc_df_model = euc_dfs[model]

    # Perform ANOVA for KL Divergence for each model
    kl_anova_p = f_oneway(kl_df_model["small"], kl_df_model["medium"], kl_df_model["large"]).pvalue
    print(f"ANOVA p-value for KL Divergence ({model}): {kl_anova_p}")

    # Perform ANOVA for Euclidean Distance for each model
    euc_anova_p = f_oneway(euc_df_model["small"], euc_df_model["medium"], euc_df_model["large"]).pvalue
    print(f"ANOVA p-value for Euclidean Distance ({model}): {euc_anova_p}")

    # If ANOVA for KL Divergence is significant, perform Tukey's HSD test
    if kl_anova_p < 0.05:
        kl_melted = kl_df_model.melt(id_vars="Interpolation Step", var_name="Movement Type", value_name="KL Divergence")
        kl_tukey = pairwise_tukeyhsd(kl_melted["KL Divergence"], kl_melted["Movement Type"], alpha=0.05)
        print(f"Tukey's HSD test for KL Divergence ({model}):")
        print(kl_tukey)

    # If ANOVA for Euclidean Distance is significant, perform Tukey's HSD test
    if euc_anova_p < 0.05:
        euc_melted = euc_df_model.melt(id_vars="Interpolation Step", var_name="Movement Type", value_name="Euclidean Distance")
        euc_tukey = pairwise_tukeyhsd(euc_melted["Euclidean Distance"], euc_melted["Movement Type"], alpha=0.05)
        print(f"Tukey's HSD test for Euclidean Distance ({model}):")
        print(euc_tukey)
    
    print()

### 6. Find outliers

In [None]:
# # Find the maximum KL divergence and Euclidean distance for each movement type
# max_kl_divergence = df_results['patterns'].apply(lambda patterns: max([kl_divergence(patterns[i], patterns[i+1]) for i in range(n_interpolation - 1)]))
# max_euclidean_distance = df_results['descriptors'].apply(lambda descriptors: max([euclidean_distance(descriptors[i], descriptors[i+1]) for i in range(n_interpolation - 1)]))

# # Add these maximum values to the dataframe
# df_results['max_kl_divergence'] = max_kl_divergence
# df_results['max_euclidean_distance'] = max_euclidean_distance

# # Find the rows with the maximum KL divergence and Euclidean distance
# max_kl_row = df_results.loc[df_results['max_kl_divergence'].idxmax()]
# max_euclidean_row = df_results.loc[df_results['max_euclidean_distance'].idxmax()]

# print("Row with maximum KL divergence:")
# print(max_kl_row)

# print("\nRow with maximum Euclidean distance:")
# print(max_euclidean_row)