In [None]:
from typing import List, Optional, Tuple, Literal
from pydantic import BaseModel, Field
import json
import re
import outlines
import json
import torch
from transformers import AutoTokenizer, AutoModelForCausalLM
from huggingface_hub import login as hf_login
import os
import warnings
from music21 import chord, pitch,harmony
from Notevent import Event, Instrument
from orchestrator import EnhancedMusicOrchestrator
from dynamics import MultiLLMDynamicsGenerator


In [None]:
def test_complete_llm_pipeline_updated(
    pipe,
    user_prompt: str = "sad piano song",
    phrases: int = 4,
    use_targeting: bool = True,
) -> Optional[dict]:
    """
    Run the complete end-to-end pipeline: text prompt -> audio files.

    The pipeline runs four stages, printing progress for each:
      1. ``EnhancedMusicOrchestrator.orchestrate`` builds the musical framework.
      2. ``orchestrator.melody_generator`` is called once per requested phrase.
      3. ``MultiLLMDynamicsGenerator.generate_dynamics`` adds dynamics.
      4. ``create_audio_files`` renders the events to WAV/MIDI.

    Args:
        pipe: LLM pipeline object handed to the orchestrator and the
            dynamics generator.
        user_prompt: Natural language music request.
        phrases: Number of melody phrases to generate.
        use_targeting: Whether to use chord targeting.

    Returns:
        A dict with the keys ``orchestration``, ``melody``, ``events``,
        ``audio_result``, ``success_rate``, ``total_notes`` and ``framework``
        on success, or ``None`` when dynamics or audio generation fails.
    """

    print("🎵 COMPLETE LLM PIPELINE TEST")
    print("=" * 50)
    print(f"User Request: '{user_prompt}'")
    print(f"Chord Targeting: {'Enabled' if use_targeting else 'Disabled'}")
    print()

    # Step 1: Generate Musical Framework
    print("Step 1: Generating musical framework...")
    orchestrator = EnhancedMusicOrchestrator(pipe)
    framework = orchestrator.orchestrate(user_prompt, num_phrases=phrases)
    orchestration_info = framework['orchestration']
    chord_progression = framework['chord_progression']

    print("✅ Framework generated:")
    print(f"   Style: {orchestration_info['emotion']} {orchestration_info['style']}")
    print(f"   BPM: {orchestration_info['bpm']}")
    print(f"   Instrument: {orchestration_info['instrument']}")
    print(f"   Description: {orchestration_info['melody_description']}")
    print(f"   Key: {framework['key']}")
    print(f"   Chord Progression ({len(chord_progression)} chords): {' → '.join(chord_progression)}")
    print()

    # Step 2: Generate Melody
    print("Step 2: Generating melody...")
    successful_phrases = 0

    for i in range(phrases):
        # Best-effort: one failed phrase must not abort the whole run, so the
        # error is reported and generation continues with the next phrase.
        try:
            phrase = orchestrator.melody_generator(use_targeting=use_targeting)

            if phrase and len(phrase.get('notes', [])) > 0:
                successful_phrases += 1
            else:
                print(f"  Phrase {i+1} generated no notes")

        except Exception as e:
            print(f" Phrase {i+1} failed: {e}")

    # Guard the ratio: phrases == 0 previously raised ZeroDivisionError here
    # and again when building the result dict.
    success_rate = successful_phrases / phrases if phrases > 0 else 0.0

    print(f"Generated {len(orchestrator.running_melody)} phrases:")
    print(f"   Success rate: {successful_phrases}/{phrases} ({success_rate*100:.1f}%)")

    total_notes = 0
    for i, phrase in enumerate(orchestrator.running_melody, 1):
        note_count = len(phrase['notes'])
        total_notes += note_count
        beats = sum(note['beats'] for note in phrase['notes'])
        print(f"   Phrase {i}: {note_count} notes, {beats}beats")

    print(f"   Total: {total_notes} notes")
    print()

    # Step 2.5: Optional chord targeting analysis
    if use_targeting and total_notes > 0:
        print("Step 2.5: Analyzing chord targeting...")
        orchestrator.analyze_melody_targeting()
        print()

    # Step 3: Generate Dynamics
    print("Step 3: Adding dynamics...")
    try:
        dynamics_gen = MultiLLMDynamicsGenerator(pipe)
        complete_events = dynamics_gen.generate_dynamics(
            running_melody=orchestrator.running_melody,
            orchestration=orchestrator.orchestration
        )

        print(f"Generated {len(complete_events)} complete events with dynamics")

        # Show sample events
        print("Sample dynamics:")
        for i, event in enumerate(complete_events[:5]):
            art = event.get('articulation') or 'normal'
            print(f"   {i+1}: {event['pitch']} {event['beats']}b vel:{event['velocity']} {art}")
        if len(complete_events) > 5:
            print(f"   ... and {len(complete_events) - 5} more events")
        print()

    except Exception as e:
        print(f" Dynamics generation failed: {e}")
        return None

    # Step 4: Generate Audio Files
    print("Step 4: Generating audio files...")
    try:
        # Create filename based on user prompt: keep only alphanumerics,
        # spaces, hyphens and underscores, then cap the length at 20 chars.
        safe_filename = "".join(c for c in user_prompt if c.isalnum() or c in (' ', '-', '_')).rstrip()
        safe_filename = safe_filename.replace(' ', '_')[:20]
        if not safe_filename:
            # A prompt with no usable characters would otherwise yield an
            # empty base name for the output files.
            safe_filename = "untitled"

        # Generate audio files
        # NOTE(review): create_audio_files is neither defined nor imported in
        # the visible part of this file; it must be in scope at call time
        # (otherwise the NameError is caught below and reported as a failure).
        audio_result = create_audio_files(
            events=complete_events,
            orchestration=orchestrator.orchestration,
            filename_base=safe_filename
        )

        if audio_result:
            print("✅ Audio generation complete!")
            print(f"   WAV: {audio_result['wav_file']}")
            print(f"   MIDI: {audio_result['midi_file']}")
            print(f"   Duration: {audio_result['duration']:.2f} seconds")
            print()

            return {
                'orchestration': orchestrator.orchestration,
                'melody': orchestrator.running_melody,
                'events': complete_events,
                'audio_result': audio_result,
                'success_rate': success_rate,
                'total_notes': total_notes,
                'framework': framework
            }
        else:
            print("Audio generation failed")
            return None

    except Exception as e:
        print(f"Audio generation failed: {e}")
        return None

# Quick test function for single generations
def quick_test_pipeline(pipe, prompt="generate a Bach-like melody"):
    """Run the full pipeline once with fixed settings and summarize the outcome.

    Calls ``test_complete_llm_pipeline_updated`` with 4 phrases and chord
    targeting enabled.

    Args:
        pipe: LLM pipeline forwarded to the full pipeline test.
        prompt: Natural language music request.

    Returns:
        The result dict of the full pipeline run, or ``None`` when that run
        returned a falsy result.
    """
    outcome = test_complete_llm_pipeline_updated(
        pipe=pipe, user_prompt=prompt, phrases=4, use_targeting=True
    )

    # Guard clause: report the failure and bail out early.
    if not outcome:
        print("Pipeline test failed")
        return None

    print(f" Generated {outcome['total_notes']} notes")
    print(f" MIDI file: {outcome['audio_result']['midi_file']}")
    return outcome


In [None]:

# Generate a corpus of Bach-style melodies and collect the MIDI files in one
# directory, one zero-padded file per successful pipeline run.
# NOTE(review): `pipe` is not defined in the visible part of this file; it is
# assumed to have been created in another cell before this one runs.
import shutil  # used for shutil.copy below; not imported at the top of the file

output_dir = "Musical_Analysis/Generated_corpus"
os.makedirs(output_dir, exist_ok=True)

for i in range(100):
    result = test_complete_llm_pipeline_updated(
        pipe,
        'compose a melody in the exact style of Johann Sebastian Bach',
        phrases=6,
        use_targeting=True
    )

    # The pipeline returns None on failure. Copying anyway would either raise
    # FileNotFoundError or silently duplicate the previous iteration's file.
    if not result:
        print(f"Skipping Bach melody {i+1}/100: pipeline failed")
        continue

    # Build zero-padded filename
    filename = f"compose_a_melody_in_{i:03d}.mid"
    output_path = os.path.join(output_dir, filename)

    # Save the generated MIDI. Use the path reported by the pipeline rather
    # than a hard-coded guess at its truncated filename.
    # NOTE(review): assumes audio_result['midi_file'] is a filesystem path to
    # the written MIDI file — confirm against create_audio_files.
    midi_source = result['audio_result']['midi_file']
    shutil.copy(midi_source, output_path)
    print(f"Generated Bach melody {i+1}/100 → {output_path}")
