In [2]:
import requests
import time
import json
from transformers import AutoTokenizer

# Load Qwen3 tokenizer
tokenizer = AutoTokenizer.from_pretrained("Qwen/Qwen3-4B")

  from .autonotebook import tqdm as notebook_tqdm


In [6]:
import requests
response = requests.get("http://localhost:8000/v1/models")
response.json()['data'][0]['root']

'zhiqing/Qwen3-14B-INT8'

In [None]:
import os
import glob

def measure_vllm_response(file_path, vllm_url="http://localhost:8000/v1/chat/completions", 
                         max_input_tokens=None, max_output_tokens=1024, max_generation_time=7.0, 
                         print_stream=False, benchmark_mode=False):
    """
    Send file content to vLLM chat endpoint and measure response metrics with streaming
    
    Args:
        file_path: Path to the input file
        vllm_url: vLLM endpoint URL
        max_input_tokens: Maximum tokens for input (for truncation), None for no limit
        max_output_tokens: Maximum tokens for output response
        max_generation_time: Maximum time in seconds for generation after prefill (default: 7.0)
        print_stream: Whether to print streaming content (default: False for benchmarking)
        benchmark_mode: If True, optimize for precise TTFT measurement
    """
    # Read the file
    with open(file_path, 'r', encoding='utf-8') as f:
        content = f.read()
    
    # Truncate content if max_input_tokens is specified
    if max_input_tokens is not None:
        tokens = tokenizer.encode(content)
        if len(tokens) > max_input_tokens:
            truncated_tokens = tokens[:max_input_tokens]
            content = tokenizer.decode(truncated_tokens)
            if not benchmark_mode:
                print(f"Content truncated from {len(tokens)} to {max_input_tokens} tokens")
    
    # Get actual input token count
    input_tokens = len(tokenizer.encode(content))
    
    # Prepare chat request with streaming
    payload = {
        "model": "qwen3",
        "messages": [
            {"role": "user", "content": content}
        ],
        "max_tokens": max_output_tokens,
        "temperature": 0.7,
        "stream": True,
        "stream_options": {"include_usage": True},  # Request usage info in stream
        "chat_template_kwargs": {
                "enable_thinking": False
        }
    }
    
    headers = {
        "Content-Type": "application/json",
        "Accept": "text/event-stream",
        "Connection": "keep-alive",
    }
    
    # Use session for better connection management
    session = requests.Session()
    
    # Start timing
    start_time = time.time()
    ttft = None
    generation_start_time = None
    generation_stopped_early = False
    
    # Send request to vLLM with streaming
    response = session.post(vllm_url, json=payload, headers=headers, stream=True)
    
    if response.status_code != 200:
        session.close()
        raise Exception(f"Request failed with status {response.status_code}: {response.text}")
    
    # Process streaming response
    full_response = ""
    output_tokens = 0
    actual_input_tokens = None
    actual_output_tokens = None
    
    if print_stream and not benchmark_mode:
        print("Response streaming:")
        print("-" * 50)
    
    try:
        # Use minimal buffering for faster first token detection
        for line in response.iter_lines(chunk_size=1, decode_unicode=True):
            if not line:
                continue
                
            if line.startswith('data: '):
                data_str = line[6:]  # Remove 'data: ' prefix
                if data_str.strip() == '[DONE]':
                    break
                
                try:
                    data = json.loads(data_str)
                    
                    # Check for usage information (comes in final event)
                    if 'usage' in data:
                        actual_input_tokens = data['usage']['prompt_tokens']
                        actual_output_tokens = data['usage']['completion_tokens']
                        if not benchmark_mode:
                            print(f"\nUsage info received: {actual_input_tokens} input tokens, {actual_output_tokens} output tokens")
                    
                    if 'choices' in data and len(data['choices']) > 0:
                        choice = data['choices'][0]
                        if 'delta' in choice and 'content' in choice['delta']:
                            current_time = time.time()
                            
                            # Record TTFT (time to first token) - do this FIRST before any other processing
                            if ttft is None:
                                ttft = current_time - start_time
                                generation_start_time = current_time
                            
                            content_chunk = choice['delta']['content']
                            full_response += content_chunk
                            
                            # Print the streaming tokens
                            print(content_chunk, end='', flush=True)
                            
                            # Check if generation time exceeds max_generation_time (only if not benchmark mode)
                            if not benchmark_mode and generation_start_time is not None:
                                generation_elapsed = current_time - generation_start_time
                                if generation_elapsed > max_generation_time:
                                    print(f"\nGeneration stopped early after {generation_elapsed:.2f} seconds (max: {max_generation_time}s)")
                                    generation_stopped_early = True
                                    response.close()
                                    break
                            
                except json.JSONDecodeError:
                    continue
            
            # Check timeout after each line as well (only if not benchmark mode)
            if not benchmark_mode and generation_start_time is not None:
                generation_elapsed = time.time() - generation_start_time
                if generation_elapsed > max_generation_time:
                    print(f"\nGeneration stopped early after {generation_elapsed:.2f} seconds (max: {max_generation_time}s)")
                    generation_stopped_early = True
                    response.close()
                    break
    
    except Exception as e:
        # If there's any error during streaming, close the response
        response.close()
        session.close()
        if not generation_stopped_early:
            raise e
    
    # Clean up
    response.close()
    session.close()
    
    if print_stream and not benchmark_mode:
        print()  # New line after streaming
        print("-" * 50)
    
    end_time = time.time()
    total_time = end_time - start_time
    
    # If no TTFT was recorded (no tokens received), set it to total time
    if ttft is None:
        ttft = total_time
    
    # Use actual token counts from server if available, otherwise fall back to tokenizer estimate
    if actual_input_tokens is not None and actual_output_tokens is not None and not generation_stopped_early:
        input_tokens = actual_input_tokens
        output_tokens = actual_output_tokens
        if not benchmark_mode:
            print(f"Using server-reported token counts: {input_tokens} input, {output_tokens} output")
    else:
        # Fallback: estimate output tokens using tokenizer
        output_tokens = len(tokenizer.encode(full_response))
        if not benchmark_mode:
            print(f"Using tokenizer estimates: {input_tokens} input, {output_tokens} output")
            if generation_stopped_early:
                print("Note: Generation was stopped early due to time limit")
    
    # Calculate generation speed using actual token counts
    generation_time = max(total_time - ttft, 1e-9)
    gen_tokens_per_sec = output_tokens / generation_time
    
    # Total tokens
    total_tokens = input_tokens + output_tokens
    
    # Metrics dictionary
    metrics = {
        'ttft': ttft,
        'gen_tokens_per_sec': gen_tokens_per_sec,
        'total_tokens': total_tokens,
        'total_time': total_time,
        'input_tokens': input_tokens,
        'output_tokens': output_tokens,
        'generation_time': generation_time,
        'generation_stopped_early': generation_stopped_early
    }
    
    # Print additional information
    # if not benchmark_mode:
    start_time_formatted = time.strftime("%Y-%m-%d %H:%M:%S", time.gmtime(generation_start_time))
    end_time_formatted = time.strftime("%Y-%m-%d %H:%M:%S", time.gmtime(end_time))
    print(f"Start time: {start_time_formatted}")
    print(f"End time: {end_time_formatted}")
    print(f"Tokens generated: {output_tokens}")
    print(f"Generation speed formula: Tokens/sec = Total tokens / Total time")

    return full_response, metrics

def save_and_print_metrics(response, metrics, output_file):
    """
    Save response and metrics to file and print metrics
    """
    # Print metrics
    print(f"TTFT: {metrics['ttft']:.2f} seconds")
    print(f"Gen tokens/sec (post-TTFT): {metrics['gen_tokens_per_sec']:.2f}")
    # print(f"E2E tokens/sec (incl. prefill): {metrics['e2e_tokens_per_sec']:.2f}")
    print(f"Total tokens: {metrics['total_tokens']}")
    print(f"Input tokens: {metrics['input_tokens']}")
    print(f"Output tokens: {metrics['output_tokens']}")
    print(f"Total time: {metrics['total_time']:.2f} seconds")
    print(f"Generation time: {metrics['generation_time']:.2f} seconds")
    if metrics.get('generation_stopped_early', False):
        print("Generation was stopped early due to time limit")
    
    # Save to file
    with open(output_file, 'w', encoding='utf-8') as f:
        # Write metrics in the format similar to the examples
        f.write(f"ttft: {metrics['ttft']:.2f}\n")
        f.write(f"gen_tokens_per_second: {metrics['gen_tokens_per_sec']:.2f}\n")
        # f.write(f"e2e_tokens_per_second: {metrics['e2e_tokens_per_sec']:.2f}\n")
        f.write(f"total_tokens: {metrics['total_tokens']}\n")
        f.write(f"total_time: {metrics['total_time']:.2f}\n")
        f.write(f"input_tokens: {metrics['input_tokens']}\n")
        f.write(f"output_tokens: {metrics['output_tokens']}\n")
        f.write(f"generation_time: {metrics['generation_time']:.2f}\n")
        if metrics.get('generation_stopped_early', False):
            f.write("generation_stopped_early: true\n")
        f.write("\n")
        f.write(response)

file_path = "tests/book.txt"
vllm_url = "http://localhost:8000/v1/chat/completions"
max_output_tokens = 100

import threading
import time
import os

def run_concurrent_test(file_path, max_input_tokens, max_output_tokens, request_id):
    """Run a single request for concurrent testing"""
    if not os.environ.get('BENCHMARK_QUIET'):
        print(f"Starting request {request_id}")
    start = time.time()
    
    try:
        response, metrics = measure_vllm_response(
            file_path=file_path,
            max_input_tokens=max_input_tokens,
            max_output_tokens=max_output_tokens,
            print_stream=False,  # Disable streaming output for benchmarks
            benchmark_mode=True   # Enable benchmark optimizations
        )
            
        end = time.time()
        if not os.environ.get('BENCHMARK_QUIET'):
            print(f"Request {request_id} completed in {end - start:.2f}s")
        return metrics
        
    except Exception as e:
        print(f"Request {request_id} failed: {e}")
        return None

print("Testing different input token sizes with various concurrent requests:")

input_token_sizes = [1000, 5000, 10000, 15000, 20000, 25000, 30000]
# input_token_sizes = [15000]
concurrent_counts = [2, 5]

# Set environment variable to reduce noise during benchmarking
os.environ['BENCHMARK_QUIET'] = '1'

for concurrent_count in concurrent_counts:
    for token_size in input_token_sizes:
        print(f"\n" + "="*80)
        print(f"Testing {concurrent_count} concurrent requests with {token_size} input tokens each")
        print("="*80)
        
        threads = []
        all_metrics = []
        
        # Start all threads
        concurrent_start_time = time.time()
        for i in range(concurrent_count):
            thread = threading.Thread(
                target=lambda i=i: all_metrics.append(
                    run_concurrent_test(file_path, token_size, max_output_tokens, i+1)
                )
            )
            threads.append(thread)
            thread.start()
            time.sleep(0.1)  # Small delay to avoid overwhelming the server
        
        # Wait for all threads to complete
        print(f"Waiting for all {concurrent_count} threads to complete...")
        for i, thread in enumerate(threads):
            thread.join()
            if not os.environ.get('BENCHMARK_QUIET'):
                print(f"Thread {i+1} joined")
        
        concurrent_end_time = time.time()
        total_concurrent_time = concurrent_end_time - concurrent_start_time
        
        # Filter out None results (failed requests)
        valid_metrics = [m for m in all_metrics if m is not None]
        valid_requests = len(valid_metrics)
        
        if valid_requests > 0:
            # Calculate aggregate metrics
            avg_ttft = sum(m['ttft'] for m in valid_metrics) / valid_requests
            avg_gen_tokens_per_sec = sum(m['gen_tokens_per_sec'] for m in valid_metrics) / valid_requests
            avg_total_tokens = sum(m['total_tokens'] for m in valid_metrics) / valid_requests
            avg_total_time = sum(m['total_time'] for m in valid_metrics) / valid_requests
            avg_input_tokens = sum(m['input_tokens'] for m in valid_metrics) / valid_requests
            avg_output_tokens = sum(m['output_tokens'] for m in valid_metrics) / valid_requests
            
            # Create output filename
            # Get model name from API
            response = requests.get("http://localhost:8000/v1/models")
            model_name = response.json()['data'][0]['root'].split('/')[-1]
            
            output_filename = f"speed_tests/T2_x2/{model_name}/{token_size}_length_{concurrent_count}_parallel.txt"
            os.makedirs(os.path.dirname(output_filename), exist_ok=True)
            # Save results to file
            with open(output_filename, 'w', encoding='utf-8') as f:
                # Write aggregate metrics
                f.write(f"ttft: {avg_ttft:.2f}\n")
                f.write(f"gen_tokens_per_second: {avg_gen_tokens_per_sec:.2f}\n")
                f.write(f"total_tokens: {avg_total_tokens:.0f}\n")
                f.write(f"total_time: {avg_total_time:.2f}\n")
                f.write(f"input_tokens: {avg_input_tokens:.0f}\n")
                f.write(f"output_tokens: {avg_output_tokens:.0f}\n")
                f.write(f"concurrent_requests: {concurrent_count}\n")
                f.write(f"valid_requests: {valid_requests}\n")
                f.write(f"total_concurrent_time: {total_concurrent_time:.2f}\n")
                f.write(f"\nAverage metrics for {valid_requests}/{concurrent_count} concurrent requests of {token_size} tokens each\n\n")
                
                # Write individual request metrics
                f.write("Individual request metrics:\n\n")
                for i, metrics in enumerate(valid_metrics):
                    f.write(f"Request {i+1}:\n")
                    f.write(f"  ttft: {metrics['ttft']:.2f}\n")
                    f.write(f"  gen_tokens_per_second: {metrics['gen_tokens_per_sec']:.2f}\n")
                    f.write(f"  total_tokens: {metrics['total_tokens']}\n")
                    f.write(f"  total_time: {metrics['total_time']:.2f}\n")
                    f.write(f"  input_tokens: {metrics['input_tokens']}\n")
                    f.write(f"  output_tokens: {metrics['output_tokens']}\n")
                    f.write(f"\n")
            
            print(f"\nResults for {concurrent_count} concurrent requests with {token_size} tokens:")
            print(f"Average TTFT: {avg_ttft:.2f}s")
            print(f"Average gen tokens/sec: {avg_gen_tokens_per_sec:.2f}")
            print(f"Average total time: {avg_total_time:.2f}s")
            print(f"Total concurrent time: {total_concurrent_time:.2f}s")
            print(f"Valid requests: {valid_requests}/{concurrent_count}")
            print(f"Results saved to: {output_filename}")
        else:
            print(f"All requests failed for {concurrent_count} concurrent requests with {token_size} tokens")
        
        # Brief pause between tests
        time.sleep(2)

# Clean up environment variable
if 'BENCHMARK_QUIET' in os.environ:
    del os.environ['BENCHMARK_QUIET']

print("\nAll tests completed!")

Testing different input token sizes with various concurrent requests:

Testing 2 concurrent requests with 1000 input tokens each
Waiting for all 2 threads to complete...
****ОЭгонринь Х иан лтеред****  
  
****ОЭгонринь Х иан лтеред****  
  
***ЖПранологр (:**прод Солжказениеки)* /

 К—от Мыы не- сумпелиред защитатитьели своих / детей К отл Сассумикара  
ч**ногоА пнлемнениот,ация —:** продолж  
илВ оно, время его войны голос между был кл гланубамиок Симум ира тчверноед пымлем,я как из камгеньна.ло — к Иотов теперь п онилем бениег Вутет,ра как с кры ихсы территории,, от что одного нару ошиглоня рав кнов другесомуие, в вмест лоес тогоу чтобы. с Длямотр восетьстанов вления глаз справаед темлив,Start time: 2025-09-22 03:19:23
End time: 2025-09-22 03:19:28
Tokens generated: 100
Generation speed formula: Tokens/sec = Total tokens / Total time
Start time: 2025-09-22 03:19:23
End time: 2025-09-22 03:19:28
Tokens generated: 100
Generation speed formula: Tokens/sec = Total tokens / Total time

R