@@ -63,31 +63,7 @@ public HiveSinkWriter(@NonNull SeaTunnelRowType seaTunnelRowTypeInfo,
63
63
this .context = context ;
64
64
this .jobId = jobId ;
65
65
this .hiveSinkConfig = hiveSinkConfig ;
66
-
67
- SinkFileSystemPlugin sinkFileSystemPlugin = new HdfsFileSinkPlugin ();
68
- Optional <TransactionStateFileWriter > transactionStateFileWriter = sinkFileSystemPlugin .getTransactionStateFileWriter (this .seaTunnelRowTypeInfo ,
69
- new FileSinkTransactionFileNameGenerator (
70
- this .hiveSinkConfig .getTextFileSinkConfig ().getFileFormat (),
71
- this .hiveSinkConfig .getTextFileSinkConfig ().getFileNameExpression (),
72
- this .hiveSinkConfig .getTextFileSinkConfig ().getFileNameTimeFormat ()),
73
- new FileSinkPartitionDirNameGenerator (
74
- this .hiveSinkConfig .getTextFileSinkConfig ().getPartitionFieldList (),
75
- this .hiveSinkConfig .getTextFileSinkConfig ().getPartitionFieldsIndexInRow (),
76
- this .hiveSinkConfig .getTextFileSinkConfig ().getPartitionDirExpression ()),
77
- this .hiveSinkConfig .getTextFileSinkConfig ().getSinkColumnsIndexInRow (),
78
- this .hiveSinkConfig .getTextFileSinkConfig ().getTmpPath (),
79
- this .hiveSinkConfig .getTextFileSinkConfig ().getPath (),
80
- this .jobId ,
81
- this .context .getIndexOfSubtask (),
82
- this .hiveSinkConfig .getTextFileSinkConfig ().getFieldDelimiter (),
83
- this .hiveSinkConfig .getTextFileSinkConfig ().getRowDelimiter (),
84
- sinkFileSystemPlugin .getFileSystem ().get ());
85
-
86
- if (!transactionStateFileWriter .isPresent ()) {
87
- throw new RuntimeException ("A TransactionStateFileWriter is need" );
88
- }
89
-
90
- this .fileWriter = transactionStateFileWriter .get ();
66
+ this .fileWriter = createFileWriter ();
91
67
92
68
fileWriter .beginTransaction (1L );
93
69
}
@@ -103,31 +79,7 @@ public HiveSinkWriter(@NonNull SeaTunnelRowType seaTunnelRowTypeInfo,
103
79
this .context = context ;
104
80
this .jobId = jobId ;
105
81
this .hiveSinkConfig = hiveSinkConfig ;
106
-
107
- SinkFileSystemPlugin sinkFileSystemPlugin = new HdfsFileSinkPlugin ();
108
- Optional <TransactionStateFileWriter > transactionStateFileWriter = sinkFileSystemPlugin .getTransactionStateFileWriter (this .seaTunnelRowTypeInfo ,
109
- new FileSinkTransactionFileNameGenerator (
110
- this .hiveSinkConfig .getTextFileSinkConfig ().getFileFormat (),
111
- this .hiveSinkConfig .getTextFileSinkConfig ().getFileNameExpression (),
112
- this .hiveSinkConfig .getTextFileSinkConfig ().getFileNameTimeFormat ()),
113
- new FileSinkPartitionDirNameGenerator (
114
- this .hiveSinkConfig .getTextFileSinkConfig ().getPartitionFieldList (),
115
- this .hiveSinkConfig .getTextFileSinkConfig ().getPartitionFieldsIndexInRow (),
116
- this .hiveSinkConfig .getTextFileSinkConfig ().getPartitionDirExpression ()),
117
- this .hiveSinkConfig .getTextFileSinkConfig ().getSinkColumnsIndexInRow (),
118
- this .hiveSinkConfig .getTextFileSinkConfig ().getTmpPath (),
119
- this .hiveSinkConfig .getTextFileSinkConfig ().getPath (),
120
- this .jobId ,
121
- this .context .getIndexOfSubtask (),
122
- this .hiveSinkConfig .getTextFileSinkConfig ().getFieldDelimiter (),
123
- this .hiveSinkConfig .getTextFileSinkConfig ().getRowDelimiter (),
124
- sinkFileSystemPlugin .getFileSystem ().get ());
125
-
126
- if (!transactionStateFileWriter .isPresent ()) {
127
- throw new RuntimeException ("A TransactionStateFileWriter is need" );
128
- }
129
-
130
- this .fileWriter = transactionStateFileWriter .get ();
82
+ this .fileWriter = createFileWriter ();
131
83
132
84
// Rollback dirty transaction
133
85
if (hiveSinkStates .size () > 0 ) {
@@ -172,4 +124,37 @@ public List<HiveSinkState> snapshotState(long checkpointId) throws IOException {
172
124
public void abortPrepare () {
173
125
fileWriter .abortTransaction ();
174
126
}
127
+
128
+ private TransactionStateFileWriter createFileWriter () {
129
+ SinkFileSystemPlugin sinkFileSystemPlugin = new HdfsFileSinkPlugin ();
130
+ Optional <TransactionStateFileWriter > transactionStateFileWriterOpt = sinkFileSystemPlugin .getTransactionStateFileWriter (this .seaTunnelRowTypeInfo ,
131
+ getFilenameGenerator (),
132
+ getPartitionDirNameGenerator (),
133
+ this .hiveSinkConfig .getTextFileSinkConfig ().getSinkColumnsIndexInRow (),
134
+ this .hiveSinkConfig .getTextFileSinkConfig ().getTmpPath (),
135
+ this .hiveSinkConfig .getTextFileSinkConfig ().getPath (),
136
+ this .jobId ,
137
+ this .context .getIndexOfSubtask (),
138
+ this .hiveSinkConfig .getTextFileSinkConfig ().getFieldDelimiter (),
139
+ this .hiveSinkConfig .getTextFileSinkConfig ().getRowDelimiter (),
140
+ sinkFileSystemPlugin .getFileSystem ().get ());
141
+ if (!transactionStateFileWriterOpt .isPresent ()) {
142
+ throw new RuntimeException ("A TransactionStateFileWriter is need" );
143
+ }
144
+ return transactionStateFileWriterOpt .get ();
145
+ }
146
+
147
+ private FileSinkTransactionFileNameGenerator getFilenameGenerator () {
148
+ return new FileSinkTransactionFileNameGenerator (
149
+ this .hiveSinkConfig .getTextFileSinkConfig ().getFileFormat (),
150
+ this .hiveSinkConfig .getTextFileSinkConfig ().getFileNameExpression (),
151
+ this .hiveSinkConfig .getTextFileSinkConfig ().getFileNameTimeFormat ());
152
+ }
153
+
154
+ private FileSinkPartitionDirNameGenerator getPartitionDirNameGenerator () {
155
+ return new FileSinkPartitionDirNameGenerator (
156
+ this .hiveSinkConfig .getTextFileSinkConfig ().getPartitionFieldList (),
157
+ this .hiveSinkConfig .getTextFileSinkConfig ().getPartitionFieldsIndexInRow (),
158
+ this .hiveSinkConfig .getTextFileSinkConfig ().getPartitionDirExpression ());
159
+ }
175
160
}
0 commit comments