Skip to content

Commit

Permalink
Add Netty messaging service implementation.
Browse files Browse the repository at this point in the history
  • Loading branch information
kuujo committed Jun 26, 2017
1 parent c267991 commit 3160133
Show file tree
Hide file tree
Showing 7 changed files with 1,337 additions and 0 deletions.
25 changes: 25 additions & 0 deletions messaging/netty/pom.xml
Expand Up @@ -32,5 +32,30 @@
<artifactId>atomix-messaging</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>io.atomix</groupId>
<artifactId>atomix-utils</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-transport</artifactId>
<version>${netty.version}</version>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-codec</artifactId>
<version>${netty.version}</version>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-handler</artifactId>
<version>${netty.version}</version>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-transport-native-epoll</artifactId>
<version>${netty.version}</version>
</dependency>
</dependencies>
</project>
@@ -0,0 +1,31 @@
/*
* Copyright 2016-present Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.atomix.messaging.netty;

/**
* State transitions a decoder goes through as it is decoding an incoming message.
*/
public enum DecoderState {
READ_MESSAGE_PREAMBLE,
READ_MESSAGE_ID,
READ_SENDER_IP,
READ_SENDER_PORT,
READ_MESSAGE_TYPE_LENGTH,
READ_MESSAGE_TYPE,
READ_MESSAGE_STATUS,
READ_CONTENT_LENGTH,
READ_CONTENT
}
@@ -0,0 +1,172 @@
/*
* Copyright 2016-present Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.atomix.messaging.netty;

import io.atomix.messaging.Endpoint;
import io.atomix.utils.ArraySizeHashPrinter;

import static com.google.common.base.MoreObjects.toStringHelper;

/**
* Internal message representation with additional attributes
* for supporting, synchronous request/reply behavior.
*/
public final class InternalMessage {

/**
* Message status.
*/
public enum Status {

// NOTE: For backwards compatibility enum constant IDs should not be changed.

/**
* All ok.
*/
OK(0),

/**
* Response status signifying no registered handler.
*/
ERROR_NO_HANDLER(1),

/**
* Response status signifying an exception handling the message.
*/
ERROR_HANDLER_EXCEPTION(2),

/**
* Response status signifying invalid message structure.
*/
PROTOCOL_EXCEPTION(3);

private final int id;

Status(int id) {
this.id = id;
}

/**
* Returns the unique status ID.
*
* @return the unique status ID.
*/
public int id() {
return id;
}

/**
* Returns the status enum associated with the given ID.
*
* @param id the status ID.
* @return the status enum for the given ID.
*/
public static Status forId(int id) {
switch (id) {
case 0:
return OK;
case 1:
return ERROR_NO_HANDLER;
case 2:
return ERROR_HANDLER_EXCEPTION;
case 3:
return PROTOCOL_EXCEPTION;
default:
throw new IllegalArgumentException("Unknown status ID " + id);
}
}
}

private final int preamble;
private final long id;
private final Endpoint sender;
private final String type;
private final byte[] payload;
private final Status status;

public InternalMessage(int preamble,
long id,
Endpoint sender,
String type,
byte[] payload) {
this(preamble, id, sender, type, payload, null);
}

public InternalMessage(int preamble,
long id,
Endpoint sender,
byte[] payload,
Status status) {
this(preamble, id, sender, "", payload, status);
}

InternalMessage(int preamble,
long id,
Endpoint sender,
String type,
byte[] payload,
Status status) {
this.preamble = preamble;
this.id = id;
this.sender = sender;
this.type = type;
this.payload = payload;
this.status = status;
}

public boolean isRequest() {
return status == null;
}

public boolean isReply() {
return status != null;
}

public int preamble() {
return preamble;
}

public long id() {
return id;
}

public String type() {
return type;
}

public Endpoint sender() {
return sender;
}

public byte[] payload() {
return payload;
}

public Status status() {
return status;
}

@Override
public String toString() {
return toStringHelper(this)
.add("id", id)
.add("type", type)
.add("sender", sender)
.add("status", status)
.add("payload", ArraySizeHashPrinter.of(payload))
.toString();
}
}
@@ -0,0 +1,120 @@
/*
* Copyright 2016-present Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.atomix.messaging.netty;

import com.google.common.base.Charsets;
import io.atomix.logging.Logger;
import io.atomix.logging.LoggerFactory;
import io.atomix.messaging.Endpoint;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ReplayingDecoder;

import java.net.InetAddress;
import java.util.List;

import static com.google.common.base.Preconditions.checkState;

/**
* Decoder for inbound messages.
*/
public class MessageDecoder extends ReplayingDecoder<DecoderState> {

private final Logger log = LoggerFactory.getLogger(getClass());

private long messageId;
private int preamble;
private InetAddress senderIp;
private int senderPort;
private int messageTypeLength;
private String messageType;
private InternalMessage.Status status;
private int contentLength;

public MessageDecoder() {
super(DecoderState.READ_MESSAGE_PREAMBLE);
}

@Override
@SuppressWarnings("squid:S128") // suppress switch fall through warning
protected void decode(
ChannelHandlerContext context,
ByteBuf buffer,
List<Object> out) throws Exception {

switch (state()) {
case READ_MESSAGE_PREAMBLE:
preamble = buffer.readInt();
checkpoint(DecoderState.READ_MESSAGE_ID);
case READ_MESSAGE_ID:
messageId = buffer.readLong();
checkpoint(DecoderState.READ_SENDER_IP);
case READ_SENDER_IP:
byte[] octets = new byte[buffer.readByte()];
buffer.readBytes(octets);
senderIp = InetAddress.getByAddress(octets);
checkpoint(DecoderState.READ_SENDER_PORT);
case READ_SENDER_PORT:
senderPort = buffer.readInt();
checkpoint(DecoderState.READ_MESSAGE_TYPE_LENGTH);
case READ_MESSAGE_TYPE_LENGTH:
messageTypeLength = buffer.readShort();
checkpoint(DecoderState.READ_MESSAGE_TYPE);
case READ_MESSAGE_TYPE:
byte[] messageTypeBytes = new byte[messageTypeLength];
buffer.readBytes(messageTypeBytes);
messageType = new String(messageTypeBytes, Charsets.UTF_8);
checkpoint(DecoderState.READ_MESSAGE_STATUS);
case READ_MESSAGE_STATUS:
int statusId = buffer.readByte();
if (statusId == -1) {
status = null;
} else {
status = InternalMessage.Status.forId(statusId);
}
checkpoint(DecoderState.READ_CONTENT_LENGTH);
case READ_CONTENT_LENGTH:
contentLength = buffer.readInt();
checkpoint(DecoderState.READ_CONTENT);
case READ_CONTENT:
byte[] payload;
if (contentLength > 0) {
//TODO Perform a sanity check on the size before allocating
payload = new byte[contentLength];
buffer.readBytes(payload);
} else {
payload = new byte[0];
}
InternalMessage message = new InternalMessage(preamble,
messageId,
new Endpoint(senderIp, senderPort),
messageType,
payload,
status);
out.add(message);
checkpoint(DecoderState.READ_MESSAGE_PREAMBLE);
break;
default:
checkState(false, "Must not be here");
}
}

@Override
public void exceptionCaught(ChannelHandlerContext context, Throwable cause) {
log.error("Exception inside channel handling pipeline.", cause);
context.close();
}
}

0 comments on commit 3160133

Please sign in to comment.