Skip to content

Commit

Permalink
HSEARCH-4283 Filter outbox events by their status
Browse files Browse the repository at this point in the history
  • Loading branch information
fax4ever committed Jan 4, 2022
1 parent faf9311 commit 6424b9b
Showing 1 changed file with 20 additions and 6 deletions.
Expand Up @@ -16,6 +16,10 @@
import org.hibernate.query.Query;

public final class DefaultOutboxEventFinder implements OutboxEventFinder {

private static OutboxEventAndPredicate basePredicateFilter = OutboxEventAndPredicate.of(
new ProcessAfterFilter(), new ProcessPendingFilter() );

public static final class Provider implements OutboxEventFinderProvider {
@Override
public DefaultOutboxEventFinder create(Optional<OutboxEventPredicate> predicate) {
Expand All @@ -24,23 +28,20 @@ public DefaultOutboxEventFinder create(Optional<OutboxEventPredicate> predicate)
}

public static String createQueryString(Optional<OutboxEventPredicate> predicate) {
ProcessAfterFilter processAfterFilter = new ProcessAfterFilter();
OutboxEventPredicate combined = ( predicate.isPresent() ) ?
OutboxEventAndPredicate.of( predicate.get(), processAfterFilter ) :
processAfterFilter;
OutboxEventAndPredicate.of( predicate.get(), basePredicateFilter ) :
basePredicateFilter;

return "select e from " + ENTITY_NAME + " e where " + combined.queryPart( "e" ) + " order by e.id";
}

public static Query<OutboxEvent> createQuery(Session session, int maxResults,
String queryString, Optional<OutboxEventPredicate> predicate) {
ProcessAfterFilter processAfterFilter = new ProcessAfterFilter();
Query<OutboxEvent> query = session.createQuery( queryString, OutboxEvent.class );

if ( predicate.isPresent() ) {
predicate.get().setParams( query );
}
processAfterFilter.setParams( query );
basePredicateFilter.setParams( query );

query.setMaxResults( maxResults );
return query;
Expand Down Expand Up @@ -72,4 +73,17 @@ public void setParams(Query<OutboxEvent> query) {
query.setParameter( "now", Instant.now() );
}
}

private static class ProcessPendingFilter implements OutboxEventPredicate {

@Override
public String queryPart(String eventAlias) {
return eventAlias + ".status = :status";
}

@Override
public void setParams(Query<OutboxEvent> query) {
query.setParameter( "status", OutboxEvent.Status.PENDING );
}
}
}

0 comments on commit 6424b9b

Please sign in to comment.