diff --git a/server/src/main/java/org/elasticsearch/node/NodeConstruction.java b/server/src/main/java/org/elasticsearch/node/NodeConstruction.java index 4366789342c12..4d44c6eaa0654 100644 --- a/server/src/main/java/org/elasticsearch/node/NodeConstruction.java +++ b/server/src/main/java/org/elasticsearch/node/NodeConstruction.java @@ -1154,7 +1154,7 @@ public Map queryFields() { localNodeFactory, settingsModule.getClusterSettings(), taskManager, - telemetryProvider.getTracer(), + telemetryProvider, nodeEnvironment.nodeId(), linkedProjectConfigService, projectResolver diff --git a/server/src/main/java/org/elasticsearch/node/NodeServiceProvider.java b/server/src/main/java/org/elasticsearch/node/NodeServiceProvider.java index 0b8c59b3c1957..1e7285169d826 100644 --- a/server/src/main/java/org/elasticsearch/node/NodeServiceProvider.java +++ b/server/src/main/java/org/elasticsearch/node/NodeServiceProvider.java @@ -40,6 +40,7 @@ import org.elasticsearch.search.SearchService; import org.elasticsearch.search.fetch.FetchPhase; import org.elasticsearch.tasks.TaskManager; +import org.elasticsearch.telemetry.TelemetryProvider; import org.elasticsearch.telemetry.tracing.Tracer; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.ClusterConnectionManager; @@ -120,7 +121,7 @@ TransportService newTransportService( Function localNodeFactory, ClusterSettings clusterSettings, TaskManager taskManager, - Tracer tracer, + TelemetryProvider telemetryProvider, String nodeId, LinkedProjectConfigService linkedProjectConfigService, ProjectResolver projectResolver @@ -135,6 +136,7 @@ TransportService newTransportService( new ClusterConnectionManager(settings, transport, threadPool.getThreadContext()), taskManager, linkedProjectConfigService, + telemetryProvider, projectResolver ); } diff --git a/server/src/main/java/org/elasticsearch/transport/RemoteClusterService.java b/server/src/main/java/org/elasticsearch/transport/RemoteClusterService.java index bec802147ea90..31d73822ecd9d 100644 --- a/server/src/main/java/org/elasticsearch/transport/RemoteClusterService.java +++ b/server/src/main/java/org/elasticsearch/transport/RemoteClusterService.java @@ -65,6 +65,7 @@ public final class RemoteClusterService extends RemoteClusterAware private static final Logger logger = LogManager.getLogger(RemoteClusterService.class); public static final String REMOTE_CLUSTER_HANDSHAKE_ACTION_NAME = "cluster:internal/remote_cluster/handshake"; + public static final String CONNECTION_ATTEMPT_FAILURES_COUNTER_NAME = "es.projects.linked.connections.error.total"; private final boolean isRemoteClusterClient; private final boolean isSearchNode; @@ -101,6 +102,11 @@ public boolean isRemoteClusterServerEnabled() { * the functionality to do it the right way is not yet ready -- replace this code when it's ready. */ this.crossProjectEnabled = settings.getAsBoolean("serverless.cross_project.enabled", false); + if (transportService != null) { + transportService.getTelemetryProvider() + .getMeterRegistry() + .registerLongCounter(CONNECTION_ATTEMPT_FAILURES_COUNTER_NAME, "linked project connection attempt failure count", "count"); + } } public RemoteClusterCredentialsManager getRemoteClusterCredentialsManager() { diff --git a/server/src/main/java/org/elasticsearch/transport/RemoteConnectionStrategy.java b/server/src/main/java/org/elasticsearch/transport/RemoteConnectionStrategy.java index 9cde5c086ab39..3986788ac4a4e 100644 --- a/server/src/main/java/org/elasticsearch/transport/RemoteConnectionStrategy.java +++ b/server/src/main/java/org/elasticsearch/transport/RemoteConnectionStrategy.java @@ -20,6 +20,8 @@ import org.elasticsearch.common.util.concurrent.AbstractRunnable; import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException; import org.elasticsearch.core.Nullable; +import org.elasticsearch.telemetry.TelemetryProvider; +import org.elasticsearch.telemetry.metric.LongCounter; import org.elasticsearch.threadpool.ThreadPool; import java.io.Closeable; @@ -30,6 +32,7 @@ import java.util.ArrayList; import java.util.Collections; import java.util.List; +import java.util.Map; import java.util.Objects; import java.util.concurrent.ExecutorService; import java.util.concurrent.atomic.AtomicBoolean; @@ -70,6 +73,11 @@ public Writeable.Reader getReader() { } } + enum ConnectionAttempt { + initial, + reconnect + } + private final int maxPendingConnectionListeners; protected final Logger logger = LogManager.getLogger(getClass()); @@ -78,6 +86,7 @@ public Writeable.Reader getReader() { private final Object mutex = new Object(); private List> listeners = new ArrayList<>(); private final AtomicBoolean initialConnectionAttempted = new AtomicBoolean(false); + private final LongCounter connectionAttemptFailures; protected final TransportService transportService; protected final RemoteConnectionManager connectionManager; @@ -92,9 +101,15 @@ public Writeable.Reader getReader() { this.transportService = transportService; this.connectionManager = connectionManager; this.maxPendingConnectionListeners = config.maxPendingConnectionListeners(); + this.connectionAttemptFailures = lookupConnectionFailureMetric(transportService.getTelemetryProvider()); connectionManager.addListener(this); } + private LongCounter lookupConnectionFailureMetric(TelemetryProvider telemetryProvider) { + final var meterRegistry = telemetryProvider == null ? null : telemetryProvider.getMeterRegistry(); + return meterRegistry == null ? null : meterRegistry.getLongCounter(RemoteClusterService.CONNECTION_ATTEMPT_FAILURES_COUNTER_NAME); + } + static ConnectionProfile buildConnectionProfile(LinkedProjectConfig config, String transportProfile) { ConnectionProfile.Builder builder = new ConnectionProfile.Builder().setConnectTimeout(config.transportConnectTimeout()) .setHandshakeTimeout(config.transportConnectTimeout()) @@ -221,7 +236,21 @@ private void connectionAttemptCompleted(@Nullable Exception e) { logger.debug(msgSupplier); } else { logger.warn(msgSupplier, e); - // TODO: ES-12695: Increment either the initial or retry connection failure metric. + if (connectionAttemptFailures != null) { + connectionAttemptFailures.incrementBy( + 1, + Map.of( + "linked_project_id", + linkedProjectId.toString(), + "linked_project_alias", + clusterAlias, + "attempt", + (isInitialAttempt ? ConnectionAttempt.initial : ConnectionAttempt.reconnect).toString(), + "strategy", + strategyType().toString() + ) + ); + } } } diff --git a/server/src/main/java/org/elasticsearch/transport/TransportService.java b/server/src/main/java/org/elasticsearch/transport/TransportService.java index 785a56a41ddbe..f8d2ec9d0adac 100644 --- a/server/src/main/java/org/elasticsearch/transport/TransportService.java +++ b/server/src/main/java/org/elasticsearch/transport/TransportService.java @@ -50,6 +50,7 @@ import org.elasticsearch.node.ReportingService; import org.elasticsearch.tasks.Task; import org.elasticsearch.tasks.TaskManager; +import org.elasticsearch.telemetry.TelemetryProvider; import org.elasticsearch.threadpool.Scheduler; import org.elasticsearch.threadpool.ThreadPool; @@ -140,6 +141,7 @@ protected boolean removeEldestEntry(Map.Entry eldest) { volatile String[] tracerLogExclude; private final LinkedProjectConfigService linkedProjectConfigService; + private final TelemetryProvider telemetryProvider; private final RemoteClusterService remoteClusterService; /** @@ -277,6 +279,7 @@ public TransportService( connectionManager, taskManger, new ClusterSettingsLinkedProjectConfigService(settings, clusterSettings, DefaultProjectResolver.INSTANCE), + TelemetryProvider.NOOP, DefaultProjectResolver.INSTANCE ); } @@ -292,6 +295,7 @@ public TransportService( ConnectionManager connectionManager, TaskManager taskManger, LinkedProjectConfigService linkedProjectConfigService, + TelemetryProvider telemetryProvider, ProjectResolver projectResolver ) { this.transport = transport; @@ -308,6 +312,7 @@ public TransportService( this.remoteClusterClient = DiscoveryNode.isRemoteClusterClient(settings); this.enableStackOverflowAvoidance = ENABLE_STACK_OVERFLOW_AVOIDANCE.get(settings); this.linkedProjectConfigService = linkedProjectConfigService; + this.telemetryProvider = telemetryProvider; remoteClusterService = new RemoteClusterService(settings, this, projectResolver); responseHandlers = transport.getResponseHandlers(); if (clusterSettings != null) { @@ -354,6 +359,10 @@ void setTracerLogExclude(List tracerLogExclude) { this.tracerLogExclude = tracerLogExclude.toArray(Strings.EMPTY_ARRAY); } + public TelemetryProvider getTelemetryProvider() { + return telemetryProvider; + } + @Override protected void doStart() { transport.setMessageListener(this); diff --git a/server/src/test/java/org/elasticsearch/transport/RemoteConnectionStrategyTests.java b/server/src/test/java/org/elasticsearch/transport/RemoteConnectionStrategyTests.java index 0a5cfd670dcf6..aa4260a1cd20c 100644 --- a/server/src/test/java/org/elasticsearch/transport/RemoteConnectionStrategyTests.java +++ b/server/src/test/java/org/elasticsearch/transport/RemoteConnectionStrategyTests.java @@ -20,6 +20,8 @@ import org.elasticsearch.common.util.concurrent.ThreadContext; import org.elasticsearch.core.Strings; import org.elasticsearch.core.TimeValue; +import org.elasticsearch.telemetry.InstrumentType; +import org.elasticsearch.telemetry.RecordingMeterRegistry; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.EnumSerializationTestUtils; import org.elasticsearch.test.MockLog; @@ -28,12 +30,15 @@ import org.elasticsearch.threadpool.TestThreadPool; import org.elasticsearch.threadpool.ThreadPool; +import java.util.Set; + import static org.elasticsearch.test.MockLog.assertThatLogger; import static org.elasticsearch.transport.RemoteClusterSettings.ProxyConnectionStrategySettings.PROXY_ADDRESS; import static org.elasticsearch.transport.RemoteClusterSettings.REMOTE_CONNECTION_MODE; import static org.elasticsearch.transport.RemoteClusterSettings.SniffConnectionStrategySettings.REMOTE_CLUSTER_SEEDS; import static org.elasticsearch.transport.RemoteClusterSettings.toConfig; import static org.elasticsearch.transport.RemoteConnectionStrategy.buildConnectionProfile; +import static org.hamcrest.Matchers.equalTo; import static org.mockito.Mockito.mock; public class RemoteConnectionStrategyTests extends ESTestCase { @@ -194,7 +199,7 @@ public void testConnectionStrategySerialization() { value = "org.elasticsearch.transport.RemoteConnectionStrategyTests.FakeConnectionStrategy:DEBUG", reason = "logging verification" ) - public void testConnectionAttemptLogging() { + public void testConnectionAttemptMetricsAndLogging() { final var originProjectId = randomUniqueProjectId(); final var linkedProjectId = randomUniqueProjectId(); final var alias = randomAlphanumericOfLength(10); @@ -208,8 +213,13 @@ public void testConnectionAttemptLogging() { new ClusterConnectionManager(TestProfiles.LIGHT_PROFILE, mock(Transport.class), threadContext) ) ) { + assert transportService.getTelemetryProvider() != null; + final var meterRegistry = transportService.getTelemetryProvider().getMeterRegistry(); + assert meterRegistry instanceof RecordingMeterRegistry; + final var metricRecorder = ((RecordingMeterRegistry) meterRegistry).getRecorder(); + for (boolean shouldConnectFail : new boolean[] { true, false }) { - for (boolean isIntialConnectAttempt : new boolean[] { true, false }) { + for (boolean isInitialConnectAttempt : new boolean[] { true, false }) { final var strategy = new FakeConnectionStrategy( originProjectId, linkedProjectId, @@ -217,7 +227,7 @@ public void testConnectionAttemptLogging() { transportService, connectionManager ); - if (isIntialConnectAttempt == false) { + if (isInitialConnectAttempt == false) { waitForConnect(strategy); } strategy.setShouldConnectFail(shouldConnectFail); @@ -228,7 +238,7 @@ public void testConnectionAttemptLogging() { shouldConnectFail ? "failed to connect" : "successfully connected", linkedProjectId, alias, - isIntialConnectAttempt ? "the initial connection" : "a reconnection" + isInitialConnectAttempt ? "the initial connection" : "a reconnection" ); assertThatLogger(() -> { if (shouldConnectFail) { @@ -243,12 +253,30 @@ public void testConnectionAttemptLogging() { + expectedLogLevel + " after a " + (shouldConnectFail ? "failed" : "successful") - + (isIntialConnectAttempt ? " initial connection attempt" : " reconnection attempt"), + + (isInitialConnectAttempt ? " initial connection attempt" : " reconnection attempt"), strategy.getClass().getCanonicalName(), expectedLogLevel, expectedLogMessage ) ); + if (shouldConnectFail) { + metricRecorder.collect(); + final var counterName = RemoteClusterService.CONNECTION_ATTEMPT_FAILURES_COUNTER_NAME; + final var measurements = metricRecorder.getMeasurements(InstrumentType.LONG_COUNTER, counterName); + assertFalse(measurements.isEmpty()); + final var measurement = measurements.getLast(); + assertThat(measurement.getLong(), equalTo(1L)); + final var attributes = measurement.attributes(); + final var keySet = Set.of("linked_project_id", "linked_project_alias", "attempt", "strategy"); + final var expectedAttemptType = isInitialConnectAttempt + ? RemoteConnectionStrategy.ConnectionAttempt.initial + : RemoteConnectionStrategy.ConnectionAttempt.reconnect; + assertThat(attributes.keySet(), equalTo(keySet)); + assertThat(attributes.get("linked_project_id"), equalTo(linkedProjectId.toString())); + assertThat(attributes.get("linked_project_alias"), equalTo(alias)); + assertThat(attributes.get("attempt"), equalTo(expectedAttemptType.toString())); + assertThat(attributes.get("strategy"), equalTo(strategy.strategyType().toString())); + } } } } diff --git a/test/framework/src/main/java/org/elasticsearch/node/MockNode.java b/test/framework/src/main/java/org/elasticsearch/node/MockNode.java index e7668269c0073..aa9f9c8de5ace 100644 --- a/test/framework/src/main/java/org/elasticsearch/node/MockNode.java +++ b/test/framework/src/main/java/org/elasticsearch/node/MockNode.java @@ -45,6 +45,7 @@ import org.elasticsearch.search.SearchService; import org.elasticsearch.search.fetch.FetchPhase; import org.elasticsearch.tasks.TaskManager; +import org.elasticsearch.telemetry.TelemetryProvider; import org.elasticsearch.telemetry.tracing.Tracer; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.MockHttpTransport; @@ -174,7 +175,7 @@ protected TransportService newTransportService( Function localNodeFactory, ClusterSettings clusterSettings, TaskManager taskManager, - Tracer tracer, + TelemetryProvider telemetryProvider, String nodeId, LinkedProjectConfigService linkedProjectConfigService, ProjectResolver projectResolver @@ -194,7 +195,7 @@ protected TransportService newTransportService( localNodeFactory, clusterSettings, taskManager, - tracer, + telemetryProvider, nodeId, linkedProjectConfigService, projectResolver @@ -209,6 +210,7 @@ protected TransportService newTransportService( clusterSettings, MockTransportService.createTaskManager(settings, threadPool, taskManager.getTaskHeaders(), Tracer.NOOP, nodeId), linkedProjectConfigService, + telemetryProvider, projectResolver ); } diff --git a/test/framework/src/main/java/org/elasticsearch/test/transport/MockTransportService.java b/test/framework/src/main/java/org/elasticsearch/test/transport/MockTransportService.java index 2c0a3dd267638..2c1ea9d329162 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/transport/MockTransportService.java +++ b/test/framework/src/main/java/org/elasticsearch/test/transport/MockTransportService.java @@ -46,6 +46,9 @@ import org.elasticsearch.plugins.Plugin; import org.elasticsearch.search.SearchModule; import org.elasticsearch.tasks.TaskManager; +import org.elasticsearch.telemetry.RecordingMeterRegistry; +import org.elasticsearch.telemetry.TelemetryProvider; +import org.elasticsearch.telemetry.metric.MeterRegistry; import org.elasticsearch.telemetry.tracing.Tracer; import org.elasticsearch.test.ESIntegTestCase; import org.elasticsearch.test.ESTestCase; @@ -264,6 +267,19 @@ public MockTransportService( clusterSettings, createTaskManager(settings, threadPool, taskHeaders, Tracer.NOOP, nodeId), new ClusterSettingsLinkedProjectConfigService(settings, clusterSettings, DefaultProjectResolver.INSTANCE), + new TelemetryProvider() { + final MeterRegistry meterRegistry = new RecordingMeterRegistry(); + + @Override + public Tracer getTracer() { + return Tracer.NOOP; + } + + @Override + public MeterRegistry getMeterRegistry() { + return meterRegistry; + } + }, DefaultProjectResolver.INSTANCE ); } @@ -277,6 +293,7 @@ public MockTransportService( @Nullable ClusterSettings clusterSettings, TaskManager taskManager, LinkedProjectConfigService linkedProjectConfigService, + TelemetryProvider telemetryProvider, ProjectResolver projectResolver ) { super( @@ -289,6 +306,7 @@ public MockTransportService( new StubbableConnectionManager(new ClusterConnectionManager(settings, transport, threadPool.getThreadContext())), taskManager, linkedProjectConfigService, + telemetryProvider, projectResolver ); this.original = transport.getDelegate();