Add Kafka source & sink #179
Looking at Airbyte's Kafka source configuration, the main options are as follows:
The Kafka sink configuration is largely the same as the source's. Configuration list: https://github.com/airbytehq/airbyte/blob/master/airbyte-integrations/connectors/destination-kafka/src/main/resources/spec.json
Sample of the data sent to Kafka: {
"id": "76d6d59f-5d08-41e8-a460-4ec3e03ad4d1",
"tableName": "orderinfo",
"occure_time": "1679274580528",
"event": "insert" / "update" / "delete",
"data": {
"order_id": "aabbbdddd",
"product_name": "aabbcc"
}
}
The Debezium data format can be used as a reference: https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/connectors/table/formats/debezium/
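As a rough illustration of how the simple sample event above could be mapped onto a Debezium-style envelope, here is a minimal Python sketch. The function name `to_debezium_envelope` and the `EVENT_TO_OP` table are hypothetical and not part of TIS. The sketch assumes the sample's `data` field carries the row image; since the sample has only one `data` field, the `before` image of an update is not available and is left as `None`.

```python
from typing import Any, Dict

# Debezium op codes: c = create (insert), u = update, d = delete.
EVENT_TO_OP = {"insert": "c", "update": "u", "delete": "d"}


def to_debezium_envelope(event: Dict[str, Any]) -> Dict[str, Any]:
    """Convert the simple sample event into a Debezium-style envelope.

    Hypothetical helper for illustration only. For an update the sample
    carries a single row image, so "before" cannot be filled in here.
    """
    op = EVENT_TO_OP[event["event"]]
    row = event["data"]
    return {
        # A delete only has a "before" image; inserts and updates have "after".
        "before": row if op == "d" else None,
        "after": None if op == "d" else row,
        "op": op,
        "source": {"table": event["tableName"]},
        # The sample stores the timestamp as a string of epoch milliseconds.
        "ts_ms": int(event["occure_time"]),
    }


if __name__ == "__main__":
    import json

    sample = {
        "id": "76d6d59f-5d08-41e8-a460-4ec3e03ad4d1",
        "tableName": "orderinfo",
        "occure_time": "1679274580528",
        "event": "insert",
        "data": {"order_id": "aabbbdddd", "product_name": "aabbcc"},
    }
    print(json.dumps(to_debezium_envelope(sample), indent=2))
```

A real sink would need the previous row image to populate `before` on updates, which this sketch does not attempt.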
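A prototype has now been implemented. A remaining shortcoming concerns the update messages sent to Kafka. Internally, two format implementations are available: debeziumJson and CanalJson.
Testing has passed. Using the debezium-json format: {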
"before": null,
"after": {
"instance_id": "000102514520a464014",
"order_id": "10000015361171282085747c1f92005a",
"batch_msg": "",
"type": 0,
"ext": "{\"hitPrice\":0,\"isWait\":0,\"optionalType\":0}",
"waitinginstance_id": "",
"kind": 3,
"parent_id": "",
"pricemode": 1,
"name": "test02",
"makename": "",
"taste": "",
"spec_detail_name": "",
"num": 1,
"account_num": 1,
"unit": "?",
"account_unit": "",
"price": 1,
"member_price": 1,
"fee": 1,
"ratio": 100,
"ratio_fee": 1,
"ratio_cause": "",
"status": 2,
"kindmenu_id": "8000001564d710600164f9cd0c510172",
"kindmenu_name": "??",
"menu_id": "0",
"memo": "",
"is_ratio": 0,
"entity_id": "80000015",
"is_valid": 1,
"create_time": 1535257651481,
"op_time": 1535257670907,
"last_ver": 3,
"load_time": 1536117832,
"modify_time": 1678704159,
"draw_status": 0,
"bookmenu_id": "",
"make_id": "",
"make_price": 0,
"prodplan_id": "",
"is_wait": 0,
"specdetail_id": "",
"specdetail_price": 0,
"makeprice_mode": 1,
"original_price": "1.0",
"is_buynumber_changed": 1,
"ratio_operator_id": "",
"child_id": "",
"kind_bookmenu_id": "",
"specprice_mode": 2,
"worker_id": "05eb689d72304a8d9064b20f58953d91",
"is_backauth": 1,
"service_fee_mode": 0,
"service_fee": "0.0",
"orign_id": "",
"addition_price": 0,
"has_addition": 0,
"seat_id": ""
},
"op": "c",
"source": {
"table": "instancedetail"
},
"ts_ms": 1681615789046
}
Example of a message sent using canal-json: {
"data": [{
"instance_id": "000102514520a464010",
"order_id": "10000015361171282085747c1f92005a",
"batch_msg": "",
"type": 0,
"ext": "{\"hitPrice\":0,\"isWait\":0,\"optionalType\":0}",
"waitinginstance_id": "",
"kind": 3,
"parent_id": "",
"pricemode": 1,
"name": "test02",
"makename": "",
"taste": "",
"spec_detail_name": "",
"num": 1,
"account_num": 1,
"unit": "?",
"account_unit": "",
"price": 1,
"member_price": 1,
"fee": 1,
"ratio": 100,
"ratio_fee": 1,
"ratio_cause": "",
"status": 2,
"kindmenu_id": "8000001564d710600164f9cd0c510172",
"kindmenu_name": "美丽",
"menu_id": "0",
"memo": "",
"is_ratio": 0,
"entity_id": "80000015",
"is_valid": 1,
"create_time": 1535257651481,
"op_time": 1535257670907,
"last_ver": 3,
"load_time": 1536117832,
"modify_time": 1678704159,
"draw_status": 0,
"bookmenu_id": "",
"make_id": "",
"make_price": 0,
"prodplan_id": "",
"is_wait": 0,
"specdetail_id": "",
"specdetail_price": 0,
"makeprice_mode": 1,
"original_price": "1.0",
"is_buynumber_changed": 1,
"ratio_operator_id": "",
"child_id": "",
"kind_bookmenu_id": "",
"specprice_mode": 2,
"worker_id": "05eb689d72304a8d9064b20f58953d91",
"is_backauth": 1,
"service_fee_mode": 0,
"service_fee": "0.0",
"orign_id": "",
"addition_price": 0,
"has_addition": 0,
"seat_id": ""
}],
"type": "INSERT",
"table": "instancedetail",
"ts": 1681616142062
}
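To show how a consumer might handle both formats above uniformly, the following Python sketch normalizes a debezium-json or canal-json message into a list of flat change records. The function name `normalize` and the output record shape (`op`, `table`, `row`, `ts_ms`) are assumptions made for illustration and do not come from TIS. Format detection relies only on the fields visible in the two samples (`op` for debezium-json, `type` plus `data` for canal-json).

```python
from typing import Any, Dict, List

# Debezium op codes mapped to lowercase operation names; "r" (snapshot read)
# is treated as an insert in this sketch.
DEBEZIUM_OPS = {"c": "insert", "r": "insert", "u": "update", "d": "delete"}


def normalize(message: Dict[str, Any]) -> List[Dict[str, Any]]:
    """Turn one debezium-json or canal-json message into flat change records.

    Hypothetical helper: the output shape is an assumption for illustration.
    """
    if "op" in message:
        # debezium-json: one row per message, image in "before" or "after".
        op = DEBEZIUM_OPS[message["op"]]
        row = message["before"] if op == "delete" else message["after"]
        return [{
            "op": op,
            "table": message["source"]["table"],
            "row": row,
            "ts_ms": message["ts_ms"],
        }]
    if "type" in message and "data" in message:
        # canal-json: "data" is an array, so one message may carry many rows.
        op = message["type"].lower()
        return [
            {"op": op, "table": message["table"], "row": row, "ts_ms": message["ts"]}
            for row in (message["data"] or [])
        ]
    raise ValueError("unrecognized change-event format")


if __name__ == "__main__":
    debezium_msg = {
        "before": None,
        "after": {"instance_id": "000102514520a464014", "name": "test02"},
        "op": "c",
        "source": {"table": "instancedetail"},
        "ts_ms": 1681615789046,
    }
    canal_msg = {
        "data": [{"instance_id": "000102514520a464010", "name": "test02"}],
        "type": "INSERT",
        "table": "instancedetail",
        "ts": 1681616142062,
    }
    for msg in (debezium_msg, canal_msg):
        print(normalize(msg))
```

Returning a list in both cases keeps the caller's loop identical regardless of format, since canal-json batches rows while debezium-json emits one row per message.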
The TIS open-source platform does not yet implement a Kafka connector. Please consider implementing one. Thanks.