In [None]:
import numpy as np
import matplotlib.pyplot as plt
import librosa
import librosa.display
import crepe
from scipy.interpolate import interp1d
from scipy.signal import argrelextrema
import pandas as pd
import plotly.graph_objects as go
import IPython.display as ipd
from sklearn.cluster import KMeans
from sklearn.manifold import SpectralEmbedding
from scipy.signal import find_peaks as scipy_find_peaks

In [None]:
# Analysis window in seconds. These two names are also read as globals by
# get_index_from_time and plot_frequency_with_carnatic_notes.
start_time = 0
end_time = 30

audio_path = r"C:\Users\nandh\Downloads\Varnams\split sounds\223582__gopalkoduri__carnatic-varnam-by-vignesh-in-abhogi-raaga\vocals.wav"
csv_path = "Evari Bodhana.csv"
y, sr = librosa.load(audio_path, sr=44100, offset=start_time, duration=end_time - start_time,mono=True)


# Compute spectrogram (dB values are taken relative to the smallest magnitude)
D = librosa.stft(y)
S_db = librosa.amplitude_to_db(np.abs(D), ref=np.min)

# CREPE pitch estimation
time, frequency, confidence, activation = crepe.predict(y, sr, viterbi=True, step_size=20, model_capacity="tiny")

# Interpolation: resample the CREPE track onto the spectrogram's frame times
spec_time = librosa.times_like(D, sr=sr)
interp_freq = interp1d(time, frequency, kind='linear', fill_value='extrapolate')
interp_conf = interp1d(time, confidence, kind='linear', fill_value='extrapolate')

new_frequency = interp_freq(spec_time)
new_confidence = interp_conf(spec_time)

df = pd.DataFrame({"Time": spec_time, "Frequency": new_frequency, "Confidence": new_confidence})
# to_csv opens the target in write mode by default, so an existing file is
# replaced without needing a separate truncate step.
df.to_csv(csv_path, index=False)
print(f"CSV file cleared and saved at {csv_path}")

In [None]:
# Reload the cached pitch track; the CSV is parsed once and the three columns
# are pulled from the same frame instead of re-reading the file per column.
pitch_table = pd.read_csv(csv_path)
new_confidence = pitch_table["Confidence"].values
new_frequency = pitch_table["Frequency"].values
spec_time = pitch_table["Time"].values


In [None]:
def plot_spectrogram_with_crepe(spec_time, conf, S_db, sr):
    """Draw a linear-frequency spectrogram with a pitch contour on top.

    Parameters
    ----------
    spec_time : array-like
        X positions (seconds) for the contour samples.
    conf : array-like
        Contour values plotted in red against ``spec_time``.
    S_db : np.ndarray
        Spectrogram passed to ``librosa.display.specshow``.
    sr : int
        Sample rate forwarded to ``specshow``.
    """
    fig, ax = plt.subplots(figsize=(14, 8))
    librosa.display.specshow(S_db, sr=sr, x_axis='time', y_axis='linear', cmap='viridis', ax=ax)
    ax.plot(spec_time, conf, color='r', linewidth=1.5, label='CREPE Pitch')
    ax.set_xlabel('Time (s)')
    ax.set_ylabel('Frequency (Hz)')
    # Only the 0-2000 Hz band is shown.
    ax.set_ylim(0, 2000)
    ax.legend(loc='upper right')
    fig.tight_layout()
    plt.show()
    
def find_tonic(S, sr):
    """Return the pitch-class name whose chroma energy, summed over time, is largest.

    Parameters
    ----------
    S : np.ndarray
        STFT matrix; its magnitude is handed to ``librosa.feature.chroma_stft``.
    sr : int
        Sample rate of the analysed signal.

    Returns
    -------
    str
        One of "C", "C#", ..., "B" (no octave number).
    """
    # NOTE(review): librosa documents `S` as a power spectrogram; the magnitude
    # is passed here — confirm this is intended.
    pitch_labels = ["C", "C#", "D", "D#", "E", "F", "F#", "G", "G#", "A", "A#", "B"]
    chroma = librosa.feature.chroma_stft(S=np.abs(S), sr=sr)
    energy_per_class = np.sum(np.abs(chroma), axis=1)
    strongest_label, _ = max(zip(pitch_labels, energy_per_class), key=lambda pair: pair[1])
    return strongest_label

def get_carnatic_frequencies(tonic):
    """Build a note-name -> frequency (Hz) table spanning three octaves around a tonic.

    Keys are lower-case for the octave below the tonic ('sa', 'ri1', ...),
    capitalised for the tonic's octave ('Sa', 'Ri1', ...) and upper-case for
    the octave above ('SA', 'RI1', ...).

    Parameters
    ----------
    tonic : str
        Note name understood by ``librosa.note_to_hz`` (e.g. "C#3").

    Returns
    -------
    dict
        Mapping of note name to frequency, ordered low octave -> high octave.
    """
    # Intonational ratios for the basic set of Carnatic notes, relative to Sa.
    base_ratios = [
        ('Sa', 1.0),
        ('Ri1', 16/15),
        ('Ri2', 9/8),
        ('Ga1', 6/5),
        ('Ga2', 5/4),
        ('Ma1', 4/3),
        ('Ma2', 45/32),
        ('Pa', 3/2),
        ('Da1', 8/5),
        ('Da2', 5/3),
        ('Ni1', 16/9),
        ('Ni2', 15/8),
    ]

    # (key spelling, octave multiplier); scaling by a power of two is exact in
    # floating point, so these values match a hand-written table.
    octave_variants = [
        (str.lower, 0.5),
        (str, 1.0),
        (str.upper, 2),
    ]

    carnatic_ratios = {}
    for spell, multiplier in octave_variants:
        for name, ratio in base_ratios:
            carnatic_ratios[spell(name)] = multiplier * ratio

    tonic_freq = librosa.note_to_hz(tonic)  # frequency of the tonic itself

    return {note: tonic_freq * ratio for note, ratio in carnatic_ratios.items()}

def get_closest_note(freq, carnatic_frequencies):
    """Return the name of the note whose frequency lies nearest to ``freq``.

    When two notes are equally distant, the one that appears first in
    ``carnatic_frequencies`` wins.
    """
    nearest_name, _ = min(
        carnatic_frequencies.items(),
        key=lambda entry: abs(entry[1] - freq),
    )
    return nearest_name

def get_closest_frequency(freq, carnatic_frequencies):
    """Return the table frequency (Hz) nearest to ``freq``.

    When two entries are equally distant, the one that appears first in
    ``carnatic_frequencies`` wins.
    """
    _, nearest_hz = min(
        carnatic_frequencies.items(),
        key=lambda entry: abs(entry[1] - freq),
    )
    return nearest_hz

def get_index_from_time(time_input,conf):
    """Map a time (seconds) inside the analysis window to an index into ``conf``.

    The window [start_time, end_time] (notebook-level globals) is divided into
    ``len(conf)`` equal pieces and the piece containing ``time_input`` is
    returned.

    Parameters
    ----------
    time_input : float
        Time in seconds; must lie within [start_time, end_time].
    conf : sequence
        Only its length is used.

    Returns
    -------
    int
        Index in ``range(len(conf))``.

    Raises
    ------
    ValueError
        If ``time_input`` is outside the window or ``conf`` is empty.
    """
    # Check if the input time is within the valid range
    if time_input < start_time or time_input > end_time:
        raise ValueError(f"Input time must be between {start_time} and {end_time} seconds.")

    num_pieces = len(conf)
    if num_pieces == 0:
        raise ValueError("conf must contain at least one frame.")

    # Duration covered by one entry of conf
    duration_per_piece = (end_time - start_time) / num_pieces

    index = int((time_input - start_time) / duration_per_piece)

    # time_input == end_time would otherwise map to len(conf), one past the end.
    return min(index, num_pieces - 1)

def plot_frequency_with_carnatic_notes(frequency_list, beat_frames, tonic,beat_sr):
    """Plot a pitch contour with Carnatic note lines, turning points and beat markers.

    Beats are kept only when they fall inside the notebook-level
    ``start_time`` / ``end_time`` window.

    Parameters
    ----------
    frequency_list : sequence of float
        Pitch contour in Hz, one value per frame; NaN marks gaps.
    beat_frames : array-like of int
        Beat positions as frame indices (converted with ``librosa.frames_to_time``).
    tonic : str
        Note name passed to ``get_carnatic_frequencies``.
    beat_sr : int
        Sample rate used to convert ``beat_frames`` to seconds.

    Raises
    ------
    ValueError
        If the contour contains no non-NaN value.
    """
    # Work on an ndarray throughout so that plain lists are accepted as well.
    frequency_array = np.asarray(frequency_list, dtype=float)

    # Identify valid (non-NaN) frames
    valid_indices = ~np.isnan(frequency_array)
    if not valid_indices.any():
        raise ValueError("No valid frequencies to process.")

    # Turning points: positions where the sign of the slope changes. NaN slopes
    # (at the ends and next to gaps) also compare != 0 and are therefore included.
    slope_sign = np.sign(np.diff(frequency_array, prepend=np.nan, append=np.nan))
    loc_extremes = np.where(np.diff(slope_sign) != 0)[0]
    extremes = frequency_array[loc_extremes].tolist()

    carnatic_frequencies = get_carnatic_frequencies(tonic)

    # Convert beats to contour indices, ignoring beats outside the window.
    beat_seconds = librosa.frames_to_time(beat_frames, sr=beat_sr)
    beat_points = [
        get_index_from_time(moment, frequency_array)
        for moment in beat_seconds
        if start_time <= moment <= end_time
    ]

    # Plot the graph
    fig = go.Figure()

    # Plot the frequency graph with gaps for NaNs: one trace per run of valid frames.
    edges = np.diff(np.concatenate(([0], valid_indices.astype(int), [0])))
    run_starts = np.where(edges == 1)[0]
    run_ends = np.where(edges == -1)[0]
    for start, end in zip(run_starts, run_ends):
        fig.add_trace(go.Scatter(
            x=np.arange(start, end),
            y=frequency_array[start:end],
            mode='lines',
            name='Frequency (Hz)',
            line=dict(color='blue')
        ))

    # Plot horizontal lines for Carnatic notes
    for note, freq in carnatic_frequencies.items():
        fig.add_trace(go.Scatter(
            x=[0, len(frequency_array) - 1],
            y=[freq, freq],
            mode='lines',
            line=dict(dash='dash', color='gray', width=2),
            name=note,
            hovertemplate=f"{note} ({freq:.2f} Hz)"
        ))

    # Plot the extremes as red dots
    fig.add_trace(go.Scatter(
        x=loc_extremes,
        y=extremes,
        mode='markers',
        marker=dict(color='red', size=2, symbol='circle'),
        name='Extremes'
    ))

    # Plot vertical lines for beat points, spanning the full y-range of the contour
    y_low, y_high = np.nanmin(frequency_array), np.nanmax(frequency_array)
    for beat in beat_points:
        fig.add_trace(go.Scatter(
            x=[beat, beat],
            y=[y_low, y_high],
            mode='lines',
            line=dict(color='orange', width=2),
            name=f'Beat @ {beat}'
        ))

    fig.update_layout(
        title=f'Frequency with Carnatic Notes (Tonic: {tonic})',
        xaxis_title='Time',
        yaxis_title='Frequency (Hz)',
        showlegend=True
    )

    fig.show()

def breaklist(elements, indexes):
    """Cut ``elements`` into consecutive slices at the given positions.

    The first slice runs from 0 to the first index, each following slice from
    one index to the next, and a final slice holds whatever remains after the
    last index (omitted when nothing remains).
    """
    cut_points = list(indexes)
    slice_starts = [0] + cut_points[:-1]
    pieces = [elements[lo:hi] for lo, hi in zip(slice_starts, cut_points)]

    tail_start = cut_points[-1] if cut_points else 0
    if tail_start < len(elements):
        pieces.append(elements[tail_start:])

    return pieces

def plot_with_carnatic_bars(note_num, noteslist, carnatic_frequencies):
    """Plot one contour segment with the Carnatic note levels that bracket it.

    The note lines drawn are those between the note nearest the segment's
    minimum and the note nearest its maximum (NaNs ignored). After the figure
    is shown, the names of those notes are printed, one per line.
    """
    contour = noteslist[note_num]

    # Snap the segment's extremes onto the note grid to get the visible range.
    lowest_level = get_closest_frequency(np.nanmin(contour), carnatic_frequencies)
    highest_level = get_closest_frequency(np.nanmax(contour), carnatic_frequencies)
    visible_levels = [
        level for level in carnatic_frequencies.values()
        if lowest_level <= level <= highest_level
    ]

    plt.plot(contour)
    for level in visible_levels:
        plt.axhline(y=level, color='r', linestyle='--')
    plt.show()

    for level in visible_levels:
        print(get_closest_note(level, carnatic_frequencies))

def spectral_decomp(note, n_clusters, plot=True, tonic="C#3"):
    """Split a pitch contour into clusters via spectral embedding + k-means.

    Each sample is embedded from its (index, frequency) pair, clustered, and
    the clusters are renumbered in order of first appearance along the contour.

    Parameters
    ----------
    note : sequence of float
        Pitch contour of a single segment.
    n_clusters : int
        Number of k-means clusters.
    plot : bool, optional
        When True, show a plotly figure of the clusters with note lines.
    tonic : str, optional
        Tonic used for the reference note lines (defaults to "C#3").

    Returns
    -------
    list
        One entry per cluster: a tuple of that cluster's frequencies, or an
        empty list when the cluster received no samples.
    """
    note = np.array(note)
    X = np.column_stack((np.arange(len(note)), note))
    embedding = SpectralEmbedding(n_components=2, affinity='nearest_neighbors')
    X_transformed = embedding.fit_transform(X)

    kmeans = KMeans(n_clusters=n_clusters, n_init=10, random_state=42)
    labels = kmeans.fit_predict(X_transformed)

    # Sort clusters based on first occurrence
    unique_clusters = np.unique(labels, return_index=True)
    sorted_clusters = [cluster for _, cluster in sorted(zip(unique_clusters[1], unique_clusters[0]))]
    label_mapping = {old: new for new, old in enumerate(sorted_clusters)}
    sorted_labels = np.array([label_mapping[label] for label in labels])

    # Assign frequencies to clusters
    segments = [[] for _ in range(n_clusters)]
    for idx, freq in enumerate(note):
        segments[sorted_labels[idx]].append((idx, freq))

    if plot:
        fig = go.Figure()

        colors = ['red', 'blue', 'green', 'orange', 'purple', 'cyan', 'magenta']
        for i in range(n_clusters):
            indices, freqs = zip(*segments[i]) if segments[i] else ([], [])
            fig.add_trace(go.Scatter(
                x=indices,
                y=freqs,
                mode='markers+lines',
                marker=dict(size=6, color=colors[i % len(colors)]),
            ))

        # Horizontal reference line for every note inside the contour's range.
        # The note table is built once rather than once per key.
        lowest, highest = min(note), max(note)
        notes_in_range = [
            (name, hz) for name, hz in get_carnatic_frequencies(tonic).items()
            if lowest <= hz <= highest
        ]
        x_values = np.linspace(min(X[:, 0]), max(X[:, 0]), num=100)  # Densely spaced x values

        for name, hz in notes_in_range:
            fig.add_trace(go.Scatter(
                x=x_values,
                y=np.full_like(x_values, hz),
                mode="lines",
                line=dict(color="gray", dash="dash"),
                showlegend=False,
                hovertemplate=f"{name}({hz:.2f} Hz)"
            ))

        fig.show()

    return [list(zip(*seg))[1] if seg else [] for seg in segments]

def playnote(n, beat_audio, beat_sr, beat_times, start_time):
    """Play the audio between the n-th and (n+1)-th beat after ``start_time``.

    Beat times are shifted by ``start_time`` and negative ones discarded
    before indexing. Prints "Invalid note index" and returns None when ``n``
    does not identify a beat that has a successor.
    """
    relative_times = beat_times - start_time
    onsets = relative_times[relative_times >= 0]  # keep beats at or after start_time

    if not (0 <= n < len(onsets) - 1):
        print("Invalid note index")
        return

    first_sample = int(onsets[n] * beat_sr)
    last_sample = int(onsets[n + 1] * beat_sr)
    ipd.display(ipd.Audio(beat_audio[first_sample:last_sample], rate=beat_sr))

def find_peaks_and_valleys(conf):
    """Locate strict local maxima and minima of a 1-D sequence.

    A position counts only when it and both neighbours are non-NaN; the first
    and last positions are never reported.

    Returns
    -------
    (list of int, list of int)
        Indices of peaks and indices of valleys, each in ascending order.
    """
    peaks, valleys = [], []

    for centre in range(1, len(conf) - 1):
        left, mid, right = conf[centre - 1], conf[centre], conf[centre + 1]
        if np.isnan(left) or np.isnan(mid) or np.isnan(right):
            continue
        if mid > left and mid > right:
            peaks.append(centre)
        elif mid < left and mid < right:
            valleys.append(centre)

    return peaks, valleys

def play_segment_between_beats(beat_audio, beat_sr, beat_frames, beat_index,offset=0):
    """Play the audio between two beats, optionally widened by ``offset`` beats.

    The segment runs from beat ``beat_index - offset`` to beat
    ``beat_index + 1 + offset``.

    Parameters
    ----------
    beat_audio : np.ndarray
        Audio samples.
    beat_sr : int
        Sample rate of ``beat_audio``.
    beat_frames : sequence of int
        Beat positions as frame indices.
    beat_index : int
        Index of the beat that starts the segment.
    offset : int, optional
        Number of extra beats to include on each side.

    Prints a message and returns None when the requested beats do not exist.
    """
    first_beat = beat_index - offset
    last_beat = beat_index + 1 + offset

    # Validate the widened range too: a negative first_beat would silently wrap
    # around to the end of beat_frames and last_beat could run past the end.
    if (
        beat_index < 0
        or beat_index >= len(beat_frames) - 1
        or first_beat < 0
        or last_beat >= len(beat_frames)
    ):
        print("Invalid beat index. Please provide a valid index.")
        return

    # Get the start and end frames for the segment
    start_frame = beat_frames[first_beat]
    end_frame = beat_frames[last_beat]

    # Convert frames to time
    start_time = librosa.frames_to_time(start_frame, sr=beat_sr)
    end_time = librosa.frames_to_time(end_frame, sr=beat_sr)

    # Convert time to sample indices
    start_sample = int(start_time * beat_sr)
    end_sample = int(end_time * beat_sr)

    # Slice the audio segment
    audio_segment = beat_audio[start_sample:end_sample]

    # Play the audio segment
    ipd.display(ipd.Audio(audio_segment, rate=beat_sr))

def trim(data):
    """Crop a contour so it starts and ends on a local extremum.

    Extrema are searched among the non-NaN samples with a neighbourhood of two
    samples on each side. When fewer than two extrema exist the data is
    returned uncropped (as an ndarray).
    """
    samples = np.array(data)
    finite_positions = np.nonzero(~np.isnan(samples))[0]
    finite_values = samples[finite_positions]

    maxima = argrelextrema(finite_values, np.greater, order=2)[0]
    minima = argrelextrema(finite_values, np.less, order=2)[0]
    turning_points = np.sort(np.concatenate((maxima, minima)))

    if len(turning_points) < 2:
        return samples  # Not enough peaks/troughs to trim

    # Translate positions among the valid samples back to original indices.
    first = finite_positions[turning_points[0]]
    last = finite_positions[turning_points[-1]]
    return samples[first:last + 1]

def shift_beats_to_peaks_or_valleys(beat_frames, conf):
    """
    Move every beat frame onto the nearest local peak or valley of a contour.

    Parameters:
    - beat_frames: The original beat frames (indices into ``conf``).
    - conf: 1-D contour searched for peaks and valleys (the notebook passes
      the NaN-masked pitch track).

    Returns:
    - shifted_beat_frames: np.ndarray with one index per input beat. When the
      contour has no peak or valley at all, the beats are returned unchanged.
    """
    peaks, valleys = find_peaks_and_valleys(conf)
    # Build the candidate list once; peaks come first so that a peak wins a tie
    # against an equally distant valley.
    candidates = peaks + valleys

    if not candidates:
        # Nothing to snap to: keep the original positions instead of emitting None.
        return np.array(beat_frames)

    shifted_beat_frames = [
        min(candidates, key=lambda index: abs(index - beat))
        for beat in beat_frames
    ]

    return np.array(shifted_beat_frames)

def extend_sublists(main_list, num=4):
    """Pad every interior sublist with context from its neighbours.

    Each sublist except the first and last is returned as a new list made of
    the last ``num`` items of the previous sublist, the sublist itself, and
    the first ``num`` items of the next sublist. The first and last sublists
    are passed through untouched.
    """
    final_position = len(main_list) - 1
    result = []
    for position, chunk in enumerate(main_list):
        if position == 0 or position == final_position:
            result.append(chunk)
            continue
        padded = list(main_list[position - 1][-num:])
        padded.extend(chunk)
        padded.extend(main_list[position + 1][:num])
        result.append(padded)
    return result

def plot_with_carnatic_bars_with_peaks(note_num, noteslist, carnatic_frequencies):
    """Scatter-plot one contour segment with its peaks/valleys and bracketing note lines.

    Parameters
    ----------
    note_num : int
        Index of the segment inside ``noteslist``.
    noteslist : sequence
        Collection of pitch-contour segments (lists or arrays).
    carnatic_frequencies : dict
        Note name -> frequency table.

    After the figure is shown, the names of the drawn notes are printed.
    """
    # Convert once so that fancy indexing with the peak positions also works
    # when the segment is a plain list.
    data = np.asarray(noteslist[note_num])

    # Find relevant frequency range
    min_freq = get_closest_frequency(np.nanmin(data), carnatic_frequencies)
    max_freq = get_closest_frequency(np.nanmax(data), carnatic_frequencies)
    newbars = [i for i in carnatic_frequencies.values() if min_freq <= i <= max_freq]

    plt.scatter(np.arange(len(data)), data,s=1, color='green')

    peaks, _ = scipy_find_peaks(data)
    valleys, _ = scipy_find_peaks(-data)  # Negate to find valleys

    plt.plot(peaks, data[peaks], "bo", markersize=4)
    plt.plot(valleys, data[valleys], "bo", markersize=4)

    for i in newbars:
        plt.axhline(y=i, color='r', linestyle='--')

    plt.show()

    for i in newbars:
        print(get_closest_note(i, carnatic_frequencies))

def interpolate_with_nans(data, target_length=128):
    """Resample a contour to ``target_length`` points while keeping its gaps.

    Non-NaN samples are linearly interpolated (and extrapolated) on a
    normalised 0-1 axis. Output positions that fall mostly inside an original
    NaN stretch are set back to NaN. If fewer than two non-NaN samples exist,
    an all-NaN array is returned.
    """
    values = np.array(data, dtype=np.float64)
    missing = np.isnan(values)
    present = ~missing

    if np.count_nonzero(present) < 2:
        return np.full(target_length, np.nan)

    source_axis = np.linspace(0, 1, len(values))
    target_axis = np.linspace(0, 1, target_length)

    resample = interp1d(
        source_axis[present],
        values[present],
        kind='linear',
        bounds_error=False,
        fill_value="extrapolate",
    )
    resampled = resample(target_axis)

    # Interpolate the 0/1 gap indicator; above 0.5 means the nearest original
    # samples were predominantly NaN.
    gap_weight = np.interp(target_axis, source_axis, missing.astype(float))
    resampled[gap_weight > 0.5] = np.nan
    return resampled

def play_segment(beat_audio, beat_sr, start_frame,end_frame):
    """Play the audio lying between two frame indices.

    Each frame index is converted to seconds with ``librosa.frames_to_time``
    and then to a sample position at ``beat_sr``.
    """
    first_sample, last_sample = (
        int(librosa.frames_to_time(frame, sr=beat_sr) * beat_sr)
        for frame in (start_frame, end_frame)
    )

    ipd.display(ipd.Audio(beat_audio[first_sample:last_sample], rate=beat_sr))

In [None]:
# Estimate the tonic as the dominant chroma class of the excerpt's STFT.
# NOTE(review): find_tonic returns a pitch-class name without an octave, while
# later cells hard-code "C#3" — confirm the two agree.
tonic =  find_tonic(D, sr)
print(tonic)


In [None]:
# Frame-wise RMS energy of the excerpt.
# NOTE(review): the boolean mask below requires rmse and conf to have the same
# length — presumably guaranteed by matching STFT framing; confirm.
rmse = librosa.feature.rms(y=y, frame_length=2048, hop_length=512)[0]
conf_thresh = 0  # Confidence threshold
rmse_thresh = np.percentile(rmse, 7)  # Reject frames below the 7th percentile of RMS energy


# Despite its name, `conf` holds the pitch (Hz) wherever the CREPE confidence
# exceeds the threshold, and NaN elsewhere.
conf = np.where(new_confidence > conf_thresh, new_frequency, np.nan)
conf[rmse < rmse_thresh] = np.nan  # Reject frames with low energy


print(len(conf))
plot_spectrogram_with_crepe(spec_time, conf, S_db, sr)

In [None]:
# Load the full mixed recording at its native sample rate for beat tracking and playback.
beat_audio_path = r"C:\Users\nandh\Downloads\carnatic_varnam_1.1\carnatic_varnam_1.1\Audio\223582__gopalkoduri__carnatic-varnam-by-vignesh-in-abhogi-raaga.mp3"
beat_audio, beat_sr = librosa.load(beat_audio_path, sr=None, mono=True)

# Track beats over the whole recording and synthesise a click at each beat.
tempo, beat_frames = librosa.beat.beat_track(y=beat_audio, sr=beat_sr, tightness=400)
beat_times = librosa.frames_to_time(beat_frames, sr=beat_sr)
audio_beat_clicks = librosa.clicks(times=beat_times, sr=beat_sr, click_freq=1500, length=len(beat_audio))

# Sample range of the analysis window inside the full recording.
start_sample = int(start_time * beat_sr)
end_sample = int(end_time * beat_sr)

sliced_audio = beat_audio[start_sample:end_sample]
sliced_clicks = audio_beat_clicks[start_sample:end_sample]

# Audition the window with the original beat clicks mixed in.
ipd.display(ipd.Audio(sliced_audio + sliced_clicks, rate=beat_sr))
selected_beat_frames = beat_frames[(beat_times >= start_time) & (beat_times <= end_time)]
# plot_frequency_with_carnatic_notes(conf, selected_beat_frames, "C#3", beat_sr)


# Snap every beat onto the nearest peak/valley of the pitch contour.
# NOTE(review): beat_frames index frames of beat_audio (native rate, full file)
# while conf is indexed by frames of the 44.1 kHz excerpt; the two only line up
# if beat_sr is 44100 and start_time is 0 — confirm.
shifted_beat_frames = shift_beats_to_peaks_or_valleys(beat_frames, conf)
shifted_beat_times = librosa.frames_to_time(shifted_beat_frames, sr=beat_sr)

audio_beat_clicks_shifted = librosa.clicks(times=shifted_beat_times, sr=beat_sr, click_freq=1500, length=len(beat_audio))

# Play the sliced audio with the shifted clicks
start_sample = int(start_time * beat_sr)
end_sample = int(end_time * beat_sr)

sliced_audio = beat_audio[start_sample:end_sample]
sliced_clicks_shifted = audio_beat_clicks_shifted[start_sample:end_sample]

ipd.display(ipd.Audio(sliced_audio + sliced_clicks_shifted, rate=beat_sr))

# Shifted beats that fall inside the analysis window
selected_beat_frames_shifted = shifted_beat_frames[(shifted_beat_times >= start_time) & (shifted_beat_times <= end_time)]


In [None]:

# Split the contour at the shifted beat positions, then drop the piece before
# the first beat and the piece after the last beat.
brokenlist = breaklist(conf, shifted_beat_frames)
brokenlist.pop(0)
brokenlist.pop(-1)
# Segment to inspect, entered 1-based and converted to a 0-based index.
input_num = 83
num = input_num - 1
print("Playing segment",input_num)



# Play the audio between the two beats bounding the segment, then plot its
# contour with peaks/valleys and note lines (tonic hard-coded to "C#3").
play_segment_between_beats(beat_audio, beat_sr, shifted_beat_frames, num,offset=0)
plot_with_carnatic_bars_with_peaks(num, brokenlist, get_carnatic_frequencies("C#3"))

def flatten_list(nested_list):
    """Concatenate the items of every sub-iterable into one flat list (one level deep)."""
    flat = []
    for sublist in nested_list:
        flat.extend(sublist)
    return flat

# Merge the peak and valley positions of the selected segment into one sorted list.
extrema_indices=sorted(flatten_list(find_peaks_and_valleys(brokenlist[num])))
extrema_values = [brokenlist[num][i] for i in extrema_indices]
# Name the nearest Carnatic note (tonic hard-coded to "C#3") for every turning point.
potential_notes =[get_closest_note(i, get_carnatic_frequencies("C#3")) for i in extrema_values]
print(extrema_indices)
print(potential_notes)



In [None]:
# Resample every beat-delimited segment to a common length of 128 samples.
interpolated_list = [interpolate_with_nans(i, target_length=128) for i in brokenlist]


In [None]:
print(len(conf))
# Take an independent copy: a plain slice of an ndarray is a view, so the
# in-place NaN masking applied to conf2 further down would otherwise write
# into conf as well.
conf2 = conf[0:3000].copy() # Assuming 'conf' is already defined and cleaned (silences as NaN)

In [None]:
import numpy as np
import matplotlib.pyplot as plt
from tqdm import tqdm
from scipy.spatial.distance import cdist
from fastdtw import fastdtw
from sklearn.cluster import AgglomerativeClustering
from collections import Counter, defaultdict



def sliding_segments(conf, window_size, hop_size):
    """Collect every NaN-free window of ``window_size`` samples, stepping by ``hop_size``.

    Parameters
    ----------
    conf : array-like
        1-D contour; NaN marks silence.
    window_size : int
        Length of each window.
    hop_size : int
        Step between consecutive window starts (must be non-zero).

    Returns
    -------
    (np.ndarray, np.ndarray)
        The accepted windows (one per row) and their start indices.
    """
    segments = []
    indices = []
    # "+ 1" so the last window that still fits, starting at
    # len(conf) - window_size, is considered too.
    for i in range(0, len(conf) - window_size + 1, hop_size):
        segment = conf[i:i+window_size]
        if np.isnan(segment).any():
            continue  # Skip segments with silence
        segments.append(segment)
        indices.append(i)
    return np.array(segments), np.array(indices)

def non_overlapping_segments(conf, window_size, hop_size):
    """Greedily collect NaN-free windows that do not overlap each other.

    Starting at index 0, a window is accepted when it contains no NaN, after
    which the search jumps past it; otherwise the start advances by
    ``hop_size`` and the test is repeated.

    Parameters
    ----------
    conf : array-like
        1-D contour; NaN marks silence.
    window_size : int
        Length of each window.
    hop_size : int
        Step used to move past a rejected window; values below 1 are treated
        as 1 so the scan always advances.

    Returns
    -------
    (np.ndarray, np.ndarray)
        The accepted windows (one per row) and their start indices.
    """
    step = max(1, hop_size)  # a zero hop would never leave a rejected window
    segments = []
    indices = []
    i = 0
    # "<=" so the last window that still fits, starting at
    # len(conf) - window_size, is considered too.
    while i <= len(conf) - window_size:
        segment = conf[i:i + window_size]
        if np.isnan(segment).any():
            i += step
            continue
        segments.append(segment)
        indices.append(i)
        i += window_size  # skip all overlapping windows
    return np.array(segments), np.array(indices)


# Window length in contour frames; the hop used to step past windows that
# contain silence is one twelfth of it.
window_size = 60
hop_size = int(window_size/12)

# Extract NaN-free, mutually non-overlapping windows from the truncated contour.
segments, segment_starts = non_overlapping_segments(conf2, window_size, hop_size)
# Center each segment to have zero mean (keep std deviation intact)
# segments = np.array([seg - np.mean(seg) for seg in segments])


print(f"Extracted {len(segments)} valid segments.")

def dtw_distance_matrix(segments):
    """Compute the symmetric matrix of pairwise fastdtw distances between segments.

    Only the upper triangle is evaluated and mirrored; the diagonal stays 0.
    A tqdm progress bar tracks the outer loop.
    """
    count = len(segments)
    dists = np.zeros((count, count))
    for row in tqdm(range(count)):
        for col in range(row + 1, count):
            pair_distance = fastdtw(segments[row], segments[col])[0]
            dists[row, col] = dists[col, row] = pair_distance
    return dists

# Pairwise DTW distances between all extracted segments.
dtw_dists = dtw_distance_matrix(segments)

# --- Step 3: Perform Clustering ---
# The number of clusters is not fixed; merging stops at the distance threshold.
clustering = AgglomerativeClustering(
    n_clusters=None,
    distance_threshold=70,
    metric='precomputed',
    linkage='average'
)
labels = clustering.fit_predict(dtw_dists)
print(f"Found {len(set(labels))} clusters.")

# --- Step 4: Group segments by cluster ---
# cluster_dict: label -> member segments; cluster_origins: label -> their start indices.
cluster_dict = defaultdict(list)
cluster_origins = defaultdict(list)

for seg, start_idx, lbl in zip(segments, segment_starts, labels):
    cluster_dict[lbl].append(seg)
    cluster_origins[lbl].append(start_idx)


# --- Step 5: Plot number of segments per cluster ---
# The comprehension's `segments` is local to the comprehension and does not
# rebind the notebook-level `segments`.
cluster_sizes = {label: len(segments) for label, segments in cluster_dict.items()}

import matplotlib.ticker as ticker

# Adjust height dynamically but cap it
fig_height = min(20, len(cluster_sizes) * 0.4)  # max 20 inches tall
plt.figure(figsize=(12, fig_height))

# Sort cluster labels
sorted_labels = sorted(cluster_sizes.keys())
sorted_counts = [cluster_sizes[label] for label in sorted_labels]

bars = plt.barh(sorted_labels, sorted_counts, color='lightgreen')

plt.ylabel('Cluster Label')
plt.xlabel('Number of Segments')
plt.title('Segment Counts per Cluster')
plt.grid(True, axis='x', linestyle='--', alpha=0.5)

# Add count labels on each bar
for bar, count in zip(bars, sorted_counts):
    plt.text(bar.get_width() + 1, bar.get_y() + bar.get_height()/2,
             f'{count}', va='center', fontsize=9)

# Ensure all labels are shown
plt.yticks(sorted_labels)  # Force each cluster label to appear
plt.gca().yaxis.set_major_locator(ticker.FixedLocator(sorted_labels))

plt.tight_layout()
plt.show()






In [None]:
# Order clusters from largest to smallest membership.
sorted_clusters = sorted(cluster_dict.items(), key=lambda item: len(item[1]), reverse=True)
# Report every cluster that ties with the largest one.
for i in range(len(sorted_clusters)):
    if len(sorted_clusters[i][1])== len(sorted_clusters[0][1]):
        print(f"the cluster {sorted_clusters[i][0]} has {len(sorted_clusters[i][1])} segments")

clustersize=[]
clusterlabels=[]

# Parallel lists of sizes and labels, in the same largest-first order.
for i in range(len(sorted_clusters)):
    clustersize.append(len(sorted_clusters[i][1]))
    clusterlabels.append(sorted_clusters[i][0])
print(clusterlabels)

# Spread and mean of the cluster sizes.
print(np.std(clustersize))
print(np.mean(clustersize))


In [None]:
# Inspect the largest cluster.
label = sorted_clusters[0][0]
label_to_plot = label # Change this as needed
segment_list = cluster_dict[label_to_plot]

# Start indices of the cluster's members, printed once rather than once per
# plotted segment.
print(cluster_origins[label_to_plot])
for segment in segment_list:
    plt.plot(segment, alpha=0.5)
plt.title(f"Cluster {label_to_plot} - {len(segment_list)} Segments")

plt.show()

plot_with_carnatic_bars_with_peaks(0, segment_list, get_carnatic_frequencies("C#3"))



# Play the first member of the cluster.
# NOTE(review): `start` is an index into the contour; play_segment treats it as
# a frame index at beat_sr — confirm the two frame grids match.
start = cluster_origins[label_to_plot][0]
print(start)
play_segment(beat_audio, beat_sr, start, start+window_size)


In [None]:
chosen_cluster = sorted_clusters[0][0]
start_indices = [int(i) for i in cluster_origins[chosen_cluster]]
# (start, end) span of every member of the chosen cluster.
origins = [(i, i + window_size) for i in start_indices]
print(start_indices)

# Detach conf2 from any array it may be a view of before blanking samples, so
# the masking below cannot leak into the source contour.
conf2 = conf2.copy()
# NOTE(review): only the first 30 samples of each window are blanked although
# window_size is used for the spans above — confirm this is intended.
for i in start_indices:
    conf2[i:i+30] = np.nan
plt.plot(conf2, alpha=0.5)

In [None]:
# Point conf2 at the full contour again (an alias, not a copy).
conf2 = conf

def extract_notes_from_conf(conf, initial_window_size, decay_size, min_window_size, outlier_threshold,similairity_threshold=100):
    """Iteratively peel repeating patterns off a pitch contour.

    For window sizes going from ``initial_window_size`` down to
    ``min_window_size`` in steps of ``decay_size``: extract NaN-free,
    non-overlapping windows from what is left of the contour, cluster them by
    DTW distance, and blank out (set to NaN) every window that belongs to a
    cluster with at least ``outlier_threshold`` members.

    Parameters
    ----------
    conf : array-like of float
        Pitch contour; NaN marks silence. The input is not modified.
    initial_window_size, min_window_size : int
        Largest and smallest window sizes tried.
    decay_size : int
        Positive amount by which the window shrinks each iteration.
    outlier_threshold : int
        Minimum cluster size for its members to be removed.
    similairity_threshold : float, optional
        Distance threshold handed to AgglomerativeClustering.

    Returns
    -------
    (np.ndarray, list of (int, int, int))
        The residual contour and the removed spans as (start, end, cluster label),
        with labels unique across iterations.

    Raises
    ------
    ValueError
        If ``decay_size`` is not positive (the loop would never terminate).
    """
    if decay_size <= 0:
        raise ValueError("decay_size must be positive.")

    # Always work on a float ndarray copy so that list input supports the
    # slice assignment below and the caller's data is left untouched.
    remaining_conf = np.array(conf, dtype=float)
    all_removed_segments = []

    window_size = initial_window_size
    global_label_offset = 0

    total_iters = (initial_window_size - min_window_size) // decay_size + 1
    iter_count = 0

    while window_size >= min_window_size:
        iter_count += 1
        print(f"Iteration {iter_count}/{total_iters} — Window Size: {window_size}")

        # Keep the hop at 1 or more so small windows still advance past silence.
        hop_size = max(1, int(window_size / 12))
        segments, segment_starts = non_overlapping_segments(remaining_conf, window_size, hop_size)

        if len(segments) == 0:
            print("  Skipped — no valid segments")
            window_size -= decay_size
            continue

        if len(segments) == 1:
            # Agglomerative clustering needs at least two samples; a lone
            # segment trivially forms its own cluster.
            labels = np.zeros(1, dtype=int)
        else:
            dtw_dists = dtw_distance_matrix(segments)
            clustering = AgglomerativeClustering(
                n_clusters=None,
                distance_threshold=similairity_threshold,
                metric='precomputed',
                linkage='average'
            )
            labels = clustering.fit_predict(dtw_dists)

        cluster_origins = defaultdict(list)
        for start_idx, lbl in zip(segment_starts, labels):
            cluster_origins[lbl].append(start_idx)

        clustered = False
        for label, starts in cluster_origins.items():
            if len(starts) >= outlier_threshold:
                clustered = True
                global_label = global_label_offset + label
                for i in starts:
                    remaining_conf[i:i + window_size] = np.nan
                    all_removed_segments.append((i, i + window_size, global_label))

        if clustered:
            print(f"  Clusters found: {len(set(labels))}, removed some segments.")
        else:
            print(f"  Clusters found: {len(set(labels))}, but none met the threshold.")

        # Shift the label space so the next iteration's labels stay unique.
        global_label_offset += len(set(labels))
        window_size -= decay_size

    return remaining_conf, all_removed_segments


def plot_colored_segments(original_conf, removed_segments, residual_conf=None):
    """Plot a contour with each removed span coloured by its cluster label.

    Parameters
    ----------
    original_conf : array-like
        Full contour, drawn faintly in gray.
    removed_segments : iterable of (start, end, label)
        Spans to highlight; spans sharing a label share a colour.
    residual_conf : array-like, optional
        Contour left after removal, drawn in black when given.
    """
    import matplotlib.pyplot as plt

    # Assign a color to each cluster. pyplot.get_cmap is used because
    # matplotlib.cm.get_cmap was removed in Matplotlib 3.9; at least one
    # colour is requested so that an empty segment list still plots.
    cluster_labels = sorted(set(lbl for _, _, lbl in removed_segments))
    cmap = plt.get_cmap('tab20', max(len(cluster_labels), 1))
    cluster_to_color = {label: cmap(i) for i, label in enumerate(cluster_labels)}

    plt.figure(figsize=(14, 5))
    plt.plot(original_conf, label="Original", alpha=0.2, color='gray')

    # Plot segments grouped by cluster label
    for start, end, label in removed_segments:
        plt.plot(range(start, end), original_conf[start:end], color=cluster_to_color[label], label=f"Cluster {label}")

    # Plot residual if given
    if residual_conf is not None:
        plt.plot(residual_conf, label="Residual", linewidth=2, color='black')

    # Create a legend without duplicate labels (the dict keeps one handle per label)
    handles, legend_labels = plt.gca().get_legend_handles_labels()
    by_label = dict(zip(legend_labels, handles))
    plt.legend(by_label.values(), by_label.keys(), bbox_to_anchor=(1.05, 1), loc='upper left')

    plt.title("Clusters of Repeating Notes")
    plt.xlabel("Time Index")
    plt.ylabel("Frequency")
    plt.grid(True)
    plt.tight_layout()
    plt.show()

# Peel repeating patterns off the contour, shrinking the window from 60 to 20
# frames in steps of 2; a pattern must occur at least twice to be removed.
residual_conf, removed_segments = extract_notes_from_conf(
    conf=conf2,
    initial_window_size=60,
    decay_size=2,
    min_window_size=20,
    outlier_threshold=2,
    similairity_threshold=80
)

# Show the removed spans coloured by cluster on top of the contour.
plot_colored_segments(conf2, removed_segments, residual_conf)



In [None]:
# Three consecutive entries of removed_segments to audition.
# NOTE(review): the index 239 is hard-coded and will raise IndexError when
# fewer segments were removed — confirm against the printed length.
num1 =239
num2 =num1+1 
num3= num2+1
unique_cluster_labels = set(label for _, _, label in removed_segments)
print("Number of clusters:", len(unique_cluster_labels))

print("len",len(removed_segments))
print(removed_segments[num1])
print(removed_segments[num2])
print(removed_segments[num3])

# Each entry is (start, end, label); play the audio for the start..end span.
play_segment(beat_audio, beat_sr, removed_segments[num1][0], removed_segments[num1][1])
play_segment(beat_audio, beat_sr, removed_segments[num2][0], removed_segments[num2][1])
play_segment(beat_audio, beat_sr, removed_segments[num3][0], removed_segments[num3][1])

In [None]:
import json
import numpy as np # Make sure numpy is imported if not already

def save_removed_segments_to_json(removed_segments, filename="removed_segments.json"):
    """
    Write the removed segments to a JSON file as a list of integer lists.

    Args:
        removed_segments (list): Iterable of segments, each an iterable of
                                 numbers (e.g. (start, end, label) tuples).
        filename (str, optional): Destination path.
                                   Defaults to "removed_segments.json".
    """
    # Cast every item to a built-in int so numpy integer types serialise cleanly.
    serializable_segments = [list(map(int, segment)) for segment in removed_segments]

    with open(filename, 'w') as f:
        json.dump(serializable_segments, f, indent=4)
    print(f"Removed segments list saved to '{filename}'")

# Persist the removed segments under the default file name.
save_removed_segments_to_json(removed_segments)

In [None]:
import json

def recover_removed_segments_list(filename="removed_segments.json"):
    """
    Load a list of removed segments from a JSON file.

    Args:
        filename (str, optional): Path of the JSON file to read.
                                   Defaults to "removed_segments.json".

    Returns:
        list or None: The decoded list, or None (after printing an error)
                      when the file is missing, is not valid JSON, or its
                      top-level value is not a list.
    """
    try:
        with open(filename, 'r') as f:
            data = json.load(f)
    except FileNotFoundError:
        print(f"Error: File '{filename}' not found.")
        return None
    except json.JSONDecodeError:
        print(f"Error: Could not decode JSON from file '{filename}'.")
        return None

    if not isinstance(data, list):
        print(f"Error: File '{filename}' does not contain a top-level list.")
        return None
    return data

normalized_notes=[]
labels=[]
# Frequency of the tonic's Sa, looked up once instead of once per segment.
sa_frequency = get_carnatic_frequencies("C#3")["Sa"]
# Example of how to recover the list:
recovered_list = recover_removed_segments_list()
if recovered_list:
    print("Recovered removed segments list:")
    for segment in recovered_list:
        # Each entry is [start, end, label]; express the contour slice as a ratio to Sa.
        note=conf[segment[0]:segment[1]]/sa_frequency
        normalized_notes.append(note)
        labels.append(segment[2])
print(normalized_notes[2][4])


print(audio_path)
print(len(normalized_notes))
print(len(labels))
print(len(recovered_list))
print(sa_frequency)

In [None]:
import csv
import json

csv_file = "Master_Dataset.csv"
headers = ['No','File','Tonic','Normalized_Notes','Start_End','Cluster','Notes','Gamaka']

# Build and validate every row before touching the file, so a malformed row
# aborts the cell without leaving a partial append behind.
rows_to_append = []
for i in range(len(recovered_list)):
    row = [
        i,  # zero-based index of the segment within this recording
        audio_path,
        "C#3",
        # Lists are stored as JSON strings so they survive the CSV round trip.
        # Values are cast to plain floats because json cannot serialise some
        # NumPy scalar types (e.g. float32).
        json.dumps([float(value) for value in normalized_notes[i]]),
        json.dumps(list(recovered_list[i][0:2])),
        labels[i],
        "",  # 'Notes' column: written empty
        "",  # 'Gamaka' column: written empty
    ]
    if len(row) != len(headers):
        raise ValueError(f"Row has {len(row)} columns but expected {len(headers)}")
    rows_to_append.append(row)

# NOTE: the file is opened in append mode, so re-running this cell appends the
# same rows again.
with open(csv_file, mode='a', newline='', encoding='utf-8') as f:
    writer = csv.writer(f)
    # In append mode the stream is positioned at end-of-file, so position 0
    # means the file is new or empty. Write the header row in that case:
    # without it csv.DictReader would take the first data row as column names.
    if f.tell() == 0:
        writer.writerow(headers)
    writer.writerows(rows_to_append)


In [None]:
import csv
import json

csv_file = "Master_Dataset.csv"

# Parallel lists: one decoded entry per data row of the CSV.
norm_notes = []
start_end = []

with open(csv_file, newline='', encoding='utf-8') as f:
    for record in csv.DictReader(f):
        # Both columns are decoded from their JSON string form.
        norm_notes.append(json.loads(record['Normalized_Notes']))
        start_end.append(json.loads(record['Start_End']))

print(start_end)       # Full list of every Start_End entry
print(norm_notes)      # Full list of every Normalized_Notes entry

# The printed counter starts at 2 (list index + 2) — presumably the row number
# in the CSV file, where row 1 is the header; confirm against the dataset.
for row_number, bounds in enumerate(start_end, start=2):
    print(row_number)
    play_segment(beat_audio, beat_sr, int(bounds[0]), int(bounds[1]))

In [None]:
# Run every stored note through interpolate_with_nans (target_length=128) and
# lay the results end to end in one flat Python list.
interpnotes = []
for note_contour in norm_notes:
    resampled = interpolate_with_nans(note_contour, target_length=128)
    interpnotes.extend(resampled.tolist())
print(len(interpnotes))

# Quick visual check of the concatenated sequence.
plt.plot(interpnotes, alpha=0.5)
plt.title("Interpolated Notes")
plt.show()

In [None]:
# extract_notes_from_conf blanks removed windows with `x[a:b] = np.nan`, which a
# plain Python list rejects (TypeError), so pass a float ndarray instead of
# aliasing the list.
conf2 = np.asarray(interpnotes, dtype=float)

def extract_notes_from_conf(conf, initial_window_size, decay_size, min_window_size, outlier_threshold,similairity_threshold=100):
    """
    Iteratively remove repeating windows from a 1-D sequence.

    The window size starts at ``initial_window_size`` and shrinks by
    ``decay_size`` while it is still >= ``min_window_size``. On each pass the
    not-yet-removed data is cut into windows by ``non_overlapping_segments``,
    the windows are clustered on their ``dtw_distance_matrix`` with average-
    linkage agglomerative clustering, and every cluster holding at least
    ``outlier_threshold`` windows is recorded and overwritten with NaN in the
    working copy.

    Args:
        conf: 1-D sequence of numeric samples (list or NumPy array). It is
            copied into a float array; the caller's object is not modified.
        initial_window_size (int): Window length used on the first pass.
        decay_size (int): Amount subtracted from the window length after each
            pass. Must be positive.
        min_window_size (int): Smallest window length that is still processed.
        outlier_threshold (int): Minimum number of windows a cluster needs
            for its windows to be removed.
        similairity_threshold (float): Passed to AgglomerativeClustering as
            ``distance_threshold``. (Parameter spelling kept for existing
            callers.)

    Returns:
        tuple: ``(remaining_conf, all_removed_segments)`` where
            ``remaining_conf`` is a float NumPy array with removed windows set
            to NaN and ``all_removed_segments`` is a list of
            ``(start, end, label)`` tuples; ``label`` is a plain int that is
            unique across passes.

    Raises:
        ValueError: If ``decay_size`` is not positive (the loop would never
            terminate).
    """
    if decay_size <= 0:
        raise ValueError("decay_size must be positive, otherwise the window never shrinks")

    # Work on a float ndarray copy: the NaN slice assignment below raises on
    # Python lists and on integer arrays.
    remaining_conf = np.array(conf, dtype=float)
    all_removed_segments = []

    window_size = initial_window_size
    global_label_offset = 0

    # Only used for the progress message.
    total_iters = (initial_window_size - min_window_size) // decay_size + 1
    iter_count = 0

    while window_size >= min_window_size:
        iter_count += 1
        print(f"Iteration {iter_count}/{total_iters} — Window Size: {window_size}")

        # int(window_size / 12) is 0 for windows shorter than 12 samples; a
        # zero hop is assumed to be invalid for non_overlapping_segments, so
        # clamp to at least one sample.
        hop_size = max(1, int(window_size / 12))
        segments, segment_starts = non_overlapping_segments(remaining_conf, window_size, hop_size)

        if len(segments) == 0:
            print("  Skipped — no valid segments")
            window_size -= decay_size
            continue

        if len(segments) == 1:
            # AgglomerativeClustering requires at least two samples; a lone
            # window simply forms its own cluster.
            labels = np.zeros(1, dtype=int)
        else:
            dtw_dists = dtw_distance_matrix(segments)
            clustering = AgglomerativeClustering(
                n_clusters=None,
                distance_threshold=similairity_threshold,
                metric='precomputed',
                linkage='average'
            )
            labels = clustering.fit_predict(dtw_dists)

        # Map each cluster label to the start indices of its windows.
        cluster_origins = defaultdict(list)
        for start_idx, lbl in zip(segment_starts, labels):
            cluster_origins[lbl].append(start_idx)

        n_clusters = len(set(labels))

        clustered = False
        for label, starts in cluster_origins.items():
            if len(starts) >= outlier_threshold:
                clustered = True
                # Offset keeps labels from different passes distinct; int()
                # turns the NumPy integer into a JSON-serialisable Python int.
                global_label = int(global_label_offset + label)
                for i in starts:
                    remaining_conf[i:i + window_size] = np.nan
                    all_removed_segments.append((i, i + window_size, global_label))

        if clustered:
            print(f"  Clusters found: {n_clusters}, removed some segments.")
        else:
            print(f"  Clusters found: {n_clusters}, but none met the threshold.")

        global_label_offset += n_clusters
        window_size -= decay_size

    return remaining_conf, all_removed_segments


def plot_colored_segments(original_conf, removed_segments, residual_conf=None):
    """
    Plot a sequence with its removed segments highlighted per cluster.

    The full sequence is drawn in light gray, each removed segment is drawn on
    top in the colour assigned to its cluster label, and the residual (if
    given) is drawn in black. The legend shows each label once.

    Args:
        original_conf: 1-D sequence that the segment indices refer to.
        removed_segments: Iterable of ``(start, end, label)`` tuples.
        residual_conf (optional): Sequence to overlay as "Residual".
            Defaults to None (nothing extra is drawn).

    Returns:
        None. The figure is shown with ``plt.show()``.
    """
    import matplotlib.pyplot as plt

    # Assign a color to each cluster
    cluster_labels = sorted(set(lbl for _, _, lbl in removed_segments))
    # pyplot.get_cmap is used because matplotlib.cm.get_cmap was removed in
    # Matplotlib 3.9. max(..., 1) avoids asking for a zero-entry colormap when
    # there are no removed segments.
    cmap = plt.get_cmap('tab20', max(len(cluster_labels), 1))
    cluster_to_color = {label: cmap(i) for i, label in enumerate(cluster_labels)}

    plt.figure(figsize=(14, 5))
    plt.plot(original_conf, label="Original", alpha=0.2, color='gray')

    # Plot segments grouped by cluster label
    for start, end, label in removed_segments:
        values = original_conf[start:end]
        # Size the x-range from the slice actually obtained, so a window that
        # runs past the end of the data does not cause an x/y length mismatch.
        plt.plot(range(start, start + len(values)), values,
                 color=cluster_to_color[label], label=f"Cluster {label}")

    # Plot residual if given
    if residual_conf is not None:
        plt.plot(residual_conf, label="Residual", linewidth=2, color='black')

    # Create a legend without duplicate labels (dict keeps one handle per label)
    handles, labels = plt.gca().get_legend_handles_labels()
    by_label = dict(zip(labels, handles))
    plt.legend(by_label.values(), by_label.keys(), bbox_to_anchor=(1.05, 1), loc='upper left')

    plt.title("Clusters of Repeating Notes")
    plt.xlabel("Time Index")
    plt.ylabel("Frequency")
    plt.grid(True)
    plt.tight_layout()
    plt.show()

# Settings for this extraction run, gathered in one place.
extraction_settings = dict(
    initial_window_size=60,
    decay_size=2,
    min_window_size=20,
    outlier_threshold=2,
    similairity_threshold=80,
)

# NOTE: this rebinds the notebook-level name `removed_segments`.
residual_conf, removed_segments = extract_notes_from_conf(conf=conf2, **extraction_settings)

# Show the removed segments and the residual on top of the input sequence.
plot_colored_segments(conf2, removed_segments, residual_conf)