Skip to content

Commit

Permalink
MAILBOX-339 Implementing migration to remove UDT primary key
Browse files Browse the repository at this point in the history
  • Loading branch information
chibenwa authored and mbaechler committed May 28, 2018
1 parent 11fed81 commit 9a515e0
Show file tree
Hide file tree
Showing 7 changed files with 326 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@

public class CassandraSchemaVersionManager {
public static final SchemaVersion MIN_VERSION = new SchemaVersion(2);
public static final SchemaVersion MAX_VERSION = new SchemaVersion(5);
public static final SchemaVersion MAX_VERSION = new SchemaVersion(6);
public static final SchemaVersion DEFAULT_VERSION = MIN_VERSION;

private static final Logger LOGGER = LoggerFactory.getLogger(CassandraSchemaVersionManager.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
package org.apache.james.mailbox.cassandra.mail;

import static com.datastax.driver.core.querybuilder.QueryBuilder.bindMarker;
import static com.datastax.driver.core.querybuilder.QueryBuilder.count;
import static com.datastax.driver.core.querybuilder.QueryBuilder.eq;
import static com.datastax.driver.core.querybuilder.QueryBuilder.insertInto;
import static com.datastax.driver.core.querybuilder.QueryBuilder.select;
Expand Down Expand Up @@ -53,13 +54,17 @@

public class CassandraMailboxPathDAOImpl implements CassandraMailboxPathDAO {

private static final int FIRST_CELL = 0;

private final CassandraAsyncExecutor cassandraAsyncExecutor;
private final MailboxBaseTupleUtil mailboxBaseTupleUtil;
private final CassandraUtils cassandraUtils;
private final PreparedStatement delete;
private final PreparedStatement insert;
private final PreparedStatement select;
private final PreparedStatement selectAllForUser;
private final PreparedStatement selectAll;
private final PreparedStatement countAll;

@Inject
public CassandraMailboxPathDAOImpl(Session session, CassandraTypesProvider typesProvider, CassandraUtils cassandraUtils) {
Expand All @@ -69,7 +74,9 @@ public CassandraMailboxPathDAOImpl(Session session, CassandraTypesProvider types
this.insert = prepareInsert(session);
this.delete = prepareDelete(session);
this.select = prepareSelect(session);
this.selectAllForUser = prepareSelectAllForUser(session);
this.selectAll = prepareSelectAll(session);
this.countAll = prepareCountAll(session);
}

@VisibleForTesting
Expand Down Expand Up @@ -99,12 +106,22 @@ private PreparedStatement prepareSelect(Session session) {
.and(eq(MAILBOX_NAME, bindMarker(MAILBOX_NAME))));
}

private PreparedStatement prepareSelectAll(Session session) {
private PreparedStatement prepareSelectAllForUser(Session session) {
return session.prepare(select(FIELDS)
.from(TABLE_NAME)
.where(eq(NAMESPACE_AND_USER, bindMarker(NAMESPACE_AND_USER))));
}

private PreparedStatement prepareSelectAll(Session session) {
return session.prepare(select(FIELDS)
.from(TABLE_NAME));
}

private PreparedStatement prepareCountAll(Session session) {
return session.prepare(select(count(NAMESPACE_AND_USER))
.from(TABLE_NAME));
}

public CompletableFuture<Optional<CassandraIdAndPath>> retrieveId(MailboxPath mailboxPath) {
return cassandraAsyncExecutor.executeSingleRow(
select.bind()
Expand All @@ -118,7 +135,7 @@ public CompletableFuture<Optional<CassandraIdAndPath>> retrieveId(MailboxPath ma
@Override
public CompletableFuture<Stream<CassandraIdAndPath>> listUserMailboxes(String namespace, String user) {
return cassandraAsyncExecutor.execute(
selectAll.bind()
selectAllForUser.bind()
.setUDTValue(NAMESPACE_AND_USER, mailboxBaseTupleUtil.createMailboxBaseUDT(namespace, user)))
.thenApply(resultSet -> cassandraUtils.convertToStream(resultSet)
.map(this::fromRowToCassandraIdAndPath)
Expand Down Expand Up @@ -182,4 +199,15 @@ public CompletableFuture<Void> delete(MailboxPath mailboxPath) {
.setString(MAILBOX_NAME, mailboxPath.getName()));
}

public CompletableFuture<Stream<CassandraIdAndPath>> readAll() {
return cassandraAsyncExecutor.execute(selectAll.bind())
.thenApply(cassandraUtils::convertToStream)
.thenApply(stream -> stream.map(this::fromRowToCassandraIdAndPath));
}

public CompletableFuture<Long> countAll() {
return cassandraAsyncExecutor.executeSingleRow(countAll.bind())
.thenApply(optional -> optional.map(row -> row.getLong(FIRST_CELL)).orElse(0L));
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
/****************************************************************
* Licensed to the Apache Software Foundation (ASF) under one *
* or more contributor license agreements. See the NOTICE file *
* distributed with this work for additional information *
* regarding copyright ownership. The ASF licenses this file *
* to you under the Apache License, Version 2.0 (the *
* "License"); you may not use this file except in compliance *
* with the License. You may obtain a copy of the License at *
* *
* http://www.apache.org/licenses/LICENSE-2.0 *
* *
* Unless required by applicable law or agreed to in writing, *
* software distributed under the License is distributed on an *
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
* KIND, either express or implied. See the License for the *
* specific language governing permissions and limitations *
* under the License. *
****************************************************************/

package org.apache.james.mailbox.cassandra.mail.migration;

import java.util.Optional;
import java.util.function.Supplier;

import javax.inject.Inject;

import org.apache.james.backends.cassandra.migration.Migration;
import org.apache.james.mailbox.cassandra.mail.CassandraIdAndPath;
import org.apache.james.mailbox.cassandra.mail.CassandraMailboxPathDAOImpl;
import org.apache.james.mailbox.cassandra.mail.CassandraMailboxPathV2DAO;
import org.apache.james.task.Task;
import org.apache.james.task.TaskExecutionDetails;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class MailboxPathV2Migration implements Migration {

public static class AdditionalInformation implements TaskExecutionDetails.AdditionalInformation {
private final Supplier<Long> countSupplier;
private final long initialCount;

public AdditionalInformation(Supplier<Long> countSupplier) {
this.countSupplier = countSupplier;
this.initialCount = countSupplier.get();
}

public long getRemainingCount() {
return countSupplier.get();
}

public long getInitialCount() {
return initialCount;
}
}

public static final Logger LOGGER = LoggerFactory.getLogger(MailboxPathV2Migration.class);
private final CassandraMailboxPathDAOImpl daoV1;
private final CassandraMailboxPathV2DAO daoV2;
private final AdditionalInformation additionalInformation;

@Inject
public MailboxPathV2Migration(CassandraMailboxPathDAOImpl daoV1, CassandraMailboxPathV2DAO daoV2) {
this.daoV1 = daoV1;
this.daoV2 = daoV2;
this.additionalInformation = new AdditionalInformation(() -> daoV1.countAll().join());
}

@Override
public Result run() {
try {
return daoV1.readAll()
.join()
.map(this::migrate)
.reduce(Result.COMPLETED, Task::combine);
} catch (Exception e) {
LOGGER.error("Error while performing migration", e);
return Result.PARTIAL;
}
}

public Result migrate(CassandraIdAndPath idAndPath) {
try {
daoV2.save(idAndPath.getMailboxPath(), idAndPath.getCassandraId()).join();

daoV1.delete(idAndPath.getMailboxPath()).join();
return Result.COMPLETED;
} catch (Exception e) {
LOGGER.error("Error while performing migration for path {}", idAndPath.getMailboxPath(), e);
return Result.PARTIAL;
}
}

@Override
public String type() {
return "Cassandra_mailboxPathV2Migration";
}

@Override
public Optional<TaskExecutionDetails.AdditionalInformation> details() {
return Optional.of(additionalInformation);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,59 @@

package org.apache.james.mailbox.cassandra.mail;

import static org.assertj.core.api.Assertions.assertThat;

import org.junit.Test;

import com.github.steveash.guavate.Guavate;

public class CassandraMailboxPathDAOImplTest extends CassandraMailboxPathDAOTest {

@Override
CassandraMailboxPathDAO testee() {
return new CassandraMailboxPathDAOImpl(cassandra.getConf(), cassandra.getTypesProvider());
}

@Test
public void countAllShouldReturnEntryCount() {
testee.save(USER_INBOX_MAILBOXPATH, INBOX_ID).join();
testee.save(USER_OUTBOX_MAILBOXPATH, OUTBOX_ID).join();
testee.save(OTHER_USER_MAILBOXPATH, otherMailboxId).join();

CassandraMailboxPathDAOImpl daoV1 = (CassandraMailboxPathDAOImpl) testee;

assertThat(daoV1.countAll().join())
.isEqualTo(3);
}

@Test
public void countAllShouldReturnZeroByDefault() {
CassandraMailboxPathDAOImpl daoV1 = (CassandraMailboxPathDAOImpl) testee;

assertThat(daoV1.countAll().join())
.isEqualTo(0);
}

@Test
public void readAllShouldReturnAllStoredData() {
testee.save(USER_INBOX_MAILBOXPATH, INBOX_ID).join();
testee.save(USER_OUTBOX_MAILBOXPATH, OUTBOX_ID).join();
testee.save(OTHER_USER_MAILBOXPATH, otherMailboxId).join();

CassandraMailboxPathDAOImpl daoV1 = (CassandraMailboxPathDAOImpl) testee;

assertThat(daoV1.readAll().join().collect(Guavate.toImmutableList()))
.containsOnly(
new CassandraIdAndPath(INBOX_ID, USER_INBOX_MAILBOXPATH),
new CassandraIdAndPath(OUTBOX_ID, USER_OUTBOX_MAILBOXPATH),
new CassandraIdAndPath(otherMailboxId, OTHER_USER_MAILBOXPATH));
}

@Test
public void readAllShouldReturnEmptyByDefault() {
CassandraMailboxPathDAOImpl daoV1 = (CassandraMailboxPathDAOImpl) testee;

assertThat(daoV1.readAll().join().collect(Guavate.toImmutableList()))
.isEmpty();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,11 +38,11 @@
import nl.jqno.equalsverifier.EqualsVerifier;

public abstract class CassandraMailboxPathDAOTest {
private static final String USER = "user";
private static final String OTHER_USER = "other";
private static final CassandraId INBOX_ID = CassandraId.timeBased();
private static final CassandraId OUTBOX_ID = CassandraId.timeBased();
private static final CassandraId otherMailboxId = CassandraId.timeBased();
protected static final String USER = "user";
protected static final String OTHER_USER = "other";
protected static final CassandraId INBOX_ID = CassandraId.timeBased();
protected static final CassandraId OUTBOX_ID = CassandraId.timeBased();
protected static final CassandraId otherMailboxId = CassandraId.timeBased();

public static final MailboxPath USER_INBOX_MAILBOXPATH = MailboxPath.forUser(USER, "INBOX");
public static final CassandraIdAndPath INBOX_ID_AND_PATH = new CassandraIdAndPath(INBOX_ID, USER_INBOX_MAILBOXPATH);
Expand All @@ -53,15 +53,14 @@ public abstract class CassandraMailboxPathDAOTest {

protected CassandraCluster cassandra;

private CassandraMailboxPathDAO testee;
protected CassandraMailboxPathDAO testee;

abstract CassandraMailboxPathDAO testee();

@Before
public void setUp() throws Exception {
cassandra = CassandraCluster.create(new CassandraMailboxModule(), cassandraServer.getIp(), cassandraServer.getBindingPort());
testee = testee();

}

@After
Expand Down

0 comments on commit 9a515e0

Please sign in to comment.