Skip to content

Commit

Permalink
MAILBOX-339 Rolling migration for mailboxPathV2
Browse files Browse the repository at this point in the history
  • Loading branch information
chibenwa authored and mbaechler committed May 28, 2018
1 parent 9a515e0 commit 7b38d06
Show file tree
Hide file tree
Showing 2 changed files with 24 additions and 4 deletions.
Expand Up @@ -26,6 +26,7 @@
import java.util.StringTokenizer; import java.util.StringTokenizer;
import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException; import java.util.concurrent.CompletionException;
import java.util.concurrent.CompletionStage;
import java.util.function.Function; import java.util.function.Function;
import java.util.regex.Pattern; import java.util.regex.Pattern;
import java.util.stream.Collectors; import java.util.stream.Collectors;
Expand Down Expand Up @@ -108,12 +109,30 @@ private CompletableFuture<Optional<SimpleMailbox>> fromPreviousTable(MailboxPath
cassandraIdOptional cassandraIdOptional
.map(CassandraIdAndPath::getCassandraId) .map(CassandraIdAndPath::getCassandraId)
.map(this::retrieveMailbox) .map(this::retrieveMailbox)
.orElse(CompletableFuture.completedFuture(Optional.empty()))); .orElse(CompletableFuture.completedFuture(Optional.empty())))
.thenCompose(maybeMailbox -> maybeMailbox.map(this::migrate)
.orElse(CompletableFuture.completedFuture(maybeMailbox)));
} catch (CompletionException e) { } catch (CompletionException e) {
throw DriverExceptionHelper.handleStorageException(e); throw DriverExceptionHelper.handleStorageException(e);
} }
} }


private CompletableFuture<Optional<SimpleMailbox>> migrate(SimpleMailbox mailbox) {
CassandraId mailboxId = (CassandraId) mailbox.getMailboxId();
return mailboxPathV2DAO.save(mailbox.generateAssociatedPath(), mailboxId)
.thenCompose(success -> deleteIfSuccess(mailbox, success))
.thenApply(any -> Optional.of(mailbox));
}

private CompletionStage<Void> deleteIfSuccess(SimpleMailbox mailbox, boolean success) {
if (success) {
return mailboxPathDAO.delete(mailbox.generateAssociatedPath());
}
LOGGER.info("Concurrent execution lead to data race while migrating {} to 'mailboxPathV2DAO'.",
mailbox.generateAssociatedPath());
return CompletableFuture.completedFuture(null);
}

@Override @Override
public Mailbox findMailboxById(MailboxId id) throws MailboxException { public Mailbox findMailboxById(MailboxId id) throws MailboxException {
CassandraId mailboxId = (CassandraId) id; CassandraId mailboxId = (CassandraId) id;
Expand Down
Expand Up @@ -108,15 +108,16 @@ public void newValuesShouldNotBeSavedInOldDAO() throws Exception {
} }


@Test @Test
public void readingOldValuesShouldNotMigrateThem() throws Exception { public void readingOldValuesShouldMigrateThem() throws Exception {
daoV1.save(MAILBOX_PATH_1, MAILBOX_ID_1).join(); daoV1.save(MAILBOX_PATH_1, MAILBOX_ID_1).join();
mailboxDAO.save(MAILBOX_1).join(); mailboxDAO.save(MAILBOX_1).join();


mailboxMapper.findMailboxByPath(MAILBOX_PATH_1); mailboxMapper.findMailboxByPath(MAILBOX_PATH_1);


SoftAssertions softly = new SoftAssertions(); SoftAssertions softly = new SoftAssertions();
softly.assertThat(daoV1.retrieveId(MAILBOX_PATH_1).join()).isNotEmpty(); softly.assertThat(daoV1.retrieveId(MAILBOX_PATH_1).join()).isEmpty();
softly.assertThat(daoV2.retrieveId(MAILBOX_PATH_1).join()).isEmpty(); softly.assertThat(daoV2.retrieveId(MAILBOX_PATH_1).join())
.contains(new CassandraIdAndPath(MAILBOX_ID_1, MAILBOX_PATH_1));
softly.assertAll(); softly.assertAll();
} }


Expand Down

0 comments on commit 7b38d06

Please sign in to comment.