Skip to content

Commit a17dd7a

Browse files
authored
[Improve] Code clean for AmazonDynamoDB connector (#5791)
1 parent 37fcff3 commit a17dd7a

File tree

4 files changed

+11
-48
lines changed

4 files changed

+11
-48
lines changed

seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/exception/AmazonDynamoDBConnectorException.java

Lines changed: 0 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -25,14 +25,4 @@ public AmazonDynamoDBConnectorException(
2525
SeaTunnelErrorCode seaTunnelErrorCode, String errorMessage) {
2626
super(seaTunnelErrorCode, errorMessage);
2727
}
28-
29-
public AmazonDynamoDBConnectorException(
30-
SeaTunnelErrorCode seaTunnelErrorCode, String errorMessage, Throwable cause) {
31-
super(seaTunnelErrorCode, errorMessage, cause);
32-
}
33-
34-
public AmazonDynamoDBConnectorException(
35-
SeaTunnelErrorCode seaTunnelErrorCode, Throwable cause) {
36-
super(seaTunnelErrorCode, cause);
37-
}
3828
}

seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/sink/AmazonDynamoDBWriter.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ public class AmazonDynamoDBWriter extends AbstractSinkWriter<SeaTunnelRow, Void>
3535
public AmazonDynamoDBWriter(
3636
AmazonDynamoDBSourceOptions amazondynamodbSourceOptions,
3737
SeaTunnelRowType seaTunnelRowType) {
38-
dynamoDbSinkClient = new DynamoDbSinkClient(amazondynamodbSourceOptions, seaTunnelRowType);
38+
dynamoDbSinkClient = new DynamoDbSinkClient(amazondynamodbSourceOptions);
3939
serializer =
4040
new DefaultSeaTunnelRowSerializer(seaTunnelRowType, amazondynamodbSourceOptions);
4141
}

seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/sink/DynamoDbSinkClient.java

Lines changed: 3 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -17,12 +17,7 @@
1717

1818
package org.apache.seatunnel.connectors.seatunnel.amazondynamodb.sink;
1919

20-
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
21-
import org.apache.seatunnel.common.exception.CommonErrorCode;
2220
import org.apache.seatunnel.connectors.seatunnel.amazondynamodb.config.AmazonDynamoDBSourceOptions;
23-
import org.apache.seatunnel.connectors.seatunnel.amazondynamodb.exception.AmazonDynamoDBConnectorException;
24-
import org.apache.seatunnel.connectors.seatunnel.amazondynamodb.serialize.DefaultSeaTunnelRowDeserializer;
25-
import org.apache.seatunnel.connectors.seatunnel.amazondynamodb.serialize.SeaTunnelRowDeserializer;
2621

2722
import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
2823
import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
@@ -33,7 +28,6 @@
3328
import software.amazon.awssdk.services.dynamodb.model.PutRequest;
3429
import software.amazon.awssdk.services.dynamodb.model.WriteRequest;
3530

36-
import java.io.IOException;
3731
import java.net.URI;
3832
import java.util.ArrayList;
3933
import java.util.HashMap;
@@ -43,16 +37,12 @@
4337
public class DynamoDbSinkClient {
4438
private final AmazonDynamoDBSourceOptions amazondynamodbSourceOptions;
4539
private volatile boolean initialize;
46-
private volatile Exception flushException;
4740
private DynamoDbClient dynamoDbClient;
4841
private final List<WriteRequest> batchList;
49-
protected SeaTunnelRowDeserializer seaTunnelRowDeserializer;
5042

51-
public DynamoDbSinkClient(
52-
AmazonDynamoDBSourceOptions amazondynamodbSourceOptions, SeaTunnelRowType typeInfo) {
43+
public DynamoDbSinkClient(AmazonDynamoDBSourceOptions amazondynamodbSourceOptions) {
5344
this.amazondynamodbSourceOptions = amazondynamodbSourceOptions;
5445
this.batchList = new ArrayList<>();
55-
this.seaTunnelRowDeserializer = new DefaultSeaTunnelRowDeserializer(typeInfo);
5646
}
5747

5848
private void tryInit() {
@@ -74,9 +64,8 @@ private void tryInit() {
7464
initialize = true;
7565
}
7666

77-
public synchronized void write(PutItemRequest putItemRequest) throws IOException {
67+
public synchronized void write(PutItemRequest putItemRequest) {
7868
tryInit();
79-
checkFlushException();
8069
batchList.add(
8170
WriteRequest.builder()
8271
.putRequest(PutRequest.builder().item(putItemRequest.item()).build())
@@ -87,15 +76,14 @@ public synchronized void write(PutItemRequest putItemRequest) throws IOException
8776
}
8877
}
8978

90-
public synchronized void close() throws IOException {
79+
public synchronized void close() {
9180
if (dynamoDbClient != null) {
9281
flush();
9382
dynamoDbClient.close();
9483
}
9584
}
9685

9786
synchronized void flush() {
98-
checkFlushException();
9987
if (batchList.isEmpty()) {
10088
return;
10189
}
@@ -106,13 +94,4 @@ synchronized void flush() {
10694

10795
batchList.clear();
10896
}
109-
110-
private void checkFlushException() {
111-
if (flushException != null) {
112-
throw new AmazonDynamoDBConnectorException(
113-
CommonErrorCode.FLUSH_DATA_FAILED,
114-
"Flush data to AmazonDynamoDB failed.",
115-
flushException);
116-
}
117-
}
11897
}

seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/source/AmazonDynamoDBSourceReader.java

Lines changed: 7 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -30,15 +30,12 @@
3030
import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
3131
import software.amazon.awssdk.regions.Region;
3232
import software.amazon.awssdk.services.dynamodb.DynamoDbClient;
33-
import software.amazon.awssdk.services.dynamodb.model.AttributeValue;
3433
import software.amazon.awssdk.services.dynamodb.model.ScanRequest;
3534
import software.amazon.awssdk.services.dynamodb.paginators.ScanIterable;
3635

37-
import java.io.IOException;
3836
import java.net.URI;
3937
import java.util.ArrayList;
4038
import java.util.List;
41-
import java.util.Map;
4239
import java.util.Queue;
4340
import java.util.concurrent.ConcurrentLinkedDeque;
4441

@@ -64,7 +61,7 @@ public AmazonDynamoDBSourceReader(
6461
}
6562

6663
@Override
67-
public void open() throws Exception {
64+
public void open() {
6865
dynamoDbClient =
6966
DynamoDbClient.builder()
7067
.endpointOverride(URI.create(amazondynamodbSourceOptions.getUrl()))
@@ -80,27 +77,26 @@ public void open() throws Exception {
8077
}
8178

8279
@Override
83-
public void close() throws IOException {
80+
public void close() {
8481
dynamoDbClient.close();
8582
}
8683

8784
@Override
8885
@SuppressWarnings("magicnumber")
89-
public void pollNext(Collector<SeaTunnelRow> output) throws Exception {
86+
public void pollNext(Collector<SeaTunnelRow> output) {
9087
while (!pendingSplits.isEmpty()) {
9188
synchronized (output.getCheckpointLock()) {
9289
AmazonDynamoDBSourceSplit split = pendingSplits.poll();
93-
9490
read(split, output);
9591
}
9692
}
97-
if (pendingSplits.isEmpty() && noMoreSplit) {
93+
if (noMoreSplit && pendingSplits.isEmpty()) {
9894
context.signalNoMoreElement();
9995
}
10096
}
10197

10298
@Override
103-
public List<AmazonDynamoDBSourceSplit> snapshotState(long checkpointId) throws Exception {
99+
public List<AmazonDynamoDBSourceSplit> snapshotState(long checkpointId) {
104100
return new ArrayList<>(pendingSplits);
105101
}
106102

@@ -115,9 +111,7 @@ public void handleNoMoreSplits() {
115111
noMoreSplit = true;
116112
}
117113

118-
private void read(AmazonDynamoDBSourceSplit split, Collector<SeaTunnelRow> output)
119-
throws Exception {
120-
Map<String, AttributeValue> lastKeyEvaluated = null;
114+
private void read(AmazonDynamoDBSourceSplit split, Collector<SeaTunnelRow> output) {
121115
ScanIterable scan;
122116
ScanRequest scanRequest =
123117
ScanRequest.builder()
@@ -145,5 +139,5 @@ private void read(AmazonDynamoDBSourceSplit split, Collector<SeaTunnelRow> outpu
145139
}
146140

147141
@Override
148-
public void notifyCheckpointComplete(long checkpointId) throws Exception {}
142+
public void notifyCheckpointComplete(long checkpointId) {}
149143
}

0 commit comments

Comments
 (0)