Skip to content

Commit

Permalink
IGNITE-4490: SQL: avoid querying H2 for INSERT and MERGE when it is n…
Browse files Browse the repository at this point in the history
…ot needed. This closes #1387.
  • Loading branch information
alexpaschenko authored and devozerov committed Dec 19, 2017
1 parent 8f327a1 commit ec29023
Show file tree
Hide file tree
Showing 10 changed files with 282 additions and 137 deletions.
Expand Up @@ -107,7 +107,7 @@ public void testIndexErrors() throws SQLException {
public void testDmlErrors() throws SQLException { public void testDmlErrors() throws SQLException {
checkErrorState("INSERT INTO \"test\".INTEGER(_key, _val) values(1, null)", "22004"); checkErrorState("INSERT INTO \"test\".INTEGER(_key, _val) values(1, null)", "22004");


checkErrorState("INSERT INTO \"test\".INTEGER(_key, _val) values(1, 'zzz')", "50000"); checkErrorState("INSERT INTO \"test\".INTEGER(_key, _val) values(1, 'zzz')", "0700B");
} }


/** /**
Expand Down
Expand Up @@ -260,15 +260,16 @@ public void testBatchExceptionPrepared() throws SQLException {
pstmt.executeBatch(); pstmt.executeBatch();


fail("BatchUpdateException must be thrown"); fail("BatchUpdateException must be thrown");
} catch(BatchUpdateException e) { }
catch(BatchUpdateException e) {
int [] updCnts = e.getUpdateCounts(); int [] updCnts = e.getUpdateCounts();


assertEquals("Invalid update counts size", BATCH_SIZE, updCnts.length); assertEquals("Invalid update counts size", BATCH_SIZE, updCnts.length);


for (int i = 0; i < BATCH_SIZE; ++i) for (int i = 0; i < BATCH_SIZE; ++i)
assertEquals("Invalid update count",1, updCnts[i]); assertEquals("Invalid update count",1, updCnts[i]);


if (!e.getMessage().contains("Failed to execute SQL query.")) { if (!e.getMessage().contains("Value conversion failed")) {
log.error("Invalid exception: ", e); log.error("Invalid exception: ", e);


fail(); fail();
Expand Down
Expand Up @@ -32,7 +32,6 @@
import javax.cache.processor.EntryProcessor; import javax.cache.processor.EntryProcessor;
import javax.cache.processor.EntryProcessorException; import javax.cache.processor.EntryProcessorException;
import javax.cache.processor.MutableEntry; import javax.cache.processor.MutableEntry;

import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteDataStreamer; import org.apache.ignite.IgniteDataStreamer;
import org.apache.ignite.IgniteException; import org.apache.ignite.IgniteException;
Expand All @@ -51,7 +50,6 @@
import org.apache.ignite.internal.processors.query.IgniteSQLException; import org.apache.ignite.internal.processors.query.IgniteSQLException;
import org.apache.ignite.internal.processors.query.h2.dml.DmlBatchSender; import org.apache.ignite.internal.processors.query.h2.dml.DmlBatchSender;
import org.apache.ignite.internal.processors.query.h2.dml.DmlDistributedPlanInfo; import org.apache.ignite.internal.processors.query.h2.dml.DmlDistributedPlanInfo;
import org.apache.ignite.internal.processors.query.h2.dml.FastUpdate;
import org.apache.ignite.internal.processors.query.h2.dml.UpdateMode; import org.apache.ignite.internal.processors.query.h2.dml.UpdateMode;
import org.apache.ignite.internal.processors.query.h2.dml.UpdatePlan; import org.apache.ignite.internal.processors.query.h2.dml.UpdatePlan;
import org.apache.ignite.internal.processors.query.h2.dml.UpdatePlanBuilder; import org.apache.ignite.internal.processors.query.h2.dml.UpdatePlanBuilder;
Expand Down Expand Up @@ -167,7 +165,7 @@ else if (!opCtx.isKeepBinary())
UpdateResult r; UpdateResult r;


try { try {
r = executeUpdateStatement(schemaName, cctx, conn, prepared, fieldsQry, loc, filters, cancel, errKeys); r = executeUpdateStatement(schemaName, cctx, conn, prepared, fieldsQry, loc, filters, cancel);
} }
finally { finally {
cctx.operationContextPerCall(opCtx); cctx.operationContextPerCall(opCtx);
Expand Down Expand Up @@ -247,15 +245,13 @@ GridQueryFieldsResult updateSqlFieldsLocal(String schemaName, Connection conn, P
* @throws IgniteCheckedException if failed. * @throws IgniteCheckedException if failed.
*/ */
@SuppressWarnings({"unchecked", "ConstantConditions"}) @SuppressWarnings({"unchecked", "ConstantConditions"})
long streamUpdateQuery(IgniteDataStreamer streamer, PreparedStatement stmt, Object[] args) long streamUpdateQuery(IgniteDataStreamer streamer, PreparedStatement stmt, final Object[] args)
throws IgniteCheckedException { throws IgniteCheckedException {
args = U.firstNotNull(args, X.EMPTY_OBJECT_ARRAY);

Prepared p = GridSqlQueryParser.prepared(stmt); Prepared p = GridSqlQueryParser.prepared(stmt);


assert p != null; assert p != null;


UpdatePlan plan = UpdatePlanBuilder.planForStatement(p, true, idx, null, null, null); final UpdatePlan plan = UpdatePlanBuilder.planForStatement(p, true, idx, null, null, null);


if (!F.eq(streamer.cacheName(), plan.cacheContext().name())) if (!F.eq(streamer.cacheName(), plan.cacheContext().name()))
throw new IgniteSQLException("Cross cache streaming is not supported, please specify cache explicitly" + throw new IgniteSQLException("Cross cache streaming is not supported, please specify cache explicitly" +
Expand All @@ -270,14 +266,22 @@ long streamUpdateQuery(IgniteDataStreamer streamer, PreparedStatement stmt, Obje


final ArrayList<List<?>> data = new ArrayList<>(plan.rowCount()); final ArrayList<List<?>> data = new ArrayList<>(plan.rowCount());


final GridQueryFieldsResult res = idx.queryLocalSqlFields(idx.schema(cctx.name()), plan.selectQuery(),
F.asList(args), null, false, 0, null);

QueryCursorImpl<List<?>> stepCur = new QueryCursorImpl<>(new Iterable<List<?>>() { QueryCursorImpl<List<?>> stepCur = new QueryCursorImpl<>(new Iterable<List<?>>() {
@Override public Iterator<List<?>> iterator() { @Override public Iterator<List<?>> iterator() {
try { try {
return new GridQueryCacheObjectsIterator(res.iterator(), idx.objectContext(), Iterator<List<?>> it;
cctx.keepBinary());
if (!F.isEmpty(plan.selectQuery())) {
GridQueryFieldsResult res = idx.queryLocalSqlFields(idx.schema(cctx.name()),
plan.selectQuery(), F.asList(U.firstNotNull(args, X.EMPTY_OBJECT_ARRAY)),
null, false, 0, null);

it = res.iterator();
}
else
it = plan.createRows(U.firstNotNull(args, X.EMPTY_OBJECT_ARRAY)).iterator();

return new GridQueryCacheObjectsIterator(it, idx.objectContext(), cctx.keepBinary());
} }
catch (IgniteCheckedException e) { catch (IgniteCheckedException e) {
throw new IgniteException(e); throw new IgniteException(e);
Expand Down Expand Up @@ -329,27 +333,23 @@ long streamUpdateQuery(IgniteDataStreamer streamer, PreparedStatement stmt, Obje
* @param loc Local query flag. * @param loc Local query flag.
* @param filters Cache name and key filter. * @param filters Cache name and key filter.
* @param cancel Query cancel state holder. * @param cancel Query cancel state holder.
* @param failedKeys Keys to restrict UPDATE and DELETE operations with. Null or empty array means no restriction.
* @return Pair [number of successfully processed items; keys that have failed to be processed] * @return Pair [number of successfully processed items; keys that have failed to be processed]
* @throws IgniteCheckedException if failed. * @throws IgniteCheckedException if failed.
*/ */
@SuppressWarnings({"ConstantConditions", "unchecked"}) @SuppressWarnings({"ConstantConditions", "unchecked"})
private UpdateResult executeUpdateStatement(String schemaName, final GridCacheContext cctx, Connection c, private UpdateResult executeUpdateStatement(String schemaName, final GridCacheContext cctx, Connection c,
Prepared prepared, SqlFieldsQuery fieldsQry, boolean loc, IndexingQueryFilter filters, Prepared prepared, SqlFieldsQuery fieldsQry, boolean loc, IndexingQueryFilter filters, GridQueryCancel cancel)
GridQueryCancel cancel, Object[] failedKeys) throws IgniteCheckedException { throws IgniteCheckedException {
int mainCacheId = cctx.cacheId(); int mainCacheId = cctx.cacheId();


Integer errKeysPos = null; Integer errKeysPos = null;


UpdatePlan plan = getPlanForStatement(schemaName, c, prepared, fieldsQry, loc, errKeysPos); UpdatePlan plan = getPlanForStatement(schemaName, c, prepared, fieldsQry, loc, errKeysPos);


FastUpdate fastUpdate = plan.fastUpdate(); UpdateResult fastUpdateRes = plan.processFast(fieldsQry.getArgs());

if (fastUpdate != null) {
assert F.isEmpty(failedKeys) && errKeysPos == null;


return fastUpdate.execute(plan.cacheContext().cache(), fieldsQry.getArgs()); if (fastUpdateRes != null)
} return fastUpdateRes;


if (plan.distributedPlan() != null) { if (plan.distributedPlan() != null) {
UpdateResult result = doDistributedUpdate(schemaName, fieldsQry, plan, cancel); UpdateResult result = doDistributedUpdate(schemaName, fieldsQry, plan, cancel);
Expand All @@ -359,13 +359,13 @@ private UpdateResult executeUpdateStatement(String schemaName, final GridCacheCo
return result; return result;
} }


assert !F.isEmpty(plan.selectQuery()); Iterable<List<?>> cur;

QueryCursorImpl<List<?>> cur;


// Do a two-step query only if locality flag is not set AND if plan's SELECT corresponds to an actual // Do a two-step query only if locality flag is not set AND if plan's SELECT corresponds to an actual
// sub-query and not some dummy stuff like "select 1, 2, 3;" // sub-query and not some dummy stuff like "select 1, 2, 3;"
if (!loc && !plan.isLocalSubquery()) { if (!loc && !plan.isLocalSubquery()) {
assert !F.isEmpty(plan.selectQuery());

SqlFieldsQuery newFieldsQry = new SqlFieldsQuery(plan.selectQuery(), fieldsQry.isCollocated()) SqlFieldsQuery newFieldsQry = new SqlFieldsQuery(plan.selectQuery(), fieldsQry.isCollocated())
.setArgs(fieldsQry.getArgs()) .setArgs(fieldsQry.getArgs())
.setDistributedJoins(fieldsQry.isDistributedJoins()) .setDistributedJoins(fieldsQry.isDistributedJoins())
Expand All @@ -374,9 +374,10 @@ private UpdateResult executeUpdateStatement(String schemaName, final GridCacheCo
.setPageSize(fieldsQry.getPageSize()) .setPageSize(fieldsQry.getPageSize())
.setTimeout(fieldsQry.getTimeout(), TimeUnit.MILLISECONDS); .setTimeout(fieldsQry.getTimeout(), TimeUnit.MILLISECONDS);


cur = (QueryCursorImpl<List<?>>)idx.queryDistributedSqlFields(schemaName, newFieldsQry, true, cur = idx.queryDistributedSqlFields(schemaName, newFieldsQry, true, cancel, mainCacheId, true).get(0);
cancel, mainCacheId, true).get(0);
} }
else if (plan.hasRows())
cur = plan.createRows(fieldsQry.getArgs());
else { else {
final GridQueryFieldsResult res = idx.queryLocalSqlFields(schemaName, plan.selectQuery(), final GridQueryFieldsResult res = idx.queryLocalSqlFields(schemaName, plan.selectQuery(),
F.asList(fieldsQry.getArgs()), filters, fieldsQry.isEnforceJoinOrder(), fieldsQry.getTimeout(), cancel); F.asList(fieldsQry.getArgs()), filters, fieldsQry.isEnforceJoinOrder(), fieldsQry.getTimeout(), cancel);
Expand Down
Expand Up @@ -17,11 +17,15 @@


package org.apache.ignite.internal.processors.query.h2.dml; package org.apache.ignite.internal.processors.query.h2.dml;


import org.apache.ignite.internal.util.lang.GridPlainClosure;

/** /**
* Operand for fast UPDATE or DELETE (single item operation that does not involve any SELECT). * DML argument
*/ */
public interface FastUpdateArgument extends GridPlainClosure<Object[], Object> { public interface DmlArgument {
// No-op. /**
* Get argument from parameter list.
*
* @param params Query input parameters.
 * @return Argument value resolved against the parameter list.
*/
Object get(Object[] params);
} }
@@ -0,0 +1,104 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.ignite.internal.processors.query.h2.dml;

import org.apache.ignite.internal.processors.query.h2.sql.GridSqlConst;
import org.apache.ignite.internal.processors.query.h2.sql.GridSqlElement;
import org.apache.ignite.internal.processors.query.h2.sql.GridSqlParameter;
import org.jetbrains.annotations.Nullable;

/**
* DML arguments factory.
*/
public class DmlArguments {
    /** Shared argument that always evaluates to {@code null}; returned when no AST element is given. */
    private static final DmlArgument NULL_ARG = new ConstantArgument(null);

    /**
     * Create argument from AST element.
     *
     * @param el Element; must be {@code null}, a {@link GridSqlConst} or a {@link GridSqlParameter}.
     * @return DML argument wrapping the element.
     */
    public static DmlArgument create(@Nullable GridSqlElement el) {
        assert el == null ^ (el instanceof GridSqlConst || el instanceof GridSqlParameter);

        if (el == null)
            return NULL_ARG;

        if (el instanceof GridSqlConst)
            return new ConstantArgument(((GridSqlConst)el).value().getObject());
        else
            return new ParamArgument(((GridSqlParameter)el).index());
    }

    /**
     * Private constructor: static factory holder, never instantiated.
     */
    private DmlArguments() {
        // No-op.
    }

    /**
     * Constant argument: always yields the same fixed value regardless of query parameters.
     */
    private static class ConstantArgument implements DmlArgument {
        /** Value to return. */
        private final Object val;

        /**
         * Constructor.
         *
         * @param val Value.
         */
        private ConstantArgument(Object val) {
            this.val = val;
        }

        /** {@inheritDoc} */
        @Override public Object get(Object[] params) {
            return val;
        }
    }

    /**
     * Parameter argument: resolves to the query input parameter at a fixed index.
     */
    private static class ParamArgument implements DmlArgument {
        /** Index of the query parameter to return. */
        private final int paramIdx;

        /**
         * Constructor.
         *
         * @param paramIdx Parameter index.
         */
        private ParamArgument(int paramIdx) {
            assert paramIdx >= 0;

            this.paramIdx = paramIdx;
        }

        /** {@inheritDoc} */
        @Override public Object get(Object[] params) {
            assert params.length > paramIdx;

            return params[paramIdx];
        }
    }
}
Expand Up @@ -83,7 +83,7 @@ private DmlAstUtils() {
* @param cols Columns to insert values into. * @param cols Columns to insert values into.
* @param rows Rows to create pseudo-SELECT upon. * @param rows Rows to create pseudo-SELECT upon.
* @param subQry Subquery to use rather than rows. * @param subQry Subquery to use rather than rows.
 * @return Subquery or pseudo-SELECT to evaluate inserted expressions. * @return Subquery or pseudo-SELECT to evaluate inserted expressions, or {@code null} if no query needs to be run.
*/ */
public static GridSqlQuery selectForInsertOrMerge(GridSqlColumn[] cols, List<GridSqlElement[]> rows, public static GridSqlQuery selectForInsertOrMerge(GridSqlColumn[] cols, List<GridSqlElement[]> rows,
GridSqlQuery subQry) { GridSqlQuery subQry) {
Expand All @@ -98,6 +98,8 @@ public static GridSqlQuery selectForInsertOrMerge(GridSqlColumn[] cols, List<Gri


GridSqlArray[] args = new GridSqlArray[cols.length]; GridSqlArray[] args = new GridSqlArray[cols.length];


boolean noQry = true;

for (int i = 0; i < cols.length; i++) { for (int i = 0; i < cols.length; i++) {
GridSqlArray arr = new GridSqlArray(rows.size()); GridSqlArray arr = new GridSqlArray(rows.size());


Expand All @@ -121,10 +123,18 @@ public static GridSqlQuery selectForInsertOrMerge(GridSqlColumn[] cols, List<Gri
for (GridSqlElement[] row : rows) { for (GridSqlElement[] row : rows) {
assert cols.length == row.length; assert cols.length == row.length;


for (int i = 0; i < row.length; i++) for (int i = 0; i < row.length; i++) {
GridSqlElement el = row[i];

noQry &= (el instanceof GridSqlConst || el instanceof GridSqlParameter);

args[i].addChild(row[i]); args[i].addChild(row[i]);
}
} }


if (noQry)
return null;

return sel; return sel;
} }
else { else {
Expand Down
Expand Up @@ -17,6 +17,10 @@


package org.apache.ignite.internal.processors.query.h2.dml; package org.apache.ignite.internal.processors.query.h2.dml;


import java.lang.reflect.Array;
import java.sql.Time;
import java.sql.Timestamp;
import java.util.Date;
import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.internal.processors.cache.query.IgniteQueryErrorCode; import org.apache.ignite.internal.processors.cache.query.IgniteQueryErrorCode;
import org.apache.ignite.internal.processors.query.IgniteSQLException; import org.apache.ignite.internal.processors.query.IgniteSQLException;
Expand All @@ -30,11 +34,6 @@
import org.h2.value.ValueTime; import org.h2.value.ValueTime;
import org.h2.value.ValueTimestamp; import org.h2.value.ValueTimestamp;


import java.lang.reflect.Array;
import java.sql.Time;
import java.sql.Timestamp;
import java.util.Date;

/** /**
* DML utility methods. * DML utility methods.
*/ */
Expand Down Expand Up @@ -101,7 +100,15 @@ public static Object convert(Object val, GridH2RowDescriptor desc, Class<?> expC
return newArr; return newArr;
} }


return H2Utils.convert(val, desc, type); Object res = H2Utils.convert(val, desc, type);

if (res instanceof Date && res.getClass() != Date.class && expCls == Date.class) {
// We can get a Timestamp instead of Date when converting a String to Date
// without running a query, so normalize the result back to java.util.Date here.
return new Date(((Date) res).getTime());
}

return res;
} }
catch (Exception e) { catch (Exception e) {
throw new IgniteSQLException("Value conversion failed [from=" + currCls.getName() + ", to=" + throw new IgniteSQLException("Value conversion failed [from=" + currCls.getName() + ", to=" +
Expand Down

0 comments on commit ec29023

Please sign in to comment.