18
18
package org .apache .seatunnel .connectors .seatunnel .file .source .split ;
19
19
20
20
import org .apache .seatunnel .api .source .SourceSplitEnumerator ;
21
- import org .apache .seatunnel .common .config .Common ;
22
21
import org .apache .seatunnel .connectors .seatunnel .file .source .state .FileSourceState ;
23
22
23
+ import lombok .extern .slf4j .Slf4j ;
24
+
24
25
import java .io .IOException ;
25
26
import java .util .ArrayList ;
26
- import java .util .Collection ;
27
- import java .util .Collections ;
28
- import java .util .HashMap ;
29
27
import java .util .HashSet ;
30
28
import java .util .List ;
31
- import java .util .Map ;
32
29
import java .util .Set ;
30
+ import java .util .stream .Collectors ;
33
31
32
+ @ Slf4j
34
33
public class FileSourceSplitEnumerator implements SourceSplitEnumerator <FileSourceSplit , FileSourceState > {
35
34
private final Context <FileSourceSplit > context ;
36
35
private Set <FileSourceSplit > pendingSplit ;
@@ -40,6 +39,7 @@ public class FileSourceSplitEnumerator implements SourceSplitEnumerator<FileSour
40
39
/**
 * Creates an enumerator over the given file paths with an empty set of assigned splits.
 *
 * @param context   enumerator context used to talk to the readers
 * @param filePaths paths of the files to be turned into splits
 */
public FileSourceSplitEnumerator(SourceSplitEnumerator.Context<FileSourceSplit> context, List<String> filePaths) {
    // Nothing has been handed out yet for a freshly created enumerator.
    this.assignedSplit = new HashSet<>();
    this.filePaths = filePaths;
    this.context = context;
}
44
44
45
45
public FileSourceSplitEnumerator (SourceSplitEnumerator .Context <FileSourceSplit > context , List <String > filePaths ,
@@ -50,51 +50,59 @@ public FileSourceSplitEnumerator(SourceSplitEnumerator.Context<FileSourceSplit>
50
50
51
51
@ Override
52
52
public void open () {
53
- this .assignedSplit = new HashSet <>();
54
53
this .pendingSplit = new HashSet <>();
55
54
}
56
55
57
56
@ Override
58
57
public void run () {
59
- pendingSplit = getHiveFileSplit ();
60
- assignSplit (context .registeredReaders ());
58
+ // do nothing
61
59
}
62
60
63
- private Set <FileSourceSplit > getHiveFileSplit () {
64
- Set <FileSourceSplit > hiveSourceSplits = new HashSet <>();
65
- filePaths .forEach (k -> hiveSourceSplits .add (new FileSourceSplit (k )));
66
- return hiveSourceSplits ;
67
-
61
+ private Set <FileSourceSplit > getFileSplit () {
62
+ Set <FileSourceSplit > fileSourceSplits = new HashSet <>();
63
+ filePaths .forEach (k -> fileSourceSplits .add (new FileSourceSplit (k )));
64
+ return fileSourceSplits ;
68
65
}
69
66
70
67
@ Override
71
68
public void close () throws IOException {
72
-
69
+ // do nothing
73
70
}
74
71
75
72
@ Override
76
73
public void addSplitsBack (List <FileSourceSplit > splits , int subtaskId ) {
77
74
if (!splits .isEmpty ()) {
78
75
pendingSplit .addAll (splits );
79
- assignSplit (Collections . singletonList ( subtaskId ) );
76
+ assignSplit (subtaskId );
80
77
}
81
78
}
82
79
83
- private void assignSplit (Collection <Integer > taskIDList ) {
84
- Map <Integer , List <FileSourceSplit >> readySplit = new HashMap <>(Common .COLLECTION_SIZE );
85
- for (int taskID : taskIDList ) {
86
- readySplit .computeIfAbsent (taskID , id -> new ArrayList <>());
80
+ private void assignSplit (int taskId ) {
81
+ ArrayList <FileSourceSplit > currentTaskSplits = new ArrayList <>();
82
+ if (context .currentParallelism () == 1 ) {
83
+ // if parallelism == 1, we should assign all the splits to reader
84
+ currentTaskSplits .addAll (pendingSplit );
85
+ } else {
86
+ // if parallelism > 1, according to hashCode of split's id to determine whether to allocate the current task
87
+ for (FileSourceSplit fileSourceSplit : pendingSplit ) {
88
+ int splitOwner = getSplitOwner (fileSourceSplit .splitId (), context .currentParallelism ());
89
+ if (splitOwner == taskId ) {
90
+ currentTaskSplits .add (fileSourceSplit );
91
+ }
92
+ }
87
93
}
88
-
89
- pendingSplit .forEach (s -> readySplit .get (getSplitOwner (s .splitId (), taskIDList .size ()))
90
- .add (s ));
91
- readySplit .forEach (context ::assignSplit );
92
- assignedSplit .addAll (pendingSplit );
93
- pendingSplit .clear ();
94
+ // assign splits
95
+ context .assignSplit (taskId , currentTaskSplits );
96
+ // save the state of assigned splits
97
+ assignedSplit .addAll (currentTaskSplits );
98
+ // remove the assigned splits from pending splits
99
+ currentTaskSplits .forEach (split -> pendingSplit .remove (split ));
100
+ log .info ("SubTask {} is assigned to [{}]" , taskId , currentTaskSplits .stream ().map (FileSourceSplit ::splitId ).collect (Collectors .joining ("," )));
101
+ context .signalNoMoreSplits (taskId );
94
102
}
95
103
96
104
private static int getSplitOwner (String tp , int numReaders ) {
97
- return tp .hashCode () % numReaders ;
105
+ return Math . abs ( tp .hashCode () ) % numReaders ;
98
106
}
99
107
100
108
@ Override
@@ -104,9 +112,8 @@ public int currentUnassignedSplitSize() {
104
112
105
113
@ Override
106
114
public void registerReader (int subtaskId ) {
107
- if (!pendingSplit .isEmpty ()) {
108
- assignSplit (Collections .singletonList (subtaskId ));
109
- }
115
+ pendingSplit = getFileSplit ();
116
+ assignSplit (subtaskId );
110
117
}
111
118
112
119
@ Override
0 commit comments