Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,9 @@ public Inbox(
* @param comp Optional comparator for merge exchange.
*/
public void init(ExecutionContext<Row> ctx, RelDataType rowType, Collection<UUID> srcNodeIds, @Nullable Comparator<Row> comp) {
assert context().fragmentId() == ctx.fragmentId() : "different fragments unsupported: previous=" + context().fragmentId() +
" current=" + ctx.fragmentId();

// It's important to set proper context here because
// the one, that is created on a first message
// received doesn't have all context variables in place.
Expand Down Expand Up @@ -187,6 +190,8 @@ public void onBatchReceived(UUID src, int batchId, boolean last, List<Row> rows)
/** */
private void doPush() {
try {
checkState();

push();
}
catch (Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,16 +67,6 @@ public Fragment(long id, IgniteRel root, List<IgniteReceiver> remotes) {
this(id, root, remotes, null, null);
}

/**
* @param id Fragment id.
* @param root Root node of the fragment.
* @param remotes Remote sources of the fragment.
* @param rootSer Root serialized representation.
*/
public Fragment(long id, IgniteRel root, List<IgniteReceiver> remotes, @Nullable String rootSer) {
this(id, root, remotes, rootSer, null);
}

/** */
Fragment(long id, IgniteRel root, List<IgniteReceiver> remotes, @Nullable String rootSer, @Nullable FragmentMapping mapping) {
this.id = id;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ else if (!fragment0.rootFragment()) {
sender = new IgniteSender(sender.getCluster(), sender.getTraitSet(),
sender.getInput(), sender.exchangeId(), newTargetId, sender.distribution());

fragment0 = new Fragment(fragment0.fragmentId(), sender, fragment0.remotes(), fragment0.serialized());
fragment0 = new Fragment(fragment0.fragmentId(), sender, fragment0.remotes());
}
}

Expand Down
Loading