
Exception when writing data with 'connector' = 'kudu' configured #8

Closed
zhiyuan192shine opened this issue Aug 20, 2021 · 13 comments
Labels: bug (Something isn't working)

@zhiyuan192shine

With the following configuration, I cannot write data:
'connector' = 'kudu',
'kudu.masters' = 'xxx1,xxx2,xxx3',
'kudu.table' = 'impala::g2link_stream.kudu_g2park_inout_record',
'kudu.hash-columns' = 'park_code',
'kudu.primary-key-columns' = 'inout_id'

The exception is as follows:
Row error for primary key="1704794203843944449", tablet=null, server=32df83c9ff4a472cb5f0d0abaf1c3c56, status=Not found: key not found (error 0)
Row error for primary key="1704794203843944449", tablet=null, server=32df83c9ff4a472cb5f0d0abaf1c3c56, status=Not found: key not found (error 0)

at org.colloh.flink.kudu.connector.internal.failure.DefaultKuduFailureHandler.onFailure(DefaultKuduFailureHandler.java:37) ~[flink-connector-kudu_2.11-1.2.1.jar:?]
at org.colloh.flink.kudu.connector.internal.writer.KuduWriter.checkAsyncErrors(KuduWriter.java:156) ~[flink-connector-kudu_2.11-1.2.1.jar:?]
at org.colloh.flink.kudu.connector.internal.writer.KuduWriter.write(KuduWriter.java:97) ~[flink-connector-kudu_2.11-1.2.1.jar:?]
at org.colloh.flink.kudu.connector.table.sink.KuduSink.invoke(KuduSink.java:93) ~[flink-connector-kudu_2.11-1.2.1.jar:?]
at org.apache.flink.streaming.api.functions.sink.SinkFunction.invoke(SinkFunction.java:49) ~[flink-dist_2.11-1.12.2.jar:1.12.2]
at org.apache.flink.table.runtime.operators.sink.SinkOperator.processElement(SinkOperator.java:72) ~[flink-table-blink_2.11-1.12.2.jar:1.12.2]
@collabH
Owner

collabH commented Aug 21, 2021

Writing to the table shouldn't require a DDL either; creating a Kudu catalog is enough, so try inserting directly through the Kudu catalog. We've run into this problem before and it was caused by updates (the update first looks up the primary key and throws this error when the key isn't found). You can upgrade Kudu to 1.12.0, which adds a configuration that ignores updates whose key is not found; my code should already be compatible with that.
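For reference, a minimal sketch of the catalog-based write path described above, assuming the connector exposes a KuduCatalog similar to the Bahir flink-connector-kudu one (the class name, package, and single-argument constructor are assumptions here, not taken from this repository):

```java
import org.apache.flink.connectors.kudu.table.KuduCatalog; // assumption: Bahir-style catalog class
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class KuduCatalogWriteSketch {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        // Register a Kudu catalog pointing at the master addresses and make it the current catalog.
        tableEnv.registerCatalog("kudu", new KuduCatalog("xxx1,xxx2,xxx3"));
        tableEnv.useCatalog("kudu");

        // Existing Kudu tables are then visible without any CREATE TABLE DDL.
        tableEnv.executeSql(
            "INSERT INTO `impala::g2link_stream.kudu_g2park_inout_record` "
                + "SELECT * FROM some_source_table"); // hypothetical source table
    }
}
```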

@collabH
Owner

collabH commented Aug 21, 2021

@zhiyuan192shine
Author

Writing to the table shouldn't require a DDL either; creating a Kudu catalog is enough, so try inserting directly through the Kudu catalog. We've run into this problem before and it was caused by updates (the update first looks up the primary key and throws this error when the key isn't found). You can upgrade Kudu to 1.12.0, which adds a configuration that ignores updates whose key is not found; my code should already be compatible with that.

I've noticed this problem too, but our cluster currently depends on CDH 6, so upgrading is very hard and we won't easily be allowed to do it.

@collabH
Owner

collabH commented Aug 21, 2021

In our case we were consuming binlogs, and this problem shows up when the binlog data is dirty, for example when a row only has an update and no corresponding insert. The only way we could handle it was the brute-force one: re-collect the whole table from scratch.

@zhiyuan192shine
Author

For now I've changed createBaseOperation in your UpsertOperationMapper so that it always returns table.newUpsert(), just to get all of the Kafka binlog data loaded first. Could you add compatibility for updates on lower Kudu versions?
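For clarity, a minimal sketch of the workaround just described (only the modified method; java.util.Optional, Flink's RowData, and the Kudu client's Operation/KuduTable are assumed to be imported as in the original class):

```java
// Workaround sketch: map every incoming row to an upsert, regardless of its RowKind.
// This avoids "key not found" on updates, but it also silently drops delete semantics,
// which is why compatibility with older Kudu versions is being requested.
public Optional<Operation> createBaseOperation(RowData input, KuduTable table) {
    return Optional.of(table.newUpsert());
}
```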

@collabH
Owner

collabH commented Aug 21, 2021

public Optional<Operation> createBaseOperation(RowData input, KuduTable table) {
    return Optional.of(input.getRowKind().equals(RowKind.INSERT)
            || input.getRowKind().equals(RowKind.UPDATE_AFTER) ? table.newUpsert() : table.newDelete());
}

The main problem should be here. The binlog data collected with Flink is a retract stream, so it contains UPDATE_BEFORE, UPDATE_AFTER, INSERT and DELETE rows. My thinking at the time was that Kudu supports upsert by primary key, so I only considered the UPDATE_AFTER rows; as a result an UPDATE_BEFORE row ends up being turned into a delete. Let me think about splitting this handling more finely; I'll go change it.
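A minimal sketch of the finer-grained split discussed here (not the final code in the repository, just an illustration under the assumption that UPDATE_BEFORE rows can simply be skipped because the matching UPDATE_AFTER upserts the same primary key):

```java
public Optional<Operation> createBaseOperation(RowData input, KuduTable table) {
    switch (input.getRowKind()) {
        case INSERT:
        case UPDATE_AFTER:
            // Both map to an upsert keyed on the primary key.
            return Optional.of(table.newUpsert());
        case UPDATE_BEFORE:
            // Retract row of an update: skip it instead of issuing a delete; the
            // following UPDATE_AFTER will upsert the same key right after.
            return Optional.empty();
        case DELETE:
            return Optional.of(table.newDelete());
        default:
            return Optional.empty();
    }
}
```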

@collabH
Owner

collabH commented Aug 21, 2021

For now I've changed createBaseOperation in your UpsertOperationMapper so that it always returns table.newUpsert(), just to get all of the Kafka binlog data loaded first. Could you add compatibility for updates on lower Kudu versions?

You don't need to touch UpsertOperationMapper: since your configuration uses connector=kudu, only the dynamic source/sink I rewrote gets used, and that one goes through RowDataUpsertOperationMapper. I'll change that one and see whether there is any problem, but I have no way to test it at the moment since I don't have a Kudu cluster locally.

@zhiyuan192shine
Author

public Optional<Operation> createBaseOperation(RowData input, KuduTable table) {
    return Optional.of(input.getRowKind().equals(RowKind.INSERT)
            || input.getRowKind().equals(RowKind.UPDATE_AFTER) ? table.newUpsert() : table.newDelete());
}

The main problem should be here. The binlog data collected with Flink is a retract stream, so it contains UPDATE_BEFORE, UPDATE_AFTER, INSERT and DELETE rows. My thinking at the time was that Kudu supports upsert by primary key, so I only considered the UPDATE_AFTER rows; as a result an UPDATE_BEFORE row ends up being turned into a delete. Let me think about splitting this handling more finely; I'll go change it.

That's exactly the part I modified. Once you've changed it I can help test; I can connect to my company's cluster.

@collabH
Owner

collabH commented Aug 21, 2021

public Optional<Operation> createBaseOperation(RowData input, KuduTable table) {
    return Optional.of(input.getRowKind().equals(RowKind.INSERT)
            || input.getRowKind().equals(RowKind.UPDATE_AFTER) ? table.newUpsert() : table.newDelete());
}

The main problem should be here. The binlog data collected with Flink is a retract stream, so it contains UPDATE_BEFORE, UPDATE_AFTER, INSERT and DELETE rows. My thinking at the time was that Kudu supports upsert by primary key, so I only considered the UPDATE_AFTER rows; as a result an UPDATE_BEFORE row ends up being turned into a delete. Let me think about splitting this handling more finely; I'll go change it.

That's exactly the part I modified. Once you've changed it I can help test; I can connect to my company's cluster.

👌🏻

@collabH
Owner

collabH commented Aug 21, 2021

Pull the latest code and take a look at the README. There are still some issues after this change, but the delete scenario shouldn't come up often; personally I'd still recommend upgrading Kudu.
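For context, a minimal sketch of what the upgrade enables, assuming a Kudu client version that exposes the ignore-style operations (the method below is an assumption for older clients such as the CDH 6 ones discussed above and is only present in newer Kudu releases):

```java
import org.apache.kudu.client.KuduTable;
import org.apache.kudu.client.Operation;

class UpdateIgnoreSketch {
    // On newer Kudu versions an update whose primary key does not exist can be turned
    // into a no-op instead of failing with "key not found".
    static Operation updateOrIgnore(KuduTable table) {
        return table.newUpdateIgnore(); // assumption: only available in newer Kudu client releases
    }
}
```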

@collabH
Owner

collabH commented Aug 21, 2021

Note: use the tag v.1.0.0.

@collabH collabH self-assigned this Aug 21, 2021
@collabH collabH added the bug Something isn't working label Aug 21, 2021
@zhiyuan192shine
Author

Pull the latest code and take a look at the README. There are still some issues after this change, but the delete scenario shouldn't come up often; personally I'd still recommend upgrading Kudu.

After building from tag v1.0.0, I still get status=Not found: key not found (error 0).

@collabH
Owner

collabH commented Aug 21, 2021

if (operation instanceof Delete) {
    KuduScanner.KuduScannerBuilder scannerBuilder = client.newScannerBuilder(table);
    // find the index of each primary key column
    List<ColumnSchema> primaryKeyColumns = table.getSchema().getPrimaryKeyColumns();
    for (ColumnSchema primaryKeyColumn : primaryKeyColumns) {
        int primaryKeyColumnIndex = table.getSchema().getColumnIndex(primaryKeyColumn.getName());
        Object value = operationMapper.getField(input, primaryKeyColumnIndex);
        scannerBuilder.addPredicate(KuduPredicate.newComparisonPredicate(primaryKeyColumn,
                KuduPredicate.ComparisonOp.EQUAL, value));
    }
    KuduScanner scanner = scannerBuilder.build();
    // if no row matches the primary key, the delete can be skipped
    if (!scanner.hasMoreRows()) {
        return;
    }
}

Please debug this part and see. As I understand it, the "key not found" error should only happen on deletes; before deleting I now scan once by the primary key, and if nothing is found the delete is skipped, so in theory the "key not found" problem should no longer occur.

@collabH collabH closed this as completed Aug 24, 2021