Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove redundancy in EventBus classes #4957

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 4 additions & 6 deletions src/main/java/io/vertx/core/eventbus/impl/EventBusImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -289,11 +289,10 @@ private <T> HandlerHolder<T> addLocalRegistration(String address, HandlerRegistr

HandlerHolder<T> holder = createHandlerHolder(registration, localOnly, context);

ConcurrentCyclicSequence<HandlerHolder> handlers = new ConcurrentCyclicSequence<HandlerHolder>().add(holder);
ConcurrentCyclicSequence<HandlerHolder> actualHandlers = handlerMap.merge(
address,
handlers,
(old, prev) -> old.add(prev.first()));
ConcurrentCyclicSequence<HandlerHolder> actualHandlers = handlerMap.compute(address,
(adr, prev) -> prev == null // first? => create new with one holder
? new ConcurrentCyclicSequence<HandlerHolder>().add(holder)
: prev.add(holder));

if (context.isDeployment()) {
context.addCloseHook(registration);
Expand Down Expand Up @@ -496,4 +495,3 @@ private void removeInterceptor(AtomicReferenceFieldUpdater<EventBusImpl, Handler
}
}
}

23 changes: 10 additions & 13 deletions src/main/java/io/vertx/core/eventbus/impl/HandlerRegistration.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ public abstract class HandlerRegistration<T> implements Closeable {
protected final String address;
protected final boolean src;
private Consumer<Promise<Void>> registered;
private Object metric;
protected Object metric;

HandlerRegistration(ContextInternal context,
EventBusImpl bus,
Expand Down Expand Up @@ -60,11 +60,11 @@ public String address() {

protected abstract boolean doReceive(Message<T> msg);

protected abstract void dispatch(Message<T> msg, ContextInternal context, Handler<Message<T>> handler);
protected abstract void dispatch(Message<T> msg, ContextInternal context);

synchronized void register(boolean broadcast, boolean localOnly, Promise<Void> promise) {
if (registered != null) {
throw new IllegalStateException();
throw new IllegalStateException("already registered");
}
registered = bus.addRegistration(address, this, broadcast, localOnly, promise);
if (bus.metrics != null) {
Expand Down Expand Up @@ -92,8 +92,8 @@ public Future<Void> unregister() {
return promise.future();
}

void dispatch(Handler<Message<T>> theHandler, Message<T> message, ContextInternal context) {
InboundDeliveryContext deliveryCtx = new InboundDeliveryContext((MessageImpl<?, T>) message, theHandler, context);
final void dispatchIDC(Message<T> message, ContextInternal context) {
InboundDeliveryContext deliveryCtx = new InboundDeliveryContext((MessageImpl<?, T>) message, context);
deliveryCtx.dispatch();
}

Expand All @@ -108,16 +108,13 @@ void discard(Message<T> msg) {
}
}

private class InboundDeliveryContext extends DeliveryContextBase<T> {
private final class InboundDeliveryContext extends DeliveryContextBase<T> {

private final Handler<Message<T>> handler;

private InboundDeliveryContext(MessageImpl<?, T> message, Handler<Message<T>> handler, ContextInternal context) {
private InboundDeliveryContext(MessageImpl<?, T> message, ContextInternal context) {
super(message, message.bus.inboundInterceptors(), context);

this.handler = handler;
}

@Override
protected void execute() {
ContextInternal ctx = InboundDeliveryContext.super.context;
Object m = metric;
Expand All @@ -127,13 +124,13 @@ protected void execute() {
}
if (tracer != null && !src) {
message.trace = tracer.receiveRequest(ctx, SpanKind.RPC, TracingPolicy.PROPAGATE, message, message.isSend() ? "send" : "publish", message.headers(), MessageTagExtractor.INSTANCE);
HandlerRegistration.this.dispatch(message, ctx, handler);
HandlerRegistration.this.dispatch(message, ctx);
Object trace = message.trace;
if (message.replyAddress == null && trace != null) {
tracer.sendResponse(this.context, null, trace, null, TagExtractor.empty());
}
} else {
HandlerRegistration.this.dispatch(message, ctx, handler);
HandlerRegistration.this.dispatch(message, ctx);
}
}

Expand Down
48 changes: 21 additions & 27 deletions src/main/java/io/vertx/core/eventbus/impl/MessageConsumerImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,6 @@ public class MessageConsumerImpl<T> extends HandlerRegistration<T> implements Me
private Queue<Message<T>> pending = new ArrayDeque<>(8);
private long demand = Long.MAX_VALUE;
private Promise<Void> result;
private boolean registered;

MessageConsumerImpl(ContextInternal context, EventBusImpl eventBus, String address, boolean localOnly) {
super(context, eventBus, address, false);
Expand Down Expand Up @@ -85,14 +84,17 @@ public synchronized int getMaxBufferedMessages() {
return maxBufferedMessages;
}

public synchronized int getPendingQueueSize() {
return pending.size();
}

@Override
public synchronized Future<Void> completion() {
return result.future();
}

@Override
public synchronized Future<Void> unregister() {
handler = null;
if (endHandler != null) {
endHandler.handle(null);
}
Expand All @@ -108,20 +110,21 @@ public synchronized Future<Void> unregister() {
}
}
discardHandler = null;
Future<Void> fut = super.unregister();
if (registered) {
registered = false;
Promise<Void> res = result; // Alias reference because result can become null when the onComplete callback executes
boolean wasRegisteredOrRegistering = isRegistered();

Future<Void> fut = super.unregister();// ~ handler = null
if (wasRegisteredOrRegistering) {
Promise<Void> res = result; // Alias reference because result can be changed after onComplete callback executes
fut.onComplete(ar -> res.tryFail("Consumer unregistered before registration completed"));
result = context.promise();
result = context.promise();// old result is-Complete or will be-Complete shortly
}
return fut;
}

@Override
protected boolean doReceive(Message<T> message) {
Handler<Message<T>> theHandler;
synchronized (this) {
if (handler == null) {
if (handler == null || !isRegistered()) {
return false;
}
if (demand == 0L) {
Expand All @@ -145,25 +148,21 @@ protected boolean doReceive(Message<T> message) {
if (demand != Long.MAX_VALUE) {
demand--;
}
theHandler = handler;
}
}
deliver(theHandler, message);
deliver(message);
return true;
}

@Override
protected void dispatch(Message<T> msg, ContextInternal context, Handler<Message<T>> handler) {
if (handler == null) {
throw new NullPointerException();
}
protected void dispatch(Message<T> msg, ContextInternal context) {
context.dispatch(msg, handler);
}

private void deliver(Handler<Message<T>> theHandler, Message<T> message) {
private void deliver(Message<T> message) {
// Handle the message outside the sync block
// https://bugs.eclipse.org/bugs/show_bug.cgi?id=473714
dispatch(theHandler, message, context.duplicate());
dispatchIDC(message, context.duplicate());
checkNextTick();
}

Expand All @@ -172,17 +171,15 @@ private synchronized void checkNextTick() {
if (!pending.isEmpty() && demand > 0L) {
context.nettyEventLoop().execute(() -> {
Message<T> message;
Handler<Message<T>> theHandler;
synchronized (MessageConsumerImpl.this) {
if (demand == 0L || (message = pending.poll()) == null) {
return;
}
if (demand != Long.MAX_VALUE) {
demand--;
}
theHandler = handler;
}
deliver(theHandler, message);
deliver(message);
});
}
}
Expand All @@ -198,12 +195,11 @@ public synchronized void discardHandler(Handler<Message<T>> handler) {
public synchronized MessageConsumer<T> handler(Handler<Message<T>> h) {
if (h != null) {
synchronized (this) {
handler = h;
if (!registered) {
registered = true;
handler = h; // set or update
if (!isRegistered()) {
Promise<Void> p = result;
Promise<Void> registration = context.promise();
register(true, localOnly, registration);
register(true, localOnly, registration);// registered = true
registration.future().onComplete(ar -> {
if (ar.succeeded()) {
p.tryComplete();
Expand Down Expand Up @@ -237,9 +233,7 @@ public MessageConsumer<T> resume() {

@Override
public synchronized MessageConsumer<T> fetch(long amount) {
if (amount < 0) {
throw new IllegalArgumentException();
}
Arguments.require(amount >= 0, "fetch(amount) must be positive, but: "+amount);
demand += amount;
if (demand < 0L) {
demand = Long.MAX_VALUE;
Expand Down
4 changes: 2 additions & 2 deletions src/main/java/io/vertx/core/eventbus/impl/ReplyHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ public void handle(Long id) {

@Override
protected boolean doReceive(Message<T> reply) {
dispatch(null, reply, context);
dispatchIDC(reply, context);
return true;
}

Expand All @@ -81,7 +81,7 @@ void register() {
}

@Override
protected void dispatch(Message<T> reply, ContextInternal context, Handler<Message<T>> handler /* null */) {
protected void dispatch(Message<T> reply, ContextInternal context) {
if (context.owner().cancelTimer(timeoutID)) {
unregister();
if (reply.body() instanceof ReplyException) {
Expand Down
Loading
Loading