-
Notifications
You must be signed in to change notification settings - Fork 3
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Change the insert of rebuild published events to a batch insert
- Loading branch information
amckenzie
committed
Nov 19, 2019
1 parent
4a59ac0
commit 3f2b28c
Showing
24 changed files
with
920 additions
and
280 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
17 changes: 17 additions & 0 deletions
17
...a/uk/gov/justice/services/eventsourcing/publishedevent/jdbc/PublishedEventStatements.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,17 @@ | ||
package uk.gov.justice.services.eventsourcing.publishedevent.jdbc; | ||
|
||
public interface PublishedEventStatements { | ||
|
||
String TRUNCATE_PUBLISHED_EVENT = "TRUNCATE published_event"; | ||
String TRUNCATE_PREPUBLISH_QUEUE = "TRUNCATE pre_publish_queue"; | ||
|
||
String INSERT_INTO_PUBLISHED_EVENT_SQL = "INSERT into published_event (" + | ||
"id, stream_id, position_in_stream, name, payload, metadata, date_created, event_number, previous_event_number) " + | ||
"VALUES " + | ||
"(?, ?, ?, ?, ?, ?, ?, ?, ?)"; | ||
|
||
String SELECT_FROM_PUBLISHED_EVENT_QUERY = | ||
"SELECT stream_id, position_in_stream, name, payload, metadata, date_created, event_number, previous_event_number " + | ||
"FROM published_event " + | ||
"WHERE id = ?"; | ||
} |
13 changes: 13 additions & 0 deletions
13
.../java/uk/gov/justice/services/eventsourcing/publishedevent/rebuild/ActiveEventFilter.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,13 @@ | ||
package uk.gov.justice.services.eventsourcing.publishedevent.rebuild; | ||
|
||
import uk.gov.justice.services.eventsourcing.repository.jdbc.event.Event; | ||
|
||
import java.util.Set; | ||
import java.util.UUID; | ||
|
||
public class ActiveEventFilter { | ||
|
||
public boolean isActiveEvent(final Event event, final Set<UUID> activeStreamIds) { | ||
return activeStreamIds.contains(event.getStreamId()); | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
78 changes: 78 additions & 0 deletions
78
.../justice/services/eventsourcing/publishedevent/rebuild/BatchedPublishedEventInserter.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,78 @@ | ||
package uk.gov.justice.services.eventsourcing.publishedevent.rebuild; | ||
|
||
import static uk.gov.justice.services.common.converter.ZonedDateTimes.toSqlTimestamp; | ||
import static uk.gov.justice.services.eventsourcing.publishedevent.jdbc.PublishedEventStatements.INSERT_INTO_PUBLISHED_EVENT_SQL; | ||
|
||
import uk.gov.justice.services.eventsourcing.repository.jdbc.event.Event; | ||
import uk.gov.justice.services.eventsourcing.repository.jdbc.event.MissingEventNumberException; | ||
import uk.gov.justice.services.eventsourcing.repository.jdbc.event.PublishedEvent; | ||
import uk.gov.justice.services.eventsourcing.util.io.Closer; | ||
import uk.gov.justice.services.jdbc.persistence.DataAccessException; | ||
|
||
import java.sql.Connection; | ||
import java.sql.PreparedStatement; | ||
import java.sql.SQLException; | ||
import java.util.Optional; | ||
import java.util.Set; | ||
import java.util.UUID; | ||
import java.util.concurrent.atomic.AtomicLong; | ||
|
||
import javax.sql.DataSource; | ||
|
||
public class BatchedPublishedEventInserter implements AutoCloseable { | ||
|
||
private final Closer closer; | ||
|
||
private PreparedStatement preparedStatement; | ||
private Connection connection; | ||
|
||
public BatchedPublishedEventInserter(final Closer closer) { | ||
this.closer = closer; | ||
} | ||
|
||
public void prepareForInserts(final DataSource eventStoreDataSource) { | ||
try { | ||
connection = eventStoreDataSource.getConnection(); | ||
preparedStatement = connection.prepareStatement(INSERT_INTO_PUBLISHED_EVENT_SQL); | ||
} catch (final SQLException e) { | ||
throw new DataAccessException("Failed to prepare statement for batch insert of PublishedEvents", e); | ||
} | ||
} | ||
|
||
public PublishedEvent addToBatch(final PublishedEvent publishedEvent) { | ||
|
||
try { | ||
preparedStatement.setObject(1, publishedEvent.getId()); | ||
preparedStatement.setObject(2, publishedEvent.getStreamId()); | ||
preparedStatement.setLong(3, publishedEvent.getPositionInStream()); | ||
preparedStatement.setString(4, publishedEvent.getName()); | ||
preparedStatement.setString(5, publishedEvent.getPayload()); | ||
preparedStatement.setString(6, publishedEvent.getMetadata()); | ||
preparedStatement.setObject(7, toSqlTimestamp(publishedEvent.getCreatedAt())); | ||
preparedStatement.setLong(8, publishedEvent.getEventNumber().orElseThrow(() -> new MissingEventNumberException("Event with id '%s' does not have an event number"))); | ||
preparedStatement.setLong(9, publishedEvent.getPreviousEventNumber()); | ||
|
||
preparedStatement.addBatch(); | ||
|
||
return publishedEvent; | ||
|
||
} catch (final SQLException e) { | ||
throw new DataAccessException("Failed to add PublishedEvent to batch", e); | ||
} | ||
} | ||
|
||
public void insertBatch() { | ||
|
||
try { | ||
preparedStatement.executeBatch(); | ||
} catch (final SQLException e) { | ||
throw new DataAccessException("Failed to insert batch of PublishedEvents", e); | ||
} | ||
} | ||
|
||
@Override | ||
public void close() { | ||
closer.closeQuietly(preparedStatement); | ||
closer.closeQuietly(connection); | ||
} | ||
} |
24 changes: 24 additions & 0 deletions
24
...e/services/eventsourcing/publishedevent/rebuild/BatchedPublishedEventInserterFactory.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,24 @@ | ||
package uk.gov.justice.services.eventsourcing.publishedevent.rebuild; | ||
|
||
import uk.gov.justice.services.eventsourcing.source.core.EventStoreDataSourceProvider; | ||
import uk.gov.justice.services.eventsourcing.util.io.Closer; | ||
|
||
import javax.inject.Inject; | ||
|
||
public class BatchedPublishedEventInserterFactory { | ||
|
||
@Inject | ||
private EventStoreDataSourceProvider eventStoreDataSourceProvider; | ||
|
||
@Inject | ||
private Closer closer; | ||
|
||
public BatchedPublishedEventInserter createInitialised() { | ||
|
||
final BatchedPublishedEventInserter batchedPublishedEventInserter = new BatchedPublishedEventInserter(closer); | ||
|
||
batchedPublishedEventInserter.prepareForInserts(eventStoreDataSourceProvider.getDefaultDataSource()); | ||
|
||
return batchedPublishedEventInserter; | ||
} | ||
} |
17 changes: 17 additions & 0 deletions
17
.../java/uk/gov/justice/services/eventsourcing/publishedevent/rebuild/EventNumberGetter.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,17 @@ | ||
package uk.gov.justice.services.eventsourcing.publishedevent.rebuild; | ||
|
||
import static java.lang.String.format; | ||
|
||
import uk.gov.justice.services.eventsourcing.repository.jdbc.event.Event; | ||
|
||
public class EventNumberGetter { | ||
|
||
public Long eventNumberFrom(final Event event) { | ||
|
||
return event | ||
.getEventNumber() | ||
.orElseThrow(() -> new RebuildException(format( | ||
"No eventNumber found for event with id '%s'", | ||
event.getId()))); | ||
} | ||
} |
45 changes: 0 additions & 45 deletions
45
.../uk/gov/justice/services/eventsourcing/publishedevent/rebuild/PublishedEventInserter.java
This file was deleted.
Oops, something went wrong.
50 changes: 50 additions & 0 deletions
50
...k/gov/justice/services/eventsourcing/publishedevent/rebuild/PublishedEventsRebuilder.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,50 @@ | ||
package uk.gov.justice.services.eventsourcing.publishedevent.rebuild; | ||
|
||
import static java.util.stream.Collectors.toList; | ||
|
||
import uk.gov.justice.services.eventsourcing.repository.jdbc.event.Event; | ||
import uk.gov.justice.services.eventsourcing.repository.jdbc.event.PublishedEvent; | ||
|
||
import java.util.List; | ||
import java.util.Set; | ||
import java.util.UUID; | ||
import java.util.concurrent.atomic.AtomicLong; | ||
import java.util.stream.Stream; | ||
|
||
import javax.inject.Inject; | ||
|
||
public class PublishedEventsRebuilder { | ||
|
||
@Inject | ||
private EventNumberGetter eventNumberGetter; | ||
|
||
@Inject | ||
private BatchedPublishedEventInserterFactory batchedPublishedEventInserterFactory; | ||
|
||
@Inject | ||
private ActiveEventFilter activeEventFilter; | ||
|
||
@Inject | ||
private RebuildPublishedEventFactory rebuildPublishedEventFactory; | ||
|
||
@SuppressWarnings("squid:S3864") | ||
public List<PublishedEvent> rebuild( | ||
final Stream<Event> eventStream, | ||
final AtomicLong previousEventNumber, | ||
final AtomicLong currentEventNumber, | ||
final Set<UUID> activeStreamIds) { | ||
|
||
try (final BatchedPublishedEventInserter batchedPublishedEventInserter = batchedPublishedEventInserterFactory.createInitialised()) { | ||
final List<PublishedEvent> publishedEvents = eventStream | ||
.peek(event -> currentEventNumber.set(eventNumberGetter.eventNumberFrom(event))) | ||
.filter(event -> activeEventFilter.isActiveEvent(event, activeStreamIds)) | ||
.map(event -> rebuildPublishedEventFactory.createPublishedEventFrom(event, previousEventNumber)) | ||
.map(batchedPublishedEventInserter::addToBatch) | ||
.collect(toList()); | ||
|
||
batchedPublishedEventInserter.insertBatch(); | ||
|
||
return publishedEvents; | ||
} | ||
} | ||
} |
Oops, something went wrong.