# Meeting Workflow Test Notebook

This notebook tests the WhisperX-based meeting workflow functionality with the MP3 file in `data/test/`.

In [1]:
from dotenv import load_dotenv, find_dotenv
import os

# Load the .env file; override=True asks python-dotenv to replace variables
# that are already set in the process environment.
dotenv_path = find_dotenv()
load_dotenv(dotenv_path, override=True)

# Report the environment variables this notebook relies on.
print("Environment Variables:")

# Secrets: only reveal whether a value is present, never the value itself.
for secret_name in ("OPENAI_API_KEY", "HF_TOKEN"):
    presence = 'Set' if os.getenv(secret_name) else 'Not Set'
    print(f"  {secret_name}: {presence}")

# Plain options: show the effective value, falling back to the defaults below.
for option_name, fallback in (
    ("WHISPERX_DEVICE", "cpu"),
    ("WHISPERX_MODEL", "large-v2"),
    ("WHISPERX_LANGUAGE", "ko"),
):
    print(f"  {option_name}: {os.getenv(option_name, fallback)}")

Environment Variables:
  OPENAI_API_KEY: Set
  HF_TOKEN: Set
  WHISPERX_DEVICE: cpu
  WHISPERX_MODEL: large-v2
  WHISPERX_LANGUAGE: ko


In [2]:
import os
import subprocess

# Directory holding the FFmpeg 7 binaries (Homebrew layout; machine-specific).
FFMPEG_BIN_DIR = '/opt/homebrew/opt/ffmpeg@7/bin'


def prepend_to_path(directory):
    """Place ``directory`` at the front of PATH exactly once.

    The cell used to concatenate the directory unconditionally, so every
    re-run grew PATH by one more duplicate entry. Existing occurrences are
    dropped before the directory is put first, which makes re-running the
    cell a no-op.

    Args:
        directory: Filesystem directory to put first on PATH.
    """
    current = os.environ.get('PATH', '')
    others = [entry for entry in current.split(os.pathsep) if entry != directory] if current else []
    os.environ['PATH'] = os.pathsep.join([directory, *others])


prepend_to_path(FFMPEG_BIN_DIR)

# Confirm which ffmpeg executable the shell lookup now resolves to.
result = subprocess.run(['which', 'ffmpeg'], capture_output=True, text=True)
print(result.stdout)  # expected: /opt/homebrew/opt/ffmpeg@7/bin/ffmpeg
if result.returncode != 0:
    # `which` exits non-zero when nothing matches; say so instead of printing only a blank line.
    print(f"ffmpeg not found on PATH (looked in {FFMPEG_BIN_DIR} first)")

/opt/homebrew/opt/ffmpeg@7/bin/ffmpeg



In [3]:
# Import required modules
import sys
import time
import asyncio
from pathlib import Path

# Meeting workflow imports
from app.agents.workflows.meeting_workflow import (
    process_meeting, 
    process_meeting_stream,
    create_meeting_workflow
)
from app.agents.state import MeetingState
from app.agents.nodes.meeting import (
    transcribe_audio,
    merge_transcript,
    generate_minutes
)
from app.core.config import settings

print("All imports successful!")
print(f"Settings loaded: {type(settings)}")

  from .autonotebook import tqdm as notebook_tqdm
torchcodec is not installed correctly so built-in audio decoding will fail. Solutions are:
* use audio preloaded in-memory as a {'waveform': (channel, time) torch.Tensor, 'sample_rate': int} dictionary;
* fix torchcodec installation. Error message was:

Could not load libtorchcodec. Likely causes:
          1. FFmpeg is not properly installed in your environment. We support
             versions 4, 5, 6, 7, and 8, and we attempt to load libtorchcodec
             for each of those versions. Errors for versions not installed on
             your system are expected; only the error for your installed FFmpeg
             version is relevant. On Windows, ensure you've installed the
             "full-shared" version which ships DLLs.
          2. The PyTorch version (2.8.0) is not compatible with
             this version of TorchCodec. Refer to the version compatibility
             table:
             https://github.com/pytorch/torchcodec

All imports successful!
Settings loaded: <class 'app.core.config.Settings'>


## Test Setup

Check that the test MP3 file exists and prepare the test configuration.

In [4]:
# Location of the sample recording, relative to the notebook's working directory
test_audio_path = "../data/test/Ïú†ÌÄ¥Ï¶à.mp3"
absolute_test_path = Path(test_audio_path).resolve()

audio_present = absolute_test_path.exists()

print("Test File Information:")
print(f"  Path: {test_audio_path}")
print(f"  Exists: {audio_present}")

if not audio_present:
    print("  ‚ùå Test file not found!")
else:
    file_size = absolute_test_path.stat().st_size
    size_in_mb = file_size / (1024*1024)
    print(f"  File size: {file_size:,} bytes ({size_in_mb:.2f} MB)")
    print("  ‚úÖ Test file is ready")

# Shared inputs for the workflow tests further down
test_config = dict(
    audio_file_path=str(absolute_test_path),
    user_id="test_user",
    session_id="test_session_001",
)

print("\nTest Configuration:")
for setting_name, setting_value in test_config.items():
    print(f"  {setting_name}: {setting_value}")

Test File Information:
  Path: ../data/test/Ïú†ÌÄ¥Ï¶à.mp3
  Exists: True
  File size: 2,001,188 bytes (1.91 MB)
  ‚úÖ Test file is ready

Test Configuration:
  audio_file_path: /Users/kimjunghyeon/Desktop/workspace/ai-agent/data/test/Ïú†ÌÄ¥Ï¶à.mp3
  user_id: test_user
  session_id: test_session_001


## Test Case 1: Individual Node Testing

Test each node function individually to isolate any issues

In [5]:
async def test_transcribe_node(max_segments: int = 5):
    """Run the ``transcribe_audio`` node on the test file and print a summary.

    Args:
        max_segments: Upper bound on how many transcript segments are echoed
            under "First few segments". Previously every segment was printed
            despite the heading. The returned transcript is never truncated.

    Returns:
        The dict returned by ``transcribe_audio``, or ``None`` when the test
        file is missing or the node raises.
    """
    print("=== Testing Transcribe Audio Node ===")

    if not absolute_test_path.exists():
        print("‚ùå Test file not found, skipping transcribe test")
        return None

    # Mock state: only the inputs are filled in, the node outputs start empty.
    state = {
        "audio_file_path": str(absolute_test_path),
        "session_id": "test_session",
        "user_id": "test_user",
        "transcript": [],
        "merged_transcript": "",
        "minutes": ""
    }

    print(f"Input state keys: {list(state.keys())}")
    print(f"Audio file: {state['audio_file_path']}")

    try:
        start_time = time.time()
        print("Starting transcription...")

        result = await transcribe_audio(state)

        processing_time = time.time() - start_time

        print(f"‚úÖ Transcription completed in {processing_time:.2f}s")
        print(f"Result keys: {list(result.keys())}")

        transcript = result.get("transcript", [])
        print(f"Transcript segments: {len(transcript)}")

        if transcript:
            print("\nFirst few segments:")
            # Only preview the head of the transcript; a negative limit shows nothing.
            preview = transcript[:max(max_segments, 0)]
            for i, segment in enumerate(preview):
                start = segment.get("start", 0)
                end = segment.get("end", 0)
                text = segment.get("text", "")
                speaker = segment.get("speaker", "")
                print(f"  {i+1}. [{start:.1f}s - {end:.1f}s] {speaker}: {text[:100]}{'...' if len(text) > 100 else ''}")

            # Speakers are collected from the whole transcript, not just the preview.
            speakers = set(seg.get("speaker", "") for seg in transcript)
            print(f"\nUnique speakers detected: {sorted(speakers)}")
        else:
            print("‚ö†Ô∏è No transcript segments returned")

        return result

    except Exception as e:
        # Best-effort test harness: report the failure and let the notebook continue.
        print(f"‚ùå Transcription failed: {str(e)}")
        print(f"Error type: {type(e).__name__}")
        return None

# Run transcribe test
transcribe_result = await test_transcribe_node()

=== Testing Transcribe Audio Node ===
Input state keys: ['audio_file_path', 'session_id', 'user_id', 'transcript', 'merged_transcript', 'minutes']
Audio file: /Users/kimjunghyeon/Desktop/workspace/ai-agent/data/test/Ïú†ÌÄ¥Ï¶à.mp3
Starting transcription...
2026-02-20 18:59:07 - whisperx.asr - INFO - No language specified, language will be detected for each audio file (increases inference time)
2026-02-20 18:59:07 - whisperx.vads.pyannote - INFO - Performing voice activity detection using Pyannote...


Lightning automatically upgraded your loaded checkpoint from v1.5.4 to v2.6.1. To apply the upgrade to your files permanently, run `python -m lightning.pytorch.utilities.upgrade_checkpoint ../.venv/lib/python3.13/site-packages/whisperx/assets/pytorch_model.bin`


2026-02-20 18:59:20 - whisperx.asr - INFO - Detected language: ko (1.00) in first 30s of audio
2026-02-20 19:00:49 - whisperx.diarize - INFO - Loading diarization model: pyannote/speaker-diarization-community-1


  std = sequences.std(dim=-1, correction=1)


‚úÖ Transcription completed in 227.86s
Result keys: ['transcript']
Transcript segments: 33

First few segments:
  1. [0.0s - 6.2s] SPEAKER_00: ÏïÑÎãà Í∑∏ Ï¢Ä Î≠ê Ïò§ÏÖ®ÏúºÎãàÍπå ÏñòÍ∏∞ Ïïà Ïó¨Ï≠§Î≥º ÏàòÍ∞Ä ÏóÜÎäîÎç∞ Ïò¨Ìï¥ Ï¢Ä Ïú†ÌÇ§Ï¶àÎäî Ï¢Ä Ïñ¥Îñ® Í≤É Í∞ôÏäµÎãàÍπå?
  2. [6.3s - 7.9s] SPEAKER_01: Ïù¥Í≤å Ï¢Ä ÎäêÎÇå ÏûêÏ≤¥Î°ú.
  3. [8.0s - 12.9s] SPEAKER_00: Ïñ¥ Ïôú Ï£ºÎ≥Ä Î∂ÑÎì§.

Unique speakers detected: ['SPEAKER_00', 'SPEAKER_01']


In [6]:
async def test_merge_node(transcript_data):
    """Test the merge_transcript node"""
    print("\n=== Testing Merge Transcript Node ===")
    
    if not transcript_data or not transcript_data.get("transcript"):
        print("‚ùå No transcript data available, skipping merge test")
        return None
    
    # Create state with transcript data
    state = {
        "audio_file_path": str(absolute_test_path),
        "session_id": "test_session",
        "user_id": "test_user",
        "transcript": transcript_data["transcript"],
        "merged_transcript": "",
        "minutes": ""
    }
    
    print(f"Input segments: {len(state['transcript'])}")
    
    try:
        start_time = time.time()
        result = await merge_transcript(state)
        end_time = time.time()
        
        print(f"‚úÖ Merge completed in {end_time - start_time:.2f}s")
        
        merged_text = result.get("merged_transcript", "")
        print(f"Merged transcript length: {len(merged_text)} chars")
        
        if merged_text:
            print("\nMerged transcript preview:")
            print("-" * 50)
            print(merged_text[:500] + ("..." if len(merged_text) > 500 else ""))
            print("-" * 50)
            
            # Count speaker lines
            lines = merged_text.split('\n')
            speaker_lines = [line for line in lines if ':' in line and line.strip()]
            print(f"\nSpeaker lines: {len(speaker_lines)}")
            
            # Show first few speaker lines
            if speaker_lines:
                print("First few speaker lines:")
                for i, line in enumerate(speaker_lines[:5]):
                    print(f"  {i+1}. {line[:100]}{'...' if len(line) > 100 else ''}")
        else:
            print("‚ö†Ô∏è No merged transcript returned")
        
        return result
        
    except Exception as e:
        print(f"‚ùå Merge failed: {str(e)}")
        return None

# Run merge test if transcription succeeded
merge_result = await test_merge_node(transcribe_result)


=== Testing Merge Transcript Node ===
Input segments: 33
‚úÖ Merge completed in 0.00s
Merged transcript length: 954 chars

Merged transcript preview:
--------------------------------------------------
Speaker 1: ÏïÑÎãà Í∑∏ Ï¢Ä Î≠ê Ïò§ÏÖ®ÏúºÎãàÍπå ÏñòÍ∏∞ Ïïà Ïó¨Ï≠§Î≥º ÏàòÍ∞Ä ÏóÜÎäîÎç∞ Ïò¨Ìï¥ Ï¢Ä Ïú†ÌÇ§Ï¶àÎäî Ï¢Ä Ïñ¥Îñ® Í≤É Í∞ôÏäµÎãàÍπå?
Speaker 2: Ïù¥Í≤å Ï¢Ä ÎäêÎÇå ÏûêÏ≤¥Î°ú.
Speaker 1: Ïñ¥ Ïôú Ï£ºÎ≥Ä Î∂ÑÎì§.
Speaker 2: ÏùºÎã®ÏùÄ Ïó∞Ïï†Ïùò Ïù∏Ïó∞Ïù¥ ÏßßÍ±∞ÎÇò ÏóÜÍ≥† ÏùºÎ≥µÏù¥ ÌÑ∞ÏßÑ ÏÉÅÎì§Ïù¥ ÎßéÏúºÎãàÍπå.
Speaker 1: ÏñòÍ∏∞Í∞Ä Ï∂©Í≤©Ï†ÅÏù¥ÎÑ§. Ïó∞Ïï† Ïö¥Ïù¥ ÏóÜÍ≥† ÏùºÎ≥µÏù¥ ÌÑ∞ÏßÑ Î∂ÑÎì§Ïù¥ Ïó¨Í∏∞ ÎßéÏïÑÏöî?
Speaker 2: Îî¥Ïßì Ïïà ÌïòÍ≥†. Îî¥Ïßì Î™ª ÌïòÍ≥† ÏùºÎßå Ìï† Í±∞ÎãàÍπå. Ïïà ÌïòÎäî Í≤å ÏïÑÎãàÎùº Îî¥ÏßìÏùÑ Î™ª ÌïòÍ≥†? Ïú†Ïû¨ÏÑùÎãòÏóêÍ≤åÎäî Ïù¥Î≥¥Îã§ Îçî Îì†Îì†Ìï† Ïàò ÏóÜÎã§.
Speaker 1: Í≥†ÎßôÏäµÎãàÎã§. Îòê Ïó¨Îü¨Î∂Ñ ÎçïÎ∂ÑÏóê Ïù¥Î†áÍ≤å 1ÎÖÑ... Ï†ÄÎèÑ Ï¢Ä Ïñ¥Îîî Í∞ÄÎ©¥ ÏùºÎ≥µÏù¥ ÌÉÄÍ≥†ÎÇ¨Îã§ Ïù¥Îü∞ ÏñòÍ∏∞Î•º Ï¢Ä Îì£Í∏∞ÎèÑ ÌïòÎäîÎç∞ Ï†ÄÎèÑ Ï¢Ä Í∑∏ÎûòÏöî. Í∑∏Î†áÏ£†.
Speaker 2: Í∏∞ÏÑ∏

In [7]:
async def test_generate_minutes_node(merged_data):
    """Test the generate_minutes node"""
    print("\n=== Testing Generate Minutes Node ===")
    
    if not merged_data or not merged_data.get("merged_transcript"):
        print("‚ùå No merged transcript available, skipping minutes generation test")
        return None
    
    # Create state with merged transcript
    state = {
        "audio_file_path": str(absolute_test_path),
        "session_id": "test_session",
        "user_id": "test_user",
        "transcript": transcribe_result.get("transcript", []) if transcribe_result else [],
        "merged_transcript": merged_data["merged_transcript"],
        "minutes": ""
    }
    
    print(f"Input merged transcript length: {len(state['merged_transcript'])} chars")
    
    try:
        start_time = time.time()
        print("Starting minutes generation with LLM...")
        
        result = await generate_minutes(state)
        
        end_time = time.time()
        print(f"‚úÖ Minutes generation completed in {end_time - start_time:.2f}s")
        
        minutes = result.get("minutes", "")
        print(f"Generated minutes length: {len(minutes)} chars")
        
        if minutes and len(minutes) > 50:
            print("\nGenerated Meeting Minutes:")
            print("=" * 60)
            print(minutes[:1000] + ("\n\n[... truncated for display ...]" if len(minutes) > 1000 else ""))
            print("=" * 60)
            
            # Analyze the structure
            lines = minutes.split('\n')
            headers = [line for line in lines if line.startswith('#')]
            print(f"\nStructure analysis:")
            print(f"  Total lines: {len(lines)}")
            print(f"  Header lines: {len(headers)}")
            print(f"  Headers found: {headers[:5] if headers else 'None'}")
        else:
            print("‚ö†Ô∏è Minutes generation may have failed or returned insufficient content")
            if minutes:
                print(f"Content: {minutes}")
        
        return result
        
    except Exception as e:
        print(f"‚ùå Minutes generation failed: {str(e)}")
        return None

# Run minutes generation test if merge succeeded
minutes_result = await test_generate_minutes_node(merge_result)


=== Testing Generate Minutes Node ===
Input merged transcript length: 954 chars
Starting minutes generation with LLM...
‚úÖ Minutes generation completed in 14.47s
Generated minutes length: 716 chars

Generated Meeting Minutes:
# ÌöåÏùòÎ°ù

## ÌöåÏùò Í∞úÏöî
- ÏùºÏãú: 2023ÎÖÑ 10Ïõî 10Ïùº
- Ï∞∏ÏÑùÏûê: 2Î™Ö (Speaker 1, Speaker 2)
- ÌöåÏùò Ï£ºÏ†ú: Ïú†ÌÇ§Ï¶à ÌîÑÎ°úÍ∑∏Îû®Ïùò Ï†ÑÎßù Î∞è Í¥ÄÏÉÅÍ≥º Ïö¥Ïóê ÎåÄÌïú ÎÖºÏùò

## Ï£ºÏöî ÎÖºÏùò ÏÇ¨Ìï≠
### 1. Ïú†ÌÇ§Ï¶à ÌîÑÎ°úÍ∑∏Îû®Ïùò Ï†ÑÎßù
- Speaker 1ÏùÄ Ïú†ÌÇ§Ï¶à ÌîÑÎ°úÍ∑∏Îû®Ïùò Ìñ•ÌõÑ Ï†ÑÎßùÏóê ÎåÄÌï¥ ÏßàÎ¨∏Ìï®.
- Speaker 2Îäî Ïó∞Ïï† Ïö¥Ïù¥ Î∂ÄÏ°±ÌïòÍ≥† ÏùºÎ≥µÏù¥ ÎßéÏùÄ ÏÇ¨ÎûåÎì§Ïù¥ ÎßéÎã§Í≥† Ïñ∏Í∏âÌïòÎ©∞, Ïù¥Îäî Ïú†ÌÇ§Ï¶à ÌîÑÎ°úÍ∑∏Îû®Ïóê Í∏çÏ†ïÏ†ÅÏù∏ ÏòÅÌñ•ÏùÑ ÎØ∏Ïπ† Í≤ÉÏù¥ÎùºÍ≥† ÏÑ§Î™Ö.

### 2. Í¥ÄÏÉÅÍ≥º Ïö¥Ïùò Í¥ÄÍ≥Ñ
- ÏûòÏÉùÍ∏¥ Ïô∏Î™®Í∞Ä Î∞òÎìúÏãú Ï¢ãÏùÄ Í¥ÄÏÉÅÏùÑ ÏùòÎØ∏ÌïòÏßÄÎäî ÏïäÎäîÎã§Îäî Speaker 2Ïùò ÏÑ§Î™Ö.
- ÏñºÍµ¥Ïùò ÏÉùÍπÄÏÉàÏôÄ Ïö¥Ïùò Í¥ÄÍ≥ÑÏóê ÎåÄÌïú ÏÑ§Î™Ö: ÏñºÍµ¥ÎøêÎßå ÏïÑÎãàÎùº ÏÇ¨Ï£º, ÌíçÏàò, Í∂ÅÌï© Îì± Îã§ÏñëÌïú ÏöîÏÜåÍ∞Ä Ïö¥Ïóê 

## Test Case 2: Complete Workflow Testing

Test the complete meeting workflow end-to-end

In [None]:
async def test_complete_workflow():
    """Run ``process_meeting`` end-to-end on the test file and summarise the result.

    Returns:
        The mapping returned by ``process_meeting``, or ``None`` when the test
        file is missing or the workflow raises.
    """
    print("\n=== Complete Workflow Test ===")

    audio_available = absolute_test_path.exists()
    if not audio_available:
        print("‚ùå Test file not found, skipping workflow test")
        return None

    print("Testing complete meeting processing workflow...")
    print(f"Audio file: {absolute_test_path}")

    try:
        started_at = time.time()

        outcome = await process_meeting(
            audio_file_path=str(absolute_test_path),
            user_id="workflow_test_user",
            session_id="workflow_test_session",
        )

        elapsed = time.time() - started_at

        print(f"\n‚úÖ Complete workflow finished in {elapsed:.2f}s ({elapsed/60:.1f} minutes)")
        print(f"Result keys: {list(outcome.keys())}")

        # Pull out the pieces reported in the summary
        segments = outcome.get("transcript", [])
        merged_text = outcome.get("merged_transcript", "")
        minutes_text = outcome.get("minutes", "")
        reported_session = outcome.get("session_id", "")

        print("\nWorkflow Results Summary:")
        print(f"  Session ID: {reported_session}")
        print(f"  Transcript segments: {len(segments)}")
        print(f"  Merged transcript: {len(merged_text)} chars")
        print(f"  Meeting minutes: {len(minutes_text)} chars")

        # Minutes of 100 characters or fewer are reported as possibly incomplete
        looks_complete = bool(minutes_text) and len(minutes_text) > 100
        if not looks_complete:
            print("‚ö†Ô∏è Minutes may be incomplete or missing")
            if minutes_text:
                print(f"Minutes content: {minutes_text}")
        else:
            frame = "=" * 80
            print("\nüìù Final Meeting Minutes:")
            print(frame)
            print(minutes_text)
            print(frame)
            print("‚úÖ Workflow completed successfully!")

        return outcome

    except Exception as exc:
        print(f"‚ùå Complete workflow failed: {str(exc)}")
        print(f"Error type: {type(exc).__name__}")
        import traceback
        print(f"Traceback: {traceback.format_exc()}")
        return None

# Run complete workflow test
workflow_result = await test_complete_workflow()


=== Complete Workflow Test ===
Testing complete meeting processing workflow...
Audio file: /Users/kimjunghyeon/Desktop/workspace/ai-agent/data/test/Ïú†ÌÄ¥Ï¶à.mp3
2026-02-20 19:05:51 - whisperx.asr - INFO - Detected language: ko (1.00) in first 30s of audio


## Test Case 3: Streaming Workflow Testing

Test the streaming version of the meeting workflow

In [None]:
async def test_streaming_workflow():
    """Consume ``process_meeting_stream`` for the test file and report its events.

    Every event is printed as it arrives. The last event whose ``type`` is
    ``"complete"`` is kept as the final result.

    Returns:
        The final "complete" event, or ``None`` when the test file is missing,
        no completion event arrived, or the stream raised. A completion event
        without minutes is still returned (with a warning). Previously that
        case fell through and returned ``None`` silently, so the summary
        reported a failure even though the stream had completed.
    """
    print("\n=== Streaming Workflow Test ===")

    if not absolute_test_path.exists():
        print("‚ùå Test file not found, skipping streaming test")
        return None

    print("Testing streaming meeting processing workflow...")
    print(f"Audio file: {absolute_test_path}")

    try:
        start_time = time.time()
        events_received = 0
        final_result = None

        async for event in process_meeting_stream(
            audio_file_path=str(absolute_test_path),
            user_id="streaming_test_user",
            session_id="streaming_test_session"
        ):
            events_received += 1
            event_type = event.get("type", "")
            message = event.get("message", "")
            step = event.get("step", "")

            print(f"  Event {events_received}: {event_type} - {step} - {message}")

            # Check for completion
            if event_type == "complete":
                final_result = event
                print(f"    üìÑ Final minutes length: {len(event.get('minutes', ''))} chars")
            elif event_type == "error":
                print(f"    ‚ùå Error: {event.get('error', '')}")

        total_time = time.time() - start_time

        print(f"\n‚úÖ Streaming workflow completed in {total_time:.2f}s")
        print(f"Total events received: {events_received}")

        if not final_result:
            print("‚ö†Ô∏è No final result received")
            return None

        minutes = final_result.get("minutes", "")
        if minutes:
            print(f"\nüìù Final Streaming Result:")
            print("-" * 80)
            print(minutes[:800] + ("\n\n[... truncated for display ...]" if len(minutes) > 800 else ""))
            print("-" * 80)
        else:
            # The stream finished but carried no minutes: surface it instead of hiding it.
            print("‚ö†Ô∏è Completion event contained no minutes")

        return final_result

    except Exception as e:
        # Best-effort test harness: report the failure and let the notebook continue.
        print(f"‚ùå Streaming workflow failed: {str(e)}")
        import traceback
        print(f"Traceback: {traceback.format_exc()}")
        return None

# Run streaming workflow test
streaming_result = await test_streaming_workflow()

## Test Case 4: Error Handling and Edge Cases

Test various error scenarios and edge cases

In [None]:
async def test_error_handling():
    """Feed ``process_meeting`` a series of bad audio paths and report each outcome.

    A case counts as handled when the call raises, or when the returned
    minutes mention an error marker or are shorter than 50 characters.
    Nothing is returned; results are only printed.
    """
    print("\n=== Error Handling Tests ===")

    # (label, audio path, whether a failure is the expected outcome)
    scenarios = (
        ("Non-existent file", "data/test/nonexistent.mp3", True),
        ("Empty file path", "", True),
        ("Invalid file path", "/invalid/path/to/file.mp3", True),
    )

    for number, (label, audio_path, expect_error) in enumerate(scenarios, start=1):
        print(f"\nTest {number}: {label}")
        print(f"  Path: {audio_path}")
        print(f"  Expect error: {expect_error}")

        try:
            result = await process_meeting(
                audio_file_path=audio_path,
                user_id="error_test_user",
                session_id=f"error_test_{number}",
            )

            minutes = result.get("minutes", "")

            if not expect_error:
                print(f"  ‚úÖ Successful result: {len(minutes)} chars")
                continue

            # Error markers are checked first, then the length heuristic
            reports_failure = (
                any(marker in minutes for marker in ("Ïò§Î•ò", "Error"))
                or len(minutes) < 50
            )
            if reports_failure:
                tail = '...' if len(minutes) > 100 else ''
                print(f"  ‚úÖ Error handled correctly: {minutes[:100]}{tail}")
            else:
                print(f"  ‚ö†Ô∏è Expected error but got result: {minutes[:100]}")

        except Exception as exc:
            detail = str(exc)[:100]
            if expect_error:
                print(f"  ‚úÖ Exception handled as expected: {detail}")
            else:
                print(f"  ‚ùå Unexpected exception: {detail}")

await test_error_handling()

## Test Case 5: Performance Analysis

Analyze performance characteristics of the meeting workflow

In [None]:
async def analyze_performance():
    """Time each workflow node on the test file and print a performance report.

    The three nodes (``transcribe_audio``, ``merge_transcript``,
    ``generate_minutes``) are run in sequence. A later phase only runs when
    the previous one produced output. Phases that never ran are reported as
    0s; phases that did run keep their measured time. Previously the fallback
    branches reset already-measured times to zero.

    Returns:
        None. Results are only printed.
    """
    print("\n=== Performance Analysis ===")

    if not absolute_test_path.exists():
        print("‚ùå Test file not found, skipping performance analysis")
        return

    file_size = absolute_test_path.stat().st_size
    print(f"Audio file size: {file_size:,} bytes ({file_size / (1024*1024):.2f} MB)")

    # Estimate audio duration (rough estimate: ~1MB per minute for MP3)
    estimated_duration = file_size / (1024*1024)  # rough minutes estimate
    print(f"Estimated audio duration: ~{estimated_duration:.1f} minutes")

    # Single run with timing for each phase
    print("\nRunning performance test...")

    try:
        total_start = time.time()

        # Test individual nodes with timing
        state = {
            "audio_file_path": str(absolute_test_path),
            "session_id": "perf_test",
            "user_id": "perf_user",
            "transcript": [],
            "merged_transcript": "",
            "minutes": ""
        }

        # Defaults for phases that are skipped because an earlier phase produced nothing.
        merge_time = 0.0
        minutes_time = 0.0

        # Phase 1: Transcription
        phase_start = time.time()
        transcribe_result = await transcribe_audio(state)
        transcribe_time = time.time() - phase_start

        # Phase 2: Merging
        if transcribe_result and transcribe_result.get("transcript"):
            state["transcript"] = transcribe_result["transcript"]
            phase_start = time.time()
            merge_result = await merge_transcript(state)
            merge_time = time.time() - phase_start

            # Phase 3: Minutes generation
            if merge_result and merge_result.get("merged_transcript"):
                state["merged_transcript"] = merge_result["merged_transcript"]
                phase_start = time.time()
                await generate_minutes(state)
                minutes_time = time.time() - phase_start

        total_time = time.time() - total_start

        def share(phase_seconds):
            """Percentage of the total run spent in one phase (0 when the total is 0)."""
            return phase_seconds / total_time * 100 if total_time else 0.0

        print(f"\nPerformance Results:")
        print(f"  Transcription time: {transcribe_time:.2f}s ({share(transcribe_time):.1f}%)")
        print(f"  Merge time: {merge_time:.2f}s ({share(merge_time):.1f}%)")
        print(f"  Minutes generation time: {minutes_time:.2f}s ({share(minutes_time):.1f}%)")
        print(f"  Total time: {total_time:.2f}s ({total_time/60:.1f} minutes)")

        # Performance metrics
        if estimated_duration > 0:
            processing_ratio = total_time / (estimated_duration * 60)  # total_time in seconds, duration in minutes
            print(f"  Processing ratio: {processing_ratio:.2f}x (higher is slower)")

            if processing_ratio < 0.5:
                print(f"  üöÄ Excellent performance (< 0.5x real-time)")
            elif processing_ratio < 1.0:
                print(f"  ‚úÖ Good performance (< 1x real-time)")
            elif processing_ratio < 2.0:
                print(f"  ‚ö†Ô∏è Acceptable performance (< 2x real-time)")
            else:
                print(f"  ‚ùå Slow performance (> 2x real-time)")

        # Memory and resource analysis would go here if needed
        print(f"\nResource Usage Notes:")
        print(f"  - WhisperX models are loaded and cached")
        print(f"  - GPU usage: {'Yes' if settings.WHISPERX_DEVICE == 'cuda' else 'No (CPU only)'}")
        print(f"  - Model size: {settings.WHISPERX_MODEL}")

    except Exception as e:
        # Best-effort test harness: report the failure and let the notebook continue.
        print(f"‚ùå Performance test failed: {str(e)}")

await analyze_performance()

## Test Case 6: Manual Testing and Configuration

Custom tests and configuration validation

In [None]:
def validate_configuration():
    """Print the status of the settings and libraries the meeting workflow uses.

    Each setting is read defensively: a missing or ``None`` attribute on
    ``settings`` is reported as ``[NOT SET]`` instead of raising
    ``AttributeError``. Previously only ``HF_TOKEN`` was read that way. The
    values of ``OPENAI_API_KEY`` and ``HF_TOKEN`` are never printed.

    Returns:
        True when every checked setting has a non-blank value, else False.
        The PyTorch / WhisperX import checks are informational only and do
        not affect the return value.
    """
    print("=== Configuration Validation ===")

    def _setting(name):
        """Read one setting, mapping a missing or None attribute to ''."""
        value = getattr(settings, name, None)
        return "" if value is None else value

    # Check required settings
    config_checks = [
        ("OPENAI_API_KEY", _setting("OPENAI_API_KEY"), "OpenAI API access"),
        ("HF_TOKEN", _setting("HF_TOKEN"), "HuggingFace token for speaker diarization"),
        ("WHISPERX_MODEL", _setting("WHISPERX_MODEL"), "WhisperX model size"),
        ("WHISPERX_DEVICE", _setting("WHISPERX_DEVICE"), "Processing device"),
        ("WHISPERX_LANGUAGE", _setting("WHISPERX_LANGUAGE"), "Default language"),
        ("MINUTES_MODEL", _setting("MINUTES_MODEL"), "Meeting minutes LLM model"),
        # Stringified so that a legitimate temperature of 0 still counts as set.
        ("MINUTES_TEMPERATURE", str(_setting("MINUTES_TEMPERATURE")), "LLM temperature")
    ]

    print("Configuration Status:")
    all_good = True

    for key, value, description in config_checks:
        if value and str(value).strip():
            status = "‚úÖ"
            # Mask secrets so they never end up in saved notebook output.
            display_value = value if key not in ['OPENAI_API_KEY', 'HF_TOKEN'] else "***[SET]***"
        else:
            status = "‚ùå"
            display_value = "[NOT SET]"
            all_good = False

        print(f"  {status} {key}: {display_value} ({description})")

    print(f"\nOverall configuration: {'‚úÖ Ready' if all_good else '‚ùå Issues found'}")

    # Check system requirements
    print("\nSystem Requirements:")

    try:
        import torch
        print(f"  ‚úÖ PyTorch: {torch.__version__}")
        print(f"  ‚úÖ CUDA available: {torch.cuda.is_available()}")
        if torch.cuda.is_available():
            print(f"      GPU: {torch.cuda.get_device_name(0)}")
    except ImportError:
        print(f"  ‚ùå PyTorch not installed")

    try:
        import whisperx  # noqa: F401 - imported only to probe availability
        print(f"  ‚úÖ WhisperX available")
    except ImportError:
        print(f"  ‚ùå WhisperX not installed")

    return all_good

config_valid = validate_configuration()

In [None]:
async def test_custom_audio(audio_path: str = None):
    """Run ``process_meeting`` on an arbitrary audio file.

    Args:
        audio_path: File to process. When empty or omitted, the notebook's
            default test file is used.

    Returns:
        The mapping returned by ``process_meeting``, or ``None`` when the file
        does not exist or processing raises.
    """
    # Fall back to the bundled test recording when no path was supplied
    audio_path = audio_path or str(absolute_test_path)

    print("=== Custom Audio Test ===")
    print(f"Testing with: {audio_path}")

    file_found = Path(audio_path).exists()
    if not file_found:
        print(f"‚ùå File not found: {audio_path}")
        return None

    try:
        outcome = await process_meeting(
            audio_file_path=audio_path,
            user_id="custom_test_user",
            session_id="custom_test_session",
        )
    except Exception as exc:
        print(f"‚ùå Custom test failed: {str(exc)}")
        return None
    else:
        try:
            print("‚úÖ Processing completed")
            minutes_length = len(outcome.get('minutes', ''))
            print(f"Minutes length: {minutes_length} chars")
        except Exception as exc:
            # Reporting errors are handled exactly like processing errors
            print(f"‚ùå Custom test failed: {str(exc)}")
            return None
        return outcome

# Run with the default test file
# custom_result = await test_custom_audio()
print("Custom audio test function ready. Use: await test_custom_audio('path/to/your/audio.mp3')")

## Test Summary and Results

Overall test summary and recommendations

In [None]:
banner = "=" * 80

print("\n" + banner)
print("MEETING WORKFLOW TEST SUMMARY")
print(banner)

# Gather the outcome of every earlier section. A result of None means the
# corresponding test function returned None (failed or skipped).
test_results = {
    "Configuration": config_valid,
    "Test File Available": absolute_test_path.exists(),
    "Individual Node Tests": {
        "Transcription": transcribe_result is not None,
        "Merge Transcript": merge_result is not None,
        "Generate Minutes": minutes_result is not None,
    },
    "Workflow Tests": {
        "Complete Workflow": workflow_result is not None,
        "Streaming Workflow": streaming_result is not None,
    },
}


def _status_icon(passed):
    """Map a truthy/falsy outcome to the pass/fail marker used in the report."""
    return '‚úÖ' if passed else '‚ùå'


print("\nTest Results:")
print(f"  üìÅ Test file available: {_status_icon(test_results['Test File Available'])}")
print(f"  ‚öôÔ∏è  Configuration valid: {_status_icon(test_results['Configuration'])}")

# One sub-report per group of tests: (printed heading, key in test_results)
for heading, group_key in (
    ("Individual Nodes", 'Individual Node Tests'),
    ("Workflow Tests", 'Workflow Tests'),
):
    print(f"\n  {heading}:")
    for label, passed in test_results[group_key].items():
        print(f"    {label}: {_status_icon(passed)}")

# Overall assessment: everything above has to be truthy
all_tests_passed = (
    test_results['Configuration']
    and test_results['Test File Available']
    and all(test_results['Individual Node Tests'].values())
    and all(test_results['Workflow Tests'].values())
)

overall_label = '‚úÖ ALL TESTS PASSED' if all_tests_passed else '‚ö†Ô∏è  Some tests failed or skipped'
print(f"\nüéØ Overall Status: {overall_label}")

print("\nKey Features Tested:")
for feature in (
    "üé§ WhisperX-based speech-to-text transcription",
    "üë• Speaker diarization and identification",
    "üìù Automatic transcript merging and formatting",
    "ü§ñ LLM-powered meeting minutes generation",
    "üîÑ Complete workflow orchestration",
    "üì° Real-time streaming processing",
    "‚ö†Ô∏è  Error handling and edge cases",
):
    print(f"  {feature}")

print("\nAPI Endpoints Available:")
for endpoint in (
    "POST /meeting/upload - Upload audio and get meeting minutes",
    "POST /meeting/upload/stream - Upload audio with streaming processing",
):
    print(f"  {endpoint}")

print("\nRecommendations:")
# Conditional advice first, then the always-applicable items
if not config_valid:
    print("  ‚ö†Ô∏è  Fix configuration issues (especially HF_TOKEN for speaker diarization)")
if not test_results['Test File Available']:
    print("  üìÅ Add test audio files to data/test/ directory")
for advice in (
    "üöÄ Consider GPU acceleration by setting WHISPERX_DEVICE=cuda",
    "üìä Monitor performance with different audio lengths and quality",
    "üîí Implement proper file validation and security measures for production",
    "üíæ Consider adding database storage for meeting records if needed",
):
    print(f"  {advice}")

print("\n" + banner)

## Quick Test Functions

Convenient functions for quick testing during development

In [None]:
# Quick test functions for development

async def quick_test():
    """Process the default test file once and print a short preview of the minutes.

    Returns:
        The mapping returned by ``process_meeting``, or ``None`` when the test
        file is missing. Exceptions from the workflow are not caught here.
    """
    if not absolute_test_path.exists():
        print("‚ùå Test file not available")
        return

    print("üöÄ Running quick test...")
    request = {
        "audio_file_path": str(absolute_test_path),
        "user_id": "quick_test",
        "session_id": "quick_session",
    }
    outcome = await process_meeting(**request)

    minutes_text = outcome.get("minutes", "")
    print(f"‚úÖ Generated minutes ({len(minutes_text)} chars)")
    if minutes_text:
        # Only the first 200 characters are shown
        print(f"Preview: {minutes_text[:200]}...")

    return outcome

async def quick_stream_test():
    """Stream the default test file through the workflow, echoing every event.

    Iteration stops at the first event whose ``type`` is ``"complete"``.
    Nothing is returned; events are only printed.
    """
    if not absolute_test_path.exists():
        print("‚ùå Test file not available")
        return

    print("üîÑ Running quick streaming test...")
    event_stream = process_meeting_stream(
        audio_file_path=str(absolute_test_path),
        user_id="stream_test",
        session_id="stream_session",
    )
    async for event in event_stream:
        kind = event.get('type', '')
        print(f"  {kind}: {event.get('message', '')}")
        if event.get('type') != 'complete':
            continue
        # First completion event ends the test
        final_minutes = event.get('minutes', '')
        print(f"‚úÖ Final result: {len(final_minutes)} chars")
        break

# Functions are ready to use:
# await quick_test()
# await quick_stream_test()
print("Quick test functions ready!")
print("  - await quick_test()")
print("  - await quick_stream_test()")