Skip to content

Commit

Permalink
JAMES-2272 Migration should succeed when already applied
Browse files Browse the repository at this point in the history
concurrentMigrationIsNotAllowed test can not be return anymore as it was relying on the non indepotent behaviour to ensure tasks was run sequentially. However, using the taskManager, we ensure task, and thus migrations can run sequentially.
  • Loading branch information
chibenwa committed Jan 4, 2018
1 parent 955afb0 commit e4af1a8
Show file tree
Hide file tree
Showing 4 changed files with 54 additions and 79 deletions.
Expand Up @@ -58,7 +58,6 @@ public Optional<SchemaVersion> getLatestVersion() {


public Migration upgradeToVersion(SchemaVersion newVersion) { public Migration upgradeToVersion(SchemaVersion newVersion) {
SchemaVersion currentVersion = getCurrentVersion().orElse(DEFAULT_VERSION); SchemaVersion currentVersion = getCurrentVersion().orElse(DEFAULT_VERSION);
assertMigrationNeeded(newVersion, currentVersion);


Migration migrationCombination = IntStream.range(currentVersion.getValue(), newVersion.getValue()) Migration migrationCombination = IntStream.range(currentVersion.getValue(), newVersion.getValue())
.boxed() .boxed()
Expand All @@ -69,13 +68,6 @@ public Migration upgradeToVersion(SchemaVersion newVersion) {
return new MigrationTask(migrationCombination, newVersion); return new MigrationTask(migrationCombination, newVersion);
} }


private void assertMigrationNeeded(SchemaVersion newVersion, SchemaVersion currentVersion) {
boolean needMigration = currentVersion.isBefore(newVersion);
if (!needMigration) {
throw new IllegalStateException("Current version is already up to date");
}
}

private SchemaVersion validateVersionNumber(SchemaVersion versionNumber) { private SchemaVersion validateVersionNumber(SchemaVersion versionNumber) {
if (!allMigrationClazz.containsKey(versionNumber)) { if (!allMigrationClazz.containsKey(versionNumber)) {
String message = String.format("Can not migrate to %d. No migration class registered.", versionNumber.getValue()); String message = String.format("Can not migrate to %d. No migration class registered.", versionNumber.getValue());
Expand All @@ -94,7 +86,7 @@ private Migration toMigration(SchemaVersion version) {
SchemaVersion newVersion = version.next(); SchemaVersion newVersion = version.next();
SchemaVersion currentVersion = getCurrentVersion().orElse(DEFAULT_VERSION); SchemaVersion currentVersion = getCurrentVersion().orElse(DEFAULT_VERSION);
if (currentVersion.isAfterOrEquals(newVersion)) { if (currentVersion.isAfterOrEquals(newVersion)) {
return Migration.Result.PARTIAL; return Migration.Result.COMPLETED;
} }


LOG.info("Migrating to version {} ", newVersion); LOG.info("Migrating to version {} ", newVersion);
Expand Down
Expand Up @@ -37,6 +37,7 @@
import org.apache.commons.lang.NotImplementedException; import org.apache.commons.lang.NotImplementedException;
import org.apache.james.backends.cassandra.versions.CassandraSchemaVersionDAO; import org.apache.james.backends.cassandra.versions.CassandraSchemaVersionDAO;
import org.apache.james.backends.cassandra.versions.SchemaVersion; import org.apache.james.backends.cassandra.versions.SchemaVersion;
import org.apache.james.task.Task;
import org.junit.After; import org.junit.After;
import org.junit.Before; import org.junit.Before;
import org.junit.Rule; import org.junit.Rule;
Expand Down Expand Up @@ -91,12 +92,11 @@ public void getLatestVersionShouldReturnTheLatestVersion() throws Exception {
} }


@Test @Test
public void upgradeToVersionShouldThrowWhenCurrentVersionIsUpToDate() throws Exception { public void upgradeToVersionShouldNotThrowWhenCurrentVersionIsUpToDate() throws Exception {
expectedException.expect(IllegalStateException.class);

when(schemaVersionDAO.getCurrentSchemaVersion()).thenReturn(CompletableFuture.completedFuture(Optional.of(CURRENT_VERSION))); when(schemaVersionDAO.getCurrentSchemaVersion()).thenReturn(CompletableFuture.completedFuture(Optional.of(CURRENT_VERSION)));


testee.upgradeToVersion(OLDER_VERSION).run(); assertThat(testee.upgradeToVersion(OLDER_VERSION).run())
.isEqualTo(Task.Result.COMPLETED);
} }


@Test @Test
Expand All @@ -109,12 +109,12 @@ public void upgradeToVersionShouldUpdateToVersion() throws Exception {
} }


@Test @Test
public void upgradeToLastVersionShouldThrowWhenVersionIsUpToDate() throws Exception { public void upgradeToLastVersionShouldNotThrowWhenVersionIsUpToDate() throws Exception {
expectedException.expect(IllegalStateException.class);


when(schemaVersionDAO.getCurrentSchemaVersion()).thenReturn(CompletableFuture.completedFuture(Optional.of(LATEST_VERSION))); when(schemaVersionDAO.getCurrentSchemaVersion()).thenReturn(CompletableFuture.completedFuture(Optional.of(LATEST_VERSION)));


testee.upgradeToLastVersion().run(); assertThat(testee.upgradeToLastVersion().run())
.isEqualTo(Task.Result.COMPLETED);
} }


@Test @Test
Expand Down
Expand Up @@ -31,17 +31,13 @@


import java.nio.charset.StandardCharsets; import java.nio.charset.StandardCharsets;
import java.util.List; import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.TimeUnit;


import org.apache.james.CassandraJmapTestRule; import org.apache.james.CassandraJmapTestRule;
import org.apache.james.DockerCassandraRule; import org.apache.james.DockerCassandraRule;
import org.apache.james.GuiceJamesServer; import org.apache.james.GuiceJamesServer;
import org.apache.james.backends.cassandra.versions.CassandraSchemaVersionManager; import org.apache.james.backends.cassandra.versions.CassandraSchemaVersionManager;
import org.apache.james.modules.MailboxProbeImpl; import org.apache.james.modules.MailboxProbeImpl;
import org.apache.james.probe.DataProbe; import org.apache.james.probe.DataProbe;
import org.apache.james.task.TaskManager;
import org.apache.james.util.concurrency.ConcurrentTestRunner;
import org.apache.james.utils.DataProbeImpl; import org.apache.james.utils.DataProbeImpl;
import org.apache.james.utils.WebAdminGuiceProbe; import org.apache.james.utils.WebAdminGuiceProbe;
import org.apache.james.webadmin.routes.DomainsRoutes; import org.apache.james.webadmin.routes.DomainsRoutes;
Expand All @@ -54,7 +50,6 @@
import org.junit.ClassRule; import org.junit.ClassRule;
import org.junit.Rule; import org.junit.Rule;
import org.junit.Test; import org.junit.Test;
import org.testcontainers.shaded.com.google.common.collect.ImmutableList;


import com.jayway.restassured.RestAssured; import com.jayway.restassured.RestAssured;
import com.jayway.restassured.builder.RequestSpecBuilder; import com.jayway.restassured.builder.RequestSpecBuilder;
Expand Down Expand Up @@ -275,40 +270,6 @@ public void postShouldDoMigrationAndUpdateToTheLatestVersion() throws Exception
.body(is("{\"version\":" + CassandraSchemaVersionManager.MAX_VERSION.getValue() + "}")); .body(is("{\"version\":" + CassandraSchemaVersionManager.MAX_VERSION.getValue() + "}"));
} }


@Test
public void concurrentMigrationIsNotAllowed() throws Exception {
ConcurrentLinkedQueue<String> taskIds = new ConcurrentLinkedQueue<>();
int threadCount = 2;
int operationCount = 1;
new ConcurrentTestRunner(threadCount, operationCount, (a, b) -> {
String migrationId = with()
.port(webAdminGuiceProbe.getWebAdminPort())
.post(UPGRADE_TO_LATEST_VERSION)
.jsonPath()
.get("taskId");
taskIds.add(migrationId);
}).run()
.awaitTermination(1, TimeUnit.MINUTES);

String id1 = taskIds.poll();
String id2 = taskIds.poll();
String status1 = with()
.port(webAdminGuiceProbe.getWebAdminPort())
.get("/tasks/" + id1 + "/await")
.jsonPath()
.get("status");
String status2 = with()
.port(webAdminGuiceProbe.getWebAdminPort())
.get("/tasks/" + id2 + "/await")
.jsonPath()
.get("status");

assertThat(ImmutableList.of(status1, status2))
.containsOnly(
TaskManager.Status.COMPLETED.getValue(),
TaskManager.Status.FAILED.getValue());
}

@Test @Test
public void addressGroupsEndpointShouldHandleRequests() throws Exception { public void addressGroupsEndpointShouldHandleRequests() throws Exception {
dataProbe.addAddressMapping("group", "domain.com", "user1@domain.com"); dataProbe.addAddressMapping("group", "domain.com", "user1@domain.com");
Expand Down
Expand Up @@ -201,26 +201,38 @@ public void postShouldDoMigrationToNewVersion() throws Exception {
} }


@Test @Test
public void postShouldNotDoMigrationWhenCurrentVersionIsNewerThan() throws Exception { public void postShouldCreateTaskWhenCurrentVersionIsNewerThan() throws Exception {
when(schemaVersionDAO.getCurrentSchemaVersion()).thenReturn(CompletableFuture.completedFuture(Optional.of(CURRENT_VERSION))); when(schemaVersionDAO.getCurrentSchemaVersion()).thenReturn(CompletableFuture.completedFuture(Optional.of(CURRENT_VERSION)));


Map<String, Object> errors = given() String taskId = given()
.body(String.valueOf(OLDER_VERSION.getValue())) .body(String.valueOf(OLDER_VERSION.getValue()))
.with() .with()
.post("/upgrade") .post("/upgrade")
.jsonPath()
.get("taskId");

given()
.basePath(TasksRoutes.BASE)
.when()
.get(taskId + "/await")
.then() .then()
.statusCode(HttpStatus.CONFLICT_409) .body("status", is("completed"));
.contentType(ContentType.JSON) }
.extract()
.body() @Test
public void postShouldNotUpdateVersionWhenCurrentVersionIsNewerThan() throws Exception {
when(schemaVersionDAO.getCurrentSchemaVersion()).thenReturn(CompletableFuture.completedFuture(Optional.of(CURRENT_VERSION)));

String taskId = given()
.body(String.valueOf(OLDER_VERSION.getValue()))
.with()
.post("/upgrade")
.jsonPath() .jsonPath()
.getMap("."); .get("taskId");


assertThat(errors) with()
.containsEntry("statusCode", HttpStatus.CONFLICT_409) .basePath(TasksRoutes.BASE)
.containsEntry("type", "WrongState") .get(taskId + "/await");
.containsEntry("message", "The migration requested can not be performed")
.containsEntry("cause", "Current version is already up to date");


verify(schemaVersionDAO, times(1)).getCurrentSchemaVersion(); verify(schemaVersionDAO, times(1)).getCurrentSchemaVersion();
verifyNoMoreInteractions(schemaVersionDAO); verifyNoMoreInteractions(schemaVersionDAO);
Expand Down Expand Up @@ -280,24 +292,34 @@ public void createdTaskShouldHaveDetails() throws Exception {
} }


@Test @Test
public void postShouldNotDoMigrationToLatestVersionWhenItIsUpToDate() throws Exception { public void postShouldCreateTaskWhenItIsUpToDate() throws Exception {
when(schemaVersionDAO.getCurrentSchemaVersion()).thenReturn(CompletableFuture.completedFuture(Optional.of(LATEST_VERSION))); when(schemaVersionDAO.getCurrentSchemaVersion()).thenReturn(CompletableFuture.completedFuture(Optional.of(LATEST_VERSION)));


Map<String, Object> errors = when() String taskId = with()
.post("/upgrade/latest") .post("/upgrade/latest")
.jsonPath()
.get("taskId");

given()
.basePath(TasksRoutes.BASE)
.when()
.get(taskId + "/await")
.then() .then()
.statusCode(HttpStatus.CONFLICT_409) .body("status", is("completed"));
.contentType(ContentType.JSON) }
.extract()
.body() @Test
public void postShouldNotUpdateVersionWhenItIsUpToDate() throws Exception {
when(schemaVersionDAO.getCurrentSchemaVersion()).thenReturn(CompletableFuture.completedFuture(Optional.of(LATEST_VERSION)));

String taskId = with()
.post("/upgrade/latest")
.jsonPath() .jsonPath()
.getMap("."); .get("taskId");


assertThat(errors) with()
.containsEntry("statusCode", HttpStatus.CONFLICT_409) .basePath(TasksRoutes.BASE)
.containsEntry("type", "WrongState") .get(taskId + "/await");
.containsEntry("message", "The migration requested can not be performed")
.containsEntry("cause", "Current version is already up to date");


verify(schemaVersionDAO, times(1)).getCurrentSchemaVersion(); verify(schemaVersionDAO, times(1)).getCurrentSchemaVersion();
verifyNoMoreInteractions(schemaVersionDAO); verifyNoMoreInteractions(schemaVersionDAO);
Expand Down

0 comments on commit e4af1a8

Please sign in to comment.