Skip to content

Commit

Permalink
Addressed feedback from PR - made the Handlers class more reusable as…
Browse files Browse the repository at this point in the history
… a CyclicSequence
  • Loading branch information
vietj committed May 3, 2018
1 parent 16ef94c commit 042d3ff
Show file tree
Hide file tree
Showing 5 changed files with 258 additions and 103 deletions.
29 changes: 17 additions & 12 deletions src/main/java/io/vertx/core/eventbus/impl/EventBusImpl.java
Expand Up @@ -28,6 +28,7 @@
import io.vertx.core.eventbus.ReplyFailure; import io.vertx.core.eventbus.ReplyFailure;
import io.vertx.core.eventbus.SendContext; import io.vertx.core.eventbus.SendContext;
import io.vertx.core.impl.VertxInternal; import io.vertx.core.impl.VertxInternal;
import io.vertx.core.impl.utils.CyclicSequence;
import io.vertx.core.logging.Logger; import io.vertx.core.logging.Logger;
import io.vertx.core.logging.LoggerFactory; import io.vertx.core.logging.LoggerFactory;
import io.vertx.core.spi.metrics.EventBusMetrics; import io.vertx.core.spi.metrics.EventBusMetrics;
Expand Down Expand Up @@ -55,7 +56,7 @@ public class EventBusImpl implements EventBus, MetricsProvider {
private final AtomicLong replySequence = new AtomicLong(0); private final AtomicLong replySequence = new AtomicLong(0);
protected final VertxInternal vertx; protected final VertxInternal vertx;
protected final EventBusMetrics metrics; protected final EventBusMetrics metrics;
protected final ConcurrentMap<String, Handlers> handlerMap = new ConcurrentHashMap<>(); protected final ConcurrentMap<String, CyclicSequence<HandlerHolder>> handlerMap = new ConcurrentHashMap<>();
protected final CodecManager codecManager = new CodecManager(); protected final CodecManager codecManager = new CodecManager();
protected volatile boolean started; protected volatile boolean started;


Expand Down Expand Up @@ -230,7 +231,7 @@ protected MessageImpl createMessage(boolean send, String address, MultiMap heade
protected <T> HandlerHolder<T> addRegistration(String address, HandlerRegistration<T> registration, protected <T> HandlerHolder<T> addRegistration(String address, HandlerRegistration<T> registration,
boolean replyHandler, boolean localOnly) { boolean replyHandler, boolean localOnly) {
Objects.requireNonNull(registration.getHandler(), "handler"); Objects.requireNonNull(registration.getHandler(), "handler");
LocalRegistrationResult result = addLocalRegistration(address, registration, replyHandler, localOnly); LocalRegistrationResult<T> result = addLocalRegistration(address, registration, replyHandler, localOnly);
addRegistration(result.newAddress, address, replyHandler, localOnly, registration::setResult); addRegistration(result.newAddress, address, replyHandler, localOnly, registration::setResult);
return result.holder; return result.holder;
} }
Expand All @@ -244,13 +245,13 @@ protected <T> void addRegistration(boolean newAddress, String address,
private static class LocalRegistrationResult<T> { private static class LocalRegistrationResult<T> {
final HandlerHolder<T> holder; final HandlerHolder<T> holder;
final boolean newAddress; final boolean newAddress;
public LocalRegistrationResult(HandlerHolder<T> holder, boolean newAddress) { LocalRegistrationResult(HandlerHolder<T> holder, boolean newAddress) {
this.holder = holder; this.holder = holder;
this.newAddress = newAddress; this.newAddress = newAddress;
} }
} }


private <T> LocalRegistrationResult addLocalRegistration(String address, HandlerRegistration<T> registration, private <T> LocalRegistrationResult<T> addLocalRegistration(String address, HandlerRegistration<T> registration,
boolean replyHandler, boolean localOnly) { boolean replyHandler, boolean localOnly) {
Objects.requireNonNull(address, "address"); Objects.requireNonNull(address, "address");


Expand All @@ -264,15 +265,18 @@ private <T> LocalRegistrationResult addLocalRegistration(String address, Handler


HandlerHolder<T> holder = new HandlerHolder<>(metrics, registration, replyHandler, localOnly, context); HandlerHolder<T> holder = new HandlerHolder<>(metrics, registration, replyHandler, localOnly, context);


Handlers handlers = new Handlers(holder); CyclicSequence<HandlerHolder> handlers = new CyclicSequence<HandlerHolder>().add(holder);
Handlers actual = handlerMap.merge(address, handlers, (prev, current) -> prev.add(current.first())); CyclicSequence<HandlerHolder> newHandlers = handlerMap.merge(
address,
handlers,
(oldHandlers, current) -> oldHandlers.add(current.first()));


if (hasContext) { if (hasContext) {
HandlerEntry entry = new HandlerEntry<>(address, registration); HandlerEntry entry = new HandlerEntry<>(address, registration);
context.addCloseHook(entry); context.addCloseHook(entry);
} }


boolean newAddress = handlers == actual; boolean newAddress = handlers == newHandlers;
return new LocalRegistrationResult<>(holder, newAddress); return new LocalRegistrationResult<>(holder, newAddress);
} }


Expand All @@ -281,7 +285,7 @@ protected <T> void removeRegistration(HandlerHolder<T> holder, Handler<AsyncResu
removeRegistration(last ? holder : null, holder.getHandler().address(), completionHandler); removeRegistration(last ? holder : null, holder.getHandler().address(), completionHandler);
} }


protected <T> void removeRegistration(HandlerHolder handlerHolder, String address, protected <T> void removeRegistration(HandlerHolder<T> handlerHolder, String address,
Handler<AsyncResult<Void>> completionHandler) { Handler<AsyncResult<Void>> completionHandler) {
callCompletionHandlerAsync(completionHandler); callCompletionHandlerAsync(completionHandler);
} }
Expand All @@ -292,7 +296,8 @@ private <T> boolean removeLocalRegistration(HandlerHolder<T> holder) {
if (val == null) { if (val == null) {
return null; return null;
} }
return val.remove(holder); CyclicSequence<HandlerHolder> next = val.remove(holder);
return next.size() == 0 ? null : next;
}) == null; }) == null;
if (holder.setRemoved()) { if (holder.setRemoved()) {
holder.getContext().removeCloseHook(new HandlerEntry<>(address, holder.getHandler())); holder.getContext().removeCloseHook(new HandlerEntry<>(address, holder.getHandler()));
Expand Down Expand Up @@ -366,11 +371,11 @@ protected boolean isMessageLocal(MessageImpl msg) {


protected <T> boolean deliverMessageLocally(MessageImpl msg) { protected <T> boolean deliverMessageLocally(MessageImpl msg) {
msg.setBus(this); msg.setBus(this);
Handlers handlers = handlerMap.get(msg.address()); CyclicSequence<HandlerHolder> handlers = handlerMap.get(msg.address());
if (handlers != null) { if (handlers != null) {
if (msg.isSend()) { if (msg.isSend()) {
//Choose one //Choose one
HandlerHolder holder = handlers.choose(); HandlerHolder holder = handlers.next();
if (metrics != null) { if (metrics != null) {
metrics.messageReceived(msg.address(), !msg.isSend(), isMessageLocal(msg), holder != null ? 1 : 0); metrics.messageReceived(msg.address(), !msg.isSend(), isMessageLocal(msg), holder != null ? 1 : 0);
} }
Expand Down Expand Up @@ -498,7 +503,7 @@ public void next() {


private void unregisterAll() { private void unregisterAll() {
// Unregister all handlers explicitly - don't rely on context hooks // Unregister all handlers explicitly - don't rely on context hooks
for (Handlers handlers: handlerMap.values()) { for (CyclicSequence<HandlerHolder> handlers: handlerMap.values()) {
for (HandlerHolder holder: handlers) { for (HandlerHolder holder: handlers) {
holder.getHandler().unregister(); holder.getHandler().unregister();
} }
Expand Down
90 changes: 0 additions & 90 deletions src/main/java/io/vertx/core/eventbus/impl/Handlers.java

This file was deleted.

Expand Up @@ -215,7 +215,7 @@ protected <T> void addRegistration(boolean newAddress, String address,
} }


@Override @Override
protected <T> void removeRegistration(HandlerHolder lastHolder, String address, protected <T> void removeRegistration(HandlerHolder<T> lastHolder, String address,
Handler<AsyncResult<Void>> completionHandler) { Handler<AsyncResult<Void>> completionHandler) {
if (lastHolder != null && subs != null && !lastHolder.isLocalOnly()) { if (lastHolder != null && subs != null && !lastHolder.isLocalOnly()) {
ownSubs.remove(address); ownSubs.remove(address);
Expand Down
137 changes: 137 additions & 0 deletions src/main/java/io/vertx/core/impl/utils/CyclicSequence.java
@@ -0,0 +1,137 @@
/*
* Copyright (c) 2011-2017 Contributors to the Eclipse Foundation
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
* which is available at https://www.apache.org/licenses/LICENSE-2.0.
*
* SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
*/

package io.vertx.core.impl.utils;

import java.util.*;
import java.util.concurrent.atomic.AtomicInteger;

/**
* A thread safe cyclic sequence of elements that can be used for round robin.
* <p/>
* A sequence is immutable and mutations uses {@link #add(Object)} and {@link #remove(Object)}
* to return a modified copy of the current instance.
* <p/>
* The iterator uses a volatile index, so it can be incremented concurrently by several
* threads with locking.
*
* @author <a href="http://tfox.org">Tim Fox</a>
* @author <a href="mailto:julien@julienviet.com">Julien Viet</a>
*/
public class CyclicSequence<T> implements Iterable<T>, Iterator<T> {

private final AtomicInteger pos;
private final Object[] elements;

/**
* Create a new empty sequence.
*/
public CyclicSequence() {
this(0, new Object[0]);
}

private CyclicSequence(int pos, Object[] elements) {
this.pos = new AtomicInteger(elements.length > 0 ? pos % elements.length : 0);
this.elements = elements;
}

/**
* @return the current index
*/
public int index() {
return pos.get();
}

/**
* @return the first element
*/
@SuppressWarnings("unchecked")
public T first() {
return (T) (elements.length > 0 ? elements[0] : null);
}

/**
* Copy the current sequence, add {@code element} at the tail of this sequence and returns it.
* @param element the element to add
* @return the result
*/
public CyclicSequence<T> add(T element) {
int len = elements.length;
Object[] copy = Arrays.copyOf(elements, len + 1);
copy[len] = element;
return new CyclicSequence<>(pos.get(), copy);
}

/**
* Remove the first occurrence of {@code element} in this sequence and returns it.
* <p/>
* If the sequence does not contains {@code element}, this instance is returned instead.
*
* @param element the element to remove
* @return the result
*/
public CyclicSequence<T> remove(T element) {
int len = elements.length;
for (int i = 0;i < len;i++) {
if (Objects.equals(element, elements[i])) {
if (len > 1) {
Object[] copy = new Object[len - 1];
System.arraycopy(elements,0, copy, 0, i);
System.arraycopy(elements, i + 1, copy, i, len - i - 1);
return new CyclicSequence<>(pos.get(), copy);
} else {
return new CyclicSequence<>();
}
}
}
return this;
}

@Override
public boolean hasNext() {
return true;
}

@SuppressWarnings("unchecked")
@Override
public T next() {
int len = elements.length;
switch (len) {
case 0:
return null;
case 1:
return (T) elements[0];
default:
int p = pos.getAndIncrement();
if (p >= len) {
p = p % len;
if (p == 0) {
pos.addAndGet(-len);
}
}
return (T) elements[p];
}
}

/**
* @return the size of this sequence
*/
public int size() {
return elements.length;
}

@Override
@SuppressWarnings("unchecked")
public Iterator<T> iterator() {
return Arrays.<T>asList((T[]) elements).iterator();
}
}

0 comments on commit 042d3ff

Please sign in to comment.