Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FLINK-31092][sql-gateway][table-common] Fix Hive ITCase fail with OutOfMemoryError #22101

Merged
merged 2 commits into from
Mar 6, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.flink.table.gateway.api.utils;

import org.apache.flink.annotation.Internal;
import org.apache.flink.util.concurrent.ExecutorThreadFactory;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -47,6 +48,7 @@ public static ThreadPoolExecutor newThreadPool(
poolQueueSize,
keepAliveMs,
TimeUnit.MILLISECONDS,
new SynchronousQueue<>());
new SynchronousQueue<>(),
new ExecutorThreadFactory(threadPoolName));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.time.Deadline;
import org.apache.flink.table.catalog.ResolvedSchema;
import org.apache.flink.table.gateway.api.operation.OperationHandle;
import org.apache.flink.table.gateway.api.operation.OperationStatus;
Expand All @@ -34,8 +35,11 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.lang.reflect.Field;
import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.FutureTask;
Expand Down Expand Up @@ -366,7 +370,6 @@ private void updateState(OperationStatus toStatus) {
String.format(
"Failed to convert the Operation Status from %s to %s for %s.",
currentStatus, toStatus, operationHandle);
LOG.error(message);
throw new SqlGatewayException(message);
}
} while (!status.compareAndSet(currentStatus, toStatus));
Expand All @@ -384,6 +387,7 @@ private void updateState(OperationStatus toStatus) {
private void closeResources() {
if (invocation != null && !invocation.isDone()) {
invocation.cancel(true);
stopExecutionByForce(invocation);
LOG.debug(String.format("Cancel the operation %s.", operationHandle));
}

Expand All @@ -400,6 +404,47 @@ private void processThrowable(Throwable t) {
// when status is error.
updateState(OperationStatus.ERROR);
}

/**
 * Last-resort termination of the thread that is still executing {@code invocation}.
 *
 * <p>The method first polls for up to one second, giving the runner thread a chance to
 * detach from the task on its own. If a runner thread is still attached once the deadline
 * has passed, {@link Thread#stop()} is invoked on it. Any failure of {@code Thread.stop()}
 * is logged and swallowed so that the caller is never affected.
 *
 * @param invocation the task whose runner thread should be stopped
 */
private void stopExecutionByForce(FutureTask<?> invocation) {
    // The runner thread is cleaned up asynchronously, so give it a short grace period.
    final long pollIntervalMs = 10L;
    Deadline deadline = Deadline.fromNow(Duration.ofSeconds(1));
    boolean interrupted = false;
    try {
        while (deadline.hasTimeLeft()) {
            if (!getThreadInFuture(invocation).isPresent()) {
                // thread has been cleaned up
                return;
            }
            // Pause between polls instead of spinning on a CPU core. Once the calling
            // thread has been interrupted, sleeping would throw immediately, so the loop
            // falls back to plain polling and still honors the full grace period.
            if (!interrupted) {
                try {
                    Thread.sleep(pollIntervalMs);
                } catch (InterruptedException e) {
                    interrupted = true;
                }
            }
        }
    } finally {
        if (interrupted) {
            // Preserve the interrupt status for the caller.
            Thread.currentThread().interrupt();
        }
    }

    Optional<Thread> threadOptional = getThreadInFuture(invocation);
    if (threadOptional.isPresent()) {
        // We have to use Thread.stop() here because it is the only way to guarantee that
        // the thread is stopped. It may leave shared state inconsistent; that risk is
        // accepted here.
        Thread thread = threadOptional.get();
        LOG.info(
                "\"Future.cancel(true)\" can't cleanup current thread {}, using \"Thread.stop()\" instead.",
                thread.getName());
        try {
            thread.stop();
        } catch (Throwable e) {
            // Catch everything to protect the SQL gateway; e.g. recent JDKs reject
            // Thread.stop() with an UnsupportedOperationException.
            LOG.error("Failed to stop thread: " + thread.getName(), e);
        }
    }
}

/**
 * Reads the private {@code runner} field of the given {@link FutureTask} via reflection.
 *
 * @param invocation the task to inspect
 * @return the thread stored in the {@code runner} field, or {@link Optional#empty()} when
 *     the field is {@code null} or cannot be read
 */
private Optional<Thread> getThreadInFuture(FutureTask<?> invocation) {
    try {
        Field runnerField = FutureTask.class.getDeclaredField("runner");
        runnerField.setAccessible(true);
        // A null runner is an expected state (no thread attached), so map it to empty
        // explicitly rather than relying on a NullPointerException being caught below.
        return Optional.ofNullable((Thread) runnerField.get(invocation));
    } catch (Throwable ignored) {
        // Deliberately best-effort: if the field cannot be accessed for any reason,
        // behave as if there is no thread to report.
        return Optional.empty();
    }
}
}

// -------------------------------------------------------------------------------------------
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package org.apache.flink.table.gateway.service.operation;

import org.apache.flink.core.testutils.CommonTestUtils;
import org.apache.flink.core.testutils.FlinkAssertions;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.ResultKind;
Expand All @@ -38,10 +39,12 @@
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;

import java.time.Duration;
import java.util.Collections;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicReference;

import static org.apache.flink.table.api.internal.StaticResultProvider.SIMPLE_ROW_DATA_TO_STRING_CONVERTER;
import static org.apache.flink.table.gateway.api.results.ResultSet.ResultType.PAYLOAD;
Expand Down Expand Up @@ -125,6 +128,32 @@ public void testCancelOperation() throws Exception {
.isEqualTo(OperationStatus.CANCELED);
}

/**
 * Cancels an operation whose task spins in a loop that never reacts to interruption, then
 * checks that the operation ends up CANCELED and that the task thread observed a throwable.
 */
@Test
public void testCancelOperationByForce() throws Exception {
    final AtomicReference<Throwable> thrownInTask = new AtomicReference<>(null);
    final OperationHandle handle =
            operationManager.submitOperation(
                    () -> {
                        try {
                            // CPU-bound busy loop: it never blocks, so an interrupt alone
                            // cannot make it exit
                            while (true) {}
                        } catch (Throwable failure) {
                            thrownInTask.set(failure);
                            throw failure;
                        }
                    });

    // Issue the cancellation from a separate thread while this thread waits for termination.
    final Thread canceller =
            threadFactory.newThread(() -> operationManager.cancelOperation(handle));
    canceller.start();
    operationManager.awaitOperationTermination(handle);

    assertThat(operationManager.getOperationInfo(handle).getStatus())
            .isEqualTo(OperationStatus.CANCELED);

    // The busy loop can only leave through the catch block, so a recorded throwable shows
    // that the task thread was actually terminated.
    CommonTestUtils.waitUtil(
            () -> thrownInTask.get() != null,
            Duration.ofSeconds(10),
            "Failed to kill the task with infinite loop.");
}

@Test
public void testCloseOperation() throws Exception {
CountDownLatch endRunningLatch = new CountDownLatch(1);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,11 +52,12 @@
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.ServiceLoader;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;
Expand Down Expand Up @@ -801,27 +802,34 @@ static <T extends DynamicTableFactory> T discoverManagedTableFactory(
}

// Collects the Factory service providers that can be loaded through the given class loader.
//
// NOTE(review): this span comes from a diff view whose +/- markers were lost. Judging by the
// hunk header (-801,27 +802,34), the first body (ServiceLoaderUtil-based, ending at
// `return result;`) looks like the removed implementation and the ServiceLoader-iterator
// loop after it looks like the replacement - confirm against the real file.
static List<Factory> discoverFactories(ClassLoader classLoader) {
final List<Factory> result = new LinkedList<>();
ServiceLoaderUtil.load(Factory.class, classLoader)
.forEach(
loadResult -> {
if (loadResult.hasFailed()) {
if (loadResult.getError() instanceof NoClassDefFoundError) {
LOG.debug(
"NoClassDefFoundError when loading a "
+ Factory.class
+ ". This is expected when trying to load a format dependency but no flink-connector-files is loaded.",
loadResult.getError());
// After logging, we just ignore this failure
return;
}
// Any other load failure is wrapped in a TableException and aborts discovery.
throw new TableException(
"Unexpected error when trying to load service provider for factories.",
loadResult.getError());
}
result.add(loadResult.getService());
});
return result;
// NOTE(review): presumably the replacement implementation starts here.
final Iterator<Factory> serviceLoaderIterator =
ServiceLoader.load(Factory.class, classLoader).iterator();

final List<Factory> loadResults = new ArrayList<>();
// Iterate manually so that a failure in either hasNext() or next() can be handled per
// provider; the loop only ends via the break below or the rethrow in the catch block.
while (true) {
try {
// error handling should also be applied to the hasNext() call because service
// loading might cause problems here as well
if (!serviceLoaderIterator.hasNext()) {
break;
}

loadResults.add(serviceLoaderIterator.next());
} catch (Throwable t) {
// A NoClassDefFoundError is only logged at debug level and the loop continues;
// every other throwable is wrapped in a TableException and aborts discovery.
if (t instanceof NoClassDefFoundError) {
LOG.debug(
"NoClassDefFoundError when loading a "
+ Factory.class.getCanonicalName()
+ ". This is expected when trying to load a format dependency but no flink-connector-files is loaded.",
t);
} else {
throw new TableException(
"Unexpected error when trying to load service provider.", t);
}
}
}

return loadResults;
}

private static String stringifyOption(String key, String value) {
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.testutils.FlinkAssertions;
import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.catalog.Catalog;
import org.apache.flink.table.catalog.CommonCatalogOptions;
Expand All @@ -33,12 +34,15 @@
import org.apache.flink.table.factories.TestFormatFactory.EncodingFormatMock;
import org.apache.flink.table.factories.utils.FactoryMocks;
import org.apache.flink.testutils.ClassLoaderUtils;
import org.apache.flink.util.FlinkUserCodeClassLoaders;
import org.apache.flink.util.MutableURLClassLoader;

import org.assertj.core.api.AbstractThrowableAssert;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;

import java.io.IOException;
import java.net.URL;
import java.net.URLClassLoader;
import java.nio.file.Files;
import java.nio.file.Path;
Expand Down Expand Up @@ -668,6 +672,22 @@ void testDiscoverFactoryBadClass(@TempDir Path tempDir) throws IOException {
.contains(serializationSchemaImplementationName);
}

/**
 * Factory discovery through an already closed user-code class loader must fail with the
 * "closed classloader" {@link IllegalStateException} somewhere in the cause chain.
 */
@Test
void testDiscoverFactoryFromClosedClassLoader() throws Exception {
    final String expectedMessage =
            "Trying to access closed classloader. Please check if you store classloaders directly "
                    + "or indirectly in static fields. If the stacktrace suggests that the leak occurs in a third "
                    + "party library and cannot be fixed immediately, you can disable this check with the "
                    + "configuration 'classloader.check-leaked-classloader'";

    // Create a user-code class loader without any URLs and close it right away.
    final MutableURLClassLoader closedLoader =
            FlinkUserCodeClassLoaders.create(
                    new URL[0], FactoryUtilTest.class.getClassLoader(), new Configuration());
    closedLoader.close();

    assertThatThrownBy(() -> FactoryUtil.discoverFactory(closedLoader, Factory.class, "test"))
            .satisfies(
                    FlinkAssertions.anyCauseMatches(
                            IllegalStateException.class, expectedMessage));
}

// --------------------------------------------------------------------------------------------
// Helper methods
// --------------------------------------------------------------------------------------------
Expand Down