From 1bc16fe2802e3c375f66e7ba439c9add3ce56a2e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?th=E1=BB=8Bnh?= Date: Wed, 15 Oct 2025 16:56:49 +0700 Subject: [PATCH 1/4] X- headers on requests; reducing geo updates --- app/lib/backend/http/shared.dart | 43 ++++++++++++++----- app/lib/providers/user_provider.dart | 42 ++++++++++++++++++ app/lib/services/sockets/pure_socket.dart | 11 +++-- app/lib/utils/platform/platform_manager.dart | 8 ++++ backend/routers/users.py | 24 ++++++++++- .../conversations/process_conversation.py | 4 +- backend/utils/other/timeout.py | 17 ++++++++ 7 files changed, 132 insertions(+), 17 deletions(-) diff --git a/app/lib/backend/http/shared.dart b/app/lib/backend/http/shared.dart index 87a76fc1b96..586359f31dd 100644 --- a/app/lib/backend/http/shared.dart +++ b/app/lib/backend/http/shared.dart @@ -57,12 +57,17 @@ Future makeRawApiCall({ Map headers = const {}, }) async { var request = http.Request(method, Uri.parse(url)); + final mutableHeaders = Map.from(headers); + mutableHeaders['X-Request-Start-Time'] = (DateTime.now().millisecondsSinceEpoch / 1000).toString(); + mutableHeaders['X-App-Platform'] = PlatformManager.instance.platform; + mutableHeaders['X-App-Version'] = PlatformManager.instance.appVersion; + final bool requireAuthCheck = _isRequiredAuthCheck(url); if (requireAuthCheck) { - headers['Authorization'] = await getAuthHeader(); + mutableHeaders['Authorization'] = await getAuthHeader(); // headers['Authorization'] = ''; // set admin key + uid here for testing } - request.headers.addAll(headers); + request.headers.addAll(mutableHeaders); return ApiClient._client.send(request); } @@ -73,6 +78,10 @@ Future makeApiCall({ required String method, }) async { try { + headers['X-Request-Start-Time'] = (DateTime.now().millisecondsSinceEpoch / 1000).toString(); + headers['X-App-Platform'] = PlatformManager.instance.platform; + headers['X-App-Version'] = PlatformManager.instance.appVersion; + final bool requireAuthCheck = _isRequiredAuthCheck(url); if (requireAuthCheck) { headers['Authorization'] = await getAuthHeader(); @@ -163,13 +172,17 @@ Future makeMultipartApiCall({ try { var request = http.MultipartRequest(method, Uri.parse(url)); + final mutableHeaders = Map.from(headers); + mutableHeaders['X-Request-Start-Time'] = (DateTime.now().millisecondsSinceEpoch / 1000).toString(); + mutableHeaders['X-App-Platform'] = PlatformManager.instance.platform; + mutableHeaders['X-App-Version'] = PlatformManager.instance.appVersion; + final bool requireAuthCheck = _isRequiredAuthCheck(url); if (requireAuthCheck) { - headers = Map.from(headers); - headers['Authorization'] = await getAuthHeader(); + mutableHeaders['Authorization'] = await getAuthHeader(); } - request.headers.addAll(headers); + request.headers.addAll(mutableHeaders); request.fields.addAll(fields); for (var file in files) { @@ -202,13 +215,17 @@ Stream makeStreamingApiCall({ try { var request = http.Request(method, Uri.parse(url)); + final mutableHeaders = Map.from(headers); + mutableHeaders['X-Request-Start-Time'] = (DateTime.now().millisecondsSinceEpoch / 1000).toString(); + mutableHeaders['X-App-Platform'] = PlatformManager.instance.platform; + mutableHeaders['X-App-Version'] = PlatformManager.instance.appVersion; + final bool requireAuthCheck = _isRequiredAuthCheck(url); if (requireAuthCheck) { - headers = Map.from(headers); - headers['Authorization'] = await getAuthHeader(); + mutableHeaders['Authorization'] = await getAuthHeader(); } - request.headers.addAll(headers); + request.headers.addAll(mutableHeaders); if (body.isNotEmpty) { request.headers['Content-Type'] = 'application/json'; @@ -262,13 +279,17 @@ Stream makeMultipartStreamingApiCall({ try { var request = http.MultipartRequest('POST', Uri.parse(url)); + final mutableHeaders = Map.from(headers); + mutableHeaders['X-Request-Start-Time'] = (DateTime.now().millisecondsSinceEpoch / 1000).toString(); + mutableHeaders['X-App-Platform'] = PlatformManager.instance.platform; + mutableHeaders['X-App-Version'] = PlatformManager.instance.appVersion; + final bool requireAuthCheck = _isRequiredAuthCheck(url); if (requireAuthCheck) { - headers = Map.from(headers); - headers['Authorization'] = await getAuthHeader(); + mutableHeaders['Authorization'] = await getAuthHeader(); } - request.headers.addAll(headers); + request.headers.addAll(mutableHeaders); for (var file in files) { request.files.add(await http.MultipartFile.fromPath(fileFieldName, file.path, filename: basename(file.path))); diff --git a/app/lib/providers/user_provider.dart b/app/lib/providers/user_provider.dart index c990b434be5..7aa5bd44d73 100644 --- a/app/lib/providers/user_provider.dart +++ b/app/lib/providers/user_provider.dart @@ -1,7 +1,9 @@ import 'package:awesome_notifications/awesome_notifications.dart'; import 'package:flutter/material.dart'; +import 'package:geolocator/geolocator.dart'; import 'package:omi/backend/http/api/privacy.dart'; import 'package:omi/backend/http/api/users.dart'; +import 'package:omi/backend/schema/geolocation.dart'; import 'package:omi/services/notifications.dart'; import 'package:omi/utils/logger.dart'; @@ -25,6 +27,8 @@ class UserProvider with ChangeNotifier { String _targetLevel = ''; DateTime? _startTime; + Geolocation? _lastKnownLocation; + String get dataProtectionLevel => _dataProtectionLevel; bool get isLoading => _isLoading; bool get privateCloudSyncEnabled => _privateCloudSyncEnabled; @@ -150,6 +154,44 @@ class UserProvider with ChangeNotifier { } } + Future updateUserGeolocationIfNeeded(Map data) async { + try { + final newLocation = Geolocation( + latitude: data['latitude'], + longitude: data['longitude'], + accuracy: data['accuracy'], + altitude: data['altitude'], + time: DateTime.parse(data['time']).toUtc(), + ); + + // Ensure new location has valid coordinates before proceeding. + if (newLocation.latitude == null || newLocation.longitude == null) { + Logger.log('Received location update with null coordinates, skipping.'); + return; + } + + if (_lastKnownLocation != null && _lastKnownLocation!.latitude != null && _lastKnownLocation!.longitude != null) { + // Truncate to 4 decimal places for comparison + final lastLat = double.parse(_lastKnownLocation!.latitude!.toStringAsFixed(4)); + final lastLon = double.parse(_lastKnownLocation!.longitude!.toStringAsFixed(4)); + final newLat = double.parse(newLocation.latitude!.toStringAsFixed(4)); + final newLon = double.parse(newLocation.longitude!.toStringAsFixed(4)); + + // Only update if location has changed up to 4 decimal places + if (lastLat == newLat && lastLon == newLon) { + Logger.log('User has not moved significantly (based on 4 decimal places), skipping geolocation update.'); + return; + } + } + + Logger.log('Updating user geolocation.'); + await updateUserGeolocation(geolocation: newLocation); + _lastKnownLocation = newLocation; + } catch (e, stackTrace) { + Logger.error('Failed to update user geolocation: $e\n$stackTrace'); + } + } + Future getMigrationCountFor(String targetLevel) async { if (dataProtectionLevel == targetLevel) return 0; try { diff --git a/app/lib/services/sockets/pure_socket.dart b/app/lib/services/sockets/pure_socket.dart index 1d345526208..8a2e653ddf3 100644 --- a/app/lib/services/sockets/pure_socket.dart +++ b/app/lib/services/sockets/pure_socket.dart @@ -83,11 +83,16 @@ class PureSocket implements IPureSocket { } debugPrint("request wss ${url}"); + final headers = { + 'Authorization': await getAuthHeader(), + 'X-Request-Start-Time': (DateTime.now().millisecondsSinceEpoch / 1000).toString(), + 'X-App-Platform': PlatformManager.instance.platform, + 'X-App-Version': PlatformManager.instance.appVersion, + }; + _channel = IOWebSocketChannel.connect( url, - headers: { - 'Authorization': await getAuthHeader(), - }, + headers: headers, pingInterval: const Duration(seconds: 20), connectTimeout: const Duration(seconds: 15), ); diff --git a/app/lib/utils/platform/platform_manager.dart b/app/lib/utils/platform/platform_manager.dart index 77b8d6e22ae..2231332849d 100644 --- a/app/lib/utils/platform/platform_manager.dart +++ b/app/lib/utils/platform/platform_manager.dart @@ -1,13 +1,17 @@ +import 'dart:io'; + import 'package:omi/utils/analytics/intercom.dart'; import 'package:omi/utils/analytics/mixpanel.dart'; import 'package:omi/utils/debugging/crashlytics_manager.dart'; import 'package:omi/utils/debugging/crash_reporter.dart'; import 'package:omi/utils/platform/platform_service.dart'; +import 'package:package_info_plus/package_info_plus.dart'; /// Centralized platform manager for all platform-specific services /// This provides a single point of access for all platform services class PlatformManager { static final PlatformManager _instance = PlatformManager._internal(); + late PackageInfo _packageInfo; factory PlatformManager() => _instance; PlatformManager._internal(); @@ -20,10 +24,14 @@ class PlatformManager { CrashReporter get crashReporter => CrashlyticsManager.instance; static Future initializeServices() async { + _instance._packageInfo = await PackageInfo.fromPlatform(); await MixpanelManager.init(); await IntercomManager.instance.initIntercom(); } + String get platform => Platform.operatingSystem; + String get appVersion => '${_packageInfo.version}+${_packageInfo.buildNumber}'; + bool get isAnalyticsSupported => PlatformService.isAnalyticsSupported; bool get isDebuggingSupported => PlatformService.isCrashlyticsSupported; bool get isMacOS => PlatformService.isMacOS; diff --git a/backend/routers/users.py b/backend/routers/users.py index eb7e685f6ee..52b885c8c2b 100644 --- a/backend/routers/users.py +++ b/backend/routers/users.py @@ -17,6 +17,7 @@ from database.conversations import get_in_progress_conversation, get_conversation from database.redis_db import ( cache_user_geolocation, + get_cached_user_geolocation, set_user_webhook_db, get_user_webhook_db, disable_user_webhook_db, @@ -88,7 +89,28 @@ def delete_account(uid: str = Depends(auth.get_current_user_uid)): @router.patch('/v1/users/geolocation', tags=['v1']) def set_user_geolocation(geolocation: Geolocation, uid: str = Depends(auth.get_current_user_uid)): - cache_user_geolocation(uid, geolocation.dict()) + last_location_data = get_cached_user_geolocation(uid) + if last_location_data: + try: + last_location = Geolocation(**last_location_data) + + last_lat = round(last_location.latitude, 4) + last_lon = round(last_location.longitude, 4) + new_lat = round(geolocation.latitude, 4) + new_lon = round(geolocation.longitude, 4) + + # Only update if location has changed up to 4 decimal places + if last_lat == new_lat and last_lon == new_lon: + return {'status': 'ok', 'message': 'Location not changed significantly.'} + + cache_user_geolocation(uid, geolocation.dict()) + except Exception as e: + print(f"Error processing geolocation update, caching new location anyway. Error: {e}") + cache_user_geolocation(uid, geolocation.dict()) + else: + # No previous location, so cache the new one + cache_user_geolocation(uid, geolocation.dict()) + return {'status': 'ok'} diff --git a/backend/utils/conversations/process_conversation.py b/backend/utils/conversations/process_conversation.py index 2e1334a1e28..8656194398d 100644 --- a/backend/utils/conversations/process_conversation.py +++ b/backend/utils/conversations/process_conversation.py @@ -4,7 +4,7 @@ import re import threading import uuid -from datetime import timezone +from datetime import timezone, timedelta, datetime from typing import Union, Tuple, List, Optional from fastapi import HTTPException @@ -73,7 +73,7 @@ def _get_structured( # Fetch existing action items from past 2 days for deduplication existing_action_items = None try: - two_days_ago = datetime.datetime.now(datetime.timezone.utc) - datetime.timedelta(days=2) + two_days_ago = datetime.now(timezone.utc) - timedelta(days=2) existing_action_items = action_items_db.get_action_items(uid=uid, start_date=two_days_ago, limit=50) except Exception as e: print(f"Error fetching existing action items for deduplication: {e}") diff --git a/backend/utils/other/timeout.py b/backend/utils/other/timeout.py index a2490bd2c0f..2453751dc19 100644 --- a/backend/utils/other/timeout.py +++ b/backend/utils/other/timeout.py @@ -3,6 +3,7 @@ from fastapi import Request import asyncio import os +import time class TimeoutMiddleware(BaseHTTPMiddleware): @@ -10,6 +11,7 @@ def __init__(self, app, methods_timeout: dict = None): super().__init__(app) self.default_timeout = self._get_timeout_from_env("HTTP_DEFAULT_TIMEOUT", default=2 * 60) + self.maximum_age_seconds = self._get_timeout_from_env("HTTP_MAXIMUM_AGE_SECONDS", default=5 * 60) self.methods_timeout = self._parse_methods_timeout(methods_timeout or {}) @@ -34,6 +36,21 @@ def _parse_methods_timeout(methods_timeout: dict) -> dict: return result async def dispatch(self, request: Request, call_next): + # Check for stale request header first + request_start_header = request.headers.get("x-request-start-time") + if request_start_header: + try: + request_start_time = float(request_start_header) + current_time = time.time() + request_age = current_time - request_start_time + + if request_age > self.maximum_age_seconds: + # 408 Request Timeout is a fitting status code. + return Response(status_code=408, content="Request is too old and has been rejected.") + except (ValueError, TypeError): + # Header is malformed, proceed as normal or reject with 400 + pass + timeout = self.methods_timeout.get(request.method, self.default_timeout) try: return await asyncio.wait_for(call_next(request), timeout=timeout) From 01774b3a933f82f2795aa897d7c38eb14e8d5c25 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?th=E1=BB=8Bnh?= Date: Wed, 15 Oct 2025 17:50:30 +0700 Subject: [PATCH 2/4] Better buildHeaders function --- app/lib/backend/http/shared.dart | 106 +++++++++++----------- app/lib/services/sockets/pure_socket.dart | 7 +- 2 files changed, 52 insertions(+), 61 deletions(-) diff --git a/app/lib/backend/http/shared.dart b/app/lib/backend/http/shared.dart index 586359f31dd..e1e6aab281c 100644 --- a/app/lib/backend/http/shared.dart +++ b/app/lib/backend/http/shared.dart @@ -44,6 +44,27 @@ Future getAuthHeader() async { return 'Bearer ${SharedPreferencesUtil().authToken}'; } +/// Builds common headers for API and WebSocket requests +/// Centralizes header logic for easy maintenance and consistency +/// Automatically adds Authorization header if required +Future> buildHeaders({ + required bool requireAuthCheck, + Map fromHeaders = const {}, +}) async { + final headers = { + 'X-Request-Start-Time': (DateTime.now().millisecondsSinceEpoch / 1000).toString(), + 'X-App-Platform': PlatformManager.instance.platform, + 'X-App-Version': PlatformManager.instance.appVersion, + ...fromHeaders, + }; + + if (requireAuthCheck) { + headers['Authorization'] = await getAuthHeader(); + } + + return headers; +} + bool _isRequiredAuthCheck(String url) { if (url.contains(Env.apiBaseUrl!)) { return true; @@ -57,17 +78,11 @@ Future makeRawApiCall({ Map headers = const {}, }) async { var request = http.Request(method, Uri.parse(url)); - final mutableHeaders = Map.from(headers); - mutableHeaders['X-Request-Start-Time'] = (DateTime.now().millisecondsSinceEpoch / 1000).toString(); - mutableHeaders['X-App-Platform'] = PlatformManager.instance.platform; - mutableHeaders['X-App-Version'] = PlatformManager.instance.appVersion; - - final bool requireAuthCheck = _isRequiredAuthCheck(url); - if (requireAuthCheck) { - mutableHeaders['Authorization'] = await getAuthHeader(); - // headers['Authorization'] = ''; // set admin key + uid here for testing - } - request.headers.addAll(mutableHeaders); + final builtHeaders = await buildHeaders( + requireAuthCheck: _isRequiredAuthCheck(url), + fromHeaders: headers, + ); + request.headers.addAll(builtHeaders); return ApiClient._client.send(request); } @@ -78,23 +93,22 @@ Future makeApiCall({ required String method, }) async { try { - headers['X-Request-Start-Time'] = (DateTime.now().millisecondsSinceEpoch / 1000).toString(); - headers['X-App-Platform'] = PlatformManager.instance.platform; - headers['X-App-Version'] = PlatformManager.instance.appVersion; - final bool requireAuthCheck = _isRequiredAuthCheck(url); - if (requireAuthCheck) { - headers['Authorization'] = await getAuthHeader(); - // headers['Authorization'] = ''; // set admin key + uid here for testing - } + final builtHeaders = await buildHeaders( + requireAuthCheck: requireAuthCheck, + fromHeaders: headers, + ); - http.Response? response = await _performRequest(url, headers, body, method); + http.Response? response = await _performRequest(url, builtHeaders, body, method); if (requireAuthCheck && response.statusCode == 401) { Logger.log('Token expired on 1st attempt'); SharedPreferencesUtil().authToken = await AuthService.instance.getIdToken() ?? ''; if (SharedPreferencesUtil().authToken.isNotEmpty) { - headers['Authorization'] = 'Bearer ${SharedPreferencesUtil().authToken}'; - response = await _performRequest(url, headers, body, method); + final refreshedHeaders = await buildHeaders( + requireAuthCheck: requireAuthCheck, + fromHeaders: headers, + ); + response = await _performRequest(url, refreshedHeaders, body, method); Logger.log('Token refreshed and request retried'); if (response.statusCode == 401) { // Force user to sign in again @@ -172,17 +186,11 @@ Future makeMultipartApiCall({ try { var request = http.MultipartRequest(method, Uri.parse(url)); - final mutableHeaders = Map.from(headers); - mutableHeaders['X-Request-Start-Time'] = (DateTime.now().millisecondsSinceEpoch / 1000).toString(); - mutableHeaders['X-App-Platform'] = PlatformManager.instance.platform; - mutableHeaders['X-App-Version'] = PlatformManager.instance.appVersion; - - final bool requireAuthCheck = _isRequiredAuthCheck(url); - if (requireAuthCheck) { - mutableHeaders['Authorization'] = await getAuthHeader(); - } - - request.headers.addAll(mutableHeaders); + final builtHeaders = await buildHeaders( + requireAuthCheck: _isRequiredAuthCheck(url), + fromHeaders: headers, + ); + request.headers.addAll(builtHeaders); request.fields.addAll(fields); for (var file in files) { @@ -215,17 +223,11 @@ Stream makeStreamingApiCall({ try { var request = http.Request(method, Uri.parse(url)); - final mutableHeaders = Map.from(headers); - mutableHeaders['X-Request-Start-Time'] = (DateTime.now().millisecondsSinceEpoch / 1000).toString(); - mutableHeaders['X-App-Platform'] = PlatformManager.instance.platform; - mutableHeaders['X-App-Version'] = PlatformManager.instance.appVersion; - - final bool requireAuthCheck = _isRequiredAuthCheck(url); - if (requireAuthCheck) { - mutableHeaders['Authorization'] = await getAuthHeader(); - } - - request.headers.addAll(mutableHeaders); + final builtHeaders = await buildHeaders( + requireAuthCheck: _isRequiredAuthCheck(url), + fromHeaders: headers, + ); + request.headers.addAll(builtHeaders); if (body.isNotEmpty) { request.headers['Content-Type'] = 'application/json'; @@ -279,17 +281,11 @@ Stream makeMultipartStreamingApiCall({ try { var request = http.MultipartRequest('POST', Uri.parse(url)); - final mutableHeaders = Map.from(headers); - mutableHeaders['X-Request-Start-Time'] = (DateTime.now().millisecondsSinceEpoch / 1000).toString(); - mutableHeaders['X-App-Platform'] = PlatformManager.instance.platform; - mutableHeaders['X-App-Version'] = PlatformManager.instance.appVersion; - - final bool requireAuthCheck = _isRequiredAuthCheck(url); - if (requireAuthCheck) { - mutableHeaders['Authorization'] = await getAuthHeader(); - } - - request.headers.addAll(mutableHeaders); + final builtHeaders = await buildHeaders( + requireAuthCheck: _isRequiredAuthCheck(url), + fromHeaders: headers, + ); + request.headers.addAll(builtHeaders); for (var file in files) { request.files.add(await http.MultipartFile.fromPath(fileFieldName, file.path, filename: basename(file.path))); diff --git a/app/lib/services/sockets/pure_socket.dart b/app/lib/services/sockets/pure_socket.dart index 8a2e653ddf3..62d3130c89e 100644 --- a/app/lib/services/sockets/pure_socket.dart +++ b/app/lib/services/sockets/pure_socket.dart @@ -83,12 +83,7 @@ class PureSocket implements IPureSocket { } debugPrint("request wss ${url}"); - final headers = { - 'Authorization': await getAuthHeader(), - 'X-Request-Start-Time': (DateTime.now().millisecondsSinceEpoch / 1000).toString(), - 'X-App-Platform': PlatformManager.instance.platform, - 'X-App-Version': PlatformManager.instance.appVersion, - }; + final headers = await buildHeaders(requireAuthCheck: true); _channel = IOWebSocketChannel.connect( url, From b82470009edc8daa4be89afd15b921f99bc4ef58 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?th=E1=BB=8Bnh?= Date: Wed, 15 Oct 2025 18:20:24 +0700 Subject: [PATCH 3/4] refactor: remove Modal deployment configuration and add debug print statement --- backend/pusher/main.py | 21 --------------------- backend/routers/pusher.py | 1 + 2 files changed, 1 insertion(+), 21 deletions(-) diff --git a/backend/pusher/main.py b/backend/pusher/main.py index 7d963b1a099..94e813823cd 100644 --- a/backend/pusher/main.py +++ b/backend/pusher/main.py @@ -4,7 +4,6 @@ import firebase_admin from fastapi import FastAPI -from modal import Image, App, asgi_app, Secret from routers import pusher if os.environ.get('SERVICE_ACCOUNT_JSON'): @@ -17,26 +16,6 @@ app = FastAPI() app.include_router(pusher.router) -modal_app = App( - name='pusher', - secrets=[Secret.from_name("gcp-credentials"), Secret.from_name('envs')], -) -image = Image.debian_slim().apt_install('ffmpeg', 'git', 'unzip').pip_install_from_requirements('requirements.txt') - - -@modal_app.function( - image=image, - keep_warm=2, - memory=(512, 1024), - cpu=2, - allow_concurrent_inputs=10, - timeout=60 * 10, -) -@asgi_app() -def api(): - return app - - paths = ['_temp', '_samples', '_segments', '_speech_profiles'] for path in paths: if not os.path.exists(path): diff --git a/backend/routers/pusher.py b/backend/routers/pusher.py index 16a56eb6a0f..a881adcbfa0 100644 --- a/backend/routers/pusher.py +++ b/backend/routers/pusher.py @@ -152,4 +152,5 @@ async def websocket_endpoint_trigger( uid: str, sample_rate: int = 8000, ): + print("1"); await _websocket_util_trigger(websocket, uid, sample_rate) From 7ffcd3dd2e9c0967f5921e41a8d5fa1d5639c5bc Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?th=E1=BB=8Bnh?= Date: Wed, 15 Oct 2025 18:48:43 +0700 Subject: [PATCH 4/4] Clean up logs --- backend/routers/pusher.py | 1 - 1 file changed, 1 deletion(-) diff --git a/backend/routers/pusher.py b/backend/routers/pusher.py index a881adcbfa0..16a56eb6a0f 100644 --- a/backend/routers/pusher.py +++ b/backend/routers/pusher.py @@ -152,5 +152,4 @@ async def websocket_endpoint_trigger( uid: str, sample_rate: int = 8000, ): - print("1"); await _websocket_util_trigger(websocket, uid, sample_rate)