Skip to content

Commit

Permalink
implement broadcast distributing downstream
Browse files Browse the repository at this point in the history
rename existing distributing downstream to ModuloDistributingDownstream
  • Loading branch information
seut committed Jul 7, 2015
1 parent 156790b commit 5a2d009
Show file tree
Hide file tree
Showing 8 changed files with 425 additions and 63 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
/*
* Licensed to CRATE.IO GmbH ("Crate") under one or more contributor
* license agreements. See the NOTICE file distributed with this work for
* additional information regarding copyright ownership. Crate licenses
* this file to you under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License. You may
* obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*
* However, if you have executed another commercial license agreement
* with Crate these terms will supersede the license and you may use the
* software solely pursuant to the terms of the relevant commercial agreement.
*/

package io.crate.executor.transport.distributed;

import io.crate.Constants;
import io.crate.Streamer;
import io.crate.core.collections.Bucket;
import io.crate.core.collections.Row;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.logging.Loggers;

import java.util.Collection;
import java.util.UUID;

public class BroadcastDistributingDownstream extends DistributingDownstream {

private static final ESLogger LOGGER = Loggers.getLogger(BroadcastDistributingDownstream.class);

private final SingleBucketBuilder bucketBuilder;

public BroadcastDistributingDownstream(UUID jobId,
int targetExecutionNodeId,
int bucketIdx,
Collection<String> downstreamNodeIds,
TransportDistributedResultAction transportDistributedResultAction,
Streamer<?>[] streamers) {
super(jobId, targetExecutionNodeId, bucketIdx, downstreamNodeIds, transportDistributedResultAction, streamers);

bucketBuilder = new SingleBucketBuilder(streamers);
}

@Override
public boolean setNextRow(Row row) {
if (allDownstreamsFinished()) {
return false;
}
try {
bucketBuilder.setNextRow(row);
sendRequestsIfNeeded();
} catch (Exception e) {
fail(e);
return false;
}
return true;
}

private void sendRequestsIfNeeded() {
int size = bucketBuilder.size();
if (size >= Constants.PAGE_SIZE || remainingUpstreams.get() <= 0) {
Bucket bucket = bucketBuilder.build();
for (Downstream downstream : downstreams) {
if (downstream.wantMore.get()) {
downstream.bucketQueue.add(bucket);
downstream.sendRequest(remainingUpstreams.get() <= 0);
}
}
}
}

@Override
protected void onAllUpstreamsFinished() {
sendRequestsIfNeeded();
}

@Override
protected ESLogger logger() {
return LOGGER;
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Licensed to CRATE Technology GmbH ("Crate") under one or more contributor
* Licensed to CRATE.IO GmbH ("Crate") under one or more contributor
* license agreements. See the NOTICE file distributed with this work for
* additional information regarding copyright ownership. Crate licenses
* this file to you under the Apache License, Version 2.0 (the "License");
Expand All @@ -21,15 +21,11 @@

package io.crate.executor.transport.distributed;

import io.crate.Constants;
import io.crate.Streamer;
import io.crate.core.collections.Bucket;
import io.crate.core.collections.Row;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.logging.Loggers;

import java.io.IOException;
import java.util.Collection;
import java.util.Deque;
import java.util.UUID;
Expand All @@ -38,76 +34,40 @@
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

public class DistributingDownstream extends ResultProviderBase {
public abstract class DistributingDownstream extends ResultProviderBase {

private static final ESLogger LOGGER = Loggers.getLogger(DistributingDownstream.class);

private final UUID jobId;
private final TransportDistributedResultAction transportDistributedResultAction;
private final MultiBucketBuilder bucketBuilder;
private Downstream[] downstreams;
private final AtomicInteger finishedDownstreams = new AtomicInteger(0);

protected Downstream[] downstreams;

public DistributingDownstream(UUID jobId,
int targetExecutionNodeId,
int bucketIdx,
Collection<String> downstreamNodeIds,
TransportDistributedResultAction transportDistributedResultAction,
Streamer<?>[] streamers) {
this.jobId = jobId;
this.transportDistributedResultAction = transportDistributedResultAction;

downstreams = new Downstream[downstreamNodeIds.size()];
bucketBuilder = new MultiBucketBuilder(streamers, downstreams.length);

int idx = 0;
for (String downstreamNodeId : downstreamNodeIds) {
downstreams[idx] = new Downstream(downstreamNodeId, jobId, targetExecutionNodeId, bucketIdx, streamers);
downstreams[idx] = new Downstream(downstreamNodeId, jobId, targetExecutionNodeId,
bucketIdx, streamers);
idx++;
}
}

@Override
public boolean setNextRow(Row row) {
if (allDownstreamsFinished()) {
return false;
}
try {
int downstreamIdx = bucketBuilder.getBucket(row);
// only collect if downstream want more rows, otherwise just ignore the row
if (downstreams[downstreamIdx].wantMore.get()) {
bucketBuilder.setNextRow(downstreamIdx, row);
sendRequestIfNeeded(downstreamIdx);
}
} catch (IOException e) {
fail(e);
return false;
}
return true;
}

private void sendRequestIfNeeded(int downstreamIdx) {
int size = bucketBuilder.size(downstreamIdx);
if (size >= Constants.PAGE_SIZE || remainingUpstreams.get() <= 0) {
Downstream downstream = downstreams[downstreamIdx];
downstream.bucketQueue.add(bucketBuilder.build(downstreamIdx));
downstream.sendRequest(remainingUpstreams.get() <= 0);
}
}

private void onAllUpstreamsFinished() {
for (int i = 0; i < downstreams.length; i++) {
sendRequestIfNeeded(i);
}
}
protected abstract void onAllUpstreamsFinished();

private void forwardFailures(Throwable throwable) {
for (Downstream downstream : downstreams) {
downstream.sendRequest(throwable);
}
}

private boolean allDownstreamsFinished() {
protected boolean allDownstreamsFinished() {
return finishedDownstreams.get() == downstreams.length;
}

Expand All @@ -121,14 +81,16 @@ public Bucket doFinish() {
public Throwable doFail(Throwable t) {
if (t instanceof CancellationException) {
// fail without sending anything
LOGGER.debug("{} killed", getClass().getSimpleName());
logger().debug("{} killed", getClass().getSimpleName());
} else {
forwardFailures(t);
}
return t;
}

private class Downstream implements ActionListener<DistributedResultResponse> {
protected abstract ESLogger logger();

protected class Downstream implements ActionListener<DistributedResultResponse> {

final AtomicBoolean wantMore = new AtomicBoolean(true);
final AtomicBoolean requestPending = new AtomicBoolean(false);
Expand Down Expand Up @@ -172,8 +134,8 @@ public void sendRequest(boolean isLast) {
}

private void sendRequest(final DistributedResultRequest request) {
if (LOGGER.isTraceEnabled()) {
LOGGER.trace("[{}] sending distributing collect request to {}, isLast? {} ...",
if (logger().isTraceEnabled()) {
logger().trace("[{}] sending distributing collect request to {}, isLast? {} ...",
jobId.toString(),
node, request.isLast());
}
Expand All @@ -184,15 +146,15 @@ private void sendRequest(final DistributedResultRequest request) {
this
);
} catch (IllegalArgumentException e) {
LOGGER.error(e.getMessage(), e);
logger().error(e.getMessage(), e);
wantMore.set(false);
}
}

@Override
public void onResponse(DistributedResultResponse response) {
if (LOGGER.isTraceEnabled()) {
LOGGER.trace("[{}] successfully sent distributing collect request to {}, needMore? {}",
if (logger().isTraceEnabled()) {
logger().trace("[{}] successfully sent distributing collect request to {}, needMore? {}",
jobId,
node,
response.needMore());
Expand Down Expand Up @@ -220,7 +182,7 @@ public void onResponse(DistributedResultResponse response) {

@Override
public void onFailure(Throwable exp) {
LOGGER.error("[{}] Exception sending distributing collect request to {}", exp, jobId, node);
logger().error("[{}] Exception sending distributing collect results to {}", exp, jobId, node);
wantMore.set(false);
bucketQueue.clear();
finishedDownstreams.incrementAndGet();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
/*
* Licensed to CRATE.IO GmbH ("Crate") under one or more contributor
* license agreements. See the NOTICE file distributed with this work for
* additional information regarding copyright ownership. Crate licenses
* this file to you under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License. You may
* obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*
* However, if you have executed another commercial license agreement
* with Crate these terms will supersede the license and you may use the
* software solely pursuant to the terms of the relevant commercial agreement.
*/

package io.crate.executor.transport.distributed;

import io.crate.Constants;
import io.crate.Streamer;
import io.crate.core.collections.Row;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.logging.Loggers;

import java.io.IOException;
import java.util.Collection;
import java.util.UUID;

public class ModuloDistributingDownstream extends DistributingDownstream {

private static final ESLogger LOGGER = Loggers.getLogger(ModuloDistributingDownstream.class);

private final MultiBucketBuilder bucketBuilder;

public ModuloDistributingDownstream(UUID jobId,
int targetExecutionNodeId,
int bucketIdx,
Collection<String> downstreamNodeIds,
TransportDistributedResultAction transportDistributedResultAction,
Streamer<?>[] streamers) {
super(jobId, targetExecutionNodeId, bucketIdx, downstreamNodeIds, transportDistributedResultAction, streamers);

bucketBuilder = new MultiBucketBuilder(streamers, downstreams.length);
}

@Override
public boolean setNextRow(Row row) {
if (allDownstreamsFinished()) {
return false;
}
try {
int downstreamIdx = bucketBuilder.getBucket(row);
// only collect if downstream want more rows, otherwise just ignore the row
if (downstreams[downstreamIdx].wantMore.get()) {
bucketBuilder.setNextRow(downstreamIdx, row);
sendRequestIfNeeded(downstreamIdx);
}
} catch (IOException e) {
fail(e);
return false;
}
return true;
}

private void sendRequestIfNeeded(int downstreamIdx) {
int size = bucketBuilder.size(downstreamIdx);
if (size >= Constants.PAGE_SIZE || remainingUpstreams.get() <= 0) {
Downstream downstream = downstreams[downstreamIdx];
downstream.bucketQueue.add(bucketBuilder.build(downstreamIdx));
downstream.sendRequest(remainingUpstreams.get() <= 0);
}
}

protected void onAllUpstreamsFinished() {
for (int i = 0; i < downstreams.length; i++) {
sendRequestIfNeeded(i);
}
}
protected ESLogger logger() {
return LOGGER;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,11 @@ public SingleBucketBuilder(Streamer<?>[] streamers) {

public Bucket build() {
try {
return bucketBuilder.build();
synchronized (bucketBuilder) {
Bucket bucket = bucketBuilder.build();
bucketBuilder.reset();
return bucket;
}
} catch (IOException e) {
Throwables.propagate(e);
}
Expand All @@ -58,12 +62,20 @@ public Throwable doFail(Throwable t) {
}

@Override
public synchronized boolean setNextRow(Row row) {
public boolean setNextRow(Row row) {
try {
bucketBuilder.add(row);
synchronized (bucketBuilder) {
bucketBuilder.add(row);
}
} catch (Throwable e) {
Throwables.propagate(e);
}
return true;
}

public int size() {
synchronized (bucketBuilder) {
return bucketBuilder.size();
}
}
}
Loading

0 comments on commit 5a2d009

Please sign in to comment.