Skip to content

Commit e081762

Browse files
authored
feat: add record write failure log and metrics (#13417)
1 parent f386128 commit e081762

File tree

4 files changed

+94
-3
lines changed

4 files changed

+94
-3
lines changed

hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/WriteStatus.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -91,7 +91,7 @@ public WriteStatus(Boolean trackSuccessRecords, Double failureFraction) {
9191
public WriteStatus() {
9292
this.failureFraction = 0.0d;
9393
this.trackSuccessRecords = false;
94-
this.random = null;
94+
this.random = new Random(RANDOM_SEED);
9595
this.isMetadataTable = false;
9696
}
9797

hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/metrics/FlinkStreamWriteMetrics.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,11 @@ public class FlinkStreamWriteMetrics extends HoodieFlinkMetrics {
6666
*/
6767
private long numOfFilesWritten;
6868

69+
/**
70+
* Number of record write failures during a checkpoint window.
71+
*/
72+
private long numOfRecordWriteFailures;
73+
6974
/**
7075
* Number of records written per seconds.
7176
*/
@@ -104,6 +109,8 @@ public void registerMetrics() {
104109
metricGroup.gauge("fileFlushTotalCosts", () -> fileFlushTotalCosts);
105110
metricGroup.gauge("numOfFilesWritten", () -> numOfFilesWritten);
106111
metricGroup.gauge("numOfOpenHandle", () -> numOfOpenHandle);
112+
metricGroup.gauge("numOfRecordWriteFailures", () -> numOfRecordWriteFailures);
113+
107114

108115
metricGroup.meter("handleSwitchPerSecond", handleSwitchPerSecond);
109116

@@ -132,6 +139,14 @@ public void increaseNumOfFilesWritten() {
132139
numOfFilesWritten += 1;
133140
}
134141

142+
public void increaseNumOfRecordWriteFailure(long recordWriteFailures) {
143+
numOfRecordWriteFailures += recordWriteFailures;
144+
}
145+
146+
public long getNumOfRecordWriteFailures() {
147+
return numOfRecordWriteFailures;
148+
}
149+
135150
public void increaseNumOfOpenHandle() {
136151
numOfOpenHandle += 1;
137152
increaseNumOfFilesWritten();
@@ -165,6 +180,7 @@ public void resetAfterCommit() {
165180
this.numOfOpenHandle = 0;
166181
this.writeBufferedSize = 0;
167182
this.fileFlushTotalCosts = 0;
183+
this.numOfRecordWriteFailures = 0;
168184
}
169185

170186
}

hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/append/AppendWriteFunction.java

Lines changed: 26 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,8 @@
1919
package org.apache.hudi.sink.append;
2020

2121
import org.apache.hudi.client.WriteStatus;
22+
23+
import org.apache.hudi.common.model.HoodieKey;
2224
import org.apache.hudi.common.util.Option;
2325
import org.apache.hudi.exception.HoodieException;
2426
import org.apache.hudi.metrics.FlinkStreamWriteMetrics;
@@ -40,6 +42,7 @@
4042

4143
import java.util.Collections;
4244
import java.util.List;
45+
import java.util.Map;
4346

4447
/**
4548
* Sink function to write the data to the underneath filesystem.
@@ -144,8 +147,8 @@ private void flushData(boolean endInput) {
144147
LOG.info("No data to write in subtask [{}] for instant [{}]", taskID, this.currentInstant);
145148
}
146149

150+
recordWriteFailure(writeMetrics, writeStatus);
147151
StreamerUtil.validateWriteStatus(config, currentInstant, writeStatus);
148-
149152
final WriteMetadataEvent event = WriteMetadataEvent.builder()
150153
.taskID(taskID)
151154
.checkpointId(this.checkpointId)
@@ -167,4 +170,26 @@ private void registerMetrics() {
167170
writeMetrics = new FlinkStreamWriteMetrics(metrics);
168171
writeMetrics.registerMetrics();
169172
}
173+
174+
/**
175+
* Updates failure metrics and logs the first record write error found in the write statuses.
176+
*
177+
* @param writeMetrics FlinkStreamWriteMetrics
178+
* @param writeStatus write status from write handler
179+
*/
180+
@VisibleForTesting
181+
static void recordWriteFailure(FlinkStreamWriteMetrics writeMetrics, List<WriteStatus> writeStatus) {
182+
Map.Entry<HoodieKey, Throwable> firstFailure = null;
183+
for (WriteStatus status : writeStatus) {
184+
writeMetrics.increaseNumOfRecordWriteFailure(status.getTotalErrorRecords());
185+
if (firstFailure == null && status.getErrors().size() > 0) {
186+
firstFailure = status.getErrors().entrySet().stream().findFirst().get();
187+
}
188+
}
189+
190+
// Only print the first record failure to prevent logs from occupying too much disk in the worst case.
191+
if (firstFailure != null) {
192+
LOG.error("The first record with written failure {}", firstFailure.getKey().getRecordKey(), firstFailure.getValue());
193+
}
194+
}
170195
}

hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/append/TestAppendWriteFunction.java

Lines changed: 51 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,12 +31,30 @@
3131
import java.util.List;
3232

3333
import static org.junit.jupiter.api.Assertions.assertThrows;
34+
import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
35+
36+
import org.apache.hudi.metrics.FlinkStreamWriteMetrics;
37+
38+
import org.junit.jupiter.api.BeforeEach;
39+
40+
import java.util.Arrays;
41+
42+
import static org.junit.jupiter.api.Assertions.assertEquals;
3443

3544
/**
3645
* Test cases for {@link AppendWriteFunction}.
3746
*/
3847
public class TestAppendWriteFunction {
3948

49+
private FlinkStreamWriteMetrics flinkStreamWriteMetrics;
50+
private UnregisteredMetricsGroup metricGroup;
51+
52+
@BeforeEach
53+
void setUp() {
54+
metricGroup = new UnregisteredMetricsGroup();
55+
flinkStreamWriteMetrics = new FlinkStreamWriteMetrics(metricGroup);
56+
}
57+
4058
@Test
4159
void testRecordWriteNoFailure() {
4260
WriteStatus writeStatus = new WriteStatus();
@@ -61,7 +79,7 @@ void testRecordWriteFailureValidationWithoutFailFast() {
6179
void testRecordWriteFailureValidationWithFailFast() {
6280
WriteStatus writeStatus = new WriteStatus();
6381
writeStatus.markFailure(
64-
"key1", "/partition1", new RuntimeException("test exception"));
82+
"key1", "/partition1", new RuntimeException("test exception"));
6583
List<WriteStatus> writeStatusList = Collections.singletonList(writeStatus);
6684

6785
Configuration configuration = new Configuration();
@@ -71,4 +89,36 @@ void testRecordWriteFailureValidationWithFailFast() {
7189
assertThrows(HoodieException.class,
7290
() -> StreamerUtil.validateWriteStatus(configuration, HoodieInstantTimeGenerator.getCurrentInstantTimeStr(), writeStatusList));
7391
}
92+
93+
@Test
94+
void testRecordWriteFailure() {
95+
WriteStatus writeStatus = new WriteStatus();
96+
writeStatus.markFailure(
97+
"key1", "/partition1", new RuntimeException("test exception"));
98+
List<WriteStatus> writeStatusList = Arrays.asList(writeStatus);
99+
100+
AppendWriteFunction.recordWriteFailure(flinkStreamWriteMetrics, writeStatusList);
101+
102+
// Verify that the failure was recorded in metrics
103+
assertEquals(1, flinkStreamWriteMetrics.getNumOfRecordWriteFailures());
104+
}
105+
106+
@Test
107+
void testRecordWriteFailureMultipleErrors() {
108+
WriteStatus writeStatus1 = new WriteStatus();
109+
writeStatus1.markFailure("key1", "/partition1", new RuntimeException("error 1"));
110+
writeStatus1.markFailure("key2", "/partition1", new RuntimeException("error 2"));
111+
writeStatus1.markFailure("key3", "/partition1", new RuntimeException("error 3"));
112+
113+
WriteStatus writeStatus2 = new WriteStatus();
114+
writeStatus2.markFailure("key4", "/partition2", new IllegalArgumentException("illegal argument"));
115+
writeStatus2.markFailure("key5", "/partition2", new NullPointerException("null pointer"));
116+
117+
List<WriteStatus> writeStatusList = Arrays.asList(writeStatus1, writeStatus2);
118+
119+
AppendWriteFunction.recordWriteFailure(flinkStreamWriteMetrics, writeStatusList);
120+
121+
// Should record a total of 5 failures across both write statuses
122+
assertEquals(5, flinkStreamWriteMetrics.getNumOfRecordWriteFailures());
123+
}
74124
}

0 commit comments

Comments
 (0)