Skip to content

Commit

Permalink
Cleanup of the transport request/response messages
Browse files Browse the repository at this point in the history
Now both TransportRequest and TransportResponse inherit from a base TransportMessage that holds the message headers and also now added the remote transport address (where this message came from).
  • Loading branch information
uboness committed Jul 11, 2014
1 parent c4c0270 commit 25a21c6
Show file tree
Hide file tree
Showing 11 changed files with 201 additions and 117 deletions.
Expand Up @@ -39,6 +39,11 @@ public short uniqueAddressTypeId() {
return 0;
}

@Override
public boolean sameHost(TransportAddress other) {
return other == INSTANCE;
}

@Override
public void readFrom(StreamInput in) throws IOException {
}
Expand Down
Expand Up @@ -67,6 +67,12 @@ public short uniqueAddressTypeId() {
return 1;
}

@Override
public boolean sameHost(TransportAddress other) {
return other instanceof InetSocketTransportAddress &&
address.getAddress().equals(((InetSocketTransportAddress) other).address.getAddress());
}

public InetSocketAddress address() {
return this.address;
}
Expand Down
Expand Up @@ -47,6 +47,11 @@ public short uniqueAddressTypeId() {
return 2;
}

@Override
public boolean sameHost(TransportAddress other) {
return other instanceof LocalTransportAddress && id.equals(((LocalTransportAddress) other).id);
}

@Override
public void readFrom(StreamInput in) throws IOException {
id = in.readString();
Expand Down
Expand Up @@ -29,4 +29,6 @@
public interface TransportAddress extends Streamable, Serializable {

short uniqueAddressTypeId();

boolean sameHost(TransportAddress other);
}
99 changes: 99 additions & 0 deletions src/main/java/org/elasticsearch/transport/TransportMessage.java
@@ -0,0 +1,99 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.elasticsearch.transport;

import com.google.common.collect.Maps;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Streamable;
import org.elasticsearch.common.transport.TransportAddress;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

/**
*
*/
public abstract class TransportMessage<TM extends TransportMessage<TM>> implements Streamable {

private Map<String, Object> headers;

private TransportAddress remoteAddress;

protected TransportMessage() {
}

protected TransportMessage(TM message) {
// create a new copy of the headers, since we are creating a new request which might have
// its headers changed in the context of that specific request
if (message.getHeaders() != null) {
this.headers = new HashMap<>(message.getHeaders());
}
}


public void remoteAddress(TransportAddress remoteAddress) {
this.remoteAddress = remoteAddress;
}

public TransportAddress remoteAddress() {
return remoteAddress;
}

@SuppressWarnings("unchecked")
public final TM putHeader(String key, Object value) {
if (headers == null) {
headers = Maps.newHashMap();
}
headers.put(key, value);
return (TM) this;
}

@SuppressWarnings("unchecked")
public final <V> V getHeader(String key) {
if (headers == null) {
return null;
}
return (V) headers.get(key);
}

public Map<String, Object> getHeaders() {
return this.headers;
}


@Override
public void readFrom(StreamInput in) throws IOException {
if (in.readBoolean()) {
headers = in.readMap();
}
}

@Override
public void writeTo(StreamOutput out) throws IOException {
if (headers == null) {
out.writeBoolean(false);
} else {
out.writeBoolean(true);
out.writeMap(headers);
}
}
}
57 changes: 2 additions & 55 deletions src/main/java/org/elasticsearch/transport/TransportRequest.java
Expand Up @@ -19,18 +19,9 @@

package org.elasticsearch.transport;

import com.google.common.collect.Maps;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Streamable;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

/**
*/
public abstract class TransportRequest implements Streamable {
public abstract class TransportRequest extends TransportMessage<TransportRequest> {

public static class Empty extends TransportRequest {

Expand All @@ -45,55 +36,11 @@ public Empty(TransportRequest request) {
}
}

private Map<String, Object> headers;

protected TransportRequest() {

}

protected TransportRequest(TransportRequest request) {
// create a new copy of the headers, since we are creating a new request which might have
// its headers changed in the context of that specific request
if (request.getHeaders() != null) {
this.headers = new HashMap<>(request.getHeaders());
}
}

@SuppressWarnings("unchecked")
public final TransportRequest putHeader(String key, Object value) {
if (headers == null) {
headers = Maps.newHashMap();
}
headers.put(key, value);
return this;
}

@SuppressWarnings("unchecked")
public final <V> V getHeader(String key) {
if (headers == null) {
return null;
}
return (V) headers.get(key);
super(request);
}

public Map<String, Object> getHeaders() {
return this.headers;
}

@Override
public void readFrom(StreamInput in) throws IOException {
if (in.readBoolean()) {
headers = in.readMap();
}
}

@Override
public void writeTo(StreamOutput out) throws IOException {
if (headers == null) {
out.writeBoolean(false);
} else {
out.writeBoolean(true);
out.writeMap(headers);
}
}
}
59 changes: 3 additions & 56 deletions src/main/java/org/elasticsearch/transport/TransportResponse.java
Expand Up @@ -19,18 +19,9 @@

package org.elasticsearch.transport;

import com.google.common.collect.Maps;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Streamable;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

/**
*/
public abstract class TransportResponse implements Streamable {
public abstract class TransportResponse extends TransportMessage<TransportResponse> {

public static class Empty extends TransportResponse {

Expand All @@ -45,55 +36,11 @@ public Empty(TransportResponse request) {
}
}

private Map<String, Object> headers;

protected TransportResponse() {

}

protected TransportResponse(TransportResponse request) {
// create a new copy of the headers, since we are creating a new request which might have
// its headers changed in the context of that specific request
if (request.getHeaders() != null) {
this.headers = new HashMap<>(request.getHeaders());
}
}

@SuppressWarnings("unchecked")
public final TransportResponse putHeader(String key, Object value) {
if (headers == null) {
headers = Maps.newHashMap();
}
headers.put(key, value);
return this;
}

@SuppressWarnings("unchecked")
public final <V> V getHeader(String key) {
if (headers == null) {
return null;
}
return (V) headers.get(key);
protected TransportResponse(TransportResponse response) {
super(response);
}

public Map<String, Object> getHeaders() {
return this.headers;
}

@Override
public void readFrom(StreamInput in) throws IOException {
if (in.readBoolean()) {
headers = in.readMap();
}
}

@Override
public void writeTo(StreamOutput out) throws IOException {
if (headers == null) {
out.writeBoolean(false);
} else {
out.writeBoolean(true);
out.writeMap(headers);
}
}
}
Expand Up @@ -217,7 +217,7 @@ protected void messageReceived(byte[] data, String action, LocalTransport source
if (TransportStatus.isError(status)) {
handlerResponseError(stream, handler);
} else {
handleResponse(stream, handler);
handleResponse(stream, sourceTransport, handler);
}
}
}
Expand All @@ -242,6 +242,7 @@ private void handleRequest(StreamInput stream, long requestId, LocalTransport so
throw new ActionNotFoundTransportException("Action [" + action + "] not found");
}
final TransportRequest request = handler.newInstance();
request.remoteAddress(sourceTransport.boundAddress.publishAddress());
request.readFrom(stream);
if (handler.executor() == ThreadPool.Names.SAME) {
//noinspection unchecked
Expand Down Expand Up @@ -282,9 +283,9 @@ public boolean isForceExecution() {
}
}


protected void handleResponse(StreamInput buffer, final TransportResponseHandler handler) {
protected void handleResponse(StreamInput buffer, LocalTransport sourceTransport, final TransportResponseHandler handler) {
final TransportResponse response = handler.newInstance();
response.remoteAddress(sourceTransport.boundAddress.publishAddress());
try {
response.readFrom(buffer);
} catch (Throwable e) {
Expand Down
Expand Up @@ -28,6 +28,7 @@
import org.elasticsearch.common.io.stream.CachedStreamInput;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.transport.InetSocketTransportAddress;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.*;
Expand All @@ -36,6 +37,7 @@
import org.jboss.netty.channel.*;

import java.io.IOException;
import java.net.InetSocketAddress;

/**
* A handler (must be the last one!) that does size based frame decoding and forwards the actual message
Expand Down Expand Up @@ -122,7 +124,7 @@ public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Ex
if (TransportStatus.isError(status)) {
handlerResponseError(wrappedStream, handler);
} else {
handleResponse(wrappedStream, handler);
handleResponse(ctx.getChannel(), wrappedStream, handler);
}
} else {
// if its null, skip those bytes
Expand All @@ -140,8 +142,10 @@ public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Ex
wrappedStream.close();
}

private void handleResponse(StreamInput buffer, final TransportResponseHandler handler) {
private void handleResponse(Channel channel, StreamInput buffer, final TransportResponseHandler handler) {
final TransportResponse response = handler.newInstance();
response.remoteAddress(new InetSocketTransportAddress((InetSocketAddress) channel.getRemoteAddress()));
response.remoteAddress();
try {
response.readFrom(buffer);
} catch (Throwable e) {
Expand Down Expand Up @@ -206,6 +210,7 @@ private String handleRequest(Channel channel, StreamInput buffer, long requestId
throw new ActionNotFoundTransportException(action);
}
final TransportRequest request = handler.newInstance();
request.remoteAddress(new InetSocketTransportAddress((InetSocketAddress) channel.getRemoteAddress()));
request.readFrom(buffer);
if (handler.executor() == ThreadPool.Names.SAME) {
//noinspection unchecked
Expand Down

0 comments on commit 25a21c6

Please sign in to comment.