Skip to content

Commit

Permalink
[FLINK-25421] Port JDBC Sink to new Unified Sink API (FLIP-143)
Browse files Browse the repository at this point in the history
  • Loading branch information
eskabetxe committed Mar 26, 2024
1 parent 95294ff commit 92e2b5f
Show file tree
Hide file tree
Showing 62 changed files with 3,687 additions and 72 deletions.
3 changes: 1 addition & 2 deletions .github/workflows/push_pr.yml
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,12 @@ jobs:
compile_and_test:
strategy:
matrix:
flink: [ 1.16-SNAPSHOT, 1.17-SNAPSHOT ]
jdk: [ '8, 11' ]
include:
- flink: 1.18-SNAPSHOT
jdk: '8, 11, 17'
- flink: 1.19-SNAPSHOT
jdk: '8, 11, 17, 21'

uses: apache/flink-connector-shared-utils/.github/workflows/ci.yml@ci_utils
with:
flink_version: ${{ matrix.flink }}
Expand Down
16 changes: 4 additions & 12 deletions .github/workflows/weekly.yml
Original file line number Diff line number Diff line change
Expand Up @@ -27,29 +27,21 @@ jobs:
strategy:
matrix:
flink_branches: [{
flink: 1.16-SNAPSHOT,
branch: main
}, {
flink: 1.17-SNAPSHOT,
branch: main
}, {
flink: 1.18-SNAPSHOT,
jdk: '8, 11, 17',
branch: main
}, {
flink: 1.19-SNAPSHOT,
jdk: '8, 11, 17, 21',
branch: main
}, {
flink: 1.16.2,
branch: v3.1
}, {
flink: 1.17.1,
branch: v3.1
}, {
flink: 1.18.0,
jdk: '8, 11, 17',
branch: v3.1
}, {
flink: 1.19.0,
jdk: '8, 11, 17, 21',
branch: v3.1
}]
uses: apache/flink-connector-shared-utils/.github/workflows/ci.yml@ci_utils
with:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,8 @@
import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.jdbc.internal.connection.JdbcConnectionProvider;
import org.apache.flink.connector.jdbc.internal.connection.SimpleJdbcConnectionProvider;
import org.apache.flink.connector.jdbc.datasource.connections.JdbcConnectionProvider;
import org.apache.flink.connector.jdbc.datasource.connections.SimpleJdbcConnectionProvider;
import org.apache.flink.connector.jdbc.split.JdbcParameterValuesProvider;
import org.apache.flink.core.io.GenericInputSplit;
import org.apache.flink.core.io.InputSplit;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,9 @@
package org.apache.flink.connector.jdbc;

import org.apache.flink.annotation.Experimental;
import org.apache.flink.connector.jdbc.datasource.connections.JdbcConnectionProvider;
import org.apache.flink.connector.jdbc.datasource.connections.SimpleJdbcConnectionProvider;
import org.apache.flink.connector.jdbc.internal.JdbcOutputFormat;
import org.apache.flink.connector.jdbc.internal.connection.JdbcConnectionProvider;
import org.apache.flink.connector.jdbc.internal.connection.SimpleJdbcConnectionProvider;
import org.apache.flink.connector.jdbc.internal.executor.JdbcBatchStatementExecutor;
import org.apache.flink.types.Row;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,11 @@
package org.apache.flink.connector.jdbc;

import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.connector.jdbc.datasource.connections.SimpleJdbcConnectionProvider;
import org.apache.flink.connector.jdbc.internal.GenericJdbcSinkFunction;
import org.apache.flink.connector.jdbc.internal.JdbcOutputFormat;
import org.apache.flink.connector.jdbc.internal.connection.SimpleJdbcConnectionProvider;
import org.apache.flink.connector.jdbc.internal.executor.JdbcBatchStatementExecutor;
import org.apache.flink.connector.jdbc.sink.JdbcSinkBuilder;
import org.apache.flink.connector.jdbc.xa.JdbcXaSinkFunction;
import org.apache.flink.connector.jdbc.xa.XaFacade;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
Expand Down Expand Up @@ -107,5 +108,9 @@ public static <T> SinkFunction<T> exactlyOnceSink(
exactlyOnceOptions);
}

public static <IN> JdbcSinkBuilder<IN> builder() {
return org.apache.flink.connector.jdbc.sink.JdbcSink.builder();
}

private JdbcSink() {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,18 +15,19 @@
* limitations under the License.
*/

package org.apache.flink.connector.jdbc.internal.connection;
package org.apache.flink.connector.jdbc.datasource.connections;

import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.PublicEvolving;

import javax.annotation.Nullable;

import java.io.Serializable;
import java.sql.Connection;
import java.sql.SQLException;

/** JDBC connection provider. */
@Internal
public interface JdbcConnectionProvider {
@PublicEvolving
public interface JdbcConnectionProvider extends Serializable, AutoCloseable {
/**
* Get existing connection.
*
Expand Down Expand Up @@ -64,4 +65,8 @@ public interface JdbcConnectionProvider {
* @throws ClassNotFoundException driver class not found
*/
Connection reestablishConnection() throws SQLException, ClassNotFoundException;

default void close() throws Exception {
closeConnection();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,9 @@
* limitations under the License.
*/

package org.apache.flink.connector.jdbc.internal.connection;
package org.apache.flink.connector.jdbc.datasource.connections;

import org.apache.flink.annotation.Internal;
import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
import org.apache.flink.util.Preconditions;

Expand All @@ -35,6 +36,7 @@

/** Simple JDBC connection provider. */
@NotThreadSafe
@Internal
public class SimpleJdbcConnectionProvider implements JdbcConnectionProvider, Serializable {

private static final Logger LOG = LoggerFactory.getLogger(SimpleJdbcConnectionProvider.class);
Expand Down Expand Up @@ -73,8 +75,7 @@ public boolean isConnectionValid() throws SQLException {
&& connection.isValid(jdbcOptions.getConnectionCheckTimeoutSeconds());
}

private static Driver loadDriver(String driverName)
throws SQLException, ClassNotFoundException {
private Driver loadDriver(String driverName) throws SQLException, ClassNotFoundException {
Preconditions.checkNotNull(driverName);
Enumeration<Driver> drivers = DriverManager.getDrivers();
while (drivers.hasMoreElements()) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,208 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.connector.jdbc.datasource.connections.xa;

import org.apache.flink.annotation.Internal;
import org.apache.flink.connector.jdbc.datasource.transactions.xa.exceptions.TransientXaException;
import org.apache.flink.util.function.ThrowingConsumer;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.annotation.Nullable;
import javax.sql.XADataSource;
import javax.transaction.xa.Xid;

import java.io.Serializable;
import java.sql.Connection;
import java.sql.SQLException;
import java.util.Collection;
import java.util.Deque;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.Map;
import java.util.function.Supplier;

import static org.apache.flink.util.ExceptionUtils.rethrow;
import static org.apache.flink.util.Preconditions.checkState;

/**
* A "pooling" implementation of {@link XaConnectionProvider}. Some database implement XA such that
* one connection is limited to a single transaction. As a workaround, this implementation creates a
* new XA resource after each xa_start call is made (and associates it with the xid to commit
* later).
*/
@Internal
public class PoolingXaConnectionProvider implements XaConnectionProvider {
private static final long serialVersionUID = 1L;

/** A supplier of connection provider. */
public interface ConnectionProviderSupplier
extends Serializable, Supplier<XaConnectionProvider> {}

private static final Logger LOG = LoggerFactory.getLogger(PoolingXaConnectionProvider.class);
private final ConnectionProviderSupplier providerSupplier;
private transient XaConnectionProvider active;
private transient Map<Xid, XaConnectionProvider> mappedToXids;
private transient Deque<XaConnectionProvider> pooled;

public static PoolingXaConnectionProvider from(
Supplier<XADataSource> dataSourceSupplier, Integer timeoutSec) {
return from(() -> SimpleXaConnectionProvider.from(dataSourceSupplier, timeoutSec));
}

public static PoolingXaConnectionProvider from(ConnectionProviderSupplier facadeSupplier) {
return new PoolingXaConnectionProvider(facadeSupplier);
}

private PoolingXaConnectionProvider(ConnectionProviderSupplier facadeSupplier) {
this.providerSupplier = facadeSupplier;
}

@Override
public void open() throws Exception {
checkState(active == null);
pooled = new LinkedList<>();
mappedToXids = new HashMap<>();
}

@Override
public boolean isOpen() {
return active != null && active.isOpen();
}

@Override
public void start(Xid xid) throws Exception {
checkState(active == null);
if (pooled.isEmpty()) {
active = providerSupplier.get();
active.open();
} else {
active = pooled.poll();
}
active.start(xid);
mappedToXids.put(xid, active);
}

/**
* Must be called after {@link #start(Xid)} with the same {@link Xid}.
*
* @see XaConnectionProvider#endAndPrepare(Xid)
*/
@Override
public void endAndPrepare(Xid xid) throws Exception {
checkState(active == mappedToXids.get(xid));
try {
active.endAndPrepare(xid);
} finally {
active = null;
}
}

@Override
public void commit(Xid xid, boolean ignoreUnknown) throws TransientXaException {
runForXid(xid, facade -> facade.commit(xid, ignoreUnknown));
}

@Override
public void rollback(Xid xid) throws TransientXaException {
runForXid(xid, facade -> facade.rollback(xid));
}

@Override
public void failAndRollback(Xid xid) throws TransientXaException {
runForXid(xid, facade -> facade.failAndRollback(xid));
}

@Override
public Collection<Xid> recover() throws TransientXaException {
return peekPooled().recover();
}

@Override
public void close() throws Exception {
for (XaConnectionProvider facade : mappedToXids.values()) {
facade.close();
}
for (XaConnectionProvider facade : pooled) {
facade.close();
}
if (active != null && active.isOpen()) {
active.close();
}
}

@Nullable
@Override
public Connection getConnection() {
return active.getConnection();
}

@Override
public boolean isConnectionValid() throws SQLException {
return active.isConnectionValid();
}

@Override
public Connection getOrEstablishConnection() throws SQLException, ClassNotFoundException {
return active.getOrEstablishConnection();
}

@Override
public void closeConnection() {
active.closeConnection();
}

@Override
public Connection reestablishConnection() throws SQLException, ClassNotFoundException {
return active.reestablishConnection();
}

// WARN: action MUST leave the facade in IDLE state (i.e. not start/end/prepare any tx)
private void runForXid(
Xid xid, ThrowingConsumer<XaConnectionProvider, TransientXaException> action) {
XaConnectionProvider mapped = mappedToXids.remove(xid);
if (mapped == null) {
// a transaction can be not known during recovery
LOG.debug("No XA resource found associated with XID: {}", xid);
action.accept(peekPooled());
} else {
LOG.debug("Found mapped XA resource for XID: {} {}", xid, mapped);
try {
action.accept(mapped);
} finally {
pooled.offer(mapped);
}
}
}

// WARN: the returned facade MUST be left in IDLE state (i.e. not start/end/prepare any tx)
private XaConnectionProvider peekPooled() {
XaConnectionProvider xaFacade = pooled.peek();
if (xaFacade == null) {
xaFacade = providerSupplier.get();
try {
xaFacade.open();
} catch (Exception e) {
rethrow(e);
}
pooled.offer(xaFacade);
}
return xaFacade;
}
}

0 comments on commit 92e2b5f

Please sign in to comment.