Permalink
Browse files

changed to support a range of ports

this is particularly useful for testing clusters on a single machine
  • Loading branch information...
1 parent 4377462 commit f509f5a3baf42758dcbdb208d04db4b9be919163 @mschoch mschoch committed Dec 4, 2012
@@ -16,6 +16,7 @@
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
+import java.util.concurrent.atomic.AtomicReference;
import org.elasticsearch.ElasticSearchException;
import org.elasticsearch.client.Client;
@@ -26,10 +27,10 @@
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.BoundTransportAddress;
import org.elasticsearch.common.transport.InetSocketTransportAddress;
+import org.elasticsearch.common.transport.PortsRange;
import org.elasticsearch.http.BindHttpException;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.rest.RestController;
-import org.elasticsearch.transport.BindTransportException;
import org.elasticsearch.transport.couchbase.CouchbaseCAPITransport;
import com.couchbase.capi.CAPIBehavior;
@@ -56,6 +57,8 @@
private final String username;
private final String password;
+ private final Boolean resolveConflicts;
+
private BoundTransportAddress boundAddress;
private String defaultDocumentType;
@@ -69,14 +72,15 @@ public CouchbaseCAPITransportImpl(Settings settings, RestController restControll
this.indicesService = indicesService;
this.metaDataMappingService = metaDataMappingService;
this.client = client;
- this.port = componentSettings.get("port", settings.get("couchbase.port", "9091"));
+ this.port = componentSettings.get("port", settings.get("http.port", "9091-10091"));
this.bindHost = componentSettings.get("bind_host");
this.publishHost = componentSettings.get("publish_host");
this.username = settings.get("couchbase.username", "Administrator");
this.password = settings.get("couchbase.password", "");
this.defaultDocumentType = settings.get("couchbase.defaultDocumentType", DEFAULT_DOCUMENT_TYPE_DOCUMENT);
this.checkpointDocumentType = settings.get("couchbase.checkpointDocumentType", DEFAULT_DOCUMENT_TYPE_CHECKPOINT);
this.dynamicTypePath = settings.get("couchbase.dynamicTypePath");
+ this.resolveConflicts = settings.getAsBoolean("couchbase.resolveConflicts", true);
}
@Override
@@ -92,32 +96,51 @@ protected void doStart() throws ElasticSearchException {
final InetAddress hostAddress = hostAddressX;
- InetSocketAddress bindAddress = new InetSocketAddress(hostAddress, Integer.parseInt(port));
-
- InetSocketAddress publishAddress = null;
- String publishAddressString = null;
+ InetAddress publishAddressHostX;
try {
- InetAddress publishAddressHost = networkService.resolvePublishHostAddress(publishHost);
- publishAddress = new InetSocketAddress(publishAddressHost, bindAddress.getPort());
- publishAddressString = publishAddressHost.toString();
- } catch (Exception e) {
- throw new BindTransportException("Failed to resolve publish address", e);
+ publishAddressHostX = networkService.resolvePublishHostAddress(publishHost);
+ } catch (IOException e) {
+ throw new BindHttpException("FAiled to resolve publish address host [" + publishHost + "]", e);
}
+ final InetAddress publishAddressHost = publishAddressHostX;
+
- capiBehavior = new ElasticSearchCAPIBehavior(client, logger, defaultDocumentType, checkpointDocumentType, dynamicTypePath);
+ capiBehavior = new ElasticSearchCAPIBehavior(client, logger, defaultDocumentType, checkpointDocumentType, dynamicTypePath, resolveConflicts.booleanValue());
couchbaseBehavior = new ElasticSearchCouchbaseBehavior(client);
- server = new CAPIServer(capiBehavior, couchbaseBehavior, bindAddress, this.username, this.password);
- if(publishAddressString != null) {
- server.setPublishAddress(publishAddressString);
- }
- try {
- server.start();
- } catch (Exception e) {
- throw new ElasticSearchException("Error starting jetty", e);
+ PortsRange portsRange = new PortsRange(port);
+ final AtomicReference<Exception> lastException = new AtomicReference<Exception>();
+ boolean success = portsRange.iterate(new PortsRange.PortCallback() {
+ @Override
+ public boolean onPortNumber(int portNumber) {
+ try {
+
+ server = new CAPIServer(capiBehavior, couchbaseBehavior,
+ new InetSocketAddress(hostAddress, portNumber),
+ CouchbaseCAPITransportImpl.this.username,
+ CouchbaseCAPITransportImpl.this.password);
+
+
+ if (publishAddressHost != null) {
+ server.setPublishAddress(publishAddressHost);
+ }
+
+ server.start();
+ } catch (Exception e) {
+ lastException.set(e);
+ return false;
+ }
+ return true;
+ }
+ });
+ if (!success) {
+ throw new BindHttpException("Failed to bind to [" + port + "]",
+ lastException.get());
}
- this.boundAddress = new BoundTransportAddress(new InetSocketTransportAddress(bindAddress), new InetSocketTransportAddress(publishAddress));
+ InetSocketAddress boundAddress = server.getBindAddress();
+ InetSocketAddress publishAddress = new InetSocketAddress(publishAddressHost, boundAddress.getPort());
+ this.boundAddress = new BoundTransportAddress(new InetSocketTransportAddress(boundAddress), new InetSocketTransportAddress(publishAddress));
}
@Override

0 comments on commit f509f5a

Please sign in to comment.