Skip to content

Commit

Permalink
[Improve][Connector-V2]Kudu Sink Connector Support to upsert row
Browse files Browse the repository at this point in the history
  • Loading branch information
zhaomin1423 committed Sep 26, 2022
1 parent 50c08f5 commit 1ece805
Showing 1 changed file with 48 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.apache.kudu.client.KuduTable;
import org.apache.kudu.client.PartialRow;
import org.apache.kudu.client.SessionConfiguration;
import org.apache.kudu.client.Upsert;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -45,26 +46,25 @@ public class KuduOutputFormat

private static final Logger LOGGER = LoggerFactory.getLogger(KuduOutputFormat.class);

private String kuduMaster;
private String kuduTableName;
public static final long TIMEOUTMS = 18000;
public static final long SESSIONTIMEOUTMS = 100000;

private final String kuduMaster;
private final String kuduTableName;
private final KuduSinkConfig.SaveMode saveMode;
private KuduClient kuduClient;
private KuduSession kuduSession;
private KuduTable kuduTable;
public static final long TIMEOUTMS = 18000;
public static final long SESSIONTIMEOUTMS = 100000;

public KuduOutputFormat(KuduSinkConfig kuduSinkConfig) {
this.kuduMaster = kuduSinkConfig.getKuduMaster();
this.kuduTableName = kuduSinkConfig.getKuduTableName();
this.saveMode = kuduSinkConfig.getSaveMode();
init();
}

public void write(SeaTunnelRow element) {

Insert insert = kuduTable.newInsert();
Schema schema = kuduTable.getSchema();

private void transform(PartialRow row, SeaTunnelRow element, Schema schema) {
int columnCount = schema.getColumnCount();
PartialRow row = insert.getRow();
for (int columnIndex = 0; columnIndex < columnCount; columnIndex++) {
ColumnSchema col = schema.getColumnByIndex(columnIndex);
try {
Expand Down Expand Up @@ -114,24 +114,54 @@ public void write(SeaTunnelRow element) {
throw new IllegalArgumentException("Unsupported column type: " + col.getType());
}
} catch (ClassCastException e) {
e.printStackTrace();
throw new IllegalArgumentException(
"Value type does not match column type " + col.getType() +
" for column " + col.getName());
}

}
}

private void upsert(SeaTunnelRow element) {
Upsert upsert = kuduTable.newUpsert();
Schema schema = kuduTable.getSchema();
PartialRow row = upsert.getRow();
transform(row, element, schema);
try {
kuduSession.apply(upsert);
} catch (KuduException e) {
LOGGER.error("Failed to upsert.", e);
throw new RuntimeException("Failed to upsert.", e);
}
}

private void insert(SeaTunnelRow element) {
Insert insert = kuduTable.newInsert();
Schema schema = kuduTable.getSchema();
PartialRow row = insert.getRow();
transform(row, element, schema);
try {
kuduSession.apply(insert);
} catch (KuduException e) {
LOGGER.warn("kudu session insert data fail.", e);
throw new RuntimeException("kudu session insert data fail.", e);
LOGGER.error("Failed to insert.", e);
throw new RuntimeException("Failed to insert.", e);
}
}

public void write(SeaTunnelRow element) {
switch (saveMode) {
case APPEND:
insert(element);
break;
case OVERWRITE:
upsert(element);
break;
default:
throw new IllegalArgumentException(String.format("Unsupported saveMode: %s.", saveMode.name()));
}
}

public void init() {
private void init() {
KuduClient.KuduClientBuilder kuduClientBuilder = new
KuduClient.KuduClientBuilder(kuduMaster);
kuduClientBuilder.defaultOperationTimeoutMs(TIMEOUTMS);
Expand All @@ -142,19 +172,19 @@ public void init() {
try {
kuduTable = kuduClient.openTable(kuduTableName);
} catch (KuduException e) {
LOGGER.warn("Failed to initialize the Kudu client.", e);
LOGGER.error("Failed to initialize the Kudu client.", e);
throw new RuntimeException("Failed to initialize the Kudu client.", e);
}
LOGGER.info("The Kudu client is successfully initialized", kuduMaster, kuduClient);
LOGGER.info("The Kudu client for Master: {} is initialized successfully.", kuduMaster);
}

public void closeOutputFormat() {
if (kuduClient != null) {
try {
kuduClient.close();
kuduSession.close();
} catch (KuduException e) {
LOGGER.warn("Kudu Client close failed.", e);
} catch (KuduException ignored) {
LOGGER.warn("Failed to close Kudu Client.", ignored);
} finally {
kuduClient = null;
kuduSession = null;
Expand Down

0 comments on commit 1ece805

Please sign in to comment.