# Dataset, preparazione

In [None]:
import librosa
import numpy as np
import soundfile as sf
from tqdm import tqdm
import os
import random

In [None]:
from google.colab import drive
import os
import sys

# Mount Google Drive at /content/drive; every dataset path in this notebook
# lives under /content/drive/MyDrive.
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
# Run this cell when mounting Google Drive fails.
import os
import shutil

# 1. Force-clean any stale mount state.
# The two `!` lines are IPython shell escapes: unmount (stderr is discarded),
# then delete the local mount-point directory.
print("üßπ Pulizia forzata di /content/drive...")
!umount /content/drive 2>/dev/null  # Prova a smontare (ignora errori se gi√† smontato)
!rm -rf /content/drive              # Cancella la cartella (√® solo un punto di mount locale)

# 2. Recreate a clean mount point.
# NOTE(review): no exist_ok here, so this raises if the directory is still there.
os.makedirs('/content/drive')

# 3. Mount again
print("üîÑ Tentativo di montaggio...")
from google.colab import drive
drive.mount('/content/drive')

üßπ Pulizia forzata di /content/drive...
üîÑ Tentativo di montaggio...
Mounted at /content/drive


In [None]:
# First version of the degradation routine: keeps the first 10 seconds of each
# track and degrades it at a random (or explicitly requested) quality level.
def create_degraded_pair(audio_path, output_dir, quality_level='random'):
    """Write a (clean, degraded) 16 kHz pair for the first 10 s of a track.

    Args:
        audio_path: Path of the source audio file.
        output_dir: Root output folder. ``clean/`` and ``degraded/`` are
            created inside it; both files keep the source's base name.
        quality_level: ``'lo'`` (amplitudes rounded to 1/255 steps, then an
            8 kHz resampling round trip), ``'mid'`` (1/4095 steps, 12 kHz
            round trip), ``'hi'`` (1/32767 steps plus Gaussian noise with
            sigma 0.0001) or ``'random'`` to pick one of the three. Any other
            value takes the ``'hi'`` path.

    Returns:
        The quality level that was applied (the drawn one for ``'random'``).
    """
    if quality_level == 'random':
        quality_level = random.choice(['lo', 'mid', 'hi'])

    y, sr = librosa.load(audio_path, sr=16000)
    y = y[:160000]  # keep at most the first 10 s @ 16 kHz (shorter clips are not padded)

    if quality_level == 'lo':
        y_deg = np.round(y * 255) / 255
        y_deg = librosa.resample(y_deg, orig_sr=16000, target_sr=8000)
        y_deg = librosa.resample(y_deg, orig_sr=8000, target_sr=16000)
    elif quality_level == 'mid':
        y_deg = np.round(y * 4095) / 4095
        y_deg = librosa.resample(y_deg, orig_sr=16000, target_sr=12000)
        y_deg = librosa.resample(y_deg, orig_sr=12000, target_sr=16000)
    else:
        y_deg = np.round(y * 32767) / 32767
        y_deg += np.random.normal(0, 0.0001, y_deg.shape)

    # For clips shorter than 10 s the down/up resampling round trip can return
    # a sample count that differs from the clean clip. Trim or zero-pad so the
    # pair stays sample-aligned.
    y_deg = y_deg[:len(y)]
    if len(y_deg) < len(y):
        y_deg = np.pad(y_deg, (0, len(y) - len(y_deg)), mode='constant')

    # Ensure output directories exist
    clean_dir = os.path.join(output_dir, 'clean')
    degraded_dir = os.path.join(output_dir, 'degraded')
    os.makedirs(clean_dir, exist_ok=True)
    os.makedirs(degraded_dir, exist_ok=True)

    file_name = os.path.basename(audio_path)
    sf.write(os.path.join(clean_dir, file_name), y, sr)
    sf.write(os.path.join(degraded_dir, file_name), y_deg, sr)

    return quality_level


In [None]:
# Degrade the phonk tracks (first-10-seconds version)

source_audio_dir = '/content/drive/MyDrive/audio-restoration/samples/data/raw/phonk_2025/'
audio_filenames = [f for f in os.listdir(source_audio_dir) if f.endswith('.wav')]
output_dir = '/content/drive/MyDrive/audio-restoration/samples/data/degraded_phonk'

# Process every .wav in the source folder and print the quality level applied
for audio_filename in tqdm(audio_filenames):
    full_audio_path = os.path.join(source_audio_dir, audio_filename)
    deg = create_degraded_pair(full_audio_path, output_dir, quality_level='random')
    print(f"‚úÖ {audio_filename} ‚Üí {deg.upper()}")

In [None]:
# Degrade the top 100 global tracks (first-10-seconds version)

source_audio_dir = '/content/drive/MyDrive/audio-restoration/samples/data/raw/top_100_global/'
audio_filenames = [f for f in os.listdir(source_audio_dir) if f.endswith('.wav')]
output_dir = '/content/drive/MyDrive/audio-restoration/samples/data/degraded_top_100_global'

# Process every .wav in the source folder and print the quality level applied
for audio_filename in tqdm(audio_filenames):
    full_audio_path = os.path.join(source_audio_dir, audio_filename)
    deg = create_degraded_pair(full_audio_path, output_dir, quality_level='random')
    print(f"‚úÖ {audio_filename} ‚Üí {deg.upper()}")

# Degradazione Audio con Random Crop all'interno della canzone (v1)

In [None]:
def create_degraded_pair_random_crop(audio_path, output_dir, quality_level='random', version='v1'):
    """Write a (clean, degraded) 16 kHz pair for a RANDOM 10 s crop of a track.

    Args:
        audio_path: Path of the source audio file.
        output_dir: Root output folder; ``clean/`` and ``degraded/`` are
            created inside it.
        quality_level: ``'lo'`` (1/255 amplitude steps + 8 kHz round trip),
            ``'mid'`` (1/4095 steps + 12 kHz round trip), ``'hi'`` (1/32767
            steps + Gaussian noise, sigma 0.0001) or ``'random'``. Any other
            value takes the ``'hi'`` path.
        version: Tag appended to the output name (``<name>_<version>.wav``).

    Returns:
        The quality level that was applied (the drawn one for ``'random'``).
    """
    if quality_level == 'random':
        quality_level = random.choice(['lo', 'mid', 'hi'])

    # Load audio
    y, sr = librosa.load(audio_path, sr=16000)

    # Cut to 10 s at a random position
    clip_length = 160000  # 10 s @ 16 kHz

    if len(y) > clip_length:
        # Pick a random start so the crop fits entirely inside the track
        start_idx = random.randint(0, len(y) - clip_length)
        y = y[start_idx:start_idx + clip_length]
    else:
        # Track is 10 s or shorter: zero-pad at the end up to 10 s
        y = np.pad(y, (0, clip_length - len(y)), mode='constant')

    # Degradation
    if quality_level == 'lo':
        y_deg = np.round(y * 255) / 255
        y_deg = librosa.resample(y_deg, orig_sr=16000, target_sr=8000)
        y_deg = librosa.resample(y_deg, orig_sr=8000, target_sr=16000)
    elif quality_level == 'mid':
        y_deg = np.round(y * 4095) / 4095
        y_deg = librosa.resample(y_deg, orig_sr=16000, target_sr=12000)
        y_deg = librosa.resample(y_deg, orig_sr=12000, target_sr=16000)
    else:  # hi
        y_deg = np.round(y * 32767) / 32767
        y_deg += np.random.normal(0, 0.0001, y_deg.shape)

    # Make sure the output directories exist
    clean_dir = os.path.join(output_dir, 'clean')
    degraded_dir = os.path.join(output_dir, 'degraded')
    os.makedirs(clean_dir, exist_ok=True)
    os.makedirs(degraded_dir, exist_ok=True)

    # Strip only a trailing ".wav" extension. str.replace would also remove
    # any ".wav" that happens to appear inside the title itself.
    file_name = os.path.basename(audio_path)
    base_name = file_name[:-4] if file_name.lower().endswith('.wav') else file_name

    # Save with the version tag in the file name
    out_name = f'{base_name}_{version}.wav'
    sf.write(os.path.join(clean_dir, out_name), y, sr)
    sf.write(os.path.join(degraded_dir, out_name), y_deg, sr)

    return quality_level


# ============================================
# PHONK DEGRADATION (v1)
# ============================================
# Writes one random-crop pair per phonk track, tagged "v1".

print("\n" + "="*60)
print("üéµ PHONK DEGRADATION (v1 - random crop)")
print("="*60 + "\n")

source_audio_dir = '/content/drive/MyDrive/audio-restoration/samples/data/raw/phonk_2025/'
audio_filenames = [f for f in os.listdir(source_audio_dir) if f.endswith('.wav')]
output_dir = '/content/drive/MyDrive/audio-restoration/samples/data/degraded_phonk'

print(f"Found {len(audio_filenames)} phonk files\n")

for audio_filename in tqdm(audio_filenames, desc="Processing Phonk"):
    full_audio_path = os.path.join(source_audio_dir, audio_filename)
    # The returned quality level is stored in `deg` but not used afterwards.
    deg = create_degraded_pair_random_crop(full_audio_path, output_dir, quality_level='random', version='v1')

print("‚úÖ Phonk done!\n")


# ============================================
# TOP 100 GLOBAL DEGRADATION (v1)
# ============================================
# Writes one random-crop pair per top-100 track, tagged "v1".

print("="*60)
print("üéµ TOP 100 GLOBAL DEGRADATION (v1 - random crop)")
print("="*60 + "\n")

source_audio_dir = '/content/drive/MyDrive/audio-restoration/samples/data/raw/top_100_global/'
audio_filenames = [f for f in os.listdir(source_audio_dir) if f.endswith('.wav')]
output_dir = '/content/drive/MyDrive/audio-restoration/samples/data/degraded_top_100_global'

print(f"Found {len(audio_filenames)} top 100 files\n")

for audio_filename in tqdm(audio_filenames, desc="Processing Top 100"):
    full_audio_path = os.path.join(source_audio_dir, audio_filename)
    # The returned quality level is stored in `deg` but not used afterwards.
    deg = create_degraded_pair_random_crop(full_audio_path, output_dir, quality_level='random', version='v1')

print("‚úÖ Top 100 done!\n")


# ============================================
# FINAL CHECK
# ============================================
# Counts, per dataset, the clean clips of the first-10-seconds version (v0)
# and of the random-crop version (v1).

print("="*60)
print("üìä VERIFICA")
print("="*60)

for folder_name, output_dir in [
    ("Phonk", '/content/drive/MyDrive/audio-restoration/samples/data/degraded_phonk'),
    ("Top 100", '/content/drive/MyDrive/audio-restoration/samples/data/degraded_top_100_global')
]:
    clean_dir = os.path.join(output_dir, 'clean')
    degraded_dir = os.path.join(output_dir, 'degraded')

    clean_files = [f for f in os.listdir(clean_dir) if f.endswith('.wav')]

    v0_count = 0
    v1_count = 0
    for f in clean_files:
        # Split "<title>_<tag>.wav" at the LAST underscore and inspect the tag.
        _, sep, tag = f[:-len('.wav')].rpartition('_')
        if sep and tag.startswith('v') and tag[1:].isdigit():
            # Exact match, so "_v10" etc. are not counted as v1.
            if tag == 'v1':
                v1_count += 1
        else:
            # create_degraded_pair() saves under the plain source name, so
            # a file without a "_v<N>" tag is a v0 clip. Searching for "_v0"
            # would always report 0.
            v0_count += 1

    print(f"\n{folder_name}:")
    print(f"  v0 (first 10s): {v0_count}")
    print(f"  v1 (random 10s): {v1_count}")
    print(f"  Total: {v0_count + v1_count}")

print("\n‚úÖ ALL DONE!")


# Degradazione Audio Generalizzata (Configurable Version)

In [None]:
VERSION = 'v1'  # <- CHANGE HERE: v1, v2, v3, v4, ...

def create_degraded_pair_random_crop(audio_path, output_dir, quality_level='random', version='v0'):
    """Write a (clean, degraded) 16 kHz pair for a RANDOM 10 s crop of a track.

    Args:
        audio_path: Path of the source audio file.
        output_dir: Root output folder; ``clean/`` and ``degraded/`` are
            created inside it.
        quality_level: ``'lo'`` (1/255 amplitude steps + 8 kHz round trip),
            ``'mid'`` (1/4095 steps + 12 kHz round trip), ``'hi'`` (1/32767
            steps + Gaussian noise, sigma 0.0001) or ``'random'``. Any other
            value takes the ``'hi'`` path.
        version: Tag appended to the output name (v0, v1, v2, v3, ...),
            giving ``<name>_<version>.wav``.

    Returns:
        The quality level that was applied (the drawn one for ``'random'``).
    """
    if quality_level == 'random':
        quality_level = random.choice(['lo', 'mid', 'hi'])

    # Load audio
    y, sr = librosa.load(audio_path, sr=16000)

    # Cut to 10 s at a random position
    clip_length = 160000  # 10 s @ 16 kHz

    if len(y) > clip_length:
        # Pick a random start so the crop fits entirely inside the track
        start_idx = random.randint(0, len(y) - clip_length)
        y = y[start_idx:start_idx + clip_length]
    else:
        # Track is 10 s or shorter: zero-pad at the end up to 10 s
        y = np.pad(y, (0, clip_length - len(y)), mode='constant')

    # Degradation
    if quality_level == 'lo':
        y_deg = np.round(y * 255) / 255
        y_deg = librosa.resample(y_deg, orig_sr=16000, target_sr=8000)
        y_deg = librosa.resample(y_deg, orig_sr=8000, target_sr=16000)
    elif quality_level == 'mid':
        y_deg = np.round(y * 4095) / 4095
        y_deg = librosa.resample(y_deg, orig_sr=16000, target_sr=12000)
        y_deg = librosa.resample(y_deg, orig_sr=12000, target_sr=16000)
    else:  # hi
        y_deg = np.round(y * 32767) / 32767
        y_deg += np.random.normal(0, 0.0001, y_deg.shape)

    clean_dir = os.path.join(output_dir, 'clean')
    degraded_dir = os.path.join(output_dir, 'degraded')
    os.makedirs(clean_dir, exist_ok=True)
    os.makedirs(degraded_dir, exist_ok=True)

    # Strip only a trailing ".wav" extension. str.replace would also remove
    # any ".wav" that happens to appear inside the title itself.
    file_name = os.path.basename(audio_path)
    base_name = file_name[:-4] if file_name.lower().endswith('.wav') else file_name

    # Save with the version tag in the file name
    out_name = f'{base_name}_{version}.wav'
    sf.write(os.path.join(clean_dir, out_name), y, sr)
    sf.write(os.path.join(degraded_dir, out_name), y_deg, sr)

    return quality_level


In [None]:
# ============================================
# PHONK DEGRADATION
# ============================================
# Writes one random-crop pair per phonk track, tagged with the global VERSION.

print("\n" + "="*60)
print(f"üéµ PHONK DEGRADATION ({VERSION} - random crop)")
print("="*60 + "\n")

source_audio_dir = '/content/drive/MyDrive/audio-restoration/samples/data/raw/phonk_2025/'
audio_filenames = [f for f in os.listdir(source_audio_dir) if f.endswith('.wav')]
output_dir = '/content/drive/MyDrive/audio-restoration/samples/data/degraded_phonk'

print(f"Found {len(audio_filenames)} phonk files\n")

for audio_filename in tqdm(audio_filenames, desc="Processing Phonk"):
    full_audio_path = os.path.join(source_audio_dir, audio_filename)
    # The returned quality level is stored in `deg` but not used afterwards.
    deg = create_degraded_pair_random_crop(full_audio_path, output_dir, quality_level='random', version=VERSION)

print(f"‚úÖ Phonk {VERSION} done!\n")

In [None]:
# ============================================
# TOP 100 GLOBAL DEGRADATION
# ============================================
# Writes one random-crop pair per top-100 track, tagged with the global VERSION.

print("="*60)
print(f"üéµ TOP 100 GLOBAL DEGRADATION ({VERSION} - random crop)")
print("="*60 + "\n")

source_audio_dir = '/content/drive/MyDrive/audio-restoration/samples/data/raw/top_100_global/'
audio_filenames = [f for f in os.listdir(source_audio_dir) if f.endswith('.wav')]
output_dir = '/content/drive/MyDrive/audio-restoration/samples/data/degraded_top_100_global'

print(f"Found {len(audio_filenames)} top 100 files\n")

for audio_filename in tqdm(audio_filenames, desc="Processing Top 100"):
    full_audio_path = os.path.join(source_audio_dir, audio_filename)
    # The returned quality level is stored in `deg` but not used afterwards.
    deg = create_degraded_pair_random_crop(full_audio_path, output_dir, quality_level='random', version=VERSION)

print(f"‚úÖ Top 100 {VERSION} done!\n")

In [None]:
# ============================================
# FINAL CHECK
# ============================================
# Counts the clean clips of each version tag, per dataset.

print("="*60)
print("üìä VERIFICA")
print("="*60)

for folder_name, output_dir in [
    ("Phonk", '/content/drive/MyDrive/audio-restoration/samples/data/degraded_phonk'),
    ("Top 100", '/content/drive/MyDrive/audio-restoration/samples/data/degraded_top_100_global')
]:
    clean_dir = os.path.join(output_dir, 'clean')
    degraded_dir = os.path.join(output_dir, 'degraded')

    clean_files = [f for f in os.listdir(clean_dir) if f.endswith('.wav')]

    # Count the clips of each version
    version_counts = {}
    for wav_name in clean_files:
        # Extract the tag after the LAST underscore (song_01_v1.wav -> v1).
        _, sep, tag = wav_name[:-len('.wav')].rpartition('_')
        if sep and tag.startswith('v') and tag[1:].isdigit():
            ver = tag
        else:
            # No "_v<N>" tag: this is a first-10-seconds clip, saved under the
            # plain source name. Count it as v0 so it is not dropped from the
            # totals, and so an underscore inside a title is not read as a
            # version.
            ver = 'v0'
        version_counts[ver] = version_counts.get(ver, 0) + 1

    print(f"\n{folder_name}:")
    # Numeric order, so v10 is listed after v2.
    for ver in sorted(version_counts, key=lambda tag: int(tag[1:])):
        print(f"  {ver}: {version_counts[ver]} files")

    total = sum(version_counts.values())
    print(f"  Total: {total} files")

print("\n‚úÖ ALL DONE!")
print(f"Generated version: {VERSION}")

# Simulazione compressione di MusicGen (Secondo Dataset)

In [None]:
# The idea here is to take some of the original song files we have
# and compress them the way MusicGen would during generation.

# --- MINIMAL INSTALLS ---
# Make sure we have the latest version of transformers
!pip install -q git+https://github.com/huggingface/transformers.git

  Installing build dependencies ... [?25l[?25hdone
  Getting requirements to build wheel ... [?25l[?25hdone
  Preparing metadata (pyproject.toml) ... [?25l[?25hdone
[2K   [90m‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ[0m [32m521.0/521.0 kB[0m [31m13.9 MB/s[0m eta [36m0:00:00[0m
[?25h  Building wheel for transformers (pyproject.toml) ... [?25l[?25hdone


In [None]:
import torch
from transformers import EncodecModel
from pathlib import Path
import numpy as np
import librosa
import soundfile as sf

# Author's rationale: EnCodec (24 kHz) is used as the audio codec behind MusicGen.
# MusicGen does not generate raw audio; it generates compressed tokens that EnCodec decodes.
# Running EnCodec directly is meant to simulate MusicGen's final generation stage.
# NOTE(review): confirm that the 24 kHz checkpoint is a close enough stand-in for the
# codec variant shipped with the MusicGen checkpoint you are targeting.
MODEL_ID = "facebook/encodec_24khz"
DEVICE = 'cuda' if torch.cuda.is_available() else 'cpu'

# Load the pretrained codec once and move it to the selected device.
print(f"‚è≥ Caricamento EnCodec ({MODEL_ID})...")
model = EncodecModel.from_pretrained(MODEL_ID).to(DEVICE)
print("‚úÖ Modello pronto.")

def degrade_musicgen_style(input_path, output_path, bandwidth=3.0):
    """Write an EnCodec encode/decode round trip of ``input_path`` to ``output_path``.

    The audio is loaded as 24 kHz mono, encoded with the module-level ``model``
    at the given ``bandwidth``, decoded again, peak-normalised and saved as a
    24 kHz file. The round trip is intended to reproduce neural-codec artifacts
    (the author lists metallic/robotic colouring, phase smearing and band
    limiting at low bitrates such as 3.0 kbps).

    Args:
        input_path: Source audio file (anything ``librosa.load`` can read).
        output_path: Destination file passed to ``soundfile.write``.
        bandwidth: Target bandwidth in kbps forwarded to ``model.encode``;
            lower values mean stronger compression.

    Note:
        The output is written at 24 kHz regardless of the input's native rate,
        and its peak is scaled to 1.0, so its level is not matched to the
        source file.
    """
    # 1. Load and pre-process: resample to 24 kHz mono.
    waveform_np, sr = librosa.load(str(input_path), sr=24000, mono=True)

    # 2. Build the input tensor with shape [batch=1, channels=1, time].
    waveform = torch.from_numpy(waveform_np).unsqueeze(0).unsqueeze(0).to(DEVICE)

    # 3. The "damage": compress and decompress (analysis -> synthesis).
    with torch.no_grad():
        # ENCODE to discrete codes; `bandwidth` controls how aggressive it is.
        encoder_outputs = model.encode(waveform, bandwidth=bandwidth)
        codes = encoder_outputs.audio_codes
        scales = encoder_outputs.audio_scales

        # DECODE: information discarded by quantisation is not recovered.
        decoder_outputs = model.decode(codes, scales)
        reconstructed_audio = decoder_outputs.audio_values[0]  # drop batch dim -> [1, time]

    # 4. Post-process and save.
    # reshape(-1) always yields a 1-D array, even for a single-sample signal
    # (squeeze() would return a 0-d array there).
    reconstructed_cpu = reconstructed_audio.cpu().reshape(-1).numpy()

    # The decoder may return more samples than were fed in. Cut back to the
    # input length so the degraded clip lasts as long as the audio it was
    # derived from.
    reconstructed_cpu = reconstructed_cpu[:len(waveform_np)]

    # Peak normalisation. Skipped for empty or all-zero audio to avoid a
    # max-of-empty error or a division by zero.
    if reconstructed_cpu.size:
        max_val = np.abs(reconstructed_cpu).max()
        if max_val > 0:
            reconstructed_cpu = reconstructed_cpu / max_val

    # Save at the rate the audio was loaded and processed at (24 kHz).
    sf.write(str(output_path), reconstructed_cpu, sr)

‚è≥ Caricamento EnCodec (facebook/encodec_24khz)...


The secret `HF_TOKEN` does not exist in your Colab secrets.
To authenticate with the Hugging Face Hub, create a token in your settings tab (https://huggingface.co/settings/tokens), set it as secret in your Google Colab and restart your session.
You will be able to reuse this secret in all of your notebooks.
Please note that authentication is recommended but still optional to access public models or datasets.


config.json:   0%|          | 0.00/809 [00:00<?, ?B/s]

model.safetensors:   0%|          | 0.00/93.1M [00:00<?, ?B/s]

Loading weights:   0%|          | 0/252 [00:00<?, ?it/s]

‚úÖ Modello pronto.


In [None]:
# Degrade the same files as before (same versions, same crops), but the way MusicGen would
import shutil

# --- CONFIGURATION ---
SOURCE_CLEAN_DIR = Path("/content/drive/MyDrive/audio-restoration/samples/data/degraded_top_100_global/clean")
BASE_OUTPUT_DIR = Path("/content/drive/MyDrive/audio-restoration/data/test_musicgen_style")

# Create the folder structure
NEW_CLEAN_DIR = BASE_OUTPUT_DIR / "clean"
NEW_DEGRADED_DIR = BASE_OUTPUT_DIR / "degraded"
NEW_CLEAN_DIR.mkdir(parents=True, exist_ok=True)
NEW_DEGRADED_DIR.mkdir(parents=True, exist_ok=True)

# Name of the file to test (a file name inside SOURCE_CLEAN_DIR); set before running
TARGET_FILENAME = ""

def prepare_single_test_file(filename):
    """Copy one clean clip and create its EnCodec-degraded counterpart.

    Reads ``SOURCE_CLEAN_DIR / filename``, copies it to ``NEW_CLEAN_DIR`` and
    writes the degraded version (3.0 kbps) to ``NEW_DEGRADED_DIR`` under the
    same name. Problems are reported with ``print``; nothing is raised for a
    missing file or a failed degradation.

    Args:
        filename: Name of a file inside ``SOURCE_CLEAN_DIR``.

    Returns:
        None.
    """
    source_file = SOURCE_CLEAN_DIR / filename

    # is_file() rather than exists(): an empty filename resolves to the source
    # directory itself, which exists but cannot be copied as a clip.
    if not filename or not source_file.is_file():
        print(f"‚ùå Errore: Il file '{filename}' non esiste in {SOURCE_CLEAN_DIR}")
        return

    print(f"üéµ Preparazione Test per: {filename}")

    # 1. Copy the clean clip
    target_clean = NEW_CLEAN_DIR / filename
    shutil.copy2(source_file, target_clean)
    print(f"   ‚úÖ Copiato Clean -> {target_clean}")

    # 2. Generate the degraded clip (MusicGen style)
    target_degraded = NEW_DEGRADED_DIR / filename
    try:
        print("   üîÑ Generazione Degrado EnCodec (3kbps)...")
        degrade_musicgen_style(source_file, target_degraded, bandwidth=3.0)
        print(f"   ‚úÖ Generato Degraded -> {target_degraded}")
    except Exception as e:
        # Deliberate best-effort: report the failure and keep the notebook running.
        print(f"   ‚ùå Errore generazione: {e}")

# Run
prepare_single_test_file(TARGET_FILENAME)


üéµ Preparazione Test per: -Prey, Scythermane, DJ FLORA, Nxxkz - Seu Amor Morto.wav
   ‚úÖ Copiato Clean -> /content/drive/MyDrive/audio-restoration/data/test_musicgen_style/clean/-Prey, Scythermane, DJ FLORA, Nxxkz - Seu Amor Morto.wav
   üîÑ Generazione Degrado EnCodec (3kbps)...
   ‚úÖ Generato Degraded -> /content/drive/MyDrive/audio-restoration/data/test_musicgen_style/degraded/-Prey, Scythermane, DJ FLORA, Nxxkz - Seu Amor Morto.wav


In [None]:
# Same thing, applied to a whole folder

import shutil
from tqdm import tqdm
from pathlib import Path

# --- CONFIGURATION ---
SOURCE_CLEAN_DIR = Path("/content/drive/MyDrive/audio-restoration/samples/data/degraded_phonk/clean")

BASE_OUTPUT_DIR = Path("/content/drive/MyDrive/audio-restoration/data/musicgen_finetuning/degraded_phonk")

# EnCodec configuration
BANDWIDTH = 3.0 # kbps

# --- FOLDER SETUP ---
NEW_CLEAN_DIR = BASE_OUTPUT_DIR / "clean"
NEW_DEGRADED_DIR = BASE_OUTPUT_DIR / "degraded"

NEW_CLEAN_DIR.mkdir(parents=True, exist_ok=True)
NEW_DEGRADED_DIR.mkdir(parents=True, exist_ok=True)

# All WAV files in the source folder
all_files = list(SOURCE_CLEAN_DIR.glob("*.wav"))

print(f"üöÄ Inizio generazione massiva Dataset MusicGen ({len(all_files)} file)...")
print(f"üìÇ Sorgente: {SOURCE_CLEAN_DIR}")
print(f"üìÇ Destinazione: {BASE_OUTPUT_DIR}")
print("-" * 50)

# --- GENERATION LOOP ---
success_count = 0
error_count = 0

for source_file in tqdm(all_files):
    filename = source_file.name

    # Target paths
    target_clean = NEW_CLEAN_DIR / filename
    target_degraded = NEW_DEGRADED_DIR / filename

    try:
        # 1. Copy the clean clip (skipped if it already exists)
        if not target_clean.exists():
            shutil.copy2(source_file, target_clean)

        # 2. Generate the degraded clip (skipped if it already exists, which
        #    makes the cell resumable after an interruption)
        if not target_degraded.exists():
            # Uses degrade_musicgen_style defined in an earlier cell
            degrade_musicgen_style(source_file, target_degraded, bandwidth=BANDWIDTH)

        # Counted as success whether the files were created now or already present
        success_count += 1

    except Exception as e:
        # Best-effort batch: report the failing file and continue with the next one
        print(f"\n‚ùå Errore su {filename}: {e}")
        error_count += 1

print("-" * 50)
print(f"‚úÖ Completato! Processati {success_count} file con successo.")
if error_count > 0:
    print(f"‚ö†Ô∏è {error_count} file hanno generato errori.")
print(f"I dati sono pronti in: {BASE_OUTPUT_DIR}")


üöÄ Inizio generazione massiva Dataset MusicGen (552 file)...
üìÇ Sorgente: /content/drive/MyDrive/audio-restoration/samples/data/degraded_phonk/clean
üìÇ Destinazione: /content/drive/MyDrive/audio-restoration/data/musicgen_finetuning/degraded_phonk
--------------------------------------------------


100%|‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà| 552/552 [01:33<00:00,  5.93it/s]

--------------------------------------------------
‚úÖ Completato! Processati 552 file con successo.
I dati sono pronti in: /content/drive/MyDrive/audio-restoration/data/musicgen_finetuning/degraded_phonk





# Splitting Training Dataset 80/20

In [None]:
# Grouped train/val split: songs are kept whole, so all versions of a song end up in train OR in val,
# never with some versions on one side and some on the other.
import os
import shutil
import random
from pathlib import Path
from tqdm import tqdm

# --- CONFIGURATION ---
# SOURCE paths (where the generated files are, not yet split)
PHONK_SRC = Path('/content/drive/MyDrive/audio-restoration/data/musicgen_finetuning/degraded_phonk') # these are the paths of the second dataset
TOP100_SRC = Path('/content/drive/MyDrive/audio-restoration/data/musicgen_finetuning/top_100_global')

# DESTINATION path (where the clean train/val folders are created)
DEST_DIR = Path('/content/drive/MyDrive/audio-restoration/data/musicgen_finetuning/splits_grouped')

# Parameters
VAL_SPLIT = 0.2     # fraction of songs that goes to validation
RANDOM_SEED = 42    # seed for the shuffles performed before splitting

def get_song_name(filename):
    """Return the song's base name: drop ``.wav`` and a trailing ``_v<digits>``.

    The version tag may have any number of digits, so ``_v10`` is removed as
    well as ``_v1``. A tail that merely starts with ``_v`` but is not followed
    only by digits is part of the title and is kept. This matters for the
    grouped split: every version of a song must map to the same key.

    Args:
        filename: File name such as ``"Artist - Title_v3.wav"``.

    Returns:
        The name without extension and without version tag.
    """
    name = filename[:-len('.wav')] if filename.endswith('.wav') else filename
    base, sep, tag = name.rpartition('_')
    if sep and tag.startswith('v') and tag[1:].isdigit():
        return base
    return name

def group_files_by_song(base_dir):
    """Map each song name to the clean file names that belong to it.

    Scans ``base_dir / 'clean'`` for ``*.wav`` files in sorted order and keys
    them by ``get_song_name``.

    Returns:
        dict such as ``{"Alex Warren - Eternity": [file1, file2, ...]}``.
    """
    grouped = {}
    for wav_path in sorted(list((base_dir / 'clean').glob('*.wav'))):
        grouped.setdefault(get_song_name(wav_path.name), []).append(wav_path.name)
    return grouped

print("üîç Analisi file sorgenti...")

# 1. Group the Top 100 files by song
top100_songs = group_files_by_song(TOP100_SRC)
top100_names = sorted(list(top100_songs.keys()))
print(f"Top 100: Trovate {len(top100_names)} canzoni uniche ({sum(len(v) for v in top100_songs.values())} files)")

# 2. Group the Phonk files by song
phonk_songs = group_files_by_song(PHONK_SRC)
phonk_names = sorted(list(phonk_songs.keys()))
print(f"Phonk: Trovate {len(phonk_names)} canzoni uniche ({sum(len(v) for v in phonk_songs.values())} files)")

# 3. 90/10 balancing (counted in SONGS, not files)
# Target: Phonk songs are 10% of the songs used, i.e. 9 Top 100 songs per Phonk song.
total_top100_needed = int(len(phonk_names) * 9)

if total_top100_needed > len(top100_names):
    # Not enough Top 100 songs to pair with every Phonk song
    selected_top100 = top100_names # keep all available Top 100 songs
    # Cut Phonk down to one ninth of the Top 100 count.
    # The slice keeps the FIRST names of the alphabetically sorted list.
    selected_phonk = phonk_names[:int(len(top100_names)/9)]
else:
    # Enough Top 100 songs: keep every Phonk song and the first 9x as many Top 100 songs
    selected_top100 = top100_names[:total_top100_needed]
    selected_phonk = phonk_names

print(f"\nSelezione finale: {len(selected_top100)} Top100 + {len(selected_phonk)} Phonk songs")

# 4. Train/val split (per song)
# Seed once, then shuffle both lists in place so the split is reproducible.
random.seed(RANDOM_SEED)
random.shuffle(selected_top100)
random.shuffle(selected_phonk)

def split_list(lst, split_ratio):
    """Cut ``lst`` in two: the first ``1 - split_ratio`` share, then the rest.

    The cut index is ``int(len(lst) * (1 - split_ratio))`` (truncated).
    """
    cut = int(len(lst) * (1 - split_ratio))
    head = lst[:cut]
    tail = lst[cut:]
    return head, tail

# Split each genre separately so both end up in train and in val
t100_train, t100_val = split_list(selected_top100, VAL_SPLIT)
phonk_train, phonk_val = split_list(selected_phonk, VAL_SPLIT)

# Combined song lists, used below only for the summary count
train_songs = t100_train + phonk_train
val_songs = t100_val + phonk_val

print(f"Split: {len(train_songs)} canzoni Train, {len(val_songs)} canzoni Val")

# 5. Physical copy of the files
def copy_songs(song_list, source_songs_dict, source_base_dir, dest_split):
    """Copy the clean and degraded files of the given songs into one split.

    Args:
        song_list: Song names to copy.
        source_songs_dict: Mapping song name -> list of file names.
        source_base_dir: Folder holding the source ``clean/`` and ``degraded/``.
        dest_split: Split folder name under ``DEST_DIR`` (e.g. ``'train'``).

    Returns:
        Number of (clean, degraded) file pairs copied.
    """
    out_clean = DEST_DIR / dest_split / 'clean'
    out_degraded = DEST_DIR / dest_split / 'degraded'
    for folder in (out_clean, out_degraded):
        os.makedirs(folder, exist_ok=True)

    copied = 0
    for title in song_list:
        for wav_name in source_songs_dict[title]:
            # Clean first, then degraded, under the same file name
            for sub, out_dir in (('clean', out_clean), ('degraded', out_degraded)):
                shutil.copy2(source_base_dir / sub / wav_name, out_dir / wav_name)
            copied += 1
    return copied

print("\nüöÄ Inizio copia file...")

# Copy Top 100 (each call returns the number of file pairs copied)
n_t100_train = copy_songs(t100_train, top100_songs, TOP100_SRC, 'train')
n_t100_val = copy_songs(t100_val, top100_songs, TOP100_SRC, 'val')

# Copy Phonk
n_ph_train = copy_songs(phonk_train, phonk_songs, PHONK_SRC, 'train')
n_ph_val = copy_songs(phonk_val, phonk_songs, PHONK_SRC, 'val')

print("\n‚úÖ DONE!")
print(f"Nuovo dataset creato in: {DEST_DIR}")
print(f"Train files: {n_t100_train + n_ph_train}")
print(f"Val files: {n_t100_val + n_ph_val}")


üîç Analisi file sorgenti...
Top 100: Trovate 98 canzoni uniche (588 files)
Phonk: Trovate 92 canzoni uniche (552 files)

Selezione finale: 98 Top100 + 10 Phonk songs
Split: 86 canzoni Train, 22 canzoni Val

üöÄ Inizio copia file...

‚úÖ DONE!
Nuovo dataset creato in: /content/drive/MyDrive/audio-restoration/data/musicgen_finetuning/splits_grouped
Train files: 516
Val files: 132


# Lista canzoni e Check presenza nel training/val

In [None]:
# Prints the full path of every directory named "splits_grouped"
!find /content/drive/MyDrive/audio-restoration/ -name "splits_grouped" -type d


/content/drive/MyDrive/audio-restoration/data/splits_grouped


In [None]:
#raggruppa canzoni e le varie versioni

import os
from collections import defaultdict
from pathlib import Path

def list_songs_inventory(target_folder):
    """Print every song found in ``target_folder`` with the versions it has.

    A file named ``<title>_v<digits>.wav`` counts as that version of
    ``<title>``; any other ``.wav`` is listed as ``"Intro (v0)"`` of its own
    name. Prints an error and returns if the folder does not exist.
    """
    folder = Path(target_folder)
    if not folder.exists():
        print(f"‚ùå Errore: La cartella {target_folder} non esiste.")
        return

    wav_names = sorted([entry.name for entry in folder.glob("*.wav")])

    print(f"üìÇ Analisi cartella: {folder}\n")

    def version_rank(tag):
        # v1, v2, v10 sort numerically; anything else (the intro) sorts first
        if tag.startswith('v') and tag[1:].isdigit():
            return int(tag[1:])
        return 0

    songs = {}
    for wav_name in wav_names:
        stem = wav_name.replace(".wav", "")
        # Split at the LAST underscore: "Seu Amor Morto_v5" -> ("Seu Amor Morto", "v5")
        title, sep, tag = stem.rpartition('_')
        is_versioned = bool(sep) and tag.startswith('v') and tag[1:].isdigit()
        if not is_versioned:
            # No version tag: the whole stem is the title
            title, tag = stem, "Intro (v0)"
        songs.setdefault(title, []).append(tag)

    # Sorted report
    print(f"üìö TROVATE {len(songs)} CANZONI UNICHE:\n" + "="*60)

    for title in sorted(songs):
        tags = sorted(songs[title], key=version_rank)
        print(f"üéµ {title}")
        print(f"   ‚Ü≥ {len(tags)} file: {', '.join(tags)}")
        print("-" * 30)

# Run: list the songs in the clean half of the training split
FOLDER_TO_SCAN = "/content/drive/MyDrive/audio-restoration/data/musicgen_finetuning/splits_grouped/train/clean"
list_songs_inventory(FOLDER_TO_SCAN)


üìÇ Analisi cartella: /content/drive/MyDrive/audio-restoration/data/musicgen_finetuning/splits_grouped/train/clean

üìö TROVATE 86 CANZONI UNICHE:
üéµ $zwecki - HYPERCHARGE
   ‚Ü≥ 6 file: Intro (v0), v1, v2, v3, v4, v5
------------------------------
üéµ $zwecki - INFINITY
   ‚Ü≥ 6 file: Intro (v0), v1, v2, v3, v4, v5
------------------------------
üéµ -Prey, MXRCURY, Scythermane, dearukia - STAY FOR THE NIGHT
   ‚Ü≥ 6 file: Intro (v0), v1, v2, v3, v4, v5
------------------------------
üéµ ATLXS, GXMZ - MONTAGEM VIDA
   ‚Ü≥ 6 file: Intro (v0), v1, v2, v3, v4, v5
------------------------------
üéµ Alex Warren - Eternity
   ‚Ü≥ 6 file: Intro (v0), v1, v2, v3, v4, v5
------------------------------
üéµ Alex Warren - Ordinary
   ‚Ü≥ 6 file: Intro (v0), v1, v2, v3, v4, v5
------------------------------
üéµ BLACKPINK - JUMP
   ‚Ü≥ 6 file: Intro (v0), v1, v2, v3, v4, v5
------------------------------
üéµ Bad Bunny - TitiÃÅ Me PreguntoÃÅ
   ‚Ü≥ 6 file: Intro (v0), v1, v2, v3, v4, v5
--

In [None]:
#Check presenza canzone nel dataset training/val
import os
from pathlib import Path

def check_song_location(search_query, base_data_dir="/content/drive/MyDrive/audio-restoration/data/splits_grouped"):
    """Report whether files matching ``search_query`` are in train and/or val.

    Every ``.wav`` below ``base_data_dir`` whose name contains ``search_query``
    (case-insensitive) is assigned to train or val according to the folders
    between ``base_data_dir`` and the file. The findings and a leakage verdict
    are printed.

    Args:
        search_query: Part of the song's file name.
        base_data_dir: Root of the split, expected to contain ``train`` and
            ``val`` folders.

    Returns:
        None.

    NOTE(review): the default points at ``data/splits_grouped``, while the
    split cell of this notebook writes to
    ``data/musicgen_finetuning/splits_grouped``. Pass ``base_data_dir``
    explicitly if that is the dataset you mean to check.
    """
    base_path = Path(base_data_dir)

    print(f"üïµÔ∏è  RICERCA: '{search_query}'\n" + "="*50)

    # Without this guard a wrong path yields an empty scan, which would be
    # reported as "absent from train and val".
    if not base_path.exists():
        print(f"‚ùå Errore: La cartella {base_data_dir} non esiste.")
        return

    needle = search_query.lower()
    found_in_train = []
    found_in_val = []

    # Recursive scan
    for file_path in base_path.rglob("*.wav"):
        if needle not in file_path.name.lower():
            continue

        # Only the folders BELOW base_path decide the split. An ancestor
        # directory that happens to be called "train" or "val" must not.
        split_dirs = file_path.relative_to(base_path).parts[:-1]

        if "train" in split_dirs:
            found_in_train.append(file_path.name)
        elif "val" in split_dirs:
            found_in_val.append(file_path.name)

    # REPORT
    if found_in_train:
        print(f"üèãÔ∏è  TROVATO IN TRAINING ({len(found_in_train)} file):")
        for f in sorted(found_in_train):
            print(f"   - {f}")
    else:
        print("‚úÖ  Assente dal Training.")

    print("-" * 20)

    if found_in_val:
        print(f"üß™  TROVATO IN VALIDATION ({len(found_in_val)} file):")
        for f in sorted(found_in_val):
            print(f"   - {f}")
    else:
        print("‚ùå  Assente dal Validation.")

    print("=" * 50)

    # SAFETY CHECK
    if found_in_train and found_in_val:
        print("\nüö® ALLARME DATA LEAKAGE! üö®")
        print("Attenzione: Versioni della stessa canzone sono presenti sia in Train che in Val.")
        print("Il test su questi file NON √® valido perch√© il modello li ha gi√† visti.")
    elif found_in_val and not found_in_train:
        print("\n‚úÖ OK PER IL TEST: La canzone √® solo nel Validation Set.")
    elif found_in_train and not found_in_val:
        print("\n‚ö†Ô∏è ATTENZIONE: La canzone √® nel Training Set. Usala solo per verificare l'overfitting, non per testare la generalizzazione.")

# Run
# Enter part of the song's name (the function's default base_data_dir is used)
CANZONE_DA_CERCARE = "THINGS YOU DO"
check_song_location(CANZONE_DA_CERCARE)


üïµÔ∏è  RICERCA: 'THINGS YOU DO'
‚úÖ  Assente dal Training.
--------------------
‚ùå  Assente dal Validation.


# Handling delle canzoni phonk nelle cartelle di training e validation

In [None]:
# --- RIMOZIONE CANZONI + LOG ---
import os
from pathlib import Path
from datetime import datetime

# CONFIGURATION
# Training split the songs are removed from.
SPLIT_DIR = Path(
    '/content/drive/MyDrive/audio-restoration/data/musicgen_finetuning/splits_grouped/train'
)
# The removal log is a sibling of the split folder.
LOG_FILE = SPLIT_DIR.with_name('dataset_changes_log_removing.txt')

# Songs to remove, identified by the leading part of their file name.
SONGS_TO_REMOVE = [
    "-Prey, MXRCURY, Scythermane, dearukia - STAY FOR THE NIGHT",
    "$zwecki - HYPERCHARGE",
    "$zwecki - INFINITY",
    "DJ ALEX - MONTAGEM NIVEL 1",
    "DJ KHRLP - Soma So",
]

def log_message(msg):
    """Print *msg* and append it, plus a newline, to the file at ``LOG_FILE``.

    The log is opened explicitly as UTF-8: the messages contain emoji and
    accented characters, and relying on the platform's default encoding can
    raise ``UnicodeEncodeError`` or produce an unreadable log on systems
    whose default is not UTF-8.

    Args:
        msg: Text to print and to append to the log file.
    """
    print(msg)
    with open(LOG_FILE, 'a', encoding='utf-8') as f:
        f.write(msg + '\n')

# Log header: a separator line, the operation title with the run time,
# the targeted split, and a closing separator.
timestamp = f"{datetime.now():%Y-%m-%d %H:%M:%S}"
log_message("\n" + "=" * 50)
log_message("üóëÔ∏è RIMIZIONE CANZONI - " + timestamp)
log_message("Target: " + str(SPLIT_DIR))
log_message("=" * 50)

# Song names are literal text, not patterns: escape glob metacharacters
# ('[', ']', '*', '?') so a title containing them cannot match (and delete)
# unrelated files or fail to match its own files.
# Imported under a private alias so no existing `glob` name is rebound.
from glob import escape as _glob_escape

deleted_count = 0
for song in SONGS_TO_REMOVE:
    log_message(f"\nCercando: {song}...")

    # Every .wav whose file name starts with the song name (base file and
    # its suffixed versions) is removed from both subfolders.
    song_pattern = f"{_glob_escape(song)}*.wav"
    matched_any = False

    for sub in ['clean', 'degraded']:
        folder = SPLIT_DIR / sub
        # Sorted so the log lists the files in a stable order.
        found_files = sorted(folder.glob(song_pattern))

        for f in found_files:
            matched_any = True
            try:
                os.remove(f)
            except OSError as e:
                # Best effort: report the failure and keep going.
                log_message(f"  ‚ö†Ô∏è Errore cancellazione {f.name}: {e}")
            else:
                log_message(f"  ‚ùå Cancellato: {sub}/{f.name}")
                deleted_count += 1

    if not matched_any:
        # Make a mistyped or already-removed song visible in the log.
        log_message("  Nessun file trovato.")

log_message(f"\n‚úÖ Operazione completata. Rimossi {deleted_count} file totali.")



üóëÔ∏è RIMIZIONE CANZONI - 2026-01-01 11:23:03
Target: /content/drive/MyDrive/audio-restoration/data/musicgen_finetuning/splits_grouped/train

Cercando: -Prey, MXRCURY, Scythermane, dearukia - STAY FOR THE NIGHT...
  ‚ùå Cancellato: clean/-Prey, MXRCURY, Scythermane, dearukia - STAY FOR THE NIGHT.wav
  ‚ùå Cancellato: clean/-Prey, MXRCURY, Scythermane, dearukia - STAY FOR THE NIGHT_v1.wav
  ‚ùå Cancellato: clean/-Prey, MXRCURY, Scythermane, dearukia - STAY FOR THE NIGHT_v2.wav
  ‚ùå Cancellato: clean/-Prey, MXRCURY, Scythermane, dearukia - STAY FOR THE NIGHT_v3.wav
  ‚ùå Cancellato: clean/-Prey, MXRCURY, Scythermane, dearukia - STAY FOR THE NIGHT_v4.wav
  ‚ùå Cancellato: clean/-Prey, MXRCURY, Scythermane, dearukia - STAY FOR THE NIGHT_v5.wav
  ‚ùå Cancellato: degraded/-Prey, MXRCURY, Scythermane, dearukia - STAY FOR THE NIGHT.wav
  ‚ùå Cancellato: degraded/-Prey, MXRCURY, Scythermane, dearukia - STAY FOR THE NIGHT_v1.wav
  ‚ùå Cancellato: degraded/-Prey, MXRCURY, Scythermane, dearuki

In [None]:
# --- AGGIUNTA CANZONI + LOG ---
import shutil
import random
import os
from pathlib import Path
from datetime import datetime

# CONFIGURATION
# Training split that receives the new songs.
DEST_DIR = Path(
    '/content/drive/MyDrive/audio-restoration/data/musicgen_finetuning/splits_grouped/train'
)
# Source pool of phonk songs, two levels above the split folder.
PHONK_SRC = DEST_DIR.parents[1] / 'degraded_phonk'
# The addition log is a sibling of the split folder.
LOG_FILE = DEST_DIR.with_name('dataset_changes_log_adding.txt')
# Minimum number of new candidate songs required before anything is added.
NUM_TO_ADD = 5

def log_message(msg):
    """Print *msg* and append it, plus a newline, to the file at ``LOG_FILE``.

    The log is opened explicitly as UTF-8: the messages contain emoji and
    accented characters, and relying on the platform's default encoding can
    raise ``UnicodeEncodeError`` or produce an unreadable log on systems
    whose default is not UTF-8.

    Args:
        msg: Text to print and to append to the log file.
    """
    print(msg)
    with open(LOG_FILE, 'a', encoding='utf-8') as f:
        f.write(msg + '\n')

# Log header: a separator line, the operation title with the run time,
# the destination split, and a closing separator.
timestamp = f"{datetime.now():%Y-%m-%d %H:%M:%S}"
log_message("\n" + "=" * 50)
log_message("‚ûï AGGIUNTA CANZONI - " + timestamp)
log_message("Destinazione: " + str(DEST_DIR))
log_message("=" * 50)

def _song_name(wav_path):
    """Return the song a .wav file belongs to.

    The song name is the file stem with a trailing ``_v<digits>`` version tag
    removed, so ``Song.wav``, ``Song_v1.wav`` and ``Song_v12.wav`` all map to
    ``Song``. A stem that merely ends in ``_v`` or ``_v<letters>`` is left
    untouched.

    Args:
        wav_path: ``Path`` of the audio file.

    Returns:
        The song name as a string.
    """
    stem = wav_path.stem
    base, marker, version = stem.rpartition('_v')
    # Only strip a real version tag: marker found, something before it,
    # and nothing but ASCII digits after it.
    if marker and base and version.isascii() and version.isdigit():
        return base
    return stem


# 1. Songs already present in the destination split
existing_files = list((DEST_DIR / 'clean').glob('*.wav'))
existing_songs = {_song_name(f) for f in existing_files}

# 2. Songs available in the source pool, with the file names of each one
all_phonk_files = list((PHONK_SRC / 'clean').glob('*.wav'))
song_file_map = {}

for f in all_phonk_files:
    song_file_map.setdefault(_song_name(f), []).append(f.name)

all_phonk_songs = set(song_file_map)

# 3. Identify candidates: songs in the source pool not yet in the destination
candidates = list(all_phonk_songs - existing_songs)
log_message(f"Candidate disponibili: {len(candidates)}")

if len(candidates) < NUM_TO_ADD:
    log_message("‚ö†Ô∏è Non ci sono abbastanza canzoni nuove da aggiungere!")
else:
    # Selection: manual list below. For a random pick use
    # random.sample(candidates, NUM_TO_ADD) instead.
    chosen_ones = [
        "Grioten, Sadfriendd, xlout - MOVE LIKE THAT!",
        "LXNGVX - TOMA DE NOVO",
        "MXZI, Rushex, MONTAGEM - MONTAGEM BATCHI",
    ]

    log_message(f"\nSelezionate per aggiunta: {chosen_ones}")

    copied_count = 0
    added_songs = 0
    for song in chosen_ones:
        # A hand-typed name may not exist in the source pool: report it and
        # continue instead of aborting the whole run with a KeyError.
        files_to_copy = song_file_map.get(song)
        if not files_to_copy:
            log_message(f"  ‚ö†Ô∏è Canzone non trovata in {PHONK_SRC / 'clean'}: {song}")
            continue

        copied_before = copied_count
        for fname in files_to_copy:
            clean_src = PHONK_SRC / 'clean' / fname
            degraded_src = PHONK_SRC / 'degraded' / fname

            # Check the pair first so a clean file is never copied without
            # its degraded counterpart.
            if not degraded_src.is_file():
                log_message(f"  ‚ö†Ô∏è Errore copia {fname}: file degraded mancante")
                continue

            try:
                shutil.copy2(clean_src, DEST_DIR / 'clean' / fname)
                shutil.copy2(degraded_src, DEST_DIR / 'degraded' / fname)
            except OSError as e:
                log_message(f"  ‚ö†Ô∏è Errore copia {fname}: {e}")
            else:
                copied_count += 1
                log_message(f"  -> Copiato: {fname}")

        # Count a song only if at least one of its file pairs was copied.
        if copied_count > copied_before:
            added_songs += 1

    log_message(f"\n‚úÖ Fatto! Aggiunti {copied_count} file ({added_songs} canzoni).")



‚ûï AGGIUNTA CANZONI - 2026-01-01 11:28:44
Destinazione: /content/drive/MyDrive/audio-restoration/data/musicgen_finetuning/splits_grouped/train
Candidate disponibili: 87

Selezionate per aggiunta: ['Grioten, Sadfriendd, xlout - MOVE LIKE THAT!', 'LXNGVX - TOMA DE NOVO', 'MXZI, Rushex, MONTAGEM - MONTAGEM BATCHI']
  -> Copiato: Grioten, Sadfriendd, xlout - MOVE LIKE THAT!.wav
  -> Copiato: Grioten, Sadfriendd, xlout - MOVE LIKE THAT!_v1.wav
  -> Copiato: Grioten, Sadfriendd, xlout - MOVE LIKE THAT!_v2.wav
  -> Copiato: Grioten, Sadfriendd, xlout - MOVE LIKE THAT!_v3.wav
  -> Copiato: Grioten, Sadfriendd, xlout - MOVE LIKE THAT!_v4.wav
  -> Copiato: Grioten, Sadfriendd, xlout - MOVE LIKE THAT!_v5.wav
  -> Copiato: LXNGVX - TOMA DE NOVO.wav
  -> Copiato: LXNGVX - TOMA DE NOVO_v1.wav
  -> Copiato: LXNGVX - TOMA DE NOVO_v2.wav
  -> Copiato: LXNGVX - TOMA DE NOVO_v3.wav
  -> Copiato: LXNGVX - TOMA DE NOVO_v4.wav
  -> Copiato: LXNGVX - TOMA DE NOVO_v5.wav
  -> Copiato: MXZI, Rushex, MONTAGEM