Skip to content

Commit bc852a4

Browse files
EricJoy2048ashulin
andauthored
[Bug]add 3node worker done test and fix some bug (#3115)
* Add master node switch test and fix bug Add master node switch test and fix bug * fix ci error * add todo * fix ci error * fix ci error * add testBatchJobRestoreIn3NodeWorkerDown and testStreamJobRestoreIn3NodeWorkerDown * add testBatchJobRestoreIn3NodeMasterDown and testStreamJobRestoreIn3NodeMasterDown * Add withTryCatch to the whenComplete which may throw exception * Add withTryCatch to the whenComplete which may throw exception * fix ci error * improve test case * notify checkpoint state * add worker node and master node done test * revert some file * fix ci error * improve test * add checkpoint.interval to test job config * fix some error * add try catch to test case * fix ci error * fix checkpoint error * [engine][tests] add cluster test case * [engine] Close checkpoint at the end of the pipeline * [engine] Fix hazelcast master node switching exception * [hotfix][connector][file] fix file sink error * fix checkpoint error # Conflicts: # seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointManager.java * fix PipelineState to PipelineStatus bug * [engine][checkpoint] add barrier flow operation (#3158) * fix operator retry error * fix operator retry error * fix checkpoint error * fix checkpoint error * fix checkpoint error * fix TaskExecutionService task is not exist when cancel task Co-authored-by: Zongwen Li <zongwen@apache.org>
1 parent 8cd0e51 commit bc852a4

File tree

25 files changed

+662
-155
lines changed

25 files changed

+662
-155
lines changed

seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/BaseFileSinkWriter.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,12 +46,15 @@ public BaseFileSinkWriter(WriteStrategy writeStrategy, HadoopConf hadoopConf, Si
4646
if (!fileSinkStates.isEmpty()) {
4747
List<String> transactionIds = writeStrategy.getTransactionIdFromStates(fileSinkStates);
4848
transactionIds.forEach(writeStrategy::abortPrepare);
49-
writeStrategy.beginTransaction(fileSinkStates.get(0).getCheckpointId());
49+
writeStrategy.beginTransaction(fileSinkStates.get(0).getCheckpointId() + 1);
50+
} else {
51+
writeStrategy.beginTransaction(1L);
5052
}
5153
}
5254

5355
public BaseFileSinkWriter(WriteStrategy writeStrategy, HadoopConf hadoopConf, SinkWriter.Context context, String jobId) {
5456
this(writeStrategy, hadoopConf, context, jobId, Collections.emptyList());
57+
writeStrategy.beginTransaction(1L);
5558
}
5659

5760
@Override

seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/commit/FileSinkAggregatedCommitter.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,7 @@ public FileAggregatedCommitInfo combine(List<FileCommitInfo> commitInfos) {
8383
*/
8484
@Override
8585
public void abort(List<FileAggregatedCommitInfo> aggregatedCommitInfos) throws Exception {
86+
log.info("rollback aggregate commit");
8687
if (aggregatedCommitInfos == null || aggregatedCommitInfos.size() == 0) {
8788
return;
8889
}

seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/util/FileSystemUtils.java

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -76,12 +76,19 @@ public static void deleteFile(@NonNull String file) throws IOException {
7676
* @param rmWhenExist if this is true, we will delete the target file when it already exists
7777
* @throws IOException throw IOException
7878
*/
79-
public static void renameFile(@NonNull String oldName, @NonNull String newName, boolean rmWhenExist) throws IOException {
79+
public static void renameFile(@NonNull String oldName, @NonNull String newName, boolean rmWhenExist)
80+
throws IOException {
8081
FileSystem fileSystem = getFileSystem(newName);
8182
log.info("begin rename file oldName :[" + oldName + "] to newName :[" + newName + "]");
8283

8384
Path oldPath = new Path(oldName);
8485
Path newPath = new Path(newName);
86+
87+
if (!fileExist(oldPath.toString())) {
88+
log.warn("rename file :[" + oldPath + "] to [" + newPath + "] already finished in the last commit, skip");
89+
return;
90+
}
91+
8592
if (rmWhenExist) {
8693
if (fileExist(newName) && fileExist(oldName)) {
8794
fileSystem.delete(newPath, true);
@@ -119,6 +126,9 @@ public static boolean fileExist(@NonNull String filePath) throws IOException {
119126
public static List<Path> dirList(@NonNull String filePath) throws FileNotFoundException, IOException {
120127
FileSystem fileSystem = getFileSystem(filePath);
121128
List<Path> pathList = new ArrayList<>();
129+
if (!fileExist(filePath)) {
130+
return pathList;
131+
}
122132
Path fileName = new Path(filePath);
123133
FileStatus[] status = fileSystem.listStatus(fileName);
124134
if (status != null && status.length > 0) {

seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/AbstractWriteStrategy.java

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,9 @@ public abstract class AbstractWriteStrategy implements WriteStrategy {
7070
protected Map<String, String> beingWrittenFile;
7171
private Map<String, List<String>> partitionDirAndValuesMap;
7272
protected SeaTunnelRowType seaTunnelRowType;
73-
protected Long checkpointId = 1L;
73+
74+
// Checkpoint id from engine is start with 1
75+
protected Long checkpointId = 0L;
7476

7577
public AbstractWriteStrategy(TextFileSinkConfig textFileSinkConfig) {
7678
this.textFileSinkConfig = textFileSinkConfig;
@@ -88,7 +90,6 @@ public void init(HadoopConf conf, String jobId, int subTaskIndex) {
8890
this.jobId = jobId;
8991
this.subTaskIndex = subTaskIndex;
9092
FileSystemUtils.CONF = getConfiguration(hadoopConf);
91-
this.beginTransaction(this.checkpointId);
9293
}
9394

9495
/**
@@ -233,6 +234,7 @@ public void abortPrepare(String transactionId) {
233234
* @param checkpointId checkpoint id
234235
*/
235236
public void beginTransaction(Long checkpointId) {
237+
this.checkpointId = checkpointId;
236238
this.transactionId = "T" + Constant.TRANSACTION_ID_SPLIT + jobId + Constant.TRANSACTION_ID_SPLIT + subTaskIndex + Constant.TRANSACTION_ID_SPLIT + checkpointId;
237239
this.transactionDirectory = getTransactionDir(this.transactionId);
238240
this.needMoveFiles = new HashMap<>();
@@ -265,8 +267,7 @@ public List<String> getTransactionIdFromStates(List<FileSinkState> fileStates) {
265267
@Override
266268
public List<FileSinkState> snapshotState(long checkpointId) {
267269
ArrayList<FileSinkState> fileState = Lists.newArrayList(new FileSinkState(this.transactionId, this.checkpointId));
268-
this.checkpointId = checkpointId;
269-
this.beginTransaction(checkpointId);
270+
this.beginTransaction(checkpointId + 1);
270271
return fileState;
271272
}
272273

@@ -303,4 +304,8 @@ public String getTargetLocation(@NonNull String seaTunnelFilePath) {
303304
Matcher.quoteReplacement(textFileSinkConfig.getPath()));
304305
return tmpPath.replaceAll(Constant.NON_PARTITION + Matcher.quoteReplacement(File.separator), "");
305306
}
307+
308+
public long getCheckpointId() {
309+
return this.checkpointId;
310+
}
306311
}

seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/WriteStrategy.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -72,4 +72,6 @@ public interface WriteStrategy extends Transaction, Serializable {
7272
* when a transaction is triggered, release resources
7373
*/
7474
void finishAndCloseFile();
75+
76+
long getCheckpointId();
7577
}

0 commit comments

Comments
 (0)