Skip to content

Commit

Permalink
ignite-6858 Fail query if thread has is cache lock and exchange is in…
Browse files Browse the repository at this point in the history
… progress
  • Loading branch information
ascherbakoff authored and sboikov committed Nov 16, 2017
1 parent 52b46c3 commit caad1e9
Show file tree
Hide file tree
Showing 6 changed files with 133 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4122,14 +4122,13 @@ public boolean addTimeoutHandler() {
/** {@inheritDoc} */
@Override public void onTimeout() {
if (state(MARKED_ROLLBACK, true) || (state() == MARKED_ROLLBACK)) {
if (log.isDebugEnabled())
log.debug("Will rollback tx on timeout: " + this);

cctx.kernalContext().closure().runLocalSafe(new Runnable() {
@Override public void run() {
// Note: if rollback asynchonously on timeout should not clear thread map
// Note: if rollback asynchronously on timeout should not clear thread map
// since thread started tx still should be able to see this tx.
rollbackNearTxLocalAsync(true);

U.warn(log, "Transaction was rolled back because the timeout is reached: " + GridNearTxLocal.this);
}
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@
import org.apache.ignite.internal.processors.cache.KeyCacheObject;
import org.apache.ignite.internal.processors.cache.QueryCursorImpl;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtInvalidPartitionException;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture;
import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow;
import org.apache.ignite.internal.processors.cache.persistence.tree.io.PageIO;
import org.apache.ignite.internal.processors.cache.query.CacheQueryPartitionInfo;
Expand Down Expand Up @@ -2476,6 +2477,22 @@ public AffinityTopologyVersion readyTopologyVersion() {
return ctx.cache().context().exchange().readyAffinityVersion();
}

/**
* @param readyVer Ready topology version.
*
* @return {@code true} If pending distributed exchange exists because server topology is changed.
*/
public boolean serverTopologyChanged(AffinityTopologyVersion readyVer) {
GridDhtPartitionsExchangeFuture fut = ctx.cache().context().exchange().lastTopologyFuture();

if (fut.isDone())
return false;

AffinityTopologyVersion initVer = fut.initialVersion();

return initVer.compareTo(readyVer) > 0 && !CU.clientNode(fut.firstEvent().node());
}

/**
* @param topVer Topology version.
* @throws IgniteCheckedException If failed.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@
import org.apache.ignite.lang.IgniteBiClosure;
import org.apache.ignite.lang.IgniteFuture;
import org.apache.ignite.plugin.extensions.communication.Message;
import org.apache.ignite.transactions.TransactionException;
import org.h2.command.ddl.CreateTableData;
import org.h2.engine.Session;
import org.h2.index.Index;
Expand Down Expand Up @@ -560,9 +561,15 @@ public Iterator<List<?>> query(

AffinityTopologyVersion topVer = h2.readyTopologyVersion();

// Check if topology is changed while retrying on locked topology.
if (h2.serverTopologyChanged(topVer) && ctx.cache().context().lockedTopologyVersion(null) != null) {
throw new CacheException(new TransactionException("Server topology is changed during query " +
"execution inside a transaction. It's recommended to rollback and retry transaction."));
}

List<Integer> cacheIds = qry.cacheIds();

Collection<ClusterNode> nodes = null;
Collection<ClusterNode> nodes;

// Explicit partition mapping for unstable topology.
Map<ClusterNode, IntArray> partsMap = null;
Expand Down Expand Up @@ -1737,4 +1744,4 @@ Map<ClusterNode, IntArray> queryPartitionsMap() {
return qryMap;
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,20 +32,25 @@
import org.apache.ignite.cache.query.QueryCancelledException;
import org.apache.ignite.cache.query.SqlFieldsQuery;
import org.apache.ignite.cache.query.annotations.QuerySqlField;
import org.apache.ignite.cluster.ClusterTopologyException;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.DataRegionConfiguration;
import org.apache.ignite.configuration.DataStorageConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.IgniteFutureTimeoutCheckedException;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.IgniteInterruptedCheckedException;
import org.apache.ignite.internal.util.GridRandom;
import org.apache.ignite.internal.util.typedef.CAX;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import org.apache.ignite.transactions.TransactionException;
import org.apache.ignite.transactions.TransactionTimeoutException;

import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
import static org.apache.ignite.cache.CacheMode.PARTITIONED;
Expand Down Expand Up @@ -224,7 +229,7 @@ public void testRestarts() throws Exception {

IgniteInternalFuture<?> fut1 = multithreadedAsync(new CAX() {
@Override public void applyx() throws IgniteCheckedException {
GridRandom rnd = new GridRandom();
final GridRandom rnd = new GridRandom();

while (!qrysDone.get()) {
int g;
Expand All @@ -235,28 +240,43 @@ public void testRestarts() throws Exception {
while (!locks.compareAndSet(g, 0, 1));

try {
final IgniteEx grid = grid(g);

if (rnd.nextBoolean()) { // Partitioned query.
IgniteCache<?,?> cache = grid(g).cache("pu");
final IgniteCache<?,?> cache = grid.cache("pu");

SqlFieldsQuery qry = new SqlFieldsQuery(PARTITIONED_QRY);
final SqlFieldsQuery qry = new SqlFieldsQuery(PARTITIONED_QRY);

boolean smallPageSize = rnd.nextBoolean();

if (smallPageSize)
qry.setPageSize(3);

final IgniteCache<Integer, Company> co = grid.cache("co");

try {
assertEquals(pRes, cache.query(qry).getAll());
runQuery(grid, new Runnable() {
@Override public void run() {
if (rnd.nextBoolean())
co.get(rnd.nextInt(COMPANY_CNT)); // Get lock run test with open transaction.

assertEquals(pRes, cache.query(qry).getAll());
}
});
} catch (CacheException e) {
// Interruptions are expected here.
if (e.getCause() instanceof IgniteInterruptedCheckedException)
if (e.getCause() instanceof IgniteInterruptedCheckedException ||
e.getCause() instanceof InterruptedException ||
e.getCause() instanceof ClusterTopologyException ||
e.getCause() instanceof TransactionTimeoutException ||
e.getCause() instanceof TransactionException)
continue;

if (e.getCause() instanceof QueryCancelledException)
fail("Retry is expected");

if (!smallPageSize)
e.printStackTrace();
U.error(grid.log(), "On large page size must retry.", e);

assertTrue("On large page size must retry.", smallPageSize);

Expand Down Expand Up @@ -286,13 +306,13 @@ public void testRestarts() throws Exception {
continue;

if (!failedOnRemoteFetch) {
e.printStackTrace();
U.error(grid.log(), "Must fail inside of GridResultPage.fetchNextPage or subclass.", e);

fail("Must fail inside of GridResultPage.fetchNextPage or subclass.");
}
}
} else { // Replicated query.
IgniteCache<?, ?> cache = grid(g).cache("co");
IgniteCache<?, ?> cache = grid.cache("co");

assertEquals(rRes, cache.query(new SqlFieldsQuery(REPLICATED_QRY)).getAll());
}
Expand Down Expand Up @@ -358,7 +378,14 @@ public void testRestarts() throws Exception {

restartsDone.set(true);

fut2.get();
try {
fut2.get(20_000);
}
catch (IgniteFutureTimeoutCheckedException e) {
U.dumpThreads(log);

fail("Stopping restarts timeout.");
}

info("Restarts stopped.");

Expand All @@ -379,13 +406,27 @@ public void testRestarts() throws Exception {
stopAllGrids();
}

/**
* Run query closure.
*
* @param grid Grid.
* @param qryRunnable Query runnable.
*/
protected void runQuery(IgniteEx grid, Runnable qryRunnable) {
qryRunnable.run();
}

/**
*
*/
private static class Person implements Serializable {
/** */
@QuerySqlField(index = true)
int id;

/**
* @param id Person ID.
*/
Person(int id) {
this.id = id;
}
Expand All @@ -395,12 +436,18 @@ private static class Person implements Serializable {
*
*/
private static class Purchase implements Serializable {
/** */
@QuerySqlField(index = true)
int personId;

/** */
@QuerySqlField(index = true)
int productId;

/**
* @param personId Person ID.
* @param productId Product ID.
*/
Purchase(int personId, int productId) {
this.personId = personId;
this.productId = productId;
Expand All @@ -411,9 +458,13 @@ private static class Purchase implements Serializable {
*
*/
private static class Company implements Serializable {
/** */
@QuerySqlField(index = true)
int id;

/**
* @param id ID.
*/
Company(int id) {
this.id = id;
}
Expand All @@ -423,12 +474,18 @@ private static class Company implements Serializable {
*
*/
private static class Product implements Serializable {
/** */
@QuerySqlField(index = true)
int id;

/** */
@QuerySqlField(index = true)
int companyId;

/**
* @param id ID.
* @param companyId Company ID.
*/
Product(int id, int companyId) {
this.id = id;
this.companyId = companyId;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.ignite.internal.processors.cache.distributed.near;

import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.transactions.Transaction;

import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC;
import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ;

/**
* Test for distributed queries with node restarts inside transactions.
*/
public class IgniteCacheQueryNodeRestartTxSelfTest extends IgniteCacheQueryNodeRestartSelfTest2 {
/** {@inheritDoc} */
@Override protected void runQuery(IgniteEx grid, Runnable qryRunnable) {
try(Transaction tx = grid.transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) {
qryRunnable.run();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.apache.ignite.internal.processors.cache.distributed.near.IgniteCacheQueryNodeRestartDistributedJoinSelfTest;
import org.apache.ignite.internal.processors.cache.distributed.near.IgniteCacheQueryNodeRestartSelfTest;
import org.apache.ignite.internal.processors.cache.distributed.near.IgniteCacheQueryNodeRestartSelfTest2;
import org.apache.ignite.internal.processors.cache.distributed.near.IgniteCacheQueryNodeRestartTxSelfTest;
import org.apache.ignite.internal.processors.cache.distributed.near.IgniteCacheQueryStopOnCancelOrTimeoutDistributedJoinSelfTest;
import org.apache.ignite.internal.processors.cache.index.DynamicColumnsConcurrentAtomicPartitionedSelfTest;
import org.apache.ignite.internal.processors.cache.index.DynamicColumnsConcurrentAtomicReplicatedSelfTest;
Expand Down Expand Up @@ -85,6 +86,7 @@ public static TestSuite suite() throws Exception {
suite.addTestSuite(IgniteCacheQueryNodeFailTest.class);
suite.addTestSuite(IgniteCacheQueryNodeRestartSelfTest.class);
suite.addTestSuite(IgniteCacheQueryNodeRestartSelfTest2.class);
suite.addTestSuite(IgniteCacheQueryNodeRestartTxSelfTest.class);
suite.addTestSuite(IgniteCacheSqlQueryMultiThreadedSelfTest.class);
suite.addTestSuite(IgniteCachePartitionedQueryMultiThreadedSelfTest.class);
suite.addTestSuite(CacheScanPartitionQueryFallbackSelfTest.class);
Expand Down

0 comments on commit caad1e9

Please sign in to comment.