Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1154,7 +1154,7 @@ public Map<String, String> queryFields() {
localNodeFactory,
settingsModule.getClusterSettings(),
taskManager,
telemetryProvider.getTracer(),
telemetryProvider,
nodeEnvironment.nodeId(),
linkedProjectConfigService,
projectResolver
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import org.elasticsearch.search.SearchService;
import org.elasticsearch.search.fetch.FetchPhase;
import org.elasticsearch.tasks.TaskManager;
import org.elasticsearch.telemetry.TelemetryProvider;
import org.elasticsearch.telemetry.tracing.Tracer;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.ClusterConnectionManager;
Expand Down Expand Up @@ -120,7 +121,7 @@ TransportService newTransportService(
Function<BoundTransportAddress, DiscoveryNode> localNodeFactory,
ClusterSettings clusterSettings,
TaskManager taskManager,
Tracer tracer,
TelemetryProvider telemetryProvider,
String nodeId,
LinkedProjectConfigService linkedProjectConfigService,
ProjectResolver projectResolver
Expand All @@ -135,6 +136,7 @@ TransportService newTransportService(
new ClusterConnectionManager(settings, transport, threadPool.getThreadContext()),
taskManager,
linkedProjectConfigService,
telemetryProvider,
projectResolver
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ public final class RemoteClusterService extends RemoteClusterAware
private static final Logger logger = LogManager.getLogger(RemoteClusterService.class);

public static final String REMOTE_CLUSTER_HANDSHAKE_ACTION_NAME = "cluster:internal/remote_cluster/handshake";
public static final String CONNECTION_ATTEMPT_FAILURES_COUNTER_NAME = "es.projects.linked.connections.error.total";

private final boolean isRemoteClusterClient;
private final boolean isSearchNode;
Expand Down Expand Up @@ -101,6 +102,11 @@ public boolean isRemoteClusterServerEnabled() {
* the functionality to do it the right way is not yet ready -- replace this code when it's ready.
*/
this.crossProjectEnabled = settings.getAsBoolean("serverless.cross_project.enabled", false);
if (transportService != null) {
transportService.getTelemetryProvider()
.getMeterRegistry()
.registerLongCounter(CONNECTION_ATTEMPT_FAILURES_COUNTER_NAME, "linked project connection attempt failure count", "count");
}
}

public RemoteClusterCredentialsManager getRemoteClusterCredentialsManager() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.telemetry.TelemetryProvider;
import org.elasticsearch.telemetry.metric.LongCounter;
import org.elasticsearch.threadpool.ThreadPool;

import java.io.Closeable;
Expand All @@ -30,6 +32,7 @@
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicBoolean;
Expand Down Expand Up @@ -70,6 +73,11 @@ public Writeable.Reader<RemoteConnectionInfo.ModeInfo> getReader() {
}
}

/**
 * Distinguishes the first connection attempt to a linked project from later reconnection attempts.
 * The constant's {@code toString()} value is used as the {@code "attempt"} attribute when the
 * connection attempt failure counter is incremented, so the lowercase names are part of the
 * emitted metric data and should not be renamed casually.
 */
enum ConnectionAttempt {
initial,
reconnect
}

private final int maxPendingConnectionListeners;

protected final Logger logger = LogManager.getLogger(getClass());
Expand All @@ -78,6 +86,7 @@ public Writeable.Reader<RemoteConnectionInfo.ModeInfo> getReader() {
private final Object mutex = new Object();
private List<ActionListener<Void>> listeners = new ArrayList<>();
private final AtomicBoolean initialConnectionAttempted = new AtomicBoolean(false);
private final LongCounter connectionAttemptFailures;

protected final TransportService transportService;
protected final RemoteConnectionManager connectionManager;
Expand All @@ -92,9 +101,15 @@ public Writeable.Reader<RemoteConnectionInfo.ModeInfo> getReader() {
this.transportService = transportService;
this.connectionManager = connectionManager;
this.maxPendingConnectionListeners = config.maxPendingConnectionListeners();
this.connectionAttemptFailures = lookupConnectionFailureMetric(transportService.getTelemetryProvider());
connectionManager.addListener(this);
}

/**
 * Resolves the connection attempt failure counter from the given telemetry provider.
 *
 * @param telemetryProvider the provider to look the counter up in; may be {@code null}
 * @return the counter registered under {@link RemoteClusterService#CONNECTION_ATTEMPT_FAILURES_COUNTER_NAME},
 *         or {@code null} when either the provider or its meter registry is {@code null}
 */
private LongCounter lookupConnectionFailureMetric(TelemetryProvider telemetryProvider) {
    // No provider means no metrics: callers treat a null counter as "do not record".
    if (telemetryProvider == null) {
        return null;
    }
    final var registry = telemetryProvider.getMeterRegistry();
    if (registry == null) {
        return null;
    }
    return registry.getLongCounter(RemoteClusterService.CONNECTION_ATTEMPT_FAILURES_COUNTER_NAME);
}

static ConnectionProfile buildConnectionProfile(LinkedProjectConfig config, String transportProfile) {
ConnectionProfile.Builder builder = new ConnectionProfile.Builder().setConnectTimeout(config.transportConnectTimeout())
.setHandshakeTimeout(config.transportConnectTimeout())
Expand Down Expand Up @@ -221,7 +236,21 @@ private void connectionAttemptCompleted(@Nullable Exception e) {
logger.debug(msgSupplier);
} else {
logger.warn(msgSupplier, e);
// TODO: ES-12695: Increment either the initial or retry connection failure metric.
if (connectionAttemptFailures != null) {
connectionAttemptFailures.incrementBy(
1,
Map.of(
"linked_project_id",
linkedProjectId.toString(),
"linked_project_alias",
clusterAlias,
"attempt",
(isInitialAttempt ? ConnectionAttempt.initial : ConnectionAttempt.reconnect).toString(),
"strategy",
strategyType().toString()
)
);
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
import org.elasticsearch.node.ReportingService;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.tasks.TaskManager;
import org.elasticsearch.telemetry.TelemetryProvider;
import org.elasticsearch.threadpool.Scheduler;
import org.elasticsearch.threadpool.ThreadPool;

Expand Down Expand Up @@ -140,6 +141,7 @@ protected boolean removeEldestEntry(Map.Entry<Long, TimeoutInfoHolder> eldest) {
volatile String[] tracerLogExclude;

private final LinkedProjectConfigService linkedProjectConfigService;
private final TelemetryProvider telemetryProvider;
private final RemoteClusterService remoteClusterService;

/**
Expand Down Expand Up @@ -277,6 +279,7 @@ public TransportService(
connectionManager,
taskManger,
new ClusterSettingsLinkedProjectConfigService(settings, clusterSettings, DefaultProjectResolver.INSTANCE),
TelemetryProvider.NOOP,
DefaultProjectResolver.INSTANCE
);
}
Expand All @@ -292,6 +295,7 @@ public TransportService(
ConnectionManager connectionManager,
TaskManager taskManger,
LinkedProjectConfigService linkedProjectConfigService,
TelemetryProvider telemetryProvider,
ProjectResolver projectResolver
) {
this.transport = transport;
Expand All @@ -308,6 +312,7 @@ public TransportService(
this.remoteClusterClient = DiscoveryNode.isRemoteClusterClient(settings);
this.enableStackOverflowAvoidance = ENABLE_STACK_OVERFLOW_AVOIDANCE.get(settings);
this.linkedProjectConfigService = linkedProjectConfigService;
this.telemetryProvider = telemetryProvider;
remoteClusterService = new RemoteClusterService(settings, this, projectResolver);
responseHandlers = transport.getResponseHandlers();
if (clusterSettings != null) {
Expand Down Expand Up @@ -354,6 +359,10 @@ void setTracerLogExclude(List<String> tracerLogExclude) {
this.tracerLogExclude = tracerLogExclude.toArray(Strings.EMPTY_ARRAY);
}

/**
 * Returns the {@link TelemetryProvider} that was passed to this service's constructor.
 *
 * @return the telemetry provider held by this transport service
 */
public TelemetryProvider getTelemetryProvider() {
return telemetryProvider;
}

@Override
protected void doStart() {
transport.setMessageListener(this);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.core.Strings;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.telemetry.InstrumentType;
import org.elasticsearch.telemetry.RecordingMeterRegistry;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.EnumSerializationTestUtils;
import org.elasticsearch.test.MockLog;
Expand All @@ -28,12 +30,15 @@
import org.elasticsearch.threadpool.TestThreadPool;
import org.elasticsearch.threadpool.ThreadPool;

import java.util.Set;

import static org.elasticsearch.test.MockLog.assertThatLogger;
import static org.elasticsearch.transport.RemoteClusterSettings.ProxyConnectionStrategySettings.PROXY_ADDRESS;
import static org.elasticsearch.transport.RemoteClusterSettings.REMOTE_CONNECTION_MODE;
import static org.elasticsearch.transport.RemoteClusterSettings.SniffConnectionStrategySettings.REMOTE_CLUSTER_SEEDS;
import static org.elasticsearch.transport.RemoteClusterSettings.toConfig;
import static org.elasticsearch.transport.RemoteConnectionStrategy.buildConnectionProfile;
import static org.hamcrest.Matchers.equalTo;
import static org.mockito.Mockito.mock;

public class RemoteConnectionStrategyTests extends ESTestCase {
Expand Down Expand Up @@ -194,7 +199,7 @@ public void testConnectionStrategySerialization() {
value = "org.elasticsearch.transport.RemoteConnectionStrategyTests.FakeConnectionStrategy:DEBUG",
reason = "logging verification"
)
public void testConnectionAttemptLogging() {
public void testConnectionAttemptMetricsAndLogging() {
final var originProjectId = randomUniqueProjectId();
final var linkedProjectId = randomUniqueProjectId();
final var alias = randomAlphanumericOfLength(10);
Expand All @@ -208,16 +213,21 @@ public void testConnectionAttemptLogging() {
new ClusterConnectionManager(TestProfiles.LIGHT_PROFILE, mock(Transport.class), threadContext)
)
) {
assert transportService.getTelemetryProvider() != null;
final var meterRegistry = transportService.getTelemetryProvider().getMeterRegistry();
assert meterRegistry instanceof RecordingMeterRegistry;
final var metricRecorder = ((RecordingMeterRegistry) meterRegistry).getRecorder();
Comment on lines +216 to +219
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Related to my other comments about static fields. If we create two TransportService instances representing two nodes and test connection strategies for each of them, the static fields will make RemoteConnectionStrategy objects from different TransportService instances share the same metrics and give incorrect results.


for (boolean shouldConnectFail : new boolean[] { true, false }) {
for (boolean isIntialConnectAttempt : new boolean[] { true, false }) {
for (boolean isInitialConnectAttempt : new boolean[] { true, false }) {
final var strategy = new FakeConnectionStrategy(
originProjectId,
linkedProjectId,
alias,
transportService,
connectionManager
);
if (isIntialConnectAttempt == false) {
if (isInitialConnectAttempt == false) {
waitForConnect(strategy);
}
strategy.setShouldConnectFail(shouldConnectFail);
Expand All @@ -228,7 +238,7 @@ public void testConnectionAttemptLogging() {
shouldConnectFail ? "failed to connect" : "successfully connected",
linkedProjectId,
alias,
isIntialConnectAttempt ? "the initial connection" : "a reconnection"
isInitialConnectAttempt ? "the initial connection" : "a reconnection"
);
assertThatLogger(() -> {
if (shouldConnectFail) {
Expand All @@ -243,12 +253,30 @@ public void testConnectionAttemptLogging() {
+ expectedLogLevel
+ " after a "
+ (shouldConnectFail ? "failed" : "successful")
+ (isIntialConnectAttempt ? " initial connection attempt" : " reconnection attempt"),
+ (isInitialConnectAttempt ? " initial connection attempt" : " reconnection attempt"),
strategy.getClass().getCanonicalName(),
expectedLogLevel,
expectedLogMessage
)
);
if (shouldConnectFail) {
metricRecorder.collect();
final var counterName = RemoteClusterService.CONNECTION_ATTEMPT_FAILURES_COUNTER_NAME;
final var measurements = metricRecorder.getMeasurements(InstrumentType.LONG_COUNTER, counterName);
assertFalse(measurements.isEmpty());
final var measurement = measurements.getLast();
assertThat(measurement.getLong(), equalTo(1L));
final var attributes = measurement.attributes();
final var keySet = Set.of("linked_project_id", "linked_project_alias", "attempt", "strategy");
final var expectedAttemptType = isInitialConnectAttempt
? RemoteConnectionStrategy.ConnectionAttempt.initial
: RemoteConnectionStrategy.ConnectionAttempt.reconnect;
assertThat(attributes.keySet(), equalTo(keySet));
assertThat(attributes.get("linked_project_id"), equalTo(linkedProjectId.toString()));
assertThat(attributes.get("linked_project_alias"), equalTo(alias));
assertThat(attributes.get("attempt"), equalTo(expectedAttemptType.toString()));
assertThat(attributes.get("strategy"), equalTo(strategy.strategyType().toString()));
}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
import org.elasticsearch.search.SearchService;
import org.elasticsearch.search.fetch.FetchPhase;
import org.elasticsearch.tasks.TaskManager;
import org.elasticsearch.telemetry.TelemetryProvider;
import org.elasticsearch.telemetry.tracing.Tracer;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.MockHttpTransport;
Expand Down Expand Up @@ -174,7 +175,7 @@ protected TransportService newTransportService(
Function<BoundTransportAddress, DiscoveryNode> localNodeFactory,
ClusterSettings clusterSettings,
TaskManager taskManager,
Tracer tracer,
TelemetryProvider telemetryProvider,
String nodeId,
LinkedProjectConfigService linkedProjectConfigService,
ProjectResolver projectResolver
Expand All @@ -194,7 +195,7 @@ protected TransportService newTransportService(
localNodeFactory,
clusterSettings,
taskManager,
tracer,
telemetryProvider,
nodeId,
linkedProjectConfigService,
projectResolver
Expand All @@ -209,6 +210,7 @@ protected TransportService newTransportService(
clusterSettings,
MockTransportService.createTaskManager(settings, threadPool, taskManager.getTaskHeaders(), Tracer.NOOP, nodeId),
linkedProjectConfigService,
telemetryProvider,
projectResolver
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,9 @@
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.search.SearchModule;
import org.elasticsearch.tasks.TaskManager;
import org.elasticsearch.telemetry.RecordingMeterRegistry;
import org.elasticsearch.telemetry.TelemetryProvider;
import org.elasticsearch.telemetry.metric.MeterRegistry;
import org.elasticsearch.telemetry.tracing.Tracer;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.test.ESTestCase;
Expand Down Expand Up @@ -264,6 +267,19 @@ public MockTransportService(
clusterSettings,
createTaskManager(settings, threadPool, taskHeaders, Tracer.NOOP, nodeId),
new ClusterSettingsLinkedProjectConfigService(settings, clusterSettings, DefaultProjectResolver.INSTANCE),
new TelemetryProvider() {
final MeterRegistry meterRegistry = new RecordingMeterRegistry();

@Override
public Tracer getTracer() {
return Tracer.NOOP;
}

@Override
public MeterRegistry getMeterRegistry() {
return meterRegistry;
}
},
DefaultProjectResolver.INSTANCE
);
}
Expand All @@ -277,6 +293,7 @@ public MockTransportService(
@Nullable ClusterSettings clusterSettings,
TaskManager taskManager,
LinkedProjectConfigService linkedProjectConfigService,
TelemetryProvider telemetryProvider,
ProjectResolver projectResolver
) {
super(
Expand All @@ -289,6 +306,7 @@ public MockTransportService(
new StubbableConnectionManager(new ClusterConnectionManager(settings, transport, threadPool.getThreadContext())),
taskManager,
linkedProjectConfigService,
telemetryProvider,
projectResolver
);
this.original = transport.getDelegate();
Expand Down