Skip to content

Commit

Permalink
IGNITE-10580: SQL: Fixed incorrect re-use of cached connection for lo…
Browse files Browse the repository at this point in the history
…cal queries. This closes #5592.
  • Loading branch information
tledkov-gridgain authored and devozerov committed Dec 25, 2018
1 parent 2dabbd2 commit 86a815e
Show file tree
Hide file tree
Showing 5 changed files with 139 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.ignite.internal.processors.query.h2;

import com.sun.org.apache.xml.internal.utils.ObjectPool;
import java.sql.ResultSet;
import java.util.ArrayList;
import java.util.Collections;
Expand All @@ -34,17 +35,23 @@ public class H2FieldsIterator extends H2ResultSetIterator<List<?>> {
/** */
private transient MvccQueryTracker mvccTracker;

/** Detached connection. */
private final ThreadLocalObjectPool.Reusable<H2ConnectionWrapper> detachedConn;

/**
* @param data Data.
* @param mvccTracker Mvcc tracker.
* @param forUpdate {@code SELECT FOR UPDATE} flag.
* @param detachedConn Detached connection.
* @throws IgniteCheckedException If failed.
*/
public H2FieldsIterator(ResultSet data, MvccQueryTracker mvccTracker, boolean forUpdate)
public H2FieldsIterator(ResultSet data, MvccQueryTracker mvccTracker, boolean forUpdate,
ThreadLocalObjectPool.Reusable<H2ConnectionWrapper> detachedConn)
throws IgniteCheckedException {
super(data, forUpdate);

this.mvccTracker = mvccTracker;
this.detachedConn = detachedConn;
}

/** {@inheritDoc} */
Expand All @@ -62,6 +69,9 @@ public H2FieldsIterator(ResultSet data, MvccQueryTracker mvccTracker, boolean fo
super.onClose();
}
finally {
if (detachedConn != null)
detachedConn.recycle();

if (mvccTracker != null)
mvccTracker.onDone();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -635,6 +635,8 @@ else if (DdlStatementsProcessor.isDdlStatement(p)) {

GridH2QueryContext.set(ctx);

ThreadLocalObjectPool.Reusable<H2ConnectionWrapper> detachedConn = connMgr.detachThreadConnection();

try {
ResultSet rs = executeSqlQueryWithTimer(stmt0, conn, qry0, params, timeout0, cancel);

Expand All @@ -657,10 +659,20 @@ else if (DdlStatementsProcessor.isDdlStatement(p)) {

enlistFut.listen(new IgniteInClosure<IgniteInternalFuture<Long>>() {
@Override public void apply(IgniteInternalFuture<Long> fut) {
if (fut.error() != null)
sfuFut0.onResult(IgniteH2Indexing.this.ctx.localNodeId(), 0L, false, fut.error());
else
sfuFut0.onResult(IgniteH2Indexing.this.ctx.localNodeId(), fut.result(), false, null);
if (fut.error() != null) {
sfuFut0.onResult(
IgniteH2Indexing.this.ctx.localNodeId(),
0L,
false,
fut.error());
}
else {
sfuFut0.onResult(
IgniteH2Indexing.this.ctx.localNodeId(),
fut.result(),
false,
null);
}
}
});

Expand All @@ -679,9 +691,12 @@ else if (DdlStatementsProcessor.isDdlStatement(p)) {
}
}

return new H2FieldsIterator(rs, mvccTracker0, sfuFut0 != null);
return new H2FieldsIterator(rs, mvccTracker0, sfuFut0 != null,
detachedConn);
}
catch (IgniteCheckedException | RuntimeException | Error e) {
detachedConn.recycle();

try {
if (mvccTracker0 != null)
mvccTracker0.onDone();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -760,7 +760,7 @@ else if (mvccTracker != null)
timeoutMillis,
cancel);

resIter = new H2FieldsIterator(res, mvccTracker, false);
resIter = new H2FieldsIterator(res, mvccTracker, false, null);

mvccTracker = null; // To prevent callback inside finally block;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.processors.query;

import java.util.Iterator;
import java.util.List;
import org.apache.ignite.cache.query.FieldsQueryCursor;
import org.apache.ignite.cache.query.SqlFieldsQuery;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;

/**
* Test for statement reuse.
*/
@RunWith(JUnit4.class)
public class SqlLocalQueryConnectionAndStatementTest extends GridCommonAbstractTest {
/** {@inheritDoc} */
@Override public void beforeTestsStarted() throws Exception {
super.beforeTestsStarted();

startGrids(1);
}

/** {@inheritDoc} */
@Override protected void afterTestsStopped() throws Exception {
stopAllGrids();

super.afterTestsStopped();
}

/**
*/
@Test
public void testReplicated() {
sql("CREATE TABLE repl_tbl (id LONG PRIMARY KEY, val LONG) WITH \"template=replicated\"").getAll();

for (int i = 0; i < 10; i++)
sql("insert into repl_tbl(id,val) VALUES(" + i + "," + i + ")").getAll();

Iterator<List<?>> it0 = sql(new SqlFieldsQuery("SELECT * FROM repl_tbl where id > ?").setArgs(1)).iterator();

it0.next();

sql(new SqlFieldsQuery("SELECT * FROM repl_tbl where id > ?").setArgs(1)).getAll();

it0.next();
}

/**
*/
@Test
public void testLocalQuery() {
sql("CREATE TABLE tbl (id LONG PRIMARY KEY, val LONG)").getAll();

for (int i = 0; i < 10; i++)
sql("insert into tbl(id,val) VALUES(" + i + "," + i + ")").getAll();

Iterator<List<?>> it0 = sql(
new SqlFieldsQuery("SELECT * FROM tbl where id > ?")
.setArgs(1)
.setLocal(true))
.iterator();

it0.next();

sql(new SqlFieldsQuery("SELECT * FROM tbl where id > ?").setArgs(1).setLocal(true)).getAll();

it0.next();
}

/**
* @param sql SQL query.
* @return Results.
*/
private FieldsQueryCursor<List<?>> sql(String sql) {
return sql(new SqlFieldsQuery(sql));
}

/**
* @param qry SQL query.
* @return Results.
*/
private FieldsQueryCursor<List<?>> sql(SqlFieldsQuery qry) {
GridQueryProcessor qryProc = grid(0).context().query();

return qryProc.querySqlFields(qry, true);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
import org.apache.ignite.internal.processors.query.IgniteCacheGroupsSqlDistributedJoinSelfTest;
import org.apache.ignite.internal.processors.query.IgniteCacheGroupsSqlSegmentedIndexMultiNodeSelfTest;
import org.apache.ignite.internal.processors.query.IgniteCacheGroupsSqlSegmentedIndexSelfTest;
import org.apache.ignite.internal.processors.query.SqlLocalQueryConnectionAndStatementTest;
import org.apache.ignite.internal.processors.query.h2.CacheQueryEntityWithDateTimeApiFieldsTest;
import org.apache.ignite.internal.processors.query.h2.twostep.CacheQueryMemoryLeakTest;
import org.apache.ignite.internal.processors.query.h2.twostep.CreateTableWithDateKeySelfTest;
Expand Down Expand Up @@ -134,6 +135,8 @@ public static TestSuite suite() {

suite.addTest(new JUnit4TestAdapter(IgniteCacheQueriesLoadTest1.class));

suite.addTest(new JUnit4TestAdapter(SqlLocalQueryConnectionAndStatementTest.class));

return suite;
}
}

0 comments on commit 86a815e

Please sign in to comment.