# 🌊 Streaming Performance с Cynosure Bridge

Оптимизация и настройка streaming для максимальной производительности.

## 📦 Установка библиотек

In [None]:
!pip install openai langchain-openai asyncio aiohttp websockets streamlit

## 🚀 Базовый streaming setup

In [None]:
import openai
import asyncio
import time
from typing import AsyncGenerator, Generator
import json

# Настройка клиента
client = openai.OpenAI(
    base_url="http://192.168.1.196:3000/v1",
    api_key="dummy-key"
)

async_client = openai.AsyncOpenAI(
    base_url="http://192.168.1.196:3000/v1",
    api_key="dummy-key"
)

print("✅ OpenAI клиенты настроены для streaming")

## ⚡ Синхронный streaming

In [None]:
def basic_streaming_chat(message: str, model: str = "gpt-4") -> Generator[str, None, None]:
    """Базовый синхронный streaming"""
    try:
        stream = client.chat.completions.create(
            model=model,
            messages=[{"role": "user", "content": message}],
            stream=True,
            max_tokens=1000
        )
        
        for chunk in stream:
            if chunk.choices[0].delta.content:
                yield chunk.choices[0].delta.content
                
    except Exception as e:
        yield f"Ошибка: {str(e)}"

# Тестирование базового streaming
print("🌊 Базовый streaming:")
start_time = time.time()
full_response = ""

for chunk in basic_streaming_chat("Расскажи про streaming в AI моделях"):
    print(chunk, end="", flush=True)
    full_response += chunk

end_time = time.time()
print(f"\n\n⏱️ Время: {end_time - start_time:.2f}с")
print(f"📝 Длина ответа: {len(full_response)} символов")

## 🚀 Асинхронный streaming

In [None]:
async def async_streaming_chat(message: str, model: str = "gpt-4") -> AsyncGenerator[str, None]:
    """Асинхронный streaming для лучшей производительности"""
    try:
        stream = await async_client.chat.completions.create(
            model=model,
            messages=[{"role": "user", "content": message}],
            stream=True,
            max_tokens=1000
        )
        
        async for chunk in stream:
            if chunk.choices[0].delta.content:
                yield chunk.choices[0].delta.content
                
    except Exception as e:
        yield f"Ошибка: {str(e)}"

# Тестирование асинхронного streaming
async def test_async_streaming():
    print("🚀 Асинхронный streaming:")
    start_time = time.time()
    full_response = ""
    
    async for chunk in async_streaming_chat("Объясни преимущества асинхронного программирования"):
        print(chunk, end="", flush=True)
        full_response += chunk
    
    end_time = time.time()
    print(f"\n\n⏱️ Время: {end_time - start_time:.2f}с")
    print(f"📝 Длина ответа: {len(full_response)} символов")
    return full_response

# Запуск асинхронного теста
async_response = await test_async_streaming()

## 🔄 Параллельные streaming запросы

In [None]:
async def parallel_streaming_requests(messages: list[str]) -> list[str]:
    """Параллельная обработка нескольких streaming запросов"""
    async def process_single_request(message: str, request_id: int) -> str:
        print(f"\n🔄 Запрос {request_id}: {message[:50]}...")
        full_response = ""
        
        async for chunk in async_streaming_chat(message):
            full_response += chunk
        
        print(f"\n✅ Запрос {request_id} завершен ({len(full_response)} символов)")
        return full_response
    
    # Создание задач для параллельного выполнения
    tasks = [
        process_single_request(message, i+1) 
        for i, message in enumerate(messages)
    ]
    
    # Ожидание завершения всех задач
    start_time = time.time()
    results = await asyncio.gather(*tasks)
    end_time = time.time()
    
    print(f"\n⏱️ Общее время всех запросов: {end_time - start_time:.2f}с")
    return results

# Тестирование параллельных запросов
test_messages = [
    "Что такое машинное обучение?",
    "Объясни принципы веб-разработки",
    "Расскажи про базы данных"
]

parallel_results = await parallel_streaming_requests(test_messages)
print(f"\n📊 Обработано {len(parallel_results)} запросов параллельно")

## 💾 Streaming с сохранением в реальном времени

In [None]:
import json
from datetime import datetime
import os

class StreamingLogger:
    def __init__(self, log_file: str = "streaming_log.jsonl"):
        self.log_file = log_file
        self.session_id = datetime.now().strftime("%Y%m%d_%H%M%S")
    
    async def streaming_with_logging(self, message: str, model: str = "gpt-4"):
        """Streaming с логированием каждого чанка"""
        session_data = {
            "session_id": self.session_id,
            "timestamp": datetime.now().isoformat(),
            "message": message,
            "model": model,
            "chunks": []
        }
        
        start_time = time.time()
        full_response = ""
        chunk_count = 0
        
        try:
            async for chunk in async_streaming_chat(message, model):
                chunk_time = time.time() - start_time
                
                # Логирование чанка
                chunk_data = {
                    "chunk_id": chunk_count,
                    "content": chunk,
                    "timestamp": chunk_time,
                    "content_length": len(chunk)
                }
                session_data["chunks"].append(chunk_data)
                
                full_response += chunk
                chunk_count += 1
                
                # Вывод чанка
                print(chunk, end="", flush=True)
                
                # Сохранение промежуточного состояния каждые 10 чанков
                if chunk_count % 10 == 0:
                    self._save_session(session_data)
        
        except Exception as e:
            session_data["error"] = str(e)
        
        finally:
            # Финальное сохранение
            session_data["total_time"] = time.time() - start_time
            session_data["total_chunks"] = chunk_count
            session_data["full_response"] = full_response
            session_data["response_length"] = len(full_response)
            
            self._save_session(session_data)
            
            print(f"\n\n📊 Статистика:")
            print(f"   ⏱️ Время: {session_data['total_time']:.2f}с")
            print(f"   🧩 Чанков: {chunk_count}")
            print(f"   📝 Символов: {len(full_response)}")
            print(f"   💾 Лог сохранен в: {self.log_file}")
        
        return full_response
    
    def _save_session(self, session_data: dict):
        """Сохранение данных сессии в JSONL файл"""
        with open(self.log_file, "a", encoding="utf-8") as f:
            f.write(json.dumps(session_data, ensure_ascii=False) + "\n")

# Тестирование streaming с логированием
logger = StreamingLogger()
logged_response = await logger.streaming_with_logging(
    "Напиши подробную статью про искусственный интеллект и его влияние на общество"
)

## 🎛️ Настройка streaming параметров

In [None]:
class OptimizedStreaming:
    def __init__(self):
        self.metrics = {
            "response_times": [],
            "chunk_counts": [],
            "tokens_per_second": []
        }
    
    async def optimized_stream(self, 
                              message: str, 
                              model: str = "gpt-4",
                              max_tokens: int = 1000,
                              temperature: float = 0.7,
                              top_p: float = 1.0) -> str:
        """Оптимизированный streaming с настраиваемыми параметрами"""
        start_time = time.time()
        full_response = ""
        chunk_count = 0
        
        try:
            stream = await async_client.chat.completions.create(
                model=model,
                messages=[{"role": "user", "content": message}],
                stream=True,
                max_tokens=max_tokens,
                temperature=temperature,
                top_p=top_p
            )
            
            async for chunk in stream:
                if chunk.choices[0].delta.content:
                    content = chunk.choices[0].delta.content
                    full_response += content
                    chunk_count += 1
                    
                    # Отображение с ограничением частоты для лучшей производительности
                    if chunk_count % 3 == 0:  # Обновляем каждый 3-й чанк
                        print(content, end="", flush=True)
        
        except Exception as e:
            print(f"\nОшибка: {e}")
            return ""
        
        # Сбор метрик
        total_time = time.time() - start_time
        tokens_per_sec = len(full_response.split()) / total_time if total_time > 0 else 0
        
        self.metrics["response_times"].append(total_time)
        self.metrics["chunk_counts"].append(chunk_count)
        self.metrics["tokens_per_second"].append(tokens_per_sec)
        
        print(f"\n\n⚡ Производительность:")
        print(f"   📊 {tokens_per_sec:.1f} токенов/сек")
        print(f"   🧩 {chunk_count} чанков за {total_time:.2f}с")
        
        return full_response
    
    def get_performance_stats(self):
        """Получение статистики производительности"""
        if not self.metrics["response_times"]:
            return "Нет данных для анализа"
        
        import statistics
        
        stats = {
            "avg_response_time": statistics.mean(self.metrics["response_times"]),
            "avg_chunks": statistics.mean(self.metrics["chunk_counts"]),
            "avg_tokens_per_sec": statistics.mean(self.metrics["tokens_per_second"]),
            "total_requests": len(self.metrics["response_times"])
        }
        
        return stats

# Тестирование оптимизированного streaming
optimizer = OptimizedStreaming()

test_configs = [
    {"temperature": 0.1, "max_tokens": 500, "message": "Кратко объясни квантовые вычисления"},
    {"temperature": 0.7, "max_tokens": 800, "message": "Напиши творческую историю про роботов"},
    {"temperature": 0.3, "max_tokens": 600, "message": "Техническое объяснение блокчейн технологии"}
]

for i, config in enumerate(test_configs, 1):
    print(f"\n🧪 Тест {i}: temp={config['temperature']}, tokens={config['max_tokens']}")
    print("=" * 60)
    
    response = await optimizer.optimized_stream(
        message=config["message"],
        temperature=config["temperature"],
        max_tokens=config["max_tokens"]
    )
    print("\n" + "-" * 60)

# Финальная статистика
final_stats = optimizer.get_performance_stats()
print(f"\n📊 ОБЩАЯ СТАТИСТИКА:")
print(f"   🔢 Запросов: {final_stats['total_requests']}")
print(f"   ⏱️ Среднее время: {final_stats['avg_response_time']:.2f}с")
print(f"   🧩 Средние чанки: {final_stats['avg_chunks']:.1f}")
print(f"   ⚡ Средняя скорость: {final_stats['avg_tokens_per_sec']:.1f} токенов/сек")

## 🌐 WebSocket streaming (продвинутый)

In [None]:
import websockets
import json
from typing import Optional

class WebSocketStreaming:
    def __init__(self, host: str = "localhost", port: int = 8765):
        self.host = host
        self.port = port
        self.clients = set()
    
    async def websocket_handler(self, websocket, path):
        """Обработчик WebSocket соединений"""
        self.clients.add(websocket)
        print(f"🔌 Новое соединение: {websocket.remote_address}")
        
        try:
            async for message in websocket:
                data = json.loads(message)
                
                if data["type"] == "chat_request":
                    await self.handle_chat_request(websocket, data)
                    
        except websockets.exceptions.ConnectionClosed:
            pass
        finally:
            self.clients.remove(websocket)
            print(f"🔌 Соединение закрыто: {websocket.remote_address}")
    
    async def handle_chat_request(self, websocket, data):
        """Обработка chat запроса через WebSocket"""
        message = data.get("message", "")
        session_id = data.get("session_id", "default")
        
        # Отправка начального ответа
        await websocket.send(json.dumps({
            "type": "stream_start",
            "session_id": session_id,
            "timestamp": datetime.now().isoformat()
        }))
        
        try:
            # Streaming ответ
            async for chunk in async_streaming_chat(message):
                await websocket.send(json.dumps({
                    "type": "stream_chunk",
                    "session_id": session_id,
                    "content": chunk,
                    "timestamp": datetime.now().isoformat()
                }))
            
            # Завершение стрима
            await websocket.send(json.dumps({
                "type": "stream_end",
                "session_id": session_id,
                "timestamp": datetime.now().isoformat()
            }))
            
        except Exception as e:
            await websocket.send(json.dumps({
                "type": "error",
                "session_id": session_id,
                "error": str(e),
                "timestamp": datetime.now().isoformat()
            }))
    
    async def start_server(self):
        """Запуск WebSocket сервера"""
        print(f"🚀 WebSocket сервер запускается на ws://{self.host}:{self.port}")
        
        async with websockets.serve(self.websocket_handler, self.host, self.port):
            print("✅ WebSocket сервер активен")
            await asyncio.Future()  # Запуск навсегда

# Пример клиента для тестирования
async def websocket_client_test():
    """Тестовый WebSocket клиент"""
    uri = "ws://localhost:8765"
    
    try:
        async with websockets.connect(uri) as websocket:
            # Отправка запроса
            request = {
                "type": "chat_request",
                "message": "Расскажи про WebSocket протокол",
                "session_id": "test_session"
            }
            
            await websocket.send(json.dumps(request))
            
            # Получение ответов
            async for message in websocket:
                data = json.loads(message)
                
                if data["type"] == "stream_start":
                    print("🌊 Начало streaming...")
                elif data["type"] == "stream_chunk":
                    print(data["content"], end="", flush=True)
                elif data["type"] == "stream_end":
                    print("\n✅ Streaming завершен")
                    break
                elif data["type"] == "error":
                    print(f"\n❌ Ошибка: {data['error']}")
                    break
    
    except Exception as e:
        print(f"❌ Ошибка подключения: {e}")

print("🌐 WebSocket streaming класс готов")
print("💡 Для запуска сервера используйте:")
print("   ws_server = WebSocketStreaming()")
print("   await ws_server.start_server()")
print("💡 Для тестирования клиента:")
print("   await websocket_client_test()")

## 📊 Benchmarking и сравнение

In [None]:
import matplotlib.pyplot as plt
import pandas as pd
from statistics import mean, median, stdev

class StreamingBenchmark:
    def __init__(self):
        self.results = []
    
    async def benchmark_streaming_methods(self, test_message: str, iterations: int = 3):
        """Сравнение производительности разных методов streaming"""
        methods = {
            "sync_basic": self.test_sync_streaming,
            "async_basic": self.test_async_streaming,
            "async_optimized": self.test_optimized_streaming
        }
        
        print(f"🧪 Benchmark: {iterations} итераций для каждого метода")
        print(f"📝 Тестовое сообщение: {test_message[:50]}...\n")
        
        for method_name, method_func in methods.items():
            print(f"🔬 Тестирование {method_name}...")
            
            method_results = []
            for i in range(iterations):
                start_time = time.time()
                response = await method_func(test_message)
                end_time = time.time()
                
                result = {
                    "method": method_name,
                    "iteration": i + 1,
                    "response_time": end_time - start_time,
                    "response_length": len(response),
                    "tokens_per_second": len(response.split()) / (end_time - start_time)
                }
                
                method_results.append(result)
                self.results.append(result)
                
                print(f"  ✅ Итерация {i+1}: {result['response_time']:.2f}с, {result['tokens_per_second']:.1f} токенов/сек")
            
            # Статистика по методу
            avg_time = mean([r["response_time"] for r in method_results])
            avg_tokens_sec = mean([r["tokens_per_second"] for r in method_results])
            
            print(f"📊 {method_name} - среднее время: {avg_time:.2f}с, скорость: {avg_tokens_sec:.1f} токенов/сек\n")
    
    async def test_sync_streaming(self, message: str) -> str:
        """Тест синхронного streaming"""
        response = ""
        for chunk in basic_streaming_chat(message):
            response += chunk
        return response
    
    async def test_async_streaming(self, message: str) -> str:
        """Тест асинхронного streaming"""
        response = ""
        async for chunk in async_streaming_chat(message):
            response += chunk
        return response
    
    async def test_optimized_streaming(self, message: str) -> str:
        """Тест оптимизированного streaming"""
        optimizer = OptimizedStreaming()
        return await optimizer.optimized_stream(message, temperature=0.3, max_tokens=600)
    
    def generate_report(self):
        """Генерация отчета по результатам"""
        if not self.results:
            print("❌ Нет данных для отчета")
            return
        
        df = pd.DataFrame(self.results)
        
        print("📊 ФИНАЛЬНЫЙ ОТЧЕТ BENCHMARK")
        print("=" * 50)
        
        # Группировка по методам
        summary = df.groupby('method').agg({
            'response_time': ['mean', 'median', 'std'],
            'tokens_per_second': ['mean', 'median', 'std'],
            'response_length': 'mean'
        }).round(3)
        
        print(summary)
        
        # Создание графиков
        fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(15, 6))
        
        # График времени ответа
        methods = df['method'].unique()
        response_times = [df[df['method'] == method]['response_time'].values for method in methods]
        
        ax1.boxplot(response_times, labels=methods)
        ax1.set_title('Время ответа по методам')
        ax1.set_ylabel('Секунды')
        ax1.tick_params(axis='x', rotation=45)
        
        # График токенов в секунду
        tokens_per_sec = [df[df['method'] == method]['tokens_per_second'].values for method in methods]
        
        ax2.boxplot(tokens_per_sec, labels=methods)
        ax2.set_title('Токенов в секунду по методам')
        ax2.set_ylabel('Токены/сек')
        ax2.tick_params(axis='x', rotation=45)
        
        plt.tight_layout()
        plt.show()
        
        # Рекомендации
        best_speed = df.loc[df['tokens_per_second'].idxmax()]
        best_time = df.loc[df['response_time'].idxmin()]
        
        print(f"\n🏆 РЕКОМЕНДАЦИИ:")
        print(f"⚡ Лучшая скорость: {best_speed['method']} ({best_speed['tokens_per_second']:.1f} токенов/сек)")
        print(f"🏃 Лучшее время: {best_time['method']} ({best_time['response_time']:.2f}с)")

# Запуск benchmark
benchmark = StreamingBenchmark()
await benchmark.benchmark_streaming_methods(
    "Объясни принципы REST API и приведи примеры использования",
    iterations=2  # Уменьшено для демонстрации
)

benchmark.generate_report()

## 🎯 Заключение и рекомендации

### ✅ Лучшие практики streaming:

1. **Используйте асинхронный код** для лучшей производительности
2. **Оптимизируйте частоту обновлений** интерфейса
3. **Логируйте метрики** для мониторинга
4. **Обрабатывайте ошибки** gracefully
5. **Тестируйте разные параметры** модели

### 🚀 Для продакшена:

- **WebSocket** для real-time приложений
- **Буферизация** для стабильности
- **Rate limiting** для защиты
- **Monitoring** для отслеживания производительности

### 📊 Метрики для отслеживания:

- Time to First Token (TTFT)
- Tokens per Second
- Total Response Time
- Error Rate
- Concurrent Connections