Skip to content
This repository has been archived by the owner on May 12, 2021. It is now read-only.

Commit

Permalink
WHIRR-288. Add blob store persistence for cluster state (asavu)
Browse files Browse the repository at this point in the history
git-svn-id: https://svn.apache.org/repos/asf/incubator/whirr/trunk@1099810 13f79535-47bb-0310-9956-ffa450edef68
  • Loading branch information
Andrei Savu committed May 5, 2011
1 parent b641a72 commit fb826a9
Show file tree
Hide file tree
Showing 9 changed files with 390 additions and 58 deletions.
2 changes: 2 additions & 0 deletions CHANGES.txt
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,8 @@ Trunk (unreleased changes)
WHIRR-297. Separate ZooKeeper and ElasticSearch install and configuration
scripts into more generic functions (asavu)

WHIRR-288. Add blob store persistence for cluster state (asavu)

BUG FIXES

WHIRR-253. ZooKeeper service should only authorize ingress to ZooKeeper
Expand Down
5 changes: 3 additions & 2 deletions core/src/main/java/org/apache/whirr/ClusterController.java
Original file line number Diff line number Diff line change
Expand Up @@ -164,9 +164,9 @@ public Set<Cluster.Instance> getInstances(ClusterSpec spec)
public Set<Cluster.Instance> getInstances(ClusterSpec spec, ClusterStateStore stateStore)
throws IOException, InterruptedException {
Set<Cluster.Instance> instances = Sets.newLinkedHashSet();
if (stateStore != null) {
Cluster cluster = (stateStore != null) ? stateStore.load() : null;
if (cluster != null) {
/* enrich the instance information with node metadata */
Cluster cluster = stateStore.load();

for(NodeMetadata node : getNodes(spec)) {
Cluster.Instance instance = cluster.getInstanceMatching(withIds(node.getId()));
Expand All @@ -176,6 +176,7 @@ public Set<Cluster.Instance> getInstances(ClusterSpec spec, ClusterStateStore st
}
} else {
/* return a list of instances with no roles attached */

Credentials credentials = new Credentials(spec.getClusterUser(), spec.getPrivateKey());
for(NodeMetadata node : getNodes(spec)) {
instances.add(new Cluster.Instance(credentials, Sets.<String>newHashSet(),
Expand Down
64 changes: 63 additions & 1 deletion core/src/main/java/org/apache/whirr/ClusterSpec.java
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,10 @@
import org.apache.commons.configuration.interpol.ConfigurationInterpolator;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang.text.StrLookup;
import org.jclouds.predicates.Validator;
import org.jclouds.predicates.validators.DnsNameValidator;
import org.jclouds.rest.annotations.ParamValidators;
import org.jclouds.s3.predicates.validators.BucketNameValidator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -125,6 +129,15 @@ public enum Property {
BLOBSTORE_CREDENTIAL(String.class, false, "The blob store credential"),

BLOBSTORE_LOCATION_ID(String.class, false, "The blob store location ID"),

STATE_STORE(String.class, false, "What kind of store to use for state " +
"(local, blob or none). Defaults to local."),

STATE_STORE_CONTAINER(String.class, false, "Container where to store state. " +
"Valid only for the blob state store."),

STATE_STORE_BLOB(String.class, false, "Blob name for state storage. " +
"Valid only for the blob state store. Defaults to whirr-<cluster-name>"),

IMAGE_ID(String.class, false, "The ID of the image to use for " +
"instances. If not specified then a vanilla Linux image is " +
Expand Down Expand Up @@ -237,6 +250,10 @@ public static ClusterSpec withNoDefaults(Configuration conf)
private String blobStoreIdentity;
private String blobStoreCredential;

private String stateStore;
private String stateStoreContainer;
private String stateStoreBlob;

private String privateKey;
private File privateKeyFile;
private String publicKey;
Expand Down Expand Up @@ -292,6 +309,10 @@ public ClusterSpec(Configuration userConfig, boolean loadDefaults)
setBlobStoreIdentity(getString(Property.BLOBSTORE_IDENTITY));
setBlobStoreCredential(getString(Property.BLOBSTORE_CREDENTIAL));

setStateStore(getString(Property.STATE_STORE));
setStateStoreContainer(getString(Property.STATE_STORE_CONTAINER));
setStateStoreBlob(getString(Property.STATE_STORE_BLOB));

checkAndSetKeyPair();

setImageId(getString(Property.IMAGE_ID));
Expand All @@ -307,7 +328,7 @@ public ClusterSpec(Configuration userConfig, boolean loadDefaults)
}

private String getString(Property key) {
return config.getString(key.getConfigName());
return config.getString(key.getConfigName(), null);
}

private int getInt(Property key, int defaultValue) {
Expand Down Expand Up @@ -447,6 +468,24 @@ public String getBlobStoreLocationId() {
return blobStoreLocationId;
}

public String getStateStore() {
if (stateStore == null) {
return "local";
}
return stateStore;
}

public String getStateStoreContainer() {
return stateStoreContainer;
}

public String getStateStoreBlob() {
if (stateStoreBlob == null && "blob".equals(stateStore)) {
return "whirr-" + getClusterName();
}
return stateStoreBlob;
}

public String getServiceName() {
return serviceName;
}
Expand Down Expand Up @@ -535,6 +574,29 @@ public void setBlobStoreLocationId(String locationId) {
blobStoreLocationId = locationId;
}

public void setStateStore(String type) {
if (type != null) {
checkArgument(Sets.newHashSet("local", "blob", "none").contains(type),
"Invalid state store. Valid values are local, blob or none.");
}
this.stateStore = type;
}

public void setStateStoreContainer(String container) {
checkContainerName(container);
this.stateStoreContainer = container;
}

private void checkContainerName(String name) {
if (name != null) {
checkArgument((new DnsNameValidator(3, 63){}).apply(name));
}
}

public void setStateStoreBlob(String blob) {
this.stateStoreBlob = blob;
}

public void setClusterName(String clusterName) {
this.clusterName = clusterName;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.whirr.service;

import org.apache.commons.io.IOUtils;
import org.apache.whirr.Cluster;
import org.apache.whirr.ClusterSpec;
import org.jclouds.blobstore.BlobStore;
import org.jclouds.blobstore.BlobStoreContext;
import org.jclouds.blobstore.domain.Blob;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;

import static com.google.common.base.Preconditions.checkNotNull;

public class BlobClusterStateStore extends ClusterStateStore {

private static final Logger LOG = LoggerFactory
.getLogger(FileClusterStateStore.class);

private ClusterSpec spec;
private BlobStoreContext context;

private String container;
private String blobName;

public BlobClusterStateStore(ClusterSpec spec) {
this.spec = spec;
this.context = BlobStoreContextBuilder.build(spec);

this.container = checkNotNull(spec.getStateStoreContainer());
this.blobName = checkNotNull(spec.getStateStoreBlob());

/* create container if it does not already exists */
if (!context.getBlobStore().containerExists(container)) {
context.getBlobStore().createContainerInLocation(null, container);
}
}

@Override
public Cluster load() throws IOException {
Blob blob = context.getBlobStore().getBlob(container, blobName);
if (blob != null) {
return unserialize(spec,
IOUtils.toString(blob.getPayload().getInput(), "utf-8"));
}
return null;
}

@Override
public void save(Cluster cluster) throws IOException {
BlobStore store = context.getBlobStore();

Blob blob = store.newBlob(blobName);
blob.setPayload(serialize(cluster));
store.putBlob(container, blob);

LOG.info("Saved cluster state to '{}' ", context.getSigner()
.signGetBlob(container, blobName).getEndpoint().toString());
}

@Override
public void destroy() throws IOException {
context.getBlobStore().removeBlob(container, blobName);
}
}
69 changes: 69 additions & 0 deletions core/src/main/java/org/apache/whirr/service/ClusterStateStore.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,20 @@
package org.apache.whirr.service;

import java.io.IOException;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.Iterator;
import java.util.Set;

import com.google.common.base.Joiner;
import com.google.common.base.Splitter;
import com.google.common.collect.Sets;
import org.apache.whirr.Cluster;
import org.apache.whirr.ClusterSpec;
import org.apache.whirr.util.DnsUtil;
import org.jclouds.domain.Credentials;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Interface for cluster state storage facilities.
Expand Down Expand Up @@ -51,4 +63,61 @@ public abstract class ClusterStateStore {
*/
public abstract void destroy() throws IOException;


/**
* Create parser friendly string representation for a {@link Cluster}
*
* @param cluster
* @return String representation
* @throws IOException
*/
protected String serialize(Cluster cluster) throws IOException {
StringBuilder sb = new StringBuilder();

for (Cluster.Instance instance : cluster.getInstances()) {
String id = instance.getId();
String roles = Joiner.on(',').join(instance.getRoles());

String publicAddress = DnsUtil.resolveAddress(instance.getPublicAddress()
.getHostAddress());
String privateAddress = instance.getPrivateAddress().getHostAddress();

sb.append(id).append("\t");
sb.append(roles).append("\t");
sb.append(publicAddress).append("\t");
sb.append(privateAddress).append("\n");
}

return sb.toString();
}

/**
* Rebuild the {@link Cluster} instance by using the string representation
*
* @param spec
* @param content
* @return
* @throws UnknownHostException
*/
protected Cluster unserialize(ClusterSpec spec, String content) throws UnknownHostException {
Credentials credentials = new Credentials(spec.getClusterUser(), spec.getPrivateKey());
Set<Cluster.Instance> instances = Sets.newLinkedHashSet();

for(String line : Splitter.on("\n").split(content)) {
if (line.trim().equals("")) continue; /* ignore empty lines */
Iterator<String> fields = Splitter.on("\t").split(line).iterator();

String id = fields.next();
Set<String> roles = Sets.newLinkedHashSet(Splitter.on(",").split(fields.next()));
String publicAddress = fields.next();
String privateAddress = fields.next();

instances.add(new Cluster.Instance(credentials, roles,
InetAddress.getByName(publicAddress).getHostAddress(),
privateAddress, id, null));
}

return new Cluster(instances);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -18,22 +18,57 @@

package org.apache.whirr.service;

import com.google.common.collect.Maps;
import org.apache.commons.configuration.Configuration;
import org.apache.commons.configuration.PropertiesConfiguration;
import org.apache.whirr.Cluster;
import org.apache.whirr.ClusterSpec;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.Map;

/**
* A factory for ClusterStateStores.
*
*/
public class ClusterStateStoreFactory {

private static final Logger LOG = LoggerFactory
.getLogger(ClusterStateStoreFactory.class);

private class NoopClusterStateStore extends ClusterStateStore {
public NoopClusterStateStore() {
LOG.warn("No cluster state is going to be persisted. There is no easy " +
"way to retrieve instance roles after launch.");
}
@Override
public Cluster load() throws IOException {
return null;
}
@Override
public void save(Cluster cluster) throws IOException {
}
@Override
public void destroy() throws IOException {
}
}

public ClusterStateStore create(ClusterSpec spec) {
return create(spec, new PropertiesConfiguration());
}

public ClusterStateStore create(ClusterSpec spec, Configuration conf) {
return new FileClusterStateStore(spec);
if ("local".equals(spec.getStateStore())) {
return new FileClusterStateStore(spec);

} else if("blob".equals(spec.getStateStore())) {
return new BlobClusterStateStore(spec);

} else {
return new NoopClusterStateStore();
}
}

}
Loading

0 comments on commit fb826a9

Please sign in to comment.