In [1]:
# Pretrained substitutes for the AudioLM components:
# EnCodec as a replacement for SoundStream, and MERT as a replacement for w2v-BERT.
# idea from https://github.com/zhvng/open-musiclm

In [2]:
!pip install transformers torch datasets

# download audiolm_pytorch manually so i can inject print statements
# !pip uninstall -y audiolm_pytorch
import urllib.request
import os
import zipfile
if not os.path.isfile("audiolm-pytorch.zip"):
  urllib.request.urlretrieve("https://github.com/lucidrains/audiolm-pytorch/archive/refs/heads/main.zip", "audiolm-pytorch.zip")
if not os.path.isdir("audiolm-pytorch"):
  with zipfile.ZipFile("audiolm-pytorch.zip", 'r') as zip_ref:
    zip_ref.extractall("audiolm-pytorch")
# !mv audiolm-pytorch/audiolm-pytorch-personal_hacks/audiolm_pytorch .
!rm -rf audiolm-pytorch # not the one with underscore which is the actual library

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/


In [3]:
# # semantic- MERT
# # https://huggingface.co/m-a-p/MERT-v0
# # MERT-v0 is a completely unsupervised model trained on 1000 hours of music audio.

# from transformers import Wav2Vec2Processor, HubertModel
# import torch
# from torch import nn
# from datasets import load_dataset

# # load demo audio and set processor
# dataset = load_dataset("hf-internal-testing/librispeech_asr_demo", "clean", split="validation")
# dataset = dataset.sort("id")
# sampling_rate = dataset.features["audio"].sampling_rate
# processor = Wav2Vec2Processor.from_pretrained("facebook/hubert-large-ls960-ft")

# # loading our model weights
# model = HubertModel.from_pretrained("m-a-p/MERT-v0")

# # audio file is decoded on the fly
# inputs = processor(dataset[0]["audio"]["array"], sampling_rate=sampling_rate, return_tensors="pt")
# with torch.no_grad():
#     outputs = model(**inputs, output_hidden_states=True)

# # take a look at the output shape, there are 13 layers of representation
# # each layer performs differently in different downstream tasks, you should choose empirically
# all_layer_hidden_states = torch.stack(outputs.hidden_states).squeeze()
# print(all_layer_hidden_states.shape) # [13 layer, 292 timestep, 768 feature_dim]

# # # for utterance level classification tasks, you can simply reduce the representation in time
# # time_reduced_hidden_states = all_layer_hidden_states.mean(-2)
# # print(time_reduced_hidden_states.shape) # [13, 768]

# # # you can even use a learnable weighted average representation
# # aggregator = nn.Conv1d(in_channels=13, out_channels=1, kernel_size=1)
# # weighted_avg_hidden_states = aggregator(time_reduced_hidden_states.unsqueeze(0)).squeeze()
# # print(weighted_avg_hidden_states.shape) # [768]


In [4]:
# original semantic transformer
import os
# Import the submodule explicitly: a bare `import urllib` does not guarantee that
# `urllib.request` is loaded, so this cell would otherwise depend on an earlier cell.
import urllib.request

import torch
from audiolm_pytorch import HubertWithKmeans, SemanticTransformer, SemanticTransformerTrainer

# hubert checkpoints can be downloaded at
# https://github.com/facebookresearch/fairseq/tree/main/examples/hubert

hubert_ckpt = 'hubert/hubert_base_ls960.pt'
hubert_quantizer = 'hubert/hubert_base_ls960_L9_km500.bin' # listed in row "HuBERT Base (~95M params)", column Quantizer

os.makedirs("hubert", exist_ok=True)
for relative_path in (hubert_ckpt, hubert_quantizer):
  if not os.path.isfile(relative_path):
    # Download to a temporary name and rename once complete, so an interrupted
    # download cannot leave a truncated file that passes the isfile() check next run.
    partial_path = f"./{relative_path}.part"
    urllib.request.urlretrieve(f"https://dl.fbaipublicfiles.com/{relative_path}", partial_path)
    os.replace(partial_path, f"./{relative_path}")

# Build the HuBERT + k-means quantizer from the files ensured above.
wav2vec = HubertWithKmeans(
    checkpoint_path = f'./{hubert_ckpt}',
    kmeans_path = f'./{hubert_quantizer}'
)

# The transformer's token vocabulary is sized from the quantizer's codebook.
semantic_transformer = SemanticTransformer(
    num_semantic_tokens = wav2vec.codebook_size,
    dim = 1024,
    depth = 6
).cuda()

https://scikit-learn.org/stable/modules/model_persistence.html#security-maintainability-limitations


In [5]:
# import wave
# import struct

# # dataset = load_dataset("hf-internal-testing/librispeech_asr_demo", "clean", split="validation")
# # dataset[0]["audio"]["array"]
# sampling_rate = dataset.features["audio"].sampling_rate

# def save_wav(file_name, audio, sample_rate=sampling_rate):
#   # Open up a wav file
#   wav_file=wave.open(file_name,"w")
#   # wav params
#   nchannels = 1
#   sampwidth = 2
#   # 44100 is the industry standard sample rate - CD quality.  If you need to
#   # save on file size you can adjust it downwards. The standard for low quality
#   # is 8000 or 8kHz.
#   nframes = len(audio)
#   comptype = "NONE"
#   compname = "not compressed"
#   wav_file.setparams((nchannels, sampwidth, sample_rate, nframes, comptype, compname))
#   # WAV files here are using short, 16 bit, signed integers for the
#   # sample size.  So we multiply the floating point data we have by 32767, the
#   # maximum value for a short integer.  NOTE: It is theoretically possible to
#   # use the floating point -1.0 to 1.0 data directly in a WAV file but not
#   # obvious how to do that using the wave module in python.
#   for sample in audio:
#     wav_file.writeframes(struct.pack('h', int( sample * 32767.0 )))
#   wav_file.close()
#   return
# save_wav("test.wav", dataset[1]["audio"]["array"])

In [6]:
from audiolm_pytorch import SemanticTransformerWrapper
import numpy as np

# in case not already loaded
from datasets import load_dataset
# load the demo audio used as the priming waveform
dataset = load_dataset("hf-internal-testing/librispeech_asr_demo", "clean", split="validation")

batch_size = 2
# per the original note, this clip is 77040 samples at a 16kHz sampling rate
prime_audio = dataset[1]["audio"]["array"]
# Repeat the same clip once per batch row. The batch dimension follows batch_size and
# the length follows the clip, so neither has to be kept in sync by hand.
samples = np.stack([prime_audio] * batch_size)
prime_wave = torch.tensor(samples).cuda()
# raise AssertionError(prime_wave.shape)
max_length = 2048
# Wrap the quantizer and the transformer for generation.
semantic = SemanticTransformerWrapper(
            wav2vec = wav2vec,
            transformer = semantic_transformer,
            audio_conditioner = None,
            unique_consecutive = True
        ).cuda()
semantic_tokens = semantic.generate(
            text_embeds = None, # no text, it's not musicLM
            batch_size = batch_size,
            prime_wave = prime_wave,
            max_length = max_length
        )
# semantic.device # should be cuda



embed.keys(): dict_keys(['x', 'padding_mask', 'features'])
embed['x'] shape: torch.Size([2, 240, 768]), embed['features'].shape: torch.Size([2, 240, 768])
wav_input shape: torch.Size([2, 77040]), embed shape: torch.Size([480, 768]), packed_shape: [torch.Size([2, 240])]
codebook_indices before unpacking: torch.Size([480])
codebook_indices after unpacking: torch.Size([2, 240])
ids.shape: torch.Size([2, 240]) and prime_wave True


generating semantic:  17%|█▋        | 324/1905 [00:10<00:52, 30.37it/s]


KeyboardInterrupt: ignored

In [None]:
# ?semantic.wav2vec
# torch.tensor(dataset[1]["audio"]["array"]).cuda().device
# semantic_tokens.shape
# semantic_tokens[:,-1] # unfortunately doesn't seem to be the EOS we're looking for
# semantic_tokens[:, 0]

In [None]:
# dataset[1]["audio"]["array"].shape
# # len(dataset[1]["audio"]["array"]) # 77040
# # dataset.features["audio"].sampling_rate # 16000
# # so 4.815 seconds of audio

In [None]:
# Scratch check: an uninitialised long tensor with batch_size rows and zero columns.
torch.empty(batch_size, 0, dtype=torch.long)