Skip to content

Commit

Permalink
Add messaging service/interfaces.
Browse files Browse the repository at this point in the history
  • Loading branch information
kuujo committed Jun 12, 2017
1 parent bbd973f commit 08e9022
Show file tree
Hide file tree
Showing 4 changed files with 32 additions and 27 deletions.
18 changes: 8 additions & 10 deletions core/src/main/java/io/atomix/messaging/Endpoint.java
Expand Up @@ -13,30 +13,28 @@
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
*/ */
package org.onosproject.store.cluster.messaging; package io.atomix.messaging;


import static com.google.common.base.Preconditions.checkNotNull; import com.google.common.base.MoreObjects;


import java.net.InetAddress;
import java.util.Objects; import java.util.Objects;


import org.onlab.packet.IpAddress; import static com.google.common.base.Preconditions.checkNotNull;

import com.google.common.base.MoreObjects;


/** /**
* Representation of a TCP/UDP communication end point. * Representation of a TCP/UDP communication end point.
*/ */
public final class Endpoint { public final class Endpoint {

private final int port; private final int port;
private final IpAddress ip; private final InetAddress ip;


public Endpoint(IpAddress host, int port) { public Endpoint(InetAddress host, int port) {
this.ip = checkNotNull(host); this.ip = checkNotNull(host);
this.port = port; this.port = port;
} }


public IpAddress host() { public InetAddress host() {
return ip; return ip;
} }


Expand Down Expand Up @@ -70,6 +68,6 @@ public boolean equals(Object obj) {
} }
Endpoint that = (Endpoint) obj; Endpoint that = (Endpoint) obj;
return this.port == that.port && return this.port == that.port &&
Objects.equals(this.ip, that.ip); Objects.equals(this.ip, that.ip);
} }
} }
6 changes: 3 additions & 3 deletions core/src/main/java/io/atomix/messaging/MessageSubject.java
Expand Up @@ -13,12 +13,12 @@
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
*/ */
package org.onosproject.store.cluster.messaging; package io.atomix.messaging;

import static com.google.common.base.Preconditions.checkNotNull;


import java.util.Objects; import java.util.Objects;


import static com.google.common.base.Preconditions.checkNotNull;

/** /**
* Representation of a message subject. * Representation of a message subject.
* Cluster messages have associated subjects that dictate how they get handled * Cluster messages have associated subjects that dictate how they get handled
Expand Down
Expand Up @@ -13,7 +13,7 @@
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
*/ */
package org.onosproject.store.cluster.messaging; package io.atomix.messaging;


import java.io.IOException; import java.io.IOException;


Expand Down
33 changes: 20 additions & 13 deletions core/src/main/java/io/atomix/messaging/MessagingService.java
Expand Up @@ -13,7 +13,7 @@
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
*/ */
package org.onosproject.store.cluster.messaging; package io.atomix.messaging;


import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor; import java.util.concurrent.Executor;
Expand All @@ -28,57 +28,64 @@ public interface MessagingService {
/** /**
* Sends a message asynchronously to the specified communication end point. * Sends a message asynchronously to the specified communication end point.
* The message is specified using the type and payload. * The message is specified using the type and payload.
* @param ep end point to send the message to. *
* @param type type of message. * @param ep end point to send the message to.
* @param type type of message.
* @param payload message payload bytes. * @param payload message payload bytes.
* @return future that is completed when the message is sent * @return future that is completed when the message is sent
*/ */
CompletableFuture<Void> sendAsync(Endpoint ep, String type, byte[] payload); CompletableFuture<Void> sendAsync(Endpoint ep, String type, byte[] payload);


/** /**
* Sends a message asynchronously and expects a response. * Sends a message asynchronously and expects a response.
* @param ep end point to send the message to. *
* @param type type of message. * @param ep end point to send the message to.
* @param type type of message.
* @param payload message payload. * @param payload message payload.
* @return a response future * @return a response future
*/ */
CompletableFuture<byte[]> sendAndReceive(Endpoint ep, String type, byte[] payload); CompletableFuture<byte[]> sendAndReceive(Endpoint ep, String type, byte[] payload);


/** /**
* Sends a message synchronously and expects a response. * Sends a message synchronously and expects a response.
* @param ep end point to send the message to. *
* @param type type of message. * @param ep end point to send the message to.
* @param payload message payload. * @param type type of message.
* @param payload message payload.
* @param executor executor over which any follow up actions after completion will be executed. * @param executor executor over which any follow up actions after completion will be executed.
* @return a response future * @return a response future
*/ */
CompletableFuture<byte[]> sendAndReceive(Endpoint ep, String type, byte[] payload, Executor executor); CompletableFuture<byte[]> sendAndReceive(Endpoint ep, String type, byte[] payload, Executor executor);


/** /**
* Registers a new message handler for message type. * Registers a new message handler for message type.
* @param type message type. *
* @param handler message handler * @param type message type.
* @param handler message handler
* @param executor executor to use for running message handler logic. * @param executor executor to use for running message handler logic.
*/ */
void registerHandler(String type, BiConsumer<Endpoint, byte[]> handler, Executor executor); void registerHandler(String type, BiConsumer<Endpoint, byte[]> handler, Executor executor);


/** /**
* Registers a new message handler for message type. * Registers a new message handler for message type.
* @param type message type. *
* @param handler message handler * @param type message type.
* @param handler message handler
* @param executor executor to use for running message handler logic. * @param executor executor to use for running message handler logic.
*/ */
void registerHandler(String type, BiFunction<Endpoint, byte[], byte[]> handler, Executor executor); void registerHandler(String type, BiFunction<Endpoint, byte[], byte[]> handler, Executor executor);


/** /**
* Registers a new message handler for message type. * Registers a new message handler for message type.
* @param type message type. *
* @param type message type.
* @param handler message handler * @param handler message handler
*/ */
void registerHandler(String type, BiFunction<Endpoint, byte[], CompletableFuture<byte[]>> handler); void registerHandler(String type, BiFunction<Endpoint, byte[], CompletableFuture<byte[]>> handler);


/** /**
* Unregister current handler, if one exists for message type. * Unregister current handler, if one exists for message type.
*
* @param type message type * @param type message type
*/ */
void unregisterHandler(String type); void unregisterHandler(String type);
Expand Down

0 comments on commit 08e9022

Please sign in to comment.