diff --git a/app/lib/backend/schema/conversation.dart b/app/lib/backend/schema/conversation.dart index 522d8f96145..2c9f0cc1c31 100644 --- a/app/lib/backend/schema/conversation.dart +++ b/app/lib/backend/schema/conversation.dart @@ -40,6 +40,7 @@ enum ConversationSource { apple_watch, phone, desktop, + limitless, } class ConversationExternalData { diff --git a/app/lib/pages/home/widgets/sync_bottom_sheet.dart b/app/lib/pages/home/widgets/sync_bottom_sheet.dart index 4283ab4a5ec..8f792ae4976 100644 --- a/app/lib/pages/home/widgets/sync_bottom_sheet.dart +++ b/app/lib/pages/home/widgets/sync_bottom_sheet.dart @@ -3,7 +3,6 @@ import 'package:omi/backend/schema/bt_device/bt_device.dart'; import 'package:omi/gen/assets.gen.dart'; import 'package:omi/providers/device_provider.dart'; import 'package:omi/providers/sync_provider.dart'; -import 'package:omi/services/services.dart'; import 'package:omi/services/wals.dart'; import 'package:provider/provider.dart'; @@ -34,25 +33,13 @@ class SyncBottomSheet extends StatelessWidget { final progress = syncProvider.walsSyncedProgress; final hasPendingData = pendingFlashPages.isNotEmpty; - // Check for orphaned files from previous failed syncs - final flashPageSync = ServiceManager.instance().wal.getSyncs().flashPage; - final hasOrphanedFiles = flashPageSync.hasOrphanedFiles; - final orphanedCount = flashPageSync.orphanedFilesCount; - final isUploadingOrphans = flashPageSync.isUploadingOrphans; + final isSyncingFromPendant = syncProvider.isSyncingFromPendant; + final isUploadingToCloud = syncProvider.isUploadingToCloud; - // Calculate time ago for pending data - String timeAgo = ''; - if (hasPendingData && pendingFlashPages.isNotEmpty) { - final oldestWal = pendingFlashPages.reduce((a, b) => a.timerStart < b.timerStart ? a : b); - final minutesAgo = ((DateTime.now().millisecondsSinceEpoch ~/ 1000) - oldestWal.timerStart) ~/ 60; - if (minutesAgo < 60) { - timeAgo = '$minutesAgo minutes ago'; - } else if (minutesAgo < 1440) { - timeAgo = '${minutesAgo ~/ 60} hours ago'; - } else { - timeAgo = '${minutesAgo ~/ 1440} days ago'; - } - } + // Consider sync in progress if EITHER pendant sync OR cloud upload is happening + final isAnySyncInProgress = isSyncing || isSyncingFromPendant || isUploadingToCloud; + final hasOrphanedFiles = syncProvider.hasOrphanedFiles; + final orphanedCount = syncProvider.orphanedFilesCount; return Container( decoration: const BoxDecoration( @@ -97,7 +84,9 @@ class SyncBottomSheet extends StatelessWidget { ], ), child: Icon( - isSyncing ? Icons.sync_rounded : (hasPendingData ? Icons.graphic_eq_rounded : Icons.check_rounded), + isAnySyncInProgress + ? Icons.sync_rounded + : (hasPendingData ? Icons.graphic_eq_rounded : Icons.check_rounded), color: Colors.white, size: 36, ), @@ -106,7 +95,7 @@ class SyncBottomSheet extends StatelessWidget { // Title Text( - isSyncing ? 'Catching Up' : (hasPendingData ? 'Recordings Available' : 'All Synced'), + isAnySyncInProgress ? 'Syncing recordings' : (hasPendingData ? 'Recordings to sync' : 'All caught up'), style: const TextStyle( color: Colors.white, fontSize: 24, @@ -116,47 +105,29 @@ class SyncBottomSheet extends StatelessWidget { const SizedBox(height: 12), // Description - if (isSyncing) ...[ - RichText( + if (isAnySyncInProgress) ...[ + Text( + 'We\'ll keep syncing your recordings in the background.', textAlign: TextAlign.center, - text: TextSpan( - style: TextStyle( - color: Colors.grey.shade400, - fontSize: 16, - height: 1.4, - ), - children: [ - const TextSpan(text: 'Processing audio and generating summaries from '), - TextSpan( - text: timeAgo.isNotEmpty ? timeAgo : 'earlier', - style: const TextStyle(color: Colors.deepPurpleAccent), - ), - const TextSpan(text: '.'), - ], + style: TextStyle( + color: Colors.grey.shade400, + fontSize: 16, + height: 1.4, ), ), ] else if (hasPendingData) ...[ - RichText( + Text( + 'You have recordings that aren\'t synced yet.', textAlign: TextAlign.center, - text: TextSpan( - style: TextStyle( - color: Colors.grey.shade400, - fontSize: 16, - height: 1.4, - ), - children: [ - const TextSpan(text: 'You have '), - TextSpan( - text: _formatDuration(pendingFlashPages.fold(0, (sum, w) => sum + w.seconds)), - style: const TextStyle(color: Colors.deepPurpleAccent), - ), - const TextSpan(text: ' of offline recordings to sync.'), - ], + style: TextStyle( + color: Colors.grey.shade400, + fontSize: 16, + height: 1.4, ), ), ] else ...[ Text( - 'Your pendant is fully synced with the cloud.', + 'Everything is already synced.', textAlign: TextAlign.center, style: TextStyle( color: Colors.grey.shade400, @@ -169,9 +140,11 @@ class SyncBottomSheet extends StatelessWidget { const SizedBox(height: 8), // Explanation text - if ((isSyncing || hasPendingData) && isLimitless) ...[ + if ((isAnySyncInProgress || hasPendingData) && isLimitless) ...[ Text( - 'This happens when your pendant is away from your phone for an extended period, the app is closed, or if Bluetooth is turned off on your phone.', + isAnySyncInProgress + ? 'We\'re catching up on earlier recordings. New moments are still being saved and will appear once sync finishes.' + : 'This usually happens when your pendant and phone were apart or Bluetooth was off.', textAlign: TextAlign.center, style: TextStyle( color: Colors.grey.shade600, @@ -217,7 +190,7 @@ class SyncBottomSheet extends StatelessWidget { crossAxisAlignment: CrossAxisAlignment.start, children: [ Text( - isSyncing + isAnySyncInProgress ? 'Syncing in progress' : (hasPendingData ? 'Ready to sync' : 'Pendant is up to date'), style: const TextStyle( @@ -228,11 +201,10 @@ class SyncBottomSheet extends StatelessWidget { ), const SizedBox(height: 2), Text( - isSyncing - ? _getSyncStatusText(progress) - : (hasPendingData - ? '${_formatDuration(pendingFlashPages.fold(0, (sum, w) => sum + w.seconds))} waiting' - : 'All audio has been sent to phone'), + isAnySyncInProgress + ? _getSyncStatusText( + progress, isSyncingFromPendant, isUploadingToCloud, hasOrphanedFiles, orphanedCount) + : (hasPendingData ? 'Tap Sync to start' : 'All recordings are synced'), style: TextStyle( color: Colors.grey.shade500, fontSize: 13, @@ -242,7 +214,7 @@ class SyncBottomSheet extends StatelessWidget { ), ), // Status indicator or button - if (isSyncing) ...[ + if (isAnySyncInProgress) ...[ const SizedBox( width: 28, height: 28, @@ -287,68 +259,6 @@ class SyncBottomSheet extends StatelessWidget { ), ), - // Orphaned files card - files saved to phone but not yet uploaded - if ((hasOrphanedFiles || isUploadingOrphans) && !isSyncing) ...[ - const SizedBox(height: 16), - Container( - padding: const EdgeInsets.all(14), - decoration: BoxDecoration( - color: Colors.blue.withOpacity(0.1), - borderRadius: BorderRadius.circular(12), - border: Border.all(color: Colors.blue.withOpacity(0.3)), - ), - child: Row( - children: [ - Icon(Icons.phone_android_rounded, color: Colors.blue.shade400, size: 22), - const SizedBox(width: 12), - Expanded( - child: Column( - crossAxisAlignment: CrossAxisAlignment.start, - children: [ - Text( - isUploadingOrphans - ? 'Uploading to cloud...' - : '$orphanedCount file${orphanedCount > 1 ? 's' : ''} saved on phone', - style: TextStyle(color: Colors.blue.shade300, fontSize: 14, fontWeight: FontWeight.w600), - ), - const SizedBox(height: 2), - Text( - isUploadingOrphans ? 'Processing saved recordings' : 'Ready to upload to cloud', - style: TextStyle(color: Colors.blue.shade400.withOpacity(0.7), fontSize: 12), - ), - ], - ), - ), - if (isUploadingOrphans) - const SizedBox( - width: 24, - height: 24, - child: CircularProgressIndicator( - strokeWidth: 2, - color: Colors.blue, - ), - ) - else - ElevatedButton( - onPressed: () { - flashPageSync.uploadOrphanedFiles(); - }, - style: ElevatedButton.styleFrom( - backgroundColor: Colors.blue.shade700, - foregroundColor: Colors.white, - padding: const EdgeInsets.symmetric(horizontal: 14, vertical: 8), - shape: RoundedRectangleBorder( - borderRadius: BorderRadius.circular(16), - ), - elevation: 0, - ), - child: const Text('Upload', style: TextStyle(fontSize: 13, fontWeight: FontWeight.w600)), - ), - ], - ), - ), - ], - // Not connected warning if (!isConnected && isLimitless) ...[ const SizedBox(height: 16), @@ -380,19 +290,19 @@ class SyncBottomSheet extends StatelessWidget { ); } - String _formatDuration(int seconds) { - if (seconds < 60) return '${seconds}s'; - if (seconds < 3600) return '${(seconds / 60).round()} min'; - final hours = seconds ~/ 3600; - final mins = (seconds % 3600) ~/ 60; - return '${hours}h ${mins}m'; - } - - String _getSyncStatusText(double progress) { - if (progress <= 0.4) { - return 'Syncing from device...'; + String _getSyncStatusText( + double progress, bool isSyncingFromPendant, bool isUploadingToCloud, bool hasOrphanedFiles, int orphanedCount) { + if (isSyncingFromPendant) { + return 'Downloading your recordings…'; + } else if (isUploadingToCloud) { + if (hasOrphanedFiles && orphanedCount > 0) { + return 'Uploading to cloud… ($orphanedCount file${orphanedCount > 1 ? 's' : ''} remaining)'; + } + return 'Uploading to cloud…'; + } else if (progress <= 0.4) { + return 'Downloading your recordings…'; } else { - return 'Processing audio...'; + return 'Processing your audio…'; } } } diff --git a/app/lib/providers/sync_provider.dart b/app/lib/providers/sync_provider.dart index 0992766f431..8882082b75c 100644 --- a/app/lib/providers/sync_provider.dart +++ b/app/lib/providers/sync_provider.dart @@ -54,6 +54,14 @@ class SyncProvider extends ChangeNotifier implements IWalServiceListener, IWalSy String? get syncError => _syncState.errorMessage; Wal? get failedWal => _syncState.failedWal; + // Flash page (Limitless) sync states - distinct phases + // isSyncingFromPendant: true when receiving data from pendant (pendant → phone) + // isUploadingToCloud: true when uploading files to cloud (phone → cloud) + bool get isSyncingFromPendant => _walService.getSyncs().flashPage.isSyncing; + bool get isUploadingToCloud => _walService.getSyncs().flashPage.isUploading; + bool get hasOrphanedFiles => _walService.getSyncs().flashPage.hasOrphanedFiles; + int get orphanedFilesCount => _walService.getSyncs().flashPage.orphanedFilesCount; + // Audio playback delegates String? get currentPlayingWalId => _audioPlayerUtils.currentPlayingId; bool get isProcessingAudio => _audioPlayerUtils.isProcessingAudio; diff --git a/app/lib/services/devices/limitless_connection.dart b/app/lib/services/devices/limitless_connection.dart index d84b14f747c..acf7416e964 100644 --- a/app/lib/services/devices/limitless_connection.dart +++ b/app/lib/services/devices/limitless_connection.dart @@ -16,6 +16,12 @@ class LimitlessDeviceConnection extends DeviceConnection { final _rawDataBuffer = []; int? _firstFlashPageTimestampMs; + // Fragment reassembly: index -> {seq -> payload} + final Map>> _fragmentBuffer = {}; + + // Completed flash pages + final List> _completedFlashPages = []; + StreamSubscription? _rxSubscription; bool _isInitialized = false; bool _isBatchMode = false; @@ -78,20 +84,380 @@ class LimitlessDeviceConnection extends DeviceConnection { void _handleNotification(List data) { if (data.isEmpty) return; - if (data.length > 2 && data[0] == 0x08) { - final indexResult = _decodeVarint(data, 1); - final packetIndex = indexResult[0] as int; - if (packetIndex > _highestReceivedIndex) { - _highestReceivedIndex = packetIndex; + _tryParseButtonStatus(data); + _tryParseDeviceStatus(data); + + // Parse BLE packet to get fragmentation info + final packet = _parseBlePacket(data); + if (packet == null) { + if (_isBatchMode) { + debugPrint( + 'Limitless: Batch mode - packet parse failed, data=${data.length}b, first bytes: ${data.take(10).toList()}'); } + _rawDataBuffer.addAll(data); + return; } - // Accumulate data for frame extraction - _rawDataBuffer.addAll(data); + final index = packet['index'] as int; + final seq = packet['seq'] as int; + final numFrags = packet['num_frags'] as int; + final payload = packet['payload'] as List; - _tryParseButtonStatus(data); + // Track highest received index for acknowledgment + if (index > _highestReceivedIndex) { + _highestReceivedIndex = index; + } - _tryParseDeviceStatus(data); + if (_isBatchMode) { + _fragmentBuffer.putIfAbsent(index, () => {}); + _fragmentBuffer[index]![seq] = payload; + + if (_fragmentBuffer[index]!.length == numFrags) { + final completePayload = []; + for (int i = 0; i < numFrags; i++) { + final fragment = _fragmentBuffer[index]![i]; + if (fragment != null) { + completePayload.addAll(fragment); + } + } + + _fragmentBuffer.remove(index); + + _handlePendantMessage(completePayload); + } + } else { + _rawDataBuffer.addAll(data); + } + } + + void _handlePendantMessage(List payload) { + try { + int pos = 0; + List foundFields = []; + + while (pos < payload.length) { + final tag = payload[pos]; + final fieldNum = tag >> 3; + final wireType = tag & 0x07; + pos++; + + foundFields.add(fieldNum); + + if (wireType == 2) { + // Length-delimited field + final lengthResult = _decodeVarint(payload, pos); + final length = lengthResult[0] as int; + pos = lengthResult[1] as int; + + final fieldData = payload.sublist(pos, pos + length); + pos += length; + + if (fieldNum == 2) { + _handleStorageBuffer(fieldData); + } + } else if (wireType == 0) { + final result = _decodeVarint(payload, pos); + pos = result[1] as int; + } else { + // Unknown wire type, skip byte + pos++; + } + } + } catch (e) { + debugPrint('Limitless: Error handling pendant message: $e'); + } + } + + void _handleStorageBuffer(List storageData) { + try { + int pos = 0; + int? session; + int? seq; + int? index; + List? flashPageData; + + while (pos < storageData.length) { + final tag = storageData[pos]; + final fieldNum = tag >> 3; + final wireType = tag & 0x07; + pos++; + + if (wireType == 0) { + // Varint + final result = _decodeVarint(storageData, pos); + final value = result[0] as int; + pos = result[1] as int; + + if (fieldNum == 2) { + session = value; + } else if (fieldNum == 4) { + seq = value; + } else if (fieldNum == 5) { + index = value; + } + } else if (wireType == 2) { + // Length-delimited + final lengthResult = _decodeVarint(storageData, pos); + final length = lengthResult[0] as int; + pos = lengthResult[1] as int; + + if (fieldNum == 6) { + flashPageData = storageData.sublist(pos, pos + length); + } + pos += length; + } else { + pos++; + } + } + + if (flashPageData != null && flashPageData.isNotEmpty) { + final pageInfo = _parseFlashPageInfo(flashPageData); + + final opusFrames = _extractOpusFramesFromFlashPage(flashPageData); + + if (opusFrames.isNotEmpty) { + final flashPage = { + 'opus_frames': opusFrames, + 'timestamp_ms': pageInfo['timestamp_ms'] ?? DateTime.now().millisecondsSinceEpoch, + 'session': session, + 'seq': seq, + 'index': index, + 'did_start_session': pageInfo['did_start_session'] ?? false, + 'did_stop_session': pageInfo['did_stop_session'] ?? false, + 'did_start_recording': pageInfo['did_start_recording'] ?? false, + 'did_stop_recording': pageInfo['did_stop_recording'] ?? false, + }; + + _completedFlashPages.add(flashPage); + + // Update first flash page timestamp if not set + if (_firstFlashPageTimestampMs == null) { + final timestamp = pageInfo['timestamp_ms'] as int?; + if (timestamp != null && timestamp > 1577836800000) { + _firstFlashPageTimestampMs = timestamp; + } + } + + _flashPageController.add(flashPage); + } + } + } catch (e) { + debugPrint('Limitless: Error handling storage buffer: $e'); + } + } + + Map _parseFlashPageInfo(List flashPageData) { + Map result = { + 'timestamp_ms': 0, + 'did_start_session': false, + 'did_stop_session': false, + 'did_start_recording': false, + 'did_stop_recording': false, + }; + + try { + int pos = 0; + + // Field 1 (0x08) = timestamp_ms + if (pos < flashPageData.length && flashPageData[pos] == 0x08) { + pos++; + final timestampResult = _decodeVarint(flashPageData, pos); + result['timestamp_ms'] = timestampResult[0] as int; + pos = timestampResult[1] as int; + } + + while (pos < flashPageData.length - 2) { + // Audio wrapper (0x1a) + if (flashPageData[pos] == 0x1a) { + pos++; + final chunkLengthResult = _decodeVarint(flashPageData, pos); + final chunkLength = chunkLengthResult[0] as int; + pos = chunkLengthResult[1] as int; + + final chunkEnd = pos + chunkLength; + + while (pos < chunkEnd - 1) { + final marker = flashPageData[pos]; + + // Storage status (0x62) + if (marker == 0x62) { + pos++; + final statusLengthResult = _decodeVarint(flashPageData, pos); + final statusLength = statusLengthResult[0] as int; + pos = statusLengthResult[1] as int; + + final statusEnd = pos + statusLength; + while (pos < statusEnd) { + final statusMarker = flashPageData[pos]; + pos++; + + if (statusMarker == 0x08 && pos < statusEnd) { + result['did_start_session'] = flashPageData[pos] != 0; + pos++; + } else if (statusMarker == 0x10 && pos < statusEnd) { + result['did_stop_session'] = flashPageData[pos] != 0; + pos++; + } + } + continue; + } + + // Audio data (0x12) + if (marker == 0x12) { + pos++; + final audioLengthResult = _decodeVarint(flashPageData, pos); + final audioLength = audioLengthResult[0] as int; + pos = audioLengthResult[1] as int; + + final audioEnd = pos + audioLength; + while (pos < audioEnd - 1) { + final audioMarker = flashPageData[pos]; + pos++; + + if (audioMarker == 0x40 && pos < audioEnd) { + result['did_start_recording'] = flashPageData[pos] != 0; + pos++; + } else if (audioMarker == 0x48 && pos < audioEnd) { + result['did_stop_recording'] = flashPageData[pos] != 0; + pos++; + } + } + pos = audioEnd; + continue; + } + + pos++; + } + pos = chunkEnd; + } else { + pos++; + } + } + } catch (e) { + // Silently ignore parsing errors + } + + return result; + } + + List> _extractOpusFramesFromFlashPage(List flashPageData) { + final frames = >[]; + + try { + int pos = 0; + + // Skip timestamp (0x08) if present + if (pos < flashPageData.length && flashPageData[pos] == 0x08) { + pos++; + final result = _decodeVarint(flashPageData, pos); + pos = result[1] as int; + } + + // Skip 0x10 if present + if (pos < flashPageData.length && flashPageData[pos] == 0x10) { + pos++; + final result = _decodeVarint(flashPageData, pos); + pos = result[1] as int; + } + + // Process audio wrappers (0x1a) + while (pos < flashPageData.length - 2) { + if (flashPageData[pos] == 0x1a) { + pos++; + final wrapperLengthResult = _decodeVarint(flashPageData, pos); + final wrapperLength = wrapperLengthResult[0] as int; + pos = wrapperLengthResult[1] as int; + + final wrapperEnd = pos + wrapperLength; + if (wrapperEnd > flashPageData.length) break; + + while (pos < wrapperEnd - 1) { + final marker = flashPageData[pos]; + + // Offset (0x08) - skip + if (marker == 0x08) { + pos++; + final result = _decodeVarint(flashPageData, pos); + pos = result[1] as int; + continue; + } + + // Audio data (0x12) containing Opus packets + if (marker == 0x12) { + pos++; + final audioLengthResult = _decodeVarint(flashPageData, pos); + final audioLength = audioLengthResult[0] as int; + pos = audioLengthResult[1] as int; + + final audioEnd = pos + audioLength; + if (audioEnd > flashPageData.length) { + pos = wrapperEnd; + break; + } + + _extractOpusRecursive(flashPageData, pos, audioEnd, frames); + pos = audioEnd; + continue; + } + + // Skip other wire types + final wireType = marker & 0x07; + pos++; + if (wireType == 0) { + final result = _decodeVarint(flashPageData, pos); + pos = result[1] as int; + } else if (wireType == 2) { + final lengthResult = _decodeVarint(flashPageData, pos); + pos = lengthResult[1] as int; + pos += lengthResult[0] as int; + } + } + + pos = wrapperEnd; + } else { + pos++; + } + } + } catch (e) { + debugPrint('Limitless: Error extracting Opus frames from flash page: $e'); + } + + return frames; + } + + void _extractOpusRecursive(List data, int start, int end, List> frames) { + int pos = start; + + while (pos < end - 1) { + final tag = data[pos]; + final wireType = tag & 0x07; + pos++; + + if (wireType == 2) { + // Length-delimited + final lengthResult = _decodeVarint(data, pos); + final length = lengthResult[0] as int; + pos = lengthResult[1] as int; + + if (length > 0 && pos + length <= end) { + final fieldData = data.sublist(pos, pos + length); + + if (length >= 10 && length <= 200 && fieldData.isNotEmpty && _isValidOpusToc(fieldData[0])) { + frames.add(fieldData); + } else if (length > 10) { + _extractOpusRecursive(data, pos, pos + length, frames); + } + } + pos += length; + } else if (wireType == 0) { + // Varint + final result = _decodeVarint(data, pos); + pos = result[1] as int; + } else { + // Unknown wire type + break; + } + } } /// Observed pattern in BLE data: @@ -186,6 +552,64 @@ class LimitlessDeviceConnection extends DeviceConnection { return [result, pos]; } + /// Parse BLE packet to extract index, sequence, number of fragments, and payload + Map? _parseBlePacket(List data) { + try { + int pos = 0; + int? index; + int seq = 0; + int? numFrags; + List? payload; + + while (pos < data.length) { + final tag = data[pos]; + final fieldNum = tag >> 3; + final wireType = tag & 0x07; + pos++; + + if (wireType == 0) { + // Varint + final result = _decodeVarint(data, pos); + final value = result[0] as int; + pos = result[1] as int; + + if (fieldNum == 1) { + index = value; + } else if (fieldNum == 2) { + seq = value; + } else if (fieldNum == 3) { + numFrags = value; + } + } else if (wireType == 2) { + // Length-delimited + final lengthResult = _decodeVarint(data, pos); + final length = lengthResult[0] as int; + pos = lengthResult[1] as int; + + if (fieldNum == 4) { + payload = data.sublist(pos, pos + length); + } + pos += length; + } else { + // Unknown wire type, skip + break; + } + } + + if (index != null && numFrags != null && payload != null) { + return { + 'index': index, + 'seq': seq, + 'num_frags': numFrags, + 'payload': payload, + }; + } + } catch (e) { + debugPrint('Limitless: Error parsing BLE wrapper: $e'); + } + return null; + } + List _encodeField(int fieldNum, int wireType, List value) { final tag = (fieldNum << 3) | wireType; return [..._encodeVarint(tag), ...value]; @@ -338,8 +762,10 @@ class LimitlessDeviceConnection extends DeviceConnection { if (!_isInitialized) return; try { - // Clear buffer before switching modes to prevent cross-contamination + // Clear all buffers before switching modes to prevent cross-contamination _rawDataBuffer.clear(); + _fragmentBuffer.clear(); + _completedFlashPages.clear(); _isBatchMode = true; final cmd = _encodeDownloadFlashPages(batchMode: true, realTime: false); await transport.writeCharacteristic(limitlessServiceUuid, limitlessTxCharUuid, cmd); @@ -354,8 +780,10 @@ class LimitlessDeviceConnection extends DeviceConnection { if (!_isInitialized) return; try { - // Clear buffer before switching modes to prevent batch data from being processed as real-time + // Clear all buffers before switching modes to prevent batch data from being processed as real-time _rawDataBuffer.clear(); + _fragmentBuffer.clear(); + _completedFlashPages.clear(); _firstFlashPageTimestampMs = null; // Send command to switch back to real-time mode @@ -415,6 +843,8 @@ class LimitlessDeviceConnection extends DeviceConnection { void clearBuffer() { _rawDataBuffer.clear(); _firstFlashPageTimestampMs = null; + _fragmentBuffer.clear(); + _completedFlashPages.clear(); } int? getFirstFlashPageTimestampMs() => _firstFlashPageTimestampMs; @@ -427,12 +857,64 @@ class LimitlessDeviceConnection extends DeviceConnection { /// Returns map with: opus_frames, timestamp_ms, did_start_session, did_stop_session, etc. /// This combines frame extraction with session marker parsing Map? extractFramesWithSessionInfo() { + if (_isBatchMode) { + if (_completedFlashPages.isEmpty) return null; + + final allFrames = >[]; + int? timestampMs; + bool didStartSession = false; + bool didStopSession = false; + bool didStartRecording = false; + bool didStopRecording = false; + int? maxIndex; + + for (final page in _completedFlashPages) { + final frames = page['opus_frames'] as List>?; + if (frames != null) { + allFrames.addAll(frames); + } + + // Use timestamp from first page + if (timestampMs == null) { + final ts = page['timestamp_ms'] as int?; + if (ts != null && ts > 1577836800000) { + timestampMs = ts; + } + } + + // Track the highest index for ACK + final pageIndex = page['index'] as int?; + if (pageIndex != null && (maxIndex == null || pageIndex > maxIndex)) { + maxIndex = pageIndex; + } + + // Aggregate session markers + if (page['did_start_session'] == true) didStartSession = true; + if (page['did_stop_session'] == true) didStopSession = true; + if (page['did_start_recording'] == true) didStartRecording = true; + if (page['did_stop_recording'] == true) didStopRecording = true; + } + + // Clear processed flash pages + _completedFlashPages.clear(); + + if (allFrames.isEmpty) return null; + + return { + 'opus_frames': allFrames, + 'timestamp_ms': timestampMs ?? _firstFlashPageTimestampMs ?? DateTime.now().millisecondsSinceEpoch, + 'did_start_session': didStartSession, + 'did_stop_session': didStopSession, + 'did_start_recording': didStartRecording, + 'did_stop_recording': didStopRecording, + 'max_index': maxIndex, + }; + } + if (_rawDataBuffer.isEmpty) return null; - // First try to parse session markers from the buffer before extraction final sessionInfo = parseStorageBuffer(_rawDataBuffer); - // Now extract opus frames (this also modifies the buffer) final frames = extractFramesFromBuffer(); if (frames.isEmpty) return null; diff --git a/app/lib/services/wals.dart b/app/lib/services/wals.dart index 1ad1ac76131..e5c838edb33 100644 --- a/app/lib/services/wals.dart +++ b/app/lib/services/wals.dart @@ -661,6 +661,8 @@ class SDCardWalSync implements IWalSync { class FlashPageWalSync implements IWalSync { static const int pagesPerChunk = 25; static const String _pendingFilesKey = 'flash_page_pending_uploads'; + static const Duration _uploadTimerInterval = Duration(seconds: 5); + static const Duration _persistBatchDuration = Duration(seconds: 90); List _wals = const []; BtDevice? _device; @@ -672,9 +674,17 @@ class FlashPageWalSync implements IWalSync { int _newestPage = 0; int _currentSession = 0; - // Track if we're currently uploading orphaned files - bool _isUploadingOrphans = false; - bool get isUploadingOrphans => _isUploadingOrphans; + // Sync state tracking - distinct phases + // isSyncing: true when receiving data from pendant (pendant → phone) + // isUploading: true when uploading files to cloud (phone → cloud) + bool _isSyncing = false; + bool get isSyncing => _isSyncing; + + bool _isUploading = false; + bool get isUploading => _isUploading; + + // Upload timer for background uploads during sync + Timer? _uploadTimer; IWalSyncListener listener; @@ -707,11 +717,12 @@ class FlashPageWalSync implements IWalSync { } /// Check for and upload any orphaned files from previous failed syncs + /// This runs automatically on app start in the background Future uploadOrphanedFiles() async { var resp = SyncLocalFilesResponse(newConversationIds: [], updatedConversationIds: []); - if (_isUploadingOrphans) { - debugPrint("FlashPageSync: Already uploading orphaned files, skipping"); + if (_isUploading) { + debugPrint("FlashPageSync: Already uploading, skipping orphan upload"); return resp; } @@ -720,7 +731,9 @@ class FlashPageWalSync implements IWalSync { return resp; } - _isUploadingOrphans = true; + _isUploading = true; + listener.onWalUpdated(); + debugPrint("FlashPageSync: Uploading ${pendingFiles.length} orphaned files in background"); // Sort by timestamp for chronological upload order final sortedFiles = List.from(pendingFiles); @@ -730,29 +743,52 @@ class FlashPageWalSync implements IWalSync { return tsA.compareTo(tsB); }); - for (final filePath in sortedFiles) { - try { + // Upload in batches of 2 files + const batchSize = 2; + for (int i = 0; i < sortedFiles.length; i += batchSize) { + final batchPaths = sortedFiles.skip(i).take(batchSize).toList(); + final batchFiles = []; + final validPaths = []; + + // Collect valid files for this batch + for (final filePath in batchPaths) { final file = File(filePath); if (await file.exists()) { - final partialResp = await syncLocalFiles([file]); - - resp.newConversationIds - .addAll(partialResp.newConversationIds.where((id) => !resp.newConversationIds.contains(id))); - resp.updatedConversationIds.addAll(partialResp.updatedConversationIds - .where((id) => !resp.updatedConversationIds.contains(id) && !resp.newConversationIds.contains(id))); - - await file.delete(); - _removePendingFile(filePath); + batchFiles.add(file); + validPaths.add(filePath); } else { // File doesn't exist, remove from pending _removePendingFile(filePath); } + } + + if (batchFiles.isEmpty) continue; + + try { + debugPrint("FlashPageSync: Uploading batch of ${batchFiles.length} orphaned files"); + final partialResp = await syncLocalFiles(batchFiles); + + resp.newConversationIds + .addAll(partialResp.newConversationIds.where((id) => !resp.newConversationIds.contains(id))); + resp.updatedConversationIds.addAll(partialResp.updatedConversationIds + .where((id) => !resp.updatedConversationIds.contains(id) && !resp.newConversationIds.contains(id))); + + // Delete files and remove from pending list after successful upload + for (final filePath in validPaths) { + try { + await File(filePath).delete(); + _removePendingFile(filePath); + } catch (e) { + debugPrint("FlashPageSync: Failed to delete file $filePath: $e"); + } + } } catch (e) { - debugPrint("FlashPageSync: Failed to upload orphaned file $filePath: $e"); + debugPrint("FlashPageSync: Failed to upload batch: $e"); + // Files stay in pending list for next upload cycle } } - _isUploadingOrphans = false; + _isUploading = false; listener.onWalUpdated(); return resp; } @@ -882,6 +918,7 @@ class FlashPageWalSync implements IWalSync { Future stop() async { _wals = []; await _pageStream?.cancel(); + _stopUploadTimer(); } @override @@ -938,6 +975,106 @@ class FlashPageWalSync implements IWalSync { return resp; } + // Guard to prevent concurrent upload calls from timer + bool _uploadInProgress = false; + + /// Upload pending files in background - called by timer + Future _uploadPendingFiles() async { + var resp = SyncLocalFilesResponse(newConversationIds: [], updatedConversationIds: []); + + // Prevent concurrent uploads from timer + if (_uploadInProgress) { + return resp; + } + + final pendingFiles = _getPendingFiles(); + if (pendingFiles.isEmpty) { + return resp; + } + + _uploadInProgress = true; + + // Set isUploading state for UI + if (!_isUploading) { + _isUploading = true; + listener.onWalUpdated(); + } + debugPrint("FlashPageSync: Uploading ${pendingFiles.length} pending files"); + + // Sort by timestamp for chronological upload order + final sortedFiles = List.from(pendingFiles); + sortedFiles.sort((a, b) { + final tsA = _extractTimestampFromFilename(a); + final tsB = _extractTimestampFromFilename(b); + return tsA.compareTo(tsB); + }); + + // Upload in batches of 2 files + const batchSize = 2; + for (int i = 0; i < sortedFiles.length; i += batchSize) { + final batchPaths = sortedFiles.skip(i).take(batchSize).toList(); + final batchFiles = []; + final validPaths = []; + + // Collect valid files for this batch + for (final filePath in batchPaths) { + final file = File(filePath); + if (await file.exists()) { + batchFiles.add(file); + validPaths.add(filePath); + } else { + // File doesn't exist, remove from pending + _removePendingFile(filePath); + } + } + + if (batchFiles.isEmpty) continue; + + try { + debugPrint("FlashPageSync: Uploading batch of ${batchFiles.length} pending files"); + final partialResp = await syncLocalFiles(batchFiles); + + resp.newConversationIds + .addAll(partialResp.newConversationIds.where((id) => !resp.newConversationIds.contains(id))); + resp.updatedConversationIds.addAll(partialResp.updatedConversationIds + .where((id) => !resp.updatedConversationIds.contains(id) && !resp.newConversationIds.contains(id))); + + // Delete files and remove from pending list after successful upload + for (final filePath in validPaths) { + try { + await File(filePath).delete(); + _removePendingFile(filePath); + } catch (e) { + debugPrint("FlashPageSync: Failed to delete file $filePath: $e"); + } + } + } catch (e) { + debugPrint("FlashPageSync: Failed to upload batch: $e"); + // Files stay in pending list for next upload cycle + } + } + + _uploadInProgress = false; + listener.onWalUpdated(); + return resp; + } + + /// Start the upload timer + void _startUploadTimer() { + _uploadTimer?.cancel(); + _uploadTimer = Timer.periodic(_uploadTimerInterval, (timer) async { + await _uploadPendingFiles(); + }); + debugPrint("FlashPageSync: Upload timer started (${_uploadTimerInterval.inSeconds}s interval)"); + } + + /// Stop the upload timer + void _stopUploadTimer() { + _uploadTimer?.cancel(); + _uploadTimer = null; + debugPrint("FlashPageSync: Upload timer stopped"); + } + Future _syncWal(Wal wal, IWalSyncProgressListener? progress) async { if (_device == null) return null; @@ -951,22 +1088,28 @@ class FlashPageWalSync implements IWalSync { return null; } - final dynamic limitlessConnection = connection; + final limitlessConnection = connection as LimitlessDeviceConnection; limitlessConnection.clearBuffer(); await limitlessConnection.enableBatchMode(); debugPrint("FlashPageSync: Batch mode enabled"); + // Mark as syncing (pendant → phone) + _isSyncing = true; + listener.onWalUpdated(); + + // Start upload timer - uploads pending files every 10s while we receive data + _startUploadTimer(); + int totalPages = wal.storageTotalBytes - wal.storageOffset + 1; int emptyExtractions = 0; const maxEmptyExtractions = 60; // 30 seconds of no data = done - int pagesProcessed = 0; // Track pages for incremental ACK + int? lastProcessedIndex; // Track actual flash page index for ACK - // PHASE 1: Save all data from device to local files - const maxBatchDuration = Duration(seconds: 30); + // Accumulate frames and persist to disk List> accumulatedFrames = []; - List savedFiles = []; int? batchMinTimestamp; DateTime lastSaveTime = DateTime.now(); + int filesSaved = 0; bool syncComplete = false; final startTime = DateTime.now(); @@ -987,15 +1130,20 @@ class FlashPageWalSync implements IWalSync { if (pageData != null) { emptyExtractions = 0; - pagesProcessed++; final opusFrames = pageData['opus_frames'] as List>? ?? []; final timestampMs = pageData['timestamp_ms'] as int? ?? DateTime.now().millisecondsSinceEpoch; + final maxIndex = pageData['max_index'] as int?; final didStartSession = (pageData['did_start_session'] as bool? ?? false) || (pageData['did_start_recording'] as bool? ?? false); final didStopSession = (pageData['did_stop_session'] as bool? ?? false) || (pageData['did_stop_recording'] as bool? ?? false); + // Track the highest flash page index we've processed for ACK + if (maxIndex != null && (lastProcessedIndex == null || maxIndex > lastProcessedIndex)) { + lastProcessedIndex = maxIndex; + } + if (opusFrames.isNotEmpty) { // If new timestamp is >2 min different, save current batch first // This prevents mixing recordings from different sessions (e.g., yesterday + today) @@ -1032,14 +1180,14 @@ class FlashPageWalSync implements IWalSync { emptyExtractions++; } - // If 30 seconds elapsed - save batch + // If 60 seconds elapsed - save batch to disk if (!shouldSave && accumulatedFrames.isNotEmpty) { - if (DateTime.now().difference(lastSaveTime) >= maxBatchDuration) { + if (DateTime.now().difference(lastSaveTime) >= _persistBatchDuration) { shouldSave = true; } } - // Save batch to local file + // Save batch to local file (upload timer will pick it up) if (shouldSave && accumulatedFrames.isNotEmpty) { final filePath = await _saveBatchToFile( accumulatedFrames, @@ -1047,18 +1195,19 @@ class FlashPageWalSync implements IWalSync { ); if (filePath != null) { - savedFiles.add(filePath); + filesSaved++; debugPrint( - "FlashPageSync: Saved batch #${savedFiles.length} to phone (${accumulatedFrames.length} frames, ts=$batchMinTimestamp)"); - progress?.onWalSyncedProgress((savedFiles.length / (totalPages / 50)).clamp(0.0, 0.4)); + "FlashPageSync: Saved batch #$filesSaved to disk (${accumulatedFrames.length} frames, ts=$batchMinTimestamp)"); + progress?.onWalSyncedProgress((filesSaved / (totalPages / 50)).clamp(0.0, 0.8)); // Send incremental ACK - device can clear pages up to this point - try { - final ackIndex = wal.storageOffset + pagesProcessed; - await limitlessConnection.acknowledgeProcessedData(ackIndex); - debugPrint("FlashPageSync: Incremental ACK sent for page $ackIndex"); - } catch (e) { - debugPrint("FlashPageSync: Incremental ACK failed: $e"); + if (lastProcessedIndex != null) { + try { + await limitlessConnection.acknowledgeProcessedData(lastProcessedIndex); + debugPrint("FlashPageSync: Incremental ACK sent for page $lastProcessedIndex"); + } catch (e) { + debugPrint("FlashPageSync: Incremental ACK failed: $e"); + } } } @@ -1088,55 +1237,69 @@ class FlashPageWalSync implements IWalSync { batchMinTimestamp ?? DateTime.now().millisecondsSinceEpoch, ); if (filePath != null) { - savedFiles.add(filePath); + filesSaved++; + debugPrint("FlashPageSync: Saved final batch #$filesSaved to disk"); - // Send final ACK for remaining pages - try { - await limitlessConnection.acknowledgeProcessedData(wal.storageTotalBytes); - } catch (e) { - debugPrint("FlashPageSync: Final ACK failed: $e"); + // Send final ACK for remaining pages - use actual highest index processed + if (lastProcessedIndex != null) { + try { + await limitlessConnection.acknowledgeProcessedData(lastProcessedIndex); + debugPrint("FlashPageSync: Final ACK sent for page $lastProcessedIndex"); + } catch (e) { + debugPrint("FlashPageSync: Final ACK failed: $e"); + } } } } - await limitlessConnection.enableRealTimeMode(); + // Syncing from pendant complete + _isSyncing = false; + listener.onWalUpdated(); - // Sort files by timestamp before upload - savedFiles.sort((a, b) { - final tsA = _extractTimestampFromFilename(a); - final tsB = _extractTimestampFromFilename(b); - return tsA.compareTo(tsB); - }); + // Send one final ACK if we haven't already (in case last batch wasn't saved) + if (lastProcessedIndex != null) { + try { + await limitlessConnection.acknowledgeProcessedData(lastProcessedIndex); + debugPrint("FlashPageSync: Sent final ACK for index $lastProcessedIndex before switching to real-time"); + } catch (e) { + debugPrint("FlashPageSync: Final cleanup ACK failed: $e"); + } + } - // Upload saved files to backend + // Switch back to real-time mode + await limitlessConnection.enableRealTimeMode(); - for (int i = 0; i < savedFiles.length; i++) { - final filePath = savedFiles[i]; - try { - final file = File(filePath); - if (await file.exists()) { - final partialResp = await syncLocalFiles([file]); + // Upload all remaining files synchronously to ensure they're all processed + _isUploading = true; + listener.onWalUpdated(); - resp.newConversationIds - .addAll(partialResp.newConversationIds.where((id) => !resp.newConversationIds.contains(id))); - resp.updatedConversationIds.addAll(partialResp.updatedConversationIds - .where((id) => !resp.updatedConversationIds.contains(id) && !resp.newConversationIds.contains(id))); + // Keep uploading until no files are left + while (_getPendingFiles().isNotEmpty) { + debugPrint("FlashPageSync: Uploading remaining files (${_getPendingFiles().length} left)"); + final uploadResp = await _uploadPendingFiles(); + resp.newConversationIds + .addAll(uploadResp.newConversationIds.where((id) => !resp.newConversationIds.contains(id))); + resp.updatedConversationIds.addAll(uploadResp.updatedConversationIds + .where((id) => !resp.updatedConversationIds.contains(id) && !resp.newConversationIds.contains(id))); - await file.delete(); - _removePendingFile(filePath); - } - } catch (e) { - debugPrint("FlashPageSync: Failed to upload file $filePath: $e"); - // File stays in pending list for recovery on next app start + // Small delay to avoid tight loop if uploads fail + if (_getPendingFiles().isNotEmpty) { + await Future.delayed(const Duration(milliseconds: 500)); } - - progress?.onWalSyncedProgress(0.4 + (0.6 * (i + 1) / savedFiles.length)); } - debugPrint("FlashPageSync: Completed. ${savedFiles.length} files synced"); + _stopUploadTimer(); + _isUploading = false; + listener.onWalUpdated(); + + debugPrint("FlashPageSync: Completed. $filesSaved files saved, uploads processed"); progress?.onWalSyncedProgress(1.0); } catch (e) { debugPrint("FlashPageSync: Error: $e"); + _isSyncing = false; + _isUploading = false; + _stopUploadTimer(); + listener.onWalUpdated(); try { await _enableRealTimeMode(deviceId); } catch (_) {}