Skip to content

Commit

Permalink
changed to support a range of ports
Browse files Browse the repository at this point in the history
this is particularly useful for testing clusters on a single machine
  • Loading branch information
mschoch committed Dec 4, 2012
1 parent 4377462 commit f509f5a
Showing 1 changed file with 44 additions and 21 deletions.
Expand Up @@ -16,6 +16,7 @@
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.util.concurrent.atomic.AtomicReference;

import org.elasticsearch.ElasticSearchException;
import org.elasticsearch.client.Client;
Expand All @@ -26,10 +27,10 @@
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.BoundTransportAddress;
import org.elasticsearch.common.transport.InetSocketTransportAddress;
import org.elasticsearch.common.transport.PortsRange;
import org.elasticsearch.http.BindHttpException;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.rest.RestController;
import org.elasticsearch.transport.BindTransportException;
import org.elasticsearch.transport.couchbase.CouchbaseCAPITransport;

import com.couchbase.capi.CAPIBehavior;
Expand All @@ -56,6 +57,8 @@ public class CouchbaseCAPITransportImpl extends AbstractLifecycleComponent<Couch
private final String username;
private final String password;

private final Boolean resolveConflicts;

private BoundTransportAddress boundAddress;

private String defaultDocumentType;
Expand All @@ -69,14 +72,15 @@ public CouchbaseCAPITransportImpl(Settings settings, RestController restControll
this.indicesService = indicesService;
this.metaDataMappingService = metaDataMappingService;
this.client = client;
this.port = componentSettings.get("port", settings.get("couchbase.port", "9091"));
this.port = componentSettings.get("port", settings.get("http.port", "9091-10091"));
this.bindHost = componentSettings.get("bind_host");
this.publishHost = componentSettings.get("publish_host");
this.username = settings.get("couchbase.username", "Administrator");
this.password = settings.get("couchbase.password", "");
this.defaultDocumentType = settings.get("couchbase.defaultDocumentType", DEFAULT_DOCUMENT_TYPE_DOCUMENT);
this.checkpointDocumentType = settings.get("couchbase.checkpointDocumentType", DEFAULT_DOCUMENT_TYPE_CHECKPOINT);
this.dynamicTypePath = settings.get("couchbase.dynamicTypePath");
this.resolveConflicts = settings.getAsBoolean("couchbase.resolveConflicts", true);
}

@Override
Expand All @@ -92,32 +96,51 @@ protected void doStart() throws ElasticSearchException {
final InetAddress hostAddress = hostAddressX;


InetSocketAddress bindAddress = new InetSocketAddress(hostAddress, Integer.parseInt(port));

InetSocketAddress publishAddress = null;
String publishAddressString = null;
InetAddress publishAddressHostX;
try {
InetAddress publishAddressHost = networkService.resolvePublishHostAddress(publishHost);
publishAddress = new InetSocketAddress(publishAddressHost, bindAddress.getPort());
publishAddressString = publishAddressHost.toString();
} catch (Exception e) {
throw new BindTransportException("Failed to resolve publish address", e);
publishAddressHostX = networkService.resolvePublishHostAddress(publishHost);
} catch (IOException e) {
throw new BindHttpException("FAiled to resolve publish address host [" + publishHost + "]", e);
}
final InetAddress publishAddressHost = publishAddressHostX;


capiBehavior = new ElasticSearchCAPIBehavior(client, logger, defaultDocumentType, checkpointDocumentType, dynamicTypePath);
capiBehavior = new ElasticSearchCAPIBehavior(client, logger, defaultDocumentType, checkpointDocumentType, dynamicTypePath, resolveConflicts.booleanValue());
couchbaseBehavior = new ElasticSearchCouchbaseBehavior(client);
server = new CAPIServer(capiBehavior, couchbaseBehavior, bindAddress, this.username, this.password);
if(publishAddressString != null) {
server.setPublishAddress(publishAddressString);
}

try {
server.start();
} catch (Exception e) {
throw new ElasticSearchException("Error starting jetty", e);
PortsRange portsRange = new PortsRange(port);
final AtomicReference<Exception> lastException = new AtomicReference<Exception>();
boolean success = portsRange.iterate(new PortsRange.PortCallback() {
@Override
public boolean onPortNumber(int portNumber) {
try {

server = new CAPIServer(capiBehavior, couchbaseBehavior,
new InetSocketAddress(hostAddress, portNumber),
CouchbaseCAPITransportImpl.this.username,
CouchbaseCAPITransportImpl.this.password);


if (publishAddressHost != null) {
server.setPublishAddress(publishAddressHost);
}

server.start();
} catch (Exception e) {
lastException.set(e);
return false;
}
return true;
}
});
if (!success) {
throw new BindHttpException("Failed to bind to [" + port + "]",
lastException.get());
}

this.boundAddress = new BoundTransportAddress(new InetSocketTransportAddress(bindAddress), new InetSocketTransportAddress(publishAddress));
InetSocketAddress boundAddress = server.getBindAddress();
InetSocketAddress publishAddress = new InetSocketAddress(publishAddressHost, boundAddress.getPort());
this.boundAddress = new BoundTransportAddress(new InetSocketTransportAddress(boundAddress), new InetSocketTransportAddress(publishAddress));
}

@Override
Expand Down

0 comments on commit f509f5a

Please sign in to comment.