Skip to content

Commit

Permalink
HSEARCH-4358 Centralize the definition of clocks, used in all agents
Browse files Browse the repository at this point in the history
  • Loading branch information
yrodiere committed Nov 30, 2021
1 parent a406313 commit c16e22b
Show file tree
Hide file tree
Showing 4 changed files with 22 additions and 5 deletions.
Expand Up @@ -74,7 +74,7 @@ public final class OutboxPollingEventProcessor {
.withDefault( HibernateOrmMapperOutboxPollingSettings.Defaults.COORDINATION_EVENT_PROCESSOR_RETRY_DELAY )
.build();

public static Factory factory(AutomaticIndexingMappingContext mapping,
public static Factory factory(AutomaticIndexingMappingContext mapping, Clock clock,
ConfigurationPropertySource configurationSource) {
Duration pollingInterval = POLLING_INTERVAL.getAndTransform( configurationSource, Duration::ofMillis );
Duration pulseInterval = PULSE_INTERVAL.getAndTransform( configurationSource,
Expand All @@ -87,7 +87,7 @@ public static Factory factory(AutomaticIndexingMappingContext mapping,
Integer transactionTimeout = TRANSACTION_TIMEOUT.get( configurationSource )
.orElse( null );

return new Factory( mapping, pollingInterval, pulseInterval, pulseExpiration, batchSize, retryDelay,
return new Factory( mapping, clock, pollingInterval, pulseInterval, pulseExpiration, batchSize, retryDelay,
transactionTimeout );
}

Expand All @@ -108,17 +108,19 @@ private static Duration checkPulseExpiration(Duration pulseExpiration, Duration

public static class Factory {
private final AutomaticIndexingMappingContext mapping;
private final Clock clock;
private final Duration pollingInterval;
private final Duration pulseInterval;
private final Duration pulseExpiration;
private final int batchSize;
private final int retryDelay;
private final Integer transactionTimeout;

private Factory(AutomaticIndexingMappingContext mapping,
private Factory(AutomaticIndexingMappingContext mapping, Clock clock,
Duration pollingInterval, Duration pulseInterval, Duration pulseExpiration,
int batchSize, int retryDelay, Integer transactionTimeout) {
this.mapping = mapping;
this.clock = clock;
this.pollingInterval = pollingInterval;
this.pulseInterval = pulseInterval;
this.pulseExpiration = pulseExpiration;
Expand All @@ -133,7 +135,7 @@ public OutboxPollingEventProcessor create(ScheduledExecutorService scheduledExec
String agentName = NAME_PREFIX
+ ( shardAssignmentOrNull == null ? "" : " - " + shardAssignmentOrNull.assignedShardIndex );
OutboxPollingEventProcessorClusterLink clusterLink = new OutboxPollingEventProcessorClusterLink(
agentName, mapping.failureHandler(), Clock.systemUTC(),
agentName, mapping.failureHandler(), clock,
finderProvider, pollingInterval, pulseInterval, pulseExpiration, shardAssignmentOrNull );

return new OutboxPollingEventProcessor( agentName, this, scheduledExecutor,
Expand Down
Expand Up @@ -158,7 +158,8 @@ private void initializeEventProcessors(CoordinationStrategyStartContext context)
shardAssignmentOrNulls = Collections.singletonList( null );
}

OutboxPollingEventProcessor.Factory factory = OutboxPollingEventProcessor.factory( context.mapping(), configurationSource );
OutboxPollingEventProcessor.Factory factory = OutboxPollingEventProcessor.factory( context.mapping(),
context.clock(), configurationSource );

scheduledExecutor = context.threadPoolProvider()
.newScheduledExecutor( shardAssignmentOrNulls.size(), OutboxPollingEventProcessor.NAME_PREFIX );
Expand Down
Expand Up @@ -6,6 +6,8 @@
*/
package org.hibernate.search.mapper.orm.coordination.common.spi;

import java.time.Clock;

import org.hibernate.search.engine.cfg.ConfigurationPropertySource;
import org.hibernate.search.engine.environment.bean.BeanResolver;
import org.hibernate.search.engine.environment.thread.spi.ThreadPoolProvider;
Expand Down Expand Up @@ -42,6 +44,11 @@ public interface CoordinationStrategyStartContext {
*/
ThreadPoolProvider threadPoolProvider();

/**
* @return A {@link Clock} to be used for coordination between nodes.
*/
Clock clock();

/**
* @return The mapping, providing all information and operations necessary
* for background processing of automatic indexing events.
Expand Down
Expand Up @@ -6,6 +6,8 @@
*/
package org.hibernate.search.mapper.orm.mapping.impl;

import java.time.Clock;

import org.hibernate.search.engine.cfg.ConfigurationPropertySource;
import org.hibernate.search.engine.environment.bean.BeanResolver;
import org.hibernate.search.engine.environment.thread.spi.ThreadPoolProvider;
Expand Down Expand Up @@ -48,6 +50,11 @@ public ThreadPoolProvider threadPoolProvider() {
return delegate.threadPoolProvider();
}

@Override
public Clock clock() {
return Clock.systemUTC();
}

@Override
public AutomaticIndexingMappingContext mapping() {
return mapping;
Expand Down

0 comments on commit c16e22b

Please sign in to comment.