Skip to content

Commit

Permalink
feat(logging): enable log rotation and set retry on full log store sy…
Browse files Browse the repository at this point in the history
…nc (#3699)

* feat(logging): enable log rotation and set retry
  • Loading branch information
NikaHsn committed Sep 12, 2023
1 parent 1c77579 commit eda0a6a
Show file tree
Hide file tree
Showing 10 changed files with 574 additions and 50 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ class DartQueuedItemStore implements QueuedItemStore, Closeable {
}

@override
FutureOr<bool> isFull(int maxSizeInMB) {
bool isFull(int maxSizeInMB) {
throw UnimplementedError('isFull() has not been implemented.');
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ class DartQueuedItemStore implements QueuedItemStore, Closeable {
}

@override
Future<bool> isFull(int maxSizeInMB) {
bool isFull(int maxSizeInMB) {
return _database.isFull(maxSizeInMB);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@ class DartQueuedItemStore
// ignore: avoid_unused_constructor_parameters
DartQueuedItemStore(String? storagePath);

late final Future<QueuedItemStore> _database = () async {
if (await IndexedDbAdapter.checkIsIndexedDBSupported()) {
late final QueuedItemStore _database = () {
if (IndexedDbAdapter.checkIsIndexedDBSupported()) {
return IndexedDbAdapter();
}
logger.warn(
Expand All @@ -34,8 +34,7 @@ class DartQueuedItemStore
String timestamp, {
bool enableQueueRotation = false,
}) async {
final db = await _database;
await db.addItem(
await _database.addItem(
string,
timestamp,
enableQueueRotation: enableQueueRotation,
Expand All @@ -44,34 +43,29 @@ class DartQueuedItemStore

@override
Future<void> deleteItems(Iterable<QueuedItem> items) async {
final db = await _database;
await db.deleteItems(items);
await _database.deleteItems(items);
}

@override
Future<Iterable<QueuedItem>> getCount(int count) async {
final db = await _database;
return db.getCount(count);
return _database.getCount(count);
}

@override
Future<Iterable<QueuedItem>> getAll() async {
final db = await _database;
return db.getAll();
return _database.getAll();
}

@override
Future<bool> isFull(int maxSizeInMB) async {
final db = await _database;
return db.isFull(maxSizeInMB);
bool isFull(int maxSizeInMB) {
return _database.isFull(maxSizeInMB);
}

/// Clear IndexedDB data.
@override
@visibleForTesting
Future<void> clear() async {
final db = await _database;
return db.clear();
return _database.clear();
}

@override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ class DriftQueuedItemStore extends _$DriftQueuedItemStore
}

@override
Future<bool> isFull(int maxSizeInMB) async {
bool isFull(int maxSizeInMB) {
final maxBytes = maxSizeInMB * 1024 * 1024;
return _currentTotalByteSize >= maxBytes;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ class IndexedDbAdapter implements QueuedItemStore {
}

@override
Future<bool> isFull(int maxSizeInMB) async {
bool isFull(int maxSizeInMB) {
final maxBytes = maxSizeInMB * 1024 * 1024;
return _currentTotalByteSize >= maxBytes;
}
Expand All @@ -167,15 +167,14 @@ class IndexedDbAdapter implements QueuedItemStore {
void close() {}

/// Check that IndexDB will work on this device.
static Future<bool> checkIsIndexedDBSupported() async {
static bool checkIsIndexedDBSupported() {
if (indexedDB == null) {
return false;
}
// indexedDB will be non-null in Firefox private browsing,
// but will fail to open.
try {
final openRequest = indexedDB!.open('test', 1);
await openRequest.future;
indexedDB!.open('test', 1).result;
return true;
} on Object {
return false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -233,14 +233,14 @@ void main() {
await db.addItem(largeItem, DateTime.now().toIso8601String());
}

var result = await db.isFull(capacityLimit);
var result = db.isFull(capacityLimit);
expect(result, isFalse);

for (var i = 0; i < 100; i++) {
await db.addItem(largeItem, DateTime.now().toIso8601String());
}

result = await db.isFull(capacityLimit);
result = db.isFull(capacityLimit);
expect(result, isTrue);
},
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,10 @@ import 'package:meta/meta.dart';
const int _maxNumberOfLogEventsInBatch = 10000;
const int _maxLogEventsBatchSize = 1048576;
const int _baseBufferSize = 26;
const int _maxLogEventsTimeSpanInBatch = Duration.millisecondsPerDay;
const int _maxLogEventSize = 256000;
final int _maxLogEventsTimeSpanInBatch =
const Duration(hours: 24).inMilliseconds;
const Duration _minusMaxLogEventTimeInFuture = Duration(hours: -2);
const Duration _baseRetryInterval = Duration(seconds: 10);

typedef _LogBatch = (List<QueuedItem> logQueues, List<InputLogEvent> logEvents);

Expand Down Expand Up @@ -113,7 +114,8 @@ class CloudWatchLoggerPlugin extends AWSLoggerPlugin
bool _enabled;
StoppableTimer? _timer;
RemoteLoggingConstraintProvider? _remoteLoggingConstraintProvider;

int _retryCount = 0;
DateTime? _retryTime;
set remoteLoggingConstraintProvider(
RemoteLoggingConstraintProvider remoteProvider,
) {
Expand All @@ -129,32 +131,89 @@ class CloudWatchLoggerPlugin extends AWSLoggerPlugin
Future<void> startSyncing() async {
final batchStream = _getLogBatchesToSync();
await for (final (logs, events) in batchStream) {
final response = await _sendToCloudWatch(events);
// TODO(nikahsn): handle tooOldLogEventEndIndex
// and expiredLogEventEndIndex.
if (response.rejectedLogEventsInfo?.tooNewLogEventStartIndex != null) {
// TODO(nikahsn): throw and exception to enable log rotation if the
// log store is full.
break;
_TooNewLogEventException? tooNewException;
while (logs.isNotEmpty && events.isNotEmpty) {
final rejectedLogEventsInfo =
(await _sendToCloudWatch(events)).rejectedLogEventsInfo;
if (rejectedLogEventsInfo == null) {
await _logStore.deleteItems(logs);
break;
}

final (tooOldEndIndex, tooNewStartIndex) =
rejectedLogEventsInfo.parse(events.length);

if (_isValidIndex(tooNewStartIndex, events.length)) {
tooNewException = _TooNewLogEventException(
events[tooNewStartIndex!].timestamp.toInt(),
);
// set logs to end before the index.
logs.removeRange(tooNewStartIndex, events.length);
// set events to end before the index.
events.removeRange(tooNewStartIndex, events.length);
}
if (_isValidIndex(tooOldEndIndex, events.length)) {
// remove old logs from log store.
await _logStore.deleteItems(logs.sublist(0, tooOldEndIndex! + 1));
// set logs to start after the index.
logs.removeRange(0, tooOldEndIndex + 1);
// set events to start after the index.
events.removeRange(0, tooOldEndIndex + 1);
}
}
// after sending each batch to CloudWatch check if the batch has
// `tooNewException` and throw to stop syncing next batches.
if (tooNewException != null) {
throw tooNewException;
}
await _logStore.deleteItems(logs);
}
}

if (!_syncing) {
// TODO(nikahsn): disable log rotation.
_syncing = true;
DateTime? nextRetry;
try {
await startSyncing();
} on _TooNewLogEventException catch (e) {
nextRetry =
DateTime.fromMillisecondsSinceEpoch(e.timeInMillisecondsSinceEpoch)
.add(_minusMaxLogEventTimeInFuture);
} on Exception catch (e) {
logger.error('Failed to sync logs to CloudWatch.', e);
// TODO(nikahsn): enable log rotation if the log store is full
} finally {
_handleFullLogStoreAfterSync(
retryTime: nextRetry,
);
_syncing = false;
}
}
}

void _handleFullLogStoreAfterSync({
DateTime? retryTime,
}) {
final isLogStoreFull =
_logStore.isFull(_pluginConfig.localStoreMaxSizeInMB);
if (!isLogStoreFull) {
_retryCount = 0;
_retryTime = null;
return;
}
if (retryTime != null && retryTime.isAfter(DateTime.timestamp())) {
_retryTime = retryTime;
return;
}
_retryCount += 1;
_retryTime = DateTime.timestamp().add((_baseRetryInterval * _retryCount));
}

bool _shouldSyncOnFullLogStore() {
if (_retryTime == null) {
return true;
}
return !(_retryTime!.isAfter(DateTime.timestamp()));
}

void _onTimerError(Object e) {
logger.error('Failed to sync logs to CloudWatch.', e);
}
Expand Down Expand Up @@ -225,11 +284,17 @@ class CloudWatchLoggerPlugin extends AWSLoggerPlugin
return;
}
final item = logEntry.toQueuedItem();
final isLogStoreFull =
_logStore.isFull(_pluginConfig.localStoreMaxSizeInMB);
final shouldEnableQueueRotation = isLogStoreFull && _retryTime != null;

await _logStore.addItem(
item.value,
item.timestamp,
enableQueueRotation: shouldEnableQueueRotation,
);
if (await _logStore.isFull(_pluginConfig.localStoreMaxSizeInMB)) {

if (isLogStoreFull && _shouldSyncOnFullLogStore()) {
await _startSyncingIfNotInProgress();
}
}
Expand All @@ -253,6 +318,8 @@ class CloudWatchLoggerPlugin extends AWSLoggerPlugin
_enabled = false;
_timer?.stop();
await _logStore.clear();
_retryCount = 0;
_retryTime = null;
}

/// Sends logs on-demand to CloudWatch.
Expand Down Expand Up @@ -285,3 +352,34 @@ extension on LogEntry {
);
}
}

extension on RejectedLogEventsInfo {
(int? pastEndIndex, int? futureStartIndex) parse(int length) {
int? pastEndIndex;
int? futureStartIndex;

if (_isValidIndex(tooOldLogEventEndIndex, length)) {
pastEndIndex = tooOldLogEventEndIndex;
}
if (_isValidIndex(expiredLogEventEndIndex, length)) {
pastEndIndex = pastEndIndex == null
? expiredLogEventEndIndex
: max(pastEndIndex, expiredLogEventEndIndex!);
}
if (_isValidIndex(tooNewLogEventStartIndex, length)) {
futureStartIndex = tooNewLogEventStartIndex;
}
return (pastEndIndex, futureStartIndex);
}
}

class _TooNewLogEventException implements Exception {
const _TooNewLogEventException(
this.timeInMillisecondsSinceEpoch,
);
final int timeInMillisecondsSinceEpoch;
}

bool _isValidIndex(int? index, int length) {
return index != null && index >= 0 && index <= length - 1;
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ abstract interface class QueuedItemStore {
FutureOr<Iterable<QueuedItem>> getAll();

/// Whether the queue size is reached [maxSizeInMB].
FutureOr<bool> isFull(int maxSizeInMB);
bool isFull(int maxSizeInMB);

/// Clear the queue of items.
FutureOr<void> clear();
Expand Down
Loading

0 comments on commit eda0a6a

Please sign in to comment.