Skip to content
Permalink
Browse files
IGNITE-16502 fixed deadlock caused by query plan cache (#652)
  • Loading branch information
korlov42 committed Feb 14, 2022
1 parent 0cf58f5 commit a6aa54b0159bd31f344b01fa33249cf21f6628f0
Show file tree
Hide file tree
Showing 5 changed files with 144 additions and 14 deletions.
@@ -24,6 +24,7 @@
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.ignite.lang.IgniteException;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;

/**
@@ -73,6 +74,7 @@ public void testUnicodeStrings() {
}

/** Tests NOT NULL and DEFAULT column constraints. */
@Disabled("https://issues.apache.org/jira/browse/IGNITE-16292")
@Test
public void testCheckDefaultsAndNullables() {
sql("CREATE TABLE tbl(c1 int primary key, c2 int NOT NULL, c3 int NOT NULL DEFAULT 100)");
@@ -38,7 +38,6 @@
/**
* Group of tests that still has not been sorted out. It’s better to avoid extending this class with new tests.
*/
@Disabled("https://issues.apache.org/jira/browse/IGNITE-16502")
public class ItMixedQueriesTest extends AbstractBasicIntegrationTest {
/**
* Before all.
@@ -63,12 +63,12 @@ public class SqlQueryProcessor implements QueryProcessor {
/** Busy lock for stop synchronisation. */
private final IgniteSpinBusyLock busyLock = new IgniteSpinBusyLock();

/** Keeps queries plans to avoid expensive planning of the same queries. */
private final QueryPlanCache planCache = new QueryPlanCacheImpl(PLAN_CACHE_SIZE);

/** Event listeners to close. */
private final List<Pair<TableEvent, EventListener<TableEventParameters>>> evtLsnrs = new ArrayList<>();

/** Keeps queries plans to avoid expensive planning of the same queries. */
private volatile QueryPlanCache planCache;

private volatile ExecutionService executionSrvc;

private volatile MessageService msgSrvc;
@@ -88,6 +88,7 @@ public SqlQueryProcessor(
/** {@inheritDoc} */
@Override
public void start() {
planCache = new QueryPlanCacheImpl(clusterSrvc.topologyService().localMember().name(), PLAN_CACHE_SIZE);
taskExecutor = new QueryTaskExecutorImpl(clusterSrvc.localConfiguration().getName());

msgSrvc = new MessageServiceImpl(
@@ -18,44 +18,69 @@
package org.apache.ignite.internal.sql.engine.prepare;

import com.github.benmanes.caffeine.cache.Caffeine;
import java.util.Map;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import org.apache.ignite.internal.thread.NamedThreadFactory;
import org.apache.ignite.lang.IgniteException;
import org.apache.ignite.lang.IgniteInternalException;

/**
* Implementation of {@link QueryPlanCache} that simply wraps a {@link Caffeine} cache.
*/
public class QueryPlanCacheImpl implements QueryPlanCache {
private final ConcurrentMap<CacheKey, QueryPlan> cache;
private static final long THREAD_TIMEOUT_MS = 60_000;

private final ConcurrentMap<CacheKey, CompletableFuture<QueryPlan>> cache;

private final ThreadPoolExecutor planningPool;

/**
* Creates a plan cache of provided size.
*
* @param cacheSize Desired cache size.
*/
public QueryPlanCacheImpl(int cacheSize) {
public QueryPlanCacheImpl(String nodeName, int cacheSize) {
planningPool = new ThreadPoolExecutor(
4,
4,
THREAD_TIMEOUT_MS,
TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<>(),
new NamedThreadFactory(NamedThreadFactory.threadPrefix(nodeName, "sqlPlan"))
);

planningPool.allowCoreThreadTimeOut(true);

cache = Caffeine.newBuilder()
.maximumSize(cacheSize)
.<CacheKey, QueryPlan>build()
.<CacheKey, CompletableFuture<QueryPlan>>build()
.asMap();
}

/** {@inheritDoc} */
@Override
public QueryPlan queryPlan(CacheKey key, Supplier<QueryPlan> planSupplier) {
Map<CacheKey, QueryPlan> cache = this.cache;
QueryPlan plan = cache.computeIfAbsent(key, k -> planSupplier.get());
CompletableFuture<QueryPlan> planFut = cache.computeIfAbsent(key, k -> CompletableFuture.supplyAsync(planSupplier, planningPool));

return plan.copy();
return join(planFut).copy();
}

/** {@inheritDoc} */
@Override
public QueryPlan queryPlan(CacheKey key) {
Map<CacheKey, QueryPlan> cache = this.cache;
QueryPlan plan = cache.get(key);
CompletableFuture<QueryPlan> planFut = cache.get(key);

return plan != null ? plan.copy() : null;
if (planFut == null) {
return null;
}

return join(planFut).copy();
}

/** {@inheritDoc} */
@@ -74,5 +99,31 @@ public void start() {
@Override
public void stop() {
clear();

planningPool.shutdownNow();
}

/**
* Waits for the future to complete and returns the result. If an exception is thrown, converts future-related exceptions
* to internal exceptions.
*
* @param future A future to wait.
* @param <T> Type of the result of the future.
* @return The result of the future.
*/
private <T> T join(CompletableFuture<T> future) {
try {
return future.join();
} catch (CancellationException | CompletionException ex) {
if (ex.getCause() instanceof IgniteInternalException) {
throw (IgniteInternalException) ex.getCause();
}

if (ex.getCause() instanceof IgniteException) {
throw (IgniteException) ex.getCause();
}

throw new IgniteInternalException(ex);
}
}
}
@@ -0,0 +1,77 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.ignite.internal.sql.engine.prepare;

import static org.apache.ignite.internal.testframework.IgniteTestUtils.runAsync;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.junit.jupiter.api.Test;

/**
* Tests to verify implementation of the {@link QueryPlanCache}.
*/
public class QueryPlanCacheSelfTest {
/**
* Test ensures that plan supplier (even the blocking one) doesn't blocks clearing of the cache.
*
* @throws Exception If something went wrong.
*/
@Test
public void test() throws Exception {
final QueryPlan plan = new TestPlan();

QueryPlanCacheImpl cache = new QueryPlanCacheImpl("TestNode", 32);
CountDownLatch latch = new CountDownLatch(1);

cache.queryPlan(new CacheKey(String.valueOf(0), ""), () -> plan);

for (char c = 1; c < 64; c++) {
char c0 = c;
runAsync(() -> cache.queryPlan(new CacheKey(String.valueOf(c0), ""), () -> {
try {
latch.await();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
return plan;
}));
}

try {
CompletableFuture<?> clearFut = runAsync(cache::clear);

clearFut.get(5, TimeUnit.SECONDS);
} finally {
latch.countDown();
}
}

private static class TestPlan implements QueryPlan {
@Override
public Type type() {
return Type.QUERY;
}

@Override
public QueryPlan copy() {
return this;
}
}
}

0 comments on commit a6aa54b

Please sign in to comment.