Skip to content

Commit

Permalink
feat(firebase_dart_plus): implement onValue for WriteBatch
Browse files Browse the repository at this point in the history
  • Loading branch information
rbellens committed Sep 12, 2023
1 parent b109938 commit 84117e2
Show file tree
Hide file tree
Showing 3 changed files with 89 additions and 28 deletions.
62 changes: 34 additions & 28 deletions packages/firebase_dart_plus/lib/src/write_batch.dart
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,9 @@ import 'package:firebase_dart/src/database/impl/view.dart';
import 'package:firebase_dart/src/database/impl/tree.dart';
import 'package:firebase_dart/src/database/impl/operations/tree.dart';
import 'package:firebase_dart/src/database/impl/firebase_impl.dart';
import 'package:firebase_dart/src/database/impl/events/value.dart';
import 'package:firebase_dart/src/implementation/isolate/database.dart';
import 'package:rxdart/rxdart.dart';

import 'package:sortedmap/sortedmap.dart';
import 'package:firebase_dart/database.dart';
Expand Down Expand Up @@ -44,7 +46,8 @@ extension FirebaseDatabaseWithWriteBatch on FirebaseDatabase {
class WriteBatch {
final DatabaseReference _rootReference;

final List<TreeOperation> _operations = [];
final BehaviorSubject<List<TreeOperation>> _onOperationAdded =
BehaviorSubject.seeded([]);

bool _committed = false;

Expand All @@ -57,7 +60,7 @@ class WriteBatch {
if (_committed) throw StateError('Batch already committed');

_committed = true;
if (_operations.isEmpty) return;
if (_onOperationAdded.value.isEmpty) return;

var updates = _getUpdates();
if (updates.isEmpty) return;
Expand Down Expand Up @@ -87,16 +90,17 @@ class WriteBatch {
}
}

_operations.clear();
await _onOperationAdded.close();
}

void _addOperation(TreeOperation operation) {
if (_committed) throw StateError('Batch already committed');
_operations.add(operation);
_onOperationAdded.add([..._onOperationAdded.value, operation]);
}

Map<String, dynamic> _getUpdates() {
var ops = SortedMap<int, TreeOperation>()..addAll(_operations.asMap());
var ops = SortedMap<int, TreeOperation>()
..addAll(_onOperationAdded.value.asMap());
var cache = ViewCache(IncompleteData.empty(), IncompleteData.empty(), ops)
..recalcLocalVersion();

Expand Down Expand Up @@ -142,31 +146,33 @@ class TransactionalQuery extends Query {

@override
Stream<Event> on(String eventType) {
throw UnimplementedError();
}

@override
Future<dynamic> get() async {
var ops = SortedMap<int, TreeOperation>()
..addAll(_transaction._operations.asMap());
var cache = ViewCache(IncompleteData.empty(), IncompleteData.empty(), ops)
..recalcLocalVersion();

var path =
reference().url.path.substring(reference().root().url.path.length);
var p = Name.parsePath(path);

if (!cache.localVersion.isCompleteForPath(p)) {
var v = await _query.get();
var serverVersion = IncompleteData.empty().applyOperation(
TreeOperation.overwrite(p, TreeStructuredData.fromJson(v)));
cache = cache.updateServerVersion(serverVersion);
if (eventType != 'value') {
throw UnimplementedError();
}
return CombineLatestStream.combine2(
_query.onValue, _transaction._onOperationAdded, (v, operations) {
var ops = SortedMap<int, TreeOperation>()..addAll(operations.asMap());
var cache = ViewCache(IncompleteData.empty(), IncompleteData.empty(), ops)
..recalcLocalVersion();

var path =
reference().url.path.substring(reference().root().url.path.length);
var p = Name.parsePath(path);

if (!cache.localVersion.isCompleteForPath(p)) {
var serverVersion = IncompleteData.empty().applyOperation(
TreeOperation.overwrite(
p, TreeStructuredData.fromJson(v.snapshot.value)));
cache = cache.updateServerVersion(serverVersion);
}

var v =
cache.localVersion.child(p).completeValue!.withFilter(_query.filter);

return v.toJson();
return cache.localVersion
.child(p)
.completeValue!
.withFilter(_query.filter);
}).distinct().map((v) {
return Event(DataSnapshotImpl(reference(), v), null);
});
}

@override
Expand Down
1 change: 1 addition & 0 deletions packages/firebase_dart_plus/pubspec.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ environment:
dependencies:
firebase_dart: ^1.1.0-dev.11
sortedmap: ^0.5.3
rxdart: ^0.27.7

dev_dependencies:
lints: ^2.0.0
Expand Down
54 changes: 54 additions & 0 deletions packages/firebase_dart_plus/test/firebase_dart_plus_test.dart
Original file line number Diff line number Diff line change
Expand Up @@ -149,5 +149,59 @@ void main() {
expect(() => batch.reference().child('test').set('test'),
throwsA(isA<StateError>()));
});

test('onValue should get updates from server and batch', () async {
await db.reference().child('test').set({'hello': 'world'});

var batch = db.batch();

dynamic w;
var s = batch.reference().child('test').onValue.listen((v) {
w = v.snapshot.value;
});

await Future.delayed(Duration(milliseconds: 10));
expect(w, {'hello': 'world'});

await batch.reference().child('test').child('message').set('hello');
await Future.delayed(Duration(milliseconds: 10));
expect(w, {'hello': 'world', 'message': 'hello'});

await db.reference().child('test').child('hello').set('hello');
await Future.delayed(Duration(milliseconds: 10));
expect(w, {'hello': 'hello', 'message': 'hello'});

await s.cancel();
});

test('onValue should not be called multiple times with same value',
() async {
await db.reference().child('test').set({'hello': 'world'});

var batch = db.batch();

dynamic w;
int count = 0;
var s = batch.reference().child('test').onValue.listen((v) {
w = v.snapshot.value;
count++;
});

await Future.delayed(Duration(milliseconds: 10));
expect(w, {'hello': 'world'});
expect(count, 1);

await batch.reference().child('test').child('message').set('hello');
await Future.delayed(Duration(milliseconds: 10));
expect(w, {'hello': 'world', 'message': 'hello'});
expect(count, 2);

await db.reference().child('test').child('message').set('hello');
await Future.delayed(Duration(milliseconds: 10));
expect(w, {'hello': 'world', 'message': 'hello'});
expect(count, 2);

await s.cancel();
});
});
}

0 comments on commit 84117e2

Please sign in to comment.