RxJava2: Multiple exit criteria for grouping #6423

@DareUrDream

Hi, I have the situation below, where I decide whether a group is complete based on my takeUntil operator. Recently, however, a record streamed from the database was delayed by a few seconds (due to the network), and we ended up processing an incomplete group because the last record arrived before the other records it needed. I am now planning a solution that waits a further X seconds once the takeUntil predicate returns true.

Any help on how to achieve this is much appreciated.

Flowable<List<DBRecord>> flowable = observable
					.toFlowable(BackpressureStrategy.BUFFER)
					.groupBy(callrecord -> callrecord.getSessionid())
					.flatMapSingle(group -> 
						group
							.timeout(1, TimeUnit.HOURS) // Certain cases records can come within one
							.onErrorResumeNext(Flowable.never())
							.takeUntil(cr -> cr instanceof CCDR && ((CCDR) cr).isCallCompleted())
							.toList()
					);
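
To make the goal concrete, here is a rough sketch of the "keep the group open for X more seconds" behaviour, assuming RxJava 2.x is on the classpath. The names `GraceWindow`, `collectWithGrace`, `isLast`, `grace` and the explicit `scheduler` parameter are made up for illustration. The idea is to share the group via `publish(selector)` and end it with `takeUntil(Publisher)`, where the stop signal is the first "last" record delayed by the grace period. This is only a sketch under those assumptions, not a verified solution.

```java
import io.reactivex.Flowable;
import io.reactivex.Scheduler;
import io.reactivex.Single;
import io.reactivex.functions.Predicate;
import java.util.List;
import java.util.concurrent.TimeUnit;

final class GraceWindow {
    /**
     * Collects the items of one group into a list. Once an item matching
     * isLast has been seen, the group stays open for the grace period so
     * that delayed items are still included, then the list is emitted.
     */
    public static <T> Single<List<T>> collectWithGrace(
            Flowable<T> group, Predicate<? super T> isLast,
            long grace, TimeUnit unit, Scheduler scheduler) {
        return group
            // publish(selector) lets the same group feed both the main
            // sequence and the stop signal with a single upstream subscription
            .<T>publish(shared -> shared.takeUntil(
                // stop signal: the first "last" item, shifted by the grace period
                shared.filter(isLast).take(1).delay(grace, unit, scheduler)))
            .toList();
    }
}
```

In my pipeline this would take the place of the `takeUntil(...).toList()` pair inside `flatMapSingle`, with the `instanceof CCDR` check passed as `isLast` and something like `Schedulers.computation()` as the scheduler.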

P.S. Is it possible to collect the group members into a list and analyze the records in a custom function to determine whether the group is complete? In other words, something like a collectUntil/takeUntil whose custom predicate accepts the list of all elements received so far and ends the group when it returns true.
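
To illustrate the semantics I mean by collectUntil, here is a minimal sketch on a plain `Iterable` rather than a `Flowable`, using only the JDK. `CollectUntil`, `collectUntil` and `isComplete` are hypothetical names, not an existing RxJava API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

final class CollectUntil {
    /**
     * Adds items to a list one by one and stops as soon as the predicate,
     * evaluated against the whole list collected so far, returns true.
     * If the predicate never returns true, all items are returned.
     */
    public static <T> List<T> collectUntil(Iterable<T> source,
                                           Predicate<List<T>> isComplete) {
        List<T> collected = new ArrayList<>();
        for (T item : source) {
            collected.add(item);
            if (isComplete.test(collected)) {
                break; // the group is complete, ignore anything after this
            }
        }
        return collected;
    }
}
```

For example, with a made-up rule that a group needs both a "START" and an "END" entry, `collectUntil(Arrays.asList("END", "MID", "START", "EXTRA"), l -> l.contains("START") && l.contains("END"))` returns `[END, MID, START]`, so arrival order no longer matters. On a `Flowable`, I assume the same shape could be built from `scanWith` to accumulate the list, followed by `takeUntil` with a predicate on that list and `lastOrError()`, but I am not sure that is the intended way.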

Cheers !!
DareUrDream
