Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ TransportService newTransportService(
TaskManager taskManager,
Tracer tracer
) {
return new TransportService(settings, transport, threadPool, interceptor, localNodeFactory, clusterSettings, taskManager, tracer);
return new TransportService(settings, transport, threadPool, interceptor, localNodeFactory, clusterSettings, taskManager);
}

HttpServerTransport newHttpTransport(PluginsService pluginsService, NetworkModule networkModule) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
import org.elasticsearch.tasks.CancellableTask;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.tasks.TaskManager;
import org.elasticsearch.telemetry.tracing.Tracer;

import java.io.IOException;
import java.util.concurrent.Executor;
Expand All @@ -31,7 +30,6 @@ public class RequestHandlerRegistry<Request extends TransportRequest> implements
private final boolean canTripCircuitBreaker;
private final Executor executor;
private final TaskManager taskManager;
private final Tracer tracer;
private final Writeable.Reader<Request> requestReader;
private final TransportActionStatsTracker statsTracker = new TransportActionStatsTracker();

Expand All @@ -42,8 +40,7 @@ public RequestHandlerRegistry(
TransportRequestHandler<Request> handler,
Executor executor,
boolean forceExecution,
boolean canTripCircuitBreaker,
Tracer tracer
boolean canTripCircuitBreaker
) {
this.action = action;
this.requestReader = requestReader;
Expand All @@ -52,7 +49,6 @@ public RequestHandlerRegistry(
this.canTripCircuitBreaker = canTripCircuitBreaker;
this.executor = executor;
this.taskManager = taskManager;
this.tracer = tracer;
}

public String getAction() {
Expand Down Expand Up @@ -112,8 +108,7 @@ public static <R extends TransportRequest> RequestHandlerRegistry<R> replaceHand
handler,
registry.executor,
registry.forceExecution,
registry.canTripCircuitBreaker,
registry.tracer
registry.canTripCircuitBreaker
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,6 @@
import org.elasticsearch.node.ReportingService;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.tasks.TaskManager;
import org.elasticsearch.telemetry.tracing.Tracer;
import org.elasticsearch.threadpool.Scheduler;
import org.elasticsearch.threadpool.ThreadPool;

Expand Down Expand Up @@ -135,7 +134,6 @@ protected boolean removeEldestEntry(Map.Entry<Long, TimeoutInfoHolder> eldest) {
// tracer log

private final Logger tracerLog;
private final Tracer tracer;

volatile String[] tracerLogInclude;
volatile String[] tracerLogExclude;
Expand Down Expand Up @@ -205,18 +203,6 @@ public String toString() {
}
};

public TransportService(
Settings settings,
Transport transport,
ThreadPool threadPool,
TransportInterceptor transportInterceptor,
Function<BoundTransportAddress, DiscoveryNode> localNodeFactory,
@Nullable ClusterSettings clusterSettings,
Set<String> taskHeaders
) {
this(settings, transport, threadPool, transportInterceptor, localNodeFactory, clusterSettings, taskHeaders, Tracer.NOOP);
}

/**
* Build the service.
*
Expand All @@ -230,8 +216,7 @@ public TransportService(
TransportInterceptor transportInterceptor,
Function<BoundTransportAddress, DiscoveryNode> localNodeFactory,
@Nullable ClusterSettings clusterSettings,
TaskManager taskManager,
Tracer tracer
TaskManager taskManager
) {
this(
settings,
Expand All @@ -241,8 +226,7 @@ public TransportService(
localNodeFactory,
clusterSettings,
new ClusterConnectionManager(settings, transport, threadPool.getThreadContext()),
taskManager,
tracer
taskManager
);
}

Expand All @@ -254,8 +238,7 @@ public TransportService(
TransportInterceptor transportInterceptor,
Function<BoundTransportAddress, DiscoveryNode> localNodeFactory,
@Nullable ClusterSettings clusterSettings,
Set<String> taskHeaders,
Tracer tracer
Set<String> taskHeaders
) {
this(
settings,
Expand All @@ -265,8 +248,7 @@ public TransportService(
localNodeFactory,
clusterSettings,
new ClusterConnectionManager(settings, transport, threadPool.getThreadContext()),
new TaskManager(settings, threadPool, taskHeaders),
tracer
new TaskManager(settings, threadPool, taskHeaders)
);
}

Expand All @@ -279,15 +261,13 @@ public TransportService(
Function<BoundTransportAddress, DiscoveryNode> localNodeFactory,
@Nullable ClusterSettings clusterSettings,
ConnectionManager connectionManager,
TaskManager taskManger,
Tracer tracer
TaskManager taskManger
) {
this.transport = transport;
transport.setSlowLogThreshold(TransportSettings.SLOW_OPERATION_THRESHOLD_SETTING.get(settings));
this.threadPool = threadPool;
this.localNodeFactory = localNodeFactory;
this.connectionManager = connectionManager;
this.tracer = tracer;
this.clusterName = ClusterName.CLUSTER_NAME_SETTING.get(settings);
setTracerLogInclude(TransportSettings.TRACE_LOG_INCLUDE_SETTING.get(settings));
setTracerLogExclude(TransportSettings.TRACE_LOG_EXCLUDE_SETTING.get(settings));
Expand Down Expand Up @@ -1220,8 +1200,7 @@ public <Request extends TransportRequest> void registerRequestHandler(
handler,
executor,
false,
true,
tracer
true
);
transport.registerRequestHandler(reg);
}
Expand Down Expand Up @@ -1253,8 +1232,7 @@ public <Request extends TransportRequest> void registerRequestHandler(
handler,
executor,
forceExecution,
canTripCircuitBreaker,
tracer
canTripCircuitBreaker
);
transport.registerRequestHandler(reg);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -194,8 +194,7 @@ public TestNode(String name, ThreadPool threadPool, Settings settings) {
TransportService.NOOP_TRANSPORT_INTERCEPTOR,
boundTransportAddressDiscoveryNodeFunction,
null,
taskManager,
Tracer.NOOP
taskManager
);
taskManager.setTaskCancellationService(new TaskCancellationService(transportService));
transportService.start();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@
import org.elasticsearch.indices.breaker.NoneCircuitBreakerService;
import org.elasticsearch.monitor.StatusInfo;
import org.elasticsearch.tasks.TaskManager;
import org.elasticsearch.telemetry.tracing.Tracer;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.MockLog;
import org.elasticsearch.test.junit.annotations.TestLogging;
Expand Down Expand Up @@ -74,8 +73,7 @@ public void testJoinDeduplication() {
x -> localNode,
clusterSettings,
new ClusterConnectionManager(Settings.EMPTY, capturingTransport, threadPool.getThreadContext()),
taskManger,
Tracer.NOOP
taskManger
);
JoinHelper joinHelper = new JoinHelper(
null,
Expand Down Expand Up @@ -241,8 +239,7 @@ public void testJoinFailureOnUnhealthyNodes() {
x -> localNode,
clusterSettings,
new ClusterConnectionManager(Settings.EMPTY, capturingTransport, threadPool.getThreadContext()),
taskManger,
Tracer.NOOP
taskManger
);
AtomicReference<StatusInfo> nodeHealthServiceStatus = new AtomicReference<>(new StatusInfo(UNHEALTHY, "unhealthy-info"));
JoinHelper joinHelper = new JoinHelper(
Expand Down Expand Up @@ -319,8 +316,7 @@ public void testLatestStoredStateFailure() {
x -> localNode,
clusterSettings,
new ClusterConnectionManager(Settings.EMPTY, capturingTransport, threadPool.getThreadContext()),
taskManger,
Tracer.NOOP
taskManger
);
JoinHelper joinHelper = new JoinHelper(
null,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.common.util.concurrent.DeterministicTaskQueue;
import org.elasticsearch.tasks.TaskManager;
import org.elasticsearch.telemetry.tracing.Tracer;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.MockLog;
import org.elasticsearch.test.junit.annotations.TestLogging;
Expand Down Expand Up @@ -235,8 +234,7 @@ public void setup() {
boundTransportAddress -> localNode,
null,
connectionManager,
new TaskManager(settings, threadPool, emptySet()),
Tracer.NOOP
new TaskManager(settings, threadPool, emptySet())
);

transportService.start();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,6 @@
import org.elasticsearch.indices.TestIndexNameExpressionResolver;
import org.elasticsearch.snapshots.EmptySnapshotsInfoService;
import org.elasticsearch.tasks.TaskManager;
import org.elasticsearch.telemetry.tracing.Tracer;
import org.elasticsearch.test.ClusterServiceUtils;
import org.elasticsearch.test.gateway.TestGatewayAllocator;
import org.elasticsearch.threadpool.ThreadPool;
Expand Down Expand Up @@ -231,8 +230,7 @@ protected ExecutorService createThreadPoolExecutor() {
.address(boundAddress.publishAddress())
.build(),
clusterSettings,
Collections.emptySet(),
Tracer.NOOP
Collections.emptySet()
) {
@Override
public Transport.Connection getConnection(DiscoveryNode node) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@
import org.elasticsearch.core.Tuple;
import org.elasticsearch.tasks.TaskId;
import org.elasticsearch.tasks.TaskManager;
import org.elasticsearch.telemetry.tracing.Tracer;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.MockLog;
import org.elasticsearch.test.TransportVersionUtils;
Expand Down Expand Up @@ -114,8 +113,7 @@ public void testPing() throws Exception {
(request, channel, task) -> channelCaptor.set(channel),
EsExecutors.DIRECT_EXECUTOR_SERVICE,
false,
true,
Tracer.NOOP
true
);
requestHandlers.registerHandler(registry);

Expand Down Expand Up @@ -167,8 +165,7 @@ public TestResponse read(StreamInput in) throws IOException {
},
EsExecutors.DIRECT_EXECUTOR_SERVICE,
false,
true,
Tracer.NOOP
true
);
requestHandlers.registerHandler(registry);
String requestValue = randomAlphaOfLength(10);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import org.elasticsearch.core.Nullable;
import org.elasticsearch.core.Tuple;
import org.elasticsearch.tasks.TaskManager;
import org.elasticsearch.telemetry.tracing.Tracer;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.CloseableConnection;
import org.elasticsearch.transport.ClusterConnectionManager;
Expand Down Expand Up @@ -74,8 +73,7 @@ public TransportService createTransportService(
localNodeFactory,
clusterSettings,
connectionManager,
new TaskManager(settings, threadPool, taskHeaders),
Tracer.NOOP
new TaskManager(settings, threadPool, taskHeaders)
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -298,8 +298,7 @@ private MockTransportService(
localNodeFactory,
clusterSettings,
new StubbableConnectionManager(new ClusterConnectionManager(settings, transport, threadPool.getThreadContext())),
taskManager,
Tracer.NOOP
taskManager
);
this.original = transport.getDelegate();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import org.elasticsearch.core.Nullable;
import org.elasticsearch.core.Releasables;
import org.elasticsearch.tasks.TaskManager;
import org.elasticsearch.telemetry.tracing.Tracer;
import org.elasticsearch.test.transport.MockTransport;
import org.elasticsearch.threadpool.ThreadPool;

Expand Down Expand Up @@ -76,8 +75,7 @@ public TransportService createTransportService(
interceptor,
localNodeFactory,
clusterSettings,
new TaskManager(settings, threadPool, taskHeaders),
Tracer.NOOP
new TaskManager(settings, threadPool, taskHeaders)
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import org.elasticsearch.mocksocket.MockSocket;
import org.elasticsearch.tasks.TaskManager;
import org.elasticsearch.telemetry.metric.MeterRegistry;
import org.elasticsearch.telemetry.tracing.Tracer;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.threadpool.DefaultBuiltInExecutorBuilders;
import org.elasticsearch.threadpool.ThreadPool;
Expand Down Expand Up @@ -111,8 +110,7 @@ public void onRequestReceived(long requestId, String action) {
(request, channel, task) -> channel.sendResponse(ActionResponse.Empty.INSTANCE),
EsExecutors.DIRECT_EXECUTOR_SERVICE,
false,
true,
Tracer.NOOP
true
)
);
securityNettyTransport.start();
Expand Down