Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Improve][Connector-V2]Kudu Sink Connector Support to upsert row #2881

Merged
merged 1 commit into from
Sep 26, 2022
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.apache.kudu.client.KuduTable;
import org.apache.kudu.client.PartialRow;
import org.apache.kudu.client.SessionConfiguration;
import org.apache.kudu.client.Upsert;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -45,26 +46,25 @@ public class KuduOutputFormat

private static final Logger LOGGER = LoggerFactory.getLogger(KuduOutputFormat.class);

private String kuduMaster;
private String kuduTableName;
public static final long TIMEOUTMS = 18000;
public static final long SESSIONTIMEOUTMS = 100000;

private final String kuduMaster;
private final String kuduTableName;
private final KuduSinkConfig.SaveMode saveMode;
private KuduClient kuduClient;
private KuduSession kuduSession;
private KuduTable kuduTable;
public static final long TIMEOUTMS = 18000;
public static final long SESSIONTIMEOUTMS = 100000;

public KuduOutputFormat(KuduSinkConfig kuduSinkConfig) {
this.kuduMaster = kuduSinkConfig.getKuduMaster();
this.kuduTableName = kuduSinkConfig.getKuduTableName();
this.saveMode = kuduSinkConfig.getSaveMode();
init();
}

public void write(SeaTunnelRow element) {

Insert insert = kuduTable.newInsert();
Schema schema = kuduTable.getSchema();

private void transform(PartialRow row, SeaTunnelRow element, Schema schema) {
int columnCount = schema.getColumnCount();
PartialRow row = insert.getRow();
for (int columnIndex = 0; columnIndex < columnCount; columnIndex++) {
ColumnSchema col = schema.getColumnByIndex(columnIndex);
try {
Expand Down Expand Up @@ -114,24 +114,54 @@ public void write(SeaTunnelRow element) {
throw new IllegalArgumentException("Unsupported column type: " + col.getType());
}
} catch (ClassCastException e) {
e.printStackTrace();
throw new IllegalArgumentException(
"Value type does not match column type " + col.getType() +
" for column " + col.getName());
}

}
}

private void upsert(SeaTunnelRow element) {
Upsert upsert = kuduTable.newUpsert();
Schema schema = kuduTable.getSchema();
PartialRow row = upsert.getRow();
transform(row, element, schema);
try {
kuduSession.apply(upsert);
} catch (KuduException e) {
LOGGER.error("Failed to upsert.", e);
throw new RuntimeException("Failed to upsert.", e);
}
}

private void insert(SeaTunnelRow element) {
Insert insert = kuduTable.newInsert();
Schema schema = kuduTable.getSchema();
PartialRow row = insert.getRow();
transform(row, element, schema);
try {
kuduSession.apply(insert);
} catch (KuduException e) {
LOGGER.warn("kudu session insert data fail.", e);
throw new RuntimeException("kudu session insert data fail.", e);
LOGGER.error("Failed to insert.", e);
throw new RuntimeException("Failed to insert.", e);
}
}

public void write(SeaTunnelRow element) {
switch (saveMode) {
case APPEND:
insert(element);
break;
case OVERWRITE:
upsert(element);
break;
default:
throw new IllegalArgumentException(String.format("Unsupported saveMode: %s.", saveMode.name()));
}
}

public void init() {
private void init() {
KuduClient.KuduClientBuilder kuduClientBuilder = new
KuduClient.KuduClientBuilder(kuduMaster);
kuduClientBuilder.defaultOperationTimeoutMs(TIMEOUTMS);
Expand All @@ -142,19 +172,19 @@ public void init() {
try {
kuduTable = kuduClient.openTable(kuduTableName);
} catch (KuduException e) {
LOGGER.warn("Failed to initialize the Kudu client.", e);
LOGGER.error("Failed to initialize the Kudu client.", e);
throw new RuntimeException("Failed to initialize the Kudu client.", e);
}
LOGGER.info("The Kudu client is successfully initialized", kuduMaster, kuduClient);
LOGGER.info("The Kudu client for Master: {} is initialized successfully.", kuduMaster);
}

public void closeOutputFormat() {
if (kuduClient != null) {
try {
kuduClient.close();
kuduSession.close();
} catch (KuduException e) {
LOGGER.warn("Kudu Client close failed.", e);
} catch (KuduException ignored) {
LOGGER.warn("Failed to close Kudu Client.", ignored);
} finally {
kuduClient = null;
kuduSession = null;
Expand Down