Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

1.x: remove remaining field updaters #3979

Merged
merged 1 commit into from
Jun 1, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 2 additions & 0 deletions src/main/java/rx/internal/operators/BackpressureUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,9 @@ private BackpressureUtils() {
* @param n
* the number of requests to add to the requested count
* @return requested value just prior to successful addition
* @deprecated Android has issues with reflection-based atomics
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

s/Android/Samsung/ 😀

Let's not lump all of Android in with the likes of the insane OS developers at Samsung who modify Java system packages needlessly without rhyme or reason.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's one thing those devices can't find fields but how about reflection in general. Desktop Java does well with inlining and intrinsification of AtomicXFieldUpdaters; I'm not sure about Android in general through.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah I'm not sure. I would imagine the new runtime on Android 5.0+ could do something about it. I'll have to investigate it more.

*/
@Deprecated
public static <T> long getAndAddRequest(AtomicLongFieldUpdater<T> requested, T object, long n) {
// add n to field but check for overflow
while (true) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,9 +52,7 @@ static final class CompletableConcatSubscriber

volatile boolean done;

volatile int once;
static final AtomicIntegerFieldUpdater<CompletableConcatSubscriber> ONCE =
AtomicIntegerFieldUpdater.newUpdater(CompletableConcatSubscriber.class, "once");
final AtomicBoolean once;

final ConcatInnerSubscriber inner;

Expand All @@ -67,6 +65,7 @@ public CompletableConcatSubscriber(CompletableSubscriber actual, int prefetch) {
this.sr = new SerialSubscription();
this.inner = new ConcatInnerSubscriber();
this.wip = new AtomicInteger();
this.once = new AtomicBoolean();
add(sr);
request(prefetch);
}
Expand All @@ -84,7 +83,7 @@ public void onNext(Completable t) {

@Override
public void onError(Throwable t) {
if (ONCE.compareAndSet(this, 0, 1)) {
if (once.compareAndSet(false, true)) {
actual.onError(t);
return;
}
Expand Down Expand Up @@ -121,7 +120,7 @@ void next() {
Completable c = queue.poll();
if (c == null) {
if (d) {
if (ONCE.compareAndSet(this, 0, 1)) {
if (once.compareAndSet(false, true)) {
actual.onCompleted();
}
return;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,14 +55,9 @@ static final class CompletableMergeSubscriber

volatile boolean done;

volatile Queue<Throwable> errors;
@SuppressWarnings("rawtypes")
static final AtomicReferenceFieldUpdater<CompletableMergeSubscriber, Queue> ERRORS =
AtomicReferenceFieldUpdater.newUpdater(CompletableMergeSubscriber.class, Queue.class, "errors");
final AtomicReference<Queue<Throwable>> errors;

volatile int once;
static final AtomicIntegerFieldUpdater<CompletableMergeSubscriber> ONCE =
AtomicIntegerFieldUpdater.newUpdater(CompletableMergeSubscriber.class, "once");
final AtomicBoolean once;

final AtomicInteger wip;

Expand All @@ -72,6 +67,8 @@ public CompletableMergeSubscriber(CompletableSubscriber actual, int maxConcurren
this.delayErrors = delayErrors;
this.set = new CompositeSubscription();
this.wip = new AtomicInteger(1);
this.once = new AtomicBoolean();
this.errors = new AtomicReference<Queue<Throwable>>();
if (maxConcurrency == Integer.MAX_VALUE) {
request(Long.MAX_VALUE);
} else {
Expand All @@ -80,17 +77,17 @@ public CompletableMergeSubscriber(CompletableSubscriber actual, int maxConcurren
}

Queue<Throwable> getOrCreateErrors() {
Queue<Throwable> q = errors;
Queue<Throwable> q = errors.get();

if (q != null) {
return q;
}

q = new ConcurrentLinkedQueue<Throwable>();
if (ERRORS.compareAndSet(this, null, q)) {
if (errors.compareAndSet(null, q)) {
return q;
}
return errors;
return errors.get();
}

@Override
Expand Down Expand Up @@ -167,23 +164,23 @@ public void onCompleted() {

void terminate() {
if (wip.decrementAndGet() == 0) {
Queue<Throwable> q = errors;
Queue<Throwable> q = errors.get();
if (q == null || q.isEmpty()) {
actual.onCompleted();
} else {
Throwable e = collectErrors(q);
if (ONCE.compareAndSet(this, 0, 1)) {
if (once.compareAndSet(false, true)) {
actual.onError(e);
} else {
RxJavaPlugins.getInstance().getErrorHandler().handleError(e);
}
}
} else
if (!delayErrors) {
Queue<Throwable> q = errors;
Queue<Throwable> q = errors.get();
if (q != null && !q.isEmpty()) {
Throwable e = collectErrors(q);
if (ONCE.compareAndSet(this, 0, 1)) {
if (once.compareAndSet(false, true)) {
actual.onError(e);
} else {
RxJavaPlugins.getInstance().getErrorHandler().handleError(e);
Expand Down
97 changes: 40 additions & 57 deletions src/main/java/rx/internal/operators/OperatorGroupBy.java
Original file line number Diff line number Diff line change
Expand Up @@ -106,28 +106,16 @@ public static final class GroupBySubscriber<T, K, V>

final ProducerArbiter s;

volatile int cancelled;
@SuppressWarnings("rawtypes")
static final AtomicIntegerFieldUpdater<GroupBySubscriber> CANCELLED =
AtomicIntegerFieldUpdater.newUpdater(GroupBySubscriber.class, "cancelled");
final AtomicBoolean cancelled;

volatile long requested;
@SuppressWarnings("rawtypes")
static final AtomicLongFieldUpdater<GroupBySubscriber> REQUESTED =
AtomicLongFieldUpdater.newUpdater(GroupBySubscriber.class, "requested");

volatile int groupCount;
@SuppressWarnings("rawtypes")
static final AtomicIntegerFieldUpdater<GroupBySubscriber> GROUP_COUNT =
AtomicIntegerFieldUpdater.newUpdater(GroupBySubscriber.class, "groupCount");
final AtomicLong requested;

final AtomicInteger groupCount;

Throwable error;
volatile boolean done;

volatile int wip;
@SuppressWarnings("rawtypes")
static final AtomicIntegerFieldUpdater<GroupBySubscriber> WIP =
AtomicIntegerFieldUpdater.newUpdater(GroupBySubscriber.class, "wip");
final AtomicInteger wip;

public GroupBySubscriber(Subscriber<? super GroupedObservable<K, V>> actual, Func1<? super T, ? extends K> keySelector, Func1<? super T, ? extends V> valueSelector, int bufferSize, boolean delayError) {
this.actual = actual;
Expand All @@ -137,10 +125,13 @@ public GroupBySubscriber(Subscriber<? super GroupedObservable<K, V>> actual, Fun
this.delayError = delayError;
this.groups = new ConcurrentHashMap<Object, GroupedUnicast<K, V>>();
this.queue = new ConcurrentLinkedQueue<GroupedObservable<K, V>>();
GROUP_COUNT.lazySet(this, 1);
this.s = new ProducerArbiter();
this.s.request(bufferSize);
this.producer = new GroupByProducer(this);
this.cancelled = new AtomicBoolean();
this.requested = new AtomicLong();
this.groupCount = new AtomicInteger(1);
this.wip = new AtomicInteger();
}

@Override
Expand Down Expand Up @@ -172,11 +163,11 @@ public void onNext(T t) {
if (group == null) {
// if the main has been cancelled, stop creating groups
// and skip this value
if (cancelled == 0) {
if (!cancelled.get()) {
group = GroupedUnicast.createWith(key, bufferSize, this, delayError);
groups.put(mapKey, group);

GROUP_COUNT.getAndIncrement(this);
groupCount.getAndIncrement();

notNew = false;
q.offer(group);
Expand Down Expand Up @@ -210,7 +201,7 @@ public void onError(Throwable t) {
}
error = t;
done = true;
GROUP_COUNT.decrementAndGet(this);
groupCount.decrementAndGet();
drain();
}

Expand All @@ -226,7 +217,7 @@ public void onCompleted() {
groups.clear();

done = true;
GROUP_COUNT.decrementAndGet(this);
groupCount.decrementAndGet();
drain();
}

Expand All @@ -235,15 +226,15 @@ public void requestMore(long n) {
throw new IllegalArgumentException("n >= 0 required but it was " + n);
}

BackpressureUtils.getAndAddRequest(REQUESTED, this, n);
BackpressureUtils.getAndAddRequest(requested, n);
drain();
}

public void cancel() {
// cancelling the main source means we don't want any more groups
// but running groups still require new values
if (CANCELLED.compareAndSet(this, 0, 1)) {
if (GROUP_COUNT.decrementAndGet(this) == 0) {
if (cancelled.compareAndSet(false, true)) {
if (groupCount.decrementAndGet() == 0) {
unsubscribe();
}
}
Expand All @@ -252,14 +243,14 @@ public void cancel() {
public void cancel(K key) {
Object mapKey = key != null ? key : NULL_KEY;
if (groups.remove(mapKey) != null) {
if (GROUP_COUNT.decrementAndGet(this) == 0) {
if (groupCount.decrementAndGet() == 0) {
unsubscribe();
}
}
}

void drain() {
if (WIP.getAndIncrement(this) != 0) {
if (wip.getAndIncrement() != 0) {
return;
}

Expand All @@ -274,7 +265,7 @@ void drain() {
return;
}

long r = requested;
long r = requested.get();
boolean unbounded = r == Long.MAX_VALUE;
long e = 0L;

Expand All @@ -301,12 +292,12 @@ void drain() {

if (e != 0L) {
if (!unbounded) {
REQUESTED.addAndGet(this, e);
requested.addAndGet(e);
}
s.request(-e);
}

missed = WIP.addAndGet(this, -missed);
missed = wip.addAndGet(-missed);
if (missed == 0) {
break;
}
Expand Down Expand Up @@ -378,35 +369,27 @@ static final class State<T, K> extends AtomicInteger implements Producer, Subscr
final GroupBySubscriber<?, K, T> parent;
final boolean delayError;

volatile long requested;
@SuppressWarnings("rawtypes")
static final AtomicLongFieldUpdater<State> REQUESTED =
AtomicLongFieldUpdater.newUpdater(State.class, "requested");
final AtomicLong requested;

volatile boolean done;
Throwable error;

volatile int cancelled;
@SuppressWarnings("rawtypes")
static final AtomicIntegerFieldUpdater<State> CANCELLED =
AtomicIntegerFieldUpdater.newUpdater(State.class, "cancelled");

volatile Subscriber<? super T> actual;
@SuppressWarnings("rawtypes")
static final AtomicReferenceFieldUpdater<State, Subscriber> ACTUAL =
AtomicReferenceFieldUpdater.newUpdater(State.class, Subscriber.class, "actual");
final AtomicBoolean cancelled;

final AtomicReference<Subscriber<? super T>> actual;

volatile int once;
@SuppressWarnings("rawtypes")
static final AtomicIntegerFieldUpdater<State> ONCE =
AtomicIntegerFieldUpdater.newUpdater(State.class, "once");
final AtomicBoolean once;


public State(int bufferSize, GroupBySubscriber<?, K, T> parent, K key, boolean delayError) {
this.queue = new ConcurrentLinkedQueue<Object>();
this.parent = parent;
this.key = key;
this.delayError = delayError;
this.cancelled = new AtomicBoolean();
this.actual = new AtomicReference<Subscriber<? super T>>();
this.once = new AtomicBoolean();
this.requested = new AtomicLong();
}

@Override
Expand All @@ -415,19 +398,19 @@ public void request(long n) {
throw new IllegalArgumentException("n >= required but it was " + n);
}
if (n != 0L) {
BackpressureUtils.getAndAddRequest(REQUESTED, this, n);
BackpressureUtils.getAndAddRequest(requested, n);
drain();
}
}

@Override
public boolean isUnsubscribed() {
return cancelled != 0;
return cancelled.get();
}

@Override
public void unsubscribe() {
if (CANCELLED.compareAndSet(this, 0, 1)) {
if (cancelled.compareAndSet(false, true)) {
if (getAndIncrement() == 0) {
parent.cancel(key);
}
Expand All @@ -436,10 +419,10 @@ public void unsubscribe() {

@Override
public void call(Subscriber<? super T> s) {
if (ONCE.compareAndSet(this, 0, 1)) {
if (once.compareAndSet(false, true)) {
s.add(this);
s.setProducer(this);
ACTUAL.lazySet(this, s);
actual.lazySet(s);
drain();
} else {
s.onError(new IllegalStateException("Only one Subscriber allowed!"));
Expand Down Expand Up @@ -475,15 +458,15 @@ void drain() {

final Queue<Object> q = queue;
final boolean delayError = this.delayError;
Subscriber<? super T> a = actual;
Subscriber<? super T> a = actual.get();
NotificationLite<T> nl = NotificationLite.instance();
for (;;) {
if (a != null) {
if (checkTerminated(done, q.isEmpty(), a, delayError)) {
return;
}

long r = requested;
long r = requested.get();
boolean unbounded = r == Long.MAX_VALUE;
long e = 0;

Expand All @@ -508,7 +491,7 @@ void drain() {

if (e != 0L) {
if (!unbounded) {
REQUESTED.addAndGet(this, e);
requested.addAndGet(e);
}
parent.s.request(-e);
}
Expand All @@ -519,13 +502,13 @@ void drain() {
break;
}
if (a == null) {
a = actual;
a = actual.get();
}
}
}

boolean checkTerminated(boolean d, boolean empty, Subscriber<? super T> a, boolean delayError) {
if (cancelled != 0) {
if (cancelled.get()) {
queue.clear();
parent.cancel(key);
return true;
Expand Down