2 changes: 1 addition & 1 deletion optillm/__init__.py
@@ -1,5 +1,5 @@
# Version information
__version__ = "0.2.6"
__version__ = "0.2.7"

# Import from server module
from .server import (
35 changes: 35 additions & 0 deletions optillm/plugins/proxy/README.md
@@ -33,13 +33,15 @@ providers:
    base_url: https://api.openai.com/v1
    api_key: ${OPENAI_API_KEY}
    weight: 2
    max_concurrent: 5  # Optional: limit this provider to 5 concurrent requests
    model_map:
      gpt-4: gpt-4-turbo-preview  # Optional: map model names

  - name: backup
    base_url: https://api.openai.com/v1
    api_key: ${OPENAI_API_KEY_BACKUP}
    weight: 1
    max_concurrent: 2  # Optional: limit this provider to 2 concurrent requests

routing:
  strategy: weighted  # Options: weighted, round_robin, failover
@@ -189,6 +191,39 @@ queue:
- **Automatic Failover**: When a provider times out, it's marked unhealthy and the request automatically fails over to the next available provider (see the configuration sketch after this list).
- **Protection**: Prevents slow backends from causing queue buildup that can crash the proxy server.
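
For example, a minimal failover-oriented configuration could look like the sketch below. It uses only keys shown elsewhere in this README; the assumption that providers are tried in listed order is noted in the comment.

```yaml
# Sketch: failover routing (assumption: providers are tried in listed order
# until a healthy one responds)
providers:
  - name: primary
    base_url: https://api.openai.com/v1
    api_key: ${OPENAI_API_KEY}

  - name: backup
    base_url: https://api.openai.com/v1
    api_key: ${OPENAI_API_KEY_BACKUP}

routing:
  strategy: failover
```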

### Per-Provider Concurrency Limits

Control the maximum number of concurrent requests each provider can handle:

```yaml
providers:
  - name: slow_server
    base_url: http://192.168.1.100:8080/v1
    api_key: dummy
    max_concurrent: 1  # This server can only handle 1 request at a time

  - name: fast_server
    base_url: https://api.fast.com/v1
    api_key: ${API_KEY}
    max_concurrent: 10  # This server can handle 10 concurrent requests

  - name: unlimited_server
    base_url: https://api.unlimited.com/v1
    api_key: ${API_KEY}
    # No max_concurrent means no limit for this provider
```

**Use Cases:**
- **Hardware-limited servers**: Set `max_concurrent: 1` for servers that can't handle parallel requests
- **Rate limiting**: Prevent overwhelming providers with too many concurrent requests
- **Resource management**: Balance load across providers with different capacities
- **Cost control**: Limit expensive providers while allowing more requests to cheaper ones

**Behavior:**
- If a provider is at max capacity, the proxy tries the next available provider
- Requests wait up to 10 seconds for a slot before moving on to the next provider (see the sketch below)
- Works with all routing strategies (weighted, round_robin, failover)
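
To make this concrete, here is a simplified sketch of the slot handling performed in `client.py`. The `providers` list and `send_request` helper are illustrative placeholders; `acquire_slot` and `release_slot` are the real provider methods added in this change.

```python
# Simplified sketch of the provider-selection loop (not the actual implementation).
# `providers` and `send_request` are illustrative placeholders.
def complete_with_limits(providers, send_request, **kwargs):
    errors = []
    for provider in providers:
        # Wait up to 10 seconds for a free slot, otherwise try the next provider.
        if not provider.acquire_slot(timeout=10.0):
            errors.append((provider.name, "At max concurrent requests"))
            continue
        try:
            return send_request(provider, **kwargs)
        except Exception as e:
            errors.append((provider.name, str(e)))
        finally:
            # Always release the slot, even when the request fails.
            provider.release_slot()
    raise RuntimeError(f"All providers failed or were at capacity: {errors}")
```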

### Environment Variables

The configuration supports flexible environment variable interpolation:
49 changes: 49 additions & 0 deletions optillm/plugins/proxy/client.py
@@ -27,6 +27,14 @@ def __init__(self, config: Dict):
        self.last_error = None
        self.latencies = []  # Track recent latencies

        # Per-provider concurrency control
        self.max_concurrent = config.get('max_concurrent', None)  # None means no limit
        if self.max_concurrent is not None:
            self._semaphore = threading.Semaphore(self.max_concurrent)
            logger.info(f"Provider {self.name} limited to {self.max_concurrent} concurrent requests")
        else:
            self._semaphore = None

    @property
    def client(self):
        """Lazy initialization of OpenAI client"""
@@ -39,6 +47,13 @@ def client(self):
                    api_version="2024-02-01",
                    max_retries=0  # Disable client retries - we handle them
                )
            elif 'generativelanguage.googleapis.com' in self.base_url:
                # Google AI client - create custom client to avoid "models/" prefix
                from optillm.plugins.proxy.google_client import GoogleAIClient
                self._client = GoogleAIClient(
                    api_key=self.api_key,
                    base_url=self.base_url
                )
            else:
                # Standard OpenAI-compatible client
                self._client = OpenAI(
@@ -63,6 +78,28 @@ def avg_latency(self) -> float:
        if not self.latencies:
            return 0
        return sum(self.latencies) / len(self.latencies)

    def acquire_slot(self, timeout: Optional[float] = None) -> bool:
        """
        Try to acquire a slot for this provider.
        Returns True if a slot was acquired or no limit is set,
        False if the timeout expires before a slot frees up.
        """
        if self._semaphore is None:
            return True  # No limit, always available

        return self._semaphore.acquire(blocking=True, timeout=timeout)

    def release_slot(self):
        """Release a slot for this provider."""
        if self._semaphore is not None:
            self._semaphore.release()

    def available_slots(self) -> Optional[int]:
        """Get number of available slots, None if unlimited."""
        if self._semaphore is None:
            return None
        # Note: _value is internal but there's no public method to check availability
        return self._semaphore._value

class ProxyClient:
    """OpenAI-compatible client that proxies to multiple providers"""
@@ -185,6 +222,13 @@ def create(self, **kwargs):

                attempted_providers.add(provider)

                # Try to acquire a slot for this provider (with reasonable timeout for queueing)
                slot_timeout = 10.0  # Wait up to 10 seconds for provider to become available
                if not provider.acquire_slot(timeout=slot_timeout):
                    logger.debug(f"Provider {provider.name} at max capacity, trying next provider")
                    errors.append((provider.name, "At max concurrent requests"))
                    continue

                try:
                    # Map model name if needed and filter out OptiLLM-specific parameters
                    request_kwargs = self._filter_kwargs(kwargs.copy())
@@ -225,6 +269,11 @@ def create(self, **kwargs):
                    if self.proxy_client.track_errors:
                        provider.is_healthy = False
                        provider.last_error = str(e)

                finally:
                    # Always release the provider slot
                    provider.release_slot()
                    logger.debug(f"Released slot for provider {provider.name}")

            # All providers failed, try fallback client
            if self.proxy_client.fallback_client:
8 changes: 8 additions & 0 deletions optillm/plugins/proxy/config.py
@@ -172,6 +172,8 @@ def _apply_defaults(config: Dict) -> Dict:
        provider.setdefault('weight', 1)
        provider.setdefault('fallback_only', False)
        provider.setdefault('model_map', {})
        # Per-provider concurrency limit (None means no limit)
        provider.setdefault('max_concurrent', None)

    return config

@@ -200,6 +202,12 @@ def _validate_config(config: Dict) -> Dict:
        if provider['weight'] <= 0:
            logger.warning(f"Provider {provider['name']} has invalid weight {provider['weight']}, setting to 1")
            provider['weight'] = 1

        # Validate max_concurrent if specified
        if provider.get('max_concurrent') is not None:
            if not isinstance(provider['max_concurrent'], int) or provider['max_concurrent'] <= 0:
                logger.warning(f"Provider {provider['name']} has invalid max_concurrent {provider['max_concurrent']}, removing limit")
                provider['max_concurrent'] = None

    # Validate routing strategy
    valid_strategies = ['weighted', 'round_robin', 'failover']
92 changes: 92 additions & 0 deletions optillm/plugins/proxy/google_client.py
@@ -0,0 +1,92 @@
"""
Custom Google AI client that doesn't add "models/" prefix to model names
"""
import requests
import json
from typing import Dict, List, Any


class GoogleAIClient:
"""Custom client for Google AI that bypasses OpenAI client's model name prefix behavior"""

def __init__(self, api_key: str, base_url: str):
self.api_key = api_key
self.base_url = base_url.rstrip('/')
self.chat = self.Chat(self)
self.models = self.Models(self)

class Chat:
def __init__(self, client):
self.client = client
self.completions = self.Completions(client)

class Completions:
def __init__(self, client):
self.client = client

def create(self, model: str, messages: List[Dict[str, str]], **kwargs) -> Any:
"""Create chat completion without adding models/ prefix to model name"""
url = f"{self.client.base_url}/chat/completions"

headers = {
"Content-Type": "application/json",
"Authorization": f"Bearer {self.client.api_key}"
}

# Build request data - use model name directly without "models/" prefix
data = {
"model": model, # Use exactly as provided - no prefix!
"messages": messages,
**kwargs
}

# Make direct HTTP request to bypass OpenAI client behavior
response = requests.post(url, headers=headers, json=data, timeout=kwargs.get('timeout', 30))

if response.status_code != 200:
error_text = response.text
raise Exception(f"HTTP {response.status_code}: {error_text}")

# Parse response and return OpenAI-compatible object
result = response.json()

# Create a simple object that has the attributes expected by the proxy
class CompletionResponse:
def __init__(self, data):
self._data = data
self.choices = data.get('choices', [])
self.usage = data.get('usage', {})
self.model = data.get('model', model)

def model_dump(self):
return self._data

def __getitem__(self, key):
return self._data[key]

def get(self, key, default=None):
return self._data.get(key, default)

return CompletionResponse(result)

class Models:
def __init__(self, client):
self.client = client

def list(self):
"""Simple models list for health checking"""
url = f"{self.client.base_url}/models"
headers = {
"Authorization": f"Bearer {self.client.api_key}"
}

try:
response = requests.get(url, headers=headers, timeout=5)
if response.status_code == 200:
return response.json()
else:
# Return a mock response if health check fails
return {"data": [{"id": "gemma-3-4b-it"}]}
except:
# Return a mock response if health check fails
return {"data": [{"id": "gemma-3-4b-it"}]}
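
As a usage sketch (assumptions: the base URL shown is Google's OpenAI-compatible endpoint, and the environment variable name is illustrative), this is how `GoogleAIClient` can be exercised directly; within optillm the proxy constructs it automatically from the provider's `base_url`.

```python
# Illustrative sketch only; base URL and env var name are assumptions.
import os
from optillm.plugins.proxy.google_client import GoogleAIClient

client = GoogleAIClient(
    api_key=os.environ["GOOGLE_API_KEY"],  # assumed env var name
    base_url="https://generativelanguage.googleapis.com/v1beta/openai",
)

# The model name is passed through verbatim - no "models/" prefix is added.
response = client.chat.completions.create(
    model="gemma-3-4b-it",
    messages=[{"role": "user", "content": "Say hello"}],
)

# CompletionResponse.choices holds plain dicts, so index by key.
print(response.choices[0]["message"]["content"])
```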
2 changes: 1 addition & 1 deletion pyproject.toml
@@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta"

[project]
name = "optillm"
version = "0.2.6"
version = "0.2.7"
description = "An optimizing inference proxy for LLMs."
readme = "README.md"
license = "Apache-2.0"