Skip to content

Commit

Permalink
[INLONG-10357][Sort] Make StarRocks sink support reporting audit information exactly once (#10549)
Browse files Browse the repository at this point in the history
  • Loading branch information
XiaoYou201 committed Jul 2, 2024
1 parent b5eeffe commit 5b29cc0
Show file tree
Hide file tree
Showing 2 changed files with 36 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
package org.apache.inlong.sort.starrocks.table.sink.table;

import org.apache.inlong.sort.base.metric.MetricOption;
import org.apache.inlong.sort.base.metric.SinkMetricData;
import org.apache.inlong.sort.base.metric.SinkExactlyMetric;
import org.apache.inlong.sort.starrocks.table.sink.utils.SchemaUtils;

import com.google.common.base.Strings;
Expand Down Expand Up @@ -70,6 +70,8 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;

import static org.apache.inlong.sort.base.util.CalculateObjectSizeUtils.getDataSize;

/**
* StarRocks dynamic sink function. It supports insert, upsert, delete in Starrocks.
* @param <T>
Expand All @@ -87,7 +89,7 @@ public class StarRocksDynamicSinkFunctionV2<T> extends StarRocksDynamicSinkFunct

private transient volatile ListState<StarrocksSnapshotState> snapshotStates;
private final Map<Long, List<StreamLoadSnapshot>> snapshotMap = new ConcurrentHashMap<>();
private transient SinkMetricData sinkMetricData;
private transient SinkExactlyMetric sinkExactlyMetric;

@Deprecated
private transient ListState<Map<String, StarRocksSinkBufferEntity>> legacyState;
Expand Down Expand Up @@ -207,19 +209,18 @@ public void invoke(T value, Context context)

Object[] data = rowTransformer.transform(value, sinkOptions.supportUpsertDelete());

ouputMetrics(value, data);

sinkManager.write(
null,
sinkOptions.getDatabaseName(),
sinkOptions.getTableName(),
serializer.serialize(schemaUtils.filterOutTimeField(data)));

ouputMetrics(value, data);
}

private void ouputMetrics(T value, Object[] data) {
if (sinkMetricData != null) {
sinkMetricData.invokeWithEstimate(value, schemaUtils.getDataTime(data));
if (sinkExactlyMetric != null) {
sinkExactlyMetric.invoke(1, getDataSize(value), schemaUtils.getDataTime(data));
}
}

Expand All @@ -237,10 +238,10 @@ public void open(Configuration parameters) {
.build();

if (metricOption != null) {
sinkMetricData = new SinkMetricData(metricOption, getRuntimeContext().getMetricGroup());
sinkExactlyMetric = new SinkExactlyMetric(metricOption, getRuntimeContext().getMetricGroup());
}

notifyCheckpointComplete(Long.MAX_VALUE);
commitTransaction(Long.MAX_VALUE);
log.info("Open sink function v2. {}", EnvUtils.getGitInformation());
}

Expand All @@ -265,10 +266,9 @@ public void close() {

@Override
public void snapshotState(FunctionSnapshotContext functionSnapshotContext) throws Exception {
updateCurrentCheckpointId(functionSnapshotContext.getCheckpointId());
sinkManager.flush();

flushAudit();

if (sinkOptions.getSemantic() != StarRocksSinkSemantic.EXACTLY_ONCE) {
return;
}
Expand All @@ -290,12 +290,6 @@ public void snapshotState(FunctionSnapshotContext functionSnapshotContext) throw
}
}

private void flushAudit() {
if (sinkMetricData != null) {
sinkMetricData.flushAuditData();
}
}

@Override
public void initializeState(FunctionInitializationContext functionInitializationContext) throws Exception {
if (sinkOptions.getSemantic() != StarRocksSinkSemantic.EXACTLY_ONCE) {
Expand Down Expand Up @@ -346,7 +340,12 @@ public void initializeState(FunctionInitializationContext functionInitialization

@Override
public void notifyCheckpointComplete(long checkpointId) {
commitTransaction(checkpointId);
flushAudit();
updateLastCheckpointId(checkpointId);
}

private void commitTransaction(long checkpointId) {
boolean succeed = true;

List<Long> commitCheckpointIds = snapshotMap.keySet().stream()
Expand Down Expand Up @@ -395,4 +394,22 @@ private void flushLegacyData() {
legacyData.clear();
}

private void flushAudit() {
if (sinkExactlyMetric != null) {
sinkExactlyMetric.flushAudit();
}
}

private void updateCurrentCheckpointId(long checkpointId) {
if (sinkExactlyMetric != null) {
sinkExactlyMetric.updateCurrentCheckpointId(checkpointId);
}
}

private void updateLastCheckpointId(long checkpointId) {
if (sinkExactlyMetric != null) {
sinkExactlyMetric.updateLastCheckpointId(checkpointId);
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ public class SchemaUtils implements Serializable {

private static final long serialVersionUID = 1L;

private final String AUDIT_DATA_TIME = "audit_data_time";
private final String AUDIT_DATA_TIME = "AUDIT_DATA_TIME";
private final int DATA_TIME_ABSENT_INDEX = -1;
private final int dataTimeFieldIndex;

Expand Down Expand Up @@ -73,7 +73,7 @@ public Object[] filterOutTimeField(Object[] data) {
*/
public String[] filterOutTimeField(TableSchema schema) {
return Arrays.stream(schema.getFieldNames())
.filter(field -> !AUDIT_DATA_TIME.equals(field))
.filter(field -> !AUDIT_DATA_TIME.equalsIgnoreCase(field))
.toArray(String[]::new);
}

Expand All @@ -84,7 +84,7 @@ public String[] filterOutTimeField(TableSchema schema) {
*/
private int getDataTimeIndex(String[] fieldNames) {
for (int i = 0; i < fieldNames.length; i++) {
if (AUDIT_DATA_TIME.equals(fieldNames[i])) {
if (AUDIT_DATA_TIME.equalsIgnoreCase(fieldNames[i])) {
return i;
}
}
Expand Down

0 comments on commit 5b29cc0

Please sign in to comment.