17
17
18
18
package org .apache .seatunnel .connectors .seatunnel .tdengine .source ;
19
19
20
- import org .apache .seatunnel .api .source .Boundedness ;
21
20
import org .apache .seatunnel .api .source .Collector ;
22
21
import org .apache .seatunnel .api .source .SourceReader ;
23
22
import org .apache .seatunnel .api .table .type .SeaTunnelRow ;
24
23
import org .apache .seatunnel .common .exception .CommonErrorCodeDeprecated ;
25
24
import org .apache .seatunnel .connectors .seatunnel .tdengine .config .TDengineSourceConfig ;
26
25
import org .apache .seatunnel .connectors .seatunnel .tdengine .exception .TDengineConnectorException ;
27
26
28
- import org .apache .commons .lang3 .StringUtils ;
29
-
30
- import com .google .common .collect .Sets ;
31
27
import com .taosdata .jdbc .TSDBDriver ;
32
28
import lombok .extern .slf4j .Slf4j ;
33
29
39
35
import java .sql .Statement ;
40
36
import java .sql .Timestamp ;
41
37
import java .util .ArrayList ;
38
+ import java .util .Deque ;
42
39
import java .util .List ;
43
40
import java .util .Objects ;
44
41
import java .util .Properties ;
45
- import java .util .Set ;
42
+ import java .util .concurrent . ConcurrentLinkedDeque ;
46
43
47
44
import static org .apache .seatunnel .connectors .seatunnel .tdengine .utils .TDengineUtil .checkDriverExist ;
48
45
49
46
@ Slf4j
50
47
public class TDengineSourceReader implements SourceReader <SeaTunnelRow , TDengineSourceSplit > {
51
-
52
- private static final long THREAD_WAIT_TIME = 500L ;
53
-
54
48
private final TDengineSourceConfig config ;
55
49
56
- private final Set <TDengineSourceSplit > sourceSplits ;
50
+ private final Deque <TDengineSourceSplit > sourceSplits ;
57
51
58
52
private final Context context ;
59
53
60
54
private Connection conn ;
61
55
56
+ private volatile boolean noMoreSplit ;
57
+
62
58
public TDengineSourceReader (TDengineSourceConfig config , SourceReader .Context readerContext ) {
63
59
this .config = config ;
64
- this .sourceSplits = Sets . newHashSet ();
60
+ this .sourceSplits = new ConcurrentLinkedDeque <> ();
65
61
this .context = readerContext ;
66
62
}
67
63
68
64
@ Override
69
65
public void pollNext (Collector <SeaTunnelRow > collector ) throws InterruptedException {
70
- if (sourceSplits .isEmpty ()) {
71
- Thread .sleep (THREAD_WAIT_TIME );
72
- return ;
73
- }
74
66
synchronized (collector .getCheckpointLock ()) {
75
- sourceSplits .forEach (
76
- split -> {
77
- try {
78
- read (split , collector );
79
- } catch (Exception e ) {
80
- throw new TDengineConnectorException (
81
- CommonErrorCodeDeprecated .READER_OPERATION_FAILED ,
82
- "TDengine split read error" ,
83
- e );
84
- }
85
- });
86
- }
87
-
88
- if (Boundedness .BOUNDED .equals (context .getBoundedness ())) {
89
- // signal to the source that we have reached the end of the data.
90
- log .info ("Closed the bounded TDengine source" );
91
- context .signalNoMoreElement ();
67
+ log .info ("polling new split from queue!" );
68
+ TDengineSourceSplit split = sourceSplits .poll ();
69
+ if (Objects .nonNull (split )) {
70
+ log .info (
71
+ "starting run new split {}, query sql: {}!" ,
72
+ split .splitId (),
73
+ split .getQuery ());
74
+ try {
75
+ read (split , collector );
76
+ } catch (Exception e ) {
77
+ throw new TDengineConnectorException (
78
+ CommonErrorCodeDeprecated .READER_OPERATION_FAILED ,
79
+ "TDengine split read error" ,
80
+ e );
81
+ }
82
+ } else if (noMoreSplit && sourceSplits .isEmpty ()) {
83
+ // signal to the source that we have reached the end of the data.
84
+ log .info ("Closed the bounded TDengine source" );
85
+ context .signalNoMoreElement ();
86
+ } else {
87
+ Thread .sleep (1000L );
88
+ }
92
89
}
93
90
}
94
91
95
92
@ Override
96
93
public void open () {
97
- String jdbcUrl =
98
- StringUtils .join (
99
- config .getUrl (),
100
- config .getDatabase (),
101
- "?user=" ,
102
- config .getUsername (),
103
- "&password=" ,
104
- config .getPassword ());
105
- Properties connProps = new Properties ();
106
- // todo: when TSDBDriver.PROPERTY_KEY_BATCH_LOAD set to "true",
107
- // there is a exception : Caused by: java.sql.SQLException: can't create connection with
108
- // server
109
- // under docker network env
110
- // @bobo (tdengine)
111
- connProps .setProperty (TSDBDriver .PROPERTY_KEY_BATCH_LOAD , "false" );
94
+ String jdbcUrl = config .getUrl ();
95
+
96
+ Properties properties = new Properties ();
97
+ properties .put (TSDBDriver .PROPERTY_KEY_USER , config .getUsername ());
98
+ properties .put (TSDBDriver .PROPERTY_KEY_PASSWORD , config .getPassword ());
99
+
112
100
try {
113
- // check td driver whether exist and if not, try to register
114
101
checkDriverExist (jdbcUrl );
115
- conn = DriverManager .getConnection (jdbcUrl , connProps );
102
+ conn = DriverManager .getConnection (jdbcUrl , properties );
116
103
} catch (SQLException e ) {
117
104
throw new TDengineConnectorException (
118
105
CommonErrorCodeDeprecated .READER_OPERATION_FAILED ,
119
- "get TDengine connection failed:" + jdbcUrl );
106
+ "get TDengine connection failed:" + jdbcUrl ,
107
+ e );
120
108
}
121
109
}
122
110
@@ -135,8 +123,8 @@ public void close() {
135
123
}
136
124
137
125
private void read (TDengineSourceSplit split , Collector <SeaTunnelRow > output ) throws Exception {
138
- try (Statement statement = conn .createStatement ()) {
139
- final ResultSet resultSet = statement .executeQuery (split .getQuery ());
126
+ try (Statement statement = conn .createStatement ();
127
+ ResultSet resultSet = statement .executeQuery (split .getQuery ())) {
140
128
ResultSetMetaData meta = resultSet .getMetaData ();
141
129
142
130
while (resultSet .next ()) {
@@ -151,6 +139,8 @@ private void read(TDengineSourceSplit split, Collector<SeaTunnelRow> output) thr
151
139
}
152
140
153
141
private Object convertDataType (Object object ) {
142
+ if (Objects .isNull (object )) return null ;
143
+
154
144
if (Timestamp .class .equals (object .getClass ())) {
155
145
return ((Timestamp ) object ).toLocalDateTime ();
156
146
} else if (byte [].class .equals (object .getClass ())) {
@@ -171,7 +161,8 @@ public void addSplits(List<TDengineSourceSplit> splits) {
171
161
172
162
@ Override
173
163
public void handleNoMoreSplits () {
174
- // do nothing
164
+ log .info ("no more split accepted!" );
165
+ noMoreSplit = true ;
175
166
}
176
167
177
168
@ Override
0 commit comments