StreamBuilder doesn't support merged streams #23173

Closed
brilliant-ember opened this issue Oct 16, 2018 · 15 comments
Comments

@brilliant-ember

When I merge query streams, the StreamBuilder only shows the second stream. My guess is that the issue lies in how Dart combines streams, since the StreamBuilder widget itself seems to work correctly. Nonetheless, this is a major issue: it prevents Flutter developers from combining queries, an important feature for any app that uses an API.
I already filed the issue with dart-lang, but I felt I should raise it here as well, since a Flutter widget might be able to address it.
dart-lang/async#70

@kyleorin

Try https://pub.dartlang.org/packages/rxdart, which offers powerful ways to combine streams.

@zoechi
Contributor

zoechi commented Oct 17, 2018

Please consider asking support questions in one of the other channels listed at http://flutter.io/support .

There is no difference between normal and merged streams. The issue has to be caused by something else.

@zoechi zoechi closed this as completed Oct 17, 2018
@zoechi
Contributor

zoechi commented Oct 17, 2018

Dup of dart-lang/async#70

@zoechi
Contributor

zoechi commented Oct 17, 2018

I saw that dart-lang/async#70 was also closed.
Even though I'm confident this is not a StreamBuilder issue, if you provide more information that allows me to reproduce it, I can reopen the issue or suggest how to fix it.
A minimal runnable main.dart that reproduces the issue would be ideal.

@brilliant-ember
Author

brilliant-ember commented Oct 17, 2018

The merged stream did print the expected output to the console when accessed from outside the StreamBuilder. However, the StreamBuilder's snapshot ignores the first item in the stream and only updates the UI with the second item.
I attached the build function:
Widget build(BuildContext context) {
Stream streamGroup = MyQueryTool.searchResult(); // this is the merged stream which proved to contain all the proper data from all the queries, currently tested with only 2 streams combined

    return SafeArea(
      child: Scaffold(
    
        body: Padding(
          padding: _padding,
          child: Center(
            child: Column(
              children: <Widget>[
                Expanded(
                  child: StreamBuilder(stream: streamGroup, builder: (BuildContext context, AsyncSnapshot<QuerySnapshot> snapshot){
                      if(!snapshot.hasData){
                        return MyLoadingWidget();
                      }
                      int length = snapshot.data.documents.length;
                      snapshot.data.documents.forEach((f){print(f.data["name"]);});
                      //streamGroup.map((convert){ convert.documents.forEach((f){print(f.data["name"]);});});
                      //print("from the streamBuilder: "+ snapshot.data.documents[]);
                      print(length.toString()+ " doc length");
                      return ListView.builder(
                        itemCount: length,
                        itemBuilder: (_,int index){
                          final DocumentSnapshot doc = snapshot.data.documents[index];
                          return new Container(child: Text(doc.data["name"]));
                        },
                      );
                    },
  
  ),
                ),
              ],
            ),
          ),
        ),
      ),
    );
  }

@zoechi
Contributor

zoechi commented Oct 17, 2018

Perhaps the widget is rebuilt and StreamBuilder re-subscribes when build is executed again?
If a broadcast stream is used, events might get lost in between.
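
To illustrate the broadcast-stream point, here is a minimal sketch using only dart:async. The helper name eventsSeenByLateListener is made up for this example; it adds one event before anyone listens and two more afterwards, then reports what the listener actually received. A broadcast controller does not buffer events while it has no listener, whereas a single-subscription controller holds them until the first listener arrives.

```dart
import 'dart:async';

/// Hypothetical helper: emits 1 before subscribing, then 2 and 3 after,
/// and returns the events the (late) listener actually received.
Future<List<int>> eventsSeenByLateListener(
    StreamController<int> controller) async {
  final received = <int>[];

  controller.add(1); // emitted before anyone is listening
  controller.stream.listen(received.add);
  controller.add(2);
  controller.add(3);

  // close() completes once the listener has been sent the done event,
  // so every earlier data event has been delivered by then.
  await controller.close();
  return received;
}
```

Called with StreamController<int>.broadcast(), the helper should return only the events added after listen; called with a plain StreamController<int>(), it should also return the buffered first event. If a StreamBuilder is handed a freshly created stream on every build, a similar gap could plausibly open between the old and the new subscription, which is one reason to create the stream once (for example in initState) rather than inside build.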

@brilliant-ember
Author

brilliant-ember commented Oct 17, 2018

Sample app:

import 'package:flutter/material.dart';
import 'package:cloud_firestore/cloud_firestore.dart';
import 'package:rxdart/rxdart.dart' show Observable;
import 'dart:async';

final String TAG = "StreamBuilderTest.Flutter";
String _searchQuery = "vegan yoga";

class A extends StatefulWidget{
  @override
  State createState() => Astate();
}
class Astate extends State<A> {
  @override
  Widget build(BuildContext context) {
  return Scaffold(body: StreamBuilder(stream: searchResult(), builder: (BuildContext context, AsyncSnapshot<QuerySnapshot> snapshot){
                      if(!snapshot.hasData){
                        return Text("loading....");
                      }
                      int length = snapshot.data.documents.length;
                      //print("from the streamBuilder: "+ snapshot.data.documents[]);
                      print(length.toString()+ " doc length");
                      return ListView.builder(
                        itemCount: length,
                        itemBuilder: (_,int index){
                          final DocumentSnapshot doc = snapshot.data.documents[index];
                          //print(doc.data["name"].toString());
                          return new Container(child: Text(doc.data["name"]));
                        },
                      );
                    },
  
  ),);
}


Stream<QuerySnapshot> searchResult() {
  // The number of keywords should stay small. TODO: assert this to improve latency.
  List<String> queryKeys = _searchQuery.split(" ");
  CollectionReference firestoreCollection = Firestore.instance.collection("activities");
  List<Stream<QuerySnapshot>> streamList = [];

  for (int i = 0; i < queryKeys.length; i++) {
    try {
      Stream<QuerySnapshot> x = firestoreCollection.where("query", arrayContains: queryKeys[i]).snapshots();
      streamList.add(x);
      print(TAG + "added a stream " + queryKeys[i]);
    } catch (e) {
      print(TAG + " error on query keyword search" + e.toString());
    }
  }
  var x = Observable.merge(streamList);
  // This test shows that the stream gets contents from both query streams:
  // x.map((convert){ convert.documents.forEach((f){print(f.data["name"]);});}).listen(print);

  return x;
}
}

void main() {
  runApp(MaterialApp(home: A()));
}

@zoechi
Contributor

zoechi commented Oct 17, 2018

I think the problem here is that each stream emits an event when the response from Firebase arrives, and the data of that event is a collection of entries.
For example, one event from StreamA is ['a', 'b', 'c'] and one event from StreamB is ['e', 'f', 'g'].
What you seem to expect from Observable.merge() is
['a', 'b', 'c', 'e', 'f', 'g'],
but actually it emits 2 events.
First ['a', 'b', 'c'], then ['e', 'f', 'g'].

Flutter/StreamBuilder first causes the data from the first event to render, then when the 2nd event arrives the widget is rebuilt with the data from the 2nd event.

The Observable.scan operator allows you to combine data from multiple events into a single event:

import 'package:flutter/material.dart';
import 'package:rxdart/rxdart.dart' show Observable;
import 'dart:async';

class A extends StatefulWidget {
  @override
  State createState() => AState();
}

class AState extends State<A> {
  @override
  Widget build(BuildContext context) {
    return Scaffold(
      body: StreamBuilder(
        stream: searchResult(),
        builder: (BuildContext context, AsyncSnapshot<List<String>> snapshot) {
          if (!snapshot.hasData) {
            return Text("loading....");
          }
          int length = snapshot.data.length;
          //print("from the streamBuilder: "+ snapshot.data.documents[]);
          print(length.toString() + " doc length");
          return ListView.builder(
            itemCount: length,
            itemBuilder: (_, int index) {
              final String doc = snapshot.data[index];
              //print(doc.data["name"].toString());
              return new Container(child: Text(doc));
            },
          );
        },
      ),
    );
  }

  Stream<List<String>> searchResult() {
    List<Stream<List<String>>> streamList = [];

    for (int i = 0; i < 3; i++) {
      try {
        streamList.add(
          Observable<List<String>>.just(
            [
              '${i}_1',
              '${i}_2',
              '${i}_3',
            ],
          ).delay(Duration(seconds: i * 2)),
        );
      } catch (e) {}
    }
    var x = Observable.merge(streamList).scan<List<String>>((acc, curr, i) {
      return acc ?? <String>[]
        ..addAll(curr);
    });
    //this test shows that the stream gets contents from both query streams
    // x.map((convert){ convert.documents.forEach((f){print(f.data["name"]);});}).listen(print);

    return x;
  }
}

void main() {
  runApp(MaterialApp(home: A()));
}

List<String> is basically your QuerySnapshot.
I used that to have a running reproduction without a dependency on Firebase.
You might need to extract the data from QuerySnapshot first before you can combine them into a collection. I don't remember all the Firebase details.
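
The merge-then-accumulate idea above does not strictly depend on RxDart. As a rough sketch, assuming only dart:async, it can be written by hand. The names mergeStreams and accumulate are hypothetical helpers for this example, not RxDart or Flutter APIs, and List<T> again stands in for the data extracted from a QuerySnapshot.

```dart
import 'dart:async';

/// Hypothetical helper: forwards every event of every source as it arrives
/// and closes once all sources are done.
Stream<T> mergeStreams<T>(List<Stream<T>> sources) {
  final subscriptions = <StreamSubscription<T>>[];
  var open = sources.length;
  late final StreamController<T> controller;

  controller = StreamController<T>(
    onListen: () {
      if (open == 0) {
        controller.close();
        return;
      }
      for (final source in sources) {
        subscriptions.add(source.listen(
          controller.add,
          onError: controller.addError,
          onDone: () {
            if (--open == 0) controller.close();
          },
        ));
      }
    },
    onCancel: () => Future.wait(subscriptions.map((s) => s.cancel())),
  );
  return controller.stream;
}

/// Hypothetical helper: turns a stream of batches into a stream of
/// "everything seen so far", emitting a fresh copy for each batch.
Stream<List<T>> accumulate<T>(Stream<List<T>> batches) async* {
  final all = <T>[];
  await for (final batch in batches) {
    all.addAll(batch);
    yield List<T>.of(all);
  }
}
```

A StreamBuilder fed with accumulate(mergeStreams([...])) would then rebuild with a growing list instead of replacing the first batch with the second. Emitting a copy on each step is a deliberate choice here, so that a list already handed to the UI is not mutated later.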

@brilliant-ember
Author

A workaround that worked for me is using a ListView.builder and attaching a stream listener in initState:

@override
void initState() {
  super.initState();
  Stream x = searchResult();
  x.listen((onData) {
    onData.documents.forEach((doc) {
      // e is not declared in this snippet; presumably it is a List<Widget>
      // field of the State that the ListView.builder reads from.
      print("Added a doc " + e.length.toString());
      e.add(Text(doc.data['name']));
      print(doc.data['name']);
    });
  });
}

@zoechi
Contributor

zoechi commented Oct 17, 2018

I think it would be a good idea to move this code out of Flutter widgets.
With your workaround the data will be fetched again when the widget is rebuilt, for example if you rotate the phone.
Outside of widgets you can control the lifetime better. Redux helps here and probably also Bloc, but I didn't have a closer look at the latter.
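
As a rough illustration of keeping the subscription outside the widget tree, here is a minimal sketch using only dart:async. ResultStore is a hypothetical class invented for this example (it is not part of Redux, Bloc, or Flutter): it subscribes to the source once, remembers the most recent value, and re-exposes updates as a broadcast stream that can be listened to any number of times.

```dart
import 'dart:async';

/// Hypothetical holder that owns a single subscription for its whole
/// lifetime, independent of any widget's build or initState cycle.
class ResultStore<T> {
  ResultStore(Stream<T> source) {
    _subscription = source.listen(
      (value) {
        _latest = value;
        _changes.add(value);
      },
      onError: _changes.addError,
    );
  }

  final _changes = StreamController<T>.broadcast();
  late final StreamSubscription<T> _subscription;
  T? _latest;

  /// Most recent value, available immediately to a rebuilt widget.
  T? get latest => _latest;

  /// Later updates; broadcast, so re-subscribing is allowed.
  Stream<T> get changes => _changes.stream;

  /// Stops listening to the source and closes the outgoing stream.
  Future<void> dispose() async {
    await _subscription.cancel();
    await _changes.close();
  }
}
```

Under these assumptions a widget could pass store.latest as the StreamBuilder's initialData and store.changes as its stream, so a rebuild neither re-runs the query nor misses the value that arrived earlier.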

@brilliant-ember
Author

I settled on a workaround that uses StreamZip and list manipulation with the StreamBuilder:

StreamBuilder<List<QuerySnapshot>>(
  stream: streamGroup,
  builder: (BuildContext context, AsyncSnapshot<List<QuerySnapshot>> snapshotList) {
    if (!snapshotList.hasData) {
      return MyLoadingWidget();
    }
    // Note that snapshotList.data is the actual list of QuerySnapshots;
    // snapshotList alone is just an AsyncSnapshot.
    // Flatten the documents of every QuerySnapshot into one list so that
    // itemBuilder can index it directly, in whatever order items are built.
    final List<DocumentSnapshot> docs =
        snapshotList.data.expand((snap) => snap.documents).toList();
    return ListView.builder(
      itemCount: docs.length,
      itemBuilder: (_, int index) {
        return new Container(child: Text(docs[index].data["name"]));
      },
    );
  },
)

@gbaranski

Is there a newer solution, given that RxDart no longer has Observable as of 0.23.1?

@kamalbanga

@zoechi the problem with Observable.scan is that it breaks connection with firestore. Editing a document doesn't update the local copy. If that would be acceptable, one could just do 2 get calls and get documents from both sources.
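
One possible way to keep live updates, sketched here with only dart:async, is to remember the latest batch from each source and re-emit the concatenation whenever any source emits, so that a re-emitted query result replaces its previous version instead of being appended. The name latestFromEach is a hypothetical helper, List<T> stands in for the documents of one query, and unlike a strict combine-latest this sketch emits as soon as the first source has data.

```dart
import 'dart:async';

/// Hypothetical helper: emits the concatenation of the most recent batch
/// from every source. A new batch from a source replaces its old one.
Stream<List<T>> latestFromEach<T>(List<Stream<List<T>>> sources) {
  final latest = List<List<T>?>.filled(sources.length, null);
  final subscriptions = <StreamSubscription<List<T>>>[];
  var open = sources.length;
  late final StreamController<List<T>> controller;

  controller = StreamController<List<T>>(
    onListen: () {
      if (open == 0) {
        controller.close();
        return;
      }
      for (var i = 0; i < sources.length; i++) {
        subscriptions.add(sources[i].listen(
          (batch) {
            latest[i] = batch;
            // Sources that have not emitted yet contribute nothing.
            controller.add([for (final list in latest) ...?list]);
          },
          onError: controller.addError,
          onDone: () {
            if (--open == 0) controller.close();
          },
        ));
      }
    },
    onCancel: () => Future.wait(subscriptions.map((s) => s.cancel())),
  );
  return controller.stream;
}
```

Whether this matches the behaviour wanted with Firestore depends on how the query streams re-emit after an edit, which this sketch only assumes; it also does not remove documents that match more than one query.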

@kamleshwebtech

Could someone point out what the issue is in the code posted at https://stackoverflow.com/questions/68000556/how-to-merge-2-firestore-queries-steams-in-flutter-dart

@github-actions

This thread has been automatically locked since there has not been any recent activity after it was closed. If you are still experiencing a similar issue, please open a new bug, including the output of flutter doctor -v and a minimal reproduction of the issue.

@github-actions github-actions bot locked as resolved and limited conversation to collaborators Jul 31, 2021