Skip to content

Commit

Permalink
implement fetch-projection
Browse files Browse the repository at this point in the history
  • Loading branch information
seut committed Mar 12, 2015
1 parent 4e69d58 commit 862b989
Show file tree
Hide file tree
Showing 33 changed files with 1,911 additions and 163 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ public void start() {
clusterService, settings, transportActionProvider, symbolVisitor,
mergeNode, ramAccountingContext);
final AtomicInteger countdown = new AtomicInteger(upstreamResults.size());
statsTables.operationStarted(operationId, mergeNode.contextId(), mergeNode.id());
statsTables.operationStarted(operationId, mergeNode.jobId(), mergeNode.id());

Futures.addCallback(mergeOperation.result(), new FutureCallback<Bucket>() {
@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
/*
* Licensed to CRATE Technology GmbH ("Crate") under one or more contributor
* license agreements. See the NOTICE file distributed with this work for
* additional information regarding copyright ownership. Crate licenses
* this file to you under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License. You may
* obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*
* However, if you have executed another commercial license agreement
* with Crate these terms will supersede the license and you may use the
* software solely pursuant to the terms of the relevant commercial agreement.
*/

package io.crate.executor.transport;

import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.transport.TransportRequest;

import java.io.IOException;
import java.util.UUID;

public class NodeCloseContextRequest extends TransportRequest {

private UUID jobId;

public NodeCloseContextRequest() {
}

public NodeCloseContextRequest(UUID jobId) {
this.jobId = jobId;
}

public void jobId(UUID jobId) {
this.jobId = jobId;
}

public UUID jobId() {
return jobId;
}

@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
jobId = new UUID(in.readLong(), in.readLong());
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeLong(jobId.getMostSignificantBits());
out.writeLong(jobId.getLeastSignificantBits());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
/*
* Licensed to CRATE Technology GmbH ("Crate") under one or more contributor
* license agreements. See the NOTICE file distributed with this work for
* additional information regarding copyright ownership. Crate licenses
* this file to you under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License. You may
* obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*
* However, if you have executed another commercial license agreement
* with Crate these terms will supersede the license and you may use the
* software solely pursuant to the terms of the relevant commercial agreement.
*/

package io.crate.executor.transport;

import org.elasticsearch.transport.TransportResponse;

public class NodeCloseContextResponse extends TransportResponse {

}
41 changes: 13 additions & 28 deletions sql/src/main/java/io/crate/executor/transport/NodeFetchRequest.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,20 +21,20 @@

package io.crate.executor.transport;

import com.carrotsearch.hppc.IntArrayList;
import com.carrotsearch.hppc.cursors.IntCursor;
import io.crate.planner.symbol.Symbol;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.transport.TransportRequest;

import java.io.IOException;
import java.util.*;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;

public class NodeFetchRequest extends TransportRequest {

private UUID jobId;
private Map<Integer, IntArrayList> jobSearchContextDocIds = new HashMap<>();
private List<Long> jobSearchContextDocIds;
private List<Symbol> toFetchSymbols;
private boolean closeContext = true;

Expand All @@ -49,16 +49,11 @@ public UUID jobId() {
return jobId;
}

public void addDocId(int jobSearchContextId, int docId) {
IntArrayList docIds = jobSearchContextDocIds.get(jobSearchContextId);
if (docIds == null) {
docIds = new IntArrayList();
jobSearchContextDocIds.put(jobSearchContextId, docIds);
}
docIds.add(docId);
public void jobSearchContextDocIds(List<Long> jobSearchContextDocIds) {
this.jobSearchContextDocIds = jobSearchContextDocIds;
}

public Map<Integer, IntArrayList> jobSearchContextDocIds() {
public List<Long> jobSearchContextDocIds() {
return jobSearchContextDocIds;
}

Expand All @@ -82,16 +77,10 @@ public boolean closeContext() {
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
jobId = new UUID(in.readLong(), in.readLong());
int mapSize = in.readVInt();
jobSearchContextDocIds = new HashMap<>(mapSize);
for (int i = 0; i < mapSize; i++) {
Integer jobSearchContextId = in.readVInt();
int docIdsSize = in.readVInt();
IntArrayList docIds = new IntArrayList(docIdsSize);
for (int j = 0; j < docIdsSize; j++) {
docIds.add(in.readVInt());
}
jobSearchContextDocIds.put(jobSearchContextId, docIds);
int listSize = in.readVInt();
jobSearchContextDocIds = new ArrayList<>(listSize);
for (int i = 0; i < listSize; i++) {
jobSearchContextDocIds.add(in.readVLong());
}
int symbolsSize = in.readVInt();
toFetchSymbols = new ArrayList<>(symbolsSize);
Expand All @@ -107,12 +96,8 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeLong(jobId.getMostSignificantBits());
out.writeLong(jobId.getLeastSignificantBits());
out.writeVInt(jobSearchContextDocIds.size());
for (Map.Entry<Integer, IntArrayList> entry : jobSearchContextDocIds.entrySet()) {
out.writeVInt(entry.getKey());
out.writeVInt(entry.getValue().size());
for (IntCursor cursor : entry.getValue()) {
out.writeVInt(cursor.value);
}
for (Long jobSearchContextDocId : jobSearchContextDocIds) {
out.writeVLong(jobSearchContextDocId);
}
out.writeVInt(toFetchSymbols.size());
for (Symbol symbol : toFetchSymbols) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@ public class TransportActionProvider {

private final Provider<TransportCollectNodeAction> transportCollectNodeActionProvider;
private final Provider<TransportMergeNodeAction> transportMergeNodeActionProvider;
private final Provider<TransportFetchNodeAction> transportFetchNodeActionProvider;
private final Provider<TransportCloseContextNodeAction> transportCloseContextNodeActionProvider;

private final Provider<TransportCreateIndexAction> transportCreateIndexActionProvider;
private final Provider<TransportDeleteIndexAction> transportDeleteIndexActionProvider;
Expand All @@ -69,6 +71,8 @@ public class TransportActionProvider {
@Inject
public TransportActionProvider(Provider<TransportCollectNodeAction> transportCollectNodeActionProvider,
Provider<TransportMergeNodeAction> transportMergeNodeActionProvider,
Provider<TransportFetchNodeAction> transportFetchNodeActionProvider,
Provider<TransportCloseContextNodeAction> transportCloseContextNodeActionProvider,
Provider<TransportCreateIndexAction> transportCreateIndexActionProvider,
Provider<TransportDeleteIndexAction> transportDeleteIndexActionProvider,
Provider<TransportGetIndexTemplatesAction> transportGetIndexTemplatesActionProvider,
Expand Down Expand Up @@ -100,6 +104,8 @@ public TransportActionProvider(Provider<TransportCollectNodeAction> transportCol
this.transportShardUpsertActionProvider = transportShardUpsertActionProvider;
this.transportCollectNodeActionProvider = transportCollectNodeActionProvider;
this.transportMergeNodeActionProvider = transportMergeNodeActionProvider;
this.transportFetchNodeActionProvider = transportFetchNodeActionProvider;
this.transportCloseContextNodeActionProvider = transportCloseContextNodeActionProvider;
this.transportPutMappingActionProvider = transportPutMappingActionProvider;
this.transportRefreshActionProvider = transportRefreshActionProvider;
this.transportUpdateSettingsActionProvider = transportUpdateSettingsActionProvider;
Expand Down Expand Up @@ -166,6 +172,14 @@ public TransportMergeNodeAction transportMergeNodeAction() {
return transportMergeNodeActionProvider.get();
}

public TransportFetchNodeAction transportFetchNodeAction() {
return transportFetchNodeActionProvider.get();
}

public TransportCloseContextNodeAction transportCloseContextNodeAction() {
return transportCloseContextNodeActionProvider.get();
}

public TransportPutMappingAction transportPutMappingAction() {
return transportPutMappingActionProvider.get();
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,152 @@
/*
* Licensed to CRATE Technology GmbH ("Crate") under one or more contributor
* license agreements. See the NOTICE file distributed with this work for
* additional information regarding copyright ownership. Crate licenses
* this file to you under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License. You may
* obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*
* However, if you have executed another commercial license agreement
* with Crate these terms will supersede the license and you may use the
* software solely pursuant to the terms of the relevant commercial agreement.
*/

package io.crate.executor.transport;

import com.google.common.base.Preconditions;
import io.crate.exceptions.Exceptions;
import io.crate.operation.collect.CollectContextService;
import io.crate.operation.collect.StatsTables;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.inject.Singleton;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.BaseTransportRequestHandler;
import org.elasticsearch.transport.TransportChannel;
import org.elasticsearch.transport.TransportService;

import java.util.UUID;

@Singleton
public class TransportCloseContextNodeAction {

private final String transportAction = "crate/sql/node/context/close";
private final TransportService transportService;
private final ClusterService clusterService;
private final StatsTables statsTables;
private final CollectContextService collectContextService;
private final String executorName = ThreadPool.Names.SEARCH;
private final ThreadPool threadPool;

@Inject
public TransportCloseContextNodeAction(TransportService transportService,
ThreadPool threadPool,
ClusterService clusterService,
StatsTables statsTables,
CollectContextService collectContextService) {
this.transportService = transportService;
this.clusterService = clusterService;
this.statsTables = statsTables;
this.collectContextService = collectContextService;
this.threadPool = threadPool;

transportService.registerHandler(transportAction, new TransportHandler());
}

public void execute(
String targetNode,
NodeCloseContextRequest request,
ActionListener<NodeCloseContextResponse> listener) {
new AsyncAction(targetNode, request, listener).start();
}

private void nodeOperation(final NodeCloseContextRequest request,
final ActionListener<NodeCloseContextResponse> response) {
final UUID operationId = UUID.randomUUID();
statsTables.operationStarted(operationId, request.jobId(), "closeContext");

try {
collectContextService.closeContext(request.jobId());
statsTables.operationFinished(operationId, null, 0);
response.onResponse(new NodeCloseContextResponse());
} catch (Exception e) {
statsTables.operationFinished(operationId, Exceptions.messageOf(e), 0);
response.onFailure(e);
}
}

private class AsyncAction {

private final NodeCloseContextRequest request;
private final ActionListener<NodeCloseContextResponse> listener;
private final DiscoveryNode node;
private final String nodeId;
private final ClusterState clusterState;

private AsyncAction(String nodeId, NodeCloseContextRequest request, ActionListener<NodeCloseContextResponse> listener) {
Preconditions.checkNotNull(nodeId, "nodeId is null");
clusterState = clusterService.state();
node = clusterState.nodes().get(nodeId);
Preconditions.checkNotNull(node, "DiscoveryNode for id '%s' not found in cluster state", nodeId);

this.nodeId = nodeId;
this.request = request;
this.listener = listener;
}

private void start() {
if (nodeId.equals("_local") || nodeId.equals(clusterState.nodes().localNodeId())) {
threadPool.executor(executorName).execute(new Runnable() {
@Override
public void run() {
nodeOperation(request, listener);
}
});
} else {
transportService.sendRequest(
node,
transportAction,
request,
new DefaultTransportResponseHandler<NodeCloseContextResponse>(listener, executorName) {
@Override
public NodeCloseContextResponse newInstance() {
return new NodeCloseContextResponse();
}
}
);
}
}

}

private class TransportHandler extends BaseTransportRequestHandler<NodeCloseContextRequest> {

@Override
public NodeCloseContextRequest newInstance() {
return new NodeCloseContextRequest();
}

@Override
public void messageReceived(final NodeCloseContextRequest request, final TransportChannel channel) throws Exception {
ActionListener<NodeCloseContextResponse> actionListener = ResponseForwarder.forwardTo(channel);
nodeOperation(request, actionListener);
}

@Override
public String executor() {
return executorName;
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -257,7 +257,7 @@ public ImmutableList<Task> visitMergeNode(@Nullable MergeNode node, UUID jobId)
if (node == null) {
return ImmutableList.of();
}
node.contextId(jobId);
node.jobId(jobId);
if (node.executionNodes().isEmpty()) {
return singleTask(new LocalMergeTask(
jobId,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ protected void configure() {
bind(SymbolBasedTransportShardUpsertAction.class).asEagerSingleton();
bind(TransportShardUpsertAction.class).asEagerSingleton();
bind(TransportFetchNodeAction.class).asEagerSingleton();
bind(TransportCloseContextNodeAction.class).asEagerSingleton();

bind(CrateResultSorter.class).asEagerSingleton();

Expand Down
Loading

0 comments on commit 862b989

Please sign in to comment.