In [None]:

from tqdm import tqdm
from dataset import get_data_loader, move_data_to_device, MyDataset
from hparams import Hparams
from IPython.display import Audio


# Build the 'train' split of the project dataset from the shared hyper-parameter dict.
# NOTE(review): a later cell executes `import dataset`, which rebinds the global name
# `dataset` to the module; cells that rely on this instance (e.g. testItem) only work
# if they are run before that import.
args = Hparams.args
dataset = MyDataset(
        dataset_root=args['dataset_root'],
        split='train',
        sampling_rate=args['sampling_rate'],
        sample_length=args['sample_length'],
        frame_size=args['frame_size'],
        song_fns=None,  # no explicit song-file list is supplied
    )


In [None]:
from dataset import  read_michigan_dataset_index
# NOTE(review): the return value is discarded and this is not the last expression of
# the cell, so nothing is displayed; it looks like a leftover smoke test of the import.
read_michigan_dataset_index()

class PhomemeLibrary():
    """Holds the dataset index rows belonging to a single audio source.

    Attributes are set in ``__init__``:
      * ``audio_source`` - the ``(corpus_name, speaker_id)`` pair that was passed in.
      * ``index``        - for the "michigan" corpus, the rows of the Michigan index
                           whose ``participantID`` equals the speaker id; ``None`` for
                           any other corpus name.
    """

    def __init__(self, audio_source = ("michigan", "MV1")) -> None:
        self.audio_source = audio_source
        self.index = None

        # Only the michigan corpus is supported; anything else keeps index=None.
        if audio_source[0] != "michigan":
            return

        full_index = read_michigan_dataset_index()
        speaker_mask = full_index["participantID"] == audio_source[1]
        self.index = full_index[speaker_mask]

In [None]:
import librosa
def testItem(idx=0):
    """Print timing figures for the global ``dataset`` and the shapes of one item.

    ``dataset.sample_length`` is passed to ``librosa.frames_to_time`` as a frame
    count, once with hop length 200 (reported as the pYin figure) and once with hop
    length 321 (reported as the mel-spectrogram figure). The item at ``idx`` is then
    fetched and unpacked as ``(mel_spectrogram, yin, pyin)``.

    Args:
        idx: index of the dataset item to inspect.
    """
    print(f"samplerate: {dataset.sampling_rate }")

    def frames_as_seconds(hop_length):
        # Same conversion for both features; only the hop length differs.
        return librosa.frames_to_time(
            dataset.sample_length, sr=dataset.sampling_rate, hop_length=hop_length
        )

    pYinFrameTime = frames_as_seconds(200)
    melspecFrameTime = frames_as_seconds(321)
    print(f"melspec FrameTime: {melspecFrameTime}")
    print(f"pYin FrameTime: {pYinFrameTime}")

    mel_spectrogram, yin, pyin = dataset[idx]
    print(mel_spectrogram.shape[1])
    for pitch_track in (yin, pyin):
        print(pitch_track.shape)

# Smoke test: print the timing figures and feature shapes for the first dataset item.
testItem(0)

In [None]:
import os
import numpy as np
import pandas as pd
import tarfile
import pinyin


def read_aidatatang_index(data_root=os.path.join(os.getcwd(),"data_full")):
    """Build a per-utterance index of the aidatatang corpus as a DataFrame.

    The transcript file is parsed line by line: the first space-separated token is
    the utterance id (characters 5-9 are taken as the participant id, the rest as
    the sentence id) and the remaining text is split into single characters and
    passed through ``extract_pinyin``. Each row is then tagged with the corpus
    sub-folder ("dev", "test" or "train") that contains ``<participant>.tar.gz``.

    Args:
        data_root: directory that contains the ``aidatatang`` folder.

    Returns:
        DataFrame with columns ``participantID``, ``sentenceID``, ``transcript``,
        ``toneclass`` and ``folder``.

    Raises:
        ValueError: if a participant's archive is in none of the sub-folders.
    """
    corpus_root = os.path.join(data_root, 'aidatatang')
    subfolders = ["dev","test","train"]

    def parse_transcript_line(raw_line):
        # Utterance ids look like: T0055 G0002 S0002 (written without the spaces).
        utterance_id, *words = raw_line.split(' ')
        participant_id = utterance_id[5:10]
        sentence_id = utterance_id[10:]
        characters = list(' '.join(words).strip())
        text, text_ids = extract_pinyin(characters)
        return participant_id, sentence_id, text, text_ids

    # --- transcripts -------------------------------------------------------
    transcript_path = os.path.join(corpus_root, 'transcript', 'aidatatang_200_zh_transcript.txt')
    with open(transcript_path, 'r', encoding='utf-8') as handle:
        raw_lines = handle.readlines()
    records = [parse_transcript_line(raw_line) for raw_line in raw_lines]

    # --- archive listing per sub-folder -------------------------------------
    listings = {
        subfolder: os.listdir(os.path.join(corpus_root, 'corpus', subfolder))
        for subfolder in subfolders
    }
    subfolder_index = {}
    for subfolder in subfolders:
        unique_names = set(listings[subfolder])
        # sanity: a directory listing must not contain duplicate names
        assert len(unique_names) == len(listings[subfolder])
        subfolder_index[subfolder] = unique_names

    df = pd.DataFrame.from_records(
        data=records,
        columns=["participantID", "sentenceID", "transcript", "toneclass"],
    )

    def get_category(row):
        fname = f"{row['participantID']}.tar.gz"
        # First sub-folder (in dev/test/train order) that holds the archive wins.
        hit = next((s for s in subfolders if fname in subfolder_index[s]), None)
        if hit is None:
            raise ValueError(f"Could not find {fname} in any subfolder")
        return hit

    df["folder"] = df.apply(get_category, axis=1)
    return df

def extract_pinyin(sentence_word_list):
    """Romanise each token and derive its tone class.

    Every element of ``sentence_word_list`` is converted with ``pinyin.get`` using
    the numerical format. The tone class is the final character of the result
    converted to ``int``; results of length 0 or 1 get tone class 0.

    Args:
        sentence_word_list: iterable of tokens (the callers in this file pass
            single characters).

    Returns:
        ``(syllables, tone_classes)`` - two tuples of equal length.
    """
    syllables = []
    tone_classes = []
    for token in sentence_word_list:
        syllable = pinyin.get(token, format="numerical", delimiter=" ")
        syllables.append(syllable)
        if len(syllable) > 1:
            tone_classes.append(int(syllable[-1]))
        else:
            tone_classes.append(0)
    return tuple(syllables), tuple(tone_classes)
    
def read_aidatatang_data(participantID, sentenceID, data_root=None):
    """Load audio, metadata and transcript of one aidatatang utterance.

    The participant's ``<participantID>.tar.gz`` archive is searched for in the
    ``dev``, ``test`` and ``train`` corpus sub-folders (first hit wins) and three
    members are read straight from the archive without unpacking it to disk.

    Args:
        participantID: participant id, e.g. ``"G0002"``.
        sentenceID: sentence id, e.g. ``"S0001"``.
        data_root: directory containing the ``aidatatang`` folder. Defaults to
            ``<cwd>/data_full``, which is the location the function always used
            before this parameter existed.

    Returns:
        dict with keys ``"AudioData"`` (mono waveform loaded at 16 kHz),
        ``"MetaData"`` (the ``extract_pinyin`` result for the ``.txt`` member) and
        ``"Transcript"`` (decoded text of the ``.trn`` member).

    Raises:
        FileNotFoundError: if no archive for the participant exists.
    """
    if data_root is None:
        data_root = os.path.join(os.getcwd(), "data_full")

    fullFileName = f"T0055{participantID}{sentenceID}"
    subfolders = ["dev","test","train"]
    possiblePaths = [
        os.path.join(data_root, 'aidatatang', 'corpus', subfolder, f"{participantID}.tar.gz")
        for subfolder in subfolders
    ]

    # Fail with a descriptive error instead of an IndexError on an empty list.
    zipped_path = next((path for path in possiblePaths if os.path.exists(path)), None)
    if zipped_path is None:
        raise FileNotFoundError(
            f"No archive found for participant {participantID}; looked in: {possiblePaths}"
        )

    # archive member suffix + loader for every entry of the returned dict
    suffixed = {
        'AudioData': (".wav",lambda x: librosa.load(x, sr = 16000, mono=True)[0]) ,
        "MetaData": (".txt", lambda x: tuple(extract_pinyin(x.read().decode("utf-8")))), 
        "Transcript": (".trn", lambda x: x.read().decode("utf-8"))
        }

    data = {}
    with tarfile.open(zipped_path, 'r',) as tar_ref:
        for key, (suffix, loader) in suffixed.items():
            file_to_extract = f"./{participantID}/{fullFileName}{suffix}"
            print(file_to_extract)
            # Extract each member exactly once (the original extracted it twice and
            # threw the first handle away).
            data[key] = loader(tar_ref.extractfile(file_to_extract))

    for x in data:
        print(x, data[x])

    return data

# Build the full aidatatang utterance index (parses the whole transcript file).
kiki = read_aidatatang_index()
# Disabled experiment kept for reference:
# d = read_aidatatang_data(*read_aidatatang_index()[0][3][:2])

In [None]:
# Rows from the 'dev' folder, keeping only the first occurrence of each transcript.
kiki[kiki["folder"] == 'dev'].drop_duplicates(subset = "transcript", keep= "first")

In [None]:
# Load one utterance, draw its waveform and show an inline audio player.
# Relies on `librosa` and `Audio` having been imported by earlier cells.
d = read_aidatatang_data("G0002", "S0001")

librosa.display.waveshow(d['AudioData'], sr=16000)
Audio(d['AudioData'], rate=16000)


In [None]:
def calculate_fundamentals(size, lower, upper, is_mel=False):
    """List every harmonic of each candidate fundamental together with its bin index.

    The frequency grid is either the FFT bin centres for ``n_fft=size`` at 16 kHz
    or, when ``is_mel`` is true, ``size`` mel-spaced frequencies. Every grid
    frequency inside ``[lower, upper]`` (0 Hz excluded) is treated as a fundamental
    ``f0`` and its harmonics ``f0, 2*f0, 3*f0, ...`` are enumerated up to the
    highest grid frequency.

    Each harmonic is assigned to the nearest grid bin, so harmonics that do not
    fall exactly on a bin centre (always the case on a mel grid) no longer raise
    ``KeyError``. The harmonic frequency is computed as ``f0 * n`` rather than
    compounding the previous value, which used to produce ``f0 * n!``.

    Args:
        size: ``n_fft`` (linear grid) or ``n_mels`` (mel grid).
        lower: lowest fundamental frequency to include, inclusive.
        upper: highest fundamental frequency to include, inclusive.
        is_mel: select the mel grid instead of the FFT grid.

    Returns:
        dict mapping each fundamental to ``(harmonic_frequencies, bin_indexes)``,
        two lists of equal length.
    """
    if is_mel:
        freqs = librosa.mel_frequencies(n_mels=size)
    else:
        freqs = librosa.fft_frequencies(sr=16000, n_fft=size)

    # 0 Hz is excluded: it has no harmonics and would never leave the loop below.
    mask = (freqs <= upper) & (freqs >= lower) & (freqs != 0)
    fundamentals = freqs[mask]
    fmax = np.max(freqs)

    fundamentals_dict = {f: ([], []) for f in fundamentals}

    print(fmax)
    print(freqs)

    for fundamental, (harmonic_freqs, harmonic_bins) in fundamentals_dict.items():
        harmonic = 1
        while fundamental * harmonic <= fmax:
            f = fundamental * harmonic
            harmonic_freqs.append(f)
            # nearest grid bin to the ideal harmonic frequency
            harmonic_bins.append(int(np.argmin(np.abs(freqs - f))))
            harmonic += 1

    return fundamentals_dict

def spectral_folding(folddict, spectrum):
    """Fold a spectrum onto its candidate fundamentals by summing harmonic bins.

    The original body referenced an undefined name and indexed a list with a
    frequency, so it could never run on non-empty input. This implementation takes
    the most direct reading of the intent: for every fundamental, add up the
    spectrum values at the bin indexes of its harmonics.

    Args:
        folddict: mapping ``fundamental -> (harmonic_freqs, harmonic_bin_indexes)``
            as produced by ``calculate_fundamentals``.
        spectrum: array indexed by frequency bin along axis 0 (a single spectrum,
            or a spectrogram with bins on the first axis).

    Returns:
        list with one entry per fundamental, ordered by ascending fundamental.
        Each entry is the sum over that fundamental's harmonic bins (a scalar for a
        1-D spectrum, a per-frame vector for a 2-D one). An empty ``folddict``
        yields ``[]``.
    """
    spectrum = np.asarray(spectrum)
    folded = []

    for k in sorted(folddict.keys()):
        idxs = folddict[k][1]
        folded.append(np.take(spectrum, idxs, axis=0).sum(axis=0))
    return folded

# Enumerate harmonics of every 1024-point FFT bin centre up to 150 Hz (0 Hz excluded).
calculate_fundamentals(1024, 0, 150)

In [None]:
import librosa
import matplotlib.pyplot as plt

# Define the base frequency and number of chroma bins
# Base frequency and number of chroma bins used for the filter bank below.
base_freq = 100
n_chroma = 50


# Build the chroma filter bank and draw it as an image.
# NOTE(review): librosa documents `tuning` as a deviation from A440 measured in
# fractions of a chroma bin, not as a frequency in Hz — confirm that passing
# base_freq here is intended.
chroma_filter = librosa.filters.chroma(sr=16000, n_fft=4096, n_chroma=n_chroma, tuning=base_freq, norm=0,ctroct=4,octwidth=2)
fig, ax = plt.subplots()
img = librosa.display.specshow(chroma_filter, x_axis='linear', ax=ax)
print(chroma_filter.shape)
ax.set(ylabel='Chroma filter', title='Chroma filter bank')
fig.colorbar(img, ax=ax)

In [None]:
def spectralFlux(a,b):
    """Exploratory spectral analysis of one aidatatang utterance (prints only).

    Despite its name the function computes no spectral flux. It loads the
    utterance identified by ``(a, b)``, prints the shape of a 256-band mel
    spectrogram, derives a chroma projection of the STFT magnitude and a YIN
    pitch track (both currently unused), and finally prints the STFT shape and a
    one-hot matrix marking the strongest bin of every frame.

    Args:
        a: participant id forwarded to ``read_aidatatang_data``.
        b: sentence id forwarded to ``read_aidatatang_data``.

    Returns:
        None.
    """
    audioData = read_aidatatang_data(a, b)['AudioData']

    melspec = librosa.feature.melspectrogram(y= audioData, sr=16000, n_fft=2048, hop_length=128, n_mels=256)
    print(melspec.shape)

    # chroma projection of a finely hopped magnitude STFT
    base_freq = 100
    n_chroma = 50
    hop_length = 8
    chroma_filter = librosa.filters.chroma(
        sr=16000, n_fft=1024, n_chroma=n_chroma, tuning=base_freq,
        norm=2, ctroct=4, octwidth=5, base_c=True,
    )
    mag_spectrum = np.abs(librosa.stft(audioData, n_fft=1024, hop_length=hop_length))
    chroma_spectrum = np.dot(chroma_filter, mag_spectrum)

    # YIN pitch estimate on the same hop grid
    yin = librosa.yin(audioData, sr=16000, hop_length=hop_length, fmin=100, fmax = 200)

    # one-hot mask: 1 at the strongest bin of every frame, 0 elsewhere
    strongest_bin = np.argmax(mag_spectrum, axis=0)
    frame_ids = np.arange(mag_spectrum.shape[1])
    mag_spectrum_power = np.zeros_like(mag_spectrum)
    mag_spectrum_power[strongest_bin, frame_ids] = 1

    print(mag_spectrum.shape)
    print(mag_spectrum_power)

    # TODO: try a wavelet transform using librosa


In [None]:
def read_michigan_dataset_index(data_root=os.path.join(os.getcwd(),"data_full")):
    """Index the Michigan "tone perfect" recordings found on disk.

    File names are expected to look like ``<word><tone>_<participant>_MP3.mp3``
    (e.g. ``ni3_MV1_MP3.mp3``): the part before the first underscore is the word,
    its last character is the tone class and the second part is the participant id.

    Compared with a plain directory listing this function
      * sorts the file names, so the row order (and every seeded split derived
        from it) does not depend on the filesystem's listing order;
      * ignores entries that are not ``.mp3`` files (e.g. ``.DS_Store``), which
        would otherwise fail the tone-class conversion.

    Args:
        data_root: directory that contains the ``michigan`` folder.

    Returns:
        DataFrame with columns ``participantID``, ``word``, ``toneclass``,
        ``filename`` and ``xml_fn`` (the matching metadata file name).
    """
    audio = os.path.join(data_root, 'michigan', 'tone_perfect_all_mp3', 'tone_perfect')

    audioIndex = sorted(
        filename for filename in os.listdir(audio)
        if filename.lower().endswith(".mp3")
    )
    # The XML metadata is not read for now; only its file name is derived below.

    def parseAudioIndex(filename):
        elem = filename.split("_")
        word = elem[0]
        word_tone_class = int(word[-1])
        particpantID = elem[1]
        xml_fn = filename.replace(".mp3", ".xml").replace("MP3", "CUSTOM")
        return (particpantID, word, word_tone_class, filename, xml_fn)

    audioData = [parseAudioIndex(filename) for filename in audioIndex]
    return pd.DataFrame.from_records(
        data=audioData,
        columns=["participantID", "word", "toneclass", "filename", 'xml_fn'],
    )

def read_michigan_dataset_audio(filename, 
                                data_root=os.path.join(os.getcwd(),"data_full"),
                                sr = 16000,
                                mono=True
                                ):
    """Load one Michigan recording as a waveform.

    Args:
        filename: file name inside the ``tone_perfect`` mp3 folder.
        data_root: directory that contains the ``michigan`` folder.
        sr: sampling rate handed to ``librosa.load``.
        mono: forwarded to ``librosa.load``.

    Returns:
        The waveform array; the sampling rate returned by ``librosa.load`` is
        dropped.
    """
    clip_dir = os.path.join(data_root, 'michigan', 'tone_perfect_all_mp3', 'tone_perfect')
    waveform, _ = librosa.load(os.path.join(clip_dir, filename), sr=sr, mono=mono)
    return waveform

# Module-level Michigan index; also captured as the default `md` argument of getSentence.
md = read_michigan_dataset_index()

# Set of every word (pinyin syllable + tone digit) available in the index.
md_dict = set(md["word"].unique())
# sanity: every syllable of the sample sentence must exist in the corpus
samplesentence = ['yi3', 'hou4', 'ni3', 'shi4', 'nan2', 'hai2', 'zi3']
assert set(samplesentence).issubset(md_dict)

def getSentence(pid,words, md=md, convert_fn = lambda x: x.replace("5", "4")):
    """Fetch one recording per word for a given speaker.

    Args:
        pid: participant id to select in ``md``.
        words: iterable of words (syllable + tone digit) to look up.
        md: Michigan index DataFrame; defaults to the module-level index as it
            was when this function was defined.
        convert_fn: applied to every word before the lookup. The default rewrites
            the digit "5" to "4".

    Returns:
        list of waveforms, one per word, in input order. When several rows match
        a word, the first one is used.

    Raises:
        ValueError: if a (converted) word has no recording for this speaker. All
            lookups are performed before any audio is loaded.
    """
    speaker_rows = md[md["participantID"]==pid]

    def filename_for(raw_word):
        word = convert_fn(raw_word)
        matches = speaker_rows[speaker_rows["word"]==word]
        if len(matches) == 0:
            raise ValueError(f"Could not find {word} in any subfolder")
        return matches.iloc[0]["filename"]

    filenames = [filename_for(raw_word) for raw_word in words]
    return [read_michigan_dataset_audio(filename) for filename in filenames]
# NOTE(review): this bare `md` is not the last expression of its cell, so it displays nothing.
md
# One recording per syllable of the sample sentence, spoken by participant "MV1".
audiosamples = getSentence("MV1", samplesentence)


def mix_audio(audiosamples, overlap = 0, add_silence = 1, signal_length_seconds = None , min_samples_each_word = 0):
    """Lay word recordings one after another into a single 16 kHz signal.

    Clips are written in order; each clip after the first starts ``overlap``
    samples before the end of the previous one, but never less than
    ``min_samples_each_word`` samples after the previous clip's start. Where
    clips overlap, the later clip overwrites the earlier one (samples are not
    summed). ``add_silence`` seconds of zeros precede the first clip and the
    buffer is sized to leave the same amount after the last one.

    Fixes over the previous implementation:
      * ``overlap="auto"`` produced a float overlap, which crashed in
        ``np.zeros`` and in slicing; all sample counts are now integers.
      * ``overlap="auto"`` with a single clip divided by zero; overlap is 0 then.
      * ``min_samples_each_word`` could push clips past the pre-computed buffer
        length and raise a broadcast error; the buffer now grows to fit the
        actual placement. When the old buffer size was sufficient it is kept, so
        previously working calls return an identical signal.

    Args:
        audiosamples: sequence of 1-D waveforms.
        overlap: overlap between consecutive clips in samples, or ``"auto"`` to
            derive it from ``signal_length_seconds``.
        add_silence: seconds of silence before the first clip.
        signal_length_seconds: target duration of the speech part; only used when
            ``overlap == "auto"``.
        min_samples_each_word: minimum distance in samples between the start of
            consecutive clips.

    Returns:
        ``(signal, delims)`` where ``delims`` holds the start sample of every clip
        followed by the end sample of the last clip.

    Raises:
        ValueError: if ``overlap == "auto"`` without ``signal_length_seconds``, or
            if the derived overlap is longer than one of the clips.
    """
    frames_to_add = int(librosa.time_to_samples(add_silence, sr=16000))
    min_samples_each_word = int(min_samples_each_word)
    lens = [len(x) for x in audiosamples]

    if isinstance(overlap, str):
        if overlap != "auto":
            raise ValueError(f"Unknown overlap mode {overlap!r}")
        if signal_length_seconds is None:
            raise ValueError('overlap="auto" requires signal_length_seconds')
        if len(lens) < 2:
            # nothing to overlap with
            overlap = 0
        else:
            signal_samples = int(librosa.time_to_samples(signal_length_seconds, sr=16000))
            overlap = (sum(lens) - signal_samples) // (len(lens) - 1)
            assert overlap > 0, "clips are already shorter than the requested signal length"
            for i in lens:
                if overlap > i:
                    raise ValueError(f"Overlap {overlap} is larger than audio sample {i}")
    overlap = int(overlap)

    # Pass 1: decide where every clip starts.
    cursor = frames_to_add
    delims = [cursor]
    starts = []
    for idx, clip_len in enumerate(lens):
        if idx > 0:
            cursor -= overlap
            if cursor - delims[-1] < min_samples_each_word:
                cursor = delims[-1] + min_samples_each_word
            delims.append(cursor)
        starts.append(cursor)
        cursor += clip_len
    delims.append(cursor)

    # Pass 2: allocate a buffer that is guaranteed to hold every clip.
    nominal_len = sum(lens) - overlap * max(len(lens) - 1, 0) + 2 * frames_to_add
    furthest_end = max((s + n for s, n in zip(starts, lens)), default=0)
    final = np.zeros(max(nominal_len, furthest_end))

    for start, clip in zip(starts, audiosamples):
        final[start:start + len(clip)] = clip

    return final, delims


# Concatenate the sample sentence with a 1000-sample overlap between words,
# then plot the waveform and show an inline player for it.
mixed,_ = mix_audio(audiosamples, overlap=1000)
librosa.display.waveshow(mixed, sr=16000)
plt.show()

Audio(data=mixed, rate=16000)





In [None]:
import librosa
from librosa.sequence import dtw
import librosa
import numpy as np
import numpy as np



def dtw_distance(x, y, sr=16000, hop_length=128):
    """Align two audio signals with dynamic time warping on mel spectrograms.

    Both signals are reduced to their percussive component (HPSS), converted to
    128-band mel spectrograms (``n_fft=512``) and the lowest third of the mel
    bands is zeroed before the DTW is run.

    The ``sr`` argument is now actually used for the spectrograms; it was
    previously ignored in favour of a hard-coded 16000 (the default, so existing
    calls behave the same).

    Args:
        x: first waveform.
        y: second waveform.
        sr: sampling rate of both waveforms.
        hop_length: hop length of the mel spectrograms in samples.

    Returns:
        ``(D, wp, x_mel, y_mel)`` - the two values returned by ``dtw`` (cost
        matrix and warping path) followed by the two masked mel spectrograms
        that were aligned.
    """
    # Only the percussive part is compared; the harmonic part is discarded.
    _, x = librosa.effects.hpss(x)
    _, y = librosa.effects.hpss(y)

    # NOTE: despite the historical "mfcc" naming these are mel spectrograms.
    x_mfcc = librosa.feature.melspectrogram(y=x, sr=sr, n_fft=512, hop_length=hop_length, n_mels=128)
    y_mfcc = librosa.feature.melspectrogram(y=y, sr=sr, n_fft=512, hop_length=hop_length, n_mels=128)

    # zero the bottom third of the mel bands of both spectrograms
    x_mfcc[:x_mfcc.shape[0]//3,:] = 0
    y_mfcc[:y_mfcc.shape[0]//3,:] = 0

    D, wp = dtw(x_mfcc, y_mfcc)
    return D, wp , x_mfcc, y_mfcc

# Plot the warping path.
def plot_warping_path(wp, D):
    """Draw the DTW cost matrix with the warping path overlaid in red.

    Args:
        wp: warping path, an array whose column 0 is plotted on the y axis and
            column 1 on the x axis.
        D: 2-D cost matrix shown as a grey-scale image with the origin at the
            bottom-left.
    """
    n_rows, n_cols = D.shape[0], D.shape[1]
    path_rows, path_cols = wp[:, 0], wp[:, 1]

    plt.figure(figsize=(10, 10))
    plt.imshow(D, origin='lower', cmap='gray', interpolation='nearest')
    plt.plot(path_cols, path_rows, marker='o', color='r')
    # keep the axes tight around the matrix cells
    plt.xlim([-0.5, n_cols-0.5])
    plt.ylim([-0.5, n_rows-0.5])
    plt.title('Warping path on distance matrix $D$')
    plt.show()



import matplotlib.pyplot as plt

def plot_warping(mfcc1, mfcc2, wp, delims_a, delims_b):
    """Show two spectrograms side by side in one image (in dB).

    Both inputs are converted with ``librosa.amplitude_to_db`` and concatenated
    along the time axis before being displayed.

    Args:
        mfcc1: first spectrogram.
        mfcc2: second spectrogram, drawn to the right of the first.
        wp: currently unused (drawing the warping path is not implemented).
        delims_a: currently unused.
        delims_b: currently unused.
    """
    _, axis = plt.subplots(1, 1, figsize=(4, 2))

    # convert both to dB, then stack them along the time axis
    side_by_side = np.hstack([librosa.amplitude_to_db(m) for m in (mfcc1, mfcc2)])
    librosa.display.specshow(side_by_side, sr=16000, x_axis='time', ax=axis)

    # TODO: overlay the warping path (one vertical line per path step).
    plt.show()



def calculate_delimiter_timing_mapping(delimiters_a, wp, sr=16000, hop_length=128):
    """Map delimiter positions of signal A onto signal B through a DTW path.

    Each delimiter (a sample index in A) is converted to a frame index and looked
    up in column 0 of the warping path; the B frame of the first path step that
    mentions that A frame is taken as its counterpart.

    The B delimiters are returned as an ordered list with exactly one entry per A
    delimiter. Previously they were collected in a set, so two A delimiters that
    mapped to the same B frame collapsed into one and every later label or
    segment was shifted. A delimiter frame that does not occur on the path (e.g.
    one past the last frame) falls back to the nearest A frame that does.

    Args:
        delimiters_a: delimiter positions in A, in samples.
        wp: warping path of shape ``(n_steps, 2)`` holding ``(a_frame, b_frame)``.
        sr: sampling rate used to convert frames to seconds.
        hop_length: hop length that was used to compute the aligned features.

    Returns:
        ``(delimiters_a_time, delimiters_b_time)`` - two arrays of equal length,
        in seconds (the B array is empty when the path is empty).
    """
    delimiters_a_frames = np.atleast_1d(
        librosa.samples_to_frames(delimiters_a, hop_length=hop_length)
    )

    # first B frame seen for every A frame, in path order
    first_b_for_a = {}
    for a_frame, b_frame in np.asarray(wp).reshape(-1, 2):
        first_b_for_a.setdefault(int(a_frame), int(b_frame))

    delimiters_b_frames = []
    if first_b_for_a:
        path_a_frames = np.array(list(first_b_for_a.keys()))
        for frame in delimiters_a_frames:
            frame = int(frame)
            if frame not in first_b_for_a:
                # off-path delimiter: use the closest frame that is on the path
                frame = int(path_a_frames[np.argmin(np.abs(path_a_frames - frame))])
            delimiters_b_frames.append(first_b_for_a[frame])

    delimiters_a_time = librosa.frames_to_time(delimiters_a_frames, sr=sr, hop_length=hop_length)
    delimiters_b_time = librosa.frames_to_time(
        np.array(delimiters_b_frames, dtype=int), sr=sr, hop_length=hop_length
    )
    return delimiters_a_time, delimiters_b_time


def get_non_silent(audio_sample, threshold=0.01):
    """Return the start and end sample of the non-silent span of a signal.

    Short-term energy is computed over 1024-sample windows with a hop of 512
    samples and normalised by its maximum. The span runs from the first window
    whose normalised energy exceeds ``threshold`` to one hop past the start of
    the last such window.

    Edge cases that used to fail with obscure errors are handled explicitly:
      * signals no longer than one window are analysed as a single window;
      * integer input is converted to float before squaring and normalising;
      * empty or completely silent input, or a threshold that no window exceeds,
        raises a descriptive ``ValueError``.

    Args:
        audio_sample: 1-D waveform.
        threshold: fraction of the peak window energy a window must exceed to
            count as non-silent.

    Returns:
        ``(start, end)`` sample indexes; ``end`` never exceeds the signal length.

    Raises:
        ValueError: if no non-silent window can be determined.
    """
    window_size = 1024
    hop_size = 512

    magnitudes = np.abs(np.asarray(audio_sample)).astype(float)
    n_samples = len(magnitudes)
    if n_samples == 0:
        raise ValueError("audio_sample is empty")

    # max(..., 1) guarantees one window even for clips shorter than window_size
    frame_starts = range(0, max(n_samples - window_size, 1), hop_size)
    energy = np.array([np.sum(magnitudes[i:i + window_size] ** 2) for i in frame_starts])

    peak = energy.max()
    if peak <= 0:
        raise ValueError("audio_sample is completely silent")
    energy = energy / peak

    non_silent = np.where(energy > threshold)[0]
    if non_silent.size == 0:
        raise ValueError(f"no window exceeds threshold {threshold}")

    start = non_silent[0] * hop_size
    end = min((non_silent[-1] + 1) * hop_size, n_samples)
    return start, end

def plot_audio_with_lines(audio_signal, sr=16000, threshold=0.01):
    """Plot a waveform and mark the bounds of its non-silent span in red.

    Args:
        audio_signal: 1-D waveform.
        sr: sampling rate, used for the time axis and to place the markers.
        threshold: forwarded to ``get_non_silent``.
    """
    bounds = get_non_silent(audio_signal, threshold=threshold)
    duration = librosa.get_duration(y = audio_signal, sr=sr)

    time_axis = np.linspace(0, duration, len(audio_signal))
    plt.plot(time_axis, audio_signal)
    # one vertical line for the start and one for the end (sample index -> seconds)
    for sample_index in bounds:
        plt.axvline(x=sample_index/sr, color='r')
    plt.xlabel('Time (s)')
    plt.ylabel('Amplitude')
    plt.show()


def plot_melspecs_with_lines(melspec1, melspec2, frame_indexes1, frame_indexes2, labels=None, hop_length=128):
    """Plot two mel spectrograms next to each other with labelled vertical markers.

    Args:
        melspec1: power mel spectrogram for the left panel.
        melspec2: power mel spectrogram for the right panel.
        frame_indexes1: x positions of the markers in the left panel. The values
            are used directly as x coordinates of a time axis, so despite the
            name they are presumably expected in seconds - TODO confirm.
        frame_indexes2: same for the right panel.
        labels: optional sequence; ``labels[i]`` is written above the i-th marker
            of each panel.
        hop_length: hop length used when the spectrograms were computed.
    """
    fig, axs = plt.subplots(1, 2, figsize=(15, 4))
    panels = list(zip(axs, (melspec1, melspec2), (frame_indexes1, frame_indexes2)))

    # spectrogram images first, one per panel
    images = [
        librosa.display.specshow(
            librosa.power_to_db(melspec, ref=np.max),
            x_axis='time', y_axis='mel', sr=16000, ax=axis, hop_length=hop_length,
        )
        for axis, melspec, _ in panels
    ]

    # then the markers and their labels
    for axis, _, marks in panels:
        for i, frame_index in enumerate(marks):
            axis.axvline(x=frame_index, color='r')
            if labels is not None:
                axis.text(
                    frame_index, axis.get_ylim()[1]+2, labels[i],
                    ha='center', va='bottom', fontsize=6, color='r',
                    rotation=90, backgroundcolor='w',
                )

    for image, (axis, _, _) in zip(images, panels):
        fig.colorbar(image, ax=axis, format='%+2.0f dB')
    plt.margins(0, 0)
    plt.show()

def breakupAudio(incoming, delims_in_time, silence_duration=0.5, feathering = 0.1):
    """Cut a 16 kHz signal at the given times and separate the pieces with silence.

    Each pair of consecutive delimiters defines one segment, widened on both
    sides by ``feathering`` seconds. Every segment is followed by
    ``silence_duration`` seconds of zeros.

    The segment start is clamped to 0: a negative start (feathering larger than
    the first delimiter) used to be interpreted as an index from the end of the
    signal and yielded an empty or wrong segment. With fewer than two delimiters
    an empty array is returned instead of failing inside ``np.concatenate``.

    Args:
        incoming: 1-D waveform sampled at 16 kHz.
        delims_in_time: delimiter times in seconds, ascending.
        silence_duration: seconds of silence appended after every segment.
        feathering: seconds added before and after every segment.

    Returns:
        1-D array with all segments and the silences between them.
    """
    delimsamples = np.atleast_1d(librosa.time_to_samples(delims_in_time, sr=16000))
    silencesamples = int(librosa.time_to_samples(silence_duration, sr=16000))
    feather_samples = int(librosa.time_to_samples(feathering, sr=16000))

    silence_arr = np.zeros(silencesamples)
    brokenSamples = []

    for left, right in zip(delimsamples[:-1], delimsamples[1:]):
        start = max(int(left) - feather_samples, 0)
        end = int(right) + feather_samples  # slicing past the end is clipped by numpy
        brokenSamples.append(incoming[start:end])
        brokenSamples.append(silence_arr)

    if not brokenSamples:
        return np.zeros(0)
    return np.concatenate(brokenSamples)


def direct_convert(dlim_time_a, dlim_time_b):
    """Linearly rescale the A delimiters onto the time span of the B delimiters.

    Only the first and last element of ``dlim_time_b`` are used: every A
    delimiter keeps its relative position between the first and last A delimiter
    and is placed at the same relative position between the first and last B
    delimiter.

    Args:
        dlim_time_a: delimiter times of signal A.
        dlim_time_b: delimiter times of signal B.

    Returns:
        numpy array with one rescaled time per element of ``dlim_time_a``.
    """
    a_first, a_last = dlim_time_a[0], dlim_time_a[-1]
    b_first, b_last = dlim_time_b[0], dlim_time_b[-1]
    span_a = a_last - a_first
    span_b = b_last - b_first

    rescaled = []
    for t in dlim_time_a:
        fraction = (t - a_first) / span_a   # relative position inside A
        rescaled.append(b_first + fraction * span_b)
    return np.array(rescaled)

def try_dtw():
    """Align a synthetic sentence with a real recording and split both into words.

    Steps:
      1. Load utterance G0002/S0002 and trim it to its non-silent span plus 0.3 s
         of padding on each side.
      2. Build a synthetic version of the same sentence from single-word
         recordings of speaker "MV1".
      3. Align synthetic and real audio with DTW and transfer the word boundaries
         of the synthetic signal to the real one.
      4. Plot both spectrograms with the boundaries and cut both signals at them.

    Fixes over the previous version:
      * the syllables come back as a tuple, so building the plot labels with
        ``sentence_list + ["end"]`` raised ``TypeError``; they are converted to a
        list first;
      * when the speech starts less than 0.3 s into the file the padded slice
        start became negative and wrapped around; it is now clamped to 0;
      * the local ``sr`` is used consistently instead of repeated literals.

    Returns:
        Four ``Audio`` widgets: synthetic signal, trimmed real signal, segmented
        synthetic signal, segmented real signal.
    """
    sr = 16000

    aidtang = read_aidatatang_data("G0002", "S0002")
    actual = aidtang["AudioData"]
    sentence_list = aidtang["MetaData"][0]
    print(sentence_list[-1])
    # drop the trailing newline token of the metadata file
    if "\n" in sentence_list[-1]:
        sentence_list = sentence_list[:-1]
    sentence_list = list(sentence_list)

    start, end = get_non_silent(actual, threshold=0.1)
    sig_len = librosa.samples_to_time(end-start, sr=sr)

    padding = 0.3
    padding_samples = librosa.time_to_samples(padding, sr=sr)
    actual = actual[max(start-padding_samples, 0):end+padding_samples]

    # NOTE: signal_length_seconds only takes effect with overlap="auto"; with the
    # fixed overlap used here it is ignored by mix_audio.
    generated, delimiters = mix_audio(
        getSentence("MV1", sentence_list),
        overlap=2000, 
        signal_length_seconds=sig_len,
        min_samples_each_word=3000
        )

    D, wp, ma, mb = dtw_distance(generated, actual, sr=sr, hop_length=64)
    delim_fa, delim_fb = calculate_delimiter_timing_mapping(delimiters, wp, sr=sr, hop_length=64)
    print(f"delim_fa: {delim_fa}")
    print(f"delim_fb: {delim_fb}")
    plot_melspecs_with_lines(ma, mb, delim_fa, delim_fb, hop_length=64, labels=sentence_list+["end"])

    broken_actual = breakupAudio(actual, delim_fb, silence_duration=2,feathering = 0)
    broken_generated = breakupAudio(generated, delim_fa, silence_duration=2,feathering = 0)

    return (
        Audio(data=generated, rate=sr), Audio(data=actual, rate=sr),
         Audio(data=broken_generated, rate=sr),Audio(data=broken_actual, rate=sr)
    )



# Run the DTW experiment; a/b are the synthetic and real signals, c/d their segmented versions.
# NOTE(review): this rebinds `d`, which an earlier cell used for a loaded utterance dict.
a,b,c,d = try_dtw()
# Display the player for the synthetic sentence.
a


In [None]:
# Display the player for the segmented real recording (fourth value returned by try_dtw).
d

In [None]:
import dataset
import sklearn.model_selection
from hparams import Hparams_michigan
from dataset import read_michigan_dataset_index

def get_data_loader_michigan(args):
    """Create the train and test datasets for the Michigan corpus.

    The Michigan index is split 80/20 with a fixed seed, stratified by tone
    class. Note that, despite the name, dataset objects are returned rather than
    data loaders.

    Args:
        args: mapping providing ``dataset_root``, ``sampling_rate``,
            ``preload_audio``, ``sample_length`` and ``pad_audio``.

    Returns:
        ``(train_ds, test_ds)`` - two ``dataset.DatasetMichigan`` instances.
    """
    index = read_michigan_dataset_index()
    row_positions = list(range(len(index)))
    train_ids, test_ids = sklearn.model_selection.train_test_split(
        row_positions,
        test_size=0.2,
        random_state=42,
        shuffle=True,
        stratify=index["toneclass"].values,
    )

    def build(positions):
        # Both splits share every setting except the rows they cover.
        return dataset.DatasetMichigan(
            dataset_index=index.iloc[positions],
            dataset_root=args['dataset_root'],
            sampling_rate=args['sampling_rate'],
            preload_audio=args['preload_audio'],
            sample_length=args['sample_length'],
            pad_audio=args['pad_audio'],
        )

    train_ds = build(train_ids)
    test_ds = build(test_ids)
    return train_ds, test_ds
# Build both datasets and plot the first item of the training split.
get_data_loader_michigan(Hparams_michigan.args)[0].plot_item(0)

In [None]:
# Build both datasets again (nothing is reused from the previous cell) and plot
# the first item of the test split.
get_data_loader_michigan(Hparams_michigan.args)[1].plot_item(0)

In [None]:
import mir_eval.sonify
from IPython.display import Audio
# Sampling rate for the synthetic sweep (differs from the 16 kHz used for the corpora above).
sr = 22050

# One-second chirp sweeping from C3 to C5.
y_sweep = librosa.chirp(fmin=librosa.note_to_hz('C3'),
                        fmax=librosa.note_to_hz('C5'),
                        sr=sr,
                        duration=1)

# Inline player for the sweep.
Audio(data=y_sweep, rate=sr)