Skip to content

Commit 2d41b02

Browse files
authored
[Improve][Fake] Improve memory usage when split size is large (#7821)
1 parent b87d732 commit 2d41b02

File tree

2 files changed

+22
-10
lines changed

2 files changed

+22
-10
lines changed

seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeDataGenerator.java

Lines changed: 19 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,8 @@
4141
import org.apache.seatunnel.connectors.seatunnel.fake.utils.FakeDataRandomUtils;
4242
import org.apache.seatunnel.format.json.JsonDeserializationSchema;
4343

44+
import com.google.common.annotations.VisibleForTesting;
45+
4446
import java.io.IOException;
4547
import java.lang.reflect.Array;
4648
import java.math.BigDecimal;
@@ -51,6 +53,7 @@
5153
import java.util.ArrayList;
5254
import java.util.HashMap;
5355
import java.util.List;
56+
import java.util.function.Consumer;
5457
import java.util.function.Function;
5558

5659
import static org.apache.seatunnel.api.table.type.SqlType.TIME;
@@ -105,26 +108,36 @@ private SeaTunnelRow randomRow() {
105108
return seaTunnelRow;
106109
}
107110

111+
@VisibleForTesting
112+
public List<SeaTunnelRow> generateFakedRows(int rowNum) {
113+
List<SeaTunnelRow> rows = new ArrayList<>();
114+
generateFakedRows(rowNum, rows::add);
115+
return rows;
116+
}
117+
108118
/**
109119
* @param rowNum The number of rows to generate for the current task (not used when rows are configured explicitly)
110-
* @return The generated data
120+
* @param consumer Receives each generated row as it is produced
121+
* @return The number of rows generated
111122
*/
112-
public List<SeaTunnelRow> generateFakedRows(int rowNum) {
123+
public long generateFakedRows(int rowNum, Consumer<SeaTunnelRow> consumer) {
113124
// Prefer manually configured rows when they are present
114-
List<SeaTunnelRow> seaTunnelRows = new ArrayList<>();
125+
long rowCount = 0;
115126
if (fakeConfig.getFakeRows() != null) {
116127
SeaTunnelDataType<?>[] fieldTypes = catalogTable.getSeaTunnelRowType().getFieldTypes();
117128
String[] fieldNames = catalogTable.getSeaTunnelRowType().getFieldNames();
118129
for (FakeConfig.RowData rowData : fakeConfig.getFakeRows()) {
119130
customField(rowData, fieldTypes, fieldNames);
120-
seaTunnelRows.add(convertRow(rowData));
131+
consumer.accept(convertRow(rowData));
132+
rowCount++;
121133
}
122134
} else {
123135
for (int i = 0; i < rowNum; i++) {
124-
seaTunnelRows.add(randomRow());
136+
consumer.accept(randomRow());
137+
rowCount++;
125138
}
126139
}
127-
return seaTunnelRows;
140+
return rowCount;
128141
}
129142

130143
private void customField(

seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSourceReader.java

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -89,12 +89,11 @@ public void pollNext(Collector<SeaTunnelRow> output) throws InterruptedException
8989
if (null != split) {
9090
FakeDataGenerator fakeDataGenerator = fakeDataGeneratorMap.get(split.getTableId());
9191
// Randomly generated data are sent directly to the downstream operator
92-
List<SeaTunnelRow> seaTunnelRows =
93-
fakeDataGenerator.generateFakedRows(split.getRowNum());
94-
seaTunnelRows.forEach(output::collect);
92+
long rowCount =
93+
fakeDataGenerator.generateFakedRows(split.getRowNum(), output::collect);
9594
log.info(
9695
"{} rows of data have been generated in split({}) for table {}. Generation time: {}",
97-
seaTunnelRows.size(),
96+
rowCount,
9897
split.splitId(),
9998
split.getTableId(),
10099
latestTimestamp);

0 commit comments

Comments
 (0)