Skip to content

Commit

Permalink
add profile name to TransportChannel
Browse files Browse the repository at this point in the history
Today, only the NettyTransportChannel implements the getProfileName method
and the other channel implementations do not. The profile name is useful for some
plugins to perform custom actions based on the name. Rather than checking the
type of the channel, it makes sense to always expose the profile name.

For DirectResponseChannels we use a name that cannot be used in the settings
to define another profile with that name. For LocalTransportChannel we use the
same name as the default profile.

Closes elastic#10483
  • Loading branch information
jaymode committed May 21, 2015
1 parent 6b3918a commit 8060cd0
Show file tree
Hide file tree
Showing 7 changed files with 53 additions and 9 deletions.
Expand Up @@ -28,6 +28,8 @@ public interface TransportChannel {

String action();

String getProfileName();

void sendResponse(TransportResponse response) throws IOException;

void sendResponse(TransportResponse response, TransportResponseOptions options) throws IOException;
Expand Down
Expand Up @@ -20,7 +20,6 @@
package org.elasticsearch.transport;

import com.google.common.collect.ImmutableMap;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.settings.ClusterDynamicSettings;
import org.elasticsearch.cluster.settings.DynamicSettings;
Expand Down Expand Up @@ -56,6 +55,8 @@
*/
public class TransportService extends AbstractLifecycleComponent<TransportService> {

public static final String DIRECT_RESPONSE_PROFILE = ".direct";

private final AtomicBoolean started = new AtomicBoolean(false);
protected final Transport transport;
protected final ThreadPool threadPool;
Expand Down Expand Up @@ -722,6 +723,11 @@ public String action() {
return action;
}

@Override
public String getProfileName() {
return DIRECT_RESPONSE_PROFILE;
}

@Override
public void sendResponse(TransportResponse response) throws IOException {
sendResponse(response, TransportResponseOptions.EMPTY);
Expand Down
Expand Up @@ -22,7 +22,6 @@
import org.elasticsearch.Version;
import org.elasticsearch.common.io.ThrowableObjectOutputStream;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.transport.*;
import org.elasticsearch.transport.support.TransportStatus;

Expand All @@ -34,6 +33,8 @@
*/
public class LocalTransportChannel implements TransportChannel {

private static final String LOCAL_TRANSPORT_PROFILE = "default";

private final LocalTransport sourceTransport;
private final TransportServiceAdapter sourceTransportServiceAdapter;
// the transport we will *send to*
Expand All @@ -56,6 +57,11 @@ public String action() {
return action;
}

@Override
public String getProfileName() {
return LOCAL_TRANSPORT_PROFILE;
}

@Override
public void sendResponse(TransportResponse response) throws IOException {
sendResponse(response, TransportResponseOptions.EMPTY);
Expand Down
Expand Up @@ -252,22 +252,23 @@ protected void doStart() {
Settings fallbackSettings = createFallbackSettings();
Settings defaultSettings = profiles.get(DEFAULT_PROFILE);

// loop through all profiles and strart them app, special handling for default one
// loop through all profiles and start them up, special handling for default one
for (Map.Entry<String, Settings> entry : profiles.entrySet()) {
Settings profileSettings = entry.getValue();
String name = entry.getKey();

if (DEFAULT_PROFILE.equals(name)) {
if (!Strings.hasLength(name)) {
logger.info("transport profile configured without a name. skipping profile with settings [{}]", profileSettings.toDelimitedString(','));
continue;
} else if (DEFAULT_PROFILE.equals(name)) {
profileSettings = settingsBuilder()
.put(profileSettings)
.put("port", profileSettings.get("port", this.settings.get("transport.tcp.port", DEFAULT_PORT_RANGE)))
.build();
} else {
} else if (profileSettings.get("port") == null) {
// if profile does not have a port, skip it
if (profileSettings.get("port") == null) {
logger.info("No port configured for profile [{}], not binding", name);
continue;
}
logger.info("No port configured for profile [{}], not binding", name);
continue;
}

// merge fallback settings with default settings with profile settings so we have complete settings with default values
Expand Down
Expand Up @@ -61,6 +61,7 @@ public NettyTransportChannel(NettyTransport transport, TransportServiceAdapter t
this.profileName = profileName;
}

@Override
public String getProfileName() {
return profileName;
}
Expand Down
Expand Up @@ -838,6 +838,11 @@ public String action() {
return null;
}

@Override
public String getProfileName() {
return "";
}

@Override
public void sendResponse(TransportResponse response) throws IOException {
}
Expand Down
Expand Up @@ -34,6 +34,7 @@
import org.elasticsearch.test.cache.recycler.MockBigArrays;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.BindTransportException;
import org.elasticsearch.transport.TransportService;
import org.junit.Rule;
import org.junit.Test;

Expand Down Expand Up @@ -155,6 +156,28 @@ public void testThatBindingOnDifferentHostsWorks() throws Exception {
}
}

@Test
public void testThatProfileWithoutValidNameIsIgnored() throws Exception {
int[] ports = getRandomPorts(3);

Settings settings = settingsBuilder()
.put("network.host", "127.0.0.1")
.put("transport.tcp.port", ports[0])
// mimics someone trying to define a profile for .local which is the profile for a node request to itself
.put("transport.profiles." + TransportService.DIRECT_RESPONSE_PROFILE + ".port", ports[1])
.put("transport.profiles..port", ports[2])
.build();

ThreadPool threadPool = new ThreadPool("tst");
try (NettyTransport ignored = startNettyTransport(settings, threadPool)) {
assertPortIsBound(ports[0]);
assertConnectionRefused(ports[1]);
assertConnectionRefused(ports[2]);
} finally {
terminate(threadPool);
}
}

private int[] getRandomPorts(int numberOfPorts) {
IntHashSet ports = new IntHashSet();

Expand Down

0 comments on commit 8060cd0

Please sign in to comment.