Skip to content

Commit

Permalink
replace collectnode action with first attempt of JobAction
Browse files Browse the repository at this point in the history
  • Loading branch information
msbt committed Mar 31, 2015
1 parent fda023f commit 148131d
Show file tree
Hide file tree
Showing 27 changed files with 812 additions and 505 deletions.
103 changes: 103 additions & 0 deletions sql/src/main/java/io/crate/action/job/JobRequest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
/*
* Licensed to CRATE Technology GmbH ("Crate") under one or more contributor
* license agreements. See the NOTICE file distributed with this work for
* additional information regarding copyright ownership. Crate licenses
* this file to you under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License. You may
* obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*
* However, if you have executed another commercial license agreement
* with Crate these terms will supersede the license and you may use the
* software solely pursuant to the terms of the relevant commercial agreement.
*/

package io.crate.action.job;

import io.crate.planner.node.ExecutionNode;
import io.crate.planner.node.ExecutionNodes;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.transport.TransportRequest;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.UUID;

public class JobRequest extends TransportRequest {

public static final int NO_DIRECT_RETURN = -1;

private UUID jobId;
private List<ExecutionNode> executionNodes;
private int returnResultFromNode = NO_DIRECT_RETURN;

protected JobRequest() {
}

public JobRequest(UUID jobId, List<ExecutionNode> executionNodes) {
this.jobId = jobId;
this.executionNodes = executionNodes;
}

public JobRequest(UUID jobId, List<ExecutionNode> executionNodes, int returnResultFromNode) {
assert returnResultFromNode < executionNodes.size() : "invalid returnResultFromNode index";
this.jobId = jobId;
this.executionNodes = executionNodes;
this.returnResultFromNode = returnResultFromNode;
}

public UUID jobId() {
return jobId;
}

public Collection<ExecutionNode> executionNodes() {
return this.executionNodes;
}

/**
* @return the index of the executionNode from which the result should be
* directly returned in the {@link JobResponse}.
*/
public int returnResultFromNode() {
return returnResultFromNode;
}

@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);

jobId = new UUID(in.readLong(), in.readLong());

int numExecutionNodes = in.readVInt();
executionNodes = new ArrayList<>(numExecutionNodes);
for (int i = 0; i < numExecutionNodes; i++) {
ExecutionNode node = ExecutionNodes.fromStream(in);
executionNodes.add(node);
}
returnResultFromNode = in.readInt();
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);

out.writeLong(jobId.getMostSignificantBits());
out.writeLong(jobId.getLeastSignificantBits());

out.writeVInt(executionNodes.size());
for (ExecutionNode executionNode : executionNodes) {
ExecutionNodes.toStream(out, executionNode);
}
out.writeInt(returnResultFromNode);
}
}
82 changes: 82 additions & 0 deletions sql/src/main/java/io/crate/action/job/JobResponse.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
/*
* Licensed to CRATE Technology GmbH ("Crate") under one or more contributor
* license agreements. See the NOTICE file distributed with this work for
* additional information regarding copyright ownership. Crate licenses
* this file to you under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License. You may
* obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*
* However, if you have executed another commercial license agreement
* with Crate these terms will supersede the license and you may use the
* software solely pursuant to the terms of the relevant commercial agreement.
*/

package io.crate.action.job;

import com.google.common.base.Optional;
import io.crate.Streamer;
import io.crate.core.collections.Bucket;
import io.crate.executor.transport.StreamBucket;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.transport.TransportResponse;

import javax.annotation.Nonnull;
import java.io.IOException;

public class JobResponse extends TransportResponse {

private Optional<Bucket> directResponse = Optional.absent();
private Streamer<?>[] streamers = null;

public JobResponse() {
}

public JobResponse(@Nonnull Bucket bucket, @Nonnull Streamer<?>[] streamers) {
this.directResponse = Optional.of(bucket);
if (bucket instanceof StreamBucket) {
((StreamBucket) bucket).streamers(streamers);
}
this.streamers = streamers;
}

public Optional<Bucket> directResponse() {
return directResponse;
}

public void streamers(Streamer<?>[] streamers) {
Bucket directResponse = directResponse().orNull();
if (directResponse != null && directResponse instanceof StreamBucket) {
assert streamers != null;
((StreamBucket) directResponse).streamers(streamers);
}
this.streamers = streamers;
}

@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
if (in.readBoolean()) {
StreamBucket bucket = new StreamBucket(streamers);
bucket.readFrom(in);
directResponse = Optional.<Bucket>of(bucket);
}
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeBoolean(directResponse.isPresent());
if (directResponse.isPresent()) {
StreamBucket.writeBucket(out, streamers, directResponse.get());
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
/*
* Licensed to CRATE Technology GmbH ("Crate") under one or more contributor
* license agreements. See the NOTICE file distributed with this work for
* additional information regarding copyright ownership. Crate licenses
* this file to you under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License. You may
* obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*
* However, if you have executed another commercial license agreement
* with Crate these terms will supersede the license and you may use the
* software solely pursuant to the terms of the relevant commercial agreement.
*/

package io.crate.action.job;

import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.SettableFuture;
import io.crate.breaker.RamAccountingContext;
import io.crate.core.collections.Bucket;
import io.crate.exceptions.Exceptions;
import io.crate.executor.transport.distributed.DistributingDownstream;
import io.crate.executor.transport.distributed.SingleBucketBuilder;
import io.crate.operation.collect.DistributingCollectOperation;
import io.crate.operation.collect.NonDistributingCollectOperation;
import io.crate.operation.collect.StatsTables;
import io.crate.planner.node.dql.CollectNode;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.inject.Singleton;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.logging.Loggers;

import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import java.util.UUID;

/**
* executes map side collect operations
*/
@Singleton
public class MapSideCollectOperationDispatcher {

private static final ESLogger LOGGER = Loggers.getLogger(MapSideCollectOperationDispatcher.class);

private final DistributingCollectOperation distributingCollectOperation;
private final NonDistributingCollectOperation nonDistributingCollectOperation;
private final StatsTables statsTables;

@Inject
public MapSideCollectOperationDispatcher(StatsTables statsTables,
DistributingCollectOperation distributingCollectOperation,
NonDistributingCollectOperation nonDistributingCollectOperation) {
this.distributingCollectOperation = distributingCollectOperation;
this.nonDistributingCollectOperation = nonDistributingCollectOperation;
this.statsTables = statsTables;
}

public void executeCollect(UUID jobId,
CollectNode collectNode,
final RamAccountingContext ramAccountingContext,
final SettableFuture<Bucket> directResultFuture) {
final UUID operationId = UUID.randomUUID();
statsTables.operationStarted(operationId, jobId, collectNode.name());
try {
if (collectNode.hasDownstreams()) {
DistributingDownstream downstream = distributingCollectOperation.createDownstream(collectNode);
Futures.addCallback(downstream, new FutureCallback<Void>() {
@Override
public void onSuccess(@Nullable Void result) {
statsTables.operationFinished(operationId, null, ramAccountingContext.totalBytes());
ramAccountingContext.close();
directResultFuture.set(Bucket.EMPTY);
}

@Override
public void onFailure(Throwable t) {
statsTables.operationFinished(operationId, Exceptions.messageOf(t),
ramAccountingContext.totalBytes());
ramAccountingContext.close();
directResultFuture.setException(t);
}
});
distributingCollectOperation.collect(collectNode, downstream, ramAccountingContext);
} else {
SingleBucketBuilder downstream = nonDistributingCollectOperation.createDownstream(collectNode);
Futures.addCallback(downstream.result(), new FutureCallback<Bucket>() {
@Override
public void onSuccess(@Nullable Bucket result) {
statsTables.operationFinished(
operationId,
null,
ramAccountingContext.totalBytes());
ramAccountingContext.close();
directResultFuture.set(result);
}

@Override
public void onFailure(@Nonnull Throwable t) {
statsTables.operationFinished(
operationId,
Exceptions.messageOf(t),
ramAccountingContext.totalBytes());
ramAccountingContext.close();
directResultFuture.setException(t);
}
});
nonDistributingCollectOperation.collect(collectNode, downstream, ramAccountingContext);
}
} catch (Throwable e) {
LOGGER.error("Error executing collect operation [{}]", e, collectNode);
if (directResultFuture != null) {
directResultFuture.setException(e);
}
statsTables.operationFinished(operationId, Exceptions.messageOf(e), ramAccountingContext.totalBytes());
ramAccountingContext.close();
}
}
}
Loading

0 comments on commit 148131d

Please sign in to comment.