diff --git a/flink-clients/src/main/java/org/apache/flink/client/ClientUtils.java b/flink-clients/src/main/java/org/apache/flink/client/ClientUtils.java index 38a5238598066..84453e9272059 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/ClientUtils.java +++ b/flink-clients/src/main/java/org/apache/flink/client/ClientUtils.java @@ -30,6 +30,7 @@ import org.apache.flink.configuration.DeploymentOptions; import org.apache.flink.core.execution.PipelineExecutorServiceLoader; import org.apache.flink.runtime.client.JobInitializationException; +import org.apache.flink.runtime.execution.librarycache.ClassLoaderFactoryBuilder; import org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders; import org.apache.flink.runtime.jobmaster.JobResult; import org.apache.flink.util.ExceptionUtils; @@ -41,8 +42,10 @@ import java.net.URL; import java.net.URLClassLoader; +import java.util.Iterator; import java.util.List; import java.util.Optional; +import java.util.ServiceLoader; import static org.apache.flink.util.FlinkUserCodeClassLoader.NOOP_EXCEPTION_HANDLER; import static org.apache.flink.util.Preconditions.checkNotNull; @@ -70,6 +73,23 @@ public static URLClassLoader buildUserCodeClassLoader( FlinkUserCodeClassLoaders.ResolveOrder.fromString(classLoaderResolveOrder); final boolean checkClassloaderLeak = configuration.getBoolean(CoreOptions.CHECK_LEAKED_CLASSLOADER); + + ServiceLoader classLoaderService = + ServiceLoader.load(ClassLoaderFactoryBuilder.class); + Iterator factoryIt = classLoaderService.iterator(); + ClassLoaderFactoryBuilder factory = null; + while (factoryIt.hasNext()) { + factory = factoryIt.next(); + if (factory.isCompatible()) { + return factory.buildClientLoaderFactory( + resolveOrder, + alwaysParentFirstLoaderPatterns, + NOOP_EXCEPTION_HANDLER, + checkClassloaderLeak) + .createClassLoader(urls); + } + } + return FlinkUserCodeClassLoaders.create( resolveOrder, urls, diff --git a/flink-clients/src/test/java/org/apache/flink/client/ClientUtilsTest.java b/flink-clients/src/test/java/org/apache/flink/client/ClientUtilsTest.java index 7c38d1e5e6ac4..5897862da856b 100644 --- a/flink-clients/src/test/java/org/apache/flink/client/ClientUtilsTest.java +++ b/flink-clients/src/test/java/org/apache/flink/client/ClientUtilsTest.java @@ -20,18 +20,26 @@ import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.JobStatus; +import org.apache.flink.client.TestingClientClassLoaderFactoryBuilder.TestingURLClassLoader; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.CoreOptions; import org.apache.flink.core.testutils.CommonTestUtils; import org.apache.flink.runtime.client.JobInitializationException; import org.apache.flink.runtime.jobmaster.JobResult; +import org.apache.flink.util.FlinkUserCodeClassLoader; import org.apache.flink.util.SerializedThrowable; import org.apache.flink.util.TestLogger; import org.junit.Assert; import org.junit.Test; +import java.net.URLClassLoader; import java.util.Arrays; +import java.util.Collections; import java.util.Iterator; +import static org.junit.Assert.assertTrue; + /** Test for the ClientUtils. */ public class ClientUtilsTest extends TestLogger { @@ -128,4 +136,47 @@ public void testWaitUntilJobInitializationFinished_regular() throws Exception { }, ClassLoader.getSystemClassLoader()); } + + @Test + public void testClassLoader_by_customized_classLoaderFactory() throws Exception { + System.getProperties() + .setProperty( + TestingClientClassLoaderFactoryBuilder + .TESTING_CLASSLOADER_FACTORY_BUILDER_ENABLE, + Boolean.TRUE.toString()); + URLClassLoader classLoader = null; + try { + Configuration configuration = new Configuration(); + classLoader = + ClientUtils.buildUserCodeClassLoader( + Collections.emptyList(), + Collections.emptyList(), + this.getClass().getClassLoader(), + configuration); + assertTrue( + "The impl class must be " + TestingURLClassLoader.class.getName(), + classLoader instanceof TestingURLClassLoader); + } finally { + System.clearProperty( + TestingClientClassLoaderFactoryBuilder + .TESTING_CLASSLOADER_FACTORY_BUILDER_ENABLE); + } + } + + @Test + public void testClassLoader_without_customized_classLoaderFactory() { + URLClassLoader classLoader = null; + + Configuration configuration = new Configuration(); + configuration.setBoolean(CoreOptions.CHECK_LEAKED_CLASSLOADER, false); + classLoader = + ClientUtils.buildUserCodeClassLoader( + Collections.emptyList(), + Collections.emptyList(), + this.getClass().getClassLoader(), + configuration); + assertTrue( + "The impl class must be " + FlinkUserCodeClassLoader.class.getName(), + classLoader instanceof FlinkUserCodeClassLoader); + } } diff --git a/flink-clients/src/test/java/org/apache/flink/client/TestingClientClassLoaderFactoryBuilder.java b/flink-clients/src/test/java/org/apache/flink/client/TestingClientClassLoaderFactoryBuilder.java new file mode 100644 index 0000000000000..afc3160f39b4b --- /dev/null +++ b/flink-clients/src/test/java/org/apache/flink/client/TestingClientClassLoaderFactoryBuilder.java @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.client; + +import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager.ClassLoaderFactory; +import org.apache.flink.runtime.execution.librarycache.ClassLoaderFactoryBuilder; +import org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders.ResolveOrder; + +import javax.annotation.Nullable; + +import java.net.URL; +import java.net.URLClassLoader; +import java.util.function.Consumer; + +/** Testing implementation of {@link ClassLoaderFactoryBuilder}. */ +public class TestingClientClassLoaderFactoryBuilder implements ClassLoaderFactoryBuilder { + public static final String TESTING_CLASSLOADER_FACTORY_BUILDER_ENABLE = + "testing_classloader_factory_builder_enable"; + + @Override + public boolean isCompatible() { + return Boolean.parseBoolean( + System.getProperties().getProperty(TESTING_CLASSLOADER_FACTORY_BUILDER_ENABLE)); + } + + @Override + public ClassLoaderFactory buildServerLoaderFactory( + ResolveOrder classLoaderResolveOrder, + String[] alwaysParentFirstPatterns, + @Nullable Consumer exceptionHander, + boolean checkClassLoaderLeak) { + throw new UnsupportedOperationException(); + } + + @Override + public ClassLoaderFactory buildClientLoaderFactory( + ResolveOrder classLoaderResolveOrder, + String[] alwaysParentFirstPatterns, + @Nullable Consumer exceptionHander, + boolean checkClassLoaderLeak) { + return new TestingClassLoaderFactory(); + } + + private static class TestingClassLoaderFactory implements ClassLoaderFactory { + @Override + public URLClassLoader createClassLoader(URL[] libraryURLs) { + return new TestingURLClassLoader( + libraryURLs, Thread.currentThread().getContextClassLoader()); + } + } + + public static class TestingURLClassLoader extends URLClassLoader { + + public TestingURLClassLoader(URL[] urls, ClassLoader parent) { + super(urls, parent); + } + } +} diff --git a/flink-clients/src/test/resources/META-INF/services/org.apache.flink.runtime.execution.librarycache.ClassLoaderFactoryBuilder b/flink-clients/src/test/resources/META-INF/services/org.apache.flink.runtime.execution.librarycache.ClassLoaderFactoryBuilder new file mode 100644 index 0000000000000..021c7d4a487fa --- /dev/null +++ b/flink-clients/src/test/resources/META-INF/services/org.apache.flink.runtime.execution.librarycache.ClassLoaderFactoryBuilder @@ -0,0 +1,16 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +org.apache.flink.client.TestingClientClassLoaderFactoryBuilder diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManager.java index 460f83e449bae..b0140b8562dd9 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManager.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManager.java @@ -41,7 +41,9 @@ import java.util.Collection; import java.util.HashMap; import java.util.HashSet; +import java.util.Iterator; import java.util.Map; +import java.util.ServiceLoader; import java.util.Set; import java.util.function.Consumer; import java.util.stream.Collectors; @@ -125,30 +127,31 @@ public void shutdown() { } // -------------------------------------------------------------------------------------------- - + /** Customize classLoader factory. */ @FunctionalInterface public interface ClassLoaderFactory { URLClassLoader createClassLoader(URL[] libraryURLs); } - private static final class DefaultClassLoaderFactory implements ClassLoaderFactory { + /** Default ClassLoader factory. */ + public static class DefaultClassLoaderFactory implements ClassLoaderFactory { /** The resolve order to use when creating a {@link ClassLoader}. */ - private final FlinkUserCodeClassLoaders.ResolveOrder classLoaderResolveOrder; + protected final FlinkUserCodeClassLoaders.ResolveOrder classLoaderResolveOrder; /** * List of patterns for classes that should always be resolved from the parent ClassLoader, * if possible. */ - private final String[] alwaysParentFirstPatterns; + protected final String[] alwaysParentFirstPatterns; /** Class loading exception handler. */ - private final Consumer classLoadingExceptionHandler; + protected final Consumer classLoadingExceptionHandler; /** Test if classloader is used outside of job. */ - private final boolean checkClassLoaderLeak; + protected final boolean checkClassLoaderLeak; - private DefaultClassLoaderFactory( + protected DefaultClassLoaderFactory( FlinkUserCodeClassLoaders.ResolveOrder classLoaderResolveOrder, String[] alwaysParentFirstPatterns, Consumer classLoadingExceptionHandler, @@ -164,11 +167,15 @@ public URLClassLoader createClassLoader(URL[] libraryURLs) { return FlinkUserCodeClassLoaders.create( classLoaderResolveOrder, libraryURLs, - FlinkUserCodeClassLoaders.class.getClassLoader(), + getParentClassLoader(), alwaysParentFirstPatterns, classLoadingExceptionHandler, checkClassLoaderLeak); } + + protected ClassLoader getParentClassLoader() { + return FlinkUserCodeClassLoaders.class.getClassLoader(); + } } public static ClassLoaderFactory defaultClassLoaderFactory( @@ -176,10 +183,27 @@ public static ClassLoaderFactory defaultClassLoaderFactory( String[] alwaysParentFirstPatterns, @Nullable FatalErrorHandler fatalErrorHandlerJvmMetaspaceOomError, boolean checkClassLoaderLeak) { + Consumer exceptionHandler = + createClassLoadingExceptionHandler(fatalErrorHandlerJvmMetaspaceOomError); + ServiceLoader classLoaderService = + ServiceLoader.load(ClassLoaderFactoryBuilder.class); + Iterator factoryIt = classLoaderService.iterator(); + ClassLoaderFactoryBuilder factory = null; + while (factoryIt.hasNext()) { + factory = factoryIt.next(); + if (factory.isCompatible()) { + return factory.buildServerLoaderFactory( + classLoaderResolveOrder, + alwaysParentFirstPatterns, + exceptionHandler, + checkClassLoaderLeak); + } + } + return new DefaultClassLoaderFactory( classLoaderResolveOrder, alwaysParentFirstPatterns, - createClassLoadingExceptionHandler(fatalErrorHandlerJvmMetaspaceOomError), + exceptionHandler, checkClassLoaderLeak); } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/ClassLoaderFactoryBuilder.java b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/ClassLoaderFactoryBuilder.java new file mode 100644 index 0000000000000..bc2e8fef0a6e6 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/ClassLoaderFactoryBuilder.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.execution.librarycache; + +import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager.ClassLoaderFactory; + +import javax.annotation.Nullable; + +import java.util.function.Consumer; + +/** + * Provides facilities to customize {@link ClassLoaderFactory} by user ,it is instantinate by + * ServiceLoader. + * + * @see java.util.ServiceLoader + */ +public interface ClassLoaderFactoryBuilder { + + public default boolean isCompatible() { + return false; + } + + public default ClassLoaderFactory buildServerLoaderFactory( + FlinkUserCodeClassLoaders.ResolveOrder classLoaderResolveOrder, + String[] alwaysParentFirstPatterns, + @Nullable Consumer exceptionHander, + boolean checkClassLoaderLeak) { + throw new UnsupportedOperationException(); + } + + public default ClassLoaderFactory buildClientLoaderFactory( + FlinkUserCodeClassLoaders.ResolveOrder classLoaderResolveOrder, + String[] alwaysParentFirstPatterns, + @Nullable Consumer exceptionHander, + boolean checkClassLoaderLeak) { + throw new UnsupportedOperationException(); + } +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/FlinkUserCodeClassLoaders.java b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/FlinkUserCodeClassLoaders.java index 8d9286be1787a..790edc8e90968 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/FlinkUserCodeClassLoaders.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/FlinkUserCodeClassLoaders.java @@ -84,7 +84,7 @@ public static URLClassLoader create( } } - private static URLClassLoader wrapWithSafetyNet( + public static URLClassLoader wrapWithSafetyNet( FlinkUserCodeClassLoader classLoader, boolean check) { return check ? new SafetyNetWrapperClassLoader(classLoader, classLoader.getParent()) diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManagerTest.java index 00ce52a1ee8b3..c1876505af8fa 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManagerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManagerTest.java @@ -27,6 +27,9 @@ import org.apache.flink.runtime.blob.PermanentBlobKey; import org.apache.flink.runtime.blob.PermanentBlobService; import org.apache.flink.runtime.blob.VoidBlobStore; +import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager.ClassLoaderFactory; +import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager.DefaultClassLoaderFactory; +import org.apache.flink.runtime.execution.librarycache.TestingClassLoaderFactoryBuilder.TestingClassLoaderFactory; import org.apache.flink.util.OperatingSystem; import org.apache.flink.util.TestLogger; import org.apache.flink.util.UserCodeClassLoader; @@ -578,6 +581,43 @@ public void releaseUserCodeClassLoader_willRegisterOnce() releaseHookLatch.await(); } + @Test + public void testDefaultClassLoaderFactory_by_customized_classLoaderFactory() { + System.getProperties() + .setProperty( + TestingClassLoaderFactoryBuilder.TESTING_CLASSLOADER_FACTORY_BUILDER_ENABLE, + Boolean.TRUE.toString()); + ClassLoaderFactory classLoaderFactory = null; + try { + classLoaderFactory = + BlobLibraryCacheManager.defaultClassLoaderFactory( + FlinkUserCodeClassLoaders.ResolveOrder.CHILD_FIRST, + new String[0], + null, + true); + assertTrue( + "The impl class must be " + TestingClassLoaderFactory.class.getName(), + classLoaderFactory instanceof TestingClassLoaderFactory); + } finally { + System.clearProperty( + TestingClassLoaderFactoryBuilder.TESTING_CLASSLOADER_FACTORY_BUILDER_ENABLE); + } + } + + @Test + public void testDefaultClassLoaderFactory_without_customized_classLoaderFactory() { + ClassLoaderFactory classLoaderFactory = null; + classLoaderFactory = + BlobLibraryCacheManager.defaultClassLoaderFactory( + FlinkUserCodeClassLoaders.ResolveOrder.CHILD_FIRST, + new String[0], + null, + true); + assertTrue( + "The impl class must be " + DefaultClassLoaderFactory.class.getName(), + classLoaderFactory instanceof DefaultClassLoaderFactory); + } + private BlobLibraryCacheManager createSimpleBlobLibraryCacheManager() throws IOException { return new TestingBlobLibraryCacheManagerBuilder().build(); } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/TestingClassLoaderFactoryBuilder.java b/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/TestingClassLoaderFactoryBuilder.java new file mode 100644 index 0000000000000..c7629dedac86b --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/TestingClassLoaderFactoryBuilder.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.execution.librarycache; + +import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager.ClassLoaderFactory; +import org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders.ResolveOrder; + +import javax.annotation.Nullable; + +import java.net.URL; +import java.net.URLClassLoader; +import java.util.function.Consumer; + +/** Testing implementation of {@link ClassLoaderFactoryBuilder}. */ +public class TestingClassLoaderFactoryBuilder implements ClassLoaderFactoryBuilder { + public static final String TESTING_CLASSLOADER_FACTORY_BUILDER_ENABLE = + "testing_classloader_factory_builder_enable"; + + @Override + public boolean isCompatible() { + return Boolean.parseBoolean( + System.getProperties().getProperty(TESTING_CLASSLOADER_FACTORY_BUILDER_ENABLE)); + } + + @Override + public ClassLoaderFactory buildServerLoaderFactory( + ResolveOrder classLoaderResolveOrder, + String[] alwaysParentFirstPatterns, + @Nullable Consumer exceptionHander, + boolean checkClassLoaderLeak) { + return new TestingClassLoaderFactory(); + } + + @Override + public ClassLoaderFactory buildClientLoaderFactory( + ResolveOrder classLoaderResolveOrder, + String[] alwaysParentFirstPatterns, + @Nullable Consumer exceptionHander, + boolean checkClassLoaderLeak) { + throw new UnsupportedOperationException(); + } + + public static class TestingClassLoaderFactory implements ClassLoaderFactory { + @Override + public URLClassLoader createClassLoader(URL[] libraryURLs) { + throw new UnsupportedOperationException(); + } + } +} diff --git a/flink-runtime/src/test/resources/META-INF/services/org.apache.flink.runtime.execution.librarycache.ClassLoaderFactoryBuilder b/flink-runtime/src/test/resources/META-INF/services/org.apache.flink.runtime.execution.librarycache.ClassLoaderFactoryBuilder new file mode 100644 index 0000000000000..f4a9338b7b7d3 --- /dev/null +++ b/flink-runtime/src/test/resources/META-INF/services/org.apache.flink.runtime.execution.librarycache.ClassLoaderFactoryBuilder @@ -0,0 +1,16 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +org.apache.flink.runtime.execution.librarycache.TestingClassLoaderFactoryBuilder