Skip to content

Commit

Permalink
IGNITE-11227: SQL: Decoupled query execution entry point from DML. This
Browse files Browse the repository at this point in the history
closes #6246.
  • Loading branch information
devozerov committed Mar 8, 2019
1 parent 9dd4f75 commit 455b56d
Show file tree
Hide file tree
Showing 14 changed files with 1,167 additions and 700 deletions.
Expand Up @@ -25,7 +25,6 @@
import org.apache.ignite.IgniteCache;
import org.apache.ignite.cache.query.FieldsQueryCursor;
import org.apache.ignite.cache.query.SqlFieldsQuery;
import org.apache.ignite.internal.processors.cache.mvcc.MvccQueryTracker;
import org.apache.ignite.internal.processors.query.GridQueryCancel;
import org.apache.ignite.internal.processors.query.GridQueryProcessor;
import org.apache.ignite.internal.processors.query.SqlClientContext;
Expand Down Expand Up @@ -121,14 +120,19 @@ public void testDataPageScanBatching() throws Exception {
private void checkDataPageScanInBatch(String qryWithParam, @Nullable Boolean dps) throws Exception {
String params = (dps == null) ? null : "dataPageScanEnabled=" + dps;

int expCnt = 0;

try (Connection conn = GridTestUtils.connect(grid(0), params)) {
try (PreparedStatement upd = conn.prepareStatement(qryWithParam)) {
for (int i = 0; i < TOTAL_QUERIES_TO_EXECUTE; i++) {
upd.setInt(1, i);
upd.addBatch();

if ((i + 1) % BATCH_SIZE == 0 || (i + 1) == TOTAL_QUERIES_TO_EXECUTE)
if ((i + 1) % BATCH_SIZE == 0 || (i + 1) == TOTAL_QUERIES_TO_EXECUTE) {
upd.executeBatch();

expCnt++;
}
}
}
}
Expand All @@ -146,10 +150,7 @@ private void checkDataPageScanInBatch(String qryWithParam, @Nullable Boolean dps

int executed = IndexingWithQueries.queries.size();

assertTrue(
"Expected that there are executed at least " + TOTAL_QUERIES_TO_EXECUTE + " queries. " +
"But executed only " + executed,
executed >= TOTAL_QUERIES_TO_EXECUTE);
assertEquals(expCnt, executed);

IndexingWithQueries.queries.clear();
}
Expand Down Expand Up @@ -198,12 +199,24 @@ private static class IndexingWithQueries extends IgniteH2Indexing {
static final Queue<SqlFieldsQuery> queries = new LinkedBlockingQueue<>();

/** {@inheritDoc} */
@Override public List<FieldsQueryCursor<List<?>>> querySqlFields(String schemaName, SqlFieldsQuery qry,
@Nullable SqlClientContext cliCtx, boolean keepBinary, boolean failOnMultipleStmts,
MvccQueryTracker tracker, GridQueryCancel cancel, boolean registerAsNewQry) {
@Override public List<FieldsQueryCursor<List<?>>> querySqlFields(
String schemaName,
SqlFieldsQuery qry,
@Nullable SqlClientContext cliCtx,
boolean keepBinary,
boolean failOnMultipleStmts,
GridQueryCancel cancel
) {
queries.add(qry);

return super.querySqlFields(schemaName, qry, cliCtx, keepBinary, failOnMultipleStmts, tracker, cancel, registerAsNewQry);
return super.querySqlFields(
schemaName,
qry,
cliCtx,
keepBinary,
failOnMultipleStmts,
cancel
);
}
}
}
Expand Up @@ -32,7 +32,6 @@
import org.apache.ignite.cache.query.FieldsQueryCursor;
import org.apache.ignite.cache.query.SqlFieldsQuery;
import org.apache.ignite.internal.jdbc2.JdbcStreamingSelfTest;
import org.apache.ignite.internal.processors.cache.mvcc.MvccQueryTracker;
import org.apache.ignite.internal.processors.query.GridQueryCancel;
import org.apache.ignite.internal.processors.query.GridQueryProcessor;
import org.apache.ignite.internal.processors.query.SqlClientContext;
Expand Down Expand Up @@ -517,13 +516,24 @@ static final class IndexingWithContext extends IgniteH2Indexing {
}

/** {@inheritDoc} */
@Override public List<FieldsQueryCursor<List<?>>> querySqlFields(String schemaName, SqlFieldsQuery qry,
@Nullable SqlClientContext cliCtx, boolean keepBinary, boolean failOnMultipleStmts, MvccQueryTracker tracker,
GridQueryCancel cancel, boolean registerAsNewQry) {
@Override public List<FieldsQueryCursor<List<?>>> querySqlFields(
String schemaName,
SqlFieldsQuery qry,
@Nullable SqlClientContext cliCtx,
boolean keepBinary,
boolean failOnMultipleStmts,
GridQueryCancel cancel
) {
IndexingWithContext.cliCtx = cliCtx;

return super.querySqlFields(schemaName, qry, cliCtx, keepBinary, failOnMultipleStmts, tracker, cancel,
registerAsNewQry);
return super.querySqlFields(
schemaName,
qry,
cliCtx,
keepBinary,
failOnMultipleStmts,
cancel
);
}
}
}
Expand Up @@ -788,16 +788,17 @@ public static boolean mvccEnabled(GridKernalContext ctx) {
/**
* Initialises MVCC filter and returns MVCC query tracker if needed.
* @param cctx Cache context.
* @param startTx Start transaction flag.
* @param autoStartTx Start transaction flag.
* @return MVCC query tracker.
* @throws IgniteCheckedException If failed.
*/
@NotNull public static MvccQueryTracker mvccTracker(GridCacheContext cctx, boolean startTx) throws IgniteCheckedException {
@NotNull public static MvccQueryTracker mvccTracker(GridCacheContext cctx, boolean autoStartTx)
throws IgniteCheckedException {
assert cctx != null && cctx.mvccEnabled();

GridNearTxLocal tx = tx(cctx.kernalContext());

if (tx == null && startTx)
if (tx == null && autoStartTx)
tx = txStart(cctx, 0);

return mvccTracker(cctx, tx);
Expand Down
Expand Up @@ -31,7 +31,6 @@
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.GridCacheContextInfo;
import org.apache.ignite.internal.processors.cache.mvcc.MvccQueryTracker;
import org.apache.ignite.internal.processors.cache.mvcc.MvccSnapshot;
import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow;
import org.apache.ignite.internal.processors.query.schema.SchemaIndexCacheVisitor;
Expand Down Expand Up @@ -85,14 +84,16 @@ public interface GridQueryIndexing {
* @param cliCtx Client context.
* @param keepBinary Keep binary flag.
* @param failOnMultipleStmts Whether an exception should be thrown for multiple statements query.
* @param tracker Query tracker.
* @param registerAsNewQry {@code true} In case it's new query which should be registered as running query,
* {@code false} otherwise.
* @return Cursor.
*/
public List<FieldsQueryCursor<List<?>>> querySqlFields(String schemaName, SqlFieldsQuery qry,
SqlClientContext cliCtx, boolean keepBinary, boolean failOnMultipleStmts, MvccQueryTracker tracker,
GridQueryCancel cancel, boolean registerAsNewQry);
public List<FieldsQueryCursor<List<?>>> querySqlFields(
String schemaName,
SqlFieldsQuery qry,
SqlClientContext cliCtx,
boolean keepBinary,
boolean failOnMultipleStmts,
GridQueryCancel cancel
);

/**
* Execute an INSERT statement using data streamer as receiver.
Expand Down
Expand Up @@ -2231,6 +2231,8 @@ public List<FieldsQueryCursor<List<?>>> querySqlFields(
throw new CacheException("Execution of local SqlFieldsQuery on client node disallowed.");

return executeQuerySafe(cctx, () -> {
assert idx != null;

final String schemaName = qry.getSchema() != null ? qry.getSchema()
: (cctx != null ? idx.schema(cctx.name()) : QueryUtils.DFLT_SCHEMA);

Expand All @@ -2239,16 +2241,13 @@ public List<FieldsQueryCursor<List<?>>> querySqlFields(
@Override public List<FieldsQueryCursor<List<?>>> applyx() {
GridQueryCancel cancel0 = cancel != null ? cancel : new GridQueryCancel();

List<FieldsQueryCursor<List<?>>> res =
idx.querySqlFields(
List<FieldsQueryCursor<List<?>>> res = idx.querySqlFields(
schemaName,
qry,
cliCtx,
keepBinary,
failOnMultipleStmts,
null,
cancel0,
true
cancel0
);

if (cctx != null)
Expand Down
Expand Up @@ -42,7 +42,6 @@
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.IgniteKernal;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.mvcc.MvccQueryTracker;
import org.apache.ignite.internal.processors.cache.mvcc.MvccSnapshot;
import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow;
import org.apache.ignite.internal.processors.query.GridQueryCancel;
Expand Down Expand Up @@ -285,9 +284,14 @@ private static class FailedIndexing implements GridQueryIndexing {
}

/** {@inheritDoc} */
@Override public List<FieldsQueryCursor<List<?>>> querySqlFields(String schemaName, SqlFieldsQuery qry,
SqlClientContext cliCtx, boolean keepBinary, boolean failOnMultipleStmts, MvccQueryTracker tracker,
GridQueryCancel cancel, boolean registerAsNewQry) {
@Override public List<FieldsQueryCursor<List<?>>> querySqlFields(
String schemaName,
SqlFieldsQuery qry,
SqlClientContext cliCtx,
boolean keepBinary,
boolean failOnMultipleStmts,
GridQueryCancel cancel
) {
return null;
}

Expand Down
Expand Up @@ -37,7 +37,6 @@
import org.apache.ignite.cache.QueryIndexType;
import org.apache.ignite.cache.query.BulkLoadContextCursor;
import org.apache.ignite.cache.query.FieldsQueryCursor;
import org.apache.ignite.cache.query.SqlFieldsQuery;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.IgniteInternalFuture;
Expand All @@ -50,7 +49,6 @@
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxLocal;
import org.apache.ignite.internal.processors.cache.mvcc.MvccUtils;
import org.apache.ignite.internal.processors.cache.query.IgniteQueryErrorCode;
import org.apache.ignite.internal.processors.cache.query.SqlFieldsQueryEx;
import org.apache.ignite.internal.processors.query.GridQueryProperty;
import org.apache.ignite.internal.processors.query.GridQueryTypeDescriptor;
import org.apache.ignite.internal.processors.query.IgniteSQLException;
Expand Down Expand Up @@ -160,15 +158,16 @@ private static boolean isDdl(SqlCommand cmd) {
/**
* Execute command.
*
* @param qry Query.
* @param sql SQL.
* @param cmdNative Native command (if any).
* @param cmdH2 H2 command (if any).
* @param params Parameters.
* @param cliCtx Client context.
* @param qryId Running query ID.
* @return Result.
*/
public CommandResult runCommand(SqlFieldsQuery qry, SqlCommand cmdNative, GridSqlStatement cmdH2,
@Nullable SqlClientContext cliCtx, Long qryId) throws IgniteCheckedException {
public CommandResult runCommand(String sql, SqlCommand cmdNative, GridSqlStatement cmdH2,
QueryParameters params, @Nullable SqlClientContext cliCtx, Long qryId) throws IgniteCheckedException {
assert cmdNative != null || cmdH2 != null;

// Do execute.
Expand All @@ -179,7 +178,7 @@ public CommandResult runCommand(SqlFieldsQuery qry, SqlCommand cmdNative, GridSq
assert cmdH2 == null;

if (isDdl(cmdNative))
runCommandNativeDdl(qry.getSql(), cmdNative);
runCommandNativeDdl(sql, cmdNative);
else if (cmdNative instanceof SqlBulkLoadCommand) {
res = processBulkLoadCommand((SqlBulkLoadCommand) cmdNative, qryId);

Expand All @@ -188,12 +187,12 @@ else if (cmdNative instanceof SqlBulkLoadCommand) {
else if (cmdNative instanceof SqlSetStreamingCommand)
processSetStreamingCommand((SqlSetStreamingCommand)cmdNative, cliCtx);
else
processTxCommand(cmdNative, qry);
processTxCommand(cmdNative, params);
}
else {
assert cmdH2 != null;

runCommandH2(qry.getSql(), cmdH2);
runCommandH2(sql, cmdH2);
}

return new CommandResult(res, unregister);
Expand Down Expand Up @@ -839,13 +838,12 @@ private static String getTypeClassName(GridSqlColumn col) {
/**
* Process transactional command.
* @param cmd Command.
* @param qry Query.
* @param params Parameters.
* @throws IgniteCheckedException if failed.
*/
private void processTxCommand(SqlCommand cmd, SqlFieldsQuery qry)
private void processTxCommand(SqlCommand cmd, QueryParameters params)
throws IgniteCheckedException {
NestedTxMode nestedTxMode = qry instanceof SqlFieldsQueryEx ? ((SqlFieldsQueryEx)qry).getNestedTxMode() :
NestedTxMode.DEFAULT;
NestedTxMode nestedTxMode = params.nestedTxMode();

GridNearTxLocal tx = tx(ctx);

Expand All @@ -862,7 +860,7 @@ private void processTxCommand(SqlCommand cmd, SqlFieldsQuery qry)
case COMMIT:
doCommit(tx);

txStart(ctx, qry.getTimeout());
txStart(ctx, params.timeout());

break;

Expand All @@ -881,7 +879,7 @@ private void processTxCommand(SqlCommand cmd, SqlFieldsQuery qry)
}
}
else
txStart(ctx, qry.getTimeout());
txStart(ctx, params.timeout());
}
else if (cmd instanceof SqlCommitTransactionCommand) {
// Do nothing if there's no transaction.
Expand Down

0 comments on commit 455b56d

Please sign in to comment.