### Mel Spectrogram -> Audio

https://huggingface.co/kashif/soundstream_mel_decoder

In [2]:

SAMPLE_RATE = 16000
N_FFT = 1024
HOP_LENGTH = 320
WIN_LENGTH = 640
N_MEL_CHANNELS = 128
MEL_FMIN = 0.0
MEL_FMAX = int(SAMPLE_RATE // 2)
CLIP_VALUE_MIN = 1e-5
CLIP_VALUE_MAX = 1e8


In [3]:
import os
os.getcwd()

'/Users/ling/Desktop/GT/6201AudioContentAnalysis/PatternForPrediction'

In [None]:
import numpy as np
import librosa

# Load and normalize audio
y, sr = librosa.load('./datasets/PPDD-Jul2018_aud_mono_small/prime_wav/0a983538-61b5-4b9d-9ad9-23e05f548e5c.wav')
y, _ = librosa.effects.trim(y)
y = librosa.util.normalize(y)

S = librosa.feature.melspectrogram(
    y=y, 
    sr=SAMPLE_RATE, 
    n_fft=N_FFT,
    hop_length=HOP_LENGTH,
    win_length=WIN_LENGTH,
    n_mels=N_MEL_CHANNELS,
    fmin=MEL_FMIN,
    fmax=MEL_FMAX,
)

# Clip manually before log conversion
mel = np.clip(S, CLIP_VALUE_MIN, CLIP_VALUE_MAX)
# mel = np.log10(S)   # the model expects a log-magnitude Mel spectrogram
# mel = mel[np.newaxis, :, :]  The model expects shape(1, n_mels, n_frames) 

from diffusers import OnnxRuntimeModel
from IPython.display import Audio

melgan = OnnxRuntimeModel.from_pretrained("kashif/soundstream_mel_decoder")

audio = melgan(input_features=mel.astype(np.float32))
Audio(audio, rate=SAMPLE_RATE)


  y, sr = librosa.load('./datasets/PPDD-Jul2018_aud_mono_small/cont_foil_wav/PatternForPrediction/datasets/PPDD-Jul2018_aud_mono_small/cont_foil_wav/0a983538-61b5-4b9d-9ad9-23e05f548e5c.wav')


FileNotFoundError: [Errno 2] No such file or directory: './datasets/PPDD-Jul2018_aud_mono_small/cont_foil_wav/PatternForPrediction/datasets/PPDD-Jul2018_aud_mono_small/cont_foil_wav/0a983538-61b5-4b9d-9ad9-23e05f548e5c.wav'

In [4]:
list(["a", "b", "c", "d", "e"])


['a', 'b', 'c', 'd', 'e']

In [6]:
import os

os.listdir("./datasets/PPDD-Jul2018_aud_mono_small/prime_csv")[:5]

['c31ae90e-c752-495e-ba45-1982f50c612b.csv',
 'd42c9e96-a8d6-466f-9056-9f019ef0ff5f.csv',
 '61e8409b-b1e0-4ca2-a62c-2685bb4eb870.csv',
 '7d83865a-1d0e-41ad-b4a8-ec76cc5bcf8c.csv',
 'dd1e320f-13d3-4b9a-a543-4e8a600263d3.csv']

In [7]:
a = [1, 2, 3]
b = [2, 3, 4]
a+b

[1, 2, 3, 2, 3, 4]

In [12]:
import os
os.getcwd()

'/Users/ling/Desktop/GT/6201AudioContentAnalysis/6201PatternForPrediction'

In [25]:
import csv
with open("./datasets/PPDD-Jul2018_aud_mono_small/prime_csv/96a93bb4-f274-4bef-9db2-aa3cdeaad1a1.csv", "r", encoding="utf-8") as f:  # open in binary mode
    reader = csv.reader(f)
    for row in reader:
        if not row or all(cell.strip() == "" for cell in row):
            continue  # skip empty lines
        print(float(row[0]))
        

148.75
148.83333
149.0
149.33333
149.5
149.66667
149.83333
150.75
150.83333
151.0
151.33333
151.5
151.66667
151.83333
153.83333
154.0
154.33333
156.75
156.83333
157.0
157.33333
157.5
157.66667
157.83333
158.75
158.83333
159.0
159.33333
159.5
159.66667
159.83333
161.83333
162.0
162.33333
164.5
164.83333
165.33333
165.66667
165.83333
166.0
166.5
166.83333
167.83333
168.0
168.5
168.83333
169.33333
169.5
169.83333
174.75
174.83333
175.0
175.33333
175.5
175.66667
175.83333
176.66667
176.83333
177.0
177.33333
177.5
177.66667
177.83333
179.83333
180.0
180.25
182.66667
182.83333
183.0
183.33333
183.5
183.66667
183.83333


In [28]:
import csv
from enum import Enum

class CsvColumns(Enum):
    ONSET = 0      # Time of the note (quarter-note beats)
    MIDI = 1          # MIDI note number
    MORPHETIC = 2  # Morph pitch number
    DUR = 3            # Duration in beats
    CHAN = 4       # MIDI channel

REST_STATE = "r"

def get_seqs_from_csv(csv_file: str, onset_offset: float):
    pitch_seq = []
    onset_seq = []

    events = []
    with open(csv_file, "r", encoding="utf-8") as f:
        reader = csv.reader(f)
        for row in reader:
            if not row or all(cell.strip() == "" for cell in row):
                continue  # skip empty lines
            # Convert all to float first
            events.append([float(cell) for cell in row])

    n = len(events)
    for i in range(n):
        onset = events[i][CsvColumns.ONSET.value] - onset_offset
        pitch = events[i][CsvColumns.MIDI.value]
        dur = events[i][CsvColumns.DUR.value]

        # append pitch
        pitch_seq.append(pitch)

        # add onset / rest logic
        onset_seq.append(pitch)
        if i < n - 1:
            next_onset = events[i+1][CsvColumns.ONSET.value] - onset_offset
            gap = next_onset - (onset + dur)
            if gap > 0:
                pitch_seq.append(REST_STATE)
                onset_seq.append(gap)

    return pitch_seq, onset_seq

def get_onset_offset(csv_file: str):
    # Onsets do not necessarily start at 0. 
    # We subtract the first onset from all subsequent onsets so that each song begins at 0.
    # This normalization ensures between-song normalization during training.
    with open(csv_file, "r", encoding="utf-8") as f:
        reader = csv.reader(f)
        for row in reader:
            if not row or all(cell.strip() == "" for cell in row):
                continue  # skip empty lines
            return float(row[CsvColumns.ONSET.value])
    raise ValueError(f"No valid rows found in {csv_file}")



import numpy as np
from collections import defaultdict

class VariableOrderMarkov:
    """
    Variable-order Markov Model.
    Stores n-grams probability distribution for all orders from 1..max_order.
    Allows generating sequences with lower order than training order.
    """

    def __init__(self, max_order=3):
        # order is the numeric value n in n-gram.
        self.max_order = max_order

        # counts for each n-gram
        #   -> counts[1]: counts for each 1-gram
        #       -> {1-gram context: {next state: count}}
        #   -> counts[2]: counts for each 2-gram
        #       -> {2-gram context: {next state: count}}
        #   -> etc...
        self.counts = {
            k: defaultdict(lambda: defaultdict(float))
            for k in range(1, max_order + 1)
        }

        # transition probability for each n-gram
        self.probs = {
            k: defaultdict(lambda: defaultdict(float))
            for k in range(1, max_order + 1)
        }

        # all unique states (our vocab)
        self.states = set()

        # tracks whether probabilities are up to date.
        self.is_trained = False

    # -----------------------------------------------------
    # Training
    # - called for each audio piece
    # -----------------------------------------------------
    def train(self, seq):
        L = len(seq)
        if L < 2:
            return
        for i in range(L):
            self.states.add(seq[i])
        for order in range(1, self.max_order + 1):
            for i in range(order, L):
                context = tuple(seq[i - order:i])
                nxt = seq[i]
                self.counts[order][context][nxt] += 1
        self.is_trained = False

    # -----------------------------------------------------
    # Probability computation
    # - called after all pieces are trained
    # -----------------------------------------------------
    def compute_probabilities(self):
        for order in range(1, self.max_order + 1):
            for context, next_counts in self.counts[order].items():
                total = sum(next_counts.values())
                if total == 0:
                    continue
                for nxt, count in next_counts.items():
                    self.probs[order][context][nxt] = count / total

        self.is_trained = True

    # -----------------------------------------------------
    # Generation
    # -----------------------------------------------------
    def generate(self, generate_length, seq_prime=None, order=None):
        """
        generate_length: total number of states to generate after the prime
        order: which order to generate with (1..max_order)
        seq_prime: optional priming sequence (list)
        """
        if not self.is_trained:
            self.compute_probabilities()

        if order is None:
            order = self.max_order

        if seq_prime is None:
            seq_prime = [np.random.choice(list(self.states))]
        prime_len = len(seq_prime)
        seq = list(seq_prime)

        while len(seq) - prime_len < generate_length:
            context = tuple(seq[-order:]) if len(seq) >= order else tuple(seq)

            # Try decreasing orders until we find a valid context
            o = min(order, len(context))
            next_state = None

            while o > 0 and next_state is None:
                c = tuple(context[-o:])
                dist = self.probs[o].get(c, None)
                if dist:
                    next_state = np.random.choice(
                        list(dist.keys()),
                        p=list(dist.values())
                    )
                else:
                    o -= 1

            # If no context found at any order → random fallback
            if next_state is None:
                next_state = np.random.choice(list(self.states))

            seq.append(next_state)

        return seq[prime_len:]  # only return the generated continuation



import csv
import fluidsynth
from mido import MidiFile, MidiTrack, Message
from preprocessing import CsvColumns

def sequence_to_midi(
        midi_file_path, 
        onset_offset, pitch_seq, onset_seq, 
        csv_file_path = None,
        velocity=64):
    """
    Convert pitch + onset sequences into a MIDI file and optionally export to CSV.
    
    Args:
        onset_offset: first onset to normalize sequence
        pitch_seq: list of pitches (with 'r' for rests)
        onset_seq: list of onsets in quarter-note beats
        midi_file_path: path to save MIDI
        csv_file_path: optional path to save CSV
        velocity: MIDI note velocity
    """
    assert len(pitch_seq) == len(onset_seq), "pitch_seq and onset_seq must be same length"

    mid = MidiFile()
    track = MidiTrack()
    mid.tracks.append(track)

    csv_rows = []
    tick_from_prev_note = 0
    for i, pitch in enumerate(pitch_seq):
        onset = onset_seq[i] - onset_offset

        # compute note duration as time until next onset
        if i+1 >= len(onset_seq):
            # last note → set a default short duration
            next_onset = onset + 0.25  # quarter-beat default
        else:
            next_onset = onset_seq[i+1] - onset_offset
        note_duration_beats = next_onset - onset
        note_duration_ticks = int(note_duration_beats * mid.ticks_per_beat)

        if pitch != "r":
            # the time for note_on and note_off events is always the delta tick from the pervious event
            track.append(Message('note_on', note=int(pitch), velocity=velocity, time=tick_from_prev_note))
            track.append(Message('note_off', note=int(pitch), velocity=velocity, time=note_duration_ticks))

            tick_from_prev_note = 0

            # Add row for CSV
            if csv_file_path:
                csv_rows.append({
                    CsvColumns.ONSET.value: onset,
                    CsvColumns.MIDI.value: pitch,
                    CsvColumns.DUR.value: note_duration_beats
                })
        else:
            # rest → no note, just advance prev_onset
            tick_from_prev_note = note_duration_ticks

    mid.save(midi_file_path)
    print(f"MIDI saved to {midi_file_path}")

    # Save CSV if path provided
    if csv_file_path:
        with open(csv_file_path, "w", newline="") as f:
            writer = csv.DictWriter(f)
            writer.writeheader()
            writer.writerows(csv_rows)
        print(f"CSV saved to {csv_file_path}")
    return midi_file_path

def midi_to_wav(midi_file, wav_file, soundfont="FluidR3_GM.sf2"):
    """
    Render a MIDI file to WAV using FluidSynth.
    """
    fs = fluidsynth.Synth()
    fs.start(driver="file", filename=wav_file)
    sfid = fs.sfload(soundfont)
    fs.program_select(0, sfid, 0, 0)
    
    fs.midi_file_play(midi_file)
    fs.delete()
    
    print(f"WAV saved to {wav_file}")
    return wav_file



import os
import random
import shutil
from markov import VariableOrderMarkov
from preprocessing import get_seqs_from_csv, get_onset_offset
from postprocessing import sequence_to_midi, midi_to_wav


# initialize parallel markov models for pitch and onset respectively
max_order = 2
generate_length = 10
pitch_markov = VariableOrderMarkov(max_order=max_order)
onset_markov = VariableOrderMarkov(max_order=max_order)

# get all CSV files from prime and continuation directory
prime_csv_dir = "./datasets/PPDD-Jul2018_aud_mono_small/prime_csv"
cont_csv_dir = "./datasets/PPDD-Jul2018_aud_mono_small/cont_true_csv"
prime_midi_dir = "./datasets/PPDD-Jul2018_aud_mono_small/prime_midi"
cont_midi_dir = "./datasets/PPDD-Jul2018_aud_mono_small/cont_true_midi"
prime_wav_dir = "./datasets/PPDD-Jul2018_aud_mono_small/prime_wav"
cont_wav_dir = "./datasets/PPDD-Jul2018_aud_mono_small/cont_true_wav"
prime_csv_files = os.listdir(prime_csv_dir)
cont_csv_files = os.listdir(cont_csv_dir)
num_files = len(prime_csv_files)
assert num_files == len(cont_csv_files), "prime directory and continuation directory must have the same number of files"

test_file_index = random.randrange(start=0, stop=num_files)
test_file_id = prime_csv_files[test_file_index]
print(f"Randomly chosen {test_file_id} as the test file.")
assert test_file_id == cont_csv_files[test_file_index], "prime directory and continuation directory must have the same number of files in the same order" 

# train both markov models
for i in range(num_files):
    prime_file = os.path.join(prime_csv_dir, prime_csv_files[i])
    cont_file = os.path.join(cont_csv_dir, cont_csv_files[i])
    print(i)
    onset_offset = get_onset_offset(prime_file)
    prime_pitch_seq, prime_onset_seq = get_seqs_from_csv(prime_file, onset_offset)
    cont_pitch_seq, cont_onset_seq = get_seqs_from_csv(cont_file, onset_offset)
    pitch_markov.train(prime_pitch_seq + cont_pitch_seq)
    onset_markov.train(prime_onset_seq + cont_onset_seq)

# generate for test file
test_prime_csv = os.path.join(prime_csv_dir, test_file_id)
test_cont_csv = os.path.join(cont_csv_dir, test_file_id)
onset_offset = get_onset_offset(test_prime_csv)
pitch_seq_prime, onset_seq_prime = get_seqs_from_csv(test_prime_csv, onset_offset)
true_pitch_cont, true_onset_cont = get_seqs_from_csv(test_cont_csv, onset_offset)
generated_pitch_cont = pitch_markov.generate(generate_length, seq_prime=pitch_seq_prime)
generated_onset_cont = onset_markov.generate(generate_length, seq_prime=onset_seq_prime)

# write outputs
output_dir = f"./test_generation/{test_file_id}"
os.makedirs(output_dir, exist_ok=True)
midi_filepath = sequence_to_midi(
    os.path.join(output_dir, "generated_cont.mid"), 
    onset_offset,
    generated_pitch_cont,
    generated_onset_cont,
    csv_file_path=os.path.join(output_dir, "generated_cont.csv"),
)
wav_filepath = midi_to_wav(
    midi_filepath,
    os.path.join(output_dir, "generated_cont.wav")
)
# copy prime and true continuation files
shutil.copy(test_prime_csv, os.path.join(output_dir, "prime.csv"))
shutil.copy(test_cont_csv, os.path.join(output_dir, "true_cont.csv"))
shutil.copy(os.path.join(prime_midi_dir, test_file_id), os.path.join(output_dir, "prime.midi"))
shutil.copy(os.path.join(cont_midi_dir, test_file_id), os.path.join(output_dir, "true_cont.midi"))
shutil.copy(os.path.join(prime_wav_dir, test_file_id), os.path.join(output_dir, "prime.wav"))
shutil.copy(os.path.join(cont_wav_dir, test_file_id), os.path.join(output_dir, "true_cont.wav"))


Randomly chosen d99f6dd9-8f68-4962-97ff-036a72051e5c.csv as the test file.
0
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15


UnicodeDecodeError: 'utf-8' codec can't decode byte 0x80 in position 3131: invalid start byte

In [None]:
csv_file = os.path.join(prime_csv_dir, prime_csv_files[15])
print(csv_file)
with open(csv_file, "r", encoding="utf-8") as f:
        reader = csv.reader(f)
        for row in reader:
            if not row or all(cell.strip() == "" for cell in row):
                continue  # skip empty lines
            print(float(row[CsvColumns.ONSET.value]))


./datasets/PPDD-Jul2018_aud_mono_small/prime_csv/.DS_Store
<_csv.reader object at 0x10b9dd150>


UnicodeDecodeError: 'utf-8' codec can't decode byte 0x80 in position 3131: invalid start byte