In [269]:
from pathlib import Path

audio_dir = Path("librispeech/audio")
audio_ext = ".flac"
paths = list(audio_dir.rglob(f"**/*{audio_ext}"))

In [270]:
import pandas as pd
import torchaudio
import torch

align_dir = Path("librispeech/alignments")
align_df = pd.read_csv(align_dir / "alignments.csv")

model_name = "hubert_base"
layer = 12


try:
    bundle = getattr(torchaudio.pipelines, model_name.upper())
except AttributeError:
    raise ValueError(f"Invalid model name: {model_name}")

model = bundle.get_model()
model.eval()

Wav2Vec2Model(
  (feature_extractor): FeatureExtractor(
    (conv_layers): ModuleList(
      (0): ConvLayerBlock(
        (layer_norm): GroupNorm(512, 512, eps=1e-05, affine=True)
        (conv): Conv1d(1, 512, kernel_size=(10,), stride=(5,), bias=False)
      )
      (1-4): 4 x ConvLayerBlock(
        (conv): Conv1d(512, 512, kernel_size=(3,), stride=(2,), bias=False)
      )
      (5-6): 2 x ConvLayerBlock(
        (conv): Conv1d(512, 512, kernel_size=(2,), stride=(2,), bias=False)
      )
    )
  )
  (encoder): Encoder(
    (feature_projection): FeatureProjection(
      (layer_norm): LayerNorm((512,), eps=1e-05, elementwise_affine=True)
      (projection): Linear(in_features=512, out_features=768, bias=True)
      (dropout): Dropout(p=0.1, inplace=False)
    )
    (transformer): Transformer(
      (pos_conv_embed): ConvolutionalPositionalEmbedding(
        (conv): ParametrizedConv1d(
          768, 768, kernel_size=(128,), stride=(1,), padding=(64,), groups=16
          (parametriza

In [271]:
def cut_encoding(
    encoding: torch.Tensor, word_boundaries: list[float], hop_ms: int = 20
) -> torch.Tensor:
    hop_size = hop_ms / 1000  # seconds per frame
    start_frame = int(word_boundaries[0] / hop_size)
    end_frame = int(word_boundaries[1] / hop_size)
    return encoding[start_frame:end_frame]

In [272]:
import numpy as np
from scipy.spatial.distance import cdist


def collapse_runs(seq):
    collapsed = []
    prev = None
    for code in seq:
        if code != prev:
            collapsed.append(int(code))
            prev = code
    return collapsed


def greedy_segment(sequence: np.ndarray, codebook: np.ndarray) -> np.ndarray:
    dists = cdist(sequence, codebook)
    frame_codes = np.argmin(dists, axis=1)

    segments = []
    prev_code = frame_codes[0]

    for t in range(1, len(frame_codes)):
        if frame_codes[t] != prev_code:
            segments.append(int(prev_code))
            prev_code = frame_codes[t]

    segments.append(int(prev_code))
    return segments

In [273]:
from typing import Tuple

import numba
import numpy as np
import scipy.spatial.distance as distance


def segment(
    sequence: np.ndarray, codebook: np.ndarray, gamma: float
) -> Tuple[np.ndarray, np.ndarray]:
    """Group speech representations into phone-like segments.

    Args:
        sequence (NDArray): speech representations of shape (T, D) where T is the number of frames and D is the feature dimension.
        codebook (NDArray): cluster centriods of the discrete units of shape (K, D) where K is the number of codes.
        gamma float: Duration regularizer weight. Larger values result in a coarser segmentation.

    Returns:
        NDArray[int]: list of discrete units representing each segment sound types of shape (N,).
        NDArray[int]: list of segment boundaries of shape (N+1,).
    """
    dists = distance.cdist(sequence, codebook).astype(np.float32)
    alpha, P = _segment(dists, gamma)
    return _backtrack(alpha, P)


@numba.njit()
def _segment(dists, gamma):
    T, K = dists.shape

    alpha = np.zeros(T + 1, dtype=np.float32)
    P = np.zeros((T + 1, 2), dtype=np.int32)
    D = np.zeros((T, T, K), dtype=np.float32)

    for t in range(T):
        for k in range(K):
            D[t, t, k] = dists[t, k]
    for t in range(T):
        for s in range(t + 1, T):
            D[t, s, :] = D[t, s - 1, :] + dists[s, :] - gamma

    for t in range(T):
        alpha[t + 1] = np.inf
        for s in range(t + 1):
            k = np.argmin(D[s, t, :])
            alpha_min = alpha[s] + D[s, t, k]
            if alpha_min < alpha[t + 1]:
                P[t + 1, :] = s, k
                alpha[t + 1] = alpha_min
    return alpha, P


@numba.njit()
def _backtrack(alpha, P):
    rhs = len(alpha) - 1
    segments = []
    boundaries = [rhs]
    while rhs != 0:
        lhs, code = P[rhs, :]
        segments.append(code)
        boundaries.append(lhs)
        rhs = lhs
    segments.reverse()
    boundaries.reverse()
    return segments

In [274]:
import joblib

kmeans_path = f"models/kmeans_{model_name}_layer{layer}_k100.pkl"
kmeans = joblib.load(kmeans_path)

In [275]:
import numpy as np
import itertools
import editdistance


def normalized_edit_distance(sequences):
    pairs = list(itertools.combinations(sequences, 2))
    if not pairs:
        return 0.0

    dists = []
    for a, b in pairs:
        dist = editdistance.eval(a, b)
        norm = dist / max(len(a), len(b))
        dists.append(norm)

    return np.mean(dists)


In [335]:
codes = []
labels = []
for path in paths:
    wav_df = align_df[align_df["filename"] == path.stem]

    waveform, sr_loaded = torchaudio.load(str(path))
    if sr_loaded != 16000:
        waveform = torchaudio.functional.resample(waveform, sr_loaded, 16000)

    with torch.inference_mode():
        features, _ = model.extract_features(waveform, num_layers=layer)
        encoding = features[layer - 1].squeeze().cpu().numpy()

    for w in range(1, max(wav_df["word_id"]) + 1):
        word_df = wav_df[wav_df["word_id"] == w]

        text = str(word_df["text"].values[0])
        phones = tuple(str(word_df["phones"].values[0]).split(","))
        if text == "nan":
            continue

        start = float(word_df["word_start"].values[0])
        end = float(word_df["word_end"].values[0])
        word_encoding = cut_encoding(encoding, [start, end])
        if len(word_encoding) == 0:
            continue
        word_codes = kmeans.predict(word_encoding)
        labels.append(phones)

        codes.append(segment(word_encoding, kmeans.cluster_centers_, 1.0))

        if len(codes) >= 500:
            break

        if len(codes) % 100 == 0:
            print(f"{len(codes)}")

    if len(codes) >= 500:
        break


100
200
300
400


In [336]:
print(len(codes))

500


In [None]:
import numpy as np


def calc_edit_distance(a, b):
    dist = editdistance.eval(a, b)
    norm = dist / max(len(a), len(b))

    return norm

In [337]:
dists = []
rows = []
cols = []
for i in range(len(codes)):
    for j in range(i, len(codes)):
        dists.append(calc_edit_distance(codes[i], codes[j]))
        rows.append(i)
        cols.append(j)

In [338]:
print(np.mean(dists))

0.9538479055770988


In [339]:
rows = np.array(rows)
cols = np.array(cols)
dists = np.array(dists)

In [340]:
import igraph as ig

g = ig.Graph()
g.add_vertices(len(codes))
mask = dists < 0.4
edges = list(zip(rows[mask], cols[mask]))
weights = dists[mask].astype(float)
weights = np.where(weights > 0, weights, 1e-10).tolist()

if edges:
    g.add_edges(edges)
    g.es[-len(weights) :].set_attribute_values("weight", weights)

In [341]:
import leidenalg as la

partition = la.find_partition(
    g,
    la.CPMVertexPartition,
    resolution_parameter=0.01,
    weights="weight",
    seed=42,
)

In [None]:
dists = []
for row in partition:
    clust = []
    for el in row:
        phones = "-".join(labels[el])
        print(phones, end=" ")
        clust.append(labels[el])

    dist = normalized_edit_distance(clust)
    dists.append(dist)
    print()
    print()

print(f"{np.mean(dists) * 100:.3f}%")

('DH', 'AH1') ('DH', 'AH1') ('DH', 'AH0') ('DH', 'AH0') ('DH', 'AH1') ('DH', 'AH1') ('DH', 'AH0') ('DH', 'AH1') ('DH', 'AH0') ('DH', 'EH1', 'R') ('DH', 'AH0') ('DH', 'AH0') ('DH', 'AH0') ('DH', 'AH0') ('DH', 'AH0') ('DH', 'IY0') ('DH', 'AH0') ('DH', 'AH1') ('DH', 'EH1', 'R') ('DH', 'IY0') ('DH', 'AH0') ('DH', 'AH0', 'T') ('DH', 'AH0') ('DH', 'AH0') ('DH', 'AH0') 

('W', 'AH1', 'Z') ('W', 'AH1', 'Z') ('W', 'AH0', 'Z') ('W', 'AH1', 'Z') ('W', 'AH0', 'Z') ('W', 'AH0', 'Z') ('W', 'IH1', 'CH') ('W', 'AH0', 'Z') ('W', 'AH0', 'Z') ('W', 'AO1', 'Z') ('W', 'AH0', 'Z') ('W', 'AH0', 'Z') ('W', 'IH0', 'TH') ('W', 'AO1', 'Z') ('W', 'AH0', 'Z') ('W', 'AH0', 'Z') ('W', 'AH0', 'Z') ('W', 'AH0', 'Z') ('W', 'IH1', 'TH') ('W', 'AH1', 'Z') 

('AH0',) ('AH1', 'V') ('AH0', 'V') ('AH0', 'V') ('AH0', 'V') ('AH1', 'V') ('AH0', 'V') ('AH0', 'V') ('AH1', 'V') ('AH0', 'V') ('AH1', 'P') ('AH1', 'V') 

('AY1',) ('AY1',) ('AY1',) ('AY1',) ('AY1',) ('ER0',) ('AY1',) ('AY1',) ('AY1',) ('HH', 'AW1') ('AY1',) 

('AH0',)