Skip to content

Commit

Permalink
GG-17475 Port [IGNITE-11202] SQL: Move partition reservation logic to…
Browse files Browse the repository at this point in the history
… separate class
  • Loading branch information
tledkov committed Apr 29, 2019
1 parent e55061a commit afa3492
Show file tree
Hide file tree
Showing 8 changed files with 479 additions and 363 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,7 @@
import org.apache.ignite.internal.processors.query.h2.twostep.GridMapQueryExecutor;
import org.apache.ignite.internal.processors.query.h2.twostep.GridReduceQueryExecutor;
import org.apache.ignite.internal.processors.query.h2.twostep.MapQueryLazyWorker;
import org.apache.ignite.internal.processors.query.h2.twostep.PartitionReservationManager;
import org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2QueryRequest;
import org.apache.ignite.internal.processors.query.schema.SchemaIndexCacheVisitor;
import org.apache.ignite.internal.processors.query.schema.SchemaIndexCacheVisitorClosure;
Expand Down Expand Up @@ -288,6 +289,9 @@ public class IgniteH2Indexing implements GridQueryIndexing {
/** */
private final ConcurrentMap<QueryTable, GridH2Table> dataTables = new ConcurrentHashMap<>();

/** Partition reservation manager. */
private PartitionReservationManager partReservationMgr;

/** */
private volatile GridBoundedConcurrentLinkedHashMap<H2TwoStepCachedQueryKey, H2TwoStepCachedQuery> twoStepCache =
new GridBoundedConcurrentLinkedHashMap<>(TWO_STEP_QRY_CACHE_SIZE);
Expand Down Expand Up @@ -2803,6 +2807,8 @@ public GridReduceQueryExecutor reduceQueryExecutor() {
SysProperties.serializeJavaObject = false;
}

partReservationMgr = new PartitionReservationManager(ctx);

connMgr = new ConnectionManager(ctx);

longRunningQryMgr = new LongRunningQueryManager(ctx);
Expand Down Expand Up @@ -3110,10 +3116,11 @@ private void createSqlFunctions(String schema, Class<?>[] clss) throws IgniteChe

String schemaName = schema(cacheName);

partReservationMgr.onCacheStop(cacheName);

H2Schema schema = schemas.get(schemaName);

if (schema != null) {
mapQryExec.onCacheStop(cacheName);
dmlProc.onCacheStop(cacheName);

// Remove this mapping only after callback to DML proc - it needs that mapping internally
Expand Down Expand Up @@ -3322,6 +3329,13 @@ public ConnectionManager connections() {
return connMgr;
}

/**
* @return Partition reservation manager.
*/
public PartitionReservationManager partitionReservationManager() {
return partReservationMgr;
}

/**
* Collect cache identifiers from two-step query.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.apache.ignite.internal.processors.cache.distributed.dht.GridReservable;
import org.apache.ignite.internal.processors.cache.mvcc.MvccSnapshot;
import org.apache.ignite.internal.processors.query.h2.twostep.MapQueryLazyWorker;
import org.apache.ignite.internal.processors.query.h2.twostep.PartitionReservation;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.spi.indexing.IndexingQueryFilter;
Expand All @@ -53,7 +54,7 @@ public class GridH2QueryContext {
private volatile boolean cleared;

/** */
private List<GridReservable> reservations;
private PartitionReservation reserved;

/** Range streams for indexes. */
private Map<Integer, Object> streams;
Expand Down Expand Up @@ -190,11 +191,11 @@ public DistributedJoinMode distributedJoinMode() {
}

/**
* @param reservations Reserved partitions or group reservations.
* @param reserved Reserved partitions or group reservations.
* @return {@code this}.
*/
public GridH2QueryContext reservations(List<GridReservable> reservations) {
this.reservations = reservations;
public GridH2QueryContext reservations(PartitionReservation reserved) {
this.reserved = reserved;

return this;
}
Expand Down Expand Up @@ -416,12 +417,8 @@ private static boolean doClear(Key key, boolean nodeStop) {
public void clearContext(boolean nodeStop) {
cleared = true;

List<GridReservable> r = reservations;

if (!nodeStop && !F.isEmpty(r)) {
for (int i = 0; i < r.size(); i++)
r.get(i).release();
}
if (!nodeStop && reserved != null)
reserved.release();
}

/**
Expand Down

0 comments on commit afa3492

Please sign in to comment.