In [2]:
import os
import glob

import librosa
import torchaudio
import soundfile as sf
import pandas as pd
# from src.data.preproc import convert_libri
from src.data.util import log_compress, read_audioset_csv

In [50]:
# Load one clip from the local Audioset/processed directory for the experiments below.
# NOTE(review): absolute, machine-specific path — consider a configurable data directory.
test_flac = '/Users/lkieu/Desktop/Audioset/processed/-5-vmt2iKT0.flac'

# `waveform` is the decoded audio tensor and `sr` its sample rate.
# Presumably `waveform` is shaped (channels, frames) — TODO confirm.
waveform, sr = torchaudio.load(test_flac, normalize=True)


In [18]:
# Drop every size-1 dimension of `waveform` (overwrites the notebook-level name).
# NOTE(review): the VAD cell further down also reads `waveform`; confirm which
# shape each consumer needs before relying on this cell's position.
waveform = waveform.squeeze()

In [51]:
from pyannote.audio.pipelines import VoiceActivityDetection
# instantiate the model
from pyannote.audio import Model
model = Model.from_pretrained(
  "pyannote/segmentation-3.0")
pipeline = VoiceActivityDetection(segmentation=model)
HYPER_PARAMETERS = {
  # remove speech regions shorter than that many seconds.
  "min_duration_on": 0.0,
  # fill non-speech regions shorter than that many seconds.
  "min_duration_off": 0.0
}
pipeline.instantiate(HYPER_PARAMETERS)
# pyannote's in-memory input must be a (channel, time) tensor. An earlier cell
# squeezes `waveform` down to 1-D, so restore the channel axis when it is
# missing instead of depending on the order in which the cells were run.
vad_waveform = waveform.unsqueeze(0) if waveform.dim() == 1 else waveform
vad = pipeline({'waveform': vad_waveform, 'sample_rate': sr})
# `vad` is a pyannote.core.Annotation instance containing speech regions
str(vad)

'[ 00:00:00.030 -->  00:00:09.480] 0 SPEECH'

In [21]:
from transformers import Wav2Vec2ForSequenceClassification, AutoFeatureExtractor
import torch

model_id = "facebook/mms-lid-256"

processor = AutoFeatureExtractor.from_pretrained(model_id)
model = Wav2Vec2ForSequenceClassification.from_pretrained(model_id)

# The feature extractor only *declares* a sampling rate; it does not resample.
# Feed it mono audio that really is at the rate it expects, rather than
# asserting 16 kHz regardless of the file's true rate `sr`.
target_sr = processor.sampling_rate
# Accept both the squeezed 1-D waveform and the (channels, frames) layout.
lid_audio = waveform if waveform.dim() == 1 else waveform.mean(dim=0)
if sr != target_sr:
    lid_audio = torchaudio.functional.resample(lid_audio, orig_freq=sr, new_freq=target_sr)

# Expected language for the test clip: English
inputs = processor(lid_audio.numpy(), sampling_rate=target_sr, return_tensors="pt")

with torch.no_grad():
    outputs = model(**inputs).logits

# Highest-scoring class of the first (only) batch item, mapped to its label.
lang_id = torch.argmax(outputs, dim=-1)[0].item()
detected_lang = model.config.id2label[lang_id]
# 'eng'


In [22]:
# NOTE(review): the recorded output below is 'lat', while the cell above expects
# 'eng' — check the sample rate and shape of the audio handed to the model.
detected_lang

'lat'

In [33]:
# Load balanced_train_segments.csv with the project's read_audioset_csv helper
# and preview the first three rows.
# NOTE(review): absolute, machine-specific path — consider a configurable data directory.
path = '/Users/lkieu/PycharmProjects/PhonemeAwareFoundational/test_audio/balanced_train_segments.csv'
df = read_audioset_csv(path)
df.head(3)

Unnamed: 0,YTID,start_seconds,end_seconds,positive_labels
0,--PJHxphWEs,30.0,40.0,"[/m/09x0r, /t/dd00088]"
1,--ZhevVpy1s,50.0,60.0,[/m/012xff]
2,--aE2O5G5WE,0.0,10.0,"[/m/03fwl, /m/04rlf, /m/09x0r]"


In [31]:
from src.data.preproc import convert_audioset, has_allowed_tag

audio_path = '/Users/lkieu/Desktop/Audioset/audio/bal_train'
paths = convert_audioset(audio_path)

# Filter for speech tags
df = read_audioset_csv(path)

# Map each file's base name (extension stripped) to its full path; the base
# name is matched against the YTID column below.
ytids_to_paths = {os.path.splitext(os.path.basename(p))[0]: p for p in paths}
ytids = list(ytids_to_paths)

# Keep only rows whose clip is present locally, then flag rows whose labels
# pass the project's has_allowed_tag check.
df_filtered = df.loc[df['YTID'].isin(ytids)].assign(
    has_allowed_tag=lambda frame: frame['positive_labels'].apply(has_allowed_tag)
)
df_result = df_filtered.loc[df_filtered['has_allowed_tag'].eq(True)].copy()
len(df_result)

534

In [38]:
# Preview the first rows that passed the has_allowed_tag filter.
df_result.head(3)

Unnamed: 0,YTID,start_seconds,end_seconds,positive_labels,has_allowed_tag
0,--PJHxphWEs,30.0,40.0,"[/m/09x0r, /t/dd00088]",True
2,--aE2O5G5WE,0.0,10.0,"[/m/03fwl, /m/04rlf, /m/09x0r]",True
25,-30H9V1IKps,6.0,16.0,"[/m/07yv9, /m/09x0r, /m/0gvgw0]",True


In [None]:
# Gather all the flac and json
# Root directory that is searched recursively for .flac files and their .json
# companions; the path is relative to the notebook's working directory.
data_dir = "./LibriLight/small"

def gather_flac_json_pairs(root_dir):
    """Pair every .flac file under ``root_dir`` with its same-named .json file.

    The directory tree is searched recursively. A .flac file without a
    companion .json next to it is reported with a printed warning and left out.

    Args:
        root_dir: Directory to search.

    Returns:
        List of ``(flac_path, json_path)`` tuples, in the order glob yields them.
    """
    pattern = os.path.join(root_dir, '**', '*.flac')
    matched = []

    for audio_file in glob.glob(pattern, recursive=True):
        stem, _extension = os.path.splitext(audio_file)
        sidecar = stem + '.json'
        if not os.path.exists(sidecar):
            print(f"Warning: No JSON companion for {audio_file}")
            continue
        matched.append((audio_file, sidecar))

    return matched

# Collect every (flac, json) pair under `data_dir`; later cells iterate over `pairs`.
pairs = gather_flac_json_pairs(data_dir)

In [None]:
from collections import defaultdict
import os

def find_duplicate_filenames(paths):
    """Print every base filename that occurs under more than one path.

    For each duplicated filename a header line is printed, followed by one
    indented line per path that carries that name. Nothing is returned.

    Args:
        paths: Iterable of file paths.
    """
    by_name = defaultdict(list)
    for full_path in paths:
        by_name[os.path.basename(full_path)].append(full_path)

    for name, locations in by_name.items():
        # A name seen only once is not a duplicate.
        if len(locations) <= 1:
            continue
        print(f"Duplicate filename: {name}")
        for location in locations:
            print(f"  {location}")

# Only the .flac side of each pair is checked for clashing filenames.
paths = [pair[0] for pair in pairs]
find_duplicate_filenames(paths)


In [None]:
# Verify the sampling rate: count how many files use each sample rate.
def get_sample_rate(path):
    """Return the sample rate that soundfile reports for the audio file at ``path``."""
    return sf.info(path).samplerate

# Tally how many files share each sample rate, keyed by a label such as "<rate> hz".
sample_rate = {}
for path, _ in pairs:
    info = f'{get_sample_rate(path)} hz'
    sample_rate[info] = sample_rate.get(info, 0) + 1
print(sample_rate)

In [None]:
# Distribution of Voice Activity block length.
# Specific to LibriLight
from collections import Counter
import json
from pathlib import Path
import matplotlib.pyplot as plt

def collect_duration_differences(flac_json_pairs):
    """Collect the length of every voice-activity block across all JSON files.

    Each JSON file is read for its ``voice_activity`` entry, a sequence of
    ``(start, end)`` pairs. A file without that key contributes nothing.

    Args:
        flac_json_pairs: Iterable of ``(flac_path, json_path)`` tuples; only the
            JSON path is opened.

    Returns:
        Flat list of ``end - start`` values, in file order.
    """
    collected = []

    for _audio_file, meta_file in flac_json_pairs:
        with open(meta_file, 'r') as handle:
            meta = json.load(handle)

        blocks = meta.get('voice_activity', [])
        collected += [stop - begin for begin, stop in blocks]

    return collected

def plot_duration_distribution(durations, bins=50):
    """Show a histogram of voice-activity durations.

    Args:
        durations: Sequence of durations (the x-axis is labelled in seconds).
        bins: Number of histogram bins.
    """
    axes = plt.gca()
    axes.hist(durations, bins=bins, edgecolor='black')
    axes.set_title('Distribution of Voice Activity Durations')
    axes.set_xlabel('Duration (seconds)')
    axes.set_ylabel('Frequency')
    plt.show()

def top_n_durations(durations, n=10, rounding=2):
    """Return the ``n`` most frequent durations after rounding.

    Args:
        durations: Iterable of numeric durations.
        n: How many of the most common values to return.
        rounding: Number of decimal places each duration is rounded to before counting.

    Returns:
        List of ``(rounded_duration, count)`` tuples, most frequent first.
    """
    tally = Counter(round(value, rounding) for value in durations)
    return tally.most_common(n)


# Summarise the raw (unmerged) voice-activity block lengths: most common
# values, extremes, then the histogram.
durations = collect_duration_differences(pairs)
print(top_n_durations(durations))
print(f'min: {min(durations)}')
print(f'max: {max(durations)}')
plot_duration_distribution(durations)

In [None]:
# What if we tolerate short silences, i.e. merge voice-activity blocks separated by a gap of less than 1 s?

def merge_close_blocks(voice_activity, threshold=1.0):
    """Merge voice-activity intervals separated by a gap shorter than ``threshold``.

    The input is never modified: the result is built from fresh lists, so the
    caller's intervals keep their original values. Intervals may be lists or
    tuples.

    Args:
        voice_activity: Sequence of ``(start, end)`` pairs, in any order.
        threshold: Two consecutive intervals (after sorting by start) are merged
            when ``next_start - previous_end < threshold``.

    Returns:
        List of ``[start, end]`` lists sorted by start time; ``[]`` for empty input.
    """
    if not voice_activity:
        return []

    # Sort by start time just in case
    ordered = sorted(voice_activity, key=lambda block: block[0])
    # Copy the first interval: `sorted` returns a new outer list but shares the
    # inner intervals, so extending it in place would edit the caller's data
    # (and fail outright when the intervals are tuples).
    merged = [list(ordered[0])]

    for start, end in ordered[1:]:
        last_end = merged[-1][1]
        if start - last_end < threshold:
            # Merge intervals; max() keeps the end of a block that fully
            # contains the next one.
            merged[-1][1] = max(last_end, end)
        else:
            merged.append([start, end])

    return merged

def get_duration_diff_merged(pairs, threshold=1.0):
    """Collect voice-activity block lengths after merging blocks split by short gaps.

    Each JSON file is read for its ``voice_activity`` entry; the blocks are
    merged with ``merge_close_blocks`` and the merged lengths are collected.

    Args:
        pairs: Iterable of ``(flac_path, json_path)`` tuples; only the JSON path
            is opened.
        threshold: Gap, passed on to ``merge_close_blocks``, below which two
            neighbouring blocks are merged. The default of 1.0 matches the
            helper's own default, so existing calls behave as before.

    Returns:
        Flat list of ``end - start`` values for the merged blocks, in file order.
    """
    all_durations = []

    for _flac_path, json_path in pairs:
        with open(json_path, 'r') as f:
            data = json.load(f)

        voice_activity = data.get('voice_activity', [])
        merged_blocks = merge_close_blocks(voice_activity, threshold=threshold)
        all_durations.extend(end - start for start, end in merged_blocks)

    return all_durations

# Same summary as for the raw blocks, now on the gap-merged blocks
# (rebinds `durations`).
durations = get_duration_diff_merged(pairs)
print(top_n_durations(durations))
print(f'min: {min(durations)}')
print(f'max: {max(durations)}')
plot_duration_distribution(durations)

In [None]:
import torch
# Scratch check of element-wise torch.maximum against a 0-dim tensor:
# every entry of `a` is floored at (largest entry of `a`) - 1.
a = torch.tensor([[1,2],
                  [3,4]])
# NOTE(review): `b` is defined but not used in this cell.
b = torch.tensor([[5,6],
                 [7,8]])
torch.maximum(a, a.max() - 1)

In [None]:
from torchaudio import transforms
import torchaudio

# NOTE(review): absolute, machine-specific path — consider a configurable data directory.
test_flac = '/Users/lkieu/PycharmProjects/PhonemeAwareFoundational/test_audio/canterburytales_09_chaucer_64kb.flac'
# Rebinds the notebook-level names `test_flac`, `waveform` and `sample_rate`
# (an earlier cell used `sample_rate` for a dict of per-rate file counts).
waveform, sample_rate = torchaudio.load(test_flac, normalize=True)
# Mel spectrogram transform: 1024-point FFT, hop of 256 samples, 80 mel bins.
transform = transforms.MelSpectrogram(sample_rate=sample_rate, n_fft=1024, hop_length=256, n_mels=80)
melspec = transform(waveform)
# Define max size (in time frames)
max_frames = 500  # Example

def crop_or_pad(spec, max_frames):
    """Crop or zero-pad a spectrogram along its last axis to ``max_frames`` frames.

    The last axis is treated as time. Any number of leading axes is accepted,
    so both ``(channels, n_mels, time)`` and ``(n_mels, time)`` inputs work.

    Args:
        spec: Tensor whose last dimension is the time axis.
        max_frames: Target length of the time axis.

    Returns:
        ``spec`` itself when it already has ``max_frames`` frames; otherwise a
        tensor cropped from the end or right-padded with zeros. The result
        keeps the dtype and device of ``spec``.
    """
    time_frames = spec.shape[-1]
    if time_frames > max_frames:
        return spec[..., :max_frames]
    if time_frames < max_frames:
        pad_shape = (*spec.shape[:-1], max_frames - time_frames)
        # new_zeros inherits dtype and device from `spec`, so concatenating
        # does not silently promote integer or half-precision input to float32.
        pad = spec.new_zeros(pad_shape)
        return torch.cat((spec, pad), dim=-1)
    return spec

# Force the spectrogram to exactly `max_frames` time frames (overwrites `melspec`).
melspec= crop_or_pad(melspec, max_frames)
print(melspec.shape)  # Should be (channels, n_mels, max_frames)


In [None]:
from src.data.util import log_compress
import librosa

# Two log-scaled versions of the first channel's mel spectrogram, for comparison:
# librosa's power_to_db versus the project's log_compress helper.
# NOTE(review): melspec[0] is a torch tensor; librosa presumably converts it to
# a NumPy array internally — confirm.
log_spec_lib = librosa.power_to_db(melspec[0])
log_spec_torch = log_compress(melspec[0])

In [None]:
import matplotlib.pyplot as plt

def plot_spectrogram(specgram, title=None, ylabel="freq_bin", ax=None):
    """Draw a 2-D spectrogram as an image with the origin at the bottom.

    Args:
        specgram: 2-D array-like passed to ``imshow``.
        title: Optional axes title; no title is set when ``None``.
        ylabel: Label for the y-axis.
        ax: Axes to draw on. When ``None`` a new single-axes figure is created.
    """
    target = plt.subplots(1, 1)[1] if ax is None else ax
    if title is not None:
        target.set_title(title)
    target.set_ylabel(ylabel)
    # origin="lower" puts row 0 at the bottom of the image.
    target.imshow(specgram, origin="lower", aspect="auto", interpolation="nearest")

In [None]:
# Log-mel spectrogram as produced by librosa.power_to_db.
plot_spectrogram(log_spec_lib)

In [None]:
# Log-mel spectrogram as produced by the project's log_compress helper.
plot_spectrogram(log_spec_torch)

In [None]:
from src.data.util import whisper_norm, z_score_norm, min_max_norm
# Apply the project's whisper_norm helper to the log-compressed spectrogram and plot the result.
# NOTE(review): presumably mirrors Whisper's log-mel normalisation — confirm in src.data.util.
whisper_norm_log_spec = whisper_norm(log_spec_torch)
plot_spectrogram(whisper_norm_log_spec)

In [None]:
# Apply the project's z_score_norm helper to the log-compressed spectrogram and plot the result.
z_norm = z_score_norm(log_spec_torch)
plot_spectrogram(z_norm)

In [None]:
# Apply the project's min_max_norm helper to the log-compressed spectrogram and plot the result.
min_max_norm_log_spec = min_max_norm(log_spec_torch)
plot_spectrogram(min_max_norm_log_spec)