Skip to content

Commit 1177d02

Browse files
authored
[Hotfix][File-Connector] Fix WriteStrategy parallel writing thread unsafe issue (#5546)
Thread unsafe exception occurs when parallel writes are run on a single node, should not shares thread unsafe object to writers.
1 parent b889618 commit 1177d02

File tree

2 files changed

+11
-7
lines changed

2 files changed

+11
-7
lines changed

release-note.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@
4949
- [Connector-v2] [Jdbc] Fix oracle sql table identifier (#4754)
5050
- [Connector-v2] [Clickhouse] fix get clickhouse local table name with closing bracket from distributed table engineFull (#4710)
5151
- [Connector-v2] [CDC] Fix jdbc connection leak for mysql (#5037)
52+
- [Connector-v2] [File] Fix WriteStrategy parallel writing thread unsafe issue #5546
5253

5354
### Zeta(ST-Engine)
5455

seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/BaseFileSink.java

Lines changed: 10 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,6 @@ public abstract class BaseFileSink
5151
protected HadoopConf hadoopConf;
5252
protected FileSystemUtils fileSystemUtils;
5353
protected FileSinkConfig fileSinkConfig;
54-
protected WriteStrategy writeStrategy;
5554
protected JobContext jobContext;
5655
protected String jobId;
5756

@@ -65,11 +64,7 @@ public void setJobContext(JobContext jobContext) {
6564
public void setTypeInfo(SeaTunnelRowType seaTunnelRowType) {
6665
this.seaTunnelRowType = seaTunnelRowType;
6766
this.fileSinkConfig = new FileSinkConfig(pluginConfig, seaTunnelRowType);
68-
this.writeStrategy =
69-
WriteStrategyFactory.of(fileSinkConfig.getFileFormat(), fileSinkConfig);
7067
this.fileSystemUtils = new FileSystemUtils(hadoopConf);
71-
this.writeStrategy.setSeaTunnelRowTypeInfo(seaTunnelRowType);
72-
this.writeStrategy.setFileSystemUtils(fileSystemUtils);
7368
}
7469

7570
@Override
@@ -80,7 +75,7 @@ public SeaTunnelDataType<SeaTunnelRow> getConsumedType() {
8075
@Override
8176
public SinkWriter<SeaTunnelRow, FileCommitInfo, FileSinkState> restoreWriter(
8277
SinkWriter.Context context, List<FileSinkState> states) throws IOException {
83-
return new BaseFileSinkWriter(writeStrategy, hadoopConf, context, jobId, states);
78+
return new BaseFileSinkWriter(createWriteStrategy(), hadoopConf, context, jobId, states);
8479
}
8580

8681
@Override
@@ -92,7 +87,7 @@ public SinkWriter<SeaTunnelRow, FileCommitInfo, FileSinkState> restoreWriter(
9287
@Override
9388
public SinkWriter<SeaTunnelRow, FileCommitInfo, FileSinkState> createWriter(
9489
SinkWriter.Context context) throws IOException {
95-
return new BaseFileSinkWriter(writeStrategy, hadoopConf, context, jobId);
90+
return new BaseFileSinkWriter(createWriteStrategy(), hadoopConf, context, jobId);
9691
}
9792

9893
@Override
@@ -121,4 +116,12 @@ public Optional<Serializer<FileSinkState>> getWriterStateSerializer() {
121116
public void prepare(Config pluginConfig) throws PrepareFailException {
122117
this.pluginConfig = pluginConfig;
123118
}
119+
120+
protected WriteStrategy createWriteStrategy() {
121+
WriteStrategy writeStrategy =
122+
WriteStrategyFactory.of(fileSinkConfig.getFileFormat(), fileSinkConfig);
123+
writeStrategy.setSeaTunnelRowTypeInfo(seaTunnelRowType);
124+
writeStrategy.setFileSystemUtils(fileSystemUtils);
125+
return writeStrategy;
126+
}
124127
}

0 commit comments

Comments
 (0)