diff --git a/flink-connectors/flink-connector-jdbc/pom.xml b/flink-connectors/flink-connector-jdbc/pom.xml index 1a8954bd7bd8f..f9bb13ce1dff8 100644 --- a/flink-connectors/flink-connector-jdbc/pom.xml +++ b/flink-connectors/flink-connector-jdbc/pom.xml @@ -150,4 +150,17 @@ under the License. + + + + + org.apache.maven.plugins + maven-surefire-plugin + + + false + + + + diff --git a/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/fakedb/FakeDBUtils.java b/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/fakedb/FakeDBUtils.java new file mode 100644 index 0000000000000..25c0eae63e95f --- /dev/null +++ b/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/fakedb/FakeDBUtils.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.jdbc.fakedb; + +/** + * Utilities and constants for FakeDB. + */ +public class FakeDBUtils { + public static final String URL_PREFIX = "jdbc:fake:"; + + public static final String TEST_DB_URL = composeDBUrl("test"); + + public static final String DRIVER1_CLASS_NAME = "org.apache.flink.connector.jdbc.fakedb.driver.FakeDriver1"; + public static final String DRIVER2_CLASS_NAME = "org.apache.flink.connector.jdbc.fakedb.driver.FakeDriver2"; + + public static String composeDBUrl(String db) { + return URL_PREFIX + db; + } + + public static boolean acceptsUrl(String url) { + return url.startsWith(URL_PREFIX); + } +} diff --git a/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/fakedb/driver/FakeConnection.java b/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/fakedb/driver/FakeConnection.java new file mode 100644 index 0000000000000..35a17535eb636 --- /dev/null +++ b/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/fakedb/driver/FakeConnection.java @@ -0,0 +1,318 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.jdbc.fakedb.driver; + +import java.sql.Array; +import java.sql.Blob; +import java.sql.CallableStatement; +import java.sql.Clob; +import java.sql.Connection; +import java.sql.DatabaseMetaData; +import java.sql.NClob; +import java.sql.PreparedStatement; +import java.sql.SQLClientInfoException; +import java.sql.SQLException; +import java.sql.SQLWarning; +import java.sql.SQLXML; +import java.sql.Savepoint; +import java.sql.Statement; +import java.sql.Struct; +import java.util.Map; +import java.util.Properties; +import java.util.concurrent.Executor; + +/** + * A fake sql connection implementation which throws {@link SQLException} in most of its methods. + */ +public abstract class FakeConnection implements Connection { + private boolean closed = false; + + @Override + public Statement createStatement() throws SQLException { + throw new UnsupportedOperationException(); + } + + @Override + public PreparedStatement prepareStatement(String sql) throws SQLException { + throw new UnsupportedOperationException(); + } + + @Override + public CallableStatement prepareCall(String sql) throws SQLException { + throw new UnsupportedOperationException(); + } + + @Override + public String nativeSQL(String sql) throws SQLException { + throw new UnsupportedOperationException(); + } + + @Override + public void setAutoCommit(boolean autoCommit) throws SQLException { + throw new UnsupportedOperationException(); + } + + @Override + public boolean getAutoCommit() throws SQLException { + throw new UnsupportedOperationException(); + } + + @Override + public void commit() throws SQLException { + throw new UnsupportedOperationException(); + } + + @Override + public void rollback() throws SQLException { + throw new UnsupportedOperationException(); + } + + @Override + public void close() throws SQLException { + closed = true; + } + + @Override + public boolean isClosed() throws SQLException { + return closed; + } + + @Override + public DatabaseMetaData getMetaData() throws SQLException { + throw new UnsupportedOperationException(); + } + + @Override + public void setReadOnly(boolean readOnly) throws SQLException { + throw new UnsupportedOperationException(); + } + + @Override + public boolean isReadOnly() throws SQLException { + throw new UnsupportedOperationException(); + } + + @Override + public void setCatalog(String catalog) throws SQLException { + throw new UnsupportedOperationException(); + } + + @Override + public String getCatalog() throws SQLException { + throw new UnsupportedOperationException(); + } + + @Override + public void setTransactionIsolation(int level) throws SQLException { + throw new UnsupportedOperationException(); + } + + @Override + public int getTransactionIsolation() throws SQLException { + return TRANSACTION_NONE; + } + + @Override + public SQLWarning getWarnings() throws SQLException { + return null; + } + + @Override + public void clearWarnings() throws SQLException { + } + + @Override + public Statement createStatement(int resultSetType, int resultSetConcurrency) throws SQLException { + throw new UnsupportedOperationException(); + } + + @Override + public PreparedStatement prepareStatement(String sql, int resultSetType, int resultSetConcurrency) throws SQLException { + throw new UnsupportedOperationException(); + } + + @Override + public CallableStatement prepareCall(String sql, int resultSetType, int resultSetConcurrency) throws SQLException { + throw new UnsupportedOperationException(); + } + + @Override + public Map> getTypeMap() throws SQLException { + throw new UnsupportedOperationException(); + } + + @Override + public void setTypeMap(Map> map) throws SQLException { + throw new UnsupportedOperationException(); + } + + @Override + public void setHoldability(int holdability) throws SQLException { + throw new UnsupportedOperationException(); + } + + @Override + public int getHoldability() throws SQLException { + throw new UnsupportedOperationException(); + } + + @Override + public Savepoint setSavepoint() throws SQLException { + throw new UnsupportedOperationException(); + } + + @Override + public Savepoint setSavepoint(String name) throws SQLException { + throw new UnsupportedOperationException(); + } + + @Override + public void rollback(Savepoint savepoint) throws SQLException { + throw new UnsupportedOperationException(); + } + + @Override + public void releaseSavepoint(Savepoint savepoint) throws SQLException { + throw new UnsupportedOperationException(); + } + + @Override + public Statement createStatement(int resultSetType, int resultSetConcurrency, int resultSetHoldability) throws SQLException { + throw new UnsupportedOperationException(); + } + + @Override + public PreparedStatement prepareStatement(String sql, int resultSetType, int resultSetConcurrency, int resultSetHoldability) throws SQLException { + throw new UnsupportedOperationException(); + } + + @Override + public CallableStatement prepareCall(String sql, int resultSetType, int resultSetConcurrency, int resultSetHoldability) throws SQLException { + throw new UnsupportedOperationException(); + } + + @Override + public PreparedStatement prepareStatement(String sql, int autoGeneratedKeys) throws SQLException { + throw new UnsupportedOperationException(); + } + + @Override + public PreparedStatement prepareStatement(String sql, int[] columnIndexes) throws SQLException { + throw new UnsupportedOperationException(); + } + + @Override + public PreparedStatement prepareStatement(String sql, String[] columnNames) throws SQLException { + throw new UnsupportedOperationException(); + } + + @Override + public Clob createClob() throws SQLException { + throw new UnsupportedOperationException(); + } + + @Override + public Blob createBlob() throws SQLException { + throw new UnsupportedOperationException(); + } + + @Override + public NClob createNClob() throws SQLException { + throw new UnsupportedOperationException(); + } + + @Override + public SQLXML createSQLXML() throws SQLException { + throw new UnsupportedOperationException(); + } + + @Override + public boolean isValid(int timeout) throws SQLException { + return !isClosed(); + } + + @Override + public void setClientInfo(String name, String value) throws SQLClientInfoException { + throw new UnsupportedOperationException(); + } + + @Override + public void setClientInfo(Properties properties) throws SQLClientInfoException { + throw new UnsupportedOperationException(); + } + + @Override + public String getClientInfo(String name) throws SQLException { + throw new UnsupportedOperationException(); + } + + @Override + public Properties getClientInfo() throws SQLException { + throw new UnsupportedOperationException(); + } + + @Override + public Array createArrayOf(String typeName, Object[] elements) throws SQLException { + throw new UnsupportedOperationException(); + } + + @Override + public Struct createStruct(String typeName, Object[] attributes) throws SQLException { + throw new UnsupportedOperationException(); + } + + @Override + public void setSchema(String schema) throws SQLException { + throw new UnsupportedOperationException(); + } + + @Override + public String getSchema() throws SQLException { + throw new UnsupportedOperationException(); + } + + @Override + public void abort(Executor executor) throws SQLException { + throw new UnsupportedOperationException(); + } + + @Override + public void setNetworkTimeout(Executor executor, int milliseconds) throws SQLException { + throw new UnsupportedOperationException(); + } + + @Override + public int getNetworkTimeout() throws SQLException { + return 0; + } + + @Override + @SuppressWarnings("unchecked") + public T unwrap(Class iface) throws SQLException { + if (iface.isInstance(this)) { + return (T) this; + } + throw new SQLException(getClass() + " does not implement " + iface); + } + + @Override + public boolean isWrapperFor(Class iface) throws SQLException { + return iface.isInstance(this); + } +} diff --git a/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/fakedb/driver/FakeConnection1.java b/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/fakedb/driver/FakeConnection1.java new file mode 100644 index 0000000000000..56983d3440dca --- /dev/null +++ b/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/fakedb/driver/FakeConnection1.java @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.jdbc.fakedb.driver; + +import java.util.Properties; + +/** + * Sql connection created by {@link FakeDriver1#connect(String, Properties)}. + */ +public class FakeConnection1 extends FakeConnection { +} diff --git a/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/fakedb/driver/FakeConnection2.java b/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/fakedb/driver/FakeConnection2.java new file mode 100644 index 0000000000000..3f95a51fb1d54 --- /dev/null +++ b/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/fakedb/driver/FakeConnection2.java @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.jdbc.fakedb.driver; + +import java.util.Properties; + +/** + * Sql connection created by {@link FakeDriver2#connect(String, Properties)}. + */ +public class FakeConnection2 extends FakeConnection { +} diff --git a/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/fakedb/driver/FakeDriver1.java b/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/fakedb/driver/FakeDriver1.java new file mode 100644 index 0000000000000..01c1bfb6f056f --- /dev/null +++ b/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/fakedb/driver/FakeDriver1.java @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.jdbc.fakedb.driver; + +import org.apache.flink.connector.jdbc.fakedb.FakeDBUtils; + +import java.sql.Connection; +import java.sql.Driver; +import java.sql.DriverManager; +import java.sql.DriverPropertyInfo; +import java.sql.SQLException; +import java.sql.SQLFeatureNotSupportedException; +import java.util.Properties; +import java.util.logging.Logger; + +/** + * A {@link Driver} for FakeDB. + */ +public class FakeDriver1 implements Driver { + + static { + try { + DriverManager.registerDriver(new FakeDriver1()); + } catch (SQLException ex) { + throw new ExceptionInInitializerError(ex); + } + } + + @Override + public Connection connect(String url, Properties info) throws SQLException { + if (!acceptsURL(url)) { + return null; + } + return new FakeConnection1(); + } + + @Override + public boolean acceptsURL(String url) throws SQLException { + return FakeDBUtils.acceptsUrl(url); + } + + @Override + public DriverPropertyInfo[] getPropertyInfo(String url, Properties info) throws SQLException { + return new DriverPropertyInfo[0]; + } + + @Override + public int getMajorVersion() { + return 0; + } + + @Override + public int getMinorVersion() { + return 0; + } + + @Override + public boolean jdbcCompliant() { + return false; + } + + @Override + public Logger getParentLogger() throws SQLFeatureNotSupportedException { + throw new SQLFeatureNotSupportedException(); + } +} diff --git a/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/fakedb/driver/FakeDriver2.java b/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/fakedb/driver/FakeDriver2.java new file mode 100644 index 0000000000000..4d749574f098d --- /dev/null +++ b/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/fakedb/driver/FakeDriver2.java @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.jdbc.fakedb.driver; + +import org.apache.flink.connector.jdbc.fakedb.FakeDBUtils; + +import java.sql.Connection; +import java.sql.Driver; +import java.sql.DriverManager; +import java.sql.DriverPropertyInfo; +import java.sql.SQLException; +import java.sql.SQLFeatureNotSupportedException; +import java.util.Properties; +import java.util.logging.Logger; + +/** + * Another {@link Driver} for FakeDB. + */ +public class FakeDriver2 implements Driver { + + static { + try { + DriverManager.registerDriver(new FakeDriver2()); + } catch (SQLException ex) { + throw new ExceptionInInitializerError(ex); + } + } + + @Override + public Connection connect(String url, Properties info) throws SQLException { + if (!acceptsURL(url)) { + return null; + } + return new FakeConnection2(); + } + + @Override + public boolean acceptsURL(String url) throws SQLException { + return FakeDBUtils.acceptsUrl(url); + } + + @Override + public DriverPropertyInfo[] getPropertyInfo(String url, Properties info) throws SQLException { + return new DriverPropertyInfo[0]; + } + + @Override + public int getMajorVersion() { + return 0; + } + + @Override + public int getMinorVersion() { + return 0; + } + + @Override + public boolean jdbcCompliant() { + return false; + } + + @Override + public Logger getParentLogger() throws SQLFeatureNotSupportedException { + throw new SQLFeatureNotSupportedException(); + } +} diff --git a/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/internal/connection/SimpleJdbcConnectionProviderDriverClassConcurrentLoadingTest.java b/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/internal/connection/SimpleJdbcConnectionProviderDriverClassConcurrentLoadingTest.java new file mode 100644 index 0000000000000..53212020f8985 --- /dev/null +++ b/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/internal/connection/SimpleJdbcConnectionProviderDriverClassConcurrentLoadingTest.java @@ -0,0 +1,102 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.jdbc.internal.connection; + +import org.apache.flink.connector.jdbc.JdbcConnectionOptions; +import org.apache.flink.connector.jdbc.fakedb.FakeDBUtils; +import org.apache.flink.core.testutils.CheckedThread; + +import org.junit.Test; + +import java.lang.reflect.Method; +import java.sql.Connection; +import java.util.concurrent.CountDownLatch; +import java.util.function.Function; + +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +/** + * This test deals with sql driver class loading issues, write it alone so it won't be + * interfered by other tests. + */ +public class SimpleJdbcConnectionProviderDriverClassConcurrentLoadingTest { + private static boolean isClassLoaded(ClassLoader classLoader, String className) throws Exception { + do { + Method m = ClassLoader.class.getDeclaredMethod("findLoadedClass", String.class); + m.setAccessible(true); + Object loadedClass = m.invoke(classLoader, className); + if (loadedClass != null) { + return true; + } + classLoader = classLoader.getParent(); + } while (classLoader != null); + return false; + } + + @Test(timeout = 5000) + public void testDriverClassConcurrentLoading() throws Exception { + ClassLoader classLoader = getClass().getClassLoader(); + + assertFalse(isClassLoaded(classLoader, FakeDBUtils.DRIVER1_CLASS_NAME)); + assertFalse(isClassLoaded(classLoader, FakeDBUtils.DRIVER2_CLASS_NAME)); + + JdbcConnectionOptions connectionOptions1 = new JdbcConnectionOptions.JdbcConnectionOptionsBuilder() + .withUrl(FakeDBUtils.TEST_DB_URL) + .withDriverName(FakeDBUtils.DRIVER1_CLASS_NAME) + .build(); + + JdbcConnectionOptions connectionOptions2 = new JdbcConnectionOptions.JdbcConnectionOptionsBuilder() + .withUrl(FakeDBUtils.TEST_DB_URL) + .withDriverName(FakeDBUtils.DRIVER2_CLASS_NAME) + .build(); + + CountDownLatch startLatch = new CountDownLatch(1); + + Function connectionThreadCreator = options -> { + CheckedThread thread = new CheckedThread() { + @Override + public void go() throws Exception { + startLatch.await(); + JdbcConnectionProvider connectionProvider = new SimpleJdbcConnectionProvider(options); + Connection connection = connectionProvider.getConnection(); + connection.close(); + } + }; + thread.setName("Loading " + options.getDriverName()); + thread.setDaemon(true); + return thread; + }; + + CheckedThread connectionThread1 = connectionThreadCreator.apply(connectionOptions1); + CheckedThread connectionThread2 = connectionThreadCreator.apply(connectionOptions2); + + connectionThread1.start(); + connectionThread2.start(); + + Thread.sleep(2); + startLatch.countDown(); + + connectionThread1.sync(); + connectionThread2.sync(); + + assertTrue(isClassLoaded(classLoader, FakeDBUtils.DRIVER1_CLASS_NAME)); + assertTrue(isClassLoaded(classLoader, FakeDBUtils.DRIVER2_CLASS_NAME)); + } +} diff --git a/flink-connectors/flink-connector-jdbc/src/test/resources/META-INF/services/java.sql.Driver b/flink-connectors/flink-connector-jdbc/src/test/resources/META-INF/services/java.sql.Driver new file mode 100644 index 0000000000000..46ec4f1f1feb8 --- /dev/null +++ b/flink-connectors/flink-connector-jdbc/src/test/resources/META-INF/services/java.sql.Driver @@ -0,0 +1,17 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +org.apache.flink.connector.jdbc.fakedb.driver.FakeDriver1 +org.apache.flink.connector.jdbc.fakedb.driver.FakeDriver2