Skip to content

Commit 1ece805

Browse files
authored
[Improve][Connector-V2]Kudu Sink Connector Support to upsert row
1 parent 50c08f5 commit 1ece805

File tree

1 file changed

+48
-18
lines changed
  • seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/kuduclient

1 file changed

+48
-18
lines changed

seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/kuduclient/KuduOutputFormat.java

Lines changed: 48 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
import org.apache.kudu.client.KuduTable;
3030
import org.apache.kudu.client.PartialRow;
3131
import org.apache.kudu.client.SessionConfiguration;
32+
import org.apache.kudu.client.Upsert;
3233
import org.slf4j.Logger;
3334
import org.slf4j.LoggerFactory;
3435

@@ -45,26 +46,25 @@ public class KuduOutputFormat
4546

4647
private static final Logger LOGGER = LoggerFactory.getLogger(KuduOutputFormat.class);
4748

48-
private String kuduMaster;
49-
private String kuduTableName;
49+
public static final long TIMEOUTMS = 18000;
50+
public static final long SESSIONTIMEOUTMS = 100000;
51+
52+
private final String kuduMaster;
53+
private final String kuduTableName;
54+
private final KuduSinkConfig.SaveMode saveMode;
5055
private KuduClient kuduClient;
5156
private KuduSession kuduSession;
5257
private KuduTable kuduTable;
53-
public static final long TIMEOUTMS = 18000;
54-
public static final long SESSIONTIMEOUTMS = 100000;
58+
5559
public KuduOutputFormat(KuduSinkConfig kuduSinkConfig) {
5660
this.kuduMaster = kuduSinkConfig.getKuduMaster();
5761
this.kuduTableName = kuduSinkConfig.getKuduTableName();
62+
this.saveMode = kuduSinkConfig.getSaveMode();
5863
init();
5964
}
6065

61-
public void write(SeaTunnelRow element) {
62-
63-
Insert insert = kuduTable.newInsert();
64-
Schema schema = kuduTable.getSchema();
65-
66+
private void transform(PartialRow row, SeaTunnelRow element, Schema schema) {
6667
int columnCount = schema.getColumnCount();
67-
PartialRow row = insert.getRow();
6868
for (int columnIndex = 0; columnIndex < columnCount; columnIndex++) {
6969
ColumnSchema col = schema.getColumnByIndex(columnIndex);
7070
try {
@@ -114,24 +114,54 @@ public void write(SeaTunnelRow element) {
114114
throw new IllegalArgumentException("Unsupported column type: " + col.getType());
115115
}
116116
} catch (ClassCastException e) {
117-
e.printStackTrace();
118117
throw new IllegalArgumentException(
119118
"Value type does not match column type " + col.getType() +
120119
" for column " + col.getName());
121120
}
122121

123122
}
123+
}
124124

125+
private void upsert(SeaTunnelRow element) {
126+
Upsert upsert = kuduTable.newUpsert();
127+
Schema schema = kuduTable.getSchema();
128+
PartialRow row = upsert.getRow();
129+
transform(row, element, schema);
130+
try {
131+
kuduSession.apply(upsert);
132+
} catch (KuduException e) {
133+
LOGGER.error("Failed to upsert.", e);
134+
throw new RuntimeException("Failed to upsert.", e);
135+
}
136+
}
137+
138+
private void insert(SeaTunnelRow element) {
139+
Insert insert = kuduTable.newInsert();
140+
Schema schema = kuduTable.getSchema();
141+
PartialRow row = insert.getRow();
142+
transform(row, element, schema);
125143
try {
126144
kuduSession.apply(insert);
127145
} catch (KuduException e) {
128-
LOGGER.warn("kudu session insert data fail.", e);
129-
throw new RuntimeException("kudu session insert data fail.", e);
146+
LOGGER.error("Failed to insert.", e);
147+
throw new RuntimeException("Failed to insert.", e);
130148
}
149+
}
131150

151+
public void write(SeaTunnelRow element) {
152+
switch (saveMode) {
153+
case APPEND:
154+
insert(element);
155+
break;
156+
case OVERWRITE:
157+
upsert(element);
158+
break;
159+
default:
160+
throw new IllegalArgumentException(String.format("Unsupported saveMode: %s.", saveMode.name()));
161+
}
132162
}
133163

134-
public void init() {
164+
private void init() {
135165
KuduClient.KuduClientBuilder kuduClientBuilder = new
136166
KuduClient.KuduClientBuilder(kuduMaster);
137167
kuduClientBuilder.defaultOperationTimeoutMs(TIMEOUTMS);
@@ -142,19 +172,19 @@ public void init() {
142172
try {
143173
kuduTable = kuduClient.openTable(kuduTableName);
144174
} catch (KuduException e) {
145-
LOGGER.warn("Failed to initialize the Kudu client.", e);
175+
LOGGER.error("Failed to initialize the Kudu client.", e);
146176
throw new RuntimeException("Failed to initialize the Kudu client.", e);
147177
}
148-
LOGGER.info("The Kudu client is successfully initialized", kuduMaster, kuduClient);
178+
LOGGER.info("The Kudu client for Master: {} is initialized successfully.", kuduMaster);
149179
}
150180

151181
public void closeOutputFormat() {
152182
if (kuduClient != null) {
153183
try {
154184
kuduClient.close();
155185
kuduSession.close();
156-
} catch (KuduException e) {
157-
LOGGER.warn("Kudu Client close failed.", e);
186+
} catch (KuduException ignored) {
187+
LOGGER.warn("Failed to close Kudu Client.", ignored);
158188
} finally {
159189
kuduClient = null;
160190
kuduSession = null;

0 commit comments

Comments
 (0)