[INLONG-6654][Sort] Supports s3 side-output for dirty data #6655

Merged 3 commits on Nov 29, 2022
5 changes: 5 additions & 0 deletions inlong-sort/sort-connectors/base/pom.xml
@@ -42,6 +42,11 @@
<version>${project.version}</version>
<scope>provided</scope>
</dependency>

<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>aws-java-sdk-s3</artifactId>
</dependency>
</dependencies>

</project>
DirtySinkFactory.java
@@ -17,12 +17,13 @@

package org.apache.inlong.sort.base.dirty.sink;

import org.apache.flink.table.factories.DynamicTableFactory;
import org.apache.flink.table.factories.DynamicTableFactory.Context;
import org.apache.flink.table.factories.Factory;

/**
* Dirty sink factory class, used to create dirty sinks
*/
public interface DirtySinkFactory extends DynamicTableFactory {
public interface DirtySinkFactory extends Factory {

/**
* Create dirty sink
@@ -31,6 +32,6 @@ public interface DirtySinkFactory extends DynamicTableFactory {
* @param <T> The data type that is handled by the dirty sink
* @return A dirty sink
*/
<T> DirtySink<T> createDirtySink(DynamicTableFactory.Context context);
<T> DirtySink<T> createDirtySink(Context context);

}
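
With DirtySinkFactory now extending the plain Factory interface instead of DynamicTableFactory, a connector can look the factory up through Flink's standard factory discovery and hand it the connector's own table context. A minimal sketch of that wiring, assuming the caller already knows the factory identifier; the helper class and method below are illustrative and not part of this diff:

import org.apache.flink.table.data.RowData;
import org.apache.flink.table.factories.DynamicTableFactory;
import org.apache.flink.table.factories.FactoryUtil;
import org.apache.inlong.sort.base.dirty.sink.DirtySink;
import org.apache.inlong.sort.base.dirty.sink.DirtySinkFactory;

public final class DirtySinkFactoryExample {

    // Illustrative helper: discover a DirtySinkFactory by its identifier and let it
    // create the dirty sink from the owning connector's table context.
    public static DirtySink<RowData> createDirtySink(DynamicTableFactory.Context context, String identifier) {
        // FactoryUtil.discoverFactory accepts any org.apache.flink.table.factories.Factory,
        // which is what extending Factory (rather than DynamicTableFactory) enables here.
        DirtySinkFactory factory = FactoryUtil.discoverFactory(
                context.getClassLoader(), DirtySinkFactory.class, identifier);
        return factory.createDirtySink(context);
    }
}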
LogDirtySink.java
@@ -46,7 +46,7 @@ public class LogDirtySink<T> implements DirtySink<T> {

private static final long serialVersionUID = 1L;

private static final Logger LOG = LoggerFactory.getLogger(LogDirtySink.class);
private static final Logger LOGGER = LoggerFactory.getLogger(LogDirtySink.class);

private final RowData.FieldGetter[] fieldGetters;
private final String format;
@@ -85,7 +85,7 @@ public void invoke(DirtyData<T> dirtyData) throws Exception {
// Only support csv format when the row is not a 'RowData' and 'JsonNode'
value = FormatUtils.csvFormat(data, labelMap, fieldDelimiter);
}
LOG.info("[{}] {}", dirtyData.getLogTag(), value);
LOGGER.info("[{}] {}", dirtyData.getLogTag(), value);
}

private String format(RowData data, Map<String, String> labels) throws JsonProcessingException {
LogDirtySinkFactory.java
@@ -18,12 +18,14 @@
package org.apache.inlong.sort.base.dirty.sink.log;

import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.table.factories.DynamicTableFactory.Context;
import org.apache.flink.table.factories.FactoryUtil;
import org.apache.inlong.sort.base.dirty.sink.DirtySink;
import org.apache.inlong.sort.base.dirty.sink.DirtySinkFactory;

import java.util.HashSet;
import java.util.Set;
import static org.apache.inlong.sort.base.Constants.DIRTY_IDENTIFIER;
import static org.apache.inlong.sort.base.Constants.DIRTY_SIDE_OUTPUT_FIELD_DELIMITER;
import static org.apache.inlong.sort.base.Constants.DIRTY_SIDE_OUTPUT_FORMAT;

@@ -36,10 +38,9 @@ public class LogDirtySinkFactory implements DirtySinkFactory {

@Override
public <T> DirtySink<T> createDirtySink(Context context) {
final FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context);
FactoryUtil.validateFactoryOptions(this, helper.getOptions());
String format = helper.getOptions().get(DIRTY_SIDE_OUTPUT_FORMAT);
String fieldDelimiter = helper.getOptions().get(DIRTY_SIDE_OUTPUT_FIELD_DELIMITER);
FactoryUtil.validateFactoryOptions(this, context.getConfiguration());
String format = context.getConfiguration().get(DIRTY_SIDE_OUTPUT_FORMAT);
String fieldDelimiter = context.getConfiguration().get(DIRTY_SIDE_OUTPUT_FIELD_DELIMITER);
return new LogDirtySink<>(format, fieldDelimiter,
context.getCatalogTable().getResolvedSchema().toPhysicalRowDataType());
}
@@ -59,6 +60,7 @@ public Set<ConfigOption<?>> optionalOptions() {
final Set<ConfigOption<?>> options = new HashSet<>();
options.add(DIRTY_SIDE_OUTPUT_FORMAT);
options.add(DIRTY_SIDE_OUTPUT_FIELD_DELIMITER);
options.add(DIRTY_IDENTIFIER);
return options;
}
}
S3DirtySink.java (new file)
@@ -0,0 +1,279 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.inlong.sort.base.dirty.sink.s3;

import com.amazonaws.auth.AWSStaticCredentialsProvider;
import com.amazonaws.auth.BasicAWSCredentials;
import com.amazonaws.client.builder.AwsClientBuilder;
import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.AmazonS3ClientBuilder;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.formats.common.TimestampFormat;
import org.apache.flink.formats.json.JsonOptions.MapNullKeyMode;
import org.apache.flink.formats.json.RowDataToJsonConverters;
import org.apache.flink.formats.json.RowDataToJsonConverters.RowDataToJsonConverter;
import org.apache.flink.runtime.util.ExecutorThreadFactory;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.inlong.sort.base.dirty.DirtyData;
import org.apache.inlong.sort.base.dirty.sink.DirtySink;
import org.apache.inlong.sort.base.dirty.utils.FormatUtils;
import org.apache.inlong.sort.base.util.LabelUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import static java.nio.charset.StandardCharsets.UTF_8;
import static org.apache.flink.table.data.RowData.createFieldGetter;

/**
* S3 dirty sink that is used to write dirty data to s3
*
* @param <T> The data type that is handled by the dirty sink
*/
public class S3DirtySink<T> implements DirtySink<T> {

private static final long serialVersionUID = 1L;

private static final Logger LOGGER = LoggerFactory.getLogger(S3DirtySink.class);

private final Map<String, List<String>> batchMap = new HashMap<>();
private final S3Options s3Options;
private final AtomicLong readInNum = new AtomicLong(0);
private final AtomicLong writeOutNum = new AtomicLong(0);
private final AtomicLong errorNum = new AtomicLong(0);
private final DataType physicalRowDataType;
private final RowData.FieldGetter[] fieldGetters;
private RowDataToJsonConverter converter;
private long batchBytes = 0L;
private int size;
private transient volatile boolean closed = false;
private transient volatile boolean flushing = false;
private transient ScheduledExecutorService scheduler;
private transient ScheduledFuture<?> scheduledFuture;
private transient S3Helper s3Helper;

public S3DirtySink(S3Options s3Options, DataType physicalRowDataType) {
this.s3Options = s3Options;
this.physicalRowDataType = physicalRowDataType;
final LogicalType[] logicalTypes = physicalRowDataType.getChildren()
.stream().map(DataType::getLogicalType).toArray(LogicalType[]::new);
this.fieldGetters = new RowData.FieldGetter[logicalTypes.length];
for (int i = 0; i < logicalTypes.length; i++) {
fieldGetters[i] = createFieldGetter(logicalTypes[i], i);
}
}

@Override
public void open(Configuration configuration) throws Exception {
converter = new RowDataToJsonConverters(TimestampFormat.SQL, MapNullKeyMode.DROP, null)
.createConverter(physicalRowDataType.getLogicalType());
AmazonS3 s3Client;
if (s3Options.getAccessKeyId() != null && s3Options.getSecretKeyId() != null) {
BasicAWSCredentials awsCreds =
new BasicAWSCredentials(s3Options.getAccessKeyId(), s3Options.getSecretKeyId());
s3Client = AmazonS3ClientBuilder.standard().withCredentials(new AWSStaticCredentialsProvider(awsCreds))
.withEndpointConfiguration(new AwsClientBuilder.EndpointConfiguration(
s3Options.getEndpoint(),
s3Options.getRegion()))
.build();
} else {
s3Client = AmazonS3ClientBuilder.standard().withEndpointConfiguration(
new AwsClientBuilder.EndpointConfiguration(s3Options.getEndpoint(), s3Options.getRegion())).build();
}
s3Helper = new S3Helper(s3Client, s3Options);
this.scheduler = new ScheduledThreadPoolExecutor(1,
new ExecutorThreadFactory("s3-dirty-sink"));
this.scheduledFuture = this.scheduler.scheduleWithFixedDelay(() -> {
if (!closed && !flushing) {
flush();
}
}, s3Options.getBatchIntervalMs(), s3Options.getBatchIntervalMs(), TimeUnit.MILLISECONDS);
}

@Override
public synchronized void invoke(DirtyData<T> dirtyData) throws Exception {
try {
addBatch(dirtyData);
} catch (Exception e) {
if (!s3Options.ignoreSideOutputErrors()) {
throw new RuntimeException(String.format("Add batch to identifier:%s failed, the dirty data: %s.",
dirtyData.getIdentifier(), dirtyData.toString()), e);
}
LOGGER.warn("Add batch to identifier:{} failed "
+ "and the dirty data will be throw away in the future"
+ " because the option 'dirty.side-output.ignore-errors' is 'true'", dirtyData.getIdentifier());
}
if (valid() && !flushing) {
flush();
}
}

private boolean valid() {
return (s3Options.getBatchSize() > 0 && size >= s3Options.getBatchSize())
|| batchBytes >= s3Options.getMaxBatchBytes();
}

private void addBatch(DirtyData<T> dirtyData) throws IOException {
readInNum.incrementAndGet();
String value;
Map<String, String> labelMap = LabelUtils.parseLabels(dirtyData.getLabels());
T data = dirtyData.getData();
if (data instanceof RowData) {
value = format((RowData) data, labelMap);
} else if (data instanceof JsonNode) {
value = format((JsonNode) data, labelMap);
} else {
// Only support csv format when the row is not a 'RowData' and 'JsonNode'
value = FormatUtils.csvFormat(data, labelMap, s3Options.getFieldDelimiter());
}
if (s3Options.enableDirtyLog()) {
LOGGER.info("[{}] {}", dirtyData.getLogTag(), value);
}
batchBytes += value.getBytes(UTF_8).length;
size++;
batchMap.computeIfAbsent(dirtyData.getIdentifier(), k -> new ArrayList<>()).add(value);
}

private String format(RowData data, Map<String, String> labels) throws JsonProcessingException {
String value;
switch (s3Options.getFormat()) {
case "csv":
value = FormatUtils.csvFormat(data, fieldGetters, labels, s3Options.getFieldDelimiter());
break;
case "json":
value = FormatUtils.jsonFormat(data, converter, labels);
break;
default:
throw new UnsupportedOperationException(
String.format("Unsupported format for: %s", s3Options.getFormat()));
}
return value;
}

private String format(JsonNode data, Map<String, String> labels) throws JsonProcessingException {
String value;
switch (s3Options.getFormat()) {
case "csv":
value = FormatUtils.csvFormat(data, labels, s3Options.getFieldDelimiter());
break;
case "json":
value = FormatUtils.jsonFormat(data, labels);
break;
default:
throw new UnsupportedOperationException(
String.format("Unsupported format for: %s", s3Options.getFormat()));
}
return value;
}

@Override
public synchronized void close() throws IOException {
if (!closed) {
closed = true;
if (this.scheduledFuture != null) {
scheduledFuture.cancel(false);
this.scheduler.shutdown();
}
try {
flush();
} catch (Exception e) {
LOGGER.warn("Writing records to s3 failed.", e);
throw new RuntimeException("Writing records to s3 failed.", e);
}
}
}

/**
* Flush data to s3
*/
public synchronized void flush() {
flushing = true;
if (!hasRecords()) {
flushing = false;
return;
}
for (Entry<String, List<String>> kvs : batchMap.entrySet()) {
flushSingleIdentifier(kvs.getKey(), kvs.getValue());
}
batchMap.clear();
batchBytes = 0;
size = 0;
flushing = false;
LOGGER.info("S3 dirty sink statistics: readInNum: {}, writeOutNum: {}, errorNum: {}",
readInNum.get(), writeOutNum.get(), errorNum.get());
}

/**
* Flush data of single identifier to s3
*
* @param identifier The identifier of dirty data
* @param values The values of the identifier
*/
private void flushSingleIdentifier(String identifier, List<String> values) {
if (values == null || values.isEmpty()) {
return;
}
String content = null;
try {
content = StringUtils.join(values, s3Options.getLineDelimiter());
s3Helper.upload(identifier, content);
LOGGER.info("Write {} records to s3 of identifier: {}", values.size(), identifier);
writeOutNum.addAndGet(values.size());
// Clean the data that has been loaded.
values.clear();
} catch (Exception e) {
errorNum.addAndGet(values.size());
if (!s3Options.ignoreSideOutputErrors()) {
throw new RuntimeException(
String.format("Writing records to s3 of identifier:%s failed, the value: %s.",
identifier, content),
e);
}
LOGGER.warn("Writing records to s3 of identifier:{} failed "
+ "and the dirty data will be throw away in the future"
+ " because the option 'dirty.side-output.ignore-errors' is 'true'", identifier);
}
}

private boolean hasRecords() {
if (batchMap.isEmpty()) {
return false;
}
for (List<String> value : batchMap.values()) {
if (!value.isEmpty()) {
return true;
}
}
return false;
}
}
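
For context, a minimal sketch of how the sink above is driven end to end. The S3Options and DirtyData<RowData> instances are assumed to be built elsewhere (both classes are introduced by this PR but their builders are not shown in this excerpt); only the constructor, open, invoke and close calls come from the class above, and the example class, method and field names are illustrative.

import org.apache.flink.configuration.Configuration;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.DataType;
import org.apache.inlong.sort.base.dirty.DirtyData;
import org.apache.inlong.sort.base.dirty.sink.s3.S3DirtySink;
import org.apache.inlong.sort.base.dirty.sink.s3.S3Options;

public final class S3DirtySinkExample {

    // Illustrative lifecycle only; s3Options and dirtyRecord are assumed to be provided by the caller.
    static void writeDirtyRecord(S3Options s3Options, DirtyData<RowData> dirtyRecord) throws Exception {
        // Physical row type of the dirty records (example schema).
        DataType rowType = DataTypes.ROW(
                DataTypes.FIELD("id", DataTypes.INT()),
                DataTypes.FIELD("name", DataTypes.STRING()));
        S3DirtySink<RowData> sink = new S3DirtySink<>(s3Options, rowType);
        // Builds the AmazonS3 client and starts the periodic flush scheduler.
        sink.open(new Configuration());
        try {
            // Buffers the record per identifier; flushes early once the batch size or byte limit is exceeded.
            sink.invoke(dirtyRecord);
        } finally {
            // Cancels the scheduler and flushes whatever is still buffered to s3.
            sink.close();
        }
    }
}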