Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Client backpressure bug #10253

Closed
pveentjer opened this issue Apr 7, 2017 · 1 comment
Closed

Client backpressure bug #10253

pveentjer opened this issue Apr 7, 2017 · 1 comment

Comments

@pveentjer
Copy link
Contributor

@pveentjer pveentjer commented Apr 7, 2017

There is a race condition here because checking whether the future is complete and creating the InternalDelegatingExecutionCallback (which increments the completion count) are not done atomically.

As a result, a single invocation future can lead to multiple CallIdSequence completions. The consequence is that back pressure doesn't work for the client, since the number of in-flight invocations can keep increasing.

  @Override
    public void andThen(ExecutionCallback<ClientMessage> callback) {
        isNotNull(callback, "callback");

        // Step 1 of a check-then-act sequence: read the completion count.
        if (completeCount.get() == 0) {
            // Count is already zero: hand the result (or the failure) straight to the
            // callback on the calling thread and do not register anything.
            try {
                callback.onResponse(get());
            } catch (Exception e) {
                callback.onFailure(peel(e));
            }
            return;
        }
        // Step 2: register a delegating callback. Nothing makes this atomic with the
        // check above, so the count may reach zero in between -- this is the race
        // window reported in this issue.
        super.andThen(new InternalDelegatingExecutionCallback(callback));
    }

I made some modifications to demonstrate the behavior:

/**
 * Instrumented copy of the client invocation future, used to reproduce a double
 * completion of the {@link CallIdSequence}.
 *
 * <p>{@code completeCount} starts at 1. Every {@link InternalDelegatingExecutionCallback}
 * increments it on construction, and every call to {@link #complete()} decrements it.
 * When the count reaches zero the completion is recorded in {@link #invocations} and
 * {@code callIdSequence.complete()} is called.
 *
 * <p><b>Deliberately racy:</b> {@link #andThen(ExecutionCallback)} checks the count and
 * only later registers the delegating callback, with an artificial pause in between.
 * The check and the increment are not atomic, which is exactly the defect this class
 * demonstrates. Do not "fix" the pause or the ordering here; doing so removes the
 * reproduction.
 */
public class ClientInvocationFuture extends AbstractInvocationFuture<ClientMessage> {

    /**
     * Per-call-id tally of how many times {@link #complete()} observed the count reaching
     * zero. Debug instrumentation only; a value above 1 means a double completion.
     */
    public static final ConcurrentHashMap<Long, AtomicLong> invocations = new ConcurrentHashMap<Long, AtomicLong>();

    // Shared by all instances: a field updater is stateless with respect to the target
    // object, so building one per future only repeats the reflective lookup.
    private static final AtomicIntegerFieldUpdater<ClientInvocationFuture> COMPLETED_COUNT =
            newUpdater(ClientInvocationFuture.class, "completeCount");

    private final ClientMessage request;
    private final ClientInvocation invocation;
    private final CallIdSequence callIdSequence;

    // Only written through COMPLETED_COUNT; must stay a volatile int for the updater.
    private volatile int completeCount = 1;

    /**
     * @param invocation       the invocation this future belongs to
     * @param internalExecutor executor passed on to the superclass
     * @param request          the request message; its correlation id keys {@link #invocations}
     * @param logger           logger passed on to the superclass
     * @param callIdSequence   sequence that is completed once the count reaches zero
     */
    public ClientInvocationFuture(ClientInvocation invocation, Executor internalExecutor,
                                  ClientMessage request, ILogger logger,
                                  CallIdSequence callIdSequence) {
        super(internalExecutor, logger);
        this.request = request;
        this.invocation = invocation;
        this.callIdSequence = callIdSequence;
    }

    @Override
    protected String invocationToString() {
        return request.toString();
    }

    @Override
    protected void onInterruptDetected() {
        complete(new InterruptedException());
    }

    @Override
    protected TimeoutException newTimeoutException(long timeout, TimeUnit unit) {
        return new TimeoutException();
    }

    @Override
    protected Throwable unwrap(Throwable throwable) {
        return throwable;
    }

    /** Wraps a {@link Throwable} value in an {@link ExecutionException}; returns anything else as is. */
    @Override
    protected Object resolve(Object value) {
        if (value instanceof Throwable) {
            return new ExecutionException((Throwable) value);
        }
        return value;
    }

    /**
     * Runs {@code callback} directly when the count is already zero, otherwise registers it
     * wrapped in an {@link InternalDelegatingExecutionCallback}.
     *
     * <p>The five second pause between the check and the registration is intentional: it
     * widens the race window so the double completion can be reproduced reliably.
     */
    @Override
    public void andThen(ExecutionCallback<ClientMessage> callback) {
        isNotNull(callback, "callback");
        System.out.println(Thread.currentThread().getName()+" getting paused: "+completeCount);

        if (completeCount == 0) {
            try {
                callback.onResponse(get());
            } catch (Exception e) {
                callback.onFailure(peel(e));
            }
            return;
        }

        // Intentional race window: the count may drop to zero while this thread sleeps.
        try {
            Thread.sleep(5000);
        } catch (InterruptedException e) {
            e.printStackTrace();
            // Keep the interrupt visible to the caller instead of silently clearing it.
            Thread.currentThread().interrupt();
        }
        System.out.println(Thread.currentThread().getName()+" continuing");
        // The wrapper's constructor increments the count -- too late if it already hit zero.
        super.andThen(new InternalDelegatingExecutionCallback(callback));
    }


    @Override
    protected void onComplete() {
        complete();
    }

    /**
     * Decrements the count; when it reaches zero, records the completion for this
     * request's correlation id and completes the call id sequence.
     */
    private void complete() {
        if (COMPLETED_COUNT.decrementAndGet(this) == 0) {
            AtomicLong counter = ConcurrencyUtil.getOrPutIfAbsent(invocations, request.getCorrelationId(),
                    new ConstructorFunction<Long, AtomicLong>() {
                        @Override
                        public AtomicLong createNew(Long arg) {
                            return new AtomicLong();
                        }
                    });
            counter.incrementAndGet();
            System.out.println("completion:"+invocation.getClientMessage());
            callIdSequence.complete();
        }
    }

    /**
     * Returns {@code response} as a {@link ClientMessage}, or throws if it is a
     * {@link Throwable}. {@link ExecutionException}, {@link Error},
     * {@link InterruptedException} and {@link CancellationException} are rethrown as is;
     * any other throwable is wrapped in an {@link ExecutionException}.
     */
    @Override
    public ClientMessage resolveAndThrowIfException(Object response) throws ExecutionException, InterruptedException {
        if (response instanceof Throwable) {
            fixAsyncStackTrace((Throwable) response, Thread.currentThread().getStackTrace());
            if (response instanceof ExecutionException) {
                throw (ExecutionException) response;
            }
            if (response instanceof Error) {
                throw (Error) response;
            }
            if (response instanceof InterruptedException) {
                throw (InterruptedException) response;
            }
            if (response instanceof CancellationException) {
                throw (CancellationException) response;
            }
            throw new ExecutionException((Throwable) response);
        }
        return (ClientMessage) response;
    }

    public ClientInvocation getInvocation() {
        return invocation;
    }

    /**
     * Wraps a user callback so that {@link #complete()} runs after the callback returns
     * or throws. Constructing an instance increments the enclosing future's count.
     */
    class InternalDelegatingExecutionCallback implements ExecutionCallback<ClientMessage> {

        private final ExecutionCallback<ClientMessage> callback;

        InternalDelegatingExecutionCallback(ExecutionCallback<ClientMessage> callback) {
            this.callback = callback;
            COMPLETED_COUNT.incrementAndGet(ClientInvocationFuture.this);
        }

        @Override
        public void onResponse(ClientMessage message) {
            try {
                callback.onResponse(message);
            } finally {
                complete();
            }
        }

        @Override
        public void onFailure(Throwable t) {
            try {
                callback.onFailure(t);
            } finally {
                complete();
            }
        }
    }
}

And a demo program

/**
 * Demo driver for the double-completion reproduction: starts a member and a client,
 * issues one async call with a slow function, attaches a no-op callback, then prints
 * the per-call-id completion counts collected in {@code ClientInvocationFuture.invocations}.
 */
public class Main {

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        HazelcastInstance hz = Hazelcast.newHazelcastInstance();
        try {
            HazelcastInstance client = HazelcastClient.newHazelcastClient();
            try {
                IAtomicLong atomicLong = client.getAtomicLong("f");

                System.out.println("Making async call--------------------------------------------------------------------");

                ICompletableFuture<Long> f = atomicLong.alterAndGetAsync(new Function());
                System.out.println(f.getClass());

                // The callback does nothing; registering it is what exercises andThen().
                f.andThen(new ExecutionCallback<Long>() {
                    @Override
                    public void onResponse(Long response) {
                    }

                    @Override
                    public void onFailure(Throwable t) {
                    }
                });

                // Wait longer than the function's 2.5s plus the 5s pause inside andThen().
                Thread.sleep(10000);


                for (Map.Entry<Long,AtomicLong> entry : ClientInvocationFuture.invocations.entrySet()) {

                    System.out.println("call id:"+entry.getKey()+" completed count: "+entry.getValue());
                }

                System.out.println("done");
            } finally {
                // Release the client before the member it is connected to.
                client.shutdown();
            }
        } finally {
            // Without this the member stays up after main() returns.
            hz.shutdown();
        }
    }

    /** Identity function that sleeps 2.5 seconds so the call is still in flight when andThen() runs. */
    public static class Function implements IFunction<Long, Long> {
        @Override
        public Long apply(Long input) {
            System.out.println("Starting function");
            try {
                Thread.sleep(2500);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("Completed function");
            return input;
        }
    }
}

When I run this program, I get the following output:

call id:1 completed count: 2
call id:2 completed count: 1
call id:3 completed count: 1
call id:4 completed count: 1
call id:5 completed count: 1
call id:6 completed count: 1
call id:7 completed count: 1
done

As can be seen, the call with id 1 is completed twice.

@sancar
Copy link
Member

@sancar sancar commented Apr 10, 2017

fixed by #10270

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Linked pull requests

Successfully merging a pull request may close this issue.

None yet
3 participants