Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add event to event indexer test #133

Merged
merged 3 commits into from
Apr 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,8 @@ on [Keep a CHANGELOG](http://keepachangelog.com/). This project adheres to
- Publishing of events no longer use a trigger on the event_log table
- Filestore now does hard delete when deleting files rather than just marking as deleted
- Fix hibernate incompatibility issue while running ITs in embedded wildfly
- Fix classDefNotFoundError logged by wildfly-maven-plugin while shutting down embedded wildfly server
- Fix classDefNotFoundError logged by wildfly-maven-plugin while shutting down embedded wildfly server
- Add ITs for validating REPLAY_EVENT_TO_EVENT_LISTENER and REPLAY_EVENT_TO_EVENT_INDEXER command processing

## [7.0.0] - 2021-09-27
### Changed
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package uk.gov.justice.services.example.cakeshop.provider;

import uk.gov.justice.services.core.interceptor.InterceptorChainEntry;
import uk.gov.justice.services.core.interceptor.InterceptorChainEntryProvider;
import uk.gov.justice.services.event.source.subscriptions.interceptors.SubscriptionEventInterceptor;

import java.util.ArrayList;
import java.util.List;

public class ExampleEventIndexerInterceptorChainProvider implements InterceptorChainEntryProvider {

private final List<InterceptorChainEntry> interceptorChainEntries = new ArrayList<InterceptorChainEntry>();

public ExampleEventIndexerInterceptorChainProvider() {
interceptorChainEntries.add(new InterceptorChainEntry(1000, SubscriptionEventInterceptor.class));
}

@Override
public String component() {
return "EVENT_INDEXER";
}

@Override
public List<InterceptorChainEntry> interceptorChainTypes() {
return interceptorChainEntries;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package uk.gov.justice.services.example.cakeshop.provider;

import org.junit.Test;
import uk.gov.justice.services.core.interceptor.InterceptorChainEntry;

import java.util.List;

import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;

public class ExampleEventIndexerInterceptorChainProviderTest {

@Test
public void shouldCreateInterceptorChainEntriesWithSubscriptionEventInterceptor() {

final List<InterceptorChainEntry> interceptorChainEntries = new ExampleEventIndexerInterceptorChainProvider().interceptorChainTypes();

assertThat(interceptorChainEntries.size(), is(1));

final InterceptorChainEntry interceptorChainEntry = interceptorChainEntries.get(0);
assertThat(interceptorChainEntry.getInterceptorType().getName(), is("uk.gov.justice.services.event.source.subscriptions.interceptors.SubscriptionEventInterceptor"));
assertThat(interceptorChainEntry.getPriority(), is(1000));
}

@Test
public void shouldReturnComponentName() {
assertThat(new ExampleEventIndexerInterceptorChainProvider().component(), is("EVENT_INDEXER"));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@ public void shouldSuccessfullyProcessManyUpdatesToSameRecipeId() throws Exceptio
.put(eventFactory.renameRecipeEntity("Final Name"));

new Poller().pollUntilFound(() -> {
System.out.printf("Polling for query response body to contain 'Final Name' for recipeId: %s", recipeId);
if (querier.queryForRecipe(recipeId).body().contains("Final Name")) {
return of(true);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,8 @@ public void shouldReplayAndFindRecipesInViewStore() throws Exception {
System.out.println("Waiting for events to publish...");

final Optional<Integer> processedEventCount = longPoller.pollUntilFound(() -> {
final int eventCount = processedEventCounter.countProcessedEvents();
final int eventCount = processedEventCounter.countProcessedEventsForEventListener();
System.out.printf("Polling processed_event table. Expected events count: %d, found: %d", totalEvents, eventCount);
if (eventCount == totalEvents) {
return of(eventCount);
}
Expand All @@ -112,9 +113,8 @@ public void shouldReplayAndFindRecipesInViewStore() throws Exception {
runCatchup();

final Optional<Integer> numberOfReplayedEvents = longPoller.pollUntilFound(() -> {
final int eventCount = processedEventCounter.countProcessedEvents();
System.out.println(format("%s events in processed_event table", eventCount));

final int eventCount = processedEventCounter.countProcessedEventsForEventListener();
System.out.printf("Polling processed_event table. Expected events count: %d, found: %d", totalEvents, eventCount);
if (eventCount == totalEvents) {
return of(eventCount);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import static javax.ws.rs.core.Response.Status.ACCEPTED;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static uk.gov.justice.services.eventstore.management.commands.EventCatchupCommand.CATCHUP;
import static uk.gov.justice.services.example.cakeshop.it.params.CakeShopMediaTypes.ADD_RECIPE_MEDIA_TYPE;
Expand Down Expand Up @@ -71,7 +72,7 @@ public class EventHealingIT {
private final DatabaseCleaner databaseCleaner = new DatabaseCleaner();
private final SequenceSetter sequenceSetter = new SequenceSetter();

private final Poller poller = new Poller();
private final Poller poller = new Poller(50, 1000);

private Client client;

Expand Down Expand Up @@ -116,15 +117,18 @@ public void shouldReplayAndFindRecipesInViewStore() throws Exception {
assertThat(response.getStatus(), isStatus(ACCEPTED));
}

poller.pollUntilFound(() -> {
final int eventCount = processedEventFinder.countProcessedEvents();
final Optional<Integer> result = poller.pollUntilFound(() -> {
final int eventCount = processedEventFinder.countProcessedEventsForEventListener();
System.out.printf("Polling processed_event table. Expected events count: %d, found: %d", numberOfRecipes, eventCount);
if (eventCount == numberOfRecipes) {
return of(eventCount);
}

return empty();
});

assertTrue(result.isPresent());

removeRecipesFromViewStore(3, findRecipeIdForEventNumber(3));
removeRecipesFromViewStore(5, findRecipeIdForEventNumber(5));
removeRecipesFromViewStore(6, findRecipeIdForEventNumber(6));
Expand All @@ -133,7 +137,8 @@ public void shouldReplayAndFindRecipesInViewStore() throws Exception {
runCatchup();

final Optional<Integer> numberOfEventsInProcessedEventTable = poller.pollUntilFound(() -> {
final int eventCount = processedEventFinder.countProcessedEvents();
final int eventCount = processedEventFinder.countProcessedEventsForEventListener();
System.out.printf("Polling processed_event table. Expected events count: %d, found: %d", numberOfRecipes, eventCount);
if (eventCount == numberOfRecipes) {
return of(eventCount);
}
Expand Down Expand Up @@ -189,7 +194,8 @@ private void cleanViewstoreTables() {
"recipe",
"cake",
"cake_order",
"processed_event"
"processed_event",
"stream_status"
);

databaseCleaner.cleanStreamBufferTable(contextName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ public class EventValidationIT {

private final TestSystemCommanderClientFactory systemCommanderClientFactory = new TestSystemCommanderClientFactory();

private final Poller poller = new Poller();
private final Poller poller = new Poller(100, 1000);
private final BatchEventInserter batchEventInserter = new BatchEventInserter(eventStoreDataSource, BATCH_INSERT_SIZE);

private PublishedEventCounter publishedEventCounter = new PublishedEventCounter(eventStoreDataSource);
Expand Down Expand Up @@ -134,6 +134,7 @@ public void shouldFailIfAnyEventsAreInvalid() throws Exception {
private void waitForEventsToPublish(final int totalEvents) {
final Optional<Integer> publishedEventCount = poller.pollUntilFound(() -> {
final int eventCount = publishedEventCounter.countPublishedEvents();
System.out.printf("Polling published_event table. Expected events count: %d, found: %d", totalEvents, eventCount);
if (eventCount == totalEvents) {
return of(eventCount);
}
Expand Down Expand Up @@ -229,6 +230,7 @@ private List<UUID> getEventIds() throws Exception {
private Optional<SystemCommandStatus> commandNoLongerInProgress(final SystemCommanderMBean systemCommanderMBean, final UUID commandId) {

final SystemCommandStatus systemCommandStatus = systemCommanderMBean.getCommandStatus(commandId);
System.out.printf("Polling for command state to be COMMAND_COMPLETE||COMMAND_FAILED for commandId: %s", commandId);

final CommandState commandState = systemCommandStatus.getCommandState();
if (commandState == COMMAND_COMPLETE || commandState == COMMAND_FAILED) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,7 @@ private List<PublishedEvent> getPublishedEvents(final long startNumber) {

final List<PublishedEvent> events = doGetPublishedEvents();

System.out.printf("Polling published_event table. Expected events count: %d, found: %d", 3, events.size());
if (events.size() == 3) {
final Optional<Long> eventNumber = events.get(0).getEventNumber();
if(eventNumber.isPresent()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,8 @@
import javax.sql.DataSource;

import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test;

@Ignore("Temporarily ignoring until the command is implemented in event-store. 10 April 2024")
public class SendSingleEventToEventIndexerIT {

private static final String HOST = getHost();
Expand Down Expand Up @@ -77,7 +75,10 @@ public void shouldReplaySingleEventToEventIndexerUsingTheReplayEventToEventIndex
}

final Optional<ProcessedEvent> processedEvent = poller.pollUntilFound(
() -> processedEventFinder.findProcessedEvent(publishedEvent.getId())
() -> {
System.out.printf("Polling processed_event table for existence of event id: %s", publishedEvent.getId());
return processedEventFinder.findProcessedEvent(publishedEvent.getId());
}
);

if (processedEvent.isPresent()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
import static uk.gov.justice.services.test.utils.common.host.TestHostProvider.getHost;

import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test;
import uk.gov.justice.services.eventsourcing.repository.jdbc.event.PublishedEvent;
import uk.gov.justice.services.example.cakeshop.it.helpers.DatabaseManager;
Expand All @@ -31,7 +30,6 @@

import javax.sql.DataSource;

@Ignore("Temporarily ignoring until the command is implemented in event-store. 10 April 2024")
public class SendSingleEventToEventListenerIT {

private static final String HOST = getHost();
Expand Down Expand Up @@ -76,7 +74,10 @@ public void shouldReplaySingleEventToEventListenerUsingTheReplayEventToEventList
}

final Optional<ProcessedEvent> processedEvent = poller.pollUntilFound(
() -> processedEventFinder.findProcessedEvent(publishedEvent.getId())
() -> {
System.out.printf("Polling processed_event table for existence of event id: %s", publishedEvent.getId());
return processedEventFinder.findProcessedEvent(publishedEvent.getId());
}
);

if (processedEvent.isPresent()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ public void cleanup() throws Exception {
final UUID commandId = systemCommanderMBean.call(UNSUSPEND);

final Optional<SystemCommandStatus> unsuspendStatus = poller.pollUntilFound(() -> {
System.out.printf("Polling for command state to be COMMAND_COMPLETE for commandId: %s", commandId);
final SystemCommandStatus commandStatus = systemCommanderMBean.getCommandStatus(commandId);
if (commandStatus.getCommandState() == COMMAND_COMPLETE) {
return of(commandStatus);
Expand Down Expand Up @@ -121,6 +122,7 @@ public void shouldNotReturnRecipesAfterSuspending() throws Exception {
final UUID commandId = systemCommanderMBean.call(SUSPEND);

final Optional<SystemCommandStatus> suspendStatus = poller.pollUntilFound(() -> {
System.out.printf("Polling for command state to be COMMAND_COMPLETE for commandId: %s", commandId);
final SystemCommandStatus commandStatus = systemCommanderMBean.getCommandStatus(commandId);
if (commandStatus.getCommandState() == COMMAND_COMPLETE) {
return of(commandStatus);
Expand Down Expand Up @@ -160,6 +162,7 @@ public void shouldQueryForRecipesAfterUnShuttering() throws Exception {
final UUID suspendCommandId = systemCommanderMBean.call(SUSPEND);

final Optional<SystemCommandStatus> suspendStatus = poller.pollUntilFound(() -> {
System.out.printf("Polling for command state to be COMMAND_COMPLETE for commandId: %s", suspendCommandId);
final SystemCommandStatus commandStatus = systemCommanderMBean.getCommandStatus(suspendCommandId);
if (commandStatus.getCommandState() == COMMAND_COMPLETE) {
return of(commandStatus);
Expand All @@ -186,6 +189,7 @@ public void shouldQueryForRecipesAfterUnShuttering() throws Exception {
final UUID unsuspendCommandId = systemCommanderMBean.call(UNSUSPEND);

final Optional<SystemCommandStatus> unsuspendStatus = poller.pollUntilFound(() -> {
System.out.printf("Polling for command state to be COMMAND_COMPLETE for commandId: %s", unsuspendCommandId);
final SystemCommandStatus commandStatus = systemCommanderMBean.getCommandStatus(unsuspendCommandId);
if (commandStatus.getCommandState() == COMMAND_COMPLETE) {
return of(commandStatus);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,4 +78,20 @@ public int countProcessedEvents() {
throw new RuntimeException("Failed to run query '" + sql + "' against the view store", e);
}
}

public int countProcessedEventsForEventListener() {

final String sql = "SELECT COUNT (*) FROM processed_event where component = 'EVENT_LISTENER'";

try(final Connection connection = viewStoreDataSource.getConnection();
final PreparedStatement preparedStatement = connection.prepareStatement(sql);
final ResultSet resultSet = preparedStatement.executeQuery()) {

resultSet.next();

return resultSet.getInt(1);
} catch (final SQLException e) {
throw new RuntimeException("Failed to run query '" + sql + "' against the view store", e);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -391,7 +391,7 @@
<driver>postgres</driver>
<pool>
<min-pool-size>3</min-pool-size>
<max-pool-size>3</max-pool-size>
<max-pool-size>6</max-pool-size>
<prefill>true</prefill>
</pool>
<security>
Expand Down
4 changes: 2 additions & 2 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,9 @@
<properties>
<cpp.repo.name>cake-shop</cpp.repo.name>

<framework-libraries.version>8.0.4</framework-libraries.version>
<framework-libraries.version>8.0.7</framework-libraries.version>
<framework.version>8.10.0-M2</framework.version>
<event-store.version>8.10.0-M3</event-store.version>
<event-store.version>8.10.0-M4</event-store.version>
</properties>

<dependencyManagement>
Expand Down