diff --git a/optillm/__init__.py b/optillm/__init__.py
index cc9aea13..0c622287 100644
--- a/optillm/__init__.py
+++ b/optillm/__init__.py
@@ -1,5 +1,5 @@
 # Version information
-__version__ = "0.2.6"
+__version__ = "0.2.7"
 
 # Import from server module
 from .server import (
diff --git a/optillm/plugins/proxy/README.md b/optillm/plugins/proxy/README.md
index 2e88b8af..87c6b262 100644
--- a/optillm/plugins/proxy/README.md
+++ b/optillm/plugins/proxy/README.md
@@ -33,6 +33,7 @@ providers:
     base_url: https://api.openai.com/v1
     api_key: ${OPENAI_API_KEY}
     weight: 2
+    max_concurrent: 5  # Optional: limit this provider to 5 concurrent requests
     model_map:
       gpt-4: gpt-4-turbo-preview  # Optional: map model names
 
@@ -40,6 +41,7 @@ providers:
     base_url: https://api.openai.com/v1
     api_key: ${OPENAI_API_KEY_BACKUP}
     weight: 1
+    max_concurrent: 2  # Optional: limit this provider to 2 concurrent requests
 
 routing:
   strategy: weighted  # Options: weighted, round_robin, failover
@@ -189,6 +191,39 @@ queue:
 - **Automatic Failover**: When a provider times out, it's marked unhealthy and the request automatically fails over to the next available provider.
 - **Protection**: Prevents slow backends from causing queue buildup that can crash the proxy server.
 
+### Per-Provider Concurrency Limits
+
+Control the maximum number of concurrent requests each provider can handle:
+
+```yaml
+providers:
+  - name: slow_server
+    base_url: http://192.168.1.100:8080/v1
+    api_key: dummy
+    max_concurrent: 1  # This server can only handle 1 request at a time
+
+  - name: fast_server
+    base_url: https://api.fast.com/v1
+    api_key: ${API_KEY}
+    max_concurrent: 10  # This server can handle 10 concurrent requests
+
+  - name: unlimited_server
+    base_url: https://api.unlimited.com/v1
+    api_key: ${API_KEY}
+    # No max_concurrent means no limit for this provider
+```
+
+**Use Cases:**
+- **Hardware-limited servers**: Set `max_concurrent: 1` for servers that can't handle parallel requests
+- **Rate limiting**: Prevent overwhelming providers with too many concurrent requests
+- **Resource management**: Balance load across providers with different capacities
+- **Cost control**: Limit expensive providers while allowing more requests to cheaper ones
+
+**Behavior:**
+- If a provider is at max capacity, the proxy tries the next available provider
+- Requests wait briefly (0.5s) for a slot before moving to the next provider
+- Works with all routing strategies (weighted, round_robin, failover)
+
 ### Environment Variables
 
 The configuration supports flexible environment variable interpolation:
diff --git a/optillm/plugins/proxy/client.py b/optillm/plugins/proxy/client.py
index 1b85972b..696ef41a 100644
--- a/optillm/plugins/proxy/client.py
+++ b/optillm/plugins/proxy/client.py
@@ -27,6 +27,14 @@ def __init__(self, config: Dict):
         self.last_error = None
         self.latencies = []  # Track recent latencies
 
+        # Per-provider concurrency control
+        self.max_concurrent = config.get('max_concurrent', None)  # None means no limit
+        if self.max_concurrent is not None:
+            self._semaphore = threading.Semaphore(self.max_concurrent)
+            logger.info(f"Provider {self.name} limited to {self.max_concurrent} concurrent requests")
+        else:
+            self._semaphore = None
+
     @property
     def client(self):
         """Lazy initialization of OpenAI client"""
@@ -39,6 +47,13 @@ def client(self):
                     api_version="2024-02-01",
                     max_retries=0  # Disable client retries - we handle them
                 )
+            elif 'generativelanguage.googleapis.com' in self.base_url:
+                # Google AI client - create custom client to avoid "models/" prefix
+                from optillm.plugins.proxy.google_client import GoogleAIClient
+                self._client = GoogleAIClient(
+                    api_key=self.api_key,
+                    base_url=self.base_url
+                )
             else:
                 # Standard OpenAI-compatible client
                 self._client = OpenAI(
@@ -63,6 +78,28 @@ def avg_latency(self) -> float:
         if not self.latencies:
             return 0
         return sum(self.latencies) / len(self.latencies)
+
+    def acquire_slot(self, timeout: Optional[float] = None) -> bool:
+        """
+        Try to acquire a slot for this provider.
+        Returns True if acquired, False if timeout or no limit.
+        """
+        if self._semaphore is None:
+            return True  # No limit, always available
+
+        return self._semaphore.acquire(blocking=True, timeout=timeout)
+
+    def release_slot(self):
+        """Release a slot for this provider."""
+        if self._semaphore is not None:
+            self._semaphore.release()
+
+    def available_slots(self) -> Optional[int]:
+        """Get number of available slots, None if unlimited."""
+        if self._semaphore is None:
+            return None
+        # Note: _value is internal but there's no public method to check availability
+        return self._semaphore._value
 
 class ProxyClient:
     """OpenAI-compatible client that proxies to multiple providers"""
@@ -185,6 +222,13 @@ def create(self, **kwargs):
 
             attempted_providers.add(provider)
 
+            # Try to acquire a slot for this provider (with reasonable timeout for queueing)
+            slot_timeout = 10.0  # Wait up to 10 seconds for provider to become available
+            if not provider.acquire_slot(timeout=slot_timeout):
+                logger.debug(f"Provider {provider.name} at max capacity, trying next provider")
+                errors.append((provider.name, "At max concurrent requests"))
+                continue
+
             try:
                 # Map model name if needed and filter out OptiLLM-specific parameters
                 request_kwargs = self._filter_kwargs(kwargs.copy())
@@ -225,6 +269,11 @@ def create(self, **kwargs):
                 if self.proxy_client.track_errors:
                     provider.is_healthy = False
                     provider.last_error = str(e)
+
+            finally:
+                # Always release the provider slot
+                provider.release_slot()
+                logger.debug(f"Released slot for provider {provider.name}")
 
         # All providers failed, try fallback client
         if self.proxy_client.fallback_client:
diff --git a/optillm/plugins/proxy/config.py b/optillm/plugins/proxy/config.py
index e5f0c684..a827a164 100644
--- a/optillm/plugins/proxy/config.py
+++ b/optillm/plugins/proxy/config.py
@@ -172,6 +172,8 @@ def _apply_defaults(config: Dict) -> Dict:
         provider.setdefault('weight', 1)
         provider.setdefault('fallback_only', False)
         provider.setdefault('model_map', {})
+        # Per-provider concurrency limit (None means no limit)
+        provider.setdefault('max_concurrent', None)
 
     return config
 
@@ -200,6 +202,12 @@ def _validate_config(config: Dict) -> Dict:
         if provider['weight'] <= 0:
             logger.warning(f"Provider {provider['name']} has invalid weight {provider['weight']}, setting to 1")
             provider['weight'] = 1
+
+        # Validate max_concurrent if specified
+        if provider.get('max_concurrent') is not None:
+            if not isinstance(provider['max_concurrent'], int) or provider['max_concurrent'] <= 0:
+                logger.warning(f"Provider {provider['name']} has invalid max_concurrent {provider['max_concurrent']}, removing limit")
+                provider['max_concurrent'] = None
 
     # Validate routing strategy
     valid_strategies = ['weighted', 'round_robin', 'failover']
diff --git a/optillm/plugins/proxy/google_client.py b/optillm/plugins/proxy/google_client.py
new file mode 100644
index 00000000..73378b6e
--- /dev/null
+++ b/optillm/plugins/proxy/google_client.py
@@ -0,0 +1,92 @@
+"""
+Custom Google AI client that doesn't add "models/" prefix to model names
+"""
+import requests
+import json
+from typing import Dict, List, Any
+
+
+class GoogleAIClient:
+    """Custom client for Google AI that bypasses OpenAI client's model name prefix behavior"""
+
+    def __init__(self, api_key: str, base_url: str):
+        self.api_key = api_key
+        self.base_url = base_url.rstrip('/')
+        self.chat = self.Chat(self)
+        self.models = self.Models(self)
+
+    class Chat:
+        def __init__(self, client):
+            self.client = client
+            self.completions = self.Completions(client)
+
+        class Completions:
+            def __init__(self, client):
+                self.client = client
+
+            def create(self, model: str, messages: List[Dict[str, str]], **kwargs) -> Any:
+                """Create chat completion without adding models/ prefix to model name"""
+                url = f"{self.client.base_url}/chat/completions"
+
+                headers = {
+                    "Content-Type": "application/json",
+                    "Authorization": f"Bearer {self.client.api_key}"
+                }
+
+                # Build request data - use model name directly without "models/" prefix
+                data = {
+                    "model": model,  # Use exactly as provided - no prefix!
+                    "messages": messages,
+                    **kwargs
+                }
+
+                # Make direct HTTP request to bypass OpenAI client behavior
+                response = requests.post(url, headers=headers, json=data, timeout=kwargs.get('timeout', 30))
+
+                if response.status_code != 200:
+                    error_text = response.text
+                    raise Exception(f"HTTP {response.status_code}: {error_text}")
+
+                # Parse response and return OpenAI-compatible object
+                result = response.json()
+
+                # Create a simple object that has the attributes expected by the proxy
+                class CompletionResponse:
+                    def __init__(self, data):
+                        self._data = data
+                        self.choices = data.get('choices', [])
+                        self.usage = data.get('usage', {})
+                        self.model = data.get('model', model)
+
+                    def model_dump(self):
+                        return self._data
+
+                    def __getitem__(self, key):
+                        return self._data[key]
+
+                    def get(self, key, default=None):
+                        return self._data.get(key, default)
+
+                return CompletionResponse(result)
+
+    class Models:
+        def __init__(self, client):
+            self.client = client
+
+        def list(self):
+            """Simple models list for health checking"""
+            url = f"{self.client.base_url}/models"
+            headers = {
+                "Authorization": f"Bearer {self.client.api_key}"
+            }
+
+            try:
+                response = requests.get(url, headers=headers, timeout=5)
+                if response.status_code == 200:
+                    return response.json()
+                else:
+                    # Return a mock response if health check fails
+                    return {"data": [{"id": "gemma-3-4b-it"}]}
+            except:
+                # Return a mock response if health check fails
+                return {"data": [{"id": "gemma-3-4b-it"}]}
\ No newline at end of file
diff --git a/pyproject.toml b/pyproject.toml
index 4bce6bb9..ed3ca978 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta"
 
 [project]
 name = "optillm"
-version = "0.2.6"
+version = "0.2.7"
 description = "An optimizing inference proxy for LLMs."
 readme = "README.md"
 license = "Apache-2.0"
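
Reviewer note: the concurrency change in `client.py` boils down to a semaphore-gated failover loop: each provider optionally owns a `threading.Semaphore(max_concurrent)`, the request loop tries to acquire a slot (with a timeout) before dispatching, skips to the next provider when no slot is free, and always releases the slot in a `finally` block. The following standalone sketch illustrates that same pattern; it is not part of the diff, and `FakeProvider`, `send`, and `create_with_failover` are illustrative names only.

```python
import threading
from typing import Optional

class FakeProvider:
    """Illustrative stand-in for a provider with an optional concurrency cap."""
    def __init__(self, name: str, max_concurrent: Optional[int] = None):
        self.name = name
        # No semaphore means no limit, mirroring max_concurrent: null in the config
        self._semaphore = threading.Semaphore(max_concurrent) if max_concurrent else None

    def acquire_slot(self, timeout: Optional[float] = None) -> bool:
        if self._semaphore is None:
            return True  # unlimited provider is always available
        return self._semaphore.acquire(blocking=True, timeout=timeout)

    def release_slot(self) -> None:
        if self._semaphore is not None:
            self._semaphore.release()

def send(provider: FakeProvider, payload: dict) -> str:
    """Placeholder for the actual HTTP call to the provider."""
    return f"{provider.name} handled {payload['model']}"

def create_with_failover(providers, payload, slot_timeout: float = 10.0) -> str:
    """Try providers in order; skip any at max concurrency; always release the slot."""
    errors = []
    for provider in providers:
        if not provider.acquire_slot(timeout=slot_timeout):
            errors.append((provider.name, "At max concurrent requests"))
            continue  # provider is saturated, move on to the next one
        try:
            return send(provider, payload)
        except Exception as e:
            errors.append((provider.name, str(e)))
        finally:
            provider.release_slot()  # released on success and on failure
    raise RuntimeError(f"All providers failed: {errors}")

if __name__ == "__main__":
    providers = [FakeProvider("slow_server", max_concurrent=1),
                 FakeProvider("fast_server", max_concurrent=10)]
    print(create_with_failover(providers, {"model": "gpt-4"}))
```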