Skip to content

Commit

Permalink
use local operation shortcut in DistributingDownstream
Browse files Browse the repository at this point in the history
  • Loading branch information
mfussenegger committed Apr 21, 2015
1 parent aa22f8b commit 420e429
Show file tree
Hide file tree
Showing 11 changed files with 317 additions and 685 deletions.
80 changes: 26 additions & 54 deletions sql/src/main/java/io/crate/action/job/TransportJobAction.java
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,9 @@
import io.crate.core.collections.Bucket;
import io.crate.exceptions.Exceptions;
import io.crate.executor.transport.DefaultTransportResponseHandler;
import io.crate.executor.transport.ResponseForwarder;
import io.crate.executor.transport.NodeAction;
import io.crate.executor.transport.NodeActionRequestHandler;
import io.crate.executor.transport.Transports;
import io.crate.jobs.JobContextService;
import io.crate.jobs.JobExecutionContext;
import io.crate.jobs.PageDownstreamContext;
Expand All @@ -47,16 +49,12 @@
import io.crate.planner.node.dql.CollectNode;
import io.crate.planner.node.dql.MergeNode;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.common.breaker.CircuitBreaker;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.inject.Singleton;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.BaseTransportRequestHandler;
import org.elasticsearch.transport.TransportChannel;
import org.elasticsearch.transport.TransportService;

import javax.annotation.Nonnull;
Expand All @@ -65,10 +63,9 @@
import java.util.List;
import java.util.UUID;
import java.util.concurrent.Executor;
import java.util.concurrent.RejectedExecutionException;

@Singleton
public class TransportJobAction {
public class TransportJobAction implements NodeAction<JobRequest, JobResponse> {

private static final ESLogger LOGGER = Loggers.getLogger(TransportJobAction.class);

Expand All @@ -77,9 +74,8 @@ public class TransportJobAction {
private static final String COLLECT_EXECUTOR = ThreadPool.Names.SEARCH;


private final Transports transports;
private final ThreadPool threadPool;
private final TransportService transportService;
private final ClusterService clusterService;

private final CircuitBreaker circuitBreaker;
private final ExecutionNodesExecutingVisitor executionNodeVisitor;
Expand All @@ -88,57 +84,42 @@ public class TransportJobAction {

@Inject
public TransportJobAction(TransportService transportService,
Transports transports,
JobContextService jobContextService,
ResultProviderFactory resultProviderFactory,
PageDownstreamFactory pageDownstreamFactory,
ClusterService clusterService,
StreamerVisitor streamerVisitor,
ThreadPool threadPool,
CrateCircuitBreakerService breakerService,
StatsTables statsTables,
MapSideDataCollectOperation collectOperationHandler) {
this.transports = transports;
this.threadPool = threadPool;
this.circuitBreaker = breakerService.getBreaker(CrateCircuitBreakerService.QUERY_BREAKER);
this.clusterService = clusterService;
this.statsTables = statsTables;
this.collectOperationHandler = collectOperationHandler;
this.transportService = transportService;
transportService.registerHandler(ACTION_NAME, new JobInitHandler());
transportService.registerHandler(ACTION_NAME, new NodeActionRequestHandler<JobRequest, JobResponse>(this) {
@Override
public JobRequest newInstance() {
return new JobRequest();
}
});
this.executionNodeVisitor = new ExecutionNodesExecutingVisitor(
jobContextService, pageDownstreamFactory, resultProviderFactory, streamerVisitor);

}

public void execute(String node, final JobRequest request, final ActionListener<JobResponse> listener) {
ClusterState clusterState = clusterService.state();
if (node.equals("_local") || node.equals(clusterState.nodes().localNodeId())) {
try {
threadPool.executor(EXECUTOR).execute(new Runnable() {
transports.executeLocalOrWithTransport(this, node, request, listener,
new DefaultTransportResponseHandler<JobResponse>(listener, EXECUTOR) {
@Override
public void run() {
nodeOperation(request, listener);
public JobResponse newInstance() {
return new JobResponse();
}
});
} catch (RejectedExecutionException e) {
LOGGER.error("error executing jobinit locally on node [{}]", e, node);
listener.onFailure(e);
}
} else {
transportService.sendRequest(
clusterState.nodes().get(node),
ACTION_NAME,
request,
new DefaultTransportResponseHandler<JobResponse>(listener, EXECUTOR) {
@Override
public JobResponse newInstance() {
return new JobResponse();
}
}
);
}
}

private void nodeOperation(final JobRequest request, final ActionListener<JobResponse> actionListener) {
@Override
public void nodeOperation(final JobRequest request, final ActionListener<JobResponse> actionListener) {
List<ListenableFuture<Bucket>> executionFutures = new ArrayList<>(request.executionNodes().size());
for (ExecutionNode executionNode : request.executionNodes()) {
try {
Expand Down Expand Up @@ -181,23 +162,14 @@ public void onFailure(@Nonnull Throwable t) {
});
}

private class JobInitHandler extends BaseTransportRequestHandler<JobRequest> {

@Override
public JobRequest newInstance() {
return new JobRequest();
}

@Override
public void messageReceived(JobRequest request, TransportChannel channel) throws Exception {
ActionListener<JobResponse> actionListener = ResponseForwarder.forwardTo(channel);
nodeOperation(request, actionListener);
}
@Override
public String actionName() {
return ACTION_NAME;
}

@Override
public String executor() {
return EXECUTOR;
}
@Override
public String executorName() {
return EXECUTOR;
}

private static class VisitorContext {
Expand Down

This file was deleted.

33 changes: 33 additions & 0 deletions sql/src/main/java/io/crate/executor/transport/NodeAction.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
/*
* Licensed to CRATE Technology GmbH ("Crate") under one or more contributor
* license agreements. See the NOTICE file distributed with this work for
* additional information regarding copyright ownership. Crate licenses
* this file to you under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License. You may
* obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*
* However, if you have executed another commercial license agreement
* with Crate these terms will supersede the license and you may use the
* software solely pursuant to the terms of the relevant commercial agreement.
*/

package io.crate.executor.transport;

import org.elasticsearch.action.ActionListener;
import org.elasticsearch.transport.TransportRequest;
import org.elasticsearch.transport.TransportResponse;

public interface NodeAction<TRequest extends TransportRequest, TResponse extends TransportResponse> {

String actionName();
String executorName();
void nodeOperation(TRequest request, ActionListener<TResponse> listener);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
/*
* Licensed to CRATE Technology GmbH ("Crate") under one or more contributor
* license agreements. See the NOTICE file distributed with this work for
* additional information regarding copyright ownership. Crate licenses
* this file to you under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License. You may
* obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*
* However, if you have executed another commercial license agreement
* with Crate these terms will supersede the license and you may use the
* software solely pursuant to the terms of the relevant commercial agreement.
*/

package io.crate.executor.transport;

import org.elasticsearch.action.ActionListener;
import org.elasticsearch.transport.TransportChannel;
import org.elasticsearch.transport.TransportRequest;
import org.elasticsearch.transport.TransportRequestHandler;
import org.elasticsearch.transport.TransportResponse;

public abstract class NodeActionRequestHandler<TRequest extends TransportRequest, TResponse extends TransportResponse>
implements TransportRequestHandler<TRequest> {

private final NodeAction<TRequest, TResponse> nodeAction;

public NodeActionRequestHandler(NodeAction<TRequest, TResponse> nodeAction) {
this.nodeAction = nodeAction;
}

@Override
public void messageReceived(TRequest request, TransportChannel channel) throws Exception {
ActionListener<TResponse> actionListener = ResponseForwarder.forwardTo(channel);
nodeAction.nodeOperation(request, actionListener);
}

@Override
public String executor() {
return nodeAction.executorName();
}

@Override
public boolean isForceExecution() {
return false;
}
}
Loading

0 comments on commit 420e429

Please sign in to comment.