Skip to content

Commit

Permalink
MAILBOX-373 Refactor CassandraEventDeadLetter
Browse files Browse the repository at this point in the history
  • Loading branch information
hoangdat authored and chibenwa committed Mar 20, 2019
1 parent b44d1bf commit e51448c
Show file tree
Hide file tree
Showing 5 changed files with 46 additions and 59 deletions.
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -39,35 +39,36 @@ public CassandraEventDeadLetters(CassandraEventDeadLettersDAO cassandraEventDead
} }


@Override @Override
public Mono<Void> store(Group registeredGroup, Event failDeliveredEvent) { public Mono<Void> store(Group registeredGroup, Event failDeliveredEvent, InsertionId insertionId) {
Preconditions.checkArgument(registeredGroup != null, REGISTERED_GROUP_CANNOT_BE_NULL); Preconditions.checkArgument(registeredGroup != null, REGISTERED_GROUP_CANNOT_BE_NULL);
Preconditions.checkArgument(failDeliveredEvent != null, FAIL_DELIVERED_EVENT_CANNOT_BE_NULL); Preconditions.checkArgument(failDeliveredEvent != null, FAIL_DELIVERED_EVENT_CANNOT_BE_NULL);
Preconditions.checkArgument(insertionId != null, FAIL_DELIVERED_ID_INSERTION_CANNOT_BE_NULL);


return cassandraEventDeadLettersDAO.store(registeredGroup, failDeliveredEvent) return cassandraEventDeadLettersDAO.store(registeredGroup, failDeliveredEvent, insertionId)
.then(cassandraEventDeadLettersGroupDAO.storeGroup(registeredGroup)); .then(cassandraEventDeadLettersGroupDAO.storeGroup(registeredGroup));
} }


@Override @Override
public Mono<Void> remove(Group registeredGroup, Event.EventId failDeliveredEventId) { public Mono<Void> remove(Group registeredGroup, InsertionId failDeliveredInsertionId) {
Preconditions.checkArgument(registeredGroup != null, REGISTERED_GROUP_CANNOT_BE_NULL); Preconditions.checkArgument(registeredGroup != null, REGISTERED_GROUP_CANNOT_BE_NULL);
Preconditions.checkArgument(failDeliveredEventId != null, FAIL_DELIVERED_ID_EVENT_CANNOT_BE_NULL); Preconditions.checkArgument(failDeliveredInsertionId != null, FAIL_DELIVERED_ID_INSERTION_CANNOT_BE_NULL);


return cassandraEventDeadLettersDAO.removeEvent(registeredGroup, failDeliveredEventId); return cassandraEventDeadLettersDAO.removeEvent(registeredGroup, failDeliveredInsertionId);
} }


@Override @Override
public Mono<Event> failedEvent(Group registeredGroup, Event.EventId failDeliveredEventId) { public Mono<Event> failedEvent(Group registeredGroup, InsertionId failDeliveredInsertionId) {
Preconditions.checkArgument(registeredGroup != null, REGISTERED_GROUP_CANNOT_BE_NULL); Preconditions.checkArgument(registeredGroup != null, REGISTERED_GROUP_CANNOT_BE_NULL);
Preconditions.checkArgument(failDeliveredEventId != null, FAIL_DELIVERED_ID_EVENT_CANNOT_BE_NULL); Preconditions.checkArgument(failDeliveredInsertionId != null, FAIL_DELIVERED_ID_INSERTION_CANNOT_BE_NULL);


return cassandraEventDeadLettersDAO.retrieveFailedEvent(registeredGroup, failDeliveredEventId); return cassandraEventDeadLettersDAO.retrieveFailedEvent(registeredGroup, failDeliveredInsertionId);
} }


@Override @Override
public Flux<Event.EventId> failedEventIds(Group registeredGroup) { public Flux<InsertionId> failedIds(Group registeredGroup) {
Preconditions.checkArgument(registeredGroup != null, REGISTERED_GROUP_CANNOT_BE_NULL); Preconditions.checkArgument(registeredGroup != null, REGISTERED_GROUP_CANNOT_BE_NULL);


return cassandraEventDeadLettersDAO.retrieveEventIdsWithGroup(registeredGroup); return cassandraEventDeadLettersDAO.retrieveInsertionIdsWithGroup(registeredGroup);
} }


@Override @Override
Expand Down
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@
import static com.datastax.driver.core.querybuilder.QueryBuilder.insertInto; import static com.datastax.driver.core.querybuilder.QueryBuilder.insertInto;
import static com.datastax.driver.core.querybuilder.QueryBuilder.select; import static com.datastax.driver.core.querybuilder.QueryBuilder.select;
import static org.apache.james.mailbox.events.tables.CassandraEventDeadLettersTable.EVENT; import static org.apache.james.mailbox.events.tables.CassandraEventDeadLettersTable.EVENT;
import static org.apache.james.mailbox.events.tables.CassandraEventDeadLettersTable.EVENT_ID;
import static org.apache.james.mailbox.events.tables.CassandraEventDeadLettersTable.GROUP; import static org.apache.james.mailbox.events.tables.CassandraEventDeadLettersTable.GROUP;
import static org.apache.james.mailbox.events.tables.CassandraEventDeadLettersTable.INSERTION_ID;
import static org.apache.james.mailbox.events.tables.CassandraEventDeadLettersTable.TABLE_NAME; import static org.apache.james.mailbox.events.tables.CassandraEventDeadLettersTable.TABLE_NAME;


import javax.inject.Inject; import javax.inject.Inject;
Expand All @@ -36,7 +36,6 @@


import com.datastax.driver.core.PreparedStatement; import com.datastax.driver.core.PreparedStatement;
import com.datastax.driver.core.Session; import com.datastax.driver.core.Session;
import com.github.fge.lambdas.Throwing;


import reactor.core.publisher.Flux; import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono; import reactor.core.publisher.Mono;
Expand All @@ -46,7 +45,6 @@ public class CassandraEventDeadLettersDAO {
private final EventSerializer eventSerializer; private final EventSerializer eventSerializer;
private final PreparedStatement insertStatement; private final PreparedStatement insertStatement;
private final PreparedStatement deleteStatement; private final PreparedStatement deleteStatement;
private final PreparedStatement selectAllGroupStatement;
private final PreparedStatement selectEventStatement; private final PreparedStatement selectEventStatement;
private final PreparedStatement selectEventIdsWithGroupStatement; private final PreparedStatement selectEventIdsWithGroupStatement;


Expand All @@ -56,73 +54,61 @@ public class CassandraEventDeadLettersDAO {
this.eventSerializer = eventSerializer; this.eventSerializer = eventSerializer;
this.insertStatement = prepareInsertStatement(session); this.insertStatement = prepareInsertStatement(session);
this.deleteStatement = prepareDeleteStatement(session); this.deleteStatement = prepareDeleteStatement(session);
this.selectAllGroupStatement = prepareSelectAllGroupStatement(session);
this.selectEventStatement = prepareSelectEventStatement(session); this.selectEventStatement = prepareSelectEventStatement(session);
this.selectEventIdsWithGroupStatement = prepareSelectEventIdsWithGroupStatement(session); this.selectEventIdsWithGroupStatement = prepareSelectInsertionIdsWithGroupStatement(session);
} }


private PreparedStatement prepareInsertStatement(Session session) { private PreparedStatement prepareInsertStatement(Session session) {
return session.prepare(insertInto(TABLE_NAME) return session.prepare(insertInto(TABLE_NAME)
.value(GROUP, bindMarker(GROUP)) .value(GROUP, bindMarker(GROUP))
.value(EVENT_ID, bindMarker(EVENT_ID)) .value(INSERTION_ID, bindMarker(INSERTION_ID))
.value(EVENT, bindMarker(EVENT))); .value(EVENT, bindMarker(EVENT)));
} }


private PreparedStatement prepareDeleteStatement(Session session) { private PreparedStatement prepareDeleteStatement(Session session) {
return session.prepare(delete() return session.prepare(delete()
.from(TABLE_NAME) .from(TABLE_NAME)
.where(eq(GROUP, bindMarker(GROUP))) .where(eq(GROUP, bindMarker(GROUP)))
.and(eq(EVENT_ID, bindMarker(EVENT_ID)))); .and(eq(INSERTION_ID, bindMarker(INSERTION_ID))));
}

private PreparedStatement prepareSelectAllGroupStatement(Session session) {
return session.prepare(select(GROUP)
.from(TABLE_NAME));
} }


private PreparedStatement prepareSelectEventStatement(Session session) { private PreparedStatement prepareSelectEventStatement(Session session) {
return session.prepare(select(EVENT) return session.prepare(select(EVENT)
.from(TABLE_NAME) .from(TABLE_NAME)
.where(eq(GROUP, bindMarker(GROUP))) .where(eq(GROUP, bindMarker(GROUP)))
.and(eq(EVENT_ID, bindMarker(EVENT_ID)))); .and(eq(INSERTION_ID, bindMarker(INSERTION_ID))));
} }


private PreparedStatement prepareSelectEventIdsWithGroupStatement(Session session) { private PreparedStatement prepareSelectInsertionIdsWithGroupStatement(Session session) {
return session.prepare(select(EVENT_ID) return session.prepare(select(INSERTION_ID)
.from(TABLE_NAME) .from(TABLE_NAME)
.where(eq(GROUP, bindMarker(GROUP)))); .where(eq(GROUP, bindMarker(GROUP))));
} }


Mono<Void> store(Group group, Event failedEvent) { Mono<Void> store(Group group, Event failedEvent, EventDeadLetters.InsertionId insertionId) {
return executor.executeVoid(insertStatement.bind() return executor.executeVoid(insertStatement.bind()
.setString(GROUP, group.asString()) .setString(GROUP, group.asString())
.setUUID(EVENT_ID, failedEvent.getEventId().getId()) .setUUID(INSERTION_ID, insertionId.getId())
.setString(EVENT, eventSerializer.toJson(failedEvent))); .setString(EVENT, eventSerializer.toJson(failedEvent)));
} }


Mono<Void> removeEvent(Group group, Event.EventId failedEventId) { Mono<Void> removeEvent(Group group, EventDeadLetters.InsertionId failedInsertionId) {
return executor.executeVoid(deleteStatement.bind() return executor.executeVoid(deleteStatement.bind()
.setString(GROUP, group.asString()) .setString(GROUP, group.asString())
.setUUID(EVENT_ID, failedEventId.getId())); .setUUID(INSERTION_ID, failedInsertionId.getId()));
} }


Mono<Event> retrieveFailedEvent(Group group, Event.EventId failedEventId) { Mono<Event> retrieveFailedEvent(Group group, EventDeadLetters.InsertionId insertionId) {
return executor.executeSingleRow(selectEventStatement.bind() return executor.executeSingleRow(selectEventStatement.bind()
.setString(GROUP, group.asString()) .setString(GROUP, group.asString())
.setUUID(EVENT_ID, failedEventId.getId())) .setUUID(INSERTION_ID, insertionId.getId()))
.map(row -> deserializeEvent(row.getString(EVENT))); .map(row -> deserializeEvent(row.getString(EVENT)));
} }


Flux<Event.EventId> retrieveEventIdsWithGroup(Group group) { Flux<EventDeadLetters.InsertionId> retrieveInsertionIdsWithGroup(Group group) {
return executor.executeRows(selectEventIdsWithGroupStatement.bind() return executor.executeRows(selectEventIdsWithGroupStatement.bind()
.setString(GROUP, group.asString())) .setString(GROUP, group.asString()))
.map(row -> Event.EventId.of(row.getUUID(EVENT_ID))); .map(row -> EventDeadLetters.InsertionId.of(row.getUUID(INSERTION_ID)));
}

Flux<Group> retrieveAllGroups() {
return executor.executeRows(selectAllGroupStatement.bind())
.map(Throwing.function(row -> Group.deserialize(row.getString(GROUP))))
.distinct();
} }


private Event deserializeEvent(String serializedEvent) { private Event deserializeEvent(String serializedEvent) {
Expand Down
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ public interface CassandraEventDeadLettersModule {
SchemaBuilder.rows(CassandraConstants.DEFAULT_CACHED_ROW_PER_PARTITION))) SchemaBuilder.rows(CassandraConstants.DEFAULT_CACHED_ROW_PER_PARTITION)))
.statement(statement -> statement .statement(statement -> statement
.addPartitionKey(CassandraEventDeadLettersTable.GROUP, DataType.text()) .addPartitionKey(CassandraEventDeadLettersTable.GROUP, DataType.text())
.addClusteringColumn(CassandraEventDeadLettersTable.EVENT_ID, DataType.uuid()) .addClusteringColumn(CassandraEventDeadLettersTable.INSERTION_ID, DataType.uuid())
.addColumn(CassandraEventDeadLettersTable.EVENT, DataType.text())) .addColumn(CassandraEventDeadLettersTable.EVENT, DataType.text()))
.table(CassandraEventDeadLettersGroupTable.TABLE_NAME) .table(CassandraEventDeadLettersGroupTable.TABLE_NAME)
.comment("Projection table for retrieving groups for all failed events") .comment("Projection table for retrieving groups for all failed events")
Expand Down
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -24,6 +24,6 @@ public interface CassandraEventDeadLettersTable {
String TABLE_NAME = "event_dead_letters"; String TABLE_NAME = "event_dead_letters";


String GROUP = "group"; String GROUP = "group";
String EVENT_ID = "eventId"; String INSERTION_ID = "insertionId";
String EVENT = "event"; String EVENT = "event";
} }
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -22,11 +22,11 @@
import static org.apache.james.mailbox.events.EventDeadLettersContract.EVENT_1; import static org.apache.james.mailbox.events.EventDeadLettersContract.EVENT_1;
import static org.apache.james.mailbox.events.EventDeadLettersContract.EVENT_2; import static org.apache.james.mailbox.events.EventDeadLettersContract.EVENT_2;
import static org.apache.james.mailbox.events.EventDeadLettersContract.EVENT_3; import static org.apache.james.mailbox.events.EventDeadLettersContract.EVENT_3;
import static org.apache.james.mailbox.events.EventDeadLettersContract.EVENT_ID_1;
import static org.apache.james.mailbox.events.EventDeadLettersContract.EVENT_ID_2;
import static org.apache.james.mailbox.events.EventDeadLettersContract.EVENT_ID_3;
import static org.apache.james.mailbox.events.EventDeadLettersContract.GROUP_A; import static org.apache.james.mailbox.events.EventDeadLettersContract.GROUP_A;
import static org.apache.james.mailbox.events.EventDeadLettersContract.GROUP_B; import static org.apache.james.mailbox.events.EventDeadLettersContract.GROUP_B;
import static org.apache.james.mailbox.events.EventDeadLettersContract.INSERTION_ID_1;
import static org.apache.james.mailbox.events.EventDeadLettersContract.INSERTION_ID_2;
import static org.apache.james.mailbox.events.EventDeadLettersContract.INSERTION_ID_3;
import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThat;


import org.apache.james.backends.cassandra.CassandraCluster; import org.apache.james.backends.cassandra.CassandraCluster;
Expand All @@ -53,52 +53,52 @@ void setUp(CassandraCluster cassandraCluster) {


@Test @Test
void removeEventShouldSucceededWhenRemoveStoredEvent() { void removeEventShouldSucceededWhenRemoveStoredEvent() {
cassandraEventDeadLettersDAO.store(GROUP_A, EVENT_1).block(); cassandraEventDeadLettersDAO.store(GROUP_A, EVENT_1, INSERTION_ID_1).block();


cassandraEventDeadLettersDAO.removeEvent(GROUP_A, EVENT_ID_1).block(); cassandraEventDeadLettersDAO.removeEvent(GROUP_A, INSERTION_ID_1).block();


assertThat(cassandraEventDeadLettersDAO assertThat(cassandraEventDeadLettersDAO
.retrieveEventIdsWithGroup(GROUP_A) .retrieveInsertionIdsWithGroup(GROUP_A)
.collectList().block()) .collectList().block())
.isEmpty(); .isEmpty();
} }


@Test @Test
void retrieveFailedEventShouldReturnEmptyWhenDefault() { void retrieveFailedEventShouldReturnEmptyWhenDefault() {
assertThat(cassandraEventDeadLettersDAO assertThat(cassandraEventDeadLettersDAO
.retrieveFailedEvent(GROUP_A, EVENT_ID_1) .retrieveFailedEvent(GROUP_A, INSERTION_ID_1)
.blockOptional().isPresent()) .blockOptional().isPresent())
.isFalse(); .isFalse();
} }


@Test @Test
void retrieveFailedEventShouldReturnStoredEvent() { void retrieveFailedEventShouldReturnStoredEvent() {
cassandraEventDeadLettersDAO.store(GROUP_A, EVENT_1).block(); cassandraEventDeadLettersDAO.store(GROUP_A, EVENT_1, INSERTION_ID_1).block();
cassandraEventDeadLettersDAO.store(GROUP_B, EVENT_2).block(); cassandraEventDeadLettersDAO.store(GROUP_B, EVENT_2, INSERTION_ID_2).block();


assertThat(cassandraEventDeadLettersDAO assertThat(cassandraEventDeadLettersDAO
.retrieveFailedEvent(GROUP_B, EVENT_ID_2) .retrieveFailedEvent(GROUP_B, INSERTION_ID_2)
.blockOptional().get()) .blockOptional().get())
.isEqualTo(EVENT_2); .isEqualTo(EVENT_2);
} }


@Test @Test
void retrieveEventIdsWithGroupShouldReturnEmptyWhenDefault() { void retrieveInsertionIdsWithGroupShouldReturnEmptyWhenDefault() {
assertThat(cassandraEventDeadLettersDAO assertThat(cassandraEventDeadLettersDAO
.retrieveEventIdsWithGroup(GROUP_A) .retrieveInsertionIdsWithGroup(GROUP_A)
.collectList().block()) .collectList().block())
.isEmpty(); .isEmpty();
} }


@Test @Test
void retrieveEventIdsWithGroupShouldReturnStoredEventId() { void retrieveInsertionIdsWithGroupShouldReturnStoredInsertionId() {
cassandraEventDeadLettersDAO.store(GROUP_B, EVENT_1).block(); cassandraEventDeadLettersDAO.store(GROUP_B, EVENT_1, INSERTION_ID_1).block();
cassandraEventDeadLettersDAO.store(GROUP_B, EVENT_2).block(); cassandraEventDeadLettersDAO.store(GROUP_B, EVENT_2, INSERTION_ID_2).block();
cassandraEventDeadLettersDAO.store(GROUP_B, EVENT_3).block(); cassandraEventDeadLettersDAO.store(GROUP_B, EVENT_3, INSERTION_ID_3).block();


assertThat(cassandraEventDeadLettersDAO assertThat(cassandraEventDeadLettersDAO
.retrieveEventIdsWithGroup(GROUP_B) .retrieveInsertionIdsWithGroup(GROUP_B)
.collectList().block()) .collectList().block())
.containsOnly(EVENT_ID_1, EVENT_ID_2, EVENT_ID_3); .containsOnly(INSERTION_ID_1, INSERTION_ID_2, INSERTION_ID_3);
} }
} }

0 comments on commit e51448c

Please sign in to comment.