From 4f70aa1172f0900c4a77f9c5a03d3132900c0440 Mon Sep 17 00:00:00 2001 From: Martin Harris Date: Fri, 20 Jun 2014 10:34:39 +0100 Subject: [PATCH 1/5] Copied in @ZaidM's work from https://github.com/ZaidM/brooklyn/tree/couchbase-sync-gateway1 --- .../nosql/couchbase/CouchbaseCluster.java | 23 ++- .../nosql/couchbase/CouchbaseClusterImpl.java | 68 +++++++- .../entity/nosql/couchbase/CouchbaseNode.java | 6 + .../nosql/couchbase/CouchbaseNodeDriver.java | 2 + .../nosql/couchbase/CouchbaseNodeImpl.java | 4 + .../couchbase/CouchbaseNodeSshDriver.java | 12 ++ .../nosql/couchbase/CouchbaseSyncGateway.java | 61 ++++++++ .../couchbase/CouchbaseSyncGatewayDriver.java | 9 ++ .../couchbase/CouchbaseSyncGatewayImpl.java | 61 ++++++++ .../CouchbaseSyncGatewaySshDriver.java | 147 ++++++++++++++++++ 10 files changed, 390 insertions(+), 3 deletions(-) create mode 100644 software/nosql/src/main/java/brooklyn/entity/nosql/couchbase/CouchbaseSyncGateway.java create mode 100644 software/nosql/src/main/java/brooklyn/entity/nosql/couchbase/CouchbaseSyncGatewayDriver.java create mode 100644 software/nosql/src/main/java/brooklyn/entity/nosql/couchbase/CouchbaseSyncGatewayImpl.java create mode 100644 software/nosql/src/main/java/brooklyn/entity/nosql/couchbase/CouchbaseSyncGatewaySshDriver.java diff --git a/software/nosql/src/main/java/brooklyn/entity/nosql/couchbase/CouchbaseCluster.java b/software/nosql/src/main/java/brooklyn/entity/nosql/couchbase/CouchbaseCluster.java index b44f47a6e3..c478b07730 100644 --- a/software/nosql/src/main/java/brooklyn/entity/nosql/couchbase/CouchbaseCluster.java +++ b/software/nosql/src/main/java/brooklyn/entity/nosql/couchbase/CouchbaseCluster.java @@ -19,10 +19,9 @@ package brooklyn.entity.nosql.couchbase; import java.util.List; +import java.util.Map; import java.util.Set; -import com.google.common.reflect.TypeToken; - import brooklyn.config.ConfigKey; import brooklyn.entity.Entity; import brooklyn.entity.basic.ConfigKeys; @@ -33,6 +32,8 @@ import brooklyn.util.flags.SetFromFlag; import brooklyn.util.time.Duration; +import com.google.common.reflect.TypeToken; + @ImplementedBy(CouchbaseClusterImpl.class) public interface CouchbaseCluster extends DynamicCluster { @@ -90,4 +91,22 @@ public interface CouchbaseCluster extends DynamicCluster { "Average across cluster for pools/nodes//interestingStats/couch_docs_actual_disk_size"); AttributeSensor COUCH_VIEWS_DATA_SIZE_PER_NODE = Sensors.newLongSensor("couchbase.stats.cluster.per.node.couch.views.data.size", "Average across cluster for pools/nodes//interestingStats/couch_views_data_size"); + + AttributeSensor BUCKET_CREATION_IN_PROGRESS = Sensors.newBooleanSensor("couchbase.cluster.bucketCreationInProgress", "Indicates that a bucket is currently being created, and" + + "further bucket creation should be deferred"); + + /** + * createBuckets is a list of all the buckets to be created on the couchbase cluster + * the buckets will be created on the primary node of the cluster + * each map entry for a bucket should contain the following parameters: + * - <"bucket",(String) name of the bucket (default: default)> + * - <"bucket-type",(String) name of bucket type (default: couchbase)> + * - <"bucket-port",(Integer) the bucket port to connect to (default: 11222)> + * - <"bucket-ramsize",(Integer) ram size allowed for bucket (default: 200)> + * - <"bucket-replica",(Integer) number of replicas for the bucket (default: 1)> + */ + @SuppressWarnings("serial") + @SetFromFlag("createBuckets") + ConfigKey>> CREATE_BUCKETS = ConfigKeys.newConfigKey(new TypeToken>>() {}, + "couchbase.cluster.createBuckets", "a list of all dedicated port buckets to be created on the couchbase cluster"); } diff --git a/software/nosql/src/main/java/brooklyn/entity/nosql/couchbase/CouchbaseClusterImpl.java b/software/nosql/src/main/java/brooklyn/entity/nosql/couchbase/CouchbaseClusterImpl.java index 11c56d7513..b7c202630c 100644 --- a/software/nosql/src/main/java/brooklyn/entity/nosql/couchbase/CouchbaseClusterImpl.java +++ b/software/nosql/src/main/java/brooklyn/entity/nosql/couchbase/CouchbaseClusterImpl.java @@ -24,6 +24,8 @@ import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.Callable; +import java.util.concurrent.TimeUnit; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -42,16 +44,24 @@ import brooklyn.event.AttributeSensor; import brooklyn.event.SensorEvent; import brooklyn.event.SensorEventListener; +import brooklyn.event.basic.DependentConfiguration; +import brooklyn.event.feed.http.HttpFeed; +import brooklyn.event.feed.http.HttpPollConfig; +import brooklyn.event.feed.http.HttpValueFunctions; import brooklyn.location.Location; import brooklyn.policy.PolicySpec; import brooklyn.util.collections.MutableSet; +import brooklyn.util.task.DynamicTasks; +import brooklyn.util.task.TaskBuilder; import brooklyn.util.task.Tasks; import brooklyn.util.text.ByteSizeStrings; import brooklyn.util.time.Time; import com.google.common.base.Function; +import com.google.common.base.Functions; import com.google.common.base.Optional; import com.google.common.base.Preconditions; +import com.google.common.base.Predicates; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Lists; import com.google.common.collect.Sets; @@ -59,6 +69,7 @@ public class CouchbaseClusterImpl extends DynamicClusterImpl implements CouchbaseCluster { private static final Logger log = LoggerFactory.getLogger(CouchbaseClusterImpl.class); private final Object mutex = new Object[0]; + private final HttpFeed[] resetBucketCreation = new HttpFeed[]{null}; public void init() { log.info("Initializing the Couchbase cluster..."); @@ -146,7 +157,7 @@ public void start(Collection locations) { serversToAdd.remove(getPrimaryNode()); if (getUpNodes().size() >= getQuorumSize() && getUpNodes().size() > 1) { - log.info("number of SERVICE_UP nodes:{} in cluster:{} did reached Quorum:{}, adding the servers", new Object[]{getUpNodes().size(), getId(), getQuorumSize()}); + log.info("number of SERVICE_UP nodes:{} in cluster:{} reached Quorum:{}, adding the servers", new Object[]{getUpNodes().size(), getId(), getQuorumSize()}); addServers(serversToAdd); //wait for servers to be added to the couchbase server @@ -157,6 +168,10 @@ public void start(Collection locations) { Tasks.resetBlockingDetails(); } Entities.invokeEffector(this, getPrimaryNode(), CouchbaseNode.REBALANCE); + + if (Optional.fromNullable(CREATE_BUCKETS).isPresent()) { + createBuckets(); + } setAttribute(IS_CLUSTER_INITIALIZED, true); } else { @@ -333,6 +348,57 @@ public boolean isMemberInCluster(Entity e) { return Optional.fromNullable(e.getAttribute(CouchbaseNode.IS_IN_CLUSTER)).or(false); } + public void createBuckets() { + //FIXME: multiple buckets require synchronization/wait time (checks for port conflicts and exceeding ram size) + //TODO: check for multiple bucket conflicts with port + List> bucketsToCreate = getConfig(CREATE_BUCKETS); + Entity primaryNode = getPrimaryNode(); + + for (Map bucketMap : bucketsToCreate) { + String bucketName = bucketMap.containsKey("bucket") ? (String) bucketMap.get("bucket") : "default"; + String bucketType = bucketMap.containsKey("bucket-type") ? (String) bucketMap.get("bucket-type") : "couchbase"; + Integer bucketPort = bucketMap.containsKey("bucket-port") ? (Integer) bucketMap.get("bucket-port") : 11222; + Integer bucketRamSize = bucketMap.containsKey("bucket-ramsize") ? (Integer) bucketMap.get("bucket-ramsize") : 200; + Integer bucketReplica = bucketMap.containsKey("bucket-replica") ? (Integer) bucketMap.get("bucket-replica") : 1; + + log.info("adding bucket: {} to primary node: {}", bucketName, primaryNode.getId()); + createBucket(primaryNode, bucketName, bucketType, bucketPort, bucketRamSize, bucketReplica); + //TODO: add if bucket has been created. + } + } + + public void createBucket(final Entity primaryNode, final String bucketName, final String bucketType, final Integer bucketPort, final Integer bucketRamSize, final Integer bucketReplica) { + DynamicTasks.queueIfPossible(TaskBuilder.builder().name("Creating bucket " + bucketName).body( + new Callable() { + @Override + public Void call() throws Exception { + DependentConfiguration.waitInTaskForAttributeReady(CouchbaseClusterImpl.this, CouchbaseCluster.BUCKET_CREATION_IN_PROGRESS, Predicates.equalTo(false)); + if (CouchbaseClusterImpl.this.resetBucketCreation[0] != null) { + CouchbaseClusterImpl.this.resetBucketCreation[0].stop(); + } + setAttribute(CouchbaseCluster.BUCKET_CREATION_IN_PROGRESS, true); + + CouchbaseClusterImpl.this.resetBucketCreation[0] = HttpFeed.builder() + .entity(CouchbaseClusterImpl.this) + .period(500, TimeUnit.MILLISECONDS) + .baseUri(String.format("%s/pools/default/buckets/%s", primaryNode.getAttribute(CouchbaseNode.COUCHBASE_WEB_ADMIN_URL), bucketName)) + .credentials(primaryNode.getConfig(CouchbaseNode.COUCHBASE_ADMIN_USERNAME), primaryNode.getConfig(CouchbaseNode.COUCHBASE_ADMIN_PASSWORD)) + .poll(new HttpPollConfig(BUCKET_CREATION_IN_PROGRESS) + .onSuccess(HttpValueFunctions.responseCodeEquals(404)) + .onFailureOrException(Functions.constant(false))) + .build(); + + Entities.invokeEffectorWithArgs(CouchbaseClusterImpl.this, primaryNode, CouchbaseNode.BUCKET_CREATE, bucketName, bucketType, bucketPort, bucketRamSize, bucketReplica); + DependentConfiguration.waitInTaskForAttributeReady(CouchbaseClusterImpl.this, CouchbaseCluster.BUCKET_CREATION_IN_PROGRESS, Predicates.equalTo(false)); + if (CouchbaseClusterImpl.this.resetBucketCreation[0] != null) { + CouchbaseClusterImpl.this.resetBucketCreation[0].stop(); + } + return null; + } + } + ).build()).orSubmitAndBlock(); + } + static { RendererHints.register(COUCH_DOCS_DATA_SIZE_PER_NODE, RendererHints.displayValue(ByteSizeStrings.metric())); RendererHints.register(COUCH_DOCS_ACTUAL_DISK_SIZE_PER_NODE, RendererHints.displayValue(ByteSizeStrings.metric())); diff --git a/software/nosql/src/main/java/brooklyn/entity/nosql/couchbase/CouchbaseNode.java b/software/nosql/src/main/java/brooklyn/entity/nosql/couchbase/CouchbaseNode.java index a27045f9ca..dd46eba88c 100644 --- a/software/nosql/src/main/java/brooklyn/entity/nosql/couchbase/CouchbaseNode.java +++ b/software/nosql/src/main/java/brooklyn/entity/nosql/couchbase/CouchbaseNode.java @@ -122,6 +122,7 @@ class RootUrl { MethodEffector SERVER_ADD = new MethodEffector(CouchbaseNode.class, "serverAdd"); MethodEffector SERVER_ADD_AND_REBALANCE = new MethodEffector(CouchbaseNode.class, "serverAddAndRebalance"); MethodEffector REBALANCE = new MethodEffector(CouchbaseNode.class, "rebalance"); + MethodEffector BUCKET_CREATE = new MethodEffector(CouchbaseNode.class, "bucketCreate"); @Effector(description = "add a server to a cluster") public void serverAdd(@EffectorParam(name = "serverHostname") String serverToAdd, @EffectorParam(name = "username") String username, @EffectorParam(name = "password") String password); @@ -131,5 +132,10 @@ class RootUrl { @Effector(description = "rebalance the couchbase cluster") public void rebalance(); + + @Effector(description = "create a new bucket") + public void bucketCreate(@EffectorParam(name = "bucketName") String bucketName, @EffectorParam(name = "bucketType") String bucketType, + @EffectorParam(name = "bucketPort") Integer bucketPort, @EffectorParam(name = "bucketRamSize") Integer bucketRamSize, + @EffectorParam(name = "bucketReplica") Integer bucketReplica); } diff --git a/software/nosql/src/main/java/brooklyn/entity/nosql/couchbase/CouchbaseNodeDriver.java b/software/nosql/src/main/java/brooklyn/entity/nosql/couchbase/CouchbaseNodeDriver.java index 9fc5877579..b77f244e41 100644 --- a/software/nosql/src/main/java/brooklyn/entity/nosql/couchbase/CouchbaseNodeDriver.java +++ b/software/nosql/src/main/java/brooklyn/entity/nosql/couchbase/CouchbaseNodeDriver.java @@ -26,6 +26,8 @@ public interface CouchbaseNodeDriver extends SoftwareProcessDriver { public void serverAdd(String serverToAdd, String username, String password); public void rebalance(); + + public void bucketCreate(String bucketName, String bucketType, Integer bucketPort, Integer bucketRamSize, Integer bucketReplica); public void serverAddAndRebalance(String serverToAdd, String username, String password); diff --git a/software/nosql/src/main/java/brooklyn/entity/nosql/couchbase/CouchbaseNodeImpl.java b/software/nosql/src/main/java/brooklyn/entity/nosql/couchbase/CouchbaseNodeImpl.java index e556c5dd02..11e36bc069 100644 --- a/software/nosql/src/main/java/brooklyn/entity/nosql/couchbase/CouchbaseNodeImpl.java +++ b/software/nosql/src/main/java/brooklyn/entity/nosql/couchbase/CouchbaseNodeImpl.java @@ -181,4 +181,8 @@ public void disconnectSensors() { } } + @Override + public void bucketCreate(String bucketName, String bucketType, Integer bucketPort, Integer bucketRamSize, Integer bucketReplica) { + getDriver().bucketCreate(bucketName, bucketType, bucketPort, bucketRamSize, bucketReplica); + } } diff --git a/software/nosql/src/main/java/brooklyn/entity/nosql/couchbase/CouchbaseNodeSshDriver.java b/software/nosql/src/main/java/brooklyn/entity/nosql/couchbase/CouchbaseNodeSshDriver.java index d55f7df20f..e02ddc9f28 100644 --- a/software/nosql/src/main/java/brooklyn/entity/nosql/couchbase/CouchbaseNodeSshDriver.java +++ b/software/nosql/src/main/java/brooklyn/entity/nosql/couchbase/CouchbaseNodeSshDriver.java @@ -218,4 +218,16 @@ public void serverAddAndRebalance(String serverToAdd, String username, String pa entity.setAttribute(CouchbaseNode.REBALANCE_STATUS, "Rebalance Started"); } + @Override + public void bucketCreate(String bucketName, String bucketType, Integer bucketPort, Integer bucketRamSize, Integer bucketReplica) { + newScript("bucketCreate").body.append(couchbaseCli("bucket-create") + + getCouchbaseHostnameAndCredentials() + + " --bucket=" + bucketName + + " --bucket-type=" + bucketType + + " --bucket-port=" + bucketPort + + " --bucket-ramsize=" + bucketRamSize + + " --bucket-replica=" + bucketReplica) + .failOnNonZeroResultCode() + .execute(); + } } diff --git a/software/nosql/src/main/java/brooklyn/entity/nosql/couchbase/CouchbaseSyncGateway.java b/software/nosql/src/main/java/brooklyn/entity/nosql/couchbase/CouchbaseSyncGateway.java new file mode 100644 index 0000000000..ddea7217b3 --- /dev/null +++ b/software/nosql/src/main/java/brooklyn/entity/nosql/couchbase/CouchbaseSyncGateway.java @@ -0,0 +1,61 @@ +package brooklyn.entity.nosql.couchbase; + +import brooklyn.config.ConfigKey; +import brooklyn.entity.Entity; +import brooklyn.entity.basic.ConfigKeys; +import brooklyn.entity.basic.SoftwareProcess; +import brooklyn.entity.proxying.ImplementedBy; +import brooklyn.event.AttributeSensor; +import brooklyn.event.basic.BasicAttributeSensorAndConfigKey; +import brooklyn.event.basic.PortAttributeSensorAndConfigKey; +import brooklyn.event.basic.Sensors; +import brooklyn.util.flags.SetFromFlag; + +@ImplementedBy(CouchbaseSyncGatewayImpl.class) +public interface CouchbaseSyncGateway extends SoftwareProcess { + + @SetFromFlag("version") + ConfigKey SUGGESTED_VERSION = ConfigKeys.newConfigKeyWithDefault(SoftwareProcess.SUGGESTED_VERSION, + "1.0-beta3.1"); + + @SetFromFlag("downloadUrl") + BasicAttributeSensorAndConfigKey DOWNLOAD_URL = new BasicAttributeSensorAndConfigKey( + SoftwareProcess.DOWNLOAD_URL, "http://packages.couchbase.com/releases/couchbase-sync-gateway/1.0-beta/couchbase-sync-gateway-community_${version}_${driver.osTag}"); + + @SetFromFlag("couchbaseServer") + ConfigKey COUCHBASE_SERVER = ConfigKeys.newConfigKey(Entity.class, "couchbaseSyncGateway.couchbaseNode", + "Couchbase server node or cluster the sync gateway connects to"); + + @SetFromFlag("serverPool") + ConfigKey COUCHBASE_SERVER_POOL = ConfigKeys.newStringConfigKey("couchbaseSyncGateway.serverPool", + "Couchbase Server pool name in which to find buckets", "default"); + + @SetFromFlag("couchbaseServerBucket") + ConfigKey COUCHBASE_SERVER_BUCKET = ConfigKeys.newStringConfigKey("couchbaseSyncGateway.serverBucket", + "Name of the Couchbase bucket to use", "sync_gateway"); + + @SetFromFlag("couchbaseServerUrl") + ConfigKey COUCHBASE_SERVER_URL = ConfigKeys.newStringConfigKey("couchbaseSyncGateway.couchbaseServerUrl", + "Couchbase Server Admin Url to connect the gateway to"); + + @SetFromFlag("pretty") + ConfigKey PRETTY = ConfigKeys.newBooleanConfigKey("couchbaseSyncGateway.pretty", + "Pretty-print JSON responses. This is useful for debugging, but reduces performance.", false); + + @SetFromFlag("verbose") + ConfigKey VERBOSE = ConfigKeys.newBooleanConfigKey("couchbaseSyncGateway.verbose", + "Logs more information about requests.", false); + + AttributeSensor COUCHBASE_SERVER_WEB_URL = Sensors.newStringSensor("couchbaseSyncGateway.serverWebUrl", + "The Url and web port of the couchbase server to connect to"); + + AttributeSensor MANAGEMENT_URL = Sensors.newStringSensor("coucbaseSyncGateway.managementUrl", + "Management URL for Couchbase Sycn Gateway"); + + PortAttributeSensorAndConfigKey SYNC_REST_API_PORT = new PortAttributeSensorAndConfigKey("couchbaseSyncGateway.syncRestPort", + "Port the Sync REST API listens on", "4984"); + + PortAttributeSensorAndConfigKey ADMIN_REST_API_PORT = new PortAttributeSensorAndConfigKey("couchbaseSyncGateway.adminRestPort", + "Port the Admin REST API listens on", "4985"); + +} \ No newline at end of file diff --git a/software/nosql/src/main/java/brooklyn/entity/nosql/couchbase/CouchbaseSyncGatewayDriver.java b/software/nosql/src/main/java/brooklyn/entity/nosql/couchbase/CouchbaseSyncGatewayDriver.java new file mode 100644 index 0000000000..b1b4339b2c --- /dev/null +++ b/software/nosql/src/main/java/brooklyn/entity/nosql/couchbase/CouchbaseSyncGatewayDriver.java @@ -0,0 +1,9 @@ +package brooklyn.entity.nosql.couchbase; + +import brooklyn.entity.basic.SoftwareProcessDriver; + +public interface CouchbaseSyncGatewayDriver extends SoftwareProcessDriver { + + public String getOsTag(); + +} \ No newline at end of file diff --git a/software/nosql/src/main/java/brooklyn/entity/nosql/couchbase/CouchbaseSyncGatewayImpl.java b/software/nosql/src/main/java/brooklyn/entity/nosql/couchbase/CouchbaseSyncGatewayImpl.java new file mode 100644 index 0000000000..09ef569d79 --- /dev/null +++ b/software/nosql/src/main/java/brooklyn/entity/nosql/couchbase/CouchbaseSyncGatewayImpl.java @@ -0,0 +1,61 @@ +package brooklyn.entity.nosql.couchbase; + + +import brooklyn.entity.basic.SoftwareProcessImpl; +import brooklyn.event.feed.http.HttpFeed; +import brooklyn.event.feed.http.HttpPollConfig; +import brooklyn.event.feed.http.HttpValueFunctions; +import brooklyn.location.access.BrooklynAccessUtils; + +import com.google.common.net.HostAndPort; + +public class CouchbaseSyncGatewayImpl extends SoftwareProcessImpl implements CouchbaseSyncGateway { + + private HttpFeed httpFeed; + + @Override + public Class getDriverInterface() { + return CouchbaseSyncGatewayDriver.class; + } + + @Override + protected void connectSensors() { + super.connectSensors(); + connectServiceUpIsRunning(); + } + + @Override + protected void connectServiceUpIsRunning() { + + + HostAndPort hp = BrooklynAccessUtils.getBrooklynAccessibleAddress(this, + getAttribute(CouchbaseSyncGateway.ADMIN_REST_API_PORT)); + + String managementUri = String.format("http://%s:%s", + hp.getHostText(), hp.getPort()); + + setAttribute(MANAGEMENT_URL, managementUri); + + httpFeed = HttpFeed.builder() + .entity(this) + .period(200) + .baseUri(managementUri) + .poll(new HttpPollConfig(SERVICE_UP) + .onSuccess(HttpValueFunctions.responseCodeEquals(200))) + .build(); + + } + + @Override + protected void disconnectSensors() { + super.disconnectSensors(); + disconnectServiceUpIsRunning(); + } + + @Override + protected void disconnectServiceUpIsRunning() { + if (httpFeed != null) { + httpFeed.stop(); + } + } +} \ No newline at end of file diff --git a/software/nosql/src/main/java/brooklyn/entity/nosql/couchbase/CouchbaseSyncGatewaySshDriver.java b/software/nosql/src/main/java/brooklyn/entity/nosql/couchbase/CouchbaseSyncGatewaySshDriver.java new file mode 100644 index 0000000000..c7c9129240 --- /dev/null +++ b/software/nosql/src/main/java/brooklyn/entity/nosql/couchbase/CouchbaseSyncGatewaySshDriver.java @@ -0,0 +1,147 @@ +package brooklyn.entity.nosql.couchbase; + +import static brooklyn.util.ssh.BashCommands.INSTALL_CURL; +import static brooklyn.util.ssh.BashCommands.alternatives; +import static brooklyn.util.ssh.BashCommands.chainGroup; +import static brooklyn.util.ssh.BashCommands.sudo; +import static java.lang.String.format; + +import java.util.List; + +import javax.annotation.Nullable; + +import brooklyn.entity.Entity; +import brooklyn.entity.basic.AbstractSoftwareProcessSshDriver; +import brooklyn.entity.basic.Attributes; +import brooklyn.entity.basic.Entities; +import brooklyn.entity.basic.EntityLocal; +import brooklyn.entity.drivers.downloads.DownloadResolver; +import brooklyn.location.OsDetails; +import brooklyn.location.basic.SshMachineLocation; +import brooklyn.util.ssh.BashCommands; +import brooklyn.util.time.Duration; + +import com.google.common.base.Optional; +import com.google.common.base.Predicate; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Iterables; + +public class CouchbaseSyncGatewaySshDriver extends AbstractSoftwareProcessSshDriver implements CouchbaseSyncGatewayDriver { + public CouchbaseSyncGatewaySshDriver(EntityLocal entity, SshMachineLocation machine) { + super(entity, machine); + } + + @Override + public boolean isRunning() { + return Boolean.TRUE.equals(entity.getAttribute(Attributes.SERVICE_UP)); + } + + @Override + public void stop() { + + } + + @Override + public void install() { + //reference http://docs.couchbase.com/sync-gateway/#getting-started-with-sync-gateway + DownloadResolver resolver = Entities.newDownloader(this); + List urls = resolver.getTargets(); + String saveAs = resolver.getFilename(); + + OsDetails osDetails = getMachine().getMachineDetails().getOsDetails(); + + log.info("Installing couchbase-sync-gateway version: {}", getVersion()); + if (osDetails.isLinux()) { + List commands = installLinux(urls, saveAs); + newScript(INSTALLING) + .body.append(commands).execute(); + } + } + + @Override + public void customize() { + + } + + @Override + public void launch() { + Entity cbNode = entity.getConfig(CouchbaseSyncGateway.COUCHBASE_SERVER); + Entities.waitForServiceUp(cbNode, Duration.ONE_HOUR); + + + if (cbNode instanceof CouchbaseCluster) { + Optional cbClusterNode = Iterables.tryFind(cbNode.getAttribute(CouchbaseCluster.GROUP_MEMBERS), new Predicate() { + + @Override + public boolean apply(@Nullable Entity entity) { + if (entity instanceof CouchbaseNode && Boolean.TRUE.equals(entity.getAttribute(CouchbaseNode.IS_IN_CLUSTER))) { + return true; + } + return false; + } + }); + if (cbClusterNode.isPresent()) { + cbNode = cbClusterNode.get(); + } else { + throw new IllegalArgumentException(format("The cluster %s does not contain any suitable Couchbase nodes to connect to..", cbNode.getId())); + } + + } + String hostname = cbNode.getAttribute(CouchbaseNode.HOSTNAME); + String webPort = cbNode.getAttribute(CouchbaseNode.COUCHBASE_WEB_ADMIN_PORT).toString(); + + + String username = cbNode.getConfig(CouchbaseNode.COUCHBASE_ADMIN_USERNAME); + String password = cbNode.getConfig(CouchbaseNode.COUCHBASE_ADMIN_PASSWORD); + + String bucketName = entity.getConfig(CouchbaseSyncGateway.COUCHBASE_SERVER_BUCKET); + String pool = entity.getConfig(CouchbaseSyncGateway.COUCHBASE_SERVER_POOL); + String pretty = entity.getConfig(CouchbaseSyncGateway.PRETTY) ? "-pretty" : ""; + String verbose = entity.getConfig(CouchbaseSyncGateway.VERBOSE) ? "-verbose" : ""; + + String adminRestApiPort = entity.getConfig(CouchbaseSyncGateway.ADMIN_REST_API_PORT).iterator().next().toString(); + String syncRestApiPort = entity.getConfig(CouchbaseSyncGateway.SYNC_REST_API_PORT).iterator().next().toString(); + + String serverWebAdminUrl = format("http://%s:%s@%s:%s", username, password, hostname, webPort); + String options = format("-url %s -bucket %s -adminInterface 0.0.0.0:%s -interface 0.0.0.0:%s -pool %s %s %s", + serverWebAdminUrl, bucketName, adminRestApiPort, syncRestApiPort, pool, pretty, verbose); + + newScript(LAUNCHING) + .body.append(format("/opt/couchbase-sync-gateway/bin/sync_gateway %s &", options)) + .execute(); + } + + private List installLinux(List urls, String saveAs) { + + String apt = chainGroup( + "which apt-get", + sudo("apt-get update"), + sudo(format("dpkg -i %s", saveAs))); + + String yum = chainGroup( + "which yum", + sudo(format("rpm --install %s", saveAs))); + + return ImmutableList.builder() + .add(INSTALL_CURL) + .addAll(BashCommands.commandsToDownloadUrlsAs(urls, saveAs)) + .add(alternatives(apt, yum)) + .build(); + } + + @Override + public String getOsTag() { + OsDetails os = getLocation().getOsDetails(); + if (os == null) { + // Default to generic linux + return "x86_64.rpm"; + } else { + //FIXME should be a better way to check for OS name and version + String osName = os.getName().toLowerCase(); + String fileExtension = osName.contains("deb") || osName.contains("ubuntu") ? ".deb" : ".rpm"; + String arch = os.is64bit() ? "x86_64" : "x86"; + return arch + fileExtension; + } + } + +} \ No newline at end of file From 7d0467d887629225b919d9fce93d5abf3c9a0b20 Mon Sep 17 00:00:00 2001 From: Martin Harris Date: Wed, 25 Jun 2014 16:44:11 +0100 Subject: [PATCH 2/5] Fixed bucket creation --- .../nosql/couchbase/CouchbaseClusterImpl.java | 47 ++++++- .../couchbase/CouchbaseNodeSshDriver.java | 116 ++++++++++++++++-- .../couchbase/CouchbaseSyncGatewayImpl.java | 5 + .../CouchbaseSyncGatewaySshDriver.java | 34 +++-- 4 files changed, 175 insertions(+), 27 deletions(-) diff --git a/software/nosql/src/main/java/brooklyn/entity/nosql/couchbase/CouchbaseClusterImpl.java b/software/nosql/src/main/java/brooklyn/entity/nosql/couchbase/CouchbaseClusterImpl.java index b7c202630c..f424eafce4 100644 --- a/software/nosql/src/main/java/brooklyn/entity/nosql/couchbase/CouchbaseClusterImpl.java +++ b/software/nosql/src/main/java/brooklyn/entity/nosql/couchbase/CouchbaseClusterImpl.java @@ -48,23 +48,27 @@ import brooklyn.event.feed.http.HttpFeed; import brooklyn.event.feed.http.HttpPollConfig; import brooklyn.event.feed.http.HttpValueFunctions; +import brooklyn.event.feed.http.JsonFunctions; import brooklyn.location.Location; import brooklyn.policy.PolicySpec; import brooklyn.util.collections.MutableSet; +import brooklyn.util.guava.Functionals; import brooklyn.util.task.DynamicTasks; import brooklyn.util.task.TaskBuilder; import brooklyn.util.task.Tasks; import brooklyn.util.text.ByteSizeStrings; +import brooklyn.util.text.Strings; import brooklyn.util.time.Time; import com.google.common.base.Function; -import com.google.common.base.Functions; import com.google.common.base.Optional; import com.google.common.base.Preconditions; import com.google.common.base.Predicates; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Lists; import com.google.common.collect.Sets; +import com.google.gson.JsonArray; +import com.google.gson.JsonElement; public class CouchbaseClusterImpl extends DynamicClusterImpl implements CouchbaseCluster { private static final Logger log = LoggerFactory.getLogger(CouchbaseClusterImpl.class); @@ -141,6 +145,8 @@ public void start(Collection locations) { connectSensors(); connectEnrichers(); + + setAttribute(BUCKET_CREATION_IN_PROGRESS, false); //start timeout before adding the servers Time.sleep(getConfig(SERVICE_UP_TIME_OUT)); @@ -167,10 +173,12 @@ public void start(Collection locations) { } finally { Tasks.resetBlockingDetails(); } - Entities.invokeEffector(this, getPrimaryNode(), CouchbaseNode.REBALANCE); + + ((CouchbaseNode)getPrimaryNode()).rebalance(); if (Optional.fromNullable(CREATE_BUCKETS).isPresent()) { createBuckets(); + DependentConfiguration.waitInTaskForAttributeReady(this, CouchbaseCluster.BUCKET_CREATION_IN_PROGRESS, Predicates.equalTo(false)); } setAttribute(IS_CLUSTER_INITIALIZED, true); @@ -187,6 +195,9 @@ public void start(Collection locations) { @Override public void stop() { + if (resetBucketCreation[0] != null) { + resetBucketCreation[0].stop(); + } super.stop(); } @@ -205,7 +216,7 @@ protected void connectSensors() { .displayName("Controller targets tracker") .configure("group", this)); } - + public static class MemberTrackingPolicy extends AbstractMembershipTrackingPolicy { @Override protected void onEntityChange(Entity member) { ((CouchbaseClusterImpl)entity).onServerPoolMemberChanged(member); @@ -377,17 +388,41 @@ public Void call() throws Exception { CouchbaseClusterImpl.this.resetBucketCreation[0].stop(); } setAttribute(CouchbaseCluster.BUCKET_CREATION_IN_PROGRESS, true); - + CouchbaseClusterImpl.this.resetBucketCreation[0] = HttpFeed.builder() .entity(CouchbaseClusterImpl.this) .period(500, TimeUnit.MILLISECONDS) .baseUri(String.format("%s/pools/default/buckets/%s", primaryNode.getAttribute(CouchbaseNode.COUCHBASE_WEB_ADMIN_URL), bucketName)) .credentials(primaryNode.getConfig(CouchbaseNode.COUCHBASE_ADMIN_USERNAME), primaryNode.getConfig(CouchbaseNode.COUCHBASE_ADMIN_PASSWORD)) .poll(new HttpPollConfig(BUCKET_CREATION_IN_PROGRESS) - .onSuccess(HttpValueFunctions.responseCodeEquals(404)) - .onFailureOrException(Functions.constant(false))) + .onSuccess(Functionals.chain(HttpValueFunctions.jsonContents(), JsonFunctions.walkN("nodes"), new Function() { + @Override public Boolean apply(JsonElement input) { + // Wait until bucket has been created on all nodes and the couchApiBase element has been published (indicating that the bucket is useable) + JsonArray servers = input.getAsJsonArray(); + if (servers.size() != CouchbaseClusterImpl.this.getMembers().size()) { + return true; + } + for (JsonElement server : servers) { + Object api = server.getAsJsonObject().get("couchApiBase"); + if (api == null || Strings.isEmpty(String.valueOf(api))) { + return true; + } + } + return false; + } + })) + .onFailureOrException(new Function() { + @Override + public Boolean apply(Object input) { + if (((brooklyn.util.http.HttpToolResponse) input).getResponseCode() == 404) { + return true; + } + throw new IllegalStateException("Unexpected response when creating bucket:" + input); + } + })) .build(); + // TODO: Bail out if bucket creation fails, to allow next bucket to proceed Entities.invokeEffectorWithArgs(CouchbaseClusterImpl.this, primaryNode, CouchbaseNode.BUCKET_CREATE, bucketName, bucketType, bucketPort, bucketRamSize, bucketReplica); DependentConfiguration.waitInTaskForAttributeReady(CouchbaseClusterImpl.this, CouchbaseCluster.BUCKET_CREATION_IN_PROGRESS, Predicates.equalTo(false)); if (CouchbaseClusterImpl.this.resetBucketCreation[0] != null) { diff --git a/software/nosql/src/main/java/brooklyn/entity/nosql/couchbase/CouchbaseNodeSshDriver.java b/software/nosql/src/main/java/brooklyn/entity/nosql/couchbase/CouchbaseNodeSshDriver.java index e02ddc9f28..6f81b730f2 100644 --- a/software/nosql/src/main/java/brooklyn/entity/nosql/couchbase/CouchbaseNodeSshDriver.java +++ b/software/nosql/src/main/java/brooklyn/entity/nosql/couchbase/CouchbaseNodeSshDriver.java @@ -24,19 +24,38 @@ import static brooklyn.util.ssh.BashCommands.sudo; import static java.lang.String.format; +import java.net.URI; +import java.net.URISyntaxException; +import java.util.Collection; +import java.util.Collections; import java.util.List; +import java.util.concurrent.Callable; + +import org.apache.http.auth.Credentials; +import org.apache.http.auth.UsernamePasswordCredentials; import brooklyn.entity.basic.AbstractSoftwareProcessSshDriver; import brooklyn.entity.basic.Entities; import brooklyn.entity.drivers.downloads.DownloadResolver; +import brooklyn.event.feed.http.HttpValueFunctions; +import brooklyn.event.feed.http.JsonFunctions; import brooklyn.location.OsDetails; import brooklyn.location.basic.SshMachineLocation; +import brooklyn.util.guava.Functionals; +import brooklyn.util.http.HttpTool; +import brooklyn.util.http.HttpToolResponse; +import brooklyn.util.repeat.Repeater; import brooklyn.util.ssh.BashCommands; import brooklyn.util.task.Tasks; import brooklyn.util.time.Duration; import brooklyn.util.time.Time; +import com.google.common.base.Function; import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Lists; +import com.google.gson.JsonArray; +import com.google.gson.JsonElement; public class CouchbaseNodeSshDriver extends AbstractSoftwareProcessSshDriver implements CouchbaseNodeDriver { @@ -193,8 +212,83 @@ public void rebalance() { .failOnNonZeroResultCode() .execute(); entity.setAttribute(CouchbaseNode.REBALANCE_STATUS, "Rebalance Started"); + // wait until the re-balance is complete + Repeater.create() + .every(Duration.millis(500)) + .limitTimeTo(Duration.THIRTY_SECONDS) + .until(new Callable() { + @Override + public Boolean call() throws Exception { + for (String nodeHostName : CouchbaseNodeSshDriver.this.getNodeHostNames()) { + if (isNodeRebalancing(nodeHostName)) { + return true; + } + } + return false; + } + }) + .run(); + Repeater.create() + .every(Duration.FIVE_SECONDS) + .limitTimeTo(Duration.FIVE_MINUTES) + .until(new Callable() { + @Override + public Boolean call() throws Exception { + for (String nodeHostName : CouchbaseNodeSshDriver.this.getNodeHostNames()) { + if (isNodeRebalancing(nodeHostName)) { + return false; + } + } + return true; + } + }) + .run(); + log.info("rebalanced cluster via primary node {}", getEntity()); } + private Iterable getNodeHostNames() throws URISyntaxException { + Function> getNodesAsList = new Function>() { + @Override public Iterable apply(JsonElement input) { + if (input == null) { + return Collections.emptyList(); + } + Collection names = Lists.newArrayList(); + JsonArray nodes = input.getAsJsonArray(); + for (JsonElement element : nodes) { + // NOTE: the 'hostname' element also includes the port + names.add(element.getAsJsonObject().get("hostname").toString().replace("\"", "")); + } + return names; + } + }; + HttpToolResponse nodesResponse = getAPIResponse(String.format("http://%s:%s/pools/nodes", getHostname(), getWebPort())); + return Functionals.chain( + HttpValueFunctions.jsonContents(), + JsonFunctions.walkN("nodes"), + getNodesAsList + ).apply(nodesResponse); + } + + private boolean isNodeRebalancing(String nodeHostName) throws URISyntaxException { + HttpToolResponse response = getAPIResponse("http://" + nodeHostName + "/pools/nodes/rebalanceProgress"); + if (response.getResponseCode() != 200) { + throw new IllegalStateException("failed to rebalance cluster: " + response); + } + return !HttpValueFunctions.jsonContents("status", String.class).apply(response).equals("none"); + } + + private HttpToolResponse getAPIResponse(String path) throws URISyntaxException { + URI uri = new URI(path); + Credentials credentials = new UsernamePasswordCredentials(getUsername(), getPassword()); + return HttpTool.httpGet(HttpTool.httpClientBuilder() + // the uri is required by the HttpClientBuilder in order to set the AuthScope of the credentials + .uri(uri) + .credentials(credentials) + .build(), + uri, + ImmutableMap.of()); + } + @Override public void serverAdd(String serverToAdd, String username, String password) { newScript("serverAdd").body.append(couchbaseCli("server-add") @@ -219,15 +313,15 @@ public void serverAddAndRebalance(String serverToAdd, String username, String pa } @Override - public void bucketCreate(String bucketName, String bucketType, Integer bucketPort, Integer bucketRamSize, Integer bucketReplica) { - newScript("bucketCreate").body.append(couchbaseCli("bucket-create") - + getCouchbaseHostnameAndCredentials() + - " --bucket=" + bucketName + - " --bucket-type=" + bucketType + - " --bucket-port=" + bucketPort + - " --bucket-ramsize=" + bucketRamSize + - " --bucket-replica=" + bucketReplica) - .failOnNonZeroResultCode() - .execute(); - } + public void bucketCreate(String bucketName, String bucketType, Integer bucketPort, Integer bucketRamSize, Integer bucketReplica) { + newScript("bucketCreate").body.append(couchbaseCli("bucket-create") + + getCouchbaseHostnameAndCredentials() + + " --bucket=" + bucketName + + " --bucket-type=" + bucketType + + " --bucket-port=" + bucketPort + + " --bucket-ramsize=" + bucketRamSize + + " --bucket-replica=" + bucketReplica) + .failOnNonZeroResultCode() + .execute(); + } } diff --git a/software/nosql/src/main/java/brooklyn/entity/nosql/couchbase/CouchbaseSyncGatewayImpl.java b/software/nosql/src/main/java/brooklyn/entity/nosql/couchbase/CouchbaseSyncGatewayImpl.java index 09ef569d79..0ead1105ec 100644 --- a/software/nosql/src/main/java/brooklyn/entity/nosql/couchbase/CouchbaseSyncGatewayImpl.java +++ b/software/nosql/src/main/java/brooklyn/entity/nosql/couchbase/CouchbaseSyncGatewayImpl.java @@ -1,6 +1,7 @@ package brooklyn.entity.nosql.couchbase; +import brooklyn.config.render.RendererHints; import brooklyn.entity.basic.SoftwareProcessImpl; import brooklyn.event.feed.http.HttpFeed; import brooklyn.event.feed.http.HttpPollConfig; @@ -58,4 +59,8 @@ protected void disconnectServiceUpIsRunning() { httpFeed.stop(); } } + + static { + RendererHints.register(MANAGEMENT_URL, new RendererHints.NamedActionWithUrl("Open")); + } } \ No newline at end of file diff --git a/software/nosql/src/main/java/brooklyn/entity/nosql/couchbase/CouchbaseSyncGatewaySshDriver.java b/software/nosql/src/main/java/brooklyn/entity/nosql/couchbase/CouchbaseSyncGatewaySshDriver.java index c7c9129240..7339c40dfc 100644 --- a/software/nosql/src/main/java/brooklyn/entity/nosql/couchbase/CouchbaseSyncGatewaySshDriver.java +++ b/software/nosql/src/main/java/brooklyn/entity/nosql/couchbase/CouchbaseSyncGatewaySshDriver.java @@ -12,18 +12,21 @@ import brooklyn.entity.Entity; import brooklyn.entity.basic.AbstractSoftwareProcessSshDriver; -import brooklyn.entity.basic.Attributes; import brooklyn.entity.basic.Entities; import brooklyn.entity.basic.EntityLocal; import brooklyn.entity.drivers.downloads.DownloadResolver; +import brooklyn.event.basic.DependentConfiguration; import brooklyn.location.OsDetails; import brooklyn.location.basic.SshMachineLocation; +import brooklyn.util.collections.MutableMap; import brooklyn.util.ssh.BashCommands; import brooklyn.util.time.Duration; import com.google.common.base.Optional; import com.google.common.base.Predicate; +import com.google.common.base.Predicates; import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; import com.google.common.collect.Iterables; public class CouchbaseSyncGatewaySshDriver extends AbstractSoftwareProcessSshDriver implements CouchbaseSyncGatewayDriver { @@ -31,11 +34,6 @@ public CouchbaseSyncGatewaySshDriver(EntityLocal entity, SshMachineLocation mach super(entity, machine); } - @Override - public boolean isRunning() { - return Boolean.TRUE.equals(entity.getAttribute(Attributes.SERVICE_UP)); - } - @Override public void stop() { @@ -67,8 +65,13 @@ public void customize() { public void launch() { Entity cbNode = entity.getConfig(CouchbaseSyncGateway.COUCHBASE_SERVER); Entities.waitForServiceUp(cbNode, Duration.ONE_HOUR); - - + DependentConfiguration.waitInTaskForAttributeReady(cbNode, CouchbaseCluster.IS_CLUSTER_INITIALIZED, Predicates.equalTo(true)); + try { + // Even once the bucket has published its API URL, it can still take a couple of seconds for it to become available + Thread.sleep(10 * 1000); + } catch (InterruptedException e) { + // no-op + } if (cbNode instanceof CouchbaseCluster) { Optional cbClusterNode = Iterables.tryFind(cbNode.getAttribute(CouchbaseCluster.GROUP_MEMBERS), new Predicate() { @@ -106,10 +109,21 @@ public boolean apply(@Nullable Entity entity) { String options = format("-url %s -bucket %s -adminInterface 0.0.0.0:%s -interface 0.0.0.0:%s -pool %s %s %s", serverWebAdminUrl, bucketName, adminRestApiPort, syncRestApiPort, pool, pretty, verbose); - newScript(LAUNCHING) - .body.append(format("/opt/couchbase-sync-gateway/bin/sync_gateway %s &", options)) + newScript(ImmutableMap.of("usePidFile", true), LAUNCHING) + .body.append(format("/opt/couchbase-sync-gateway/bin/sync_gateway %s ", options) + "> out.log 2> err.log < /dev/null &") + .failOnNonZeroResultCode() .execute(); } + + @Override + public boolean isRunning() { + return newScript(MutableMap.of("usePidFile", true), CHECK_RUNNING).execute() == 0; + } + + @Override + public void kill() { + newScript(MutableMap.of("usePidFile", true), KILLING).execute(); + } private List installLinux(List urls, String saveAs) { From 48c22178417698c7891a4cd29ebc78d5d1db2ec9 Mon Sep 17 00:00:00 2001 From: Martin Harris Date: Tue, 1 Jul 2014 10:43:16 +0100 Subject: [PATCH 3/5] Fixed CouchbaseSyncGateway live tests --- .../couchbase/CouchbaseNodeSshDriver.java | 17 ++- .../CouchbaseSyncGatewaySshDriver.java | 9 +- .../CouchbaseSyncGatewayEc2LiveTest.java | 114 ++++++++++++++++++ 3 files changed, 129 insertions(+), 11 deletions(-) create mode 100644 software/nosql/src/test/java/brooklyn/entity/nosql/couchbase/CouchbaseSyncGatewayEc2LiveTest.java diff --git a/software/nosql/src/main/java/brooklyn/entity/nosql/couchbase/CouchbaseNodeSshDriver.java b/software/nosql/src/main/java/brooklyn/entity/nosql/couchbase/CouchbaseNodeSshDriver.java index 6f81b730f2..8eb95e0e55 100644 --- a/software/nosql/src/main/java/brooklyn/entity/nosql/couchbase/CouchbaseNodeSshDriver.java +++ b/software/nosql/src/main/java/brooklyn/entity/nosql/couchbase/CouchbaseNodeSshDriver.java @@ -21,6 +21,7 @@ import static brooklyn.util.ssh.BashCommands.INSTALL_CURL; import static brooklyn.util.ssh.BashCommands.alternatives; import static brooklyn.util.ssh.BashCommands.chainGroup; +import static brooklyn.util.ssh.BashCommands.ok; import static brooklyn.util.ssh.BashCommands.sudo; import static java.lang.String.format; @@ -48,7 +49,6 @@ import brooklyn.util.ssh.BashCommands; import brooklyn.util.task.Tasks; import brooklyn.util.time.Duration; -import brooklyn.util.time.Time; import com.google.common.base.Function; import com.google.common.collect.ImmutableList; @@ -102,10 +102,13 @@ private List installLinux(List urls, String saveAs) { String yum = chainGroup( "which yum", - sudo("yum check-update"), + // The following prevents failure on RHEL AWS nodes: + // https://forums.aws.amazon.com/thread.jspa?threadID=100509 + ok(sudo("sed -i.bk s/^enabled=1$/enabled=0/ /etc/yum/pluginconf.d/subscription-manager.conf")), + ok(sudo("yum check-update")), sudo("yum install -y pkgconfig"), // RHEL requires openssl version 098 - sudo("[ -f /etc/redhat-release ] && (grep -i \"red hat\" /etc/redhat-release && yum install -y openssl098e) || :"), + sudo("[ -f /etc/redhat-release ] && (grep -i \"red hat\" /etc/redhat-release && sudo yum install -y openssl098e) || :"), sudo(format("rpm --install %s", saveAs))); return ImmutableList.builder() @@ -130,11 +133,15 @@ public void customize() { @Override public void launch() { - //FIXME needs time for http server to initialize - Time.sleep(Duration.TEN_SECONDS); newScript(LAUNCHING) .body.append( sudo("/etc/init.d/couchbase-server start"), + "for i in {0..120}\n" + + "do\n" + + " if [ $i -eq 120 ]; then echo REST API unavailable after 120 seconds, failing; exit 1; fi;\n" + + " curl -s " + String.format("http://%s:%s", getHostname(), getWebPort()) + " > /dev/null && echo REST API available after $i seconds && break\n" + + " sleep 1\n" + + "done\n" + couchbaseCli("cluster-init") + getCouchbaseHostnameAndPort() + " --cluster-init-username=" + getUsername() + diff --git a/software/nosql/src/main/java/brooklyn/entity/nosql/couchbase/CouchbaseSyncGatewaySshDriver.java b/software/nosql/src/main/java/brooklyn/entity/nosql/couchbase/CouchbaseSyncGatewaySshDriver.java index 7339c40dfc..6d9157c8e0 100644 --- a/software/nosql/src/main/java/brooklyn/entity/nosql/couchbase/CouchbaseSyncGatewaySshDriver.java +++ b/software/nosql/src/main/java/brooklyn/entity/nosql/couchbase/CouchbaseSyncGatewaySshDriver.java @@ -21,6 +21,7 @@ import brooklyn.util.collections.MutableMap; import brooklyn.util.ssh.BashCommands; import brooklyn.util.time.Duration; +import brooklyn.util.time.Time; import com.google.common.base.Optional; import com.google.common.base.Predicate; @@ -66,12 +67,8 @@ public void launch() { Entity cbNode = entity.getConfig(CouchbaseSyncGateway.COUCHBASE_SERVER); Entities.waitForServiceUp(cbNode, Duration.ONE_HOUR); DependentConfiguration.waitInTaskForAttributeReady(cbNode, CouchbaseCluster.IS_CLUSTER_INITIALIZED, Predicates.equalTo(true)); - try { - // Even once the bucket has published its API URL, it can still take a couple of seconds for it to become available - Thread.sleep(10 * 1000); - } catch (InterruptedException e) { - // no-op - } + // Even once the bucket has published its API URL, it can still take a couple of seconds for it to become available + Time.sleep(10 * 1000); if (cbNode instanceof CouchbaseCluster) { Optional cbClusterNode = Iterables.tryFind(cbNode.getAttribute(CouchbaseCluster.GROUP_MEMBERS), new Predicate() { diff --git a/software/nosql/src/test/java/brooklyn/entity/nosql/couchbase/CouchbaseSyncGatewayEc2LiveTest.java b/software/nosql/src/test/java/brooklyn/entity/nosql/couchbase/CouchbaseSyncGatewayEc2LiveTest.java new file mode 100644 index 0000000000..4c659d61dc --- /dev/null +++ b/software/nosql/src/test/java/brooklyn/entity/nosql/couchbase/CouchbaseSyncGatewayEc2LiveTest.java @@ -0,0 +1,114 @@ +package brooklyn.entity.nosql.couchbase; + +import java.util.List; +import java.util.Map; + +import org.testng.annotations.Test; + +import brooklyn.entity.AbstractEc2LiveTest; +import brooklyn.entity.group.DynamicCluster; +import brooklyn.entity.proxying.EntitySpec; +import brooklyn.entity.trait.Startable; +import brooklyn.location.Location; +import brooklyn.test.EntityTestUtils; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; + +@Test +public class CouchbaseSyncGatewayEc2LiveTest extends AbstractEc2LiveTest { + + @Override + protected void doTest(Location loc) throws Exception { + CouchbaseCluster cluster = app.createAndManageChild(EntitySpec.create(CouchbaseCluster.class) + .configure(CouchbaseNode.COUCHBASE_ADMIN_USERNAME, "Administrator") + .configure(CouchbaseNode.COUCHBASE_ADMIN_PASSWORD, "Password") + .configure(DynamicCluster.INITIAL_SIZE, 3) + .configure(CouchbaseCluster.CREATE_BUCKETS, (List>)ImmutableList.of( + (Map)ImmutableMap.of( + "bucket", "default", + "bucket-ramsize", 100, + "bucket-type", "couchbase", + "bucket-port", 11211 + ), + (Map)ImmutableMap.of( + "bucket", "my_bucket", + "bucket-ramsize", 100, + "bucket-type", "couchbase", + "bucket-port", 11223 + ), + (Map)ImmutableMap.of( + "bucket", "another", + "bucket-ramsize", 100, + "bucket-type", "couchbase", + "bucket-port", 11224 + )) + ) + ); + CouchbaseSyncGateway gateway = app.createAndManageChild(EntitySpec.create(CouchbaseSyncGateway.class) + .configure(CouchbaseSyncGateway.COUCHBASE_SERVER, cluster) + .configure(CouchbaseSyncGateway.COUCHBASE_SERVER_BUCKET, "my_bucket") + ); + + app.start(ImmutableList.of(loc)); + + EntityTestUtils.assertAttributeEqualsEventually(gateway, Startable.SERVICE_UP, true); + } + + + // Supported operating systems + + @Override + public void test_Ubuntu_12_0() throws Exception { + super.test_Ubuntu_12_0(); + } + + @Override + public void test_Red_Hat_Enterprise_Linux_6() throws Exception { + super.test_Red_Hat_Enterprise_Linux_6(); + } + + @Override + public void test_CentOS_6_3() throws Exception { + super.test_CentOS_6_3(); + } + + // Unsupported operating systems + + @Override + public void test_CentOS_5_6() throws Exception { + // Unsupported + // error: Failed dependencies: + // libc.so.6(GLIBC_2.7)(64bit) is needed by couchbase-server-2.5.1-1083.x86_64 + // libcrypto.so.10()(64bit) is needed by couchbase-server-2.5.1-1083.x86_64 + // libreadline.so.6()(64bit) is needed by couchbase-server-2.5.1-1083.x86_64 + // libssl.so.10()(64bit) is needed by couchbase-server-2.5.1-1083.x86_64 + // libstdc++.so.6(GLIBCXX_3.4.10)(64bit) is needed by couchbase-server-2.5.1-1083.x86_64 + // libstdc++.so.6(GLIBCXX_3.4.11)(64bit) is needed by couchbase-server-2.5.1-1083.x86_64 + // libstdc++.so.6(GLIBCXX_3.4.9)(64bit) is needed by couchbase-server-2.5.1-1083.x86_64 + // libtinfo.so.5()(64bit) is needed by couchbase-server-2.5.1-1083.x86_64 + // openssl >= 1.0.0 is needed by couchbase-server-2.5.1-1083.x86_64 + // rpmlib(FileDigests) <= 4.6.0-1 is needed by couchbase-server-2.5.1-1083.x86_64 + // rpmlib(PayloadIsXz) <= 5.2-1 is needed by couchbase-server-2.5.1-1083.x86_64 + } + + @Override + public void test_Debian_6() throws Exception { + // Unsupported + } + + @Override + public void test_Debian_7_2() throws Exception { + // Unsupported + } + + @Override + public void test_Ubuntu_10_0() throws Exception { + // Unsupported + // Installing cannot proceed since the package 'libssl1*' is missing. + // Please install libssl1* and try again. + // $sudo apt-get install libssl1* + // + // Installing libssl1* doesn't fix the issue + } +} From 4885a798700aa22f39e030d97a2d65213ecb9a31 Mon Sep 17 00:00:00 2001 From: Martin Harris Date: Fri, 11 Jul 2014 17:12:48 +0100 Subject: [PATCH 4/5] fixup me - temp commit --- .../nosql/couchbase/CouchbaseClusterImpl.java | 25 +++++++++--------- .../couchbase/CouchbaseNodeSshDriver.java | 8 +++--- .../nosql/couchbase/CouchbaseSyncGateway.java | 22 +++++++++++++--- .../couchbase/CouchbaseSyncGatewayDriver.java | 18 +++++++++++++ .../couchbase/CouchbaseSyncGatewayImpl.java | 26 +++++++++++++++---- .../CouchbaseSyncGatewaySshDriver.java | 18 +++++++++++++ 6 files changed, 92 insertions(+), 25 deletions(-) diff --git a/software/nosql/src/main/java/brooklyn/entity/nosql/couchbase/CouchbaseClusterImpl.java b/software/nosql/src/main/java/brooklyn/entity/nosql/couchbase/CouchbaseClusterImpl.java index f424eafce4..4affcd1bda 100644 --- a/software/nosql/src/main/java/brooklyn/entity/nosql/couchbase/CouchbaseClusterImpl.java +++ b/software/nosql/src/main/java/brooklyn/entity/nosql/couchbase/CouchbaseClusterImpl.java @@ -26,6 +26,7 @@ import java.util.Set; import java.util.concurrent.Callable; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -73,7 +74,9 @@ public class CouchbaseClusterImpl extends DynamicClusterImpl implements CouchbaseCluster { private static final Logger log = LoggerFactory.getLogger(CouchbaseClusterImpl.class); private final Object mutex = new Object[0]; - private final HttpFeed[] resetBucketCreation = new HttpFeed[]{null}; + // Used to serialize bucket creation as only one bucket can be created at a time, + // so a feed is used to determine when a bucket has finished being created + private final AtomicReference resetBucketCreation = new AtomicReference(); public void init() { log.info("Initializing the Couchbase cluster..."); @@ -195,8 +198,8 @@ public void start(Collection locations) { @Override public void stop() { - if (resetBucketCreation[0] != null) { - resetBucketCreation[0].stop(); + if (resetBucketCreation.get() != null) { + resetBucketCreation.get().stop(); } super.stop(); } @@ -360,8 +363,7 @@ public boolean isMemberInCluster(Entity e) { } public void createBuckets() { - //FIXME: multiple buckets require synchronization/wait time (checks for port conflicts and exceeding ram size) - //TODO: check for multiple bucket conflicts with port + //TODO: check for port conflicts if buckets are being created with a port List> bucketsToCreate = getConfig(CREATE_BUCKETS); Entity primaryNode = getPrimaryNode(); @@ -374,7 +376,6 @@ public void createBuckets() { log.info("adding bucket: {} to primary node: {}", bucketName, primaryNode.getId()); createBucket(primaryNode, bucketName, bucketType, bucketPort, bucketRamSize, bucketReplica); - //TODO: add if bucket has been created. } } @@ -384,12 +385,12 @@ public void createBucket(final Entity primaryNode, final String bucketName, fina @Override public Void call() throws Exception { DependentConfiguration.waitInTaskForAttributeReady(CouchbaseClusterImpl.this, CouchbaseCluster.BUCKET_CREATION_IN_PROGRESS, Predicates.equalTo(false)); - if (CouchbaseClusterImpl.this.resetBucketCreation[0] != null) { - CouchbaseClusterImpl.this.resetBucketCreation[0].stop(); + if (CouchbaseClusterImpl.this.resetBucketCreation.get() != null) { + CouchbaseClusterImpl.this.resetBucketCreation.get().stop(); } setAttribute(CouchbaseCluster.BUCKET_CREATION_IN_PROGRESS, true); - CouchbaseClusterImpl.this.resetBucketCreation[0] = HttpFeed.builder() + CouchbaseClusterImpl.this.resetBucketCreation.set(HttpFeed.builder() .entity(CouchbaseClusterImpl.this) .period(500, TimeUnit.MILLISECONDS) .baseUri(String.format("%s/pools/default/buckets/%s", primaryNode.getAttribute(CouchbaseNode.COUCHBASE_WEB_ADMIN_URL), bucketName)) @@ -420,13 +421,13 @@ public Boolean apply(Object input) { throw new IllegalStateException("Unexpected response when creating bucket:" + input); } })) - .build(); + .build()); // TODO: Bail out if bucket creation fails, to allow next bucket to proceed Entities.invokeEffectorWithArgs(CouchbaseClusterImpl.this, primaryNode, CouchbaseNode.BUCKET_CREATE, bucketName, bucketType, bucketPort, bucketRamSize, bucketReplica); DependentConfiguration.waitInTaskForAttributeReady(CouchbaseClusterImpl.this, CouchbaseCluster.BUCKET_CREATION_IN_PROGRESS, Predicates.equalTo(false)); - if (CouchbaseClusterImpl.this.resetBucketCreation[0] != null) { - CouchbaseClusterImpl.this.resetBucketCreation[0].stop(); + if (CouchbaseClusterImpl.this.resetBucketCreation.get() != null) { + CouchbaseClusterImpl.this.resetBucketCreation.get().stop(); } return null; } diff --git a/software/nosql/src/main/java/brooklyn/entity/nosql/couchbase/CouchbaseNodeSshDriver.java b/software/nosql/src/main/java/brooklyn/entity/nosql/couchbase/CouchbaseNodeSshDriver.java index 8eb95e0e55..67fa30a304 100644 --- a/software/nosql/src/main/java/brooklyn/entity/nosql/couchbase/CouchbaseNodeSshDriver.java +++ b/software/nosql/src/main/java/brooklyn/entity/nosql/couchbase/CouchbaseNodeSshDriver.java @@ -284,15 +284,15 @@ private boolean isNodeRebalancing(String nodeHostName) throws URISyntaxException return !HttpValueFunctions.jsonContents("status", String.class).apply(response).equals("none"); } - private HttpToolResponse getAPIResponse(String path) throws URISyntaxException { - URI uri = new URI(path); + private HttpToolResponse getAPIResponse(String uri) throws URISyntaxException { + URI apiUri = new URI(uri); Credentials credentials = new UsernamePasswordCredentials(getUsername(), getPassword()); return HttpTool.httpGet(HttpTool.httpClientBuilder() // the uri is required by the HttpClientBuilder in order to set the AuthScope of the credentials - .uri(uri) + .uri(apiUri) .credentials(credentials) .build(), - uri, + apiUri, ImmutableMap.of()); } diff --git a/software/nosql/src/main/java/brooklyn/entity/nosql/couchbase/CouchbaseSyncGateway.java b/software/nosql/src/main/java/brooklyn/entity/nosql/couchbase/CouchbaseSyncGateway.java index ddea7217b3..c0740ee47f 100644 --- a/software/nosql/src/main/java/brooklyn/entity/nosql/couchbase/CouchbaseSyncGateway.java +++ b/software/nosql/src/main/java/brooklyn/entity/nosql/couchbase/CouchbaseSyncGateway.java @@ -1,3 +1,21 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ package brooklyn.entity.nosql.couchbase; import brooklyn.config.ConfigKey; @@ -34,10 +52,6 @@ public interface CouchbaseSyncGateway extends SoftwareProcess { ConfigKey COUCHBASE_SERVER_BUCKET = ConfigKeys.newStringConfigKey("couchbaseSyncGateway.serverBucket", "Name of the Couchbase bucket to use", "sync_gateway"); - @SetFromFlag("couchbaseServerUrl") - ConfigKey COUCHBASE_SERVER_URL = ConfigKeys.newStringConfigKey("couchbaseSyncGateway.couchbaseServerUrl", - "Couchbase Server Admin Url to connect the gateway to"); - @SetFromFlag("pretty") ConfigKey PRETTY = ConfigKeys.newBooleanConfigKey("couchbaseSyncGateway.pretty", "Pretty-print JSON responses. This is useful for debugging, but reduces performance.", false); diff --git a/software/nosql/src/main/java/brooklyn/entity/nosql/couchbase/CouchbaseSyncGatewayDriver.java b/software/nosql/src/main/java/brooklyn/entity/nosql/couchbase/CouchbaseSyncGatewayDriver.java index b1b4339b2c..148ec0b6c3 100644 --- a/software/nosql/src/main/java/brooklyn/entity/nosql/couchbase/CouchbaseSyncGatewayDriver.java +++ b/software/nosql/src/main/java/brooklyn/entity/nosql/couchbase/CouchbaseSyncGatewayDriver.java @@ -1,3 +1,21 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ package brooklyn.entity.nosql.couchbase; import brooklyn.entity.basic.SoftwareProcessDriver; diff --git a/software/nosql/src/main/java/brooklyn/entity/nosql/couchbase/CouchbaseSyncGatewayImpl.java b/software/nosql/src/main/java/brooklyn/entity/nosql/couchbase/CouchbaseSyncGatewayImpl.java index 0ead1105ec..4ac3400dba 100644 --- a/software/nosql/src/main/java/brooklyn/entity/nosql/couchbase/CouchbaseSyncGatewayImpl.java +++ b/software/nosql/src/main/java/brooklyn/entity/nosql/couchbase/CouchbaseSyncGatewayImpl.java @@ -1,6 +1,23 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ package brooklyn.entity.nosql.couchbase; - import brooklyn.config.render.RendererHints; import brooklyn.entity.basic.SoftwareProcessImpl; import brooklyn.event.feed.http.HttpFeed; @@ -8,6 +25,7 @@ import brooklyn.event.feed.http.HttpValueFunctions; import brooklyn.location.access.BrooklynAccessUtils; +import com.google.common.base.Functions; import com.google.common.net.HostAndPort; public class CouchbaseSyncGatewayImpl extends SoftwareProcessImpl implements CouchbaseSyncGateway { @@ -27,8 +45,6 @@ protected void connectSensors() { @Override protected void connectServiceUpIsRunning() { - - HostAndPort hp = BrooklynAccessUtils.getBrooklynAccessibleAddress(this, getAttribute(CouchbaseSyncGateway.ADMIN_REST_API_PORT)); @@ -42,9 +58,9 @@ protected void connectServiceUpIsRunning() { .period(200) .baseUri(managementUri) .poll(new HttpPollConfig(SERVICE_UP) - .onSuccess(HttpValueFunctions.responseCodeEquals(200))) + .onSuccess(HttpValueFunctions.responseCodeEquals(200)) + .onFailureOrException(Functions.constant(false))) .build(); - } @Override diff --git a/software/nosql/src/main/java/brooklyn/entity/nosql/couchbase/CouchbaseSyncGatewaySshDriver.java b/software/nosql/src/main/java/brooklyn/entity/nosql/couchbase/CouchbaseSyncGatewaySshDriver.java index 6d9157c8e0..3d6104bf78 100644 --- a/software/nosql/src/main/java/brooklyn/entity/nosql/couchbase/CouchbaseSyncGatewaySshDriver.java +++ b/software/nosql/src/main/java/brooklyn/entity/nosql/couchbase/CouchbaseSyncGatewaySshDriver.java @@ -1,3 +1,21 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ package brooklyn.entity.nosql.couchbase; import static brooklyn.util.ssh.BashCommands.INSTALL_CURL; From f23ee27b877642d794f5a209aac8f1597b59ceb8 Mon Sep 17 00:00:00 2001 From: Martin Harris Date: Wed, 20 Aug 2014 11:06:16 +0100 Subject: [PATCH 5/5] Adds apache header to Ec2LiveTest, adds 'Live' group to tests --- .../CouchbaseSyncGatewayEc2LiveTest.java | 27 +++++++++++++++++-- 1 file changed, 25 insertions(+), 2 deletions(-) diff --git a/software/nosql/src/test/java/brooklyn/entity/nosql/couchbase/CouchbaseSyncGatewayEc2LiveTest.java b/software/nosql/src/test/java/brooklyn/entity/nosql/couchbase/CouchbaseSyncGatewayEc2LiveTest.java index 4c659d61dc..7f2327a682 100644 --- a/software/nosql/src/test/java/brooklyn/entity/nosql/couchbase/CouchbaseSyncGatewayEc2LiveTest.java +++ b/software/nosql/src/test/java/brooklyn/entity/nosql/couchbase/CouchbaseSyncGatewayEc2LiveTest.java @@ -1,3 +1,21 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ package brooklyn.entity.nosql.couchbase; import java.util.List; @@ -15,7 +33,6 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; -@Test public class CouchbaseSyncGatewayEc2LiveTest extends AbstractEc2LiveTest { @Override @@ -57,17 +74,19 @@ protected void doTest(Location loc) throws Exception { // Supported operating systems - + @Test(groups = {"Live"}) @Override public void test_Ubuntu_12_0() throws Exception { super.test_Ubuntu_12_0(); } + @Test(groups = {"Live"}) @Override public void test_Red_Hat_Enterprise_Linux_6() throws Exception { super.test_Red_Hat_Enterprise_Linux_6(); } + @Test(groups = {"Live"}) @Override public void test_CentOS_6_3() throws Exception { super.test_CentOS_6_3(); @@ -75,6 +94,7 @@ public void test_CentOS_6_3() throws Exception { // Unsupported operating systems + @Test(groups = {"Live"}) @Override public void test_CentOS_5_6() throws Exception { // Unsupported @@ -92,16 +112,19 @@ public void test_CentOS_5_6() throws Exception { // rpmlib(PayloadIsXz) <= 5.2-1 is needed by couchbase-server-2.5.1-1083.x86_64 } + @Test(groups = {"Live"}) @Override public void test_Debian_6() throws Exception { // Unsupported } + @Test(groups = {"Live"}) @Override public void test_Debian_7_2() throws Exception { // Unsupported } + @Test(groups = {"Live"}) @Override public void test_Ubuntu_10_0() throws Exception { // Unsupported