Skip to content

Commit

Permalink
Merge pull request #1177 from eclipse/eventbus-refactor
Browse files Browse the repository at this point in the history
Eventbus refactor
  • Loading branch information
purplefox committed Oct 15, 2015
2 parents c783c2e + 1c9ac00 commit 58eebfc
Show file tree
Hide file tree
Showing 30 changed files with 1,920 additions and 1,533 deletions.
51 changes: 32 additions & 19 deletions src/main/asciidoc/dataobjects.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -276,6 +276,10 @@ Sets whether or not the current link is hidden.
+++
Sets the argument index.
+++
|[[multiValued]]`multiValued`|`Boolean`|
+++
Sets whether or not the argument can receive several values. Only the last argument can receive several values.
+++
|[[required]]`required`|`Boolean`|
+++
Sets whether or not the current link is required.
Expand Down Expand Up @@ -452,6 +456,11 @@ Set the store as a buffer
+++
Sets te arg name for this option.
+++
|[[choices]]`choices`|`Array of String`|
+++
Adds a choice to the list of values accepted by this option. If the value set by the user does not match once of these
values, a link exception is thrown.
+++
|[[defaultValue]]`defaultValue`|`String`|
+++
Sets the default value of this option
Expand All @@ -468,6 +477,10 @@ Configures the current link to be a flag. It will be evaluated to <code>true</co
option.setFlag(true).setSingleValued(true)
</pre></code>
+++
|[[help]]`help`|`Boolean`|
+++
Sets whether or not this option is a "help" option
+++
|[[hidden]]`hidden`|`Boolean`|
+++
Sets whether or not this option should be hidden
Expand Down Expand Up @@ -619,25 +632,6 @@ Set whether Netty pooled buffers are enabled
+++
|===

[[MetricsOptions]]
== MetricsOptions

++++
Vert.x metrics base configuration, this class can be extended by provider implementations to configure
those specific implementations.
++++
'''

[cols=">25%,^25%,50%"]
[frame="topbot"]
|===
^|Name | Type ^| Description
|[[enabled]]`enabled`|`Boolean`|
+++
Set whether metrics will be enabled on the Vert.x instance.
+++
|===

[[ClientOptionsBase]]
== ClientOptionsBase

Expand Down Expand Up @@ -737,6 +731,25 @@ Set whether Netty pooled buffers are enabled
+++
|===

[[MetricsOptions]]
== MetricsOptions

++++
Vert.x metrics base configuration, this class can be extended by provider implementations to configure
those specific implementations.
++++
'''

[cols=">25%,^25%,50%"]
[frame="topbot"]
|===
^|Name | Type ^| Description
|[[enabled]]`enabled`|`Boolean`|
+++
Set whether metrics will be enabled on the Vert.x instance.
+++
|===

[[DeploymentOptions]]
== DeploymentOptions

Expand Down
4 changes: 2 additions & 2 deletions src/main/asciidoc/java/override/dependencies.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,13 @@ project descriptor to access the Vert.x Core API:
<dependency>
<groupId>io.vertx</groupId>
<artifactId>vertx-core</artifactId>
<version>3.1.0</version>
<version>3.2.0-SNAPSHOT</version>
</dependency>
----

* Gradle (in your `build.gradle` file):
[source,groovy,subs="+attributes"]
----
compile io.vertx:vertx-core:3.1.0
compile io.vertx:vertx-core:3.2.0-SNAPSHOT
----
3 changes: 2 additions & 1 deletion src/main/asciidoc/java/streams.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -182,4 +182,5 @@ This has the same meaning as `link:../../apidocs/io/vertx/core/streams/WriteStre

A pump can be started and stopped multiple times.

When a pump is first created it is _not_ started. You need to call the `start()` method to start it.
When a pump is first created it is _not_ started. You need to call the `start()` method to start it.
<a href="mailto:julien@julienviet.com">Julien Viet</a>
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,7 @@
* You may elect to redistribute this code under either of these licenses.
*/

package io.vertx.core.impl;

import io.vertx.core.AsyncResult;
import io.vertx.core.Handler;
package io.vertx.core;

/**
* @author <a href="http://tfox.org">Tim Fox</a>
Expand Down
7 changes: 6 additions & 1 deletion src/main/java/io/vertx/core/Context.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@

package io.vertx.core;

import io.netty.channel.EventLoop;
import io.vertx.codegen.annotations.GenIgnore;
import io.vertx.codegen.annotations.VertxGen;
import io.vertx.core.impl.ContextImpl;
Expand Down Expand Up @@ -217,4 +216,10 @@ static boolean isOnVertxThread() {
*/
int getInstanceCount();

@GenIgnore
void addCloseHook(Closeable hook);

@GenIgnore
void removeCloseHook(Closeable hook);

}
23 changes: 15 additions & 8 deletions src/main/java/io/vertx/core/eventbus/EventBus.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import io.vertx.core.AsyncResult;
import io.vertx.core.Handler;
import io.vertx.core.metrics.Measured;
import io.vertx.core.streams.WriteStream;

/**
* A Vert.x event-bus is a light-weight distributed messaging system which allows different parts of your application,
Expand Down Expand Up @@ -194,13 +193,6 @@ public interface EventBus extends Measured {
*/
<T> MessageProducer<T> publisher(String address, DeliveryOptions options);

/**
* Close the event bus and release any resources held
*
* @param completionHandler may be {@code null}
*/
void close(Handler<AsyncResult<Void>> completionHandler);

/**
* Register a message codec.
* <p>
Expand Down Expand Up @@ -249,5 +241,20 @@ public interface EventBus extends Measured {
@GenIgnore
EventBus unregisterDefaultCodec(Class clazz);

/**
* Start the event bus. This would not normally be called in user code
*
* @param completionHandler
*/
void start(Handler<AsyncResult<Void>> completionHandler);

/**
* Close the event bus and release any resources held. This would not normally be called in user code
*
* @param completionHandler may be {@code null}
*/
void close(Handler<AsyncResult<Void>> completionHandler);


}

154 changes: 154 additions & 0 deletions src/main/java/io/vertx/core/eventbus/impl/CodecManager.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,154 @@
package io.vertx.core.eventbus.impl;

import io.vertx.core.buffer.Buffer;
import io.vertx.core.eventbus.MessageCodec;
import io.vertx.core.eventbus.ReplyException;
import io.vertx.core.eventbus.impl.codecs.*;
import io.vertx.core.json.JsonArray;
import io.vertx.core.json.JsonObject;

import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

/**
* @author <a href="http://tfox.org">Tim Fox</a>
*/
public class CodecManager {

// The standard message codecs
public static final MessageCodec<String, String> PING_MESSAGE_CODEC = new PingMessageCodec();
public static final MessageCodec<String, String> NULL_MESSAGE_CODEC = new NullMessageCodec();
public static final MessageCodec<String, String> STRING_MESSAGE_CODEC = new StringMessageCodec();
public static final MessageCodec<Buffer, Buffer> BUFFER_MESSAGE_CODEC = new BufferMessageCodec();
public static final MessageCodec<JsonObject, JsonObject> JSON_OBJECT_MESSAGE_CODEC = new JsonObjectMessageCodec();
public static final MessageCodec<JsonArray, JsonArray> JSON_ARRAY_MESSAGE_CODEC = new JsonArrayMessageCodec();
public static final MessageCodec<byte[], byte[]> BYTE_ARRAY_MESSAGE_CODEC = new ByteArrayMessageCodec();
public static final MessageCodec<Integer, Integer> INT_MESSAGE_CODEC = new IntMessageCodec();
public static final MessageCodec<Long, Long> LONG_MESSAGE_CODEC = new LongMessageCodec();
public static final MessageCodec<Float, Float> FLOAT_MESSAGE_CODEC = new FloatMessageCodec();
public static final MessageCodec<Double, Double> DOUBLE_MESSAGE_CODEC = new DoubleMessageCodec();
public static final MessageCodec<Boolean, Boolean> BOOLEAN_MESSAGE_CODEC = new BooleanMessageCodec();
public static final MessageCodec<Short, Short> SHORT_MESSAGE_CODEC = new ShortMessageCodec();
public static final MessageCodec<Character, Character> CHAR_MESSAGE_CODEC = new CharMessageCodec();
public static final MessageCodec<Byte, Byte> BYTE_MESSAGE_CODEC = new ByteMessageCodec();
public static final MessageCodec<ReplyException, ReplyException> REPLY_EXCEPTION_MESSAGE_CODEC = new ReplyExceptionMessageCodec();

private final MessageCodec[] systemCodecs;
private final ConcurrentMap<String, MessageCodec> userCodecMap = new ConcurrentHashMap<>();
private final ConcurrentMap<Class, MessageCodec> defaultCodecMap = new ConcurrentHashMap<>();

public CodecManager() {
this.systemCodecs = codecs(NULL_MESSAGE_CODEC, PING_MESSAGE_CODEC, STRING_MESSAGE_CODEC, BUFFER_MESSAGE_CODEC, JSON_OBJECT_MESSAGE_CODEC, JSON_ARRAY_MESSAGE_CODEC,
BYTE_ARRAY_MESSAGE_CODEC, INT_MESSAGE_CODEC, LONG_MESSAGE_CODEC, FLOAT_MESSAGE_CODEC, DOUBLE_MESSAGE_CODEC,
BOOLEAN_MESSAGE_CODEC, SHORT_MESSAGE_CODEC, CHAR_MESSAGE_CODEC, BYTE_MESSAGE_CODEC, REPLY_EXCEPTION_MESSAGE_CODEC);
}

public MessageCodec lookupCodec(Object body, String codecName) {
MessageCodec codec;
if (codecName != null) {
codec = userCodecMap.get(codecName);
if (codec == null) {
throw new IllegalArgumentException("No message codec for name: " + codecName);
}
} else if (body == null) {
codec = NULL_MESSAGE_CODEC;
} else if (body instanceof String) {
codec = STRING_MESSAGE_CODEC;
} else if (body instanceof Buffer) {
codec = BUFFER_MESSAGE_CODEC;
} else if (body instanceof JsonObject) {
codec = JSON_OBJECT_MESSAGE_CODEC;
} else if (body instanceof JsonArray) {
codec = JSON_ARRAY_MESSAGE_CODEC;
} else if (body instanceof byte[]) {
codec = BYTE_ARRAY_MESSAGE_CODEC;
} else if (body instanceof Integer) {
codec = INT_MESSAGE_CODEC;
} else if (body instanceof Long) {
codec = LONG_MESSAGE_CODEC;
} else if (body instanceof Float) {
codec = FLOAT_MESSAGE_CODEC;
} else if (body instanceof Double) {
codec = DOUBLE_MESSAGE_CODEC;
} else if (body instanceof Boolean) {
codec = BOOLEAN_MESSAGE_CODEC;
} else if (body instanceof Short) {
codec = SHORT_MESSAGE_CODEC;
} else if (body instanceof Character) {
codec = CHAR_MESSAGE_CODEC;
} else if (body instanceof Byte) {
codec = BYTE_MESSAGE_CODEC;
} else if (body instanceof ReplyException) {
codec = REPLY_EXCEPTION_MESSAGE_CODEC;
} else {
codec = defaultCodecMap.get(body.getClass());
if (codec == null) {
throw new IllegalArgumentException("No message codec for type: " + body.getClass());
}
}
return codec;
}

public MessageCodec getCodec(String codecName) {
return userCodecMap.get(codecName);
}

public void registerCodec(MessageCodec codec) {
Objects.requireNonNull(codec, "codec");
Objects.requireNonNull(codec.name(), "code.name()");
checkSystemCodec(codec);
if (userCodecMap.containsKey(codec.name())) {
throw new IllegalStateException("Already a codec registered with name " + codec.name());
}
userCodecMap.put(codec.name(), codec);
}

public void unregisterCodec(String name) {
Objects.requireNonNull(name);
userCodecMap.remove(name);
}

public <T> void registerDefaultCodec(Class<T> clazz, MessageCodec<T, ?> codec) {
Objects.requireNonNull(clazz);
Objects.requireNonNull(codec, "codec");
Objects.requireNonNull(codec.name(), "code.name()");
checkSystemCodec(codec);
if (defaultCodecMap.containsKey(clazz)) {
throw new IllegalStateException("Already a default codec registered for class " + clazz);
}
if (userCodecMap.containsKey(codec.name())) {
throw new IllegalStateException("Already a codec registered with name " + codec.name());
}
defaultCodecMap.put(clazz, codec);
userCodecMap.put(codec.name(), codec);
}

public void unregisterDefaultCodec(Class clazz) {
Objects.requireNonNull(clazz);
MessageCodec codec = defaultCodecMap.remove(clazz);
if (codec != null) {
userCodecMap.remove(codec.name());
}
}

public MessageCodec[] systemCodecs() {
return systemCodecs;
}

private void checkSystemCodec(MessageCodec codec) {
if (codec.systemCodecID() != -1) {
throw new IllegalArgumentException("Can't register a system codec");
}
}

private MessageCodec[] codecs(MessageCodec... codecs) {
MessageCodec[] arr = new MessageCodec[codecs.length];
for (MessageCodec codec: codecs) {
arr[codec.systemCodecID()] = codec;
}
return arr;
}


}
27 changes: 27 additions & 0 deletions src/main/java/io/vertx/core/eventbus/impl/EventBusFactoryImpl.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package io.vertx.core.eventbus.impl;

import io.vertx.core.VertxOptions;
import io.vertx.core.eventbus.EventBus;
import io.vertx.core.spi.EventBusFactory;
import io.vertx.core.eventbus.impl.clustered.ClusteredEventBus;
import io.vertx.core.eventbus.impl.local.LocalEventBus;
import io.vertx.core.impl.HAManager;
import io.vertx.core.impl.VertxInternal;
import io.vertx.core.spi.cluster.ClusterManager;

/**
* @author <a href="http://tfox.org">Tim Fox</a>
*/
public class EventBusFactoryImpl implements EventBusFactory {

@Override
public EventBus createEventBus(VertxInternal vertx, VertxOptions options, ClusterManager clusterManager, HAManager haManager) {
EventBus eb;
if (options.isClustered()) {
eb = new ClusteredEventBus(vertx, options, clusterManager, haManager);
} else {
eb = new LocalEventBus(vertx);
}
return eb;
}
}
Loading

0 comments on commit 58eebfc

Please sign in to comment.