Hi,
I'm using Rx in a function to perform batched writes to a database we use. I have a class that implements this and uses subscribeOn with an executor pool to run the write operations.
The function below ends up hanging, and I believe the cause is my subscribeOn usage: when I remove subscribeOn, execution reaches the flatMap that follows it, but with subscribeOn left in, that code is never reached.
protected Observable<SherpaBatchWriteResponse> performBatchWriteOnBatchedRecords(final Observable<IDataRecord> encryptedRecordsObs) {
    // Fixed pool of 10 threads; SynchronousQueue means tasks are handed off directly and never queued.
    ListeningExecutorService executorService = MoreExecutors.listeningDecorator(
            new ThreadPoolExecutor(10, 10, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>()));
    return encryptedRecordsObs
            // Group records into lists of at most MAX_BATCH_SET_RECORDS.
            .buffer(MAX_BATCH_SET_RECORDS)
            .subscribeOn(Schedulers.executor(executorService))
            // Issue one blocking batch write per buffered list.
            .flatMap(new Func1<List<IDataRecord>, Observable<SherpaBatchWriteResponse>>() {
                @Override
                public Observable<SherpaBatchWriteResponse> call(List<IDataRecord> writeRecords) {
                    try {
                        SherpaBWRequestItems[] items = createSherpaBatchWriteRequestItemsForDataRecords(writeRecords);
                        return Observable.just(sherpaService.batchWriteRecords(items, (long) switchboardConfig.sherpaBatchGetTimeout()));
                    } catch (SwitchboardException e) {
                        return Observable.error(e);
                    } catch (SherpaException e) {
                        return Observable.error(new SwitchboardException(SwitchboardErrorCode.SHERPA_BATCH_WRITE_FAILED_TO_WRITE_SOME_RECORDS, e));
                    }
                }
            });
}
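One property of the executor configured above can be checked in isolation: a ThreadPoolExecutor backed by a SynchronousQueue has no task buffer, so with the default rejection policy a task handed to it while every worker is busy is rejected with RejectedExecutionException rather than queued. The sketch below uses only java.util.concurrent, with no Rx involved. The class and method names are made up for illustration, and this is not claimed to be the cause of the hang, only a behavior of this pool configuration worth ruling out.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class; not part of the code in this issue.
public class SynchronousQueuePoolDemo {

    /** Returns true if a task submitted while every worker is busy gets rejected. */
    static boolean extraTaskIsRejected(int poolSize) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                poolSize, poolSize, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>());
        CountDownLatch started = new CountDownLatch(poolSize); // all workers are running
        CountDownLatch release = new CountDownLatch(1);        // lets the workers finish
        try {
            // Occupy every worker thread with a task that blocks until released.
            for (int i = 0; i < poolSize; i++) {
                pool.execute(() -> {
                    started.countDown();
                    try {
                        release.await();
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
            }
            started.await();
            try {
                // No idle worker and no queue capacity: the default AbortPolicy throws.
                pool.execute(() -> { });
                return false;
            } catch (RejectedExecutionException expected) {
                return true;
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            release.countDown();
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("rejected: " + extraTaskIsRejected(2));
    }
}
```

If rejections like this were swallowed somewhere between the scheduler and the subscriber, the symptom could look like a silent stall, so logging or catching RejectedExecutionException around the pool is a cheap check.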
What makes this stranger is that two code paths use this function, and one succeeds while the other hangs. As far as I can tell, both code paths reach the point where the function is called.
The code path that hangs looks like this:
performBatchWriteOnBatchedRecords(Observable.from(Lists.newArrayList(record1,record2)))
whereas a different code path, which succeeds, looks like this:
// Observable that emits IDataRecords; the records are fetched
// through observables backed by a network call
Observable<IDataRecord> async = getRecordsAsync();
performBatchWriteOnBatchedRecords(async)
Do you have any idea why this may be happening or any pitfalls I may be falling into?
I've worked around this by creating my own operator that runs a specific map/flatMap function on a pool, but I'm still interested in why subscribeOn was being goofy for me.
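For readers who want the general shape of that kind of workaround without Rx, here is a minimal sketch in plain java.util.concurrent: each batch is submitted to the pool as its own task, and the results are collected in order. It is not the operator mentioned above. `BatchOnPoolSketch`, `processInBatches`, and the `writer` function are hypothetical names, and `writer` merely stands in for a blocking call such as the batch write.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.Function;

// Hypothetical illustration; names are not from the code in this issue.
public class BatchOnPoolSketch {

    /** Splits records into batches and runs writer on each batch on the given pool. */
    static <T, R> List<R> processInBatches(List<T> records, int batchSize,
                                           ExecutorService pool, Function<List<T>, R> writer) {
        List<Future<R>> pending = new ArrayList<>();
        for (int start = 0; start < records.size(); start += batchSize) {
            int end = Math.min(start + batchSize, records.size());
            List<T> batch = new ArrayList<>(records.subList(start, end));
            Callable<R> task = () -> writer.apply(batch); // runs on a pool thread
            pending.add(pool.submit(task));
        }
        // Wait for every batch; results keep the order in which batches were submitted.
        List<R> results = new ArrayList<>();
        for (Future<R> future : pending) {
            try {
                results.add(future.get());
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            } catch (ExecutionException e) {
                throw new IllegalStateException(e.getCause());
            }
        }
        return results;
    }

    public static void main(String[] args) {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        try {
            // Stand-in "write" that just reports how many records each batch held.
            List<Integer> sizes = processInBatches(Arrays.asList(1, 2, 3, 4, 5), 2, pool, b -> b.size());
            System.out.println(sizes);
        } finally {
            pool.shutdown();
        }
    }
}
```

The difference from subscribeOn is that only the per-batch work moves to the pool, while the source is consumed on the caller's thread.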
Thanks,
Blake