Slow signal delivery from off-main thread #31

Closed
akarnokd opened this Issue Apr 21, 2016 · 1 comment

@akarnokd

This code snippet takes significantly longer to complete than a similar RxJava sequence when n is 10,000+ (i7 4790, Windows 7 x64, Studio 2.0, HAXM on).

I have no experience with the platform or the intentions of this library, but my guess is that the mandatory thread hopping in the underlying Agera infrastructure creates an excessive number of tasks/messages, essentially one for each update() signal.

Given only the base types, I'd reduce the overhead by coalescing the update signals into a simple count of the calls that have to be replayed on the other side of the thread boundary:

import android.os.Handler;
import android.os.Looper;
import android.os.Message;
import android.support.annotation.NonNull;

import com.google.android.agera.Observable;
import com.google.android.agera.Updatable;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;

/**
 * Signals the update() coming from an upstream Observable on the specified Looper.
 */
public final class UpdateOn implements Observable {

    final Observable source;

    final Looper looper;

    final Map<Updatable, UpdateOnUpdatable> map;

    public UpdateOn(Observable source, Looper looper) {
        this.source = source;
        this.looper = looper;
        this.map = new HashMap<>();
    }


    @Override
    public void addUpdatable(@NonNull Updatable updatable) {
        UpdateOnUpdatable u;
        synchronized (this) {
            if (map.containsKey(updatable)) {
                throw new IllegalStateException("Updatable already added");
            }

            u = new UpdateOnUpdatable(updatable, looper);
            map.put(updatable, u);
        }
        source.addUpdatable(u);
    }

    @Override
    public void removeUpdatable(@NonNull Updatable updatable) {
        UpdateOnUpdatable u;
        synchronized (this) {
            u = map.remove(updatable);
            if (u == null) {
                throw new IllegalStateException("Updatable already removed");
            }
        }
        u.cancelled = true;
        source.removeUpdatable(u);
    }

    static final class UpdateOnUpdatable implements Updatable, Handler.Callback {
        final Updatable actual;

        final Handler handler;

        final AtomicLong count;

        volatile boolean cancelled;

        UpdateOnUpdatable(Updatable actual, Looper looper) {
            this.actual = actual;
            this.count = new AtomicLong();
            this.handler = new Handler(looper, this);
        }

        @Override
        public void update() {
            // Only the signal that moves the counter off zero posts a
            // message; all others are absorbed into the counter.
            // Note: sendMessage posts to the looper's queue, whereas
            // dispatchMessage would run handleMessage synchronously on
            // the calling thread, defeating the thread hop.
            if (count.getAndIncrement() == 0) {
                Message msg = handler.obtainMessage();
                handler.sendMessage(msg);
            }
        }

        @Override
        public boolean handleMessage(Message msg) {
            // Runs on the target Looper's thread: replay every update()
            // signal accumulated since the message was posted.
            long c = count.get();

            for (;;) {
                for (long i = 0; i < c; i++) {
                    if (cancelled) {
                        break;
                    }
                    actual.update();
                }

                // Subtract the replayed amount; if more signals arrived
                // in the meantime, loop and replay those as well.
                c = count.addAndGet(-c);
                if (c == 0L) {
                    break;
                }
            }

            return true;
        }
    }
}

(Note, however, that Agera seems to send other types of messages through the looper/handler, such as first/last-updatable notifications, and my suggestion above might starve or be unfair to those messages.)
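Outside Android, the counter-based coalescing above can be sketched in plain Java; here a single-threaded executor stands in for the Looper, and all names are illustrative rather than Agera API:

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

// Illustrative stand-in (not Agera API): coalesces concurrent update()
// signals into a counter and schedules at most one drain task at a time
// on a single-threaded executor, which plays the role of the Looper.
class CoalescingSignal {
    private final AtomicLong count = new AtomicLong();
    private final ExecutorService worker = Executors.newSingleThreadExecutor();
    private final Runnable downstream;

    CoalescingSignal(Runnable downstream) {
        this.downstream = downstream;
    }

    // May be called from any thread; only the call that moves the counter
    // off zero pays the cost of crossing the thread boundary.
    public void update() {
        if (count.getAndIncrement() == 0) {
            worker.execute(this::drain);
        }
    }

    // Runs on the worker thread: replays all accumulated signals, looping
    // in case more arrive while draining.
    private void drain() {
        long c = count.get();
        for (;;) {
            for (long i = 0; i < c; i++) {
                downstream.run();
            }
            // Subtract what was replayed; a nonzero result means new
            // signals arrived mid-drain and must be replayed too.
            c = count.addAndGet(-c);
            if (c == 0L) {
                break;
            }
        }
    }

    public void shutdown() {
        worker.shutdown();
        try {
            worker.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
```

With this scheme, n calls to update() still produce n downstream calls, but typically in far fewer scheduled tasks.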

@ernstsson

ernstsson (Collaborator) commented Apr 21, 2016

Nice benchmark app; I've been playing around with it quite a bit now. You are indeed right: it looks like way too many messages are sent. Since a Repository doesn't guarantee one update per change (a consequence of the push event/pull data design), sending them all on when there are already messages in the queue is not needed. I have a fix coming up that simply filters these out, which will bring the running time down to reasonable levels. The benchmark app will report a lower "Done:" number, but that is in the nature of the Repository design (as mentioned in the first paragraph of Reservoirs and parallelism).
Thanks!

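The filtering fix ernstsson describes (skip posting when a message is already queued) can be sketched in plain Java as well; a pending flag and a single-threaded executor stand in for Agera's internal handler, and all names here are illustrative, not the actual Agera change:

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

// Illustrative sketch (not the actual Agera code): when an update message
// is already in flight, further update() calls are dropped instead of
// posted, relying on the Repository contract that one notification per
// change is not guaranteed anyway.
class FilteringSignal {
    private final AtomicBoolean pending = new AtomicBoolean();
    private final ExecutorService worker = Executors.newSingleThreadExecutor();
    private final Runnable downstream;

    FilteringSignal(Runnable downstream) {
        this.downstream = downstream;
    }

    public void update() {
        // Post only when no message is in flight; redundant signals are
        // absorbed by the one already queued.
        if (pending.compareAndSet(false, true)) {
            worker.execute(() -> {
                pending.set(false); // the next update() may post again
                downstream.run();
            });
        }
    }

    public void shutdown() {
        worker.shutdown();
        try {
            worker.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
```

Compared with the counter approach above, this delivers fewer downstream calls (hence the lower "Done:" number in the benchmark), trading an exact signal count for fewer messages.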

@ernstsson ernstsson closed this in e0313d4 Apr 21, 2016
