# MOSS Tokenization in Colab

This notebook converts audio files (`mp3/wav/flac/...`) into discrete MOSS tokens for retrieval training.

Pipeline in this notebook:
1. Clone project repo
2. Install dependencies
3. Upload audio files (or use Google Drive path)
4. Run tokenization (`audio -> tokens`)
5. Validate output token files
6. Build train/val/test split files

## 0) Runtime

In Colab: `Runtime -> Change runtime type -> GPU` (optional but recommended).

In [None]:
import platform
print('Python:', platform.python_version())

try:
    import torch
    print('Torch:', torch.__version__)
    print('CUDA available:', torch.cuda.is_available())
    if torch.cuda.is_available():
        print('GPU:', torch.cuda.get_device_name(0))
except Exception as exc:
    print('Torch check failed:', exc)

In [None]:
# Clone your repo
!git clone https://github.com/epitaph76/CL_ml.git
%cd /content/CL_ml

In [None]:
# Install project dependencies
!pip -q install -r requirements.txt

## 1) Configure input/output folders

- Option A (simple): upload files to `/content/audio_input`.
- Option B: mount Google Drive and point `INPUT_ROOT` at a Drive folder.

In [None]:
from pathlib import Path

INPUT_ROOT = Path('/content/audio_input')
OUTPUT_ROOT = Path('/content/CL_ml/data/tokens')
SPLITS_ROOT = Path('/content/CL_ml/data/splits')

INPUT_ROOT.mkdir(parents=True, exist_ok=True)
OUTPUT_ROOT.mkdir(parents=True, exist_ok=True)
SPLITS_ROOT.mkdir(parents=True, exist_ok=True)

print('INPUT_ROOT =', INPUT_ROOT)
print('OUTPUT_ROOT =', OUTPUT_ROOT)
print('SPLITS_ROOT =', SPLITS_ROOT)

In [None]:
# Optional: upload local audio files directly to Colab
# Skip this cell if files are already in INPUT_ROOT or on Drive.
from google.colab import files
import shutil

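uploaded = files.upload()
# files.upload() writes into the current working directory, which is
# /content/CL_ml after the %cd above, so resolve the source from cwd.
for name in uploaded.keys():
    src = Path.cwd() / name
    dst = INPUT_ROOT / name
    if src.exists():
        shutil.move(str(src), str(dst))

print('Files in INPUT_ROOT:', len(list(INPUT_ROOT.glob('*'))))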

In [None]:
# Optional: mount Google Drive and use a Drive folder as input
# Example:
# from google.colab import drive
# drive.mount('/content/drive')
# INPUT_ROOT = Path('/content/drive/MyDrive/your_audio_folder')
# print('INPUT_ROOT switched to', INPUT_ROOT)
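
Before starting a long run, it can help to confirm that the input folder actually contains audio. The sketch below is not part of the repo: `AUDIO_EXTS`, `list_audio_files`, and `count_by_extension` are hypothetical names, the extension set is an assumption, and the recursive search may not match how the tokenizer itself walks the folder.

```python
from collections import Counter
from pathlib import Path

# Assumed extension set; adjust to whatever the tokenizer actually accepts.
AUDIO_EXTS = {'.mp3', '.wav', '.flac', '.ogg', '.m4a'}


def list_audio_files(root):
    """Return audio files under `root` (searched recursively), sorted by path."""
    root = Path(root)
    return sorted(
        p for p in root.rglob('*')
        if p.is_file() and p.suffix.lower() in AUDIO_EXTS
    )


def count_by_extension(paths):
    """Count files per lowercase extension."""
    return Counter(p.suffix.lower() for p in paths)
```

In this notebook, `list_audio_files(INPUT_ROOT)` gives the candidate files, and `count_by_extension(...)` on that list shows the format mix. An empty result usually means `INPUT_ROOT` points at the wrong folder.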

## 2) Run MOSS tokenization

In [None]:
# Full run
import shlex
import subprocess

cmd = [
    'python', '-m', 'src.tokenizer.moss_tokenize',
    '--input-root', str(INPUT_ROOT),
    '--output-root', str(OUTPUT_ROOT),
    '--device', 'auto',
]
print('Running:', ' '.join(shlex.quote(x) for x in cmd))
subprocess.run(cmd, check=True)


In [None]:
# Smoke run on a subset (uncomment and run if needed)
# import shlex
# import subprocess
# cmd = [
#     'python', '-m', 'src.tokenizer.moss_tokenize',
#     '--input-root', str(INPUT_ROOT),
#     '--output-root', str(OUTPUT_ROOT),
#     '--device', 'auto',
#     '--max-files', '10',
# ]
# print('Running:', ' '.join(shlex.quote(x) for x in cmd))
# subprocess.run(cmd, check=True)
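
After a run finishes, a quick coverage check shows which inputs did not produce a token file. This is a minimal sketch under a stated assumption: each track yields one output named `<stem>.pt` or `<stem>.npz`. The repo's actual naming scheme may differ, and `find_missing_outputs` is a hypothetical helper, not something the project provides.

```python
from pathlib import Path


def find_missing_outputs(input_root, output_root,
                         audio_exts=('.mp3', '.wav', '.flac'),
                         token_exts=('.pt', '.npz')):
    """Return input audio files that have no token file sharing their stem.

    Assumption: one token file per track, named after the audio file's stem.
    """
    input_root, output_root = Path(input_root), Path(output_root)
    # Stems of every token file already written.
    done = {
        p.stem for p in output_root.rglob('*')
        if p.is_file() and p.suffix.lower() in token_exts
    }
    # Audio inputs whose stem is not among the finished outputs.
    return sorted(
        p for p in input_root.rglob('*')
        if p.is_file()
        and p.suffix.lower() in audio_exts
        and p.stem not in done
    )
```

Calling `find_missing_outputs(INPUT_ROOT, OUTPUT_ROOT)` after the smoke run is expected to list most inputs, since only a subset was processed; after the full run the list should ideally be empty.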


## 3) Inspect token outputs

In [None]:
from pathlib import Path
import torch

token_files = sorted(Path(OUTPUT_ROOT).glob('*.pt')) + sorted(Path(OUTPUT_ROOT).glob('*.npz'))
print('Token files:', len(token_files))
for p in token_files[:5]:
    print('-', p.name)

if token_files and token_files[0].suffix == '.pt':
    sample = torch.load(token_files[0], map_location='cpu')
    print('sample track_id:', sample.get('track_id'))
    print('sample token_shape:', sample.get('token_shape'))
    print('tensor shape:', tuple(sample['tokens'].shape))

## 4) Build train/val/test splits

In [None]:
import shlex
import subprocess

cmd = [
    'python', '-m', 'src.dataset.build_splits',
    '--tokens-root', str(OUTPUT_ROOT),
    '--output-root', str(SPLITS_ROOT),
    '--val-ratio', '0.1',
    '--test-ratio', '0.1',
]
print('Running:', ' '.join(shlex.quote(x) for x in cmd))
subprocess.run(cmd, check=True)


In [None]:
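# Alternative to the subprocess call above: the same command as a shell line.
# IPython expands $OUTPUT_ROOT and $SPLITS_ROOT from the Python variables.
!python -m src.dataset.build_splits \
  --tokens-root "$OUTPUT_ROOT" \
  --output-root "$SPLITS_ROOT" \
  --val-ratio 0.1 \
  --test-ratio 0.1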

In [None]:
for name in ['train.txt', 'val.txt', 'test.txt', 'summary.json']:
    p = SPLITS_ROOT / name
    print('\n===', name, '===')
    if p.exists():
        print(p.read_text(encoding='utf-8')[:500])
    else:
        print('not found')
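
Beyond eyeballing the first 500 characters, it is worth checking that no track appears in more than one split. The sketch below assumes each split file lists one ID or path per line; that format is a guess about what `build_splits` writes, and `read_ids` and `check_splits` are hypothetical helpers.

```python
from itertools import combinations
from pathlib import Path


def read_ids(path):
    """Read non-empty, stripped lines from a split file."""
    text = Path(path).read_text(encoding='utf-8')
    return [line.strip() for line in text.splitlines() if line.strip()]


def check_splits(splits_root, names=('train.txt', 'val.txt', 'test.txt')):
    """Return (sizes, overlaps) for the split files found in `splits_root`.

    sizes maps file name to the number of unique entries; overlaps maps a
    pair of file names to the sorted entries they share (leaking pairs only).
    """
    splits_root = Path(splits_root)
    splits = {
        name: set(read_ids(splits_root / name))
        for name in names
        if (splits_root / name).exists()
    }
    sizes = {name: len(ids) for name, ids in splits.items()}
    overlaps = {
        (a, b): sorted(splits[a] & splits[b])
        for a, b in combinations(splits, 2)
        if splits[a] & splits[b]
    }
    return sizes, overlaps
```

With the variables from this notebook, `sizes, overlaps = check_splits(SPLITS_ROOT)` should give an empty `overlaps` dict; any entry there means the same track leaks across splits.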

## Next

After this notebook, move to model training: `TokenPairDataset -> embedder -> contrastive loss`.