Skip to content

Commit

Permalink
IGNITE-18090 Sql. Shutdown scan query executor on node stop (#1314)
Browse files Browse the repository at this point in the history
  • Loading branch information
AMashenkov committed Nov 4, 2022
1 parent 018f66e commit 92ffac8
Show file tree
Hide file tree
Showing 17 changed files with 46 additions and 39 deletions.
2 changes: 0 additions & 2 deletions modules/index/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,6 @@ dependencies {
implementation project(':ignite-schema')
implementation project(':ignite-table')
implementation project(':ignite-transactions')
implementation project(':ignite-table')
implementation project(':ignite-configuration')
implementation libs.jetbrains.annotations
testImplementation(testFixtures(project(':ignite-configuration')))
testImplementation(testFixtures(project(':ignite-core')))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -335,7 +335,7 @@ public Node<RowT> visit(IgniteIndexScan rel) {
ranges,
filters,
prj,
requiredColumns
requiredColumns == null ? null : requiredColumns.toBitSet()
);
}

Expand Down Expand Up @@ -370,7 +370,7 @@ public Node<RowT> visit(IgniteTableScan rel) {
group.partitions(ctx.localNode().id()),
filters,
prj,
requiredColumns
requiredColumns == null ? null : requiredColumns.toBitSet()
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import static org.apache.ignite.internal.util.ArrayUtils.nullOrEmpty;

import java.util.ArrayList;
import java.util.BitSet;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
Expand All @@ -32,7 +33,6 @@
import java.util.function.Function;
import java.util.function.Predicate;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.util.ImmutableBitSet;
import org.apache.calcite.util.ImmutableIntList;
import org.apache.ignite.internal.index.SortedIndex;
import org.apache.ignite.internal.schema.BinaryRow;
Expand All @@ -42,7 +42,6 @@
import org.apache.ignite.internal.sql.engine.exec.ExecutionContext;
import org.apache.ignite.internal.sql.engine.exec.RowConverter;
import org.apache.ignite.internal.sql.engine.exec.RowHandler;
import org.apache.ignite.internal.sql.engine.exec.RowHandler.RowFactory;
import org.apache.ignite.internal.sql.engine.exec.exp.RangeCondition;
import org.apache.ignite.internal.sql.engine.exec.exp.RangeIterable;
import org.apache.ignite.internal.sql.engine.schema.IgniteIndex;
Expand All @@ -51,7 +50,6 @@
import org.apache.ignite.internal.sql.engine.util.Commons;
import org.apache.ignite.internal.sql.engine.util.CompositePublisher;
import org.apache.ignite.internal.sql.engine.util.SortingCompositePublisher;
import org.apache.ignite.lang.IgniteTetraFunction;
import org.jetbrains.annotations.Contract;
import org.jetbrains.annotations.Nullable;

Expand Down Expand Up @@ -79,12 +77,12 @@ public class IndexScanNode<RowT> extends AbstractNode<RowT> {

private final @Nullable Function<RowT, RowT> rowTransformer;

private final IgniteTetraFunction<ExecutionContext<RowT>, BinaryRow, RowFactory<RowT>, ImmutableBitSet, RowT> tableRowConverter;
private final Function<BinaryRow, RowT> tableRowConverter;

private final ImmutableIntList idxColumnMapping;

/** Participating columns. */
private final @Nullable ImmutableBitSet requiredColumns;
private final @Nullable BitSet requiredColumns;

private final @Nullable RangeIterable<RowT> rangeConditions;

Expand Down Expand Up @@ -126,7 +124,7 @@ public IndexScanNode(
@Nullable RangeIterable<RowT> rangeConditions,
@Nullable Predicate<RowT> filters,
@Nullable Function<RowT, RowT> rowTransformer,
@Nullable ImmutableBitSet requiredColumns
@Nullable BitSet requiredColumns
) {
super(ctx, rowType);

Expand All @@ -147,7 +145,7 @@ public IndexScanNode(

factory = ctx.rowHandler().factory(ctx.getTypeFactory(), rowType);

tableRowConverter = schemaTable::toRow;
tableRowConverter = row -> schemaTable.toRow(context(), row, factory, requiredColumns);

indexRowSchema = RowConverter.createIndexRowSchema(schemaTable.descriptor(), idxColumnMapping);
}
Expand Down Expand Up @@ -321,7 +319,7 @@ private Flow.Publisher<RowT> partitionPublisher(int part, @Nullable RangeConditi
lower,
upper,
flags,
requiredColumns == null ? null : requiredColumns.toBitSet()
requiredColumns
);
} else {
assert schemaIndex.type() == Type.HASH;
Expand All @@ -335,7 +333,7 @@ private Flow.Publisher<RowT> partitionPublisher(int part, @Nullable RangeConditi
part,
context().transaction(),
key,
requiredColumns == null ? null : requiredColumns.toBitSet()
requiredColumns
);
}

Expand Down Expand Up @@ -429,6 +427,6 @@ public void onComplete() {
}

private RowT convert(BinaryRow binaryRow) {
return tableRowConverter.apply(context(), binaryRow, factory, requiredColumns);
return tableRowConverter.apply(binaryRow);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import static org.apache.ignite.internal.util.ArrayUtils.nullOrEmpty;

import java.util.BitSet;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.Flow;
Expand All @@ -27,7 +28,6 @@
import java.util.function.Function;
import java.util.function.Predicate;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.util.ImmutableBitSet;
import org.apache.ignite.internal.schema.BinaryRow;
import org.apache.ignite.internal.sql.engine.exec.ExecutionContext;
import org.apache.ignite.internal.sql.engine.exec.RowHandler;
Expand Down Expand Up @@ -59,7 +59,7 @@ public class TableScanNode<RowT> extends AbstractNode<RowT> {
private final @Nullable Function<RowT, RowT> rowTransformer;

/** Participating columns. */
private final @Nullable ImmutableBitSet requiredColumns;
private final @Nullable BitSet requiredColumns;

private int requested;

Expand Down Expand Up @@ -89,7 +89,7 @@ public TableScanNode(
int[] parts,
@Nullable Predicate<RowT> filters,
@Nullable Function<RowT, RowT> rowTransformer,
@Nullable ImmutableBitSet requiredColumns
@Nullable BitSet requiredColumns
) {
super(ctx, rowType);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@
public class IgniteIndexScan extends AbstractIndexScan implements SourceAwareIgniteRel {
private final long sourceId;

/** Index collation. Required only for rewriting index scan to table scan + sort in case of index rebuild. */
/** Index collation. Required for proper expanding search bounds and creating index row converter. */
private final RelCollation collation;

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import it.unimi.dsi.fastutil.objects.Object2IntMap;
import it.unimi.dsi.fastutil.objects.Object2IntOpenHashMap;
import java.util.ArrayList;
import java.util.BitSet;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
Expand Down Expand Up @@ -261,7 +262,7 @@ public <RowT> RowT toRow(
ExecutionContext<RowT> ectx,
BinaryRow binaryRow,
RowHandler.RowFactory<RowT> factory,
@Nullable ImmutableBitSet requiredColumns
@Nullable BitSet requiredColumns
) {
RowHandler<RowT> handler = factory.handler();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.ignite.internal.sql.engine.schema;

import java.util.BitSet;
import java.util.List;
import java.util.Map;
import org.apache.calcite.plan.RelOptCluster;
Expand Down Expand Up @@ -96,7 +97,7 @@ <RowT> RowT toRow(
ExecutionContext<RowT> ectx,
BinaryRow row,
RowHandler.RowFactory<RowT> factory,
@Nullable ImmutableBitSet requiredColumns
@Nullable BitSet requiredColumns
);

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@

import java.util.ArrayList;
import java.util.Arrays;
import java.util.BitSet;
import java.util.Comparator;
import java.util.HashSet;
import java.util.List;
Expand All @@ -33,7 +34,6 @@
import java.util.stream.IntStream;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rel.type.RelDataTypeFactory.Builder;
import org.apache.calcite.util.ImmutableBitSet;
import org.apache.calcite.util.ImmutableIntList;
import org.apache.ignite.internal.index.ColumnCollation;
import org.apache.ignite.internal.index.Index;
Expand Down Expand Up @@ -542,7 +542,7 @@ public IgniteDistribution distribution() {

@Override
public <RowT> RowT toRow(ExecutionContext<RowT> ectx, BinaryRow binaryRow, RowFactory<RowT> factory,
@Nullable ImmutableBitSet requiredColumns) {
@Nullable BitSet requiredColumns) {
TableDescriptor desc = descriptor();
Row tableRow = new Row(schemaDesc, binaryRow);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ThreadLocalRandom;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.util.ImmutableBitSet;
import org.apache.ignite.internal.hlc.HybridClock;
import org.apache.ignite.internal.hlc.HybridClockImpl;
import org.apache.ignite.internal.replicator.ReplicaService;
Expand Down Expand Up @@ -130,7 +129,7 @@ public InternalTable table() {

@Override
public <RowT> RowT toRow(ExecutionContext<RowT> ectx, BinaryRow row, RowFactory<RowT> factory,
@Nullable ImmutableBitSet requiredColumns) {
@Nullable BitSet requiredColumns) {
return (RowT) res;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -961,7 +961,7 @@ public InternalTable table() {
/** {@inheritDoc} */
@Override
public <RowT> RowT toRow(ExecutionContext<RowT> ectx, BinaryRow row, RowFactory<RowT> factory,
@Nullable ImmutableBitSet requiredColumns) {
@Nullable BitSet requiredColumns) {
throw new AssertionError();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,6 @@ public HashIndexColumnDescriptor(String name, NativeType type, boolean nullable)
* Creates a Column Descriptor.
*
* @param tableColumnView Table column configuration.
* @param indexColumnView Index column configuration.
*/
HashIndexColumnDescriptor(ColumnView tableColumnView) {
this.name = tableColumnView.name();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -455,6 +455,7 @@ protected Int2ObjectOpenHashMap<RaftGroupService> startTable(String name, UUID t
raftSvc,
txManagers.get(node),
txManagers.get(node).lockManager(),
Runnable::run,
partId,
tblId,
() -> Map.of(pkLocker.id(), pkLocker),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -272,15 +272,13 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
private final ScheduledExecutorService rebalanceScheduler;

/** Transaction state storage scheduled pool. */
private final ScheduledExecutorService txStateStorageScheduledPool = Executors.newSingleThreadScheduledExecutor(
new NamedThreadFactory("tx-state-storage-scheduled-pool", LOG)
);
private final ScheduledExecutorService txStateStorageScheduledPool;

/** Transaction state storage pool. */
private final ExecutorService txStateStoragePool = Executors.newFixedThreadPool(
Runtime.getRuntime().availableProcessors(),
new NamedThreadFactory("tx-state-storage-pool", LOG)
);
private final ExecutorService txStateStoragePool;

/** Scan request executor. */
private final ExecutorService scanRequestExecutor;

/** Separate executor for IO operations like partition storage initialization
* or partition raft group meta data persisting.
Expand Down Expand Up @@ -417,6 +415,15 @@ public TableManager(
});
});

txStateStorageScheduledPool = Executors.newSingleThreadScheduledExecutor(
NamedThreadFactory.create(nodeName, "tx-state-storage-scheduled-pool", LOG));

txStateStoragePool = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors(),
NamedThreadFactory.create(nodeName, "tx-state-storage-pool", LOG));

scanRequestExecutor = Executors.newSingleThreadExecutor(
NamedThreadFactory.create(nodeName, "scan-query-executor-", LOG));

rebalanceScheduler = new ScheduledThreadPoolExecutor(REBALANCE_SCHEDULER_POOL_SIZE,
NamedThreadFactory.create(nodeName, "rebalance-scheduler", LOG));

Expand Down Expand Up @@ -830,6 +837,7 @@ private void updateAssignmentInternal(ConfigurationNotificationEvent<byte[]> ass
updatedRaftGroupService,
txManager,
lockMgr,
scanRequestExecutor,
partId,
tblId,
table.indexesLockers(partId),
Expand Down Expand Up @@ -959,6 +967,7 @@ public void stop() {
shutdownAndAwaitTermination(ioExecutor, 10, TimeUnit.SECONDS);
shutdownAndAwaitTermination(txStateStoragePool, 10, TimeUnit.SECONDS);
shutdownAndAwaitTermination(txStateStorageScheduledPool, 10, TimeUnit.SECONDS);
shutdownAndAwaitTermination(scanRequestExecutor, 10, TimeUnit.SECONDS);
shutdownAndAwaitTermination(incomingSnapshotsExecutor, 10, TimeUnit.SECONDS);
}

Expand Down Expand Up @@ -1832,6 +1841,7 @@ public boolean onUpdate(@NotNull WatchEvent evt) {
internalTable.partitionRaftGroupService(partId),
txManager,
lockMgr,
scanRequestExecutor,
partId,
tblId,
tbl.indexesLockers(partId),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiFunction;
Expand All @@ -45,7 +44,6 @@
import java.util.stream.Collectors;
import org.apache.ignite.internal.hlc.HybridClock;
import org.apache.ignite.internal.hlc.HybridTimestamp;
import org.apache.ignite.internal.logger.Loggers;
import org.apache.ignite.internal.replicator.ReplicationGroupId;
import org.apache.ignite.internal.replicator.command.SafeTimeSyncCommand;
import org.apache.ignite.internal.replicator.exception.PrimaryReplicaMissException;
Expand Down Expand Up @@ -83,7 +81,6 @@
import org.apache.ignite.internal.table.distributed.replication.request.ReadWriteSingleRowReplicaRequest;
import org.apache.ignite.internal.table.distributed.replication.request.ReadWriteSwapRowReplicaRequest;
import org.apache.ignite.internal.table.distributed.replicator.action.RequestType;
import org.apache.ignite.internal.thread.NamedThreadFactory;
import org.apache.ignite.internal.tx.LockKey;
import org.apache.ignite.internal.tx.LockManager;
import org.apache.ignite.internal.tx.LockMode;
Expand Down Expand Up @@ -164,9 +161,7 @@ public class PartitionReplicaListener implements ReplicaListener {
private final PlacementDriver placementDriver;

/** Runs async scan tasks for effective tail recursion execution (avoid deep recursive calls). */
private final Executor scanRequestExecutor = Executors.newSingleThreadExecutor(
new NamedThreadFactory("scan-query-executor-", Loggers.forClass(PartitionReplicaListener.class))
);
private final Executor scanRequestExecutor;

/**
* Map to control clock's update in the read only transactions concurrently with a commit timestamp.
Expand Down Expand Up @@ -205,6 +200,7 @@ public PartitionReplicaListener(
RaftGroupService raftClient,
TxManager txManager,
LockManager lockManager,
Executor scanRequestExecutor,
int partId,
UUID tableId,
Supplier<Map<UUID, IndexLocker>> indexesLockers,
Expand All @@ -221,6 +217,7 @@ public PartitionReplicaListener(
this.raftClient = raftClient;
this.txManager = txManager;
this.lockManager = lockManager;
this.scanRequestExecutor = scanRequestExecutor;
this.partId = partId;
this.tableId = tableId;
this.indexesLockers = indexesLockers;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,7 @@ private static void beforeAll() {
mockRaftClient,
mock(TxManager.class),
LOCK_MANAGER,
Runnable::run,
PART_ID,
TABLE_ID,
() -> Map.of(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -254,6 +254,7 @@ private static void beforeAll() {
mockRaftClient,
mock(TxManager.class),
lockManager,
Runnable::run,
partId,
tblId,
() -> Map.of(pkLocker.id(), pkLocker, sortedIndexId, sortedIndexLocker, hashIndexId, hashIndexLocker),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -245,6 +245,7 @@ public void result(@Nullable Serializable r) {
partitionMap.get(0),
this.txManager,
this.txManager.lockManager(),
Runnable::run,
0,
tableId,
() -> Map.of(pkLocker.id(), pkLocker),
Expand Down

0 comments on commit 92ffac8

Please sign in to comment.