Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Proxied requests query other nodes in parallel #2779

Merged
merged 8 commits into from Sep 8, 2016
Expand Up @@ -163,6 +163,9 @@ public abstract class BaseConfiguration {
@Parameter(value = "web_tls_key_password")
private String webTlsKeyPassword;

@Parameter(value = "proxied_requests_thread_pool_size", required = true, validator = PositiveIntegerValidator.class)
private int proxiedRequestsThreadPoolSize = 32;

public String getRestUriScheme() {
return getUriScheme(isRestEnableTls());
}
Expand Down
Expand Up @@ -28,6 +28,7 @@
import org.graylog2.shared.rest.resources.system.RemoteDeflectorResource;

import javax.inject.Inject;
import javax.inject.Named;
import javax.ws.rs.InternalServerErrorException;
import javax.ws.rs.POST;
import javax.ws.rs.Path;
Expand All @@ -38,6 +39,7 @@
import javax.ws.rs.core.MediaType;
import java.io.IOException;
import java.util.Optional;
import java.util.concurrent.ExecutorService;
import java.util.function.Function;

@RequiresAuthentication
Expand All @@ -48,8 +50,9 @@ public class ClusterDeflectorResource extends ProxiedResource {
@Inject
public ClusterDeflectorResource(@Context HttpHeaders httpHeaders,
NodeService nodeService,
RemoteInterfaceProvider remoteInterfaceProvider) {
super(httpHeaders, nodeService, remoteInterfaceProvider);
RemoteInterfaceProvider remoteInterfaceProvider,
@Named("proxiedRequestsExecutorService") ExecutorService executorService) {
super(httpHeaders, nodeService, remoteInterfaceProvider, executorService);
}

@POST
Expand Down
Expand Up @@ -38,6 +38,7 @@
import org.graylog2.shared.security.RestPermissions;

import javax.inject.Inject;
import javax.inject.Named;
import javax.ws.rs.DELETE;
import javax.ws.rs.GET;
import javax.ws.rs.PUT;
Expand All @@ -50,6 +51,7 @@
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ExecutorService;

@RequiresAuthentication
@Api(value = "Cluster/InputState", description = "Cluster-wide input states")
Expand All @@ -59,8 +61,9 @@ public class ClusterInputStatesResource extends ProxiedResource {
@Inject
public ClusterInputStatesResource(NodeService nodeService,
RemoteInterfaceProvider remoteInterfaceProvider,
@Context HttpHeaders httpHeaders) throws NodeNotFoundException {
super(httpHeaders, nodeService, remoteInterfaceProvider);
@Context HttpHeaders httpHeaders,
@Named("proxiedRequestsExecutorService") ExecutorService executorService) throws NodeNotFoundException {
super(httpHeaders, nodeService, remoteInterfaceProvider, executorService);
}

@GET
Expand Down
Expand Up @@ -36,6 +36,7 @@
import retrofit2.Response;

import javax.inject.Inject;
import javax.inject.Named;
import javax.ws.rs.GET;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
Expand All @@ -45,6 +46,7 @@
import javax.ws.rs.core.HttpHeaders;
import javax.ws.rs.core.MediaType;
import java.io.IOException;
import java.util.concurrent.ExecutorService;

import static javax.ws.rs.core.Response.Status.BAD_GATEWAY;

Expand All @@ -58,8 +60,9 @@ public class ClusterJournalResource extends ProxiedResource {
@Inject
public ClusterJournalResource(NodeService nodeService,
RemoteInterfaceProvider remoteInterfaceProvider,
@Context HttpHeaders httpHeaders) throws NodeNotFoundException {
super(httpHeaders, nodeService, remoteInterfaceProvider);
@Context HttpHeaders httpHeaders,
@Named("proxiedRequestsExecutorService") ExecutorService executorService) throws NodeNotFoundException {
super(httpHeaders, nodeService, remoteInterfaceProvider, executorService);
}

@GET
Expand Down
Expand Up @@ -36,6 +36,7 @@
import retrofit2.Response;

import javax.inject.Inject;
import javax.inject.Named;
import javax.ws.rs.PUT;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
Expand All @@ -45,6 +46,7 @@
import javax.ws.rs.core.HttpHeaders;
import javax.ws.rs.core.MediaType;
import java.io.IOException;
import java.util.concurrent.ExecutorService;

import static javax.ws.rs.core.Response.Status.BAD_GATEWAY;

Expand All @@ -58,8 +60,9 @@ public class ClusterLoadBalancerStatusResource extends ProxiedResource {
@Inject
public ClusterLoadBalancerStatusResource(NodeService nodeService,
RemoteInterfaceProvider remoteInterfaceProvider,
@Context HttpHeaders httpHeaders) throws NodeNotFoundException {
super(httpHeaders, nodeService, remoteInterfaceProvider);
@Context HttpHeaders httpHeaders,
@Named("proxiedRequestsExecutorService") ExecutorService executorService) throws NodeNotFoundException {
super(httpHeaders, nodeService, remoteInterfaceProvider, executorService);
}

@PUT
Expand Down
Expand Up @@ -35,6 +35,7 @@
import org.hibernate.validator.constraints.NotEmpty;

import javax.inject.Inject;
import javax.inject.Named;
import javax.ws.rs.GET;
import javax.ws.rs.PUT;
import javax.ws.rs.Path;
Expand All @@ -46,6 +47,7 @@
import java.io.IOException;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ExecutorService;

@RequiresAuthentication
@Api(value = "Cluster/System/Loggers", description = "Cluster-wide access to internal Graylog loggers")
Expand All @@ -54,8 +56,9 @@ public class ClusterLoggersResource extends ProxiedResource {
@Inject
public ClusterLoggersResource(NodeService nodeService,
RemoteInterfaceProvider remoteInterfaceProvider,
@Context HttpHeaders httpHeaders) throws NodeNotFoundException {
super(httpHeaders, nodeService, remoteInterfaceProvider);
@Context HttpHeaders httpHeaders,
@Named("proxiedRequestsExecutorService") ExecutorService executorService) throws NodeNotFoundException {
super(httpHeaders, nodeService, remoteInterfaceProvider, executorService);
}

@GET
Expand Down
Expand Up @@ -33,6 +33,7 @@
import org.graylog2.shared.rest.resources.system.RemoteMetricsResource;

import javax.inject.Inject;
import javax.inject.Named;
import javax.validation.Valid;
import javax.validation.constraints.NotNull;
import javax.ws.rs.POST;
Expand All @@ -44,6 +45,7 @@
import java.io.IOException;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ExecutorService;

@RequiresAuthentication
@Api(value = "Cluster/Metrics", description = "Cluster-wide Internal Graylog metrics")
Expand All @@ -54,8 +56,9 @@ public class ClusterMetricsResource extends ProxiedResource {
@Inject
public ClusterMetricsResource(NodeService nodeService,
RemoteInterfaceProvider remoteInterfaceProvider,
@Context HttpHeaders httpHeaders) {
super(httpHeaders, nodeService, remoteInterfaceProvider);
@Context HttpHeaders httpHeaders,
@Named("proxiedRequestsExecutorService") ExecutorService executorService) {
super(httpHeaders, nodeService, remoteInterfaceProvider, executorService);
}

@POST
Expand Down
Expand Up @@ -38,6 +38,7 @@
import retrofit2.Response;

import javax.inject.Inject;
import javax.inject.Named;
import javax.validation.Valid;
import javax.validation.constraints.NotNull;
import javax.ws.rs.GET;
Expand All @@ -50,6 +51,7 @@
import javax.ws.rs.core.HttpHeaders;
import javax.ws.rs.core.MediaType;
import java.io.IOException;
import java.util.concurrent.ExecutorService;

import static javax.ws.rs.core.Response.Status.BAD_GATEWAY;

Expand All @@ -61,8 +63,9 @@ public class ClusterNodeMetricsResource extends ProxiedResource {
@Inject
public ClusterNodeMetricsResource(NodeService nodeService,
RemoteInterfaceProvider remoteInterfaceProvider,
@Context HttpHeaders httpHeaders) {
super(httpHeaders, nodeService, remoteInterfaceProvider);
@Context HttpHeaders httpHeaders,
@Named("proxiedRequestsExecutorService") ExecutorService executorService) {
super(httpHeaders, nodeService, remoteInterfaceProvider, executorService);
}

private RemoteMetricsResource getResourceForNode(String nodeId) throws NodeNotFoundException {
Expand Down
Expand Up @@ -36,6 +36,7 @@
import retrofit2.Response;

import javax.inject.Inject;
import javax.inject.Named;
import javax.ws.rs.DELETE;
import javax.ws.rs.GET;
import javax.ws.rs.NotFoundException;
Expand All @@ -49,6 +50,7 @@
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ExecutorService;

@RequiresAuthentication
@Api(value = "Cluster/Jobs", description = "Cluster-wide System Jobs")
Expand All @@ -59,8 +61,9 @@ public class ClusterSystemJobResource extends ProxiedResource {
@Inject
public ClusterSystemJobResource(NodeService nodeService,
RemoteInterfaceProvider remoteInterfaceProvider,
@Context HttpHeaders httpHeaders) throws NodeNotFoundException {
super(httpHeaders, nodeService, remoteInterfaceProvider);
@Context HttpHeaders httpHeaders,
@Named("proxiedRequestsExecutorService") ExecutorService executorService) throws NodeNotFoundException {
super(httpHeaders, nodeService, remoteInterfaceProvider, executorService);
}

@GET
Expand Down
Expand Up @@ -34,6 +34,7 @@
import retrofit2.Response;

import javax.inject.Inject;
import javax.inject.Named;
import javax.ws.rs.GET;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
Expand All @@ -43,6 +44,7 @@
import javax.ws.rs.core.HttpHeaders;
import javax.ws.rs.core.MediaType;
import java.io.IOException;
import java.util.concurrent.ExecutorService;

import static javax.ws.rs.core.Response.Status.BAD_GATEWAY;

Expand All @@ -56,8 +58,9 @@ public class ClusterSystemPluginResource extends ProxiedResource {
@Inject
public ClusterSystemPluginResource(NodeService nodeService,
RemoteInterfaceProvider remoteInterfaceProvider,
@Context HttpHeaders httpHeaders) throws NodeNotFoundException {
super(httpHeaders, nodeService, remoteInterfaceProvider);
@Context HttpHeaders httpHeaders,
@Named("proxiedRequestsExecutorService") ExecutorService executorService) throws NodeNotFoundException {
super(httpHeaders, nodeService, remoteInterfaceProvider, executorService);
}

@GET
Expand Down
Expand Up @@ -34,6 +34,7 @@
import retrofit2.Response;

import javax.inject.Inject;
import javax.inject.Named;
import javax.ws.rs.POST;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
Expand All @@ -43,6 +44,7 @@
import javax.ws.rs.core.HttpHeaders;
import javax.ws.rs.core.MediaType;
import java.io.IOException;
import java.util.concurrent.ExecutorService;

import static javax.ws.rs.core.Response.Status.BAD_GATEWAY;

Expand All @@ -56,8 +58,9 @@ public class ClusterSystemProcessingResource extends ProxiedResource {
@Inject
public ClusterSystemProcessingResource(NodeService nodeService,
RemoteInterfaceProvider remoteInterfaceProvider,
@Context HttpHeaders httpHeaders) throws NodeNotFoundException {
super(httpHeaders, nodeService, remoteInterfaceProvider);
@Context HttpHeaders httpHeaders,
@Named("proxiedRequestsExecutorService") ExecutorService executorService) throws NodeNotFoundException {
super(httpHeaders, nodeService, remoteInterfaceProvider, executorService);
}

private RemoteSystemProcessingResource getRemoteSystemProcessingResource(String nodeId) throws NodeNotFoundException {
Expand Down
Expand Up @@ -38,6 +38,7 @@
import retrofit2.Response;

import javax.inject.Inject;
import javax.inject.Named;
import javax.ws.rs.GET;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
Expand All @@ -49,6 +50,7 @@
import java.io.IOException;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ExecutorService;

import static javax.ws.rs.core.Response.Status.BAD_GATEWAY;

Expand All @@ -62,8 +64,9 @@ public class ClusterSystemResource extends ProxiedResource {
@Inject
public ClusterSystemResource(NodeService nodeService,
RemoteInterfaceProvider remoteInterfaceProvider,
@Context HttpHeaders httpHeaders) throws NodeNotFoundException {
super(httpHeaders, nodeService, remoteInterfaceProvider);
@Context HttpHeaders httpHeaders,
@Named("proxiedRequestsExecutorService") ExecutorService executorService) throws NodeNotFoundException {
super(httpHeaders, nodeService, remoteInterfaceProvider, executorService);
}

@GET
Expand Down
Expand Up @@ -35,6 +35,7 @@
import retrofit2.Response;

import javax.inject.Inject;
import javax.inject.Named;
import javax.ws.rs.POST;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
Expand All @@ -44,6 +45,7 @@
import javax.ws.rs.core.HttpHeaders;
import javax.ws.rs.core.MediaType;
import java.io.IOException;
import java.util.concurrent.ExecutorService;

import static javax.ws.rs.core.Response.Status.BAD_GATEWAY;
import static org.jboss.netty.handler.codec.http.HttpResponseStatus.ACCEPTED;
Expand All @@ -58,8 +60,9 @@ public class ClusterSystemShutdownResource extends ProxiedResource {
@Inject
public ClusterSystemShutdownResource(NodeService nodeService,
RemoteInterfaceProvider remoteInterfaceProvider,
@Context HttpHeaders httpHeaders) throws NodeNotFoundException {
super(httpHeaders, nodeService, remoteInterfaceProvider);
@Context HttpHeaders httpHeaders,
@Named("proxiedRequestsExecutorService") ExecutorService executorService) throws NodeNotFoundException {
super(httpHeaders, nodeService, remoteInterfaceProvider, executorService);
}

@POST
Expand Down
Expand Up @@ -35,6 +35,7 @@
import org.graylog2.shared.bindings.providers.MetricRegistryProvider;
import org.graylog2.shared.bindings.providers.NodeIdProvider;
import org.graylog2.shared.bindings.providers.OkHttpClientProvider;
import org.graylog2.shared.bindings.providers.ProxiedRequestsExecutorService;
import org.graylog2.shared.bindings.providers.ServiceManagerProvider;
import org.graylog2.shared.bindings.providers.SystemOkHttpClientProvider;
import org.graylog2.shared.buffers.InputBufferImpl;
Expand All @@ -44,6 +45,7 @@
import org.jboss.netty.util.HashedWheelTimer;

import javax.activation.MimetypesFileTypeMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Semaphore;

public class GenericBindings extends AbstractModule {
Expand Down Expand Up @@ -77,5 +79,7 @@ protected void configure() {
bind(OkHttpClient.class).annotatedWith(Names.named("systemHttpClient")).toProvider(SystemOkHttpClientProvider.class).asEagerSingleton();

bind(MimetypesFileTypeMap.class).toInstance(new MimetypesFileTypeMap());

bind(ExecutorService.class).annotatedWith(Names.named("proxiedRequestsExecutorService")).toProvider(ProxiedRequestsExecutorService.class).asEagerSingleton();
}
}