In [1]:
from glob import glob
import pandas as pd
import json
import os
import torch
import IPython.display as ipd

torch.set_grad_enabled(False)

from transformers import AutoTokenizer, AddedToken
from streaming import MDSWriter
from streaming.base.format.mds.encodings import Encoding, _encodings
from streaming import LocalDataset
import numpy as np
from tqdm import tqdm
from multiprocess import Pool
import itertools

def chunks(l, n):
    for i in range(0, len(l), n):
        yield (l[i: i + n], i // n)

def multiprocessing(strings, function, cores=6, returned=True):
    df_split = chunks(strings, len(strings) // cores)
    pool = Pool(cores)
    pooled = pool.map(function, df_split)
    pool.close()
    pool.join()

    if returned:
        return list(itertools.chain(*pooled))

class UInt32(Encoding):
    def encode(self, obj) -> bytes:
        return obj.tobytes()

    def decode(self, data: bytes):
        return np.frombuffer(data, np.uint32)

_encodings['uint32'] = UInt32

columns = {
    'input_ids': 'uint32',
    'position_ids': 'uint32',
    'attention_mask': 'uint32',
    'audio': 'str',
    'text': 'str'
}
hashes = 'sha1', 'xxh64'

def new_path(f):
    splitted = f.split('/')
    base_folder = splitted[0] + '_trim'
    splitted = '/'.join([base_folder] + splitted[1:])
    return splitted

def new_path_neucodec(f):
    splitted = f.split('/')
    folder = f.split('/')[0]
    folder = folder + '_neucodec'
    new_f = os.path.join(folder, '/'.join(splitted[1:]))
    new_f = new_f.replace('.mp3', '.json').replace('.wav', '.json')
    return new_f

  import pynvml  # type: ignore[import]


In [2]:
from datasets import load_dataset

ds = load_dataset("Scicom-intl/Malaysian-Emilia")

In [3]:
rows = ds['train'].to_list()

In [4]:
reject = load_dataset("Scicom-intl/Malaysian-Emilia", "audio_length_ratio_text")

In [5]:
reject_audio = set()

for i in tqdm(range(len(reject['train']))):
    if not reject['train'][i]['audio_length_ratio_text_accept']:
        reject_audio.add(reject['train'][i]['audio_filename'])

len(reject_audio)

100%|████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████| 1572014/1572014 [00:50<00:00, 30906.75it/s]


21291

In [6]:
tokenizer = AutoTokenizer.from_pretrained('Qwen/Qwen3-1.7B-Base')
extra = [AddedToken('<|speech_start|>')]
for i in range(65536):
    extra.append(AddedToken(f'<|s_{i}|>'))
tokenizer.add_tokens(extra)

65537

In [7]:
import gc

def collator(batch, batch_position_ids):
    input_ids = []
    position_ids = []
    masks = []
    for i in range(len(batch)):
        l = len(batch[i])
        input_ids.extend(batch[i])
        position_ids.extend(batch_position_ids[i])
        masks.append(l)
    
    return {
        'input_ids': np.array(input_ids).astype(np.uint32),
        'position_ids': np.array(position_ids).astype(np.uint32),
        'attention_mask': np.array(masks).astype(np.uint32),
        'audio': '',
        'text': '',
    }

def slice_and_balance(nested_list, size):
    first = []
    balance = []
    current_size = 0

    for sublist in nested_list:
        if current_size < size:
            remaining_space = size - current_size
            if len(sublist) <= remaining_space:
                first.append(sublist)
                current_size += len(sublist)
            else:
                first.append(sublist[:remaining_space])
                balance.append(sublist[remaining_space:])
                current_size = size
        else:
            balance.append(sublist)
    
    return first, balance

In [8]:
import time

sequence_length = 1024 * 10
def loop(files, block_size = sequence_length):
    rows, index = files
    out_root = f'gfs/01be5b33/malaysian-emilia/tokenized-{index}'
    os.system(f'rm -rf {out_root}')
    count = 0
    temp = []
    position_ids = []
    last_block, last_position_block = None, None
    with MDSWriter(out=out_root, columns=columns, compression=None, hashes=hashes) as out:
        for row in tqdm(rows):

            if 'malaysian-chinese' in row['reference_audio'].split('/')[0]:
                continue

            if row['reference_audio'] in reject_audio:
                continue

            if row['target_audio'] in reject_audio:
                continue

            try:
                with open(new_path_neucodec(new_path(row['reference_audio']))) as fopen:
                    left = json.load(fopen)
            except:
                continue
            
            try:
                with open(new_path_neucodec(new_path(row['target_audio']))) as fopen:
                    right = json.load(fopen)
            except:
                continue

            left_text = row['reference_text'].strip()
            right_text = row['target_text'].strip()

            if len(left_text.split()) > len(left):
                continue

            if len(right_text.split()) > len(right):
                continue
            
            left_token = ''.join([f'<|s_{t}|>' for t in left])
            right_token = ''.join([f'<|s_{t}|>' for t in right])
            
            left_prompt = f'<|im_start|>{left_text}<|speech_start|>{left_token}<|im_end|>'
            right_prompt = f'<|im_start|>{right_text}<|speech_start|>{right_token}<|im_end|>'

            prompt = left_prompt + right_prompt
            
            outputs = tokenizer(prompt, add_special_tokens = False)
            position = range(len(outputs['input_ids']))
            length = len(outputs['input_ids'])
            
            if count + length > block_size:
                o = collator(temp, position_ids)
                if o['input_ids'].shape[0] > 0:
                    out.write(o)
                temp = [outputs['input_ids']]
                position_ids = [position]
                count = length
                
            else:
                temp.append(outputs['input_ids'])
                position_ids.append(range(len(outputs['input_ids'])))
                count += len(outputs['input_ids'])
        
        if len(temp):
            o = collator(temp, position_ids)
            if o['input_ids'].shape[0] > 0:
                out.write(o)

In [9]:
multiprocessing(rows, loop, cores = 40, returned = False)

100%|████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████| 216615/216615 [00:00<00:00, 1915182.48it/s]
100%|████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████| 216615/216615 [00:00<00:00, 3512971.04it/s]
100%|████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████| 216615/216615 [00:00<00:00, 3196201.92it/s]
100%|████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████| 216615/216615 [00:00<00:00, 2947069.67it/s]
100%|███████████████████████

In [10]:
folders = sorted(glob('gfs/01be5b33/malaysian-emilia/tokenized-*'), key = lambda x: int(x.split('-')[-1]))

In [11]:
!rm -rf gfs/01be5b33/multipacking-malaysian-emilia

In [12]:
with MDSWriter(out='gfs/01be5b33/multipacking-malaysian-emilia', columns=columns, compression=None, hashes=hashes) as out:
    for f in folders:
        try:
            dataset = LocalDataset(local=f)
            for i in tqdm(range(len(dataset))):
                out.write(dataset[i])
        except Exception as e:
            print(e)
            pass

100%|█████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████| 24625/24625 [00:07<00:00, 3162.02it/s]
100%|█████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████| 24939/24939 [00:07<00:00, 3179.77it/s]
100%|█████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████| 24769/24769 [00:08<00:00, 3078.72it/s]
100%|█████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████| 24379/24379 [00:07<00:00, 3164.13it/s]
100%|███████████████████████

index -1 is out of bounds for axis 0 with size 0
index -1 is out of bounds for axis 0 with size 0
index -1 is out of bounds for axis 0 with size 0
index -1 is out of bounds for axis 0 with size 0
index -1 is out of bounds for axis 0 with size 0
index -1 is out of bounds for axis 0 with size 0
index -1 is out of bounds for axis 0 with size 0
index -1 is out of bounds for axis 0 with size 0
index -1 is out of bounds for axis 0 with size 0


100%|█████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████| 22849/22849 [00:07<00:00, 3127.63it/s]
100%|█████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████| 28684/28684 [00:08<00:00, 3189.83it/s]
100%|█████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████| 28358/28358 [00:08<00:00, 3155.06it/s]
100%|█████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████| 28868/28868 [00:08<00:00, 3215.06it/s]
100%|███████████████████████

In [13]:
dataset = LocalDataset('gfs/01be5b33/multipacking-malaysian-emilia')

In [14]:
(len(dataset) * 10240) / 1e9

7.94181632

In [17]:
len(dataset)

775568

In [19]:
# !hf upload Scicom-intl/Malaysian-Emilia-multipacking-10k gfs/01be5b33/multipacking-malaysian-emilia \
# --repo-type=dataset --private