Skip to content

Commit

Permalink
Merge pull request #1614 from hchiorean/MODE-2635
Browse files Browse the repository at this point in the history
MODE-2635 Adds replica-set support for the Mongo binary store to the JSON and WF configs
  • Loading branch information
Horia Chiorean committed Dec 13, 2016
2 parents 8b70c1a + b9f6958 commit e0d96e3
Show file tree
Hide file tree
Showing 14 changed files with 152 additions and 82 deletions.
Expand Up @@ -57,6 +57,10 @@ protected void writeBinaryStorageConfiguration( String repositoryName,
if (passwordModel.isDefined()) {
binaries.setString(FieldName.USER_PASSWORD, passwordModel.asString());
}
ModelNode hostAddressesModel = ModelAttributes.MONGO_HOST_ADDRESSES.resolveModelAttribute(context, model);
if (hostAddressesModel.isDefined()) {
binaries.setArray(FieldName.HOST_ADDRESSES, (Object[]) hostAddressesModel.asString().split(","));
}
}

@Override
Expand Down
Expand Up @@ -109,7 +109,8 @@ public enum Attribute {
PORT("port"),
DATABASE("database"),
REINDEXING_MODE("mode"),
BUCKET_NAME("bucket-name");
BUCKET_NAME("bucket-name"),
HOST_ADDRESSES("host-addresses");

private final String name;

Expand Down
Expand Up @@ -20,6 +20,7 @@
import static org.jboss.as.controller.descriptions.ModelDescriptionConstants.OP_ADDR;
import static org.jboss.as.controller.descriptions.ModelDescriptionConstants.SUBSYSTEM;
import static org.jboss.as.controller.parsing.ParseUtils.requireNoAttributes;

import java.util.ArrayList;
import java.util.List;
import javax.xml.stream.XMLStreamConstants;
Expand Down Expand Up @@ -814,6 +815,9 @@ private ModelNode parseMongoBinaryStorage(final XMLExtendedStreamReader reader,
case PASSWORD:
ModelAttributes.MONGO_PASSWORD.parseAndSetParameter(attrValue, storageType, reader);
break;
case HOST_ADDRESSES:
ModelAttributes.MONGO_HOST_ADDRESSES.parseAndSetParameter(attrValue, storageType, reader);
break;
case MIN_VALUE_SIZE:
ModelAttributes.MINIMUM_BINARY_SIZE.parseAndSetParameter(attrValue, storageType, reader);
break;
Expand Down
Expand Up @@ -340,6 +340,7 @@ private void writeBinaryStorageModel(XMLExtendedStreamWriter writer,
ModelAttributes.MONGO_DATABASE.marshallAsAttribute(storage, false, writer);
ModelAttributes.MONGO_USERNAME.marshallAsAttribute(storage, false, writer);
ModelAttributes.MONGO_PASSWORD.marshallAsAttribute(storage, false, writer);
ModelAttributes.MONGO_HOST_ADDRESSES.marshallAsAttribute(storage, false, writer);
writer.writeEndElement();
} else if (ModelKeys.S3_BINARY_STORAGE.equals(storageType)) {
writer.writeStartElement(Element.S3_BINARY_STORAGE.getLocalName());
Expand Down
Expand Up @@ -16,6 +16,7 @@
package org.modeshape.jboss.subsystem;

import static org.modeshape.jboss.subsystem.ModeShapeExtension.JBOSS_DATA_DIR_VARIABLE;

import org.jboss.as.controller.AttributeDefinition;
import org.jboss.as.controller.ListAttributeDefinition;
import org.jboss.as.controller.SimpleAttributeDefinition;
Expand Down Expand Up @@ -945,6 +946,14 @@ public class ModelAttributes {
.setAllowNull(true)
.setFlags(AttributeAccess.Flag.RESTART_RESOURCE_SERVICES)
.build();

public static final MappedSimpleAttributeDefinition MONGO_HOST_ADDRESSES =
new MappedAttributeDefinitionBuilder(Attribute.HOST_ADDRESSES.getLocalName(), ModelType.STRING,
FieldName.STORAGE, FieldName.BINARY_STORAGE, FieldName.HOST_ADDRESSES)
.setAllowExpression(true)
.setAllowNull(true)
.setFlags(AttributeAccess.Flag.RESTART_RESOURCE_SERVICES)
.build();

public static final MappedSimpleAttributeDefinition S3_USERNAME =
new MappedAttributeDefinitionBuilder(Attribute.USERNAME.getLocalName(), ModelType.STRING,
Expand Down Expand Up @@ -1001,8 +1010,8 @@ public class ModelAttributes {
public static final AttributeDefinition[] S3_BINARY_STORAGE_ATTRIBUTES = {MINIMUM_BINARY_SIZE, MINIMUM_STRING_SIZE,
MIME_TYPE_DETECTION, S3_USERNAME, S3_PASSWORD, S3_BUCKET_NAME};

public static final AttributeDefinition[] MONGO_BINARY_STORAGE_ATTRIBUTES = {MINIMUM_BINARY_SIZE, MINIMUM_STRING_SIZE,
MIME_TYPE_DETECTION, MONGO_HOST, MONGO_PORT, MONGO_DATABASE, MONGO_USERNAME, MONGO_PASSWORD};
public static final AttributeDefinition[] MONGO_BINARY_STORAGE_ATTRIBUTES = { MINIMUM_BINARY_SIZE, MINIMUM_STRING_SIZE,
MIME_TYPE_DETECTION, MONGO_HOST, MONGO_PORT, MONGO_DATABASE, MONGO_USERNAME, MONGO_PASSWORD, MONGO_HOST_ADDRESSES };

public static final AttributeDefinition[] COMPOSITE_BINARY_STORAGE_ATTRIBUTES = {MINIMUM_BINARY_SIZE, MINIMUM_STRING_SIZE,
NESTED_STORES, MIME_TYPE_DETECTION};
Expand Down
Expand Up @@ -242,6 +242,7 @@ modeshape.repository.mongo-binary-storage.port = The MongoDB port
modeshape.repository.mongo-binary-storage.database = The name of the Mongo database used to store the binary values
modeshape.repository.mongo-binary-storage.username = The Mongo user name
modeshape.repository.mongo-binary-storage.password = The Mongo password
modeshape.repository.mongo-binary-storage.host-addresses = The comma-separated list of hostname:port pairs for the servers which make up a MongoDB replica set

modeshape.repository.s3-binary-storage = Store binary values in Amazon S3
modeshape.repository.s3-binary-storage.add = Store binary values in Amazon S3
Expand Down
Expand Up @@ -1054,12 +1054,12 @@
</xs:attribute>
</xs:attributeGroup>
<xs:attributeGroup name="mongo-storage-attributes">
<xs:attribute name="host" type="xs:string" use="required">
<xs:attribute name="host" type="xs:string" use="optional">
<xs:annotation>
<xs:documentation>The MongoDB host name</xs:documentation>
</xs:annotation>
</xs:attribute>
<xs:attribute name="port" type="xs:integer" use="required">
<xs:attribute name="port" type="xs:integer" use="optional">
<xs:annotation>
<xs:documentation>The MongoDB port number</xs:documentation>
</xs:annotation>
Expand All @@ -1078,6 +1078,11 @@
<xs:annotation>
<xs:documentation>The DB user password</xs:documentation>
</xs:annotation>
</xs:attribute>
<xs:attribute name="host-addresses" type="xs:string" use="optional">
<xs:annotation>
<xs:documentation>The comma-separated list of hostname:port pairs for the servers which make up a MongoDB replica set</xs:documentation>
</xs:annotation>
</xs:attribute>
</xs:attributeGroup>
<xs:attributeGroup name="s3-storage-attributes">
Expand Down
@@ -1,6 +1,6 @@
<subsystem xmlns="urn:jboss:domain:modeshape:3.0">
<repository name="sample">
<mongo-binary-storage mime-type-detection="content" min-string-size="10" min-value-size="4096" database="test"
host="localhost" port="100" password="test" username="test"/>
host="localhost" port="100" password="test" username="test" host-addresses="192.1.68.1.1:90, 143.22.33.123:120"/>
</repository>
</subsystem>
Expand Up @@ -447,6 +447,8 @@ public static class FieldName {
public static final String DOCUMENT_OPTIMIZATION = "documentOptimization";
public static final String OPTIMIZATION_CHILD_COUNT_TARGET = "childCountTarget";
public static final String OPTIMIZATION_CHILD_COUNT_TOLERANCE = "childCountTolerance";

public static final String HOST_ADDRESSES = "hostAddresses";

/**
* The name for the field (under "sequencing" and "textExtraction") specifying the thread pool that should be used for sequencing.
Expand Down Expand Up @@ -1139,6 +1141,7 @@ public long getMinimumStringSize() {
return binaryStorage.getLong(FieldName.MINIMUM_STRING_SIZE, getMinimumBinarySizeInBytes());
}

@SuppressWarnings("unchecked")
public BinaryStore getBinaryStore() throws Exception {
String type = getType();
BinaryStore store = null;
Expand Down Expand Up @@ -1202,7 +1205,8 @@ public BinaryStore getBinaryStore() throws Exception {
String database = binaryStorage.getString(FieldName.DATABASE);
String username = binaryStorage.getString(FieldName.USER_NAME);
String password = binaryStorage.getString(FieldName.USER_PASSWORD);
store = new MongodbBinaryStore(host, port, database, username, password, null);
List<String> hostAddresses = (List<String>) binaryStorage.getArray(FieldName.HOST_ADDRESSES);
store = new MongodbBinaryStore(host, port, database, username, password, hostAddresses);
} else if (type.equalsIgnoreCase(FieldValue.BINARY_STORAGE_TYPE_S3)) {
String username = binaryStorage.getString(FieldName.USER_NAME);
String password = binaryStorage.getString(FieldName.USER_PASSWORD);
Expand Down
Expand Up @@ -19,12 +19,13 @@
import java.io.OutputStream;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.modeshape.common.util.IoUtil;
import org.modeshape.common.util.StringUtil;
import org.modeshape.jcr.JcrI18n;
Expand Down Expand Up @@ -72,88 +73,89 @@ public class MongodbBinaryStore extends AbstractBinaryStore {
// database name
private String database;

private String host;
private int port;

// credentials
private String username;
private String password;

// server address(es)
private Set<String> replicaSet = new HashSet<>();
// server address(es) - note that order is important
private Set<String> hostAddresses = new LinkedHashSet<>();

// database instance
private DB db;

// chunk size in bytes
protected int chunkSize = 1024;

/**
* Creates a new instance of the store, using a single MongoDB.
*
* @param host
* @param port
* @param database
*/
public MongodbBinaryStore( String host,
int port,
String database ) {
this(host, port, database, null, null, null);
}

/**
* Creates a new mongo binary store instance using the supplied params.
*
* @param host the mongo host; may not be null
* @param port the port
* @param host the mongo primary host; may be null in which case {@code hostAddresses} has to be provided
* @param port the port of the primary host; may be null in which case {@code hostAddresses} has to be provided
* @param database the name of the database; may be null in which case a default will be used
* @param username the username; may be null
* @param password the password; may be null
* @param replicaSet a {@link Set} of (host:port) pairs representing multiple server addresses; may be null
* @param hostAddresses a {@link List} of (host:port) pairs representing multiple server addresses; may be null
*/
public MongodbBinaryStore(String host, int port, String database, String username, String password, Set<String> replicaSet) {
public MongodbBinaryStore(String host, Integer port, String database, String username, String password, List<String> hostAddresses) {
this.cache = TransientBinaryStore.get();
this.host = Objects.requireNonNull(host);
this.port = port;
this.database = !StringUtil.isBlank(database) ? database : DEFAULT_DB_NAME;
this.username = username;
this.password = password;
this.replicaSet = replicaSet == null ? Collections.emptySet() : replicaSet;
boolean hostAddressesProvided = hostAddresses != null && !hostAddresses.isEmpty();
this.hostAddresses = new LinkedHashSet<>();
String defaultServer = !StringUtil.isBlank(host) && port != null ? host + ":" + port : null;
if (defaultServer == null && !hostAddressesProvided) {
throw new IllegalArgumentException("Invalid Mongo binary store configuration: either (host and port) or host addresses have to provided");
}
if (defaultServer != null) {
this.hostAddresses.add(defaultServer);
}
if (hostAddressesProvided) {
this.hostAddresses.addAll(hostAddresses);
}
}

/**
* Converts list of addresses specified in text format to mongodb specific address.
*
* @param addresses list of addresses in text format
* @return list of mongodb addresses
* @throws UnknownHostException when at least one host is unknown
* @throws IllegalArgumentException if address has bad format
* @throws IllegalArgumentException if address has bad format or is not valid
*/
private List<ServerAddress> replicaSet( Set<String> addresses ) throws UnknownHostException {
List<ServerAddress> list = new ArrayList<ServerAddress>();
for (String address : addresses) {
// address has format <host:port>
String[] tokens = address.split(":");

// checking tokens number after split
if (tokens.length != 2) {
throw new IllegalArgumentException("Wrong address format: " + address);
}

String host = tokens[0];

// convert port number
int port;
try {
port = Integer.parseInt(tokens[1]);
} catch (NumberFormatException e) {
throw new IllegalArgumentException("Wrong address format: " + address);
}

list.add(new ServerAddress(host, port));
private List<ServerAddress> convertToServerAddresses(Set<String> addresses) {
return addresses.stream()
.map(this::stringToServerAddress)
.filter(Objects::nonNull)
.collect(Collectors.toList());
}

private ServerAddress stringToServerAddress(String address) {
if (address == null || address.trim().length() == 0) {
return null;
}
// address has format <host:port>
String[] tokens = address.split(":");

// checking tokens number after split
if (tokens.length != 2) {
throw new IllegalArgumentException("Wrong address format: " + address + " (expected host:port)") ;
}

String host = tokens[0];

// convert port number
int port;
try {
port = Integer.parseInt(tokens[1]);
} catch (NumberFormatException e) {
throw new IllegalArgumentException("Wrong address format: " + address + " (expected host:port)");
}

try {
return new ServerAddress(host, port);
} catch (UnknownHostException e) {
throw new IllegalArgumentException(e);
}

return list;
}

/**
Expand Down Expand Up @@ -307,7 +309,7 @@ private Set<String> getStoredKeys( boolean onlyUsed ) {
}
DBCollection collection = db.getCollection(collectionName);
boolean unused = (Boolean)getAttribute(collection, FIELD_UNUSED);
if ((unused && !onlyUsed) || !unused) {
if (!unused || !onlyUsed) {
storedKeys.add(collectionName);
}
}
Expand All @@ -333,20 +335,11 @@ private void initMongo(String username, String password) {
}

// connect to database
try {
MongoClient client = null;
if (!replicaSet.isEmpty()) {
client = new MongoClient(replicaSet(replicaSet), credentials);
} else if (!StringUtil.isBlank(host)) {
client = new MongoClient(new ServerAddress(host, port), credentials);
} else {
client = new MongoClient(new ServerAddress(), credentials);
}
client.setWriteConcern(WriteConcern.ACKNOWLEDGED);
db = client.getDB(database);
} catch (UnknownHostException e) {
throw new RuntimeException(e);
}
MongoClient client = hostAddresses.size() > 1 ?
new MongoClient(convertToServerAddresses(hostAddresses), credentials) :
new MongoClient(stringToServerAddress(hostAddresses.iterator().next()), credentials);
client.setWriteConcern(WriteConcern.ACKNOWLEDGED);
db = client.getDB(database);
}

/**
Expand Down
Expand Up @@ -420,12 +420,10 @@
"host" : {
"type" : "string",
"description" : "The name of the MongoDB host",
"required" : true,
},
"port" : {
"type" : "integer",
"description" : "The MongoDB port",
"required" : true,
},
"database" : {
"type" : "string",
Expand All @@ -440,6 +438,11 @@
"type" : "string",
"description" : "The password to use when connecting to MongoDB",
},
"hostAddresses" : {
"type" : "array",
"uniqueItems" : true,
"description" : "The optional array of hostname:port values representing MongoDB server addresses for a replica set"
},
"minimumBinarySizeInBytes" : {
"type" : "integer",
"default" : 4096,
Expand Down

0 comments on commit e0d96e3

Please sign in to comment.