diff --git a/inlong-sort/sort-connectors/base/pom.xml b/inlong-sort/sort-connectors/base/pom.xml
index 8e4d2701efc..675190c511c 100644
--- a/inlong-sort/sort-connectors/base/pom.xml
+++ b/inlong-sort/sort-connectors/base/pom.xml
@@ -42,6 +42,11 @@
            <version>${project.version}</version>
            <scope>provided</scope>
        </dependency>
+
+        <dependency>
+            <groupId>com.amazonaws</groupId>
+            <artifactId>aws-java-sdk-s3</artifactId>
+        </dependency>
diff --git a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/dirty/sink/DirtySinkFactory.java b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/dirty/sink/DirtySinkFactory.java
index b6725ddd82b..07784b4437a 100644
--- a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/dirty/sink/DirtySinkFactory.java
+++ b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/dirty/sink/DirtySinkFactory.java
@@ -17,12 +17,13 @@
package org.apache.inlong.sort.base.dirty.sink;
-import org.apache.flink.table.factories.DynamicTableFactory;
+import org.apache.flink.table.factories.DynamicTableFactory.Context;
+import org.apache.flink.table.factories.Factory;
/**
* Dirty sink factory class, it is used to create dirty sink
*/
-public interface DirtySinkFactory extends DynamicTableFactory {
+public interface DirtySinkFactory extends Factory {
/**
* Create dirty sink
@@ -31,6 +32,6 @@ public interface DirtySinkFactory extends DynamicTableFactory {
* @param <T> The data model that is handled by the dirty sink
* @return A dirty sink
*/
- <T> DirtySink<T> createDirtySink(DynamicTableFactory.Context context);
+ <T> DirtySink<T> createDirtySink(Context context);
}
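Note on the interface change above: `Factory` is the minimal SPI contract in Flink's table stack, so a `DirtySinkFactory` no longer has to masquerade as a `DynamicTableFactory` just to be discoverable. A minimal lookup sketch, assuming a `DynamicTableFactory.Context` named `context` is already in scope (the "s3" identifier refers to the factory added later in this patch):

    // Sketch only: resolve a DirtySinkFactory registered in
    // META-INF/services/org.apache.flink.table.factories.Factory
    DirtySinkFactory factory = FactoryUtil.discoverFactory(
            Thread.currentThread().getContextClassLoader(),
            DirtySinkFactory.class,
            "s3");
    DirtySink<RowData> dirtySink = factory.createDirtySink(context);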
diff --git a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/dirty/sink/log/LogDirtySink.java b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/dirty/sink/log/LogDirtySink.java
index bf5a4f13509..a57c981fee2 100644
--- a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/dirty/sink/log/LogDirtySink.java
+++ b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/dirty/sink/log/LogDirtySink.java
@@ -46,7 +46,7 @@ public class LogDirtySink<T> implements DirtySink<T> {
private static final long serialVersionUID = 1L;
- private static final Logger LOG = LoggerFactory.getLogger(LogDirtySink.class);
+ private static final Logger LOGGER = LoggerFactory.getLogger(LogDirtySink.class);
private final RowData.FieldGetter[] fieldGetters;
private final String format;
@@ -85,7 +85,7 @@ public void invoke(DirtyData<T> dirtyData) throws Exception {
// Only support csv format when the row is not a 'RowData' and 'JsonNode'
value = FormatUtils.csvFormat(data, labelMap, fieldDelimiter);
}
- LOG.info("[{}] {}", dirtyData.getLogTag(), value);
+ LOGGER.info("[{}] {}", dirtyData.getLogTag(), value);
}
private String format(RowData data, Map<String, String> labels) throws JsonProcessingException {
diff --git a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/dirty/sink/log/LogDirtySinkFactory.java b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/dirty/sink/log/LogDirtySinkFactory.java
index 93a12f584a8..c3720a93d91 100644
--- a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/dirty/sink/log/LogDirtySinkFactory.java
+++ b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/dirty/sink/log/LogDirtySinkFactory.java
@@ -18,12 +18,14 @@
package org.apache.inlong.sort.base.dirty.sink.log;
import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.table.factories.DynamicTableFactory.Context;
import org.apache.flink.table.factories.FactoryUtil;
import org.apache.inlong.sort.base.dirty.sink.DirtySink;
import org.apache.inlong.sort.base.dirty.sink.DirtySinkFactory;
import java.util.HashSet;
import java.util.Set;
+import static org.apache.inlong.sort.base.Constants.DIRTY_IDENTIFIER;
import static org.apache.inlong.sort.base.Constants.DIRTY_SIDE_OUTPUT_FIELD_DELIMITER;
import static org.apache.inlong.sort.base.Constants.DIRTY_SIDE_OUTPUT_FORMAT;
@@ -36,10 +38,9 @@ public class LogDirtySinkFactory implements DirtySinkFactory {
@Override
public <T> DirtySink<T> createDirtySink(Context context) {
- final FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context);
- FactoryUtil.validateFactoryOptions(this, helper.getOptions());
- String format = helper.getOptions().get(DIRTY_SIDE_OUTPUT_FORMAT);
- String fieldDelimiter = helper.getOptions().get(DIRTY_SIDE_OUTPUT_FIELD_DELIMITER);
+ FactoryUtil.validateFactoryOptions(this, context.getConfiguration());
+ String format = context.getConfiguration().get(DIRTY_SIDE_OUTPUT_FORMAT);
+ String fieldDelimiter = context.getConfiguration().get(DIRTY_SIDE_OUTPUT_FIELD_DELIMITER);
return new LogDirtySink<>(format, fieldDelimiter,
context.getCatalogTable().getResolvedSchema().toPhysicalRowDataType());
}
@@ -59,6 +60,7 @@ public Set> optionalOptions() {
final Set> options = new HashSet<>();
options.add(DIRTY_SIDE_OUTPUT_FORMAT);
options.add(DIRTY_SIDE_OUTPUT_FIELD_DELIMITER);
+ options.add(DIRTY_IDENTIFIER);
return options;
}
}
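The switch from `TableFactoryHelper` to validating `context.getConfiguration()` directly matters because the helper rejects any option key it has not been told about, which is too strict for dirty-sink options that ride along with the host connector's options. A rough sketch of what this factory now reads (key strings are assumed from the constant names; values are illustrative):

    // Illustrative configuration for the log dirty sink (assumed key strings)
    Map<String, String> dirtyOptions = new HashMap<>();
    dirtyOptions.put("dirty.identifier", "group1.stream1");
    dirtyOptions.put("dirty.side-output.format", "csv");
    dirtyOptions.put("dirty.side-output.field-delimiter", ",");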
diff --git a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/dirty/sink/s3/S3DirtySink.java b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/dirty/sink/s3/S3DirtySink.java
new file mode 100644
index 00000000000..3cb20c7b871
--- /dev/null
+++ b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/dirty/sink/s3/S3DirtySink.java
@@ -0,0 +1,279 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.base.dirty.sink.s3;
+
+import com.amazonaws.auth.AWSStaticCredentialsProvider;
+import com.amazonaws.auth.BasicAWSCredentials;
+import com.amazonaws.client.builder.AwsClientBuilder;
+import com.amazonaws.services.s3.AmazonS3;
+import com.amazonaws.services.s3.AmazonS3ClientBuilder;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.formats.common.TimestampFormat;
+import org.apache.flink.formats.json.JsonOptions.MapNullKeyMode;
+import org.apache.flink.formats.json.RowDataToJsonConverters;
+import org.apache.flink.formats.json.RowDataToJsonConverters.RowDataToJsonConverter;
+import org.apache.flink.runtime.util.ExecutorThreadFactory;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.inlong.sort.base.dirty.DirtyData;
+import org.apache.inlong.sort.base.dirty.sink.DirtySink;
+import org.apache.inlong.sort.base.dirty.utils.FormatUtils;
+import org.apache.inlong.sort.base.util.LabelUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.apache.flink.table.data.RowData.createFieldGetter;
+
+/**
+ * S3 dirty sink that is used to sink dirty data to s3
+ *
+ * @param <T> The data model that is handled by the dirty sink
+ */
+public class S3DirtySink<T> implements DirtySink<T> {
+
+ private static final long serialVersionUID = 1L;
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(S3DirtySink.class);
+
+ private final Map<String, List<String>> batchMap = new HashMap<>();
+ private final S3Options s3Options;
+ private final AtomicLong readInNum = new AtomicLong(0);
+ private final AtomicLong writeOutNum = new AtomicLong(0);
+ private final AtomicLong errorNum = new AtomicLong(0);
+ private final DataType physicalRowDataType;
+ private final RowData.FieldGetter[] fieldGetters;
+ private RowDataToJsonConverter converter;
+ private long batchBytes = 0L;
+ private int size;
+ private transient volatile boolean closed = false;
+ private transient volatile boolean flushing = false;
+ private transient ScheduledExecutorService scheduler;
+ private transient ScheduledFuture<?> scheduledFuture;
+ private transient S3Helper s3Helper;
+
+ public S3DirtySink(S3Options s3Options, DataType physicalRowDataType) {
+ this.s3Options = s3Options;
+ this.physicalRowDataType = physicalRowDataType;
+ final LogicalType[] logicalTypes = physicalRowDataType.getChildren()
+ .stream().map(DataType::getLogicalType).toArray(LogicalType[]::new);
+ this.fieldGetters = new RowData.FieldGetter[logicalTypes.length];
+ for (int i = 0; i < logicalTypes.length; i++) {
+ fieldGetters[i] = createFieldGetter(logicalTypes[i], i);
+ }
+ }
+
+ @Override
+ public void open(Configuration configuration) throws Exception {
+ converter = new RowDataToJsonConverters(TimestampFormat.SQL, MapNullKeyMode.DROP, null)
+ .createConverter(physicalRowDataType.getLogicalType());
+ AmazonS3 s3Client;
+ if (s3Options.getAccessKeyId() != null && s3Options.getSecretKeyId() != null) {
+ BasicAWSCredentials awsCreds =
+ new BasicAWSCredentials(s3Options.getAccessKeyId(), s3Options.getSecretKeyId());
+ s3Client = AmazonS3ClientBuilder.standard().withCredentials(new AWSStaticCredentialsProvider(awsCreds))
+ .withEndpointConfiguration(new AwsClientBuilder.EndpointConfiguration(
+ s3Options.getEndpoint(),
+ s3Options.getRegion()))
+ .build();
+ } else {
+ s3Client = AmazonS3ClientBuilder.standard().withEndpointConfiguration(
+ new AwsClientBuilder.EndpointConfiguration(s3Options.getEndpoint(), s3Options.getRegion())).build();
+ }
+ s3Helper = new S3Helper(s3Client, s3Options);
+ this.scheduler = new ScheduledThreadPoolExecutor(1,
+ new ExecutorThreadFactory("s3-dirty-sink"));
+ this.scheduledFuture = this.scheduler.scheduleWithFixedDelay(() -> {
+ if (!closed && !flushing) {
+ flush();
+ }
+ }, s3Options.getBatchIntervalMs(), s3Options.getBatchIntervalMs(), TimeUnit.MILLISECONDS);
+ }
+
+ @Override
+ public synchronized void invoke(DirtyData<T> dirtyData) throws Exception {
+ try {
+ addBatch(dirtyData);
+ } catch (Exception e) {
+ if (!s3Options.ignoreSideOutputErrors()) {
+ throw new RuntimeException(String.format("Add batch to identifier:%s failed, the dirty data: %s.",
+ dirtyData.getIdentifier(), dirtyData.toString()), e);
+ }
+ LOGGER.warn("Add batch to identifier:{} failed "
+ + "and the dirty data will be throw away in the future"
+ + " because the option 'dirty.side-output.ignore-errors' is 'true'", dirtyData.getIdentifier());
+ }
+ if (valid() && !flushing) {
+ flush();
+ }
+ }
+
+ private boolean valid() {
+ return (s3Options.getBatchSize() > 0 && size >= s3Options.getBatchSize())
+ || batchBytes >= s3Options.getMaxBatchBytes();
+ }
+
+ private void addBatch(DirtyData<T> dirtyData) throws IOException {
+ readInNum.incrementAndGet();
+ String value;
+ Map<String, String> labelMap = LabelUtils.parseLabels(dirtyData.getLabels());
+ T data = dirtyData.getData();
+ if (data instanceof RowData) {
+ value = format((RowData) data, labelMap);
+ } else if (data instanceof JsonNode) {
+ value = format((JsonNode) data, labelMap);
+ } else {
+ // Only support csv format when the row is not a 'RowData' and 'JsonNode'
+ value = FormatUtils.csvFormat(data, labelMap, s3Options.getFieldDelimiter());
+ }
+ if (s3Options.enableDirtyLog()) {
+ LOGGER.info("[{}] {}", dirtyData.getLogTag(), value);
+ }
+ batchBytes += value.getBytes(UTF_8).length;
+ size++;
+ batchMap.computeIfAbsent(dirtyData.getIdentifier(), k -> new ArrayList<>()).add(value);
+ }
+
+ private String format(RowData data, Map<String, String> labels) throws JsonProcessingException {
+ String value;
+ switch (s3Options.getFormat()) {
+ case "csv":
+ value = FormatUtils.csvFormat(data, fieldGetters, labels, s3Options.getFieldDelimiter());
+ break;
+ case "json":
+ value = FormatUtils.jsonFormat(data, converter, labels);
+ break;
+ default:
+ throw new UnsupportedOperationException(
+ String.format("Unsupported format for: %s", s3Options.getFormat()));
+ }
+ return value;
+ }
+
+ private String format(JsonNode data, Map<String, String> labels) throws JsonProcessingException {
+ String value;
+ switch (s3Options.getFormat()) {
+ case "csv":
+ value = FormatUtils.csvFormat(data, labels, s3Options.getFieldDelimiter());
+ break;
+ case "json":
+ value = FormatUtils.jsonFormat(data, labels);
+ break;
+ default:
+ throw new UnsupportedOperationException(
+ String.format("Unsupported format for: %s", s3Options.getFormat()));
+ }
+ return value;
+ }
+
+ @Override
+ public synchronized void close() throws IOException {
+ if (!closed) {
+ closed = true;
+ if (this.scheduledFuture != null) {
+ scheduledFuture.cancel(false);
+ this.scheduler.shutdown();
+ }
+ try {
+ flush();
+ } catch (Exception e) {
+ LOGGER.warn("Writing records to s3 failed.", e);
+ throw new RuntimeException("Writing records to s3 failed.", e);
+ }
+ }
+ }
+
+ /**
+ * Flush data to s3
+ */
+ public synchronized void flush() {
+ flushing = true;
+ if (!hasRecords()) {
+ flushing = false;
+ return;
+ }
+ for (Entry<String, List<String>> kvs : batchMap.entrySet()) {
+ flushSingleIdentifier(kvs.getKey(), kvs.getValue());
+ }
+ batchMap.clear();
+ batchBytes = 0;
+ size = 0;
+ flushing = false;
+ LOGGER.info("S3 dirty sink statistics: readInNum: {}, writeOutNum: {}, errorNum: {}",
+ readInNum.get(), writeOutNum.get(), errorNum.get());
+ }
+
+ /**
+ * Flush data of single identifier to s3
+ *
+ * @param identifier The identifier of dirty data
+ * @param values The values of the identifier
+ */
+ private void flushSingleIdentifier(String identifier, List<String> values) {
+ if (values == null || values.isEmpty()) {
+ return;
+ }
+ String content = null;
+ try {
+ content = StringUtils.join(values, s3Options.getLineDelimiter());
+ s3Helper.upload(identifier, content);
+ LOGGER.info("Write {} records to s3 of identifier: {}", values.size(), identifier);
+ writeOutNum.addAndGet(values.size());
+ // Clean the data that has been loaded.
+ values.clear();
+ } catch (Exception e) {
+ errorNum.addAndGet(values.size());
+ if (!s3Options.ignoreSideOutputErrors()) {
+ throw new RuntimeException(
+ String.format("Writing records to s3 of identifier:%s failed, the value: %s.",
+ identifier, content),
+ e);
+ }
+ LOGGER.warn("Writing records to s3 of identifier:{} failed "
+ + "and the dirty data will be throw away in the future"
+ + " because the option 'dirty.side-output.ignore-errors' is 'true'", identifier);
+ }
+ }
+
+ private boolean hasRecords() {
+ if (batchMap.isEmpty()) {
+ return false;
+ }
+ for (List<String> value : batchMap.values()) {
+ if (!value.isEmpty()) {
+ return true;
+ }
+ }
+ return false;
+ }
+}
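Lifecycle recap for the sink above: open() builds the AmazonS3 client (static credentials when both keys are set, otherwise the SDK's default provider chain) and starts a fixed-delay flush timer; invoke() buffers records per identifier and flushes synchronously once the batch-size or batch-bytes threshold trips; close() cancels the timer and flushes the remainder. A minimal driver sketch, assuming `s3Options`, `physicalRowDataType` and `dirtyData` are already available:

    S3DirtySink<RowData> sink = new S3DirtySink<>(s3Options, physicalRowDataType);
    sink.open(new Configuration());  // builds the AmazonS3 client, starts the flush timer
    sink.invoke(dirtyData);          // buffers; may trigger a synchronous flush
    sink.close();                    // cancels the timer and flushes what remains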
diff --git a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/dirty/sink/s3/S3DirtySinkFactory.java b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/dirty/sink/s3/S3DirtySinkFactory.java
new file mode 100644
index 00000000000..d9ec2643448
--- /dev/null
+++ b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/dirty/sink/s3/S3DirtySinkFactory.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.base.dirty.sink.s3;
+
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.factories.DynamicTableFactory.Context;
+import org.apache.flink.table.factories.FactoryUtil;
+import org.apache.inlong.sort.base.dirty.sink.DirtySink;
+import org.apache.inlong.sort.base.dirty.sink.DirtySinkFactory;
+
+import java.util.HashSet;
+import java.util.Set;
+import static org.apache.inlong.sort.base.Constants.DIRTY_IDENTIFIER;
+import static org.apache.inlong.sort.base.Constants.DIRTY_SIDE_OUTPUT_BATCH_BYTES;
+import static org.apache.inlong.sort.base.Constants.DIRTY_SIDE_OUTPUT_BATCH_INTERVAL;
+import static org.apache.inlong.sort.base.Constants.DIRTY_SIDE_OUTPUT_BATCH_SIZE;
+import static org.apache.inlong.sort.base.Constants.DIRTY_SIDE_OUTPUT_FIELD_DELIMITER;
+import static org.apache.inlong.sort.base.Constants.DIRTY_SIDE_OUTPUT_FORMAT;
+import static org.apache.inlong.sort.base.Constants.DIRTY_SIDE_OUTPUT_IGNORE_ERRORS;
+import static org.apache.inlong.sort.base.Constants.DIRTY_SIDE_OUTPUT_LINE_DELIMITER;
+import static org.apache.inlong.sort.base.Constants.DIRTY_SIDE_OUTPUT_LOG_ENABLE;
+import static org.apache.inlong.sort.base.Constants.DIRTY_SIDE_OUTPUT_RETRIES;
+
+/**
+ * S3 dirty sink factory
+ */
+public class S3DirtySinkFactory implements DirtySinkFactory {
+
+ private static final String IDENTIFIER = "s3";
+
+ private static final ConfigOption<String> DIRTY_SIDE_OUTPUT_ENDPOINT =
+ ConfigOptions.key("dirty.side-output.s3.endpoint")
+ .stringType()
+ .noDefaultValue()
+ .withDescription("The endpoint of s3");
+ private static final ConfigOption<String> DIRTY_SIDE_OUTPUT_REGION =
+ ConfigOptions.key("dirty.side-output.s3.region")
+ .stringType()
+ .noDefaultValue()
+ .withDescription("The region of s3");
+ private static final ConfigOption<String> DIRTY_SIDE_OUTPUT_BUCKET =
+ ConfigOptions.key("dirty.side-output.s3.bucket")
+ .stringType()
+ .noDefaultValue()
+ .withDescription("The bucket of s3");
+ private static final ConfigOption<String> DIRTY_SIDE_OUTPUT_KEY =
+ ConfigOptions.key("dirty.side-output.s3.key")
+ .stringType()
+ .noDefaultValue()
+ .withDescription("The key of s3");
+ private static final ConfigOption<String> DIRTY_SIDE_OUTPUT_ACCESS_KEY_ID =
+ ConfigOptions.key("dirty.side-output.s3.access-key-id")
+ .stringType()
+ .noDefaultValue()
+ .withDescription("The access key of s3");
+ private static final ConfigOption<String> DIRTY_SIDE_OUTPUT_SECRET_KEY_ID =
+ ConfigOptions.key("dirty.side-output.s3.secret-key-id")
+ .stringType()
+ .noDefaultValue()
+ .withDescription("The secret key of s3");
+
+ @Override
+ public <T> DirtySink<T> createDirtySink(Context context) {
+ FactoryUtil.validateFactoryOptions(this, context.getConfiguration());
+ validate(context.getConfiguration());
+ return new S3DirtySink<>(getS3Options(context.getConfiguration()),
+ context.getCatalogTable().getResolvedSchema().toPhysicalRowDataType());
+ }
+
+ private void validate(ReadableConfig config) {
+ String identifier = config.getOptional(DIRTY_IDENTIFIER).orElse(null);
+ if (identifier == null || identifier.trim().length() == 0) {
+ throw new ValidationException(
+ "The option 'dirty.identifier' is not allowed to be empty.");
+ }
+ }
+
+ private S3Options getS3Options(ReadableConfig config) {
+ final S3Options.Builder builder = S3Options.builder()
+ .setEndpoint(config.getOptional(DIRTY_SIDE_OUTPUT_ENDPOINT).orElse(null))
+ .setRegion(config.getOptional(DIRTY_SIDE_OUTPUT_REGION).orElse(null))
+ .setBucket(config.getOptional(DIRTY_SIDE_OUTPUT_BUCKET).orElse(null))
+ .setKey(config.getOptional(DIRTY_SIDE_OUTPUT_KEY).orElse(null))
+ .setBatchSize(config.get(DIRTY_SIDE_OUTPUT_BATCH_SIZE))
+ .setMaxRetries(config.get(DIRTY_SIDE_OUTPUT_RETRIES))
+ .setBatchIntervalMs(config.get(DIRTY_SIDE_OUTPUT_BATCH_INTERVAL))
+ .setMaxBatchBytes(config.get(DIRTY_SIDE_OUTPUT_BATCH_BYTES))
+ .setFormat(config.get(DIRTY_SIDE_OUTPUT_FORMAT))
+ .setIgnoreSideOutputErrors(config.get(DIRTY_SIDE_OUTPUT_IGNORE_ERRORS))
+ .setEnableDirtyLog(config.get(DIRTY_SIDE_OUTPUT_LOG_ENABLE))
+ .setFieldDelimiter(config.get(DIRTY_SIDE_OUTPUT_FIELD_DELIMITER))
+ .setLineDelimiter(config.get(DIRTY_SIDE_OUTPUT_LINE_DELIMITER))
+ .setAccessKeyId(config.getOptional(DIRTY_SIDE_OUTPUT_ACCESS_KEY_ID).orElse(null))
+ .setSecretKeyId(config.getOptional(DIRTY_SIDE_OUTPUT_SECRET_KEY_ID).orElse(null));
+ return builder.build();
+ }
+
+ @Override
+ public String factoryIdentifier() {
+ return IDENTIFIER;
+ }
+
+ @Override
+ public Set<ConfigOption<?>> requiredOptions() {
+ final Set<ConfigOption<?>> options = new HashSet<>();
+ options.add(DIRTY_SIDE_OUTPUT_ENDPOINT);
+ options.add(DIRTY_SIDE_OUTPUT_REGION);
+ options.add(DIRTY_SIDE_OUTPUT_BUCKET);
+ options.add(DIRTY_SIDE_OUTPUT_KEY);
+ options.add(DIRTY_IDENTIFIER);
+ return options;
+ }
+
+ @Override
+ public Set<ConfigOption<?>> optionalOptions() {
+ final Set<ConfigOption<?>> options = new HashSet<>();
+ options.add(DIRTY_SIDE_OUTPUT_BATCH_SIZE);
+ options.add(DIRTY_SIDE_OUTPUT_RETRIES);
+ options.add(DIRTY_SIDE_OUTPUT_BATCH_INTERVAL);
+ options.add(DIRTY_SIDE_OUTPUT_BATCH_BYTES);
+ options.add(DIRTY_SIDE_OUTPUT_FORMAT);
+ options.add(DIRTY_SIDE_OUTPUT_IGNORE_ERRORS);
+ options.add(DIRTY_SIDE_OUTPUT_LOG_ENABLE);
+ options.add(DIRTY_SIDE_OUTPUT_FIELD_DELIMITER);
+ options.add(DIRTY_SIDE_OUTPUT_LINE_DELIMITER);
+ options.add(DIRTY_SIDE_OUTPUT_ACCESS_KEY_ID);
+ options.add(DIRTY_SIDE_OUTPUT_SECRET_KEY_ID);
+ return options;
+ }
+}
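One behavioral note on the factory above: `dirty.identifier` is declared as a required option, and validate() additionally rejects a present-but-blank value, so a table that enables the s3 dirty sink without a usable identifier fails fast. A hedged sketch of the failure mode (the Configuration wiring is assumed, not part of the patch):

    Configuration config = new Configuration();
    config.setString("dirty.identifier", " ");  // present but blank
    // -> createDirtySink(context) passes the required-option check,
    //    then validate() throws:
    //    ValidationException: The option 'dirty.identifier' is not allowed to be empty.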
diff --git a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/dirty/sink/s3/S3Helper.java b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/dirty/sink/s3/S3Helper.java
new file mode 100644
index 00000000000..d79b8aecd36
--- /dev/null
+++ b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/dirty/sink/s3/S3Helper.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.base.dirty.sink.s3;
+
+import com.amazonaws.services.s3.AmazonS3;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.time.LocalDateTime;
+import java.time.format.DateTimeFormatter;
+import java.util.Random;
+
+/**
+ * S3 helper class, it helps write to s3
+ */
+public class S3Helper implements Serializable {
+
+ private static final long serialVersionUID = 1L;
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(S3Helper.class);
+
+ private static final DateTimeFormatter DATE_TIME_FORMAT = DateTimeFormatter.ofPattern("yyyyMMddHHmmss");
+
+ private static final int SEQUENCE_LENGTH = 4;
+ private static final String ESCAPE_PATTERN = "[\\pP\\p{Punct}\\s]";
+ private static final String FILE_NAME_SUFFIX = ".txt";
+ private final Random r = new Random();
+ private final AmazonS3 s3Client;
+ private final S3Options s3Options;
+
+ S3Helper(AmazonS3 s3Client, S3Options s3Options) {
+ this.s3Client = s3Client;
+ this.s3Options = s3Options;
+ }
+
+ /**
+ * Upload data to s3
+ *
+ * @param identifier The identifier of dirty data
+ * @param content The content that will be uploaded
+ * @throws IOException The exception may be thrown when executing
+ */
+ public void upload(String identifier, String content) throws IOException {
+ String path = genFileName(identifier);
+ for (int i = 0; i < s3Options.getMaxRetries(); i++) {
+ try {
+ s3Client.putObject(s3Options.getBucket(), path, content);
+ break;
+ } catch (Exception e) {
+ LOGGER.error("s3 dirty sink error, retry times = {}", i, e);
+ // The loop index is zero-based, so the last attempt is maxRetries - 1;
+ // rethrow once it has failed instead of silently dropping the batch
+ if (i >= s3Options.getMaxRetries() - 1) {
+ throw new IOException(e);
+ }
+ try {
+ // Linear backoff; (i + 1) avoids a zero-millisecond first sleep
+ Thread.sleep(1000L * (i + 1));
+ } catch (InterruptedException ex) {
+ Thread.currentThread().interrupt();
+ throw new IOException("unable to flush; interrupted while doing another attempt", e);
+ }
+ }
+ }
+ }
+
+ /**
+ * Generate the file name for s3
+ *
+ * @param identifier The identifier of dirty data
+ * @return File name of s3
+ */
+ private String genFileName(String identifier) {
+ return String.format("%s/%s-%s%s", s3Options.getKey(),
+ identifier.replaceAll(ESCAPE_PATTERN, ""), generateSequence(), FILE_NAME_SUFFIX);
+ }
+
+ private String generateSequence() {
+ StringBuilder sb = new StringBuilder(DATE_TIME_FORMAT.format(LocalDateTime.now()));
+ for (int i = 0; i < SEQUENCE_LENGTH; i++) {
+ sb.append(r.nextInt(10));
+ }
+ return sb.toString();
+ }
+
+}
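For orientation on the object keys produced above: genFileName strips punctuation and whitespace from the identifier (via ESCAPE_PATTERN) and appends a 14-digit timestamp plus four random digits. A hypothetical example:

    // identifier = "group1.stream1", key = "dirty"
    // -> "dirty/group1stream1-202301011230594821.txt"
    //    (the digits after '-' are time- and RNG-dependent)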
diff --git a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/dirty/sink/s3/S3Options.java b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/dirty/sink/s3/S3Options.java
new file mode 100644
index 00000000000..f507b4802ed
--- /dev/null
+++ b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/dirty/sink/s3/S3Options.java
@@ -0,0 +1,241 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.base.dirty.sink.s3;
+
+import org.apache.flink.util.Preconditions;
+
+import java.io.Serializable;
+
+/**
+ * S3 options
+ */
+public class S3Options implements Serializable {
+
+ private static final long serialVersionUID = 1L;
+
+ private static final int DEFAULT_BATCH_SIZE = 100;
+ private static final int DEFAULT_MAX_RETRY_TIMES = 3;
+ private static final long DEFAULT_MAX_BATCH_BYTES = 1024 * 10L;
+ private static final long DEFAULT_INTERVAL_MILLIS = 10000L;
+ private static final String DEFAULT_FIELD_DELIMITER = ",";
+ private static final String DEFAULT_LINE_DELIMITER = "\n";
+ private static final String DEFAULT_FORMAT = "csv";
+
+ private final Integer batchSize;
+ private final Integer maxRetries;
+ private final Long batchIntervalMs;
+ private final Long maxBatchBytes;
+ private final boolean ignoreSideOutputErrors;
+ private final boolean enableDirtyLog;
+ private final String format;
+ private final String fieldDelimiter;
+ private final String lineDelimiter;
+ private final String endpoint;
+ private final String region;
+ private final String bucket;
+ private final String key;
+ private final String accessKeyId;
+ private final String secretKeyId;
+
+ private S3Options(Integer batchSize, Integer maxRetries, Long batchIntervalMs, Long maxBatchBytes,
+ String format, boolean ignoreSideOutputErrors, boolean enableDirtyLog, String fieldDelimiter,
+ String lineDelimiter, String endpoint, String region, String bucket, String key,
+ String accessKeyId, String secretKeyId) {
+ Preconditions.checkArgument(maxRetries >= 0);
+ Preconditions.checkArgument(maxBatchBytes >= 0);
+ this.batchSize = batchSize;
+ this.maxRetries = maxRetries;
+ this.batchIntervalMs = batchIntervalMs;
+ this.maxBatchBytes = maxBatchBytes;
+ this.format = format;
+ this.ignoreSideOutputErrors = ignoreSideOutputErrors;
+ this.enableDirtyLog = enableDirtyLog;
+ this.fieldDelimiter = fieldDelimiter;
+ this.lineDelimiter = lineDelimiter;
+ this.endpoint = Preconditions.checkNotNull(endpoint, "endpoint is null");
+ this.region = Preconditions.checkNotNull(region, "region is null");
+ this.bucket = Preconditions.checkNotNull(bucket, "bucket is null");
+ this.key = Preconditions.checkNotNull(key, "key is null");
+ this.accessKeyId = accessKeyId;
+ this.secretKeyId = secretKeyId;
+ }
+
+ public static Builder builder() {
+ return new Builder();
+ }
+
+ public Integer getBatchSize() {
+ return batchSize;
+ }
+
+ public Integer getMaxRetries() {
+ return maxRetries;
+ }
+
+ public Long getBatchIntervalMs() {
+ return batchIntervalMs;
+ }
+
+ public Long getMaxBatchBytes() {
+ return maxBatchBytes;
+ }
+
+ public String getFormat() {
+ return format;
+ }
+
+ public boolean ignoreSideOutputErrors() {
+ return ignoreSideOutputErrors;
+ }
+
+ public boolean enableDirtyLog() {
+ return enableDirtyLog;
+ }
+
+ public String getFieldDelimiter() {
+ return fieldDelimiter;
+ }
+
+ public String getLineDelimiter() {
+ return lineDelimiter;
+ }
+
+ public String getEndpoint() {
+ return endpoint;
+ }
+
+ public String getRegion() {
+ return region;
+ }
+
+ public String getBucket() {
+ return bucket;
+ }
+
+ public String getKey() {
+ return key;
+ }
+
+ public String getAccessKeyId() {
+ return accessKeyId;
+ }
+
+ public String getSecretKeyId() {
+ return secretKeyId;
+ }
+
+ public static class Builder {
+
+ private Integer batchSize = DEFAULT_BATCH_SIZE;
+ private Integer maxRetries = DEFAULT_MAX_RETRY_TIMES;
+ private Long batchIntervalMs = DEFAULT_INTERVAL_MILLIS;
+ private Long maxBatchBytes = DEFAULT_MAX_BATCH_BYTES;
+ private String format = DEFAULT_FORMAT;
+ private boolean ignoreSideOutputErrors;
+ private boolean enableDirtyLog;
+ private String fieldDelimiter = DEFAULT_FIELD_DELIMITER;
+ private String lineDelimiter = DEFAULT_LINE_DELIMITER;
+ private String endpoint;
+ private String region;
+ private String bucket;
+ private String key;
+ private String accessKeyId;
+ private String secretKeyId;
+
+ public Builder setBatchSize(Integer batchSize) {
+ this.batchSize = batchSize;
+ return this;
+ }
+
+ public Builder setMaxRetries(Integer maxRetries) {
+ this.maxRetries = maxRetries;
+ return this;
+ }
+
+ public Builder setBatchIntervalMs(Long batchIntervalMs) {
+ this.batchIntervalMs = batchIntervalMs;
+ return this;
+ }
+
+ public Builder setMaxBatchBytes(Long maxBatchBytes) {
+ this.maxBatchBytes = maxBatchBytes;
+ return this;
+ }
+
+ public Builder setFormat(String format) {
+ this.format = format;
+ return this;
+ }
+
+ public Builder setIgnoreSideOutputErrors(boolean ignoreSideOutputErrors) {
+ this.ignoreSideOutputErrors = ignoreSideOutputErrors;
+ return this;
+ }
+
+ public Builder setEnableDirtyLog(boolean enableDirtyLog) {
+ this.enableDirtyLog = enableDirtyLog;
+ return this;
+ }
+
+ public Builder setFieldDelimiter(String fieldDelimiter) {
+ this.fieldDelimiter = fieldDelimiter;
+ return this;
+ }
+
+ public Builder setLineDelimiter(String lineDelimiter) {
+ this.lineDelimiter = lineDelimiter;
+ return this;
+ }
+
+ public Builder setEndpoint(String endpoint) {
+ this.endpoint = endpoint;
+ return this;
+ }
+
+ public Builder setRegion(String region) {
+ this.region = region;
+ return this;
+ }
+
+ public Builder setBucket(String bucket) {
+ this.bucket = bucket;
+ return this;
+ }
+
+ public Builder setKey(String key) {
+ this.key = key;
+ return this;
+ }
+
+ public Builder setAccessKeyId(String accessKeyId) {
+ this.accessKeyId = accessKeyId;
+ return this;
+ }
+
+ public Builder setSecretKeyId(String secretKeyId) {
+ this.secretKeyId = secretKeyId;
+ return this;
+ }
+
+ public S3Options build() {
+ return new S3Options(batchSize, maxRetries, batchIntervalMs, maxBatchBytes, format,
+ ignoreSideOutputErrors, enableDirtyLog, fieldDelimiter, lineDelimiter, endpoint,
+ region, bucket, key, accessKeyId, secretKeyId);
+ }
+ }
+}
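Builder usage: every connection coordinate (endpoint, region, bucket, key) is mandatory via the checkNotNull calls in the constructor, while the batching knobs default to a 100-record / 10 KB batch, a 10 s flush interval, 3 retries, and csv format. A usage sketch with placeholder values:

    S3Options s3Options = S3Options.builder()
            .setEndpoint("s3.us-west-2.amazonaws.com")  // placeholder
            .setRegion("us-west-2")
            .setBucket("inlong-dirty")
            .setKey("dirty")
            .setAccessKeyId("AKIA...")  // optional: omit to use the SDK default chain
            .setSecretKeyId("...")
            .build();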
diff --git a/inlong-sort/sort-connectors/base/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory b/inlong-sort/sort-connectors/base/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
index 412dedf67c2..a83e4d025c3 100644
--- a/inlong-sort/sort-connectors/base/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
+++ b/inlong-sort/sort-connectors/base/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
@@ -13,4 +13,5 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-org.apache.inlong.sort.base.dirty.sink.log.LogDirtySinkFactory
\ No newline at end of file
+org.apache.inlong.sort.base.dirty.sink.log.LogDirtySinkFactory
+org.apache.inlong.sort.base.dirty.sink.s3.S3DirtySinkFactory
\ No newline at end of file
diff --git a/licenses/inlong-sort-connectors/LICENSE b/licenses/inlong-sort-connectors/LICENSE
index 95793e251be..5a0d591b52a 100644
--- a/licenses/inlong-sort-connectors/LICENSE
+++ b/licenses/inlong-sort-connectors/LICENSE
@@ -844,7 +844,7 @@ The text of each license is the standard Apache 2.0 license.
com.tencentcloudapi:tencentcloud-sdk-java:3.1.545 - tencentcloud-sdk-java (https://github.com/TencentCloud/tencentcloud-sdk-java), (The Apache Software License, Version 2.0)
com.qcloud:dlc-data-catalog-metastore-client:1.1.1 - dlc-data-catalog-metastore-client (https://mvnrepository.com/artifact/com.qcloud/dlc-data-catalog-metastore-client/1.1), (The Apache Software License, Version 2.0)
org.apache.doris:flink-doris-connector-1.13_2.11:1.0.3 - Flink Connector for Apache Doris (https://github.com/apache/doris-flink-connector/tree/1.13_2.11-1.0.3), (The Apache Software License, Version 2.0)
-
+ com.amazonaws:aws-java-sdk-s3:jar:1.12.346 - AWS Java SDK for Amazon S3 (https://aws.amazon.com/sdkforjava), (The Apache Software License, Version 2.0)
========================================================================
Apache 2.0 licenses
diff --git a/pom.xml b/pom.xml
index 6ecfe05a264..a3897cc042c 100644
--- a/pom.xml
+++ b/pom.xml
@@ -148,6 +148,7 @@
0.5.4
2.1.3
+        <aws.sdk.version>1.12.346</aws.sdk.version>
3.6.3
2.8.1
1.15.3
@@ -1073,6 +1074,12 @@
                <version>${flink.version}</version>
            </dependency>
+            <dependency>
+                <groupId>com.amazonaws</groupId>
+                <artifactId>aws-java-sdk-s3</artifactId>
+                <version>${aws.sdk.version}</version>
+            </dependency>
+
            <dependency>
                <groupId>io.streamnative.connectors</groupId>
                <artifactId>pulsar-flink-connector_${scala.binary.version}</artifactId>