Skip to content

Commit

Permalink
DO NOT REVIEW JAMES-2630 Migrate lasts CompletableFutur to Reactor #2156
Browse files Browse the repository at this point in the history
  • Loading branch information
mbaechler authored and Raphael Ouazana committed Feb 15, 2019
1 parent 429e9b1 commit ea3e231
Show file tree
Hide file tree
Showing 114 changed files with 876 additions and 1,766 deletions.
Expand Up @@ -67,12 +67,12 @@ public void clearAllTables() {
}

private Mono<Void> truncate(CassandraAsyncExecutor executor, String name) {
return executor.executeReactor(
return executor.execute(
QueryBuilder.select()
.from(name)
.limit(1)
.setFetchSize(1))
.filter(resultSet -> !resultSet.isExhausted())
.flatMap(ignored -> executor.executeVoidReactor(QueryBuilder.truncate(name)));
.flatMap(ignored -> executor.executeVoid(QueryBuilder.truncate(name)));
}
}
Expand Up @@ -29,7 +29,6 @@
import org.apache.james.backends.cassandra.components.CassandraTable;
import org.apache.james.backends.cassandra.components.CassandraType;
import org.apache.james.backends.cassandra.init.configuration.ClusterConfiguration;
import org.apache.james.backends.cassandra.utils.CassandraUtils;
import org.apache.james.backends.cassandra.versions.CassandraSchemaVersionDAO;
import org.apache.james.backends.cassandra.versions.CassandraSchemaVersionManager;

Expand All @@ -51,8 +50,9 @@ private Session createSession(Cluster cluster, String keyspace) {
Session session = cluster.connect(keyspace);
try {
if (allOperationsAreFullyPerformed(session)) {
new CassandraSchemaVersionDAO(session, CassandraUtils.WITH_DEFAULT_CONFIGURATION)
.updateVersion(CassandraSchemaVersionManager.MAX_VERSION);
new CassandraSchemaVersionDAO(session)
.updateVersion(CassandraSchemaVersionManager.MAX_VERSION)
.block();
}
return session;
} catch (Exception e) {
Expand Down
Expand Up @@ -49,7 +49,7 @@ public CassandraMigrationService(CassandraSchemaVersionDAO schemaVersionDAO, Map
}

public Optional<SchemaVersion> getCurrentVersion() {
return schemaVersionDAO.getCurrentSchemaVersion().join();
return schemaVersionDAO.getCurrentSchemaVersion().block();
}

public Optional<SchemaVersion> getLatestVersion() {
Expand Down Expand Up @@ -91,7 +91,7 @@ private Migration toMigration(SchemaVersion version) {

logger.info("Migrating to version {} ", newVersion);
return allMigrationClazz.get(version).run()
.onComplete(() -> schemaVersionDAO.updateVersion(newVersion).join(),
.onComplete(() -> schemaVersionDAO.updateVersion(newVersion).block(),
() -> logger.info("Migrating to version {} done", newVersion))
.onFailure(() -> logger.warn(failureMessage(newVersion)),
() -> throwMigrationException(newVersion));
Expand Down
Expand Up @@ -20,15 +20,13 @@
package org.apache.james.backends.cassandra.utils;

import java.util.Optional;
import java.util.concurrent.CompletableFuture;

import javax.inject.Inject;

import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.Row;
import com.datastax.driver.core.Session;
import com.datastax.driver.core.Statement;

import net.javacrumbs.futureconverter.java8guava.FutureConverter;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
Expand All @@ -42,48 +40,34 @@ public CassandraAsyncExecutor(Session session) {
this.session = session;
}

public CompletableFuture<ResultSet> execute(Statement statement) {
return executeReactor(statement).toFuture();
}

public CompletableFuture<Void> executeVoid(Statement statement) {
return executeVoidReactor(statement).toFuture();
}

public CompletableFuture<Optional<Row>> executeSingleRow(Statement statement) {
return executeSingleRowOptionalReactor(statement)
.toFuture();
}

public Mono<ResultSet> executeReactor(Statement statement) {
public Mono<ResultSet> execute(Statement statement) {
return Mono.defer(() -> Mono.fromFuture(FutureConverter
.toCompletableFuture(session.executeAsync(statement)))
.publishOn(Schedulers.elastic()));
}


public Mono<Boolean> executeReturnApplied(Statement statement) {
return executeSingleRowReactor(statement)
return executeSingleRow(statement)
.map(row -> row.getBool(CassandraConstants.LIGHTWEIGHT_TRANSACTION_APPLIED));
}

public Mono<Void> executeVoidReactor(Statement statement) {
return executeReactor(statement)
public Mono<Void> executeVoid(Statement statement) {
return execute(statement)
.then();
}

public Mono<Row> executeSingleRowReactor(Statement statement) {
return executeSingleRowOptionalReactor(statement)
public Mono<Row> executeSingleRow(Statement statement) {
return executeSingleRowOptional(statement)
.flatMap(Mono::justOrEmpty);
}

public Mono<Optional<Row>> executeSingleRowOptionalReactor(Statement statement) {
return executeReactor(statement)
public Mono<Optional<Row>> executeSingleRowOptional(Statement statement) {
return execute(statement)
.map(resultSet -> Optional.ofNullable(resultSet.one()));
}

public Mono<Boolean> executeReturnExists(Statement statement) {
return executeSingleRowReactor(statement)
return executeSingleRow(statement)
.hasElement();
}
}
Expand Up @@ -27,30 +27,28 @@
import static org.apache.james.backends.cassandra.versions.table.CassandraSchemaVersionTable.VALUE;

import java.util.Optional;
import java.util.concurrent.CompletableFuture;

import javax.inject.Inject;

import org.apache.james.backends.cassandra.utils.CassandraAsyncExecutor;
import org.apache.james.backends.cassandra.utils.CassandraUtils;
import org.apache.james.backends.cassandra.versions.table.CassandraSchemaVersionTable;

import com.datastax.driver.core.PreparedStatement;
import com.datastax.driver.core.Session;
import com.datastax.driver.core.utils.UUIDs;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class CassandraSchemaVersionDAO {
private final PreparedStatement readVersionStatement;
private final PreparedStatement writeVersionStatement;
private CassandraUtils cassandraUtils;
private final CassandraAsyncExecutor cassandraAsyncExecutor;

@Inject
public CassandraSchemaVersionDAO(Session session, CassandraUtils cassandraUtils) {
public CassandraSchemaVersionDAO(Session session) {
cassandraAsyncExecutor = new CassandraAsyncExecutor(session);
readVersionStatement = prepareReadVersionStatement(session);
writeVersionStatement = prepareWriteVersionStatement(session);
this.cassandraUtils = cassandraUtils;
}

private PreparedStatement prepareReadVersionStatement(Session session) {
Expand All @@ -66,15 +64,17 @@ private PreparedStatement prepareWriteVersionStatement(Session session) {
.value(VALUE, bindMarker(VALUE)));
}

public CompletableFuture<Optional<SchemaVersion>> getCurrentSchemaVersion() {
public Mono<Optional<SchemaVersion>> getCurrentSchemaVersion() {
return cassandraAsyncExecutor.execute(readVersionStatement.bind())
.thenApply(resultSet -> cassandraUtils.convertToStream(resultSet)
.map(row -> row.getInt(VALUE))
.reduce(Math::max))
.thenApply(i -> i.map(SchemaVersion::new));
.flatMapMany(Flux::fromIterable)
.map(row -> row.getInt(VALUE))
.reduce(Math::max)
.map(SchemaVersion::new)
.map(Optional::of)
.switchIfEmpty(Mono.just(Optional.empty()));
}

public CompletableFuture<Void> updateVersion(SchemaVersion newVersion) {
public Mono<Void> updateVersion(SchemaVersion newVersion) {
return cassandraAsyncExecutor.executeVoid(
writeVersionStatement.bind()
.setUUID(KEY, UUIDs.timeBased())
Expand Down
Expand Up @@ -68,7 +68,7 @@ public CassandraSchemaVersionManager(CassandraSchemaVersionDAO schemaVersionDAO,
public SchemaVersion computeVersion() {
return schemaVersionDAO
.getCurrentSchemaVersion()
.join()
.block()
.orElseGet(() -> {
LOGGER.warn("No schema version information found on Cassandra, we assume schema is at version {}",
CassandraSchemaVersionManager.DEFAULT_VERSION);
Expand Down
Expand Up @@ -30,7 +30,6 @@
import org.apache.james.backends.cassandra.DockerCassandraExtension;
import org.apache.james.backends.cassandra.components.CassandraModule;
import org.apache.james.backends.cassandra.init.configuration.ClusterConfiguration;
import org.apache.james.backends.cassandra.utils.CassandraUtils;
import org.apache.james.backends.cassandra.versions.CassandraSchemaVersionDAO;
import org.apache.james.backends.cassandra.versions.CassandraSchemaVersionManager;
import org.apache.james.backends.cassandra.versions.CassandraSchemaVersionModule;
Expand Down Expand Up @@ -151,6 +150,6 @@ private CassandraSchemaVersionManager versionManager(Session session) {
}

private CassandraSchemaVersionDAO versionManagerDAO(Session session) {
return new CassandraSchemaVersionDAO(session, CassandraUtils.WITH_DEFAULT_CONFIGURATION);
return new CassandraSchemaVersionDAO(session);
}
}
Expand Up @@ -31,7 +31,6 @@

import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

Expand All @@ -48,6 +47,7 @@

import com.datastax.driver.core.Session;
import com.google.common.collect.ImmutableMap;
import reactor.core.publisher.Mono;

public class CassandraMigrationServiceTest {
private static final SchemaVersion LATEST_VERSION = new SchemaVersion(3);
Expand All @@ -65,7 +65,7 @@ public class CassandraMigrationServiceTest {
@Before
public void setUp() throws Exception {
schemaVersionDAO = mock(CassandraSchemaVersionDAO.class);
when(schemaVersionDAO.updateVersion(any())).thenReturn(CompletableFuture.completedFuture(null));
when(schemaVersionDAO.updateVersion(any())).thenReturn(Mono.empty());

successfulMigration = mock(Migration.class);
when(successfulMigration.run()).thenReturn(Migration.Result.COMPLETED);
Expand All @@ -86,7 +86,7 @@ public void tearDown() {

@Test
public void getCurrentVersionShouldReturnCurrentVersion() {
when(schemaVersionDAO.getCurrentSchemaVersion()).thenReturn(CompletableFuture.completedFuture(Optional.of(CURRENT_VERSION)));
when(schemaVersionDAO.getCurrentSchemaVersion()).thenReturn(Mono.just(Optional.of(CURRENT_VERSION)));

assertThat(testee.getCurrentVersion()).contains(CURRENT_VERSION);
}
Expand All @@ -98,15 +98,15 @@ public void getLatestVersionShouldReturnTheLatestVersion() {

@Test
public void upgradeToVersionShouldNotThrowWhenCurrentVersionIsUpToDate() {
when(schemaVersionDAO.getCurrentSchemaVersion()).thenReturn(CompletableFuture.completedFuture(Optional.of(CURRENT_VERSION)));
when(schemaVersionDAO.getCurrentSchemaVersion()).thenReturn(Mono.just(Optional.of(CURRENT_VERSION)));

assertThat(testee.upgradeToVersion(OLDER_VERSION).run())
.isEqualTo(Task.Result.COMPLETED);
}

@Test
public void upgradeToVersionShouldUpdateToVersion() {
when(schemaVersionDAO.getCurrentSchemaVersion()).thenReturn(CompletableFuture.completedFuture(Optional.of(OLDER_VERSION)));
when(schemaVersionDAO.getCurrentSchemaVersion()).thenReturn(Mono.just(Optional.of(OLDER_VERSION)));

testee.upgradeToVersion(CURRENT_VERSION).run();

Expand All @@ -116,15 +116,15 @@ public void upgradeToVersionShouldUpdateToVersion() {
@Test
public void upgradeToLastVersionShouldNotThrowWhenVersionIsUpToDate() {

when(schemaVersionDAO.getCurrentSchemaVersion()).thenReturn(CompletableFuture.completedFuture(Optional.of(LATEST_VERSION)));
when(schemaVersionDAO.getCurrentSchemaVersion()).thenReturn(Mono.just(Optional.of(LATEST_VERSION)));

assertThat(testee.upgradeToLastVersion().run())
.isEqualTo(Task.Result.COMPLETED);
}

@Test
public void upgradeToLastVersionShouldUpdateToLatestVersion() {
when(schemaVersionDAO.getCurrentSchemaVersion()).thenReturn(CompletableFuture.completedFuture(Optional.of(OLDER_VERSION)));
when(schemaVersionDAO.getCurrentSchemaVersion()).thenReturn(Mono.just(Optional.of(OLDER_VERSION)));

testee.upgradeToLastVersion().run();

Expand All @@ -138,7 +138,7 @@ public void upgradeToVersionShouldThrowOnMissingVersion() {
.put(LATEST_VERSION, successfulMigration)
.build();
testee = new CassandraMigrationService(schemaVersionDAO, allMigrationClazz, LATEST_VERSION);
when(schemaVersionDAO.getCurrentSchemaVersion()).thenReturn(CompletableFuture.completedFuture(Optional.of(OLDER_VERSION)));
when(schemaVersionDAO.getCurrentSchemaVersion()).thenReturn(Mono.just(Optional.of(OLDER_VERSION)));

expectedException.expect(NotImplementedException.class);

Expand All @@ -154,7 +154,7 @@ public void upgradeToVersionShouldUpdateIntermediarySuccessfulMigrationsInCaseOf
.put(LATEST_VERSION, successfulMigration)
.build();
testee = new CassandraMigrationService(schemaVersionDAO, allMigrationClazz, LATEST_VERSION);
when(schemaVersionDAO.getCurrentSchemaVersion()).thenReturn(CompletableFuture.completedFuture(Optional.of(OLDER_VERSION)));
when(schemaVersionDAO.getCurrentSchemaVersion()).thenReturn(Mono.just(Optional.of(OLDER_VERSION)));

expectedException.expect(RuntimeException.class);

Expand Down Expand Up @@ -209,19 +209,19 @@ public static class InMemorySchemaDAO extends CassandraSchemaVersionDAO {
private SchemaVersion currentVersion;

public InMemorySchemaDAO(SchemaVersion currentVersion) {
super(mock(Session.class), null);
super(mock(Session.class));
this.currentVersion = currentVersion;
}

@Override
public CompletableFuture<Optional<SchemaVersion>> getCurrentSchemaVersion() {
return CompletableFuture.completedFuture(Optional.of(currentVersion));
public Mono<Optional<SchemaVersion>> getCurrentSchemaVersion() {
return Mono.just(Optional.of(currentVersion));
}

@Override
public CompletableFuture<Void> updateVersion(SchemaVersion newVersion) {
public Mono<Void> updateVersion(SchemaVersion newVersion) {
currentVersion = newVersion;
return CompletableFuture.completedFuture(null);
return Mono.empty();
}
}
}
Expand Up @@ -25,18 +25,17 @@
import static org.assertj.core.api.Assertions.assertThat;

import java.util.UUID;
import java.util.stream.IntStream;

import org.apache.james.backends.cassandra.CassandraCluster;
import org.apache.james.backends.cassandra.CassandraClusterExtension;
import org.apache.james.backends.cassandra.components.CassandraModule;
import org.apache.james.util.CompletableFutureUtil;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;

import com.datastax.driver.core.DataType;
import com.datastax.driver.core.utils.UUIDs;
import reactor.core.publisher.Flux;

class PaggingTest {

Expand Down Expand Up @@ -65,22 +64,19 @@ void pagingShouldWork() {
int fetchSize = 200;
int size = 2 * fetchSize + 50;

CompletableFutureUtil.allOf(
IntStream.range(0, size)
.boxed()
.map(i ->
executor
.executeVoid(insertInto(TABLE_NAME)
.value(ID, UUID)
.value(CLUSTERING, i))))
.join();
Flux.range(0, size)
.flatMap(i -> executor
.executeVoid(insertInto(TABLE_NAME)
.value(ID, UUID)
.value(CLUSTERING, i)))
.blockLast();

assertThat(
executor.execute(select()
.from(TABLE_NAME)
.where(eq(ID, UUID))
.setFetchSize(fetchSize))
.join())
.from(TABLE_NAME)
.where(eq(ID, UUID))
.setFetchSize(fetchSize))
.block())
.hasSize(size);
}

Expand Down

0 comments on commit ea3e231

Please sign in to comment.