Skip to content

Commit

Permalink
HSEARCH-4132 Introduce indexing strategy for outbox table
Browse files Browse the repository at this point in the history
  • Loading branch information
fax4ever authored and yrodiere committed Mar 8, 2021
1 parent f7371db commit 2a7048a
Show file tree
Hide file tree
Showing 3 changed files with 182 additions and 0 deletions.
@@ -0,0 +1,67 @@
/*
* Hibernate Search, full-text search for your domain model
*
* License: GNU Lesser General Public License (LGPL), version 2.1 or later
* See the lgpl.txt file in the root directory or <http://www.gnu.org/licenses/lgpl-2.1.html>.
*/
package org.hibernate.search.mapper.orm.outbox.impl;

import java.util.Arrays;
import java.util.Objects;

import org.hibernate.search.mapper.pojo.route.DocumentRoutesDescriptor;
import org.hibernate.search.util.common.serialization.spi.SerializationUtils;

public final class OutboxEvent {

private final String entityName;
private final String serializedId;
private final byte[] serializedRoutes;

public OutboxEvent(String entityName, String serializedId, DocumentRoutesDescriptor routesDescriptor) {
this.entityName = entityName;
this.serializedId = serializedId;
this.serializedRoutes = SerializationUtils.serialize( routesDescriptor );
}

public String getEntityName() {
return entityName;
}

public String getSerializedId() {
return serializedId;
}

public byte[] getSerializedRoutes() {
return serializedRoutes;
}

@Override
public boolean equals(Object o) {
if ( this == o ) {
return true;
}
if ( o == null || getClass() != o.getClass() ) {
return false;
}
OutboxEvent that = (OutboxEvent) o;
return Objects.equals( entityName, that.entityName ) && Objects.equals(
serializedId, that.serializedId ) && Arrays.equals( serializedRoutes, that.serializedRoutes );
}

@Override
public int hashCode() {
int result = Objects.hash( entityName, serializedId );
result = 31 * result + Arrays.hashCode( serializedRoutes );
return result;
}

@Override
public String toString() {
return "OutboxEvent{" +
"entityName='" + entityName + '\'' +
", serializedId='" + serializedId + '\'' +
", serializedRoutes=" + Arrays.toString( serializedRoutes ) +
'}';
}
}
@@ -0,0 +1,39 @@
/*
* Hibernate Search, full-text search for your domain model
*
* License: GNU Lesser General Public License (LGPL), version 2.1 or later
* See the lgpl.txt file in the root directory or <http://www.gnu.org/licenses/lgpl-2.1.html>.
*/
package org.hibernate.search.mapper.orm.outbox.impl;

import java.util.concurrent.CompletableFuture;

import org.hibernate.search.mapper.orm.automaticindexing.spi.AutomaticIndexingConfigurationContext;
import org.hibernate.search.mapper.orm.automaticindexing.spi.AutomaticIndexingStrategy;
import org.hibernate.search.mapper.orm.automaticindexing.spi.AutomaticIndexingStrategyPreStopContext;
import org.hibernate.search.mapper.orm.automaticindexing.spi.AutomaticIndexingStrategyStartContext;

public class OutboxTableAutomaticIndexingStrategy implements AutomaticIndexingStrategy {

@Override
public void configure(AutomaticIndexingConfigurationContext context) {
context.sendIndexingEventsTo( ctx -> new OutboxTableSendingPlan( ctx.session() ), true );
}

@Override
public CompletableFuture<?> start(AutomaticIndexingStrategyStartContext context) {
// Nothing to do
return CompletableFuture.completedFuture( null );
}

@Override
public CompletableFuture<?> preStop(AutomaticIndexingStrategyPreStopContext context) {
// Nothing to do
return CompletableFuture.completedFuture( null );
}

@Override
public void stop() {
// Nothing to do
}
}
@@ -0,0 +1,76 @@
/*
* Hibernate Search, full-text search for your domain model
*
* License: GNU Lesser General Public License (LGPL), version 2.1 or later
* See the lgpl.txt file in the root directory or <http://www.gnu.org/licenses/lgpl-2.1.html>.
*/
package org.hibernate.search.mapper.orm.outbox.impl;

import static org.hibernate.search.mapper.orm.outbox.impl.OutboxAdditionalJaxbMappingProducer.ENTITY_ID_PROPERTY_NAME;
import static org.hibernate.search.mapper.orm.outbox.impl.OutboxAdditionalJaxbMappingProducer.ENTITY_NAME_PROPERTY_NAME;
import static org.hibernate.search.mapper.orm.outbox.impl.OutboxAdditionalJaxbMappingProducer.OUTBOX_ENTITY_NAME;
import static org.hibernate.search.mapper.orm.outbox.impl.OutboxAdditionalJaxbMappingProducer.ROUTE_PROPERTY_NAME;

import java.util.HashMap;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.CompletableFuture;

import org.hibernate.Session;
import org.hibernate.search.engine.backend.common.spi.EntityReferenceFactory;
import org.hibernate.search.engine.backend.common.spi.MultiEntityOperationExecutionReport;
import org.hibernate.search.mapper.orm.automaticindexing.spi.AutomaticIndexingQueueEventSendingPlan;
import org.hibernate.search.mapper.pojo.route.DocumentRoutesDescriptor;

public class OutboxTableSendingPlan implements AutomaticIndexingQueueEventSendingPlan {

private final Session session;
private final Set<OutboxEvent> events = new HashSet<>();

public OutboxTableSendingPlan(Session session) {
this.session = session;
}

@Override
public void add(String entityName, Object identifier, String serializedId, DocumentRoutesDescriptor routes) {
events.add( new OutboxEvent( entityName, serializedId, routes ) );
}

@Override
public void addOrUpdate(String entityName, Object identifier, String serializedId,
DocumentRoutesDescriptor routes) {
events.add( new OutboxEvent( entityName, serializedId, routes ) );
}

@Override
public void delete(String entityName, Object identifier, String serializedId, DocumentRoutesDescriptor routes) {
events.add( new OutboxEvent( entityName, serializedId, routes ) );
}

@Override
public void discard() {
events.clear();
}

@Override
public <R> CompletableFuture<MultiEntityOperationExecutionReport<R>> sendAndReport(
EntityReferenceFactory<R> entityReferenceFactory) {
MultiEntityOperationExecutionReport.Builder<R> builder = MultiEntityOperationExecutionReport.builder();
for ( OutboxEvent event : events ) {
try {
HashMap<String, Object> entityData = new HashMap<>();
entityData.put( ENTITY_NAME_PROPERTY_NAME, event.getEntityName() );
entityData.put( ENTITY_ID_PROPERTY_NAME, event.getSerializedId() );
entityData.put( ROUTE_PROPERTY_NAME, event.getSerializedRoutes() );

session.persist( OUTBOX_ENTITY_NAME, entityData );
}
catch (RuntimeException e) {
builder.throwable( e );
builder.failingEntityReference( entityReferenceFactory, event.getEntityName(), event.getSerializedId() );
}
}
session.flush();
return CompletableFuture.completedFuture( builder.build() );
}
}

0 comments on commit 2a7048a

Please sign in to comment.