Skip to content

Commit

Permalink
Refactor DatabaseNodeService as a cluster state listener. (#91567) (#…
Browse files Browse the repository at this point in the history
…91596)

This PR changes the DatabaseNodeService to use a ClusterStateListener which will always run after 
the cluster state is globally applied instead of using a listener on the IngestService which is run before 
the cluster state is fully published.
  • Loading branch information
jbaiera committed Nov 15, 2022
1 parent c240141 commit 01ff029
Show file tree
Hide file tree
Showing 6 changed files with 31 additions and 8 deletions.
6 changes: 6 additions & 0 deletions docs/changelog/91567.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
pr: 91567
summary: Refactor `DatabaseNodeService` as a cluster state listener
area: Ingest Node
type: bug
issues:
- 86999
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,7 @@ private static DatabaseNodeService createRegistry(Path geoIpConfigDir, Path geoI
configDatabases,
Runnable::run
);
databaseNodeService.initialize("nodeId", mock(ResourceWatcherService.class), mock(IngestService.class));
databaseNodeService.initialize("nodeId", mock(ResourceWatcherService.class), mock(IngestService.class), mock(ClusterService.class));
return databaseNodeService;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,13 @@
import org.elasticsearch.cluster.metadata.IndexAbstraction;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.IndexRoutingTable;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.hash.MessageDigests;
import org.elasticsearch.core.CheckedConsumer;
import org.elasticsearch.core.CheckedRunnable;
import org.elasticsearch.core.IOUtils;
import org.elasticsearch.env.Environment;
import org.elasticsearch.gateway.GatewayService;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.query.TermQueryBuilder;
import org.elasticsearch.ingest.IngestService;
Expand Down Expand Up @@ -113,7 +115,12 @@ public final class DatabaseNodeService implements Closeable {
this.genericExecutor = genericExecutor;
}

public void initialize(String nodeId, ResourceWatcherService resourceWatcher, IngestService ingestServiceArg) throws IOException {
public void initialize(
String nodeId,
ResourceWatcherService resourceWatcher,
IngestService ingestServiceArg,
ClusterService clusterService
) throws IOException {
configDatabases.initialize(resourceWatcher);
geoipTmpDirectory = geoipTmpBaseDirectory.resolve(nodeId);
Files.walkFileTree(geoipTmpDirectory, new FileVisitor<>() {
Expand Down Expand Up @@ -150,8 +157,8 @@ public FileVisitResult postVisitDirectory(Path dir, IOException exc) {
Files.createDirectories(geoipTmpDirectory);
}
LOGGER.debug("initialized database node service, using geoip-databases directory [{}]", geoipTmpDirectory);
ingestServiceArg.addIngestClusterStateListener(this::checkDatabases);
this.ingestService = ingestServiceArg;
clusterService.addListener(event -> checkDatabases(event.state()));
}

public DatabaseReaderLazyLoader getDatabase(String name) {
Expand Down Expand Up @@ -184,6 +191,10 @@ public void close() throws IOException {
}

void checkDatabases(ClusterState state) {
if (state.blocks().hasGlobalBlock(GatewayService.STATE_NOT_RECOVERED_BLOCK)) {
return;
}

DiscoveryNode localNode = state.nodes().getLocalNode();
if (localNode.isIngestNode() == false) {
return;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ public Collection<Object> createComponents(
) {
try {
String nodeId = nodeEnvironment.nodeId();
databaseRegistry.get().initialize(nodeId, resourceWatcherService, ingestService.get());
databaseRegistry.get().initialize(nodeId, resourceWatcherService, ingestService.get(), clusterService);
} catch (IOException e) {
throw new UncheckedIOException(e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.elasticsearch.cluster.routing.RoutingTable;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.UnassignedInfo;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.hash.MessageDigests;
import org.elasticsearch.common.io.Streams;
Expand Down Expand Up @@ -106,6 +107,7 @@ public class DatabaseNodeServiceTests extends ESTestCase {
private DatabaseNodeService databaseNodeService;
private ResourceWatcherService resourceWatcherService;
private IngestService ingestService;
private ClusterService clusterService;

@Before
public void setup() throws IOException {
Expand All @@ -121,9 +123,10 @@ public void setup() throws IOException {

client = mock(Client.class);
ingestService = mock(IngestService.class);
clusterService = mock(ClusterService.class);
geoIpTmpDir = createTempDir();
databaseNodeService = new DatabaseNodeService(geoIpTmpDir, client, cache, configDatabases, Runnable::run);
databaseNodeService.initialize("nodeId", resourceWatcherService, ingestService);
databaseNodeService.initialize("nodeId", resourceWatcherService, ingestService, clusterService);
}

@After
Expand Down Expand Up @@ -295,14 +298,17 @@ public void testUpdateDatabase() throws Exception {
databaseNodeService.updateDatabase("_name", "_md5", geoIpTmpDir.resolve("some-file"));

// Updating the first time may trigger a reload.
verify(ingestService, times(1)).addIngestClusterStateListener(any());
verify(clusterService, times(1)).addListener(any());
verify(ingestService, times(1)).getPipelineWithProcessorType(any(), any());
verify(ingestService, times(numPipelinesToBeReloaded)).reloadPipeline(anyString());
verifyNoMoreInteractions(clusterService);
verifyNoMoreInteractions(ingestService);
reset(clusterService);
reset(ingestService);

// Subsequent updates shouldn't trigger a reload.
databaseNodeService.updateDatabase("_name", "_md5", geoIpTmpDir.resolve("some-file"));
verifyNoMoreInteractions(clusterService);
verifyNoMoreInteractions(ingestService);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ public void loadDatabaseReaders() throws IOException {
copyDatabaseFiles(geoIpConfigDir, configDatabases);
geoipTmpDir = createTempDir();
databaseNodeService = new DatabaseNodeService(geoipTmpDir, client, cache, configDatabases, Runnable::run);
databaseNodeService.initialize("nodeId", mock(ResourceWatcherService.class), mock(IngestService.class));
databaseNodeService.initialize("nodeId", mock(ResourceWatcherService.class), mock(IngestService.class), mock(ClusterService.class));
clusterService = mock(ClusterService.class);
when(clusterService.state()).thenReturn(ClusterState.EMPTY_STATE);
}
Expand Down Expand Up @@ -361,7 +361,7 @@ public void testLoadingCustomDatabase() throws IOException {
Client client = mock(Client.class);
GeoIpCache cache = new GeoIpCache(1000);
DatabaseNodeService databaseNodeService = new DatabaseNodeService(createTempDir(), client, cache, configDatabases, Runnable::run);
databaseNodeService.initialize("nodeId", resourceWatcherService, mock(IngestService.class));
databaseNodeService.initialize("nodeId", resourceWatcherService, mock(IngestService.class), mock(ClusterService.class));
GeoIpProcessor.Factory factory = new GeoIpProcessor.Factory(databaseNodeService, clusterService);
for (DatabaseReaderLazyLoader lazyLoader : configDatabases.getConfigDatabases().values()) {
assertNull(lazyLoader.databaseReader.get());
Expand Down

0 comments on commit 01ff029

Please sign in to comment.