In [8]:
import librosa
import numpy as np
import pandas as pd
import scipy
from transformers import SpeechT5Processor, SpeechT5ForTextToSpeech, SpeechT5HifiGan
from transformers import AutoModelForSpeechSeq2Seq, AutoProcessor, pipeline

from datasets import load_dataset
import torch
from IPython.display import Audio
from jiwer import wer

from torch.nn import CosineSimilarity


In [2]:
def anonym(freq, samples, winLengthinms=20, shiftLengthinms=10, lp_order=20, mcadams=0.8):
    """ output_wav = anonym(freq, samples, winLengthinms=20, shiftLengthinms=10, lp_order=20, mcadams=0.8)

    Anonymization using McAdam coefficient.

    :input: freq, int, sampling rate in Hz, 16000 in this case
    :input: samples, np.array, (L, 1) where L is the length of the waveform
    :input: winLengthinms, int, analysis window length (ms), default 20 ms
    :input: shiftLengthinms, int, window shift (ms), default 10 ms
    :input: lp_order, int, order of LP analysis, default 20
    :input: mcadams, float, alpha coefficients, default 0.8

    :output: output_wav, np.array, same shape as samples
    """

    eps = np.finfo(np.float32).eps
    samples = samples + eps

    # window length and shift (in sampling points)
    winlen = np.floor(winLengthinms * 0.001 * freq).astype(int)
    shift = np.floor(shiftLengthinms * 0.001 * freq).astype(int)
    length_sig = len(samples)

    # fft processing parameters
    NFFT = 2 ** (np.ceil((np.log2(winlen)))).astype(int)
    # anaysis and synth window which satisfies the constraint
    wPR = np.hanning(winlen)
    K = np.sum(wPR) / shift
    win = np.sqrt(wPR / K)
    # number of of complete frames
    Nframes = 1 + np.floor((length_sig - winlen) / shift).astype(int)

    # Buffer for output signal
    # this is used for overlap - add FFT processing
    sig_rec = np.zeros([length_sig])

    # For each frame
    for m in np.arange(1, Nframes):

        # indices of the mth frame
        index = np.arange(m * shift, np.minimum(m * shift + winlen, length_sig))

        # windowed mth frame (other than rectangular window)
        frame = samples[index] * win

        # get lpc coefficients
        a_lpc = librosa.core.lpc(frame + eps, order=lp_order)

        # get poles
        poles = scipy.signal.tf2zpk(np.array([1]), a_lpc)[1]

        #index of imaginary poles
        ind_imag = np.where(np.isreal(poles) == False)[0]

        #index of first imaginary poles
        ind_imag_con = ind_imag[np.arange(0, np.size(ind_imag), 2)]

        # here we define the new angles of the poles, shifted accordingly to the mcadams coefficient
        # values >1 expand the spectrum, while values <1 constract it for angles>1
        # values >1 constract the spectrum, while values <1 expand it for angles<1
        # the choice of this value is strongly linked to the number of lpc coefficients
        # a bigger lpc coefficients number constraints the effect of the coefficient to very small variations
        # a smaller lpc coefficients number allows for a bigger flexibility
        new_angles = np.angle(poles[ind_imag_con]) ** mcadams
        #new_angles = np.angle(poles[ind_imag_con])**path[m]

        # make sure new angles stay between 0 and pi
        new_angles[np.where(new_angles >= np.pi)] = np.pi
        new_angles[np.where(new_angles <= 0)] = 0

        # copy of the original poles to be adjusted with the new angles
        new_poles = poles
        for k in np.arange(np.size(ind_imag_con)):
            # compute new poles with the same magnitued and new angles
            new_poles[ind_imag_con[k]] = np.abs(poles[ind_imag_con[k]]) * np.exp(1j * new_angles[k])
            # applied also to the conjugate pole
            new_poles[ind_imag_con[k] + 1] = np.abs(poles[ind_imag_con[k] + 1]) * np.exp(-1j * new_angles[k])

        # recover new, modified lpc coefficients
        a_lpc_new = np.real(np.poly(new_poles))

        # get residual excitation for reconstruction
        res = scipy.signal.lfilter(a_lpc,np.array(1),frame)

        # reconstruct frames with new lpc coefficient
        frame_rec = scipy.signal.lfilter(np.array([1]),a_lpc_new,res)
        frame_rec = frame_rec * win
        outindex = np.arange(m * shift, m * shift + len(frame_rec))

        # overlap add
        sig_rec[outindex] = sig_rec[outindex] + frame_rec

    # sig_rec = (sig_rec / np.max(np.abs(sig_rec)) * (np.iinfo(np.int16).max - 1)).astype(np.int16)
    return sig_rec

In [48]:
processor_1 = SpeechT5Processor.from_pretrained("microsoft/speecht5_tts")
model_1 = SpeechT5ForTextToSpeech.from_pretrained("microsoft/speecht5_tts")
embeddings_dataset = load_dataset("Matthijs/cmu-arctic-xvectors", split="validation")
speaker_embeddings = torch.tensor(embeddings_dataset[7306]["xvector"]).unsqueeze(0)
vocoder = SpeechT5HifiGan.from_pretrained("microsoft/speecht5_hifigan")

In [49]:
def text_to_speech(text):
  inputs = processor_1(text=text, return_tensors="pt")
  speech = model_1.generate_speech(inputs["input_ids"], speaker_embeddings, vocoder=vocoder)
  return speech


In [50]:
device = "cuda:0" if torch.cuda.is_available() else "cpu"
torch_dtype = torch.float16 if torch.cuda.is_available() else torch.float32

model_id = "openai/whisper-large-v3"

model = AutoModelForSpeechSeq2Seq.from_pretrained(
    model_id, torch_dtype=torch_dtype,  use_safetensors=True
)
model.to(device)

processor = AutoProcessor.from_pretrained(model_id)

pipe = pipeline(
    "automatic-speech-recognition",
    model=model,
    tokenizer=processor.tokenizer,
    feature_extractor=processor.feature_extractor,
    max_new_tokens=128,
    chunk_length_s=30,
    batch_size=16,
    return_timestamps=True,
    torch_dtype=torch_dtype,
    device=device,
)

Special tokens have been added in the vocabulary, make sure the associated word embeddings are fine-tuned or trained.


In [10]:
def speech_to_text(speech):
  result = pipe(speech)
  return result["text"]

In [None]:
# anonimus = anonym(sample_rate, amplitudes.numpy(), mcadams=0.8)
# Audio(anonimus, rate=16000)
# torch.save(anonimus,"/content/sample_data/after.pt")

In [None]:
# result = speech_to_text(anonimus)

In [43]:

reference = "Surrounding yourself with people who are on a higher level than you can instill the discipline required for you to change and become better."
print(reference)
speech = text_to_speech(reference)
anonimus = anonym( samples=speech.numpy(), mcadams=0.8, freq=16000)
hypothesis = speech_to_text(anonimus)
print(hypothesis)
error = wer(reference, hypothesis)
print(error)

Surrounding yourself with people who are on a higher level than you can instill the discipline required for you to change and become better.
 Surrounding yourself with people who are on a higher level, then you can instill the discipline required for you to change and become better.
0.08333333333333333


In [12]:
Audio(speech, rate=16000)

In [18]:
Audio(anonimus, rate=16000)

In [44]:
embendings = torch.hub.load('RF5/simple-speaker-embedding', 'convgru_embedder')
embendings.eval()

Downloading: "https://github.com/RF5/simple-speaker-embedding/zipball/master" to /root/.cache/torch/hub/master.zip
Downloading: "https://github.com/RF5/simple-speaker-embedding/releases/download/v1.0/convgru_ckpt_00700000_strip.pt" to /root/.cache/torch/hub/checkpoints/convgru_ckpt_00700000_strip.pt
100%|██████████| 121M/121M [00:00<00:00, 219MB/s]


ConvGRUEmbedder(
  (model): ConvRNNEmbedder(
    (conv_encoder): ConvEncoder(
      (conv_layers): ModuleList(
        (0): Sequential(
          (0): Conv1d(1, 512, kernel_size=(10,), stride=(5,), bias=False)
          (1): Dropout(p=0.0, inplace=False)
          (2): GroupNorm(512, 512, eps=1e-05, affine=True)
          (3): GELU(approximate='none')
        )
        (1-4): 4 x Sequential(
          (0): Conv1d(512, 512, kernel_size=(3,), stride=(2,), bias=False)
          (1): Dropout(p=0.0, inplace=False)
          (2): GELU(approximate='none')
        )
        (5-6): 2 x Sequential(
          (0): Conv1d(512, 512, kernel_size=(2,), stride=(2,), bias=False)
          (1): Dropout(p=0.0, inplace=False)
          (2): GELU(approximate='none')
        )
      )
    )
    (rnn): GRU(512, 768, num_layers=3, batch_first=True, dropout=0.3, bidirectional=True)
    (head): Linear(in_features=1536, out_features=256, bias=True)
  )
)

In [45]:
original = embendings(speech[None])
anon = embendings(torch.tensor(anonimus, dtype=torch.float)[None])

In [46]:
CosineSimilarity()(original, anon)

tensor([0.3310], device='cuda:0', grad_fn=<SumBackward1>)

In [47]:
text_1 = "Anonimization has the goal of manipulating speech signals in order to degrade the reliability of automatic approaches to speaker recognition, \
while preserving other aspects of speech, such \
as those relating to intelligibility and naturalness. \
It’s important to note that voice anonymization \
involves altering not only the speaker’s voice but \
also linguistic content, extralinguistic traits, and \
background sounds that might reveal the speaker’s \
identity."
speech_1 = text_to_speech(text_1)
original_1 = embendings(speech_1[None])
CosineSimilarity()(original, original_1)

tensor([0.8741], device='cuda:0', grad_fn=<SumBackward1>)

In [None]:
CosineSimilarity()(anon, original_1)

tensor([0.2093], device='cuda:0', grad_fn=<SumBackward1>)

In [62]:
texts= ["Building a wall on the U.S.-Mexico border will take literally years.",
         "Wisconsin is on pace to double the number of layoffs this year.",
         "With its easily transported, thick-skinned and sweet-tasting fruit, the Gros Michel banana plant dominated the plantations of Central America",
         "Growing practices in South East Asia haven’t helped matters.",
         "Sleeping only seven hours a night, Kilian Jornet seems almost superhuman.",
         "For more than two hundred years the pessimists have been winning the public debate.",
         "Says that Tennessee law requires that schools receive half of proceeds -- $31 million per year -- from a half-cent increase in the Shelby County sales tax.",
         "We know that more than half of Hillary Clintons meetings while she was secretary of state were given to major contributors to the Clinton Foundation.",
         "In an attempt to inject some positivity into their feedback, many managers rely on sandwiching negative feedback between two positive comments.",
         "But according to the authors, the world isn't as bad as we think.",
         "Says the unemployment rate for college graduates is 4.4 percent and over 10 percent for noncollege-educated.",
         "Each year, 18,000 people die in America because they don't have health care."]

In [63]:
avg_cosine_distance = 0

speech_emb = []
for text in texts:
    text_speech = text_to_speech(text)
    speech_emb.append(embendings(text_speech[None]))

for first_emb in speech_emb:
    for second_emb in speech_emb:
        avg_cosine_distance += CosineSimilarity()(first_emb, second_emb)

avg_cosine_distance /= (len(texts) ** 2)
print("Average cosine distance between all pairs:", avg_cosine_distance)

Average cosine distance between all pairs: tensor([0.9043], device='cuda:0', grad_fn=<DivBackward0>)


In [5]:
from datasets import load_from_disk

In [6]:
path_to_dataset = '../benchmark/commonvoice'

input_data = load_from_disk(path_to_dataset)

In [9]:
audio = next(iter(input_data))['audio']
anon = anonym(audio['sampling_rate'], np.array(audio['array']))

In [10]:
%timeit anonym(audio['sampling_rate'], np.array(audio['array']))

388 ms ± 6.78 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)


In [None]:
stats

{'cossim_mean': 0.7339200650390826,
 'cossim_std': 0.10892950603378224,
 'wer_mean': 0.19881861812554882,
 'wer_std': 0.28556099929587375}

In [20]:
min(audio['array'])

-0.40841078758239746

In [11]:
import scipy.io.wavfile as wavf

In [6]:
from transformers import AutoModelForSpeechSeq2Seq, AutoProcessor, pipeline

device = "cuda" if torch.cuda.is_available() else "cpu"
torch_dtype = torch.float16 if torch.cuda.is_available() else torch.float32

model_id = "openai/whisper-large-v3"

model = AutoModelForSpeechSeq2Seq.from_pretrained(
    model_id, torch_dtype=torch_dtype,  use_safetensors=True
)
model.to(device)

processor = AutoProcessor.from_pretrained(model_id)

pipe_stt = pipeline(
    "automatic-speech-recognition",
    model=model,
    tokenizer=processor.tokenizer,
    feature_extractor=processor.feature_extractor,
    max_new_tokens=128,
    chunk_length_s=30,
    batch_size=16,
    return_timestamps=True,
    torch_dtype=torch_dtype,
    device=device,
)

Special tokens have been added in the vocabulary, make sure the associated word embeddings are fine-tuned or trained.


In [7]:
from jiwer import wer

In [24]:
import librosa

embendings = torch.hub.load('RF5/simple-speaker-embedding', 'convgru_embedder')
embendings.eval();

def get_emb(path, sr):
    _, wav = wavf.read(path)
    wav = torch.from_numpy(wav).float()
    return embendings(wav[None])

Using cache found in /home/ansafronov/.cache/torch/hub/RF5_simple-speaker-embedding_master


In [9]:
from tqdm.notebook import tqdm

In [13]:
audio

NameError: name 'audio' is not defined

In [25]:
from torch.nn import CosineSimilarity
import pandas as pd


path_to_dataset = '../benchmark/commonvoice'

def get_stats_lpc(model):
    input_data = load_from_disk(path_to_dataset)

    output_data = []
    for idx, inp in enumerate(tqdm(input_data)):
        file_name = inp['path'].split('/')[-1]
        audio = inp['audio']
        anon = model(audio['sampling_rate'], np.array(audio['array']))
        anon = np.clip(anon, -1, 1)

        in_path = '../output/audio/source/' + file_name
        wavf.write(in_path, audio['sampling_rate'], np.array(audio['array']))
        out_path = '../output/audio/lpc/' + file_name
        wavf.write(out_path, audio['sampling_rate'], anon)

        emb_in = get_emb(in_path, sr=audio['sampling_rate'])
        emb_out = get_emb(out_path, sr=audio['sampling_rate'])

        text_reconstructed = pipe_stt(out_path)['text']
        text = inp['sentence']

        output_data.append({
            'cossim': CosineSimilarity()(emb_in, emb_out).cpu()[0].item(),
            'wer': wer(text, text_reconstructed)
        })
            
    data = pd.DataFrame(output_data)
    return {
        'cossim_mean': data['cossim'].mean(),
        'cossim_std': data['cossim'].std(ddof=1),
        'wer_mean': data['wer'].mean(),
        'wer_std': data['wer'].std(ddof=1)
    }

stats = get_stats_lpc(anonym)

  0%|          | 0/101 [00:00<?, ?it/s]

Whisper did not predict an ending timestamp, which can happen if audio is cut off in the middle of a word. Also make sure WhisperTimeStampLogitsProcessor was used during generation.
Whisper did not predict an ending timestamp, which can happen if audio is cut off in the middle of a word. Also make sure WhisperTimeStampLogitsProcessor was used during generation.
Whisper did not predict an ending timestamp, which can happen if audio is cut off in the middle of a word. Also make sure WhisperTimeStampLogitsProcessor was used during generation.
Whisper did not predict an ending timestamp, which can happen if audio is cut off in the middle of a word. Also make sure WhisperTimeStampLogitsProcessor was used during generation.
Whisper did not predict an ending timestamp, which can happen if audio is cut off in the middle of a word. Also make sure WhisperTimeStampLogitsProcessor was used during generation.
Whisper did not predict an ending timestamp, which can happen if audio is cut off in the m

In [26]:
stats

{'cossim_mean': 0.7339200650390826,
 'cossim_std': 0.10892950603378224,
 'wer_mean': 0.19881861812554882,
 'wer_std': 0.28556099929587375}

In [None]:
torch.cuda.empty_cache()

In [None]:
get_stats_lpc(anonym)