Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
seut committed Apr 21, 2015
1 parent ea1e2f1 commit 8701ae4
Show file tree
Hide file tree
Showing 19 changed files with 105 additions and 41 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -251,7 +251,7 @@ public Void visitMergeNode(MergeNode node, final VisitorContext context) {
StreamerVisitor.Context streamerContext = streamerVisitor.processPlanNode(node, context.ramAccountingContext);
PageDownstreamContext pageDownstreamContext = new PageDownstreamContext(
pageDownstream, streamerContext.inputStreamers(), node.numUpstreams());
jobExecutionContext.setPageDownstreamContext(node.executionNodeId(), pageDownstreamContext);
jobExecutionContext.pageDownstreamContext(node.executionNodeId(), pageDownstreamContext);

Futures.addCallback(downstream.result(),
new SetBucketFutureCallback(operationId, context.ramAccountingContext, context.directResultFuture));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ public void messageReceived(final DistributedResultRequest request, final Transp
return;
}
JobExecutionContext context = jobContextService.getOrCreateContext(request.jobId());
ListenableFuture<PageDownstreamContext> pageDownstreamContextFuture = context.getPageDownstreamContext(request.executionNodeId());
ListenableFuture<PageDownstreamContext> pageDownstreamContextFuture = context.pageDownstreamContext(request.executionNodeId());
Futures.addCallback(pageDownstreamContextFuture, new PageDownstreamContextFutureCallback(request, channel));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
public class NodeFetchRequest extends TransportRequest {

private UUID jobId;
private int executionNodeId;
private LongArrayList jobSearchContextDocIds;
private List<Reference> toFetchReferences;
private boolean closeContext = true;
Expand All @@ -51,6 +52,14 @@ public UUID jobId() {
return jobId;
}

public void executionNodeId(int executionNodeId) {
this.executionNodeId = executionNodeId;
}

public int executionNodeId() {
return executionNodeId;
}

public void jobSearchContextDocIds(LongArrayList jobSearchContextDocIds) {
this.jobSearchContextDocIds = jobSearchContextDocIds;
}
Expand Down Expand Up @@ -79,6 +88,7 @@ public boolean closeContext() {
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
jobId = new UUID(in.readLong(), in.readLong());
executionNodeId = in.readVInt();
int listSize = in.readVInt();
jobSearchContextDocIds = new LongArrayList(listSize);
for (int i = 0; i < listSize; i++) {
Expand All @@ -97,6 +107,7 @@ public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeLong(jobId.getMostSignificantBits());
out.writeLong(jobId.getLeastSignificantBits());
out.writeVInt(executionNodeId);
out.writeVInt(jobSearchContextDocIds.size());
for (LongCursor jobSearchContextDocId : jobSearchContextDocIds) {
out.writeVLong(jobSearchContextDocId.value);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@ private void nodeOperation(final NodeFetchRequest request,

NodeFetchOperation fetchOperation = new NodeFetchOperation(
request.jobId(),
request.executionNodeId(),
request.jobSearchContextDocIds(),
request.toFetchReferences(),
request.closeContext(),
Expand Down
2 changes: 1 addition & 1 deletion sql/src/main/java/io/crate/jobs/JobContextService.java
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ protected void contextProcessedSuccessfully(JobExecutionContext context) {

public void initializeFinalMerge(UUID jobId, int executionNodeId, PageDownstreamContext pageDownstreamContext) {
JobExecutionContext jobExecutionContext = getOrCreateContext(jobId);
jobExecutionContext.setPageDownstreamContext(executionNodeId, pageDownstreamContext);
jobExecutionContext.pageDownstreamContext(executionNodeId, pageDownstreamContext);
}

class Reaper implements Runnable {
Expand Down
46 changes: 30 additions & 16 deletions sql/src/main/java/io/crate/jobs/JobExecutionContext.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

package io.crate.jobs;

import com.carrotsearch.hppc.IntObjectOpenHashMap;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
Expand All @@ -31,29 +32,41 @@
import java.util.Locale;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicBoolean;

public class JobExecutionContext implements Releasable {

private final UUID jobId;
private final JobCollectContext collectContext;
private final long keepAlive;
private final AtomicBoolean closed = new AtomicBoolean(false);

private final ConcurrentMap<Integer, JobCollectContext> collectContextMap = new ConcurrentHashMap<>();
private final ConcurrentHashMap<Integer, PageDownstreamContext> pageDownstreamMap = new ConcurrentHashMap<>();
private final ConcurrentHashMap<Integer, SettableFuture<PageDownstreamContext>> pageDownstreamFuturesMap = new ConcurrentHashMap<>();
private final IntObjectOpenHashMap<SettableFuture<PageDownstreamContext>> pageDownstreamFuturesMap = new IntObjectOpenHashMap<>();

private volatile long lastAccessTime = -1;


public JobExecutionContext(UUID jobId, long keepAlive) {
this.jobId = jobId;
this.keepAlive = keepAlive;
this.collectContext = new JobCollectContext(jobId);
}

public JobCollectContext collectContext() {
return collectContext;
public JobCollectContext collectContext(int executionNodeId) {
if (closed.get()) {
throw new IllegalStateException("Context already closed");
}
for (;;) {
JobCollectContext collectContext = collectContextMap.get(executionNodeId);
if (collectContext != null) {
return collectContext;
}
collectContext = new JobCollectContext(jobId);
if (collectContextMap.putIfAbsent(executionNodeId, collectContext) == null) {
return collectContext;
}
}
}

public void accessed(long accessTime) {
Expand All @@ -71,18 +84,22 @@ public long keepAlive() {
@Override
public void close() throws ElasticsearchException {
if (closed.compareAndSet(false, true)) { // prevent double release
collectContext.close();
for (JobCollectContext collectContext : collectContextMap.values()) {
collectContext.close();
}
}
}

public UUID id() {
return jobId;
}

public void setPageDownstreamContext(int executionNodeId, PageDownstreamContext pageDownstreamContext) {
public void pageDownstreamContext(int executionNodeId,
PageDownstreamContext pageDownstreamContext) {
PageDownstreamContext previousEntry = pageDownstreamMap.put(executionNodeId, pageDownstreamContext);
if (previousEntry != null) {
throw new IllegalStateException(String.format(Locale.ENGLISH, "there is already a pageDownstream set for %d", executionNodeId));
throw new IllegalStateException(String.format(Locale.ENGLISH,
"there is already a pageDownstream set for %d", executionNodeId));
}
synchronized (pageDownstreamFuturesMap) {
SettableFuture<PageDownstreamContext> future = pageDownstreamFuturesMap.remove(executionNodeId);
Expand All @@ -92,7 +109,7 @@ public void setPageDownstreamContext(int executionNodeId, PageDownstreamContext
}
}

public ListenableFuture<PageDownstreamContext> getPageDownstreamContext(int executionNodeId) {
public ListenableFuture<PageDownstreamContext> pageDownstreamContext(int executionNodeId) {
PageDownstreamContext pageDownstreamContext = pageDownstreamMap.get(executionNodeId);
if (pageDownstreamContext == null) {
SettableFuture<PageDownstreamContext> futureContext = SettableFuture.create();
Expand All @@ -101,18 +118,15 @@ public ListenableFuture<PageDownstreamContext> getPageDownstreamContext(int exec
if (pageDownstreamContext != null) {
return Futures.immediateFuture(pageDownstreamContext);
}
SettableFuture<PageDownstreamContext> existingFuture = pageDownstreamFuturesMap.putIfAbsent(executionNodeId, futureContext);
SettableFuture<PageDownstreamContext> existingFuture =
pageDownstreamFuturesMap.getOrDefault(executionNodeId, futureContext);

if (existingFuture == null) {
return futureContext;
if (existingFuture == futureContext) {
pageDownstreamFuturesMap.put(executionNodeId, futureContext);
}
return existingFuture;
}
}
return Futures.immediateFuture(pageDownstreamContext);
}

public void closePageDownstreamContext(int executionNodeId) {
pageDownstreamMap.remove(executionNodeId);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,6 @@ public void fail(Throwable throwable) {
private void closeContext() {
JobExecutionContext context = jobContextService.getContext(jobId);
if (context != null) {
context.closePageDownstreamContext(executionNodeId);
jobContextService.closeContext(jobId);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -235,7 +235,8 @@ protected void handleShardCollect(CollectNode collectNode, RowDownstream downstr
}

assert collectNode.jobId().isPresent() : "jobId must be set on CollectNode";
JobCollectContext jobCollectContext = jobContextService.getOrCreateContext(collectNode.jobId().get()).collectContext();
JobCollectContext jobCollectContext = jobContextService.getOrCreateContext(collectNode.jobId().get())
.collectContext(collectNode.executionNodeId());
ShardProjectorChain projectorChain = new ShardProjectorChain(
numShards,
collectNode.projections(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@
public class NodeFetchOperation implements RowUpstream {

private final UUID jobId;
private final int executionNodeId;
private final List<Reference> toFetchReferences;
private final boolean closeContext;
private final IntObjectOpenHashMap<ShardDocIdsBucket> shardBuckets = new IntObjectOpenHashMap<>();
Expand All @@ -72,6 +73,7 @@ public class NodeFetchOperation implements RowUpstream {
private static final ESLogger LOGGER = Loggers.getLogger(NodeFetchOperation.class);

public NodeFetchOperation(UUID jobId,
int executionNodeId,
LongArrayList jobSearchContextDocIds,
List<Reference> toFetchReferences,
boolean closeContext,
Expand All @@ -80,6 +82,7 @@ public NodeFetchOperation(UUID jobId,
Functions functions,
RamAccountingContext ramAccountingContext) {
this.jobId = jobId;
this.executionNodeId = executionNodeId;
this.toFetchReferences = toFetchReferences;
this.closeContext = closeContext;
this.jobContextService = jobContextService;
Expand Down Expand Up @@ -121,7 +124,7 @@ public void fetch(RowDownstream rowDownstream) throws Exception {
LOGGER.error(errorMsg);
throw new IllegalArgumentException(errorMsg);
}
JobCollectContext jobCollectContext = jobExecutionContext.collectContext();
JobCollectContext jobCollectContext = jobExecutionContext.collectContext(executionNodeId);

RowDownstream upstreamsRowMerger = new PositionalRowMerger(rowDownstream, toFetchReferences.size());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ public class FetchProjector implements Projector, RowDownstreamHandle {
private final TransportCloseContextNodeAction transportCloseContextNodeAction;

private final UUID jobId;
private final int executionNodeId;
private final CollectExpression<?> collectDocIdExpression;
private final List<ReferenceInfo> partitionedBy;
private final List<Reference> toFetchReferences;
Expand Down Expand Up @@ -89,6 +90,7 @@ public FetchProjector(TransportFetchNodeAction transportFetchNodeAction,
TransportCloseContextNodeAction transportCloseContextNodeAction,
Functions functions,
UUID jobId,
int executionNodeId,
CollectExpression<?> collectDocIdExpression,
List<Symbol> inputSymbols,
List<Symbol> outputSymbols,
Expand All @@ -101,6 +103,7 @@ public FetchProjector(TransportFetchNodeAction transportFetchNodeAction,
this.transportFetchNodeAction = transportFetchNodeAction;
this.transportCloseContextNodeAction = transportCloseContextNodeAction;
this.jobId = jobId;
this.executionNodeId = executionNodeId;
this.collectDocIdExpression = collectDocIdExpression;
this.partitionedBy = partitionedBy;
this.jobSearchContextIdToNode = jobSearchContextIdToNode;
Expand Down Expand Up @@ -250,6 +253,7 @@ private void flushNodeBucket(final NodeBucket nodeBucket) {

NodeFetchRequest request = new NodeFetchRequest();
request.jobId(jobId);
request.executionNodeId(executionNodeId);
request.toFetchReferences(toFetchReferences);
request.jobSearchContextDocIds(nodeBucket.docIds());
if (bulkSize > NO_BULK_REQUESTS) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -345,6 +345,7 @@ public Projector visitFetchProjection(FetchProjection projection, Context contex
transportActionProvider.transportCloseContextNodeAction(),
symbolVisitor.functions(),
context.jobId.get(),
projection.executionNodeId(),
ctxDocId.collectExpressions().iterator().next(),
projection.inputSymbols(),
projection.outputSymbols(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,7 @@ public PlannedAnalyzedRelation visitQueriedTable(QueriedTable table, ConsumerCon
bulkSize = Constants.DEFAULT_SELECT_LIMIT;
}
FetchProjection fetchProjection = new FetchProjection(
collectNode.executionNodeId(),
DEFAULT_DOC_ID_INPUT_COLUMN, collectSymbols, outputSymbols,
tableInfo.partitionedByColumns(),
collectNode.executionNodes(),
Expand Down
27 changes: 20 additions & 7 deletions sql/src/main/java/io/crate/planner/projection/FetchProjection.java
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ public FetchProjection newInstance() {
}
};

private int executionNodeId;
private Symbol docIdSymbol;
private List<Symbol> inputSymbols;
private List<Symbol> outputSymbols;
Expand All @@ -53,32 +54,36 @@ public FetchProjection newInstance() {
private FetchProjection() {
}

public FetchProjection(Symbol docIdSymbol,
public FetchProjection(int executionNodeId,
Symbol docIdSymbol,
List<Symbol> inputSymbols,
List<Symbol> outputSymbols,
List<ReferenceInfo> partitionBy,
Set<String> executionNodes) {
this(docIdSymbol, inputSymbols, outputSymbols, partitionBy, executionNodes,
FetchProjector.NO_BULK_REQUESTS, false);
this(executionNodeId, docIdSymbol, inputSymbols, outputSymbols, partitionBy,
executionNodes, FetchProjector.NO_BULK_REQUESTS, false);
}

public FetchProjection(Symbol docIdSymbol,
public FetchProjection(int executionNodeId,
Symbol docIdSymbol,
List<Symbol> inputSymbols,
List<Symbol> outputSymbols,
List<ReferenceInfo> partitionBy,
Set<String> executionNodes,
int bulkSize) {
this(docIdSymbol, inputSymbols, outputSymbols, partitionBy, executionNodes,
bulkSize, false);
this(executionNodeId, docIdSymbol, inputSymbols, outputSymbols, partitionBy,
executionNodes, bulkSize, false);
}

public FetchProjection(Symbol docIdSymbol,
public FetchProjection(int executionNodeId,
Symbol docIdSymbol,
List<Symbol> inputSymbols,
List<Symbol> outputSymbols,
List<ReferenceInfo> partitionBy,
Set<String> executionNodes,
int bulkSize,
boolean closeContexts) {
this.executionNodeId = executionNodeId;
this.docIdSymbol = docIdSymbol;
this.inputSymbols = inputSymbols;
this.outputSymbols = outputSymbols;
Expand All @@ -88,6 +93,10 @@ public FetchProjection(Symbol docIdSymbol,
this.closeContexts = closeContexts;
}

public int executionNodeId() {
return executionNodeId;
}

public Symbol docIdSymbol() {
return docIdSymbol;
}
Expand Down Expand Up @@ -138,6 +147,7 @@ public boolean equals(Object o) {

FetchProjection that = (FetchProjection) o;

if (executionNodeId != that.executionNodeId) return false;
if (closeContexts != that.closeContexts) return false;
if (bulkSize != that.bulkSize) return false;
if (executionNodes != that.executionNodes) return false;
Expand All @@ -151,6 +161,7 @@ public boolean equals(Object o) {
@Override
public int hashCode() {
int result = super.hashCode();
result = 31 * result + executionNodeId;
result = 31 * result + docIdSymbol.hashCode();
result = 31 * result + inputSymbols.hashCode();
result = 31 * result + outputSymbols.hashCode();
Expand All @@ -163,6 +174,7 @@ public int hashCode() {

@Override
public void readFrom(StreamInput in) throws IOException {
executionNodeId = in.readVInt();
docIdSymbol = Symbol.fromStream(in);
int inputSymbolsSize = in.readVInt();
inputSymbols = new ArrayList<>(inputSymbolsSize);
Expand Down Expand Up @@ -192,6 +204,7 @@ public void readFrom(StreamInput in) throws IOException {

@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeVInt(executionNodeId);
Symbol.toStream(docIdSymbol, out);
out.writeVInt(inputSymbols.size());
for (Symbol symbol : inputSymbols) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,7 @@ private LuceneDocCollector createDocCollector(OrderBy orderBy, Integer limit, Pr
when(projectorChain.newShardDownstreamProjector(any(ProjectionToProjectorVisitor.class))).thenReturn(projector);

int jobSearchContextId = 0;
JobCollectContext jobCollectContext = jobContextService.getOrCreateContext(node.jobId().get()).collectContext();
JobCollectContext jobCollectContext = jobContextService.getOrCreateContext(node.jobId().get()).collectContext(node.executionNodeId());
jobCollectContext.registerJobContextId(shardId, jobSearchContextId);
LuceneDocCollector collector = (LuceneDocCollector)shardCollectService.getCollector(node, projectorChain, jobCollectContext, 0);
collector.pageSize(PAGE_SIZE);
Expand Down
Loading

0 comments on commit 8701ae4

Please sign in to comment.