In [7]:
os.chdir('/content/drive/MyDrive/tunesformer-main/')  # Cambia el directorio
print(os.getcwd())    # Muestra el directorio actual, debe salir /content

/content/drive/MyDrive/tunesformer-main


In [15]:
import os
import re
import json
import pandas as pd
from pathlib import Path
from typing import List, Dict, Tuple, Optional
from dataclasses import dataclass
import torch
from torch.utils.data import Dataset, DataLoader
from transformers import (
    AutoTokenizer,
    AutoModelForCausalLM,
    TrainingArguments,
    Trainer,
    DataCollatorForLanguageModeling
)
from string import Template
import random
from tqdm import tqdm

@dataclass
class ABCRecord:
    """Estructura para almacenar datos ABC procesados"""
    index: str
    title: str
    composer: str
    meter: str
    length: str
    key: str
    rhythm: str
    area: str
    source: str
    notes: str
    abc_content: str
    full_abc: str

class ABCProcessor:
    """Procesador para datos ABC notation"""

    def __init__(self):
        self.abc_pattern = re.compile(r'X:\s*(\d+)\s*\n((?:[^X]|X(?!\s*\d))*?)(?=X:\s*\d|\Z)', re.MULTILINE | re.DOTALL)
        self.field_patterns = {
            'T': re.compile(r'^T:\s*(.+)$', re.MULTILINE),
            'C': re.compile(r'^C:\s*(.+)$', re.MULTILINE),
            'M': re.compile(r'^M:\s*(.+)$', re.MULTILINE),
            'L': re.compile(r'^L:\s*(.+)$', re.MULTILINE),
            'K': re.compile(r'^K:\s*(.+)$', re.MULTILINE),
            'R': re.compile(r'^R:\s*(.+)$', re.MULTILINE),
            'A': re.compile(r'^A:\s*(.+)$', re.MULTILINE),
            'S': re.compile(r'^S:\s*(.+)$', re.MULTILINE),
            'N': re.compile(r'^N:\s*(.+)$', re.MULTILINE)
        }

    def parse_abc_file(self, file_path: str) -> List[ABCRecord]:
        """Parsea un archivo ABC y extrae todas las canciones"""
        with open(file_path, 'r', encoding='utf-8') as f:
            content = f.read()

        return self.parse_abc_content(content)

    def parse_abc_content(self, content: str) -> List[ABCRecord]:
        """Parsea contenido ABC y extrae todas las canciones"""
        records = []
        matches = self.abc_pattern.findall(content)

        for index, abc_body in matches:
            full_abc = f"X:{index}\n{abc_body}"
            record = self._extract_fields(index, abc_body, full_abc)
            if record:
                records.append(record)

        return records

    def _extract_fields(self, index: str, abc_body: str, full_abc: str) -> Optional[ABCRecord]:
        """Extrae campos individuales del contenido ABC"""
        try:
            # Extraer campos usando regex
            fields = {}
            for field, pattern in self.field_patterns.items():
                match = pattern.search(abc_body)
                fields[field] = match.group(1).strip() if match else ""

            # Extraer las notas musicales (después del campo K:)
            notes_match = re.search(r'K:\s*[^\n]*\n(.*)', abc_body, re.DOTALL)
            notes = notes_match.group(1).strip() if notes_match else ""

            # Limpiar notas de comentarios y líneas vacías
            notes_lines = []
            for line in notes.split('\n'):
                line = line.strip()
                if line and not line.startswith('%'):
                    notes_lines.append(line)
            notes = '\n'.join(notes_lines)

            return ABCRecord(
                index=index,
                title=fields.get('T', f"Untitled {index}"),
                composer=fields.get('C', "Unknown"),
                meter=fields.get('M', "4/4"),
                length=fields.get('L', "1/8"),
                key=fields.get('K', "C"),
                rhythm=fields.get('R', ""),
                area=fields.get('A', ""),
                source=fields.get('S', ""),
                notes=fields.get('N', ""),
                abc_content=notes,
                full_abc=full_abc
            )
        except Exception as e:
            print(f"Error procesando ABC {index}: {e}")
            return None

    def validate_abc(self, record: ABCRecord) -> bool:
        """Valida que el registro ABC tenga campos mínimos requeridos"""
        return bool(record.key and record.meter and record.abc_content)

    def process_directory(self, directory_path: str) -> List[ABCRecord]:
        """Procesa todos los archivos ABC en un directorio"""
        all_records = []
        abc_files = list(Path(directory_path).glob("*.abc"))

        for file_path in tqdm(abc_files, desc="Procesando archivos ABC"):
            try:
                records = self.parse_abc_file(str(file_path))
                valid_records = [r for r in records if self.validate_abc(r)]
                all_records.extend(valid_records)
                print(f"Procesado {file_path.name}: {len(valid_records)} registros válidos")
            except Exception as e:
                print(f"Error procesando {file_path}: {e}")

        return all_records

class MusicInstructionGenerator:
    """Genera instrucciones de entrenamiento para tareas musicales"""

    def __init__(self):
        self.prompt_template = Template("Human: ${inst} </s> Assistant: ")

        # Plantillas de instrucciones
        self.instruction_templates = {
            'generation': [
                "Compone una pieza musical en {key} con métrica {meter}",
                "Crea una melodía en estilo {rhythm} en la tonalidad de {key}",
                "Desarrolla una composición usando la métrica {meter}",
                "Genera una pieza musical inspirada en {area} usando {key}",
            ],
            'analysis': [
                "Analiza esta pieza musical y describe su estructura:",
                "Identifica los elementos musicales principales en:",
                "Describe las características de esta composición:",
                "Examina los aspectos técnicos de:",
            ],
            'harmonization': [
                "Proporciona una armonización para esta melodía:",
                "Añade acordes apropiados a:",
                "Crea progresiones armónicas para:",
                "Desarrolla acompañamiento para:",
            ],
            'variation': [
                "Crea variaciones de este tema musical:",
                "Desarrolla ornamentaciones para:",
                "Genera embellecimientos para:",
                "Produce variaciones melódicas de:",
            ]
        }

    def generate_training_samples(self, records: List[ABCRecord]) -> List[Dict]:
        """Genera muestras de entrenamiento a partir de registros ABC"""
        training_samples = []

        for record in tqdm(records, desc="Generando muestras de entrenamiento"):
            # Generar múltiples tipos de instrucciones por registro
            samples = []

            # 1. Generación musical condicionada
            samples.extend(self._create_generation_samples(record))

            # 2. Análisis musical
            samples.extend(self._create_analysis_samples(record))

            # 3. Tareas de armonización
            if len(record.abc_content.split('\n')) > 1:
                samples.extend(self._create_harmonization_samples(record))

            training_samples.extend(samples)

        return training_samples

    def _create_generation_samples(self, record: ABCRecord) -> List[Dict]:
        """Crea muestras de generación musical"""
        samples = []
        templates = self.instruction_templates['generation']

        for template in templates[:2]:  # Usar solo 2 templates para evitar redundancia
            instruction = template.format(
                key=record.key,
                meter=record.meter,
                rhythm=record.rhythm or "tradicional",
                area=record.area or "clásico"
            )

            sample = {
                'instruction': instruction,
                'response': record.full_abc,
                'type': 'generation'
            }
            samples.append(sample)

        return samples

    def _create_analysis_samples(self, record: ABCRecord) -> List[Dict]:
        """Crea muestras de análisis musical"""
        samples = []
        template = random.choice(self.instruction_templates['analysis'])

        # Crear descripción analítica
        analysis = self._generate_analysis_text(record)

        sample = {
            'instruction': f"{template}\n\n{record.full_abc}",
            'response': analysis,
            'type': 'analysis'
        }
        samples.append(sample)

        return samples

    def _create_harmonization_samples(self, record: ABCRecord) -> List[Dict]:
        """Crea muestras de armonización"""
        samples = []

        # Extraer solo la melodía principal (primera línea de notas)
        melody_lines = record.abc_content.split('\n')
        if len(melody_lines) > 0:
            melody = melody_lines[0]
            template = random.choice(self.instruction_templates['harmonization'])

            # Crear ABC solo con la melodía
            melody_abc = f"X:{record.index}\nT:{record.title}\nM:{record.meter}\nL:{record.length}\nK:{record.key}\n{melody}"

            sample = {
                'instruction': f"{template}\n\n{melody_abc}",
                'response': record.full_abc,
                'type': 'harmonization'
            }
            samples.append(sample)

        return samples

    def _generate_analysis_text(self, record: ABCRecord) -> str:
        """Genera texto de análisis musical"""
        analysis_parts = []

        # Información básica
        analysis_parts.append(f"Esta pieza está en la tonalidad de {record.key} con métrica {record.meter}.")

        if record.rhythm:
            analysis_parts.append(f"Es un {record.rhythm}.")

        if record.composer and record.composer != "Unknown":
            analysis_parts.append(f"Compuesta por {record.composer}.")

        # Análisis estructural básico
        lines = record.abc_content.split('\n')
        analysis_parts.append(f"La pieza consta de {len(lines)} líneas melódicas.")

        # Detectar repeticiones
        if ':|' in record.abc_content or '|:' in record.abc_content:
            analysis_parts.append("Contiene secciones con repeticiones indicadas por barras de repetición.")

        # Detectar ornamentaciones
        if any(char in record.abc_content for char in ['~', '^', '_', '=']):
            analysis_parts.append("Incluye ornamentaciones y alteraciones accidentales.")

        return " ".join(analysis_parts)

    def format_for_training(self, samples: List[Dict]) -> List[str]:
        """Formatea las muestras para entrenamiento"""
        formatted_samples = []

        for sample in samples:
            prompt = self.prompt_template.safe_substitute({"inst": sample['instruction']})
            full_text = prompt + sample['response']
            formatted_samples.append(full_text)

        return formatted_samples

class MusicDataset(Dataset):
    """Dataset personalizado para datos musicales"""

    def __init__(self, texts: List[str], tokenizer, max_length: int = 2048):
        self.texts = texts
        self.tokenizer = tokenizer
        self.max_length = max_length

    def __len__(self):
        return len(self.texts)

    def __getitem__(self, idx):
        text = self.texts[idx]

        # Tokenizar
        encoding = self.tokenizer(
            text,
            truncation=True,
            padding='max_length',
            max_length=self.max_length,
            return_tensors='pt'
        )

        return {
            'input_ids': encoding['input_ids'].flatten(),
            'attention_mask': encoding['attention_mask'].flatten(),
            'labels': encoding['input_ids'].flatten()
        }

class ChatMusicianTrainer:
    """Entrenador principal para ChatMusician"""

    def __init__(self, model_name: str = "m-a-p/ChatMusician"):
        self.model_name = model_name
        self.tokenizer = None
        self.model = None

    def setup_model(self, output_dir: str = "./chatmusician-finetuned"):
        """Configura el modelo y tokenizer"""
        print("Cargando modelo y tokenizer...")

        self.tokenizer = AutoTokenizer.from_pretrained(
            self.model_name,
            trust_remote_code=True,
            padding_side="right"
        )

        # Añadir token de padding si no existe
        if self.tokenizer.pad_token is None:
            self.tokenizer.pad_token = self.tokenizer.eos_token

        self.model = AutoModelForCausalLM.from_pretrained(
            self.model_name,
            torch_dtype=torch.float16,
            device_map="auto",
            trust_remote_code=True
        )

        # Habilitar gradient checkpointing para ahorrar memoria
        self.model.gradient_checkpointing_enable()

        print("Modelo cargado exitosamente")

    def prepare_training_data(self, abc_directory: str) -> Tuple[List[str], List[Dict]]:
        """Procesa datos ABC y genera muestras de entrenamiento"""
        print("Procesando datos ABC...")

        # Procesar archivos ABC
        processor = ABCProcessor()
        records = processor.process_directory(abc_directory)
        print(f"Se procesaron {len(records)} registros ABC válidos")

        # Generar instrucciones de entrenamiento
        instructor = MusicInstructionGenerator()
        training_samples = instructor.generate_training_samples(records)
        print(f"Se generaron {len(training_samples)} muestras de entrenamiento")

        # Formatear para entrenamiento
        formatted_texts = instructor.format_for_training(training_samples)

        return formatted_texts, training_samples

    def train(self,
              abc_directory: str,
              output_dir: str = "./chatmusician-finetuned",
              num_epochs: int = 3,
              batch_size: int = 4,
              learning_rate: float = 2e-5,
              max_length: int = 2048,
              save_steps: int = 500):
        """Entrena el modelo con datos ABC"""

        # Configurar modelo
        self.setup_model(output_dir)

        # Preparar datos
        formatted_texts, training_samples = self.prepare_training_data(abc_directory)

        # Dividir en train/validation
        split_idx = int(len(formatted_texts) * 0.9)
        train_texts = formatted_texts[:split_idx]
        val_texts = formatted_texts[split_idx:]

        # Crear datasets
        train_dataset = MusicDataset(train_texts, self.tokenizer, max_length)
        val_dataset = MusicDataset(val_texts, self.tokenizer, max_length)

        # Configurar argumentos de entrenamiento
        training_args = TrainingArguments(
            output_dir=output_dir,
            num_train_epochs=num_epochs,
            per_device_train_batch_size=batch_size,
            per_device_eval_batch_size=batch_size,
            gradient_accumulation_steps=4,
            warmup_steps=100,
            learning_rate=learning_rate,
            fp16=True,
            logging_steps=50,
            save_steps=save_steps,
            eval_steps=save_steps,
            #evaluation_strategy="steps",
            save_total_limit=3,
            load_best_model_at_end=True,
            metric_for_best_model="eval_loss",
            greater_is_better=False,
            dataloader_num_workers=4,
            remove_unused_columns=False,
        )

        # Data collator
        data_collator = DataCollatorForLanguageModeling(
            tokenizer=self.tokenizer,
            mlm=False,
        )

        # Crear trainer
        trainer = Trainer(
            model=self.model,
            args=training_args,
            train_dataset=train_dataset,
            eval_dataset=val_dataset,
            data_collator=data_collator,
        )

        # Entrenar
        print("Iniciando entrenamiento...")
        trainer.train()

        # Guardar modelo final
        trainer.save_model()
        self.tokenizer.save_pretrained(output_dir)

        # Guardar metadatos del entrenamiento
        with open(os.path.join(output_dir, "training_info.json"), 'w') as f:
            json.dump({
                'total_samples': len(formatted_texts),
                'train_samples': len(train_texts),
                'val_samples': len(val_texts),
                'epochs': num_epochs,
                'batch_size': batch_size,
                'learning_rate': learning_rate,
                'max_length': max_length
            }, f, indent=2)

        print(f"Entrenamiento completado. Modelo guardado en: {output_dir}")

def main():
    """Función principal de ejemplo"""
    # Configuración
    ABC_DIRECTORY = "data_curation/abcs"  # Directorio con archivos .abc
    OUTPUT_DIR = "./chatmusician-custom"

    # Verificar que existe el directorio
    if not os.path.exists(ABC_DIRECTORY):
        print(f"Error: No existe el directorio {ABC_DIRECTORY}")
        print("Crea el directorio y coloca tus archivos .abc allí")
        return

    # Crear entrenador
    trainer = ChatMusicianTrainer()

    # Entrenar modelo
    trainer.train(
        abc_directory=ABC_DIRECTORY,
        output_dir=OUTPUT_DIR,
        num_epochs=3,
        batch_size=2,  # Ajustar según tu GPU
        learning_rate=2e-5,
        max_length=1536  # Ajustar según el tamaño promedio de tus archivos ABC
    )

if __name__ == "__main__":
    main()

Cargando modelo y tokenizer...


Loading checkpoint shards:   0%|          | 0/2 [00:00<?, ?it/s]



Modelo cargado exitosamente
Procesando datos ABC...


Procesando archivos ABC: 0it [00:00, ?it/s]


Se procesaron 0 registros ABC válidos


Generando muestras de entrenamiento: 0it [00:00, ?it/s]

Se generaron 0 muestras de entrenamiento





ValueError: --load_best_model_at_end requires the save and eval strategy to match, but found
- Evaluation strategy: IntervalStrategy.NO
- Save strategy: SaveStrategy.STEPS

Prueba el modelo entrenado

In [None]:
import torch
import torchaudio
import re
from transformers import AutoTokenizer, AutoModelForCausalLM, GenerationConfig
from string import Template
import json
import os

class ChatMusicianTester:
    """Clase para probar el modelo ChatMusician entrenado"""

    def __init__(self, model_path: str):
        self.model_path = model_path
        self.tokenizer = None
        self.model = None
        self.prompt_template = Template("Human: ${inst} </s> Assistant: ")

        # Configuración de generación
        self.generation_config = GenerationConfig(
            temperature=0.2,
            top_k=40,
            top_p=0.9,
            do_sample=True,
            num_beams=1,
            repetition_penalty=1.1,
            min_new_tokens=10,
            max_new_tokens=1536,
            pad_token_id=None  # Se configurará después
        )

    def load_model(self):
        """Carga el modelo entrenado"""
        print(f"Cargando modelo desde: {self.model_path}")

        try:
            self.tokenizer = AutoTokenizer.from_pretrained(
                self.model_path,
                trust_remote_code=True
            )

            self.model = AutoModelForCausalLM.from_pretrained(
                self.model_path,
                torch_dtype=torch.float16,
                device_map="auto",
                trust_remote_code=True
            ).eval()

            # Configurar pad_token para generation_config
            self.generation_config.pad_token_id = self.tokenizer.eos_token_id

            print("Modelo cargado exitosamente")

        except Exception as e:
            print(f"Error cargando el modelo: {e}")
            raise

    def generate_music(self, instruction: str, save_audio: bool = True, output_file: str = None) -> str:
        """Genera música basada en una instrucción"""
        if not self.model or not self.tokenizer:
            raise ValueError("Modelo no cargado. Ejecuta load_model() primero.")

        # Formatear prompt
        prompt = self.prompt_template.safe_substitute({"inst": instruction})

        # Tokenizar
        inputs = self.tokenizer(prompt, return_tensors="pt", add_special_tokens=False)

        # Generar
        with torch.no_grad():
            response = self.model.generate(
                input_ids=inputs["input_ids"].to(self.model.device),
                attention_mask=inputs['attention_mask'].to(self.model.device),
                eos_token_id=self.tokenizer.eos_token_id,
                generation_config=self.generation_config,
            )

        # Decodificar respuesta
        generated_text = self.tokenizer.decode(
            response[0][inputs["input_ids"].shape[1]:],
            skip_special_tokens=True
        )

        print(f"\nInstrucción: {instruction}")
        print(f"Respuesta generada:\n{generated_text}")

        # Renderizar audio si es posible
        if save_audio:
            self._render_audio(generated_text, output_file or "generated_music.wav")

        return generated_text

    def _render_audio(self, response_text: str, output_file: str):
        """Renderiza ABC notation a audio usando symusic"""
        try:
            # Importar symusic
            from symusic import Score, Synthesizer, BuiltInSF3, dump_wav

            # Extraer notación ABC
            abc_pattern = r'(X:\d+\n(?:[^\n]*\n?)+)'
            abc_matches = re.findall(abc_pattern, response_text + '\n')

            if abc_matches:
                abc_notation = abc_matches[0]
                print(f"\nNotación ABC extraída:\n{abc_notation}")

                # Convertir a Score y renderizar
                try:
                    score = Score.from_abc(abc_notation)
                    synthesizer = Synthesizer()
                    audio = synthesizer.render(score, stereo=True)

                    # Guardar audio
                    torchaudio.save(output_file, torch.FloatTensor(audio), 44100)
                    print(f"Audio guardado en: {output_file}")

                except Exception as e:
                    print(f"Error renderizando audio: {e}")
                    print("Verifica que la notación ABC sea válida")
            else:
                print("No se encontró notación ABC válida en la respuesta")

        except ImportError:
            print("symusic no está instalado. Instálalo con: pip install symusic")
        except Exception as e:
            print(f"Error renderizando audio: {e}")

    def run_test_suite(self):
        """Ejecuta una suite de pruebas del modelo"""
        print("=== SUITE DE PRUEBAS CHATMUSICIAN ===\n")

        test_instructions = [
            # Generación básica
            "Compone una melodía sencilla en Do mayor con métrica 4/4",

            # Generación con estilo
            "Crea un jig irlandés en Re mayor",

            # Generación con progresión de acordes
            "Desarrolla una pieza musical usando la progresión de acordes: 'Dm', 'C', 'Dm', 'Dm', 'C', 'Dm', 'C', 'Dm'",

            # Generación influencia de compositor
            "Desarrolla una melodía influenciada por las composiciones de Bach",

            # Análisis musical
            """Analiza esta pieza musical y describe su estructura:
X:1
T:Ejemplo de Análisis
M:4/4
L:1/8
K:G
|: G2 AB c2 d2 | e2 dc B2 A2 | G2 AB c2 d2 | e2 d2 c4 :|
|: e2 fg a2 g2 | f2 ed c2 B2 | A2 Bc d2 c2 | B2 A2 G4 :|""",
        ]

        results = []

        for i, instruction in enumerate(test_instructions, 1):
            print(f"\n{'='*50}")
            print(f"PRUEBA {i}/{len(test_instructions)}")
            print(f"{'='*50}")

            try:
                result = self.generate_music(
                    instruction,
                    save_audio=True,
                    output_file=f"test_output_{i}.wav"
                )
                results.append({
                    'test_id': i,
                    'instruction': instruction,
                    'result': result,
                    'status': 'success'
                })

            except Exception as e:
                print(f"Error en prueba {i}: {e}")
                results.append({
                    'test_id': i,
                    'instruction': instruction,
                    'result': str(e),
                    'status': 'error'
                })

        # Guardar resultados
        with open('test_results.json', 'w', encoding='utf-8') as f:
            json.dump(results, f, indent=2, ensure_ascii=False)

        print(f"\n{'='*50}")
        print("RESUMEN DE PRUEBAS COMPLETADO")
        print(f"{'='*50}")

        successful_tests = sum(1 for r in results if r['status'] == 'success')
        print(f"Pruebas exitosas: {successful_tests}/{len(results)}")
        print("Resultados guardados en: test_results.json")

    def interactive_mode(self):
        """Modo interactivo para probar el modelo"""
        print("=== MODO INTERACTIVO CHATMUSICIAN ===")
        print("Escribe tus instrucciones musicales (escribe 'quit' para salir)")
        print("Ejemplos:")
        print("- Compone una balada en La menor")
        print("- Crea un vals en Mi bemol mayor")
        print("- Genera variaciones de este tema: [notación ABC]")
        print("\n" + "="*50 + "\n")

        counter = 1
        while True:
            try:
                instruction = input(f"\nInstrucción {counter}: ").strip()

                if instruction.lower() in ['quit', 'exit', 'salir']:
                    print("¡Hasta luego!")
                    break

                if not instruction:
                    continue

                print("\nGenerando...")
                result = self.generate_music(
                    instruction,
                    save_audio=True,
                    output_file=f"interactive_output_{counter}.wav"
                )

                counter += 1

            except KeyboardInterrupt:
                print("\n\n¡Hasta luego!")
                break
            except Exception as e:
                print(f"Error: {e}")

def main():
    """Función principal"""
    import argparse

    parser = argparse.ArgumentParser(description="Probar modelo ChatMusician entrenado")
    parser.add_argument("--model_path", required=True, help="Ruta al modelo entrenado")
    parser.add_argument("--mode", choices=['test', 'interactive', 'single'],
                      default='test', help="Modo de ejecución")
    parser.add_argument("--instruction", help="Instrucción única para modo 'single'")

    args = parser.parse_args()

    # Verificar que existe el modelo
    if not os.path.exists(args.model_path):
        print(f"Error: No existe el directorio del modelo: {args.model_path}")
        return

    # Crear tester
    tester = ChatMusicianTester(args.model_path)

    try:
        # Cargar modelo
        tester.load_model()

        # Ejecutar según el modo
        if args.mode == 'test':
            tester.run_test_suite()
        elif args.mode == 'interactive':
            tester.interactive_mode()
        elif args.mode == 'single':
            if not args.instruction:
                print("Error: Se requiere --instruction para el modo 'single'")
                return
            tester.generate_music(args.instruction)

    except Exception as e:
        print(f"Error ejecutando el tester: {e}")

if __name__ == "__main__":
    main()