Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

New Implementation of SerializedObserver #999

Merged

Conversation

benjchristensen
Copy link
Member

Rewrite of SerializedObserver by @akarnokd discussed at #962 (comment) from this Gist: https://gist.github.com/akarnokd/9545150

JMH Benchmarks

0.17.3

Benchmark                                                          (size)   Mode   Samples         Mean   Mean error    Units
r.operators.OperatorSerializePerf.noSerializationSingleThreaded      1024   avgt         5       45.504        1.710    ns/op
r.operators.OperatorSerializePerf.noSerializationSingleThreaded   1048576   avgt         5       58.600        5.647    ns/op
r.operators.OperatorSerializePerf.serializedSingleStream             1024   avgt         5       68.610        4.596    ns/op
r.operators.OperatorSerializePerf.serializedSingleStream          1048576   avgt         5       71.313        2.318    ns/op
r.operators.OperatorSerializePerf.synchronizedSingleStream           1024   avgt         5       73.322        3.666    ns/op
r.operators.OperatorSerializePerf.synchronizedSingleStream        1048576   avgt         5       76.518        1.355    ns/op

0.17.2

Benchmark                                                          (size)   Mode   Samples         Mean   Mean error    Units
r.operators.OperatorSerializePerf.noSerializationSingleThreaded      1024   avgt         5       45.790        1.184    ns/op
r.operators.OperatorSerializePerf.noSerializationSingleThreaded   1048576   avgt         5       58.518        3.788    ns/op
r.operators.OperatorSerializePerf.serializedSingleStream             1024   avgt         5       72.665        7.851    ns/op
r.operators.OperatorSerializePerf.serializedSingleStream          1048576   avgt         5       74.788        2.946    ns/op
r.operators.OperatorSerializePerf.synchronizedSingleStream           1024   avgt         5       73.661        3.499    ns/op
r.operators.OperatorSerializePerf.synchronizedSingleStream        1048576   avgt         5       78.386        5.036    ns/op

Manual Benchmarks

/**
 * 0.17.3:
 *
 * Run: 10 - 9,746,505 ops/sec
 * Run: 11 - 9,956,019 ops/sec
 * Run: 12 - 10,053,770 ops/sec
 * Run: 13 - 10,076,958 ops/sec
 * Run: 14 - 9,983,319 ops/sec
 *
 * 0.17.2:
 *
 * Run: 10 - 9,851,999 ops/sec
 * Run: 11 - 9,726,975 ops/sec
 * Run: 12 - 9,719,762 ops/sec
 * Run: 13 - 9,668,141 ops/sec
 * Run: 14 - 9,799,700 ops/sec
 *
 * @param input
 */
public void serializedSingleStream(Input input) {
    for (int i = 0; i < reps; i++) {
        input.observable.serialize().subscribe(input.subscriber);
    }
}

#### JMH Benchmarks

0.17.3

Benchmark                                                          (size)   Mode   Samples         Mean   Mean error    Units
r.operators.OperatorSerializePerf.noSerializationSingleThreaded      1024   avgt         5       45.504        1.710    ns/op
r.operators.OperatorSerializePerf.noSerializationSingleThreaded   1048576   avgt         5       58.600        5.647    ns/op
r.operators.OperatorSerializePerf.serializedSingleStream             1024   avgt         5       68.610        4.596    ns/op
r.operators.OperatorSerializePerf.serializedSingleStream          1048576   avgt         5       71.313        2.318    ns/op
r.operators.OperatorSerializePerf.synchronizedSingleStream           1024   avgt         5       73.322        3.666    ns/op
r.operators.OperatorSerializePerf.synchronizedSingleStream        1048576   avgt         5       76.518        1.355    ns/op

0.17.2

Benchmark                                                          (size)   Mode   Samples         Mean   Mean error    Units
r.operators.OperatorSerializePerf.noSerializationSingleThreaded      1024   avgt         5       45.790        1.184    ns/op
r.operators.OperatorSerializePerf.noSerializationSingleThreaded   1048576   avgt         5       58.518        3.788    ns/op
r.operators.OperatorSerializePerf.serializedSingleStream             1024   avgt         5       72.665        7.851    ns/op
r.operators.OperatorSerializePerf.serializedSingleStream          1048576   avgt         5       74.788        2.946    ns/op
r.operators.OperatorSerializePerf.synchronizedSingleStream           1024   avgt         5       73.661        3.499    ns/op
r.operators.OperatorSerializePerf.synchronizedSingleStream        1048576   avgt         5       78.386        5.036    ns/op

#### Manual Benchmarks

/**
 * 0.17.3:
 *
 * Run: 10 - 9,746,505 ops/sec
 * Run: 11 - 9,956,019 ops/sec
 * Run: 12 - 10,053,770 ops/sec
 * Run: 13 - 10,076,958 ops/sec
 * Run: 14 - 9,983,319 ops/sec
 *
 * 0.17.2:
 *
 * Run: 10 - 9,851,999 ops/sec
 * Run: 11 - 9,726,975 ops/sec
 * Run: 12 - 9,719,762 ops/sec
 * Run: 13 - 9,668,141 ops/sec
 * Run: 14 - 9,799,700 ops/sec
 *
 * @param input
 */
public void serializedSingleStream(Input input) {
    for (int i = 0; i < reps; i++) {
        input.observable.serialize().subscribe(input.subscriber);
    }
}
Unit test showing delays. Fails when MAX_DRAIN_ITERATION set to 1, passes as currently configured.
Added a thread starvation unit test and marked as ignored for now. Doesn't pass even with MAX_DRAIN_ITERATION set to 1. Probably needs backpressure solution.
@cloudbees-pull-request-builder

RxJava-pull-requests #932 FAILURE
Looks like there's a problem with this pull request

synchronized (this) {
list = queue;
queue = null;
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If list is null here, there is a window between this sync block and the finally sync block where emitting is still true and events are queued and not replayed until a subsequent event appears. A better way would be:

synchronized (this) {
list = queue;
queue = null;
if (list = null) {
emitting = false;
break;
}
}

But then the finally block should be changed to avoid setting emitting to false.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see what you mean, but the intricacies with onComplete/onError and the race for terminated then becomes quite complicated.

If I understand correctly, the finally block would not only need to not touch emitting in this case when not terminated, but it would also have to check if terminated and !emitting to reclaim the right to drain the queue, correct? Otherwise the terminal state could result in duplicate emission.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, something like that. I'll post the proposed correction within a day.

benjchristensen added a commit that referenced this pull request Apr 1, 2014
@benjchristensen benjchristensen merged commit a569f72 into ReactiveX:master Apr 1, 2014
@benjchristensen benjchristensen deleted the serialize-optimizations branch April 1, 2014 21:00
@cloudbees-pull-request-builder

RxJava-pull-requests #936 FAILURE
Looks like there's a problem with this pull request

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

4 participants