-
Notifications
You must be signed in to change notification settings - Fork 1
Mistral: per-key rate limiting, retry/backoff and improved error handling for streaming requests #65
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Mistral: per-key rate limiting, retry/backoff and improved error handling for streaming requests #65
Changes from all commits
61d16f6
3cbeac1
e89c450
d139fcc
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -70,6 +70,7 @@ import kotlinx.serialization.modules.subclass | |
| import com.google.ai.sample.webrtc.WebRTCSender | ||
| import com.google.ai.sample.webrtc.SignalingClient | ||
| import org.webrtc.IceCandidate | ||
| import kotlin.math.max | ||
|
|
||
| class PhotoReasoningViewModel( | ||
| application: Application, | ||
|
|
@@ -182,9 +183,11 @@ class PhotoReasoningViewModel( | |
| // to avoid re-executing already-executed commands | ||
| private var incrementalCommandCount = 0 | ||
|
|
||
| // Mistral rate limiting: track last request time to enforce 1-second minimum interval | ||
| private var lastMistralRequestTimeMs = 0L | ||
| private val MISTRAL_MIN_INTERVAL_MS = 1000L | ||
| // Mistral rate limiting per API key (1.1 seconds between requests with same key) | ||
| private val mistralNextAllowedRequestAtMsByKey = mutableMapOf<String, Long>() | ||
| private var lastMistralTokenTimeMs = 0L | ||
| private var lastMistralTokenKey: String? = null | ||
| private val MISTRAL_MIN_INTERVAL_MS = 1100L | ||
|
|
||
| // Accumulated full text during streaming for incremental command parsing | ||
| private var streamingAccumulatedText = StringBuilder() | ||
|
|
@@ -1052,12 +1055,6 @@ private fun reasonWithMistral( | |
| resetStreamingCommandState() | ||
|
|
||
| viewModelScope.launch(Dispatchers.IO) { | ||
| // Rate limiting: nur die verbleibende Zeit warten | ||
| val elapsed = System.currentTimeMillis() - lastMistralRequestTimeMs | ||
| if (lastMistralRequestTimeMs > 0 && elapsed < MISTRAL_MIN_INTERVAL_MS) { | ||
| delay(MISTRAL_MIN_INTERVAL_MS - elapsed) | ||
| } | ||
|
|
||
| try { | ||
| val currentModel = com.google.ai.sample.GenerativeAiViewModelFactory.getCurrentModel() | ||
| val genSettings = com.google.ai.sample.util.GenerationSettingsPreferences.loadSettings(context, currentModel.modelName) | ||
|
|
@@ -1126,60 +1123,133 @@ private fun reasonWithMistral( | |
| .addHeader("Authorization", "Bearer $key") | ||
| .build() | ||
|
|
||
| var currentKey = initialApiKey | ||
| var response = client.newCall(buildRequest(currentKey)).execute() | ||
| lastMistralRequestTimeMs = System.currentTimeMillis() | ||
| val availableKeys = apiKeyManager.getApiKeys(ApiProvider.MISTRAL) | ||
| .filter { it.isNotBlank() } | ||
| .distinct() | ||
| if (availableKeys.isEmpty()) { | ||
| throw IOException("Mistral API key not found.") | ||
| } | ||
|
|
||
| if (response.code == 429) { | ||
| response.close() | ||
| apiKeyManager.markKeyAsFailed(currentKey, ApiProvider.MISTRAL) | ||
| val nextKey = apiKeyManager.switchToNextAvailableKey(ApiProvider.MISTRAL) | ||
| if (nextKey != null && nextKey != currentKey) { | ||
| // Anderer Key verfugbar -> sofort wechseln wie bisher | ||
| currentKey = nextKey | ||
| val elapsed2 = System.currentTimeMillis() - lastMistralRequestTimeMs | ||
| if (elapsed2 < MISTRAL_MIN_INTERVAL_MS) delay(MISTRAL_MIN_INTERVAL_MS - elapsed2) | ||
| response = client.newCall(buildRequest(currentKey)).execute() | ||
| lastMistralRequestTimeMs = System.currentTimeMillis() | ||
| } else { | ||
| // Kein anderer Key -> 5 Sekunden lang sofort wiederholen | ||
| apiKeyManager.resetFailedKeys(ApiProvider.MISTRAL) | ||
| // Validate that we have at least one key before proceeding | ||
| require(availableKeys.isNotEmpty()) { "No valid Mistral API keys available after filtering" } | ||
|
|
||
| fun markKeyCooldown(key: String, referenceTimeMs: Long) { | ||
| val nextAllowedAt = referenceTimeMs + MISTRAL_MIN_INTERVAL_MS | ||
| val existing = mistralNextAllowedRequestAtMsByKey[key] ?: 0L | ||
| mistralNextAllowedRequestAtMsByKey[key] = max(existing, nextAllowedAt) | ||
| } | ||
|
|
||
| fun remainingWaitForKeyMs(key: String, nowMs: Long): Long { | ||
| val nextAllowedAt = mistralNextAllowedRequestAtMsByKey[key] ?: 0L | ||
| return (nextAllowedAt - nowMs).coerceAtLeast(0L) | ||
| } | ||
|
|
||
| fun isRetryableMistralFailure(code: Int): Boolean { | ||
| return code == 429 || code >= 500 | ||
| } | ||
|
|
||
| var response: okhttp3.Response? = null | ||
| var selectedKeyForResponse: String? = null | ||
| var consecutiveFailures = 0 | ||
| var blockedKeysThisRound = mutableSetOf<String>() | ||
|
|
||
| val maxAttempts = availableKeys.size * 2 + 3 // Allow cycling through all keys at least twice | ||
| while (response == null && consecutiveFailures < maxAttempts) { | ||
| if (stopExecutionFlag.get()) break | ||
|
|
||
| val now = System.currentTimeMillis() | ||
| val keyPool = availableKeys.filter { it !in blockedKeysThisRound }.ifEmpty { | ||
| blockedKeysThisRound.clear() | ||
| availableKeys | ||
| } | ||
|
|
||
| val keyWithLeastWait = keyPool.minByOrNull { remainingWaitForKeyMs(it, now) } ?: availableKeys.first() | ||
| val waitMs = remainingWaitForKeyMs(keyWithLeastWait, now) | ||
| if (waitMs > 0L) { | ||
| delay(waitMs) | ||
| } | ||
|
|
||
| val selectedKey = keyWithLeastWait | ||
| selectedKeyForResponse = selectedKey | ||
|
|
||
| try { | ||
| val attemptResponse = client.newCall(buildRequest(selectedKey)).execute() | ||
| val requestEndMs = System.currentTimeMillis() | ||
| markKeyCooldown(selectedKey, requestEndMs) | ||
|
|
||
| if (attemptResponse.isSuccessful) { | ||
| response = attemptResponse | ||
| break | ||
| } | ||
|
|
||
| val isRetryable = isRetryableMistralFailure(attemptResponse.code) | ||
| if (!isRetryable) { | ||
| val errBody = attemptResponse.body?.string() | ||
| attemptResponse.close() | ||
| throw IllegalStateException("Mistral Error ${attemptResponse.code}: $errBody") | ||
| } | ||
|
|
||
| attemptResponse.close() | ||
| blockedKeysThisRound.add(selectedKey) | ||
| consecutiveFailures++ | ||
| withContext(Dispatchers.Main) { | ||
| replaceAiMessageText("Rate limit erreicht. Wiederhole...", isPending = true) | ||
| replaceAiMessageText( | ||
| "Mistral temporär nicht verfügbar (Versuch $consecutiveFailures/$maxAttempts). Wiederhole...", | ||
| isPending = true | ||
| ) | ||
| } | ||
| val retryDeadline = System.currentTimeMillis() + 5000L | ||
| var retryResponse: okhttp3.Response? = null | ||
| while (System.currentTimeMillis() < retryDeadline) { | ||
| if (stopExecutionFlag.get()) break | ||
| val retryResp = client.newCall(buildRequest(currentKey)).execute() | ||
| lastMistralRequestTimeMs = System.currentTimeMillis() | ||
| if (retryResp.code != 429) { | ||
| retryResponse = retryResp | ||
| break | ||
| } | ||
| retryResp.close() | ||
| } catch (e: IOException) { | ||
| val requestEndMs = System.currentTimeMillis() | ||
| markKeyCooldown(selectedKey, requestEndMs) | ||
| blockedKeysThisRound.add(selectedKey) | ||
| consecutiveFailures++ | ||
| if (consecutiveFailures >= 5) { | ||
| throw IOException("Mistral request failed after 5 attempts: ${e.message}", e) | ||
| } | ||
| if (retryResponse == null || stopExecutionFlag.get()) { | ||
| throw IOException("Mistral rate limit: Kein Erfolg innerhalb von 5 Sekunden.") | ||
| withContext(Dispatchers.Main) { | ||
| replaceAiMessageText( | ||
| if (consecutiveFailures >= maxAttempts) { | ||
| throw IOException("Mistral request failed after $maxAttempts attempts: ${e.message}", e) | ||
| ) | ||
| } | ||
| response = retryResponse | ||
| } | ||
|
Comment on lines
+1201
to
1215
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 🛑 Logic Error: After closing a failed response, the code continues to use |
||
| "Mistral Netzwerkfehler (Versuch $consecutiveFailures/$maxAttempts). Wiederhole...", | ||
|
|
||
| if (stopExecutionFlag.get()) { | ||
| throw IOException("Mistral request aborted.") | ||
| } | ||
|
|
||
| if (!response.isSuccessful) { | ||
| val errBody = response.body?.string() | ||
| response.close() | ||
| throw IOException("Mistral Error ${response.code}: $errBody") | ||
| val finalResponse = response ?: throw IOException("Mistral request failed after 5 attempts.") | ||
|
|
||
| if (!finalResponse.isSuccessful) { | ||
| val errBody = finalResponse.body?.string() | ||
| finalResponse.close() | ||
| val finalResponse = response ?: throw IOException("Mistral request failed after $maxAttempts attempts.") | ||
| } | ||
|
|
||
| val body = response.body ?: throw IOException("Empty response body from Mistral") | ||
| val body = finalResponse.body ?: throw IOException("Empty response body from Mistral") | ||
| val aiResponseText = openAiStreamParser.parse(body) { accText -> | ||
| selectedKeyForResponse?.let { key -> | ||
| lastMistralTokenKey = key | ||
| lastMistralTokenTimeMs = System.currentTimeMillis() | ||
| markKeyCooldown(key, lastMistralTokenTimeMs) | ||
| } ?: run { | ||
| Log.w(TAG, "selectedKeyForResponse is null during streaming callback") | ||
| } | ||
Android-PowerUser marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| withContext(Dispatchers.Main) { | ||
| replaceAiMessageText(accText, isPending = true) | ||
| processCommandsIncrementally(accText) | ||
| } | ||
| } | ||
| response.close() | ||
| finalResponse.close() | ||
| selectedKeyForResponse?.let { key -> | ||
| val reference = if (lastMistralTokenKey == key && lastMistralTokenTimeMs > 0L) { | ||
| lastMistralTokenTimeMs | ||
| } else { | ||
| System.currentTimeMillis() | ||
| } | ||
| markKeyCooldown(key, reference) | ||
| } | ||
|
|
||
| withContext(Dispatchers.Main) { | ||
| _uiState.value = PhotoReasoningUiState.Success(aiResponseText) | ||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.