Separate out some node constructor object bindings that don't reference anything else (#101123)

Make sure instance field objects are initialized as early as possible
thecoop committed Nov 13, 2023
1 parent 54afb1b commit 8293f3a
Showing 3 changed files with 135 additions and 116 deletions.
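
The HealthPeriodicLogger hunks below apply the usual remedy for a self reference escaping its constructor: the constructor only assigns fields, and a static factory registers the fully constructed instance as a listener afterwards. A minimal sketch of that pattern, using hypothetical Bus, Event, Listener, and Service types rather than the real Elasticsearch classes:

    import java.util.ArrayList;
    import java.util.List;

    interface Event {}

    interface Listener {
        void onEvent(Event event);
    }

    class Bus {
        private final List<Listener> listeners = new ArrayList<>();

        void addListener(Listener listener) {
            listeners.add(listener);
        }

        void publish(Event event) {
            listeners.forEach(l -> l.onEvent(event));
        }
    }

    final class Service implements Listener {
        private final Bus bus;

        // The constructor only assigns fields; 'this' never escapes before the object is complete.
        private Service(Bus bus) {
            this.bus = bus;
        }

        // The static factory publishes the instance only after construction has finished.
        static Service create(Bus bus) {
            Service service = new Service(bus);
            bus.addListener(service);
            return service;
        }

        @Override
        public void onEvent(Event event) {
            // Every field is already initialized by the time an event can arrive here.
        }
    }

The diff applies the same shape to HealthPeriodicLogger: the constructor becomes private, and the new create factory calls registerListeners() only once the instance is complete.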
@@ -91,7 +91,18 @@ public class HealthPeriodicLogger implements ClusterStateListener, Closeable, Sc
      * @param client the client used to call the Health Service.
      * @param healthService the Health Service, where the actual Health API logic lives.
      */
-    public HealthPeriodicLogger(Settings settings, ClusterService clusterService, Client client, HealthService healthService) {
+    public static HealthPeriodicLogger create(
+        Settings settings,
+        ClusterService clusterService,
+        Client client,
+        HealthService healthService
+    ) {
+        HealthPeriodicLogger logger = new HealthPeriodicLogger(settings, clusterService, client, healthService);
+        logger.registerListeners();
+        return logger;
+    }
+
+    private HealthPeriodicLogger(Settings settings, ClusterService clusterService, Client client, HealthService healthService) {
         this.settings = settings;
         this.clusterService = clusterService;
         this.client = client;
@@ -101,11 +112,8 @@ public HealthPeriodicLogger(Settings settings, ClusterService clusterService, Cl
         this.enabled = ENABLED_SETTING.get(settings);
     }
 
-    /**
-     * Initializer method to avoid the publication of a self reference in the constructor.
-     */
-    public void init() {
-        if (this.enabled) {
+    private void registerListeners() {
+        if (enabled) {
             clusterService.addListener(this);
         }
         clusterService.getClusterSettings().addSettingsUpdateConsumer(ENABLED_SETTING, this::enable);
server/src/main/java/org/elasticsearch/node/NodeConstruction.java (228 changes: 120 additions & 108 deletions)
@@ -64,6 +64,7 @@
 import org.elasticsearch.common.component.LifecycleComponent;
 import org.elasticsearch.common.inject.Injector;
 import org.elasticsearch.common.inject.Key;
+import org.elasticsearch.common.inject.Module;
 import org.elasticsearch.common.inject.ModulesBuilder;
 import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
 import org.elasticsearch.common.logging.DeprecationCategory;
@@ -204,7 +205,6 @@
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
-import java.util.Collections;
 import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Map;
@@ -575,7 +575,7 @@ private void construct(
 
         SearchModule searchModule = new SearchModule(settings, pluginsService.filterPlugins(SearchPlugin.class).toList());
         IndexSearcher.setMaxClauseCount(SearchUtils.calculateMaxClauseValue(threadPool));
-        final NamedWriteableRegistry namedWriteableRegistry = new NamedWriteableRegistry(
+        namedWriteableRegistry = new NamedWriteableRegistry(
             Stream.of(
                 NetworkModule.getNamedWriteables().stream(),
                 IndicesModule.getNamedWriteables().stream(),
@@ -586,7 +586,7 @@ private void construct(
                 inferenceServiceRegistry.getNamedWriteables().stream()
             ).flatMap(Function.identity()).toList()
         );
-        NamedXContentRegistry xContentRegistry = new NamedXContentRegistry(
+        xContentRegistry = new NamedXContentRegistry(
             Stream.of(
                 NetworkModule.getNamedXContents().stream(),
                 IndicesModule.getNamedXContents().stream(),
@@ -1066,33 +1066,40 @@ record PluginServiceInstances(
         );
         clusterService.addListener(pluginShutdownService);
 
-        final RecoveryPlannerService recoveryPlannerService = getRecoveryPlannerService(threadPool, clusterService, repositoryService);
-        final DesiredNodesSettingsValidator desiredNodesSettingsValidator = new DesiredNodesSettingsValidator();
+        List<ReloadablePlugin> reloadablePlugins = pluginsService.filterPlugins(ReloadablePlugin.class).toList();
+        pluginsService.filterPlugins(ReloadAwarePlugin.class).forEach(p -> p.setReloadCallback(wrapPlugins(reloadablePlugins)));
 
-        final MasterHistoryService masterHistoryService = new MasterHistoryService(transportService, threadPool, clusterService);
-        final CoordinationDiagnosticsService coordinationDiagnosticsService = new CoordinationDiagnosticsService(
-            clusterService,
-            transportService,
-            discoveryModule.getCoordinator(),
-            masterHistoryService
-        );
-        final HealthService healthService = createHealthService(clusterService, coordinationDiagnosticsService, threadPool);
-        HealthPeriodicLogger healthPeriodicLogger = createHealthPeriodicLogger(clusterService, settings, client, healthService);
-        healthPeriodicLogger.init();
-        HealthMetadataService healthMetadataService = HealthMetadataService.create(clusterService, featureService, settings);
-        LocalHealthMonitor localHealthMonitor = LocalHealthMonitor.create(
-            settings,
-            clusterService,
-            nodeService,
-            threadPool,
-            client,
-            featureService
+        modules.add(
+            loadDiagnosticServices(settings, discoveryModule.getCoordinator(), clusterService, transportService, featureService, threadPool)
         );
-        HealthInfoCache nodeHealthOverview = HealthInfoCache.create(clusterService);
-        HealthApiStats healthApiStats = new HealthApiStats();
 
-        List<ReloadablePlugin> reloadablePlugins = pluginsService.filterPlugins(ReloadablePlugin.class).toList();
-        pluginsService.filterPlugins(ReloadAwarePlugin.class).forEach(p -> p.setReloadCallback(wrapPlugins(reloadablePlugins)));
+        modules.add(b -> {
+            RecoveryPlannerService recoveryPlannerService = getRecoveryPlannerService(threadPool, clusterService, repositoryService);
+            serviceProvider.processRecoverySettings(pluginsService, settingsModule.getClusterSettings(), recoverySettings);
+            SnapshotFilesProvider snapshotFilesProvider = new SnapshotFilesProvider(repositoryService);
+            var peerRecovery = new PeerRecoverySourceService(
+                transportService,
+                indicesService,
+                clusterService,
+                recoverySettings,
+                recoveryPlannerService
+            );
+            resourcesToClose.add(peerRecovery);
+            b.bind(PeerRecoverySourceService.class).toInstance(peerRecovery);
+            b.bind(PeerRecoveryTargetService.class)
+                .toInstance(
+                    new PeerRecoveryTargetService(
+                        client,
+                        threadPool,
+                        transportService,
+                        recoverySettings,
+                        clusterService,
+                        snapshotFilesProvider
+                    )
+                );
+        });
+
+        modules.add(loadPluginComponents(pluginComponents));
 
         modules.add(b -> {
             b.bind(NodeService.class).toInstance(nodeService);
@@ -1136,44 +1143,7 @@ record PluginServiceInstances(
             b.bind(FeatureService.class).toInstance(featureService);
             b.bind(Coordinator.class).toInstance(discoveryModule.getCoordinator());
             b.bind(Reconfigurator.class).toInstance(discoveryModule.getReconfigurator());
-            {
-                serviceProvider.processRecoverySettings(pluginsService, settingsModule.getClusterSettings(), recoverySettings);
-                final SnapshotFilesProvider snapshotFilesProvider = new SnapshotFilesProvider(repositoryService);
-                b.bind(PeerRecoverySourceService.class)
-                    .toInstance(
-                        new PeerRecoverySourceService(
-                            transportService,
-                            indicesService,
-                            clusterService,
-                            recoverySettings,
-                            recoveryPlannerService
-                        )
-                    );
-                b.bind(PeerRecoveryTargetService.class)
-                    .toInstance(
-                        new PeerRecoveryTargetService(
-                            client,
-                            threadPool,
-                            transportService,
-                            recoverySettings,
-                            clusterService,
-                            snapshotFilesProvider
-                        )
-                    );
-            }
             b.bind(HttpServerTransport.class).toInstance(httpServerTransport);
-            pluginComponents.forEach(p -> {
-                if (p instanceof PluginComponentBinding<?, ?> pcb) {
-                    @SuppressWarnings("unchecked")
-                    Class<Object> clazz = (Class<Object>) pcb.inter();
-                    b.bind(clazz).toInstance(pcb.impl());
-
-                } else {
-                    @SuppressWarnings("unchecked")
-                    Class<Object> clazz = (Class<Object>) p.getClass();
-                    b.bind(clazz).toInstance(p);
-                }
-            });
             b.bind(PersistentTasksService.class).toInstance(persistentTasksService);
             b.bind(PersistentTasksClusterService.class).toInstance(persistentTasksClusterService);
             b.bind(PersistentTasksExecutorRegistry.class).toInstance(registry);
@@ -1188,19 +1158,11 @@ record PluginServiceInstances(
             b.bind(PluginShutdownService.class).toInstance(pluginShutdownService);
             b.bind(ExecutorSelector.class).toInstance(executorSelector);
             b.bind(IndexSettingProviders.class).toInstance(indexSettingProviders);
-            b.bind(DesiredNodesSettingsValidator.class).toInstance(desiredNodesSettingsValidator);
-            b.bind(HealthService.class).toInstance(healthService);
-            b.bind(MasterHistoryService.class).toInstance(masterHistoryService);
-            b.bind(CoordinationDiagnosticsService.class).toInstance(coordinationDiagnosticsService);
+            b.bind(DesiredNodesSettingsValidator.class).toInstance(new DesiredNodesSettingsValidator());
             b.bind(HealthNodeTaskExecutor.class).toInstance(healthNodeTaskExecutor);
-            b.bind(HealthMetadataService.class).toInstance(healthMetadataService);
-            b.bind(LocalHealthMonitor.class).toInstance(localHealthMonitor);
-            b.bind(HealthInfoCache.class).toInstance(nodeHealthOverview);
-            b.bind(HealthApiStats.class).toInstance(healthApiStats);
             b.bind(Tracer.class).toInstance(tracer);
             b.bind(FileSettingsService.class).toInstance(fileSettingsService);
             b.bind(WriteLoadForecaster.class).toInstance(writeLoadForecaster);
-            b.bind(HealthPeriodicLogger.class).toInstance(healthPeriodicLogger);
             b.bind(CompatibilityVersions.class).toInstance(compatibilityVersions);
             b.bind(InferenceServiceRegistry.class).toInstance(inferenceServiceRegistry);
         });
@@ -1214,22 +1176,99 @@ record PluginServiceInstances(
 
         injector = modules.createInjector();
 
-        // We allocate copies of existing shards by looking for a viable copy of the shard in the cluster and assigning the shard there.
-        // The search for viable copies is triggered by an allocation attempt (i.e. a reroute) and is performed asynchronously. When it
-        // completes we trigger another reroute to try the allocation again. This means there is a circular dependency: the allocation
-        // service needs access to the existing shards allocators (e.g. the GatewayAllocator) which need to be able to trigger a
-        // reroute, which needs to call into the allocation service. We close the loop here:
-        clusterModule.setExistingShardsAllocators(injector.getInstance(GatewayAllocator.class));
+        postInjection(clusterModule, actionModule, clusterService, transportService, featureService);
+    }
+
+    private Module loadDiagnosticServices(
+        Settings settings,
+        Coordinator coordinator,
+        ClusterService clusterService,
+        TransportService transportService,
+        FeatureService featureService,
+        ThreadPool threadPool
+    ) {
+
+        MasterHistoryService masterHistoryService = new MasterHistoryService(transportService, threadPool, clusterService);
+        CoordinationDiagnosticsService coordinationDiagnosticsService = new CoordinationDiagnosticsService(
+            clusterService,
+            transportService,
+            coordinator,
+            masterHistoryService
+        );
+
+        var serverHealthIndicatorServices = Stream.of(
+            new StableMasterHealthIndicatorService(coordinationDiagnosticsService, clusterService),
+            new RepositoryIntegrityHealthIndicatorService(clusterService),
+            new DiskHealthIndicatorService(clusterService),
+            new ShardsCapacityHealthIndicatorService(clusterService)
+        );
+        var pluginHealthIndicatorServices = pluginsService.filterPlugins(HealthPlugin.class)
+            .flatMap(plugin -> plugin.getHealthIndicatorServices().stream());
+
+        HealthService healthService = new HealthService(
+            Stream.concat(serverHealthIndicatorServices, pluginHealthIndicatorServices).toList(),
+            threadPool
+        );
+        HealthPeriodicLogger healthPeriodicLogger = HealthPeriodicLogger.create(settings, clusterService, client, healthService);
+        HealthMetadataService healthMetadataService = HealthMetadataService.create(clusterService, featureService, settings);
+        LocalHealthMonitor localHealthMonitor = LocalHealthMonitor.create(
+            settings,
+            clusterService,
+            nodeService,
+            threadPool,
+            client,
+            featureService
+        );
+        HealthInfoCache nodeHealthOverview = HealthInfoCache.create(clusterService);
+
+        return b -> {
+            b.bind(HealthService.class).toInstance(healthService);
+            b.bind(MasterHistoryService.class).toInstance(masterHistoryService);
+            b.bind(CoordinationDiagnosticsService.class).toInstance(coordinationDiagnosticsService);
+            b.bind(HealthMetadataService.class).toInstance(healthMetadataService);
+            b.bind(LocalHealthMonitor.class).toInstance(localHealthMonitor);
+            b.bind(HealthInfoCache.class).toInstance(nodeHealthOverview);
+            b.bind(HealthApiStats.class).toInstance(new HealthApiStats());
+            b.bind(HealthPeriodicLogger.class).toInstance(healthPeriodicLogger);
+        };
+    }
 
+    private Module loadPluginComponents(Collection<?> pluginComponents) {
         List<LifecycleComponent> pluginLifecycleComponents = pluginComponents.stream().map(p -> {
             if (p instanceof PluginComponentBinding<?, ?> pcb) {
                 return pcb.impl();
             }
             return p;
         }).filter(p -> p instanceof LifecycleComponent).map(p -> (LifecycleComponent) p).toList();
         resourcesToClose.addAll(pluginLifecycleComponents);
-        resourcesToClose.add(injector.getInstance(PeerRecoverySourceService.class));
-        this.pluginLifecycleComponents = Collections.unmodifiableList(pluginLifecycleComponents);
+        this.pluginLifecycleComponents = pluginLifecycleComponents;
+
+        return b -> pluginComponents.forEach(p -> {
+            if (p instanceof PluginComponentBinding<?, ?> pcb) {
+                @SuppressWarnings("unchecked")
+                Class<Object> clazz = (Class<Object>) pcb.inter();
+                b.bind(clazz).toInstance(pcb.impl());
+            } else {
+                @SuppressWarnings("unchecked")
+                Class<Object> clazz = (Class<Object>) p.getClass();
+                b.bind(clazz).toInstance(p);
+            }
+        });
+    }
+
+    private void postInjection(
+        ClusterModule clusterModule,
+        ActionModule actionModule,
+        ClusterService clusterService,
+        TransportService transportService,
+        FeatureService featureService
+    ) {
+        // We allocate copies of existing shards by looking for a viable copy of the shard in the cluster and assigning the shard there.
+        // The search for viable copies is triggered by an allocation attempt (i.e. a reroute) and is performed asynchronously. When it
+        // completes we trigger another reroute to try the allocation again. This means there is a circular dependency: the allocation
+        // service needs access to the existing shards allocators (e.g. the GatewayAllocator) which need to be able to trigger a
+        // reroute, which needs to call into the allocation service. We close the loop here:
+        clusterModule.setExistingShardsAllocators(injector.getInstance(GatewayAllocator.class));
 
         // Due to Java's type erasure with generics, the injector can't give us exactly what we need, and we have
         // to resort to some evil casting.
@@ -1246,8 +1285,6 @@ record PluginServiceInstances(
             transportService.getRemoteClusterService(),
             namedWriteableRegistry
         );
-        this.namedWriteableRegistry = namedWriteableRegistry;
-        this.xContentRegistry = xContentRegistry;
 
         logger.debug("initializing HTTP handlers ...");
         actionModule.initRestHandlers(() -> clusterService.state().nodesIfRecovered(), f -> {
@@ -1297,31 +1334,6 @@ private static ReloadablePlugin wrapPlugins(List<ReloadablePlugin> reloadablePlu
         };
     }
 
-    private HealthService createHealthService(
-        ClusterService clusterService,
-        CoordinationDiagnosticsService coordinationDiagnosticsService,
-        ThreadPool threadPool
-    ) {
-        var serverHealthIndicatorServices = Stream.of(
-            new StableMasterHealthIndicatorService(coordinationDiagnosticsService, clusterService),
-            new RepositoryIntegrityHealthIndicatorService(clusterService),
-            new DiskHealthIndicatorService(clusterService),
-            new ShardsCapacityHealthIndicatorService(clusterService)
-        );
-        var pluginHealthIndicatorServices = pluginsService.filterPlugins(HealthPlugin.class)
-            .flatMap(plugin -> plugin.getHealthIndicatorServices().stream());
-        return new HealthService(Stream.concat(serverHealthIndicatorServices, pluginHealthIndicatorServices).toList(), threadPool);
-    }
-
-    private static HealthPeriodicLogger createHealthPeriodicLogger(
-        ClusterService clusterService,
-        Settings settings,
-        NodeClient client,
-        HealthService healthService
-    ) {
-        return new HealthPeriodicLogger(settings, clusterService, client, healthService);
-    }
-
     private RecoveryPlannerService getRecoveryPlannerService(
         ThreadPool threadPool,
         ClusterService clusterService,
@@ -446,8 +446,7 @@ private HealthPeriodicLogger createAndInitHealthPeriodicLogger(
         HealthService testHealthService,
         boolean enabled
     ) {
-        testHealthPeriodicLogger = new HealthPeriodicLogger(Settings.EMPTY, clusterService, this.client, testHealthService);
-        testHealthPeriodicLogger.init();
+        testHealthPeriodicLogger = HealthPeriodicLogger.create(Settings.EMPTY, clusterService, this.client, testHealthService);
         if (enabled) {
             clusterSettings.applySettings(Settings.builder().put(HealthPeriodicLogger.ENABLED_SETTING.getKey(), true).build());
         }
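
The NodeConstruction changes above group related bindings into private helpers (loadDiagnosticServices, loadPluginComponents) that build their objects eagerly and return a Module which only publishes already constructed instances. A rough sketch of that shape, assuming stock Google Guice rather than Elasticsearch's internal org.elasticsearch.common.inject fork, and with FooService and BarService as hypothetical stand-ins:

    import com.google.inject.Guice;
    import com.google.inject.Injector;
    import com.google.inject.Module;

    import java.util.ArrayList;
    import java.util.List;

    public class NodeWiringSketch {

        static final class FooService {}

        static final class BarService {
            final FooService foo;

            BarService(FooService foo) {
                this.foo = foo;
            }
        }

        // Each helper constructs its services up front and returns a Module that only binds instances.
        private static Module loadFooAndBar() {
            FooService foo = new FooService();
            BarService bar = new BarService(foo);
            return b -> {
                b.bind(FooService.class).toInstance(foo);
                b.bind(BarService.class).toInstance(bar);
            };
        }

        public static void main(String[] args) {
            List<Module> modules = new ArrayList<>();
            modules.add(loadFooAndBar());

            Injector injector = Guice.createInjector(modules);

            // Both bindings resolve to the pre-built instances.
            System.out.println(injector.getInstance(BarService.class).foo == injector.getInstance(FooService.class));
        }
    }

Keeping construction out of the Module leaves construct() readable as a flat sequence of modules.add(load...()) calls, while the injector still hands out the same instances.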