Skip to content

Commit

Permalink
JAMES-1908 add support for multiple cassandra node ip in configuration
Browse files Browse the repository at this point in the history
  • Loading branch information
Luc DUZAN authored and chibenwa committed Jan 23, 2017
1 parent 3581696 commit fabccf5
Show file tree
Hide file tree
Showing 12 changed files with 474 additions and 38 deletions.
@@ -0,0 +1,95 @@
/****************************************************************
* Licensed to the Apache Software Foundation (ASF) under one *
* or more contributor license agreements. See the NOTICE file *
* distributed with this work for additional information *
* regarding copyright ownership. The ASF licenses this file *
* to you under the Apache License, Version 2.0 (the *
* "License"); you may not use this file except in compliance *
* with the License. You may obtain a copy of the License at *
* *
* http://www.apache.org/licenses/LICENSE-2.0 *
* *
* Unless required by applicable law or agreed to in writing, *
* software distributed under the License is distributed on an *
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
* KIND, either express or implied. See the License for the *
* specific language governing permissions and limitations *
* under the License. *
****************************************************************/

package org.apache.james.backends.cassandra.init;

import java.util.List;

import com.google.common.base.Objects;
import com.google.common.base.Preconditions;
import com.google.common.base.Splitter;

public class CassandraNodeIpAndPort {
public static final int DEFAULT_CASSANDRA_PORT = 9042;

public static CassandraNodeIpAndPort parseConfString(String ipAndPort) {
Preconditions.checkNotNull(ipAndPort);
Preconditions.checkArgument(!ipAndPort.isEmpty());

List<String> parts = Splitter.on(':')
.trimResults()
.splitToList(ipAndPort);

if (parts.size() < 1 || parts.size() > 2) {
throw new IllegalArgumentException(ipAndPort + " is not a valid cassandra node");
}

String ip = parts.get(0);
int port = getPortFromConfPart(parts);

return new CassandraNodeIpAndPort(ip, port);
}

private static int getPortFromConfPart(List<String> parts) {
if (parts.size() == 2) {
return Integer.valueOf(parts.get(1));
} else {
return DEFAULT_CASSANDRA_PORT;
}
}

private final String ip;
private final int port;

public CassandraNodeIpAndPort(String ip, int port) {
this.ip = ip;
this.port = port;
}

public CassandraNodeIpAndPort(String ip) {
this(ip, DEFAULT_CASSANDRA_PORT);
}

public String getIp() {
return ip;
}

public int getPort() {
return port;
}

@Override
public int hashCode() {
return Objects.hashCode(ip, port);
}

@Override
public boolean equals(Object object) {
if (object instanceof CassandraNodeIpAndPort) {
CassandraNodeIpAndPort that = (CassandraNodeIpAndPort) object;
return Objects.equal(this.ip, that.ip) && Objects.equal(this.port, that.port);
}
return false;
}

@Override
public String toString() {
return this.ip + ":" + this.port;
}
}
Expand Up @@ -29,25 +29,14 @@

public class ClusterFactory {

public static class CassandraServer {
private final String ip;
private final int port;

public CassandraServer(String ip, int port) {
this.ip = ip;
this.port = port;
}
}

private final static String DEFAULT_CLUSTER_IP = "localhost";
private final static int DEFAULT_CLUSTER_PORT = 9042;

public static Cluster createClusterForClusterWithPassWord(List<CassandraServer> servers, String userName, String password,
public static Cluster createClusterForClusterWithPassWord(List<CassandraNodeIpAndPort> servers, String userName, String password,
Optional<Integer> refreshSchemaIntervalMillis) {

Cluster.Builder clusterBuilder = Cluster.builder();
servers.forEach(
(server) -> clusterBuilder.addContactPoint(server.ip).withPort(server.port)
(server) -> clusterBuilder.addContactPoint(server.getIp()).withPort(server.getPort())
);
if(!Strings.isNullOrEmpty(userName) && !Strings.isNullOrEmpty(password)) {
clusterBuilder.withCredentials(userName, password);
Expand All @@ -58,23 +47,23 @@ public static Cluster createClusterForClusterWithPassWord(List<CassandraServer>
return clusterBuilder.build();
}

public static Cluster createClusterForClusterWithoutPassWord(List<CassandraServer> servers) {
public static Cluster createClusterForClusterWithoutPassWord(List<CassandraNodeIpAndPort> servers) {
return createClusterForClusterWithPassWord(servers, null, null, Optional.empty());
}

public static Cluster createClusterForSingleServerWithPassWord(String ip, int port, String userName, String password) {
return createClusterForClusterWithPassWord(ImmutableList.of(new CassandraServer(ip, port)), userName, password, Optional.empty());
return createClusterForClusterWithPassWord(ImmutableList.of(new CassandraNodeIpAndPort(ip, port)), userName, password, Optional.empty());
}

public static Cluster createClusterForSingleServerWithoutPassWord(String ip, int port) {
return createClusterForClusterWithPassWord(ImmutableList.of(new CassandraServer(ip, port)), null, null, Optional.empty());
return createClusterForClusterWithPassWord(ImmutableList.of(new CassandraNodeIpAndPort(ip, port)), null, null, Optional.empty());
}

public static Cluster createTestingCluster(String ip, int port) {
return createClusterForClusterWithPassWord(ImmutableList.of(new CassandraServer(ip, port)), null, null, Optional.of(0));
return createClusterForClusterWithPassWord(ImmutableList.of(new CassandraNodeIpAndPort(ip, port)), null, null, Optional.of(0));
}

public static Cluster createDefaultSession() {
return createClusterForSingleServerWithoutPassWord(DEFAULT_CLUSTER_IP, DEFAULT_CLUSTER_PORT);
return createClusterForSingleServerWithoutPassWord(DEFAULT_CLUSTER_IP, CassandraNodeIpAndPort.DEFAULT_CASSANDRA_PORT);
}
}
@@ -0,0 +1,94 @@
/****************************************************************
* Licensed to the Apache Software Foundation (ASF) under one *
* or more contributor license agreements. See the NOTICE file *
* distributed with this work for additional information *
* regarding copyright ownership. The ASF licenses this file *
* to you under the Apache License, Version 2.0 (the *
* "License"); you may not use this file except in compliance *
* with the License. You may obtain a copy of the License at *
* *
* http://www.apache.org/licenses/LICENSE-2.0 *
* *
* Unless required by applicable law or agreed to in writing, *
* software distributed under the License is distributed on an *
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
* KIND, either express or implied. See the License for the *
* specific language governing permissions and limitations *
* under the License. *
****************************************************************/

package org.apache.james.backends.cassandra.init;

import static org.assertj.core.api.Assertions.assertThat;

import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;

public class CassandraNodeIpAndPortTest {

@Rule
public ExpectedException expectedException = ExpectedException.none();

@Test
public void parseConfStringShouldParseConfWithIpAndPort() {
//Given
String ipAndPort = "142.145.254.111:44";
int expectedPort = 44;
String expectedIp = "142.145.254.111";

//When
CassandraNodeIpAndPort actual = CassandraNodeIpAndPort.parseConfString(ipAndPort);

//Then
assertThat(actual).isEqualTo(new CassandraNodeIpAndPort(expectedIp, expectedPort));
}

@Test
public void parseConfStringShouldParseConfWithIpOnly() {
//Given
String ipAndPort = "142.145.254.111";
int expectedPort = CassandraNodeIpAndPort.DEFAULT_CASSANDRA_PORT;
String expectedIp = "142.145.254.111";

//When
CassandraNodeIpAndPort actual = CassandraNodeIpAndPort.parseConfString(ipAndPort);

//Then
assertThat(actual).isEqualTo(new CassandraNodeIpAndPort(expectedIp, expectedPort));
}

@Test
public void parseConfStringShouldFailWhenConfigIsAnEmptyString() {
expectedException.expect(IllegalArgumentException.class);

//Given
String ipAndPort = "";

//When
CassandraNodeIpAndPort.parseConfString(ipAndPort);
}

@Test
public void parseConfStringShouldFailWhenConfigIsANullString() {
expectedException.expect(NullPointerException.class);

//Given
String ipAndPort = null;

//When
CassandraNodeIpAndPort.parseConfString(ipAndPort);
}


@Test
public void parseConfStringShouldFailWhenConfigIsInvalid() {
expectedException.expect(IllegalArgumentException.class);

//Given
String ipAndPort = "10.10.10.10:42:43";

//When
CassandraNodeIpAndPort.parseConfString(ipAndPort);
}
}
@@ -1,7 +1,5 @@
# Configuration file for cassandra mailbox

cassandra.ip=cassandra
cassandra.port=9042
cassandra.nodes=cassandra
cassandra.keyspace=apache_james
cassandra.replication.factor=1
cassandra.retryConnection.maxRetries=10
Expand Down
6 changes: 6 additions & 0 deletions server/container/guice/cassandra-guice/pom.xml
Expand Up @@ -243,6 +243,12 @@
<type>test-jar</type>
<scope>test</scope>
</dependency>
<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>james-server-util-java8</artifactId>
<scope>test</scope>
<type>test-jar</type>
</dependency>
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
Expand Down
@@ -1,6 +1,5 @@
# Configuration file for cassandra mailbox

cassandra.ip=172.17.0.2
cassandra.port=9042
cassandra.nodes=172.17.0.2:9042
cassandra.keyspace=apache_james
cassandra.replication.factor=1
@@ -0,0 +1,31 @@
/****************************************************************
* Licensed to the Apache Software Foundation (ASF) under one *
* or more contributor license agreements. See the NOTICE file *
* distributed with this work for additional information *
* regarding copyright ownership. The ASF licenses this file *
* to you under the Apache License, Version 2.0 (the *
* "License"); you may not use this file except in compliance *
* with the License. You may obtain a copy of the License at *
* *
* http://www.apache.org/licenses/LICENSE-2.0 *
* *
* Unless required by applicable law or agreed to in writing, *
* software distributed under the License is distributed on an *
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
* KIND, either express or implied. See the License for the *
* specific language governing permissions and limitations *
* under the License. *
****************************************************************/

package org.apache.james.modules.mailbox;

import java.io.FileNotFoundException;

import org.apache.commons.configuration.ConfigurationException;
import org.apache.commons.configuration.PropertiesConfiguration;

public interface CassandraSessionConfiguration {

PropertiesConfiguration getConfiguration() throws FileNotFoundException, ConfigurationException;

}
Expand Up @@ -19,6 +19,8 @@
package org.apache.james.modules.mailbox;

import java.io.FileNotFoundException;
import java.util.Arrays;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ScheduledExecutorService;
Expand All @@ -27,6 +29,7 @@
import org.apache.commons.configuration.PropertiesConfiguration;
import org.apache.james.backends.cassandra.components.CassandraModule;
import org.apache.james.backends.cassandra.init.CassandraModuleComposite;
import org.apache.james.backends.cassandra.init.CassandraNodeIpAndPort;
import org.apache.james.backends.cassandra.init.CassandraZonedDateTimeModule;
import org.apache.james.backends.cassandra.init.ClusterFactory;
import org.apache.james.backends.cassandra.init.ClusterWithKeyspaceCreatedFactory;
Expand All @@ -36,6 +39,7 @@
import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.Session;
import com.datastax.driver.core.exceptions.NoHostAvailableException;
import com.github.steveash.guavate.Guavate;
import com.google.inject.AbstractModule;
import com.google.inject.Provides;
import com.google.inject.Singleton;
Expand All @@ -54,7 +58,7 @@ protected void configure() {
Multibinder<CassandraModule> cassandraDataDefinitions = Multibinder.newSetBinder(binder(), CassandraModule.class);
cassandraDataDefinitions.addBinding().to(CassandraZonedDateTimeModule.class);
}

@Provides
@Singleton
CassandraModule composeDataDefinitions(Set<CassandraModule> modules) {
Expand All @@ -63,30 +67,42 @@ CassandraModule composeDataDefinitions(Set<CassandraModule> modules) {

@Provides
@Singleton
Session provideSession(FileSystem fileSystem, Cluster cluster, CassandraModule cassandraModule)
Session provideSession(CassandraSessionConfiguration configuration, Cluster cluster, CassandraModule cassandraModule)
throws FileNotFoundException, ConfigurationException{
PropertiesConfiguration configuration = getConfiguration(fileSystem);
String keyspace = configuration.getString("cassandra.keyspace");
String keyspace = configuration.getConfiguration().getString("cassandra.keyspace");
return new SessionWithInitializedTablesFactory(cassandraModule).createSession(cluster, keyspace);
}

@Provides
@Singleton
Cluster provideCluster(FileSystem fileSystem, AsyncRetryExecutor executor) throws FileNotFoundException, ConfigurationException, ExecutionException, InterruptedException {
PropertiesConfiguration configuration = getConfiguration(fileSystem);
CassandraSessionConfiguration getCassandraSessionConfiguration(FileSystem fileSystem) {
return () -> getConfiguration(fileSystem);
}

@Provides
@Singleton
Cluster provideCluster(CassandraSessionConfiguration cassandraSessionConfiguration, AsyncRetryExecutor executor) throws FileNotFoundException, ConfigurationException, ExecutionException, InterruptedException {
PropertiesConfiguration configuration = cassandraSessionConfiguration.getConfiguration();
List<CassandraNodeIpAndPort> servers = listCassandraServers(configuration);

return getRetryer(executor, configuration)
.getWithRetry(ctx -> ClusterWithKeyspaceCreatedFactory
.config(
ClusterFactory.createClusterForSingleServerWithoutPassWord(
configuration.getString("cassandra.ip"),
configuration.getInt("cassandra.port")),
ClusterFactory.createClusterForClusterWithoutPassWord(servers),
configuration.getString("cassandra.keyspace"))
.replicationFactor(configuration.getInt("cassandra.replication.factor"))
.clusterWithInitializedKeyspace())
.get();
}

private List<CassandraNodeIpAndPort> listCassandraServers(PropertiesConfiguration configuration) {
String[] ipAndPorts = configuration.getStringArray("cassandra.nodes");

return Arrays.stream(ipAndPorts)
.map(CassandraNodeIpAndPort::parseConfString)
.collect(Guavate.toImmutableList());
}

private static AsyncRetryExecutor getRetryer(AsyncRetryExecutor executor, PropertiesConfiguration configuration) {
return executor.retryOn(NoHostAvailableException.class)
.withProportionalJitter()
Expand Down

0 comments on commit fabccf5

Please sign in to comment.