Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

more Projector / RowPipe transitions #2536

Merged
merged 4 commits into from
Sep 8, 2015
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@

import io.crate.core.collections.Bucket;
import io.crate.core.collections.Row;
import io.crate.operation.RowDownstream;
import io.crate.operation.RowDownstreamHandle;
import io.crate.operation.RowUpstream;

Expand All @@ -38,7 +37,7 @@
*/
public class PositionalBucketMerger implements RowUpstream {

private RowDownstreamHandle downstream;
private final RowDownstreamHandle downstream;
private final AtomicInteger upstreamsRemaining = new AtomicInteger(0);
private final int orderingColumnIndex;
private final UpstreamBucket[] remainingBuckets;
Expand All @@ -47,10 +46,10 @@ public class PositionalBucketMerger implements RowUpstream {
private volatile int leastBucketId = -1;
private final AtomicBoolean consumeBuckets = new AtomicBoolean(true);

public PositionalBucketMerger(RowDownstream downstream,
public PositionalBucketMerger(RowDownstreamHandle rowDownstreamHandle,
int numUpstreams,
int orderingColumnIndex) {
downstream(downstream);
this.downstream = rowDownstreamHandle;
this.orderingColumnIndex = orderingColumnIndex;
remainingBuckets = new UpstreamBucket[numUpstreams];
}
Expand Down Expand Up @@ -147,10 +146,6 @@ private boolean emitRow(Row row) {
}


public void downstream(RowDownstream downstream) {
this.downstream = downstream.registerUpstream(this);
}

public PositionalBucketMerger registerUpstream(RowUpstream upstream) {
upstreamsRemaining.incrementAndGet();
return this;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,7 @@
import io.crate.metadata.TableIdent;
import io.crate.metadata.settings.CrateSettings;
import io.crate.operation.Input;
import io.crate.operation.RowDownstream;
import io.crate.operation.RowDownstreamHandle;
import io.crate.operation.RowUpstream;
import io.crate.operation.collect.CollectExpression;
import io.crate.operation.collect.RowShardResolver;
import io.crate.planner.symbol.Reference;
Expand All @@ -65,16 +63,14 @@
import java.util.UUID;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicInteger;

public abstract class AbstractIndexWriterProjector extends RowDownstreamAndHandle implements Projector {
public abstract class AbstractIndexWriterProjector extends AbstractRowPipe {

private final AtomicInteger remainingUpstreams = new AtomicInteger(0);
private final CollectExpression<Row, ?>[] collectExpressions;
private final TableIdent tableIdent;

@Nullable
private final String partitionIdent;
private final Object lock = new Object();
private final List<Input<?>> partitionedByInputs;
private final Function<Input<?>, BytesRef> inputToBytesRef = new Function<Input<?>, BytesRef>() {
@Nullable
Expand All @@ -87,7 +83,6 @@ public BytesRef apply(Input<?> input) {
private final BulkRetryCoordinatorPool bulkRetryCoordinatorPool;
private final TransportActionProvider transportActionProvider;
private BulkShardProcessor<ShardUpsertRequest, ShardUpsertResponse> bulkShardProcessor;
private RowDownstreamHandle downstream;

private final LoadingCache<List<BytesRef>, String> partitionIdentCache;

Expand Down Expand Up @@ -172,48 +167,29 @@ protected void createBulkShardProcessor(ClusterService clusterService,
}

@Override
public void startProjection(ExecutionState executionState) {
public void prepare(ExecutionState executionState) {
super.prepare(executionState);
assert bulkShardProcessor != null : "must create a BulkShardProcessor first";
}

protected abstract Row updateRow(Row row);

@Override
public boolean setNextRow(Row row) {
String indexName;
boolean result;

synchronized (lock) {
for (CollectExpression<Row, ?> collectExpression : collectExpressions) {
collectExpression.setNextRow(row);
}

indexName = getIndexName();
row = updateRow(row);
result = bulkShardProcessor.add(indexName, row, null);
for (CollectExpression<Row, ?> collectExpression : collectExpressions) {
collectExpression.setNextRow(row);
}

return result;
}

@Override
public RowDownstreamHandle registerUpstream(RowUpstream upstream) {
remainingUpstreams.incrementAndGet();
return this;
return bulkShardProcessor.add(getIndexName(), updateRow(row), null);
}

@Override
public void finish() {
if (remainingUpstreams.decrementAndGet() <= 0) {
bulkShardProcessor.close();
}
bulkShardProcessor.close();
}

@Override
public void fail(Throwable throwable) {
if (downstream != null) {
downstream.fail(throwable);
}
downstream.fail(throwable);
if (throwable instanceof CancellationException) {
bulkShardProcessor.kill();
} else {
Expand All @@ -222,7 +198,6 @@ public void fail(Throwable throwable) {
}

private void setResultCallback() {
assert downstream != null;
Futures.addCallback(bulkShardProcessor.result(), new FutureCallback<BitSet>() {
@Override
public void onSuccess(@Nullable BitSet result) {
Expand Down Expand Up @@ -254,8 +229,8 @@ private String getIndexName() {
}

@Override
public void downstream(RowDownstream downstream) {
this.downstream = downstream.registerUpstream(this);
public void downstream(RowDownstreamHandle rowDownstreamHandle) {
super.downstream(rowDownstreamHandle);
setResultCallback();
}
}
64 changes: 23 additions & 41 deletions sql/src/main/java/io/crate/operation/projectors/FetchProjector.java
Original file line number Diff line number Diff line change
Expand Up @@ -53,10 +53,7 @@
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

public class FetchProjector extends RowDownstreamAndHandle implements Projector {

public static final int NO_BULK_REQUESTS = -1;

public class FetchProjector extends AbstractRowPipe {

private PositionalBucketMerger downstream;
private final TransportFetchNodeAction transportFetchNodeAction;
Expand All @@ -75,7 +72,6 @@ public class FetchProjector extends RowDownstreamAndHandle implements Projector
private final Object rowDelegateLock = new Object();
private final Row outputRow;
private final Map<String, NodeBucket> nodeBuckets = new HashMap<>();
private final AtomicInteger remainingUpstreams = new AtomicInteger(0);
private final AtomicBoolean consumingRows = new AtomicBoolean(true);
private final List<String> executionNodes;
private final int numNodes;
Expand Down Expand Up @@ -150,18 +146,14 @@ public FetchProjector(TransportFetchNodeAction transportFetchNodeAction,
}

@Override
public void startProjection(ExecutionState executionState) {
public void prepare(ExecutionState executionState) {
this.executionState = executionState;
if (remainingUpstreams.get() <= 0) {
finish();
} else {
// register once to increment downstream upstreams counter
downstream.registerUpstream(this);
}
// register once to increment downstream upstreams counter
downstream.registerUpstream(this);
}

@Override
public synchronized boolean setNextRow(Row row) {
public boolean setNextRow(Row row) {
if (!consumingRows.get()) {
return false;
}
Expand All @@ -186,36 +178,28 @@ public synchronized boolean setNextRow(Row row) {
}

@Override
public void downstream(RowDownstream downstream) {
public void downstream(RowDownstreamHandle downstream) {
this.downstream = new PositionalBucketMerger(downstream, numNodes, outputRow.size());
}

@Override
public RowDownstreamHandle registerUpstream(RowUpstream upstream) {
remainingUpstreams.incrementAndGet();
return super.registerUpstream(upstream);
}

@Override
public void finish() {
if (remainingUpstreams.decrementAndGet() == 0) {
// flush all remaining buckets
Iterator<NodeBucket> it = nodeBuckets.values().iterator();
remainingRequests.set(nodeBuckets.size());
while (it.hasNext()) {
flushNodeBucket(it.next());
it.remove();
}
// flush all remaining buckets
Iterator<NodeBucket> it = nodeBuckets.values().iterator();
remainingRequests.set(nodeBuckets.size());
while (it.hasNext()) {
flushNodeBucket(it.next());
it.remove();
}

// no rows consumed (so no fetch requests made), but collect contexts are open, close them.
if (!consumedRows) {
closeContextsAndFinish();
} else {
finishDownstream();
// the projector registered itself as an upstream to prevent the downstream from
// flushing rows before all requests have finished.
// release it now, as no new rows are consumed anymore (the downstream will flush all remaining rows)
}
// no rows consumed (so no fetch requests made), but collect contexts are open, close them.
if (!consumedRows) {
closeContextsAndFinish();
} else {
finishDownstream();
// the projector registered itself as an upstream to prevent the downstream from
// flushing rows before all requests have finished.
// release it now, as no new rows are consumed anymore (the downstream will flush all remaining rows)
}
}

Expand All @@ -236,9 +220,7 @@ private void finishDownstream() {
@Override
public void fail(Throwable throwable) {
failures.add(throwable);
if (remainingUpstreams.decrementAndGet() == 0) {
closeContextsAndFinish();
}
closeContextsAndFinish();
}

@Nullable
Expand Down Expand Up @@ -301,7 +283,7 @@ public void onResponse(NodeFetchResponse response) {
if (!downstream.setNextBucket(rows, nodeBucket.nodeIdx)) {
consumingRows.set(false);
}
if (remainingRequests.decrementAndGet() <= 0 && remainingUpstreams.get() <= 0) {
if (remainingRequests.decrementAndGet() <= 0) {
closeContextsAndFinish();
} else {
downstream.finish();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -213,14 +213,14 @@ public Projector visitWriterProjection(WriterProjection projection, Context cont
}
uri = sb.toString();
}
return new WriterProjector(
return new ForwardingProjector(new WriterProjector(
((ThreadPoolExecutor) threadPool.generic()),
uri,
projection.settings(),
inputs,
symbolContext.collectExpressions(),
overwrites
);
));
}

protected Map<ColumnIdent, Object> symbolMapToObject(Map<ColumnIdent, Symbol> symbolMap,
Expand All @@ -244,7 +244,7 @@ public Projector visitSourceIndexWriterProjection(SourceIndexWriterProjection pr
partitionedByInputs.add(symbolVisitor.process(partitionedBySymbol, symbolContext));
}
Input<?> sourceInput = symbolVisitor.process(projection.rawSource(), symbolContext);
return new IndexWriterProjector(
return new ForwardingProjector(new IndexWriterProjector(
clusterService,
clusterService.state().metaData().settings(),
transportActionProvider,
Expand All @@ -266,7 +266,7 @@ public Projector visitSourceIndexWriterProjection(SourceIndexWriterProjection pr
projection.autoCreateIndices(),
projection.overwriteDuplicates(),
context.jobId
);
));
}

@Override
Expand All @@ -276,7 +276,7 @@ public Projector visitColumnIndexWriterProjection(ColumnIndexWriterProjection pr
for (Symbol partitionedBySymbol : projection.partitionedBySymbols()) {
partitionedByInputs.add(symbolVisitor.process(partitionedBySymbol, symbolContext));
}
return new ColumnIndexWriterProjector(
return new ForwardingProjector(new ColumnIndexWriterProjector(
clusterService,
clusterService.state().metaData().settings(),
transportActionProvider,
Expand All @@ -295,7 +295,7 @@ public Projector visitColumnIndexWriterProjection(ColumnIndexWriterProjection pr
projection.bulkActions(),
projection.autoCreateIndices(),
context.jobId
);
));
}

@Override
Expand Down Expand Up @@ -323,7 +323,7 @@ public Projector visitUpdateProjection(UpdateProjection projection, Context cont
symbolVisitor.process(projection.uidSymbol(), ctx);
assert ctx.collectExpressions().size() == 1;

return new UpdateProjector(
return new ForwardingProjector(new UpdateProjector(
clusterService,
settings,
shardId,
Expand All @@ -333,7 +333,7 @@ public Projector visitUpdateProjection(UpdateProjection projection, Context cont
projection.assignmentsColumns(),
projection.assignments(),
projection.requiredVersion(),
context.jobId);
context.jobId));
}

@Override
Expand All @@ -343,7 +343,7 @@ public Projector visitFetchProjection(FetchProjection projection, Context contex
symbolVisitor.process(projection.docIdSymbol(), ctxDocId);
assert ctxDocId.collectExpressions().size() == 1;

return new FetchProjector(
return new ForwardingProjector(new FetchProjector(
transportActionProvider.transportFetchNodeAction(),
transportActionProvider.transportCloseContextNodeAction(),
symbolVisitor.functions(),
Expand All @@ -355,7 +355,7 @@ public Projector visitFetchProjection(FetchProjection projection, Context contex
projection.partitionedBy(),
projection.jobSearchContextIdToNode(),
projection.jobSearchContextIdToShard(),
projection.executionNodes());
projection.executionNodes()));
}

@Override
Expand Down
Loading