Skip to content

Commit

Permalink
Transport: Refactor guice startup
Browse files Browse the repository at this point in the history
* Removed & refactored unused module code
* Allowed to set transports programmatically
* Allow to set the source of the changed transport

Note: The current implementation breaks BWC as you need to specify a concrete
transport now instead of a module if you want to use a different
Transport or HttpServerTransport

Closes #7289
  • Loading branch information
spinscale committed Aug 19, 2014
1 parent 1efb685 commit 10af60b
Show file tree
Hide file tree
Showing 11 changed files with 303 additions and 358 deletions.
39 changes: 28 additions & 11 deletions src/main/java/org/elasticsearch/http/HttpServerModule.java
Expand Up @@ -19,33 +19,50 @@

package org.elasticsearch.http;

import com.google.common.collect.ImmutableList;
import org.elasticsearch.common.inject.AbstractModule;
import org.elasticsearch.common.inject.Module;
import org.elasticsearch.common.inject.Modules;
import org.elasticsearch.common.inject.SpawnModules;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.http.netty.NettyHttpServerTransportModule;
import org.elasticsearch.http.netty.NettyHttpServerTransport;
import org.elasticsearch.plugins.Plugin;

import static org.elasticsearch.common.Preconditions.checkNotNull;

/**
*
*/
public class HttpServerModule extends AbstractModule implements SpawnModules {
public class HttpServerModule extends AbstractModule {

private final Settings settings;
private final ESLogger logger;

private Class<? extends HttpServerTransport> configuredHttpServerTransport;
private String configuredHttpServerTransportSource;

public HttpServerModule(Settings settings) {
this.settings = settings;
}

@Override
public Iterable<? extends Module> spawnModules() {
return ImmutableList.of(Modules.createModule(settings.getAsClass("http.type", NettyHttpServerTransportModule.class, "org.elasticsearch.http.", "HttpServerTransportModule"), settings));
this.logger = Loggers.getLogger(getClass(), settings);
}

@SuppressWarnings({"unchecked"})
@Override
protected void configure() {
if (configuredHttpServerTransport != null) {
logger.info("Using [{}] as http transport, overridden by [{}]", configuredHttpServerTransport.getName(), configuredHttpServerTransportSource);
bind(HttpServerTransport.class).to(configuredHttpServerTransport).asEagerSingleton();
} else {
Class<? extends HttpServerTransport> defaultHttpServerTransport = NettyHttpServerTransport.class;
Class<? extends HttpServerTransport> httpServerTransport = settings.getAsClass("http.type", defaultHttpServerTransport, "org.elasticsearch.http.", "HttpServerTransport");
bind(HttpServerTransport.class).to(httpServerTransport).asEagerSingleton();
}

bind(HttpServer.class).asEagerSingleton();
}

public void setHttpServerTransport(Class<? extends HttpServerTransport> httpServerTransport, String source) {
checkNotNull(httpServerTransport, "Configured http server transport may not be null");
checkNotNull(source, "Plugin, that changes transport may not be null");
this.configuredHttpServerTransport = httpServerTransport;
this.configuredHttpServerTransportSource = source;
}
}

This file was deleted.

70 changes: 48 additions & 22 deletions src/main/java/org/elasticsearch/transport/TransportModule.java
Expand Up @@ -19,48 +19,74 @@

package org.elasticsearch.transport;

import com.google.common.collect.ImmutableList;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.inject.AbstractModule;
import org.elasticsearch.common.inject.Module;
import org.elasticsearch.common.inject.Modules;
import org.elasticsearch.common.inject.SpawnModules;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.transport.local.LocalTransportModule;
import org.elasticsearch.transport.netty.NettyTransportModule;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.transport.local.LocalTransport;
import org.elasticsearch.transport.netty.NettyTransport;

import static org.elasticsearch.common.Preconditions.checkNotNull;

/**
*
*/
public class TransportModule extends AbstractModule implements SpawnModules {
public class TransportModule extends AbstractModule {

private final Settings settings;

public static final String TRANSPORT_TYPE_KEY = "transport.type";
public static final String TRANSPORT_SERVICE_TYPE_KEY = "transport.service.type";

private final ESLogger logger;
private final Settings settings;

private Class<? extends TransportService> configuredTransportService;
private Class<? extends Transport> configuredTransport;
private String configuredTransportServiceSource;
private String configuredTransportSource;

public TransportModule(Settings settings) {
this.settings = settings;
this.logger = Loggers.getLogger(getClass(), settings);
}

@Override
public Iterable<? extends Module> spawnModules() {
Class<? extends Module> defaultTransportModule;
if (DiscoveryNode.localNode(settings)) {
defaultTransportModule = LocalTransportModule.class;
protected void configure() {
if (configuredTransportService != null) {
logger.info("Using [{}] as transport service, overridden by [{}]", configuredTransportService.getName(), configuredTransportServiceSource);
bind(TransportService.class).to(configuredTransportService).asEagerSingleton();
} else {
defaultTransportModule = NettyTransportModule.class;
Class<? extends TransportService> defaultTransportService = TransportService.class;
Class<? extends TransportService> transportService = settings.getAsClass(TRANSPORT_SERVICE_TYPE_KEY, defaultTransportService, "org.elasticsearch.transport.", "TransportService");
if (!TransportService.class.equals(transportService)) {
bind(TransportService.class).to(transportService).asEagerSingleton();
} else {
bind(TransportService.class).asEagerSingleton();
}
}
return ImmutableList.of(Modules.createModule(settings.getAsClass(TRANSPORT_TYPE_KEY, defaultTransportModule, "org.elasticsearch.transport.", "TransportModule"), settings));
}

@Override
protected void configure() {
Class<? extends TransportService> transportService = settings.getAsClass(TRANSPORT_SERVICE_TYPE_KEY, TransportService.class, "org.elasticsearch.transport.", "TransportService");
if (!TransportService.class.equals(transportService)) {
bind(TransportService.class).to(transportService).asEagerSingleton();
if (configuredTransport != null) {
logger.info("Using [{}] as transport, overridden by [{}]", configuredTransport.getName(), configuredTransportSource);
bind(Transport.class).to(configuredTransport).asEagerSingleton();
} else {
bind(TransportService.class).asEagerSingleton();
Class<? extends Transport> defaultTransport = DiscoveryNode.localNode(settings) ? LocalTransport.class : NettyTransport.class;
Class<? extends Transport> transport = settings.getAsClass(TRANSPORT_TYPE_KEY, defaultTransport, "org.elasticsearch.transport.", "Transport");
bind(Transport.class).to(transport).asEagerSingleton();
}
}

public void setTransportService(Class<? extends TransportService> transportService, String source) {
checkNotNull(transportService, "Configured transport service may not be null");
checkNotNull(source, "Plugin, that changes transport service may not be null");
this.configuredTransportService = transportService;
this.configuredTransportServiceSource = source;
}

public void setTransport(Class<? extends Transport> transport, String source) {
checkNotNull(transport, "Configured transport may not be null");
checkNotNull(source, "Plugin, that changes transport may not be null");
this.configuredTransport = transport;
this.configuredTransportSource = source;
}
}

This file was deleted.

This file was deleted.

@@ -0,0 +1,100 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.plugins;

import org.elasticsearch.Version;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.test.ElasticsearchIntegrationTest;
import org.elasticsearch.test.transport.AssertingLocalTransport;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.*;
import org.junit.Test;

import java.io.IOException;
import java.util.concurrent.atomic.AtomicInteger;

import static org.elasticsearch.common.settings.ImmutableSettings.settingsBuilder;
import static org.elasticsearch.test.ElasticsearchIntegrationTest.ClusterScope;
import static org.elasticsearch.test.ElasticsearchIntegrationTest.Scope;
import static org.hamcrest.Matchers.*;

/**
*
*/
@ClusterScope(scope = Scope.SUITE, numDataNodes = 2)
public class PluggableTransportModuleTests extends ElasticsearchIntegrationTest {

public static final AtomicInteger SENT_REQUEST_COUNTER = new AtomicInteger(0);

@Override
protected Settings nodeSettings(int nodeOrdinal) {
return settingsBuilder()
.put("plugin.types", CountingSentRequestsPlugin.class.getName())
.put(super.nodeSettings(nodeOrdinal))
.build();
}

@Test
public void testThatPluginFunctionalityIsLoadedWithoutConfiguration() throws Exception {
for (Transport transport : internalCluster().getInstances(Transport.class)) {
assertThat(transport, instanceOf(CountingAssertingLocalTransport.class));
}

// the cluster node communication on start up is sufficient to increase the counter
// no need to do anything specific
int count = SENT_REQUEST_COUNTER.get();
assertThat("Expected send request counter to be greather than zero", count, is(greaterThan(0)));

// sending a new request via client node will increase the sent requests
internalCluster().clientNodeClient().admin().cluster().prepareHealth().get();
assertThat("Expected send request counter to be greather than zero", SENT_REQUEST_COUNTER.get(), is(greaterThan(count)));
}

public static class CountingSentRequestsPlugin extends AbstractPlugin {
@Override
public String name() {
return "counting-pipelines-plugin";
}

@Override
public String description() {
return "counting-pipelines-plugin";
}

public void onModule(TransportModule transportModule) {
transportModule.setTransport(CountingAssertingLocalTransport.class, this.name());
}
}

public static final class CountingAssertingLocalTransport extends AssertingLocalTransport {

@Inject
public CountingAssertingLocalTransport(Settings settings, ThreadPool threadPool, Version version) {
super(settings, threadPool, version);
}

@Override
public void sendRequest(final DiscoveryNode node, final long requestId, final String action, final TransportRequest request, TransportRequestOptions options) throws IOException, TransportException {
SENT_REQUEST_COUNTER.incrementAndGet();
super.sendRequest(node, requestId, action, request, options);
}
}
}

0 comments on commit 10af60b

Please sign in to comment.