From 7247413e3843d801789542b490f76c2a42082cb5 Mon Sep 17 00:00:00 2001 From: Lorenzo Dematte Date: Wed, 30 Oct 2024 13:04:40 +0100 Subject: [PATCH 1/4] Add ClusterStateSupplier interface + standard implementation --- .../cluster/ClusterStateSupplier.java | 27 ++++++++++++++ .../cluster/SafeClusterStateSupplier.java | 37 +++++++++++++++++++ 2 files changed, 64 insertions(+) create mode 100644 server/src/main/java/org/elasticsearch/cluster/ClusterStateSupplier.java create mode 100644 server/src/main/java/org/elasticsearch/cluster/SafeClusterStateSupplier.java diff --git a/server/src/main/java/org/elasticsearch/cluster/ClusterStateSupplier.java b/server/src/main/java/org/elasticsearch/cluster/ClusterStateSupplier.java new file mode 100644 index 0000000000000..5c6c525e9b7ae --- /dev/null +++ b/server/src/main/java/org/elasticsearch/cluster/ClusterStateSupplier.java @@ -0,0 +1,27 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +package org.elasticsearch.cluster; + +import java.util.Optional; +import java.util.function.Function; + +/** + * Utility to access {@link ClusterState} only when it is "ready", with a fallback if it's not. The definition of "ready" is left to the + * class implementations. + */ +public interface ClusterStateSupplier { + Optional getClusterStateIfReady(); + + default T withCurrentClusterState(Function clusterStateFunction, T fallbackIfNotReady) { + var x = getClusterStateIfReady(); + return x.map(clusterStateFunction).orElse(fallbackIfNotReady); + } +} + diff --git a/server/src/main/java/org/elasticsearch/cluster/SafeClusterStateSupplier.java b/server/src/main/java/org/elasticsearch/cluster/SafeClusterStateSupplier.java new file mode 100644 index 0000000000000..d75f716b3fd27 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/cluster/SafeClusterStateSupplier.java @@ -0,0 +1,37 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +package org.elasticsearch.cluster; + +import java.util.Optional; + +import static org.elasticsearch.gateway.GatewayService.STATE_NOT_RECOVERED_BLOCK; + +/** + * Utility to access {@link ClusterState} only when it is "ready", where "ready" means initialized and recovered. + */ +public class SafeClusterStateSupplier implements ClusterStateSupplier, ClusterStateListener { + private volatile ClusterState currentClusterState; + + @Override + public void clusterChanged(ClusterChangedEvent event) { + if (isInitialized() || event.state().blocks().hasGlobalBlock(STATE_NOT_RECOVERED_BLOCK) == false) { + currentClusterState = event.state(); + } + } + + private boolean isInitialized() { + return currentClusterState != null; + } + + @Override + public Optional getClusterStateIfReady() { + return Optional.ofNullable(currentClusterState); + } +} From a645d43d8eb798721943a45ce7184842285306a8 Mon Sep 17 00:00:00 2001 From: Lorenzo Dematte Date: Wed, 30 Oct 2024 14:29:37 +0100 Subject: [PATCH 2/4] Spotless --- .../java/org/elasticsearch/cluster/ClusterStateSupplier.java | 1 - 1 file changed, 1 deletion(-) diff --git a/server/src/main/java/org/elasticsearch/cluster/ClusterStateSupplier.java b/server/src/main/java/org/elasticsearch/cluster/ClusterStateSupplier.java index 5c6c525e9b7ae..d8dc7ed70fc31 100644 --- a/server/src/main/java/org/elasticsearch/cluster/ClusterStateSupplier.java +++ b/server/src/main/java/org/elasticsearch/cluster/ClusterStateSupplier.java @@ -24,4 +24,3 @@ default T withCurrentClusterState(Function clusterStateFunc return x.map(clusterStateFunction).orElse(fallbackIfNotReady); } } - From 531ce989bf138032cb4b60952dd06a2c72050b59 Mon Sep 17 00:00:00 2001 From: Lorenzo Dematte Date: Wed, 30 Oct 2024 15:07:09 +0100 Subject: [PATCH 3/4] Make ClusterStateSupplier a Supplier>; improve javadoc --- .../org/elasticsearch/cluster/ClusterStateSupplier.java | 7 +++---- .../elasticsearch/cluster/SafeClusterStateSupplier.java | 9 +++++++-- 2 files changed, 10 insertions(+), 6 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/cluster/ClusterStateSupplier.java b/server/src/main/java/org/elasticsearch/cluster/ClusterStateSupplier.java index d8dc7ed70fc31..61bb049ffd5c5 100644 --- a/server/src/main/java/org/elasticsearch/cluster/ClusterStateSupplier.java +++ b/server/src/main/java/org/elasticsearch/cluster/ClusterStateSupplier.java @@ -11,16 +11,15 @@ import java.util.Optional; import java.util.function.Function; +import java.util.function.Supplier; /** * Utility to access {@link ClusterState} only when it is "ready", with a fallback if it's not. The definition of "ready" is left to the * class implementations. */ -public interface ClusterStateSupplier { - Optional getClusterStateIfReady(); - +public interface ClusterStateSupplier extends Supplier> { default T withCurrentClusterState(Function clusterStateFunction, T fallbackIfNotReady) { - var x = getClusterStateIfReady(); + var x = get(); return x.map(clusterStateFunction).orElse(fallbackIfNotReady); } } diff --git a/server/src/main/java/org/elasticsearch/cluster/SafeClusterStateSupplier.java b/server/src/main/java/org/elasticsearch/cluster/SafeClusterStateSupplier.java index d75f716b3fd27..7a6cf2d3e20c4 100644 --- a/server/src/main/java/org/elasticsearch/cluster/SafeClusterStateSupplier.java +++ b/server/src/main/java/org/elasticsearch/cluster/SafeClusterStateSupplier.java @@ -14,7 +14,12 @@ import static org.elasticsearch.gateway.GatewayService.STATE_NOT_RECOVERED_BLOCK; /** - * Utility to access {@link ClusterState} only when it is "ready", where "ready" means initialized and recovered. + * Utility to access {@link ClusterState} only when it is "ready", where "ready" means that we received a first clusterChanged event + * with no global block of type {@code STATE_NOT_RECOVERED_BLOCK} + * This guarantees that: + * - the initial cluster state has been set (see + * {@link org.elasticsearch.cluster.service.ClusterApplierService#setInitialState(ClusterState)}); + * - the initial recovery process has completed. */ public class SafeClusterStateSupplier implements ClusterStateSupplier, ClusterStateListener { private volatile ClusterState currentClusterState; @@ -31,7 +36,7 @@ private boolean isInitialized() { } @Override - public Optional getClusterStateIfReady() { + public Optional get() { return Optional.ofNullable(currentClusterState); } } From 1fd80f2a4fbc216eeb69bd7a8588cc268fe258c6 Mon Sep 17 00:00:00 2001 From: Lorenzo Dematte Date: Mon, 4 Nov 2024 16:38:04 +0100 Subject: [PATCH 4/4] Addressed PR comments --- .../org/elasticsearch/cluster/SafeClusterStateSupplier.java | 2 ++ 1 file changed, 2 insertions(+) diff --git a/server/src/main/java/org/elasticsearch/cluster/SafeClusterStateSupplier.java b/server/src/main/java/org/elasticsearch/cluster/SafeClusterStateSupplier.java index 7a6cf2d3e20c4..b12ef3d78f864 100644 --- a/server/src/main/java/org/elasticsearch/cluster/SafeClusterStateSupplier.java +++ b/server/src/main/java/org/elasticsearch/cluster/SafeClusterStateSupplier.java @@ -26,6 +26,8 @@ public class SafeClusterStateSupplier implements ClusterStateSupplier, ClusterSt @Override public void clusterChanged(ClusterChangedEvent event) { + // In this default implementation, "ready" is really "is cluster state available", which after the initial recovery it should be. + // If you need a different condition, feel free to add a different implementation of ClusterStateSupplier if (isInitialized() || event.state().blocks().hasGlobalBlock(STATE_NOT_RECOVERED_BLOCK) == false) { currentClusterState = event.state(); }