
ksqlDB engine does not infer the Struct type correctly from protobuf schema #7642

Closed
cdadia opened this issue Jun 6, 2021 · 8 comments
Labels: bug, P0 (Denotes must-have for a given milestone), streaming-engine (Tickets owned by the ksqlDB Streaming Team)

cdadia commented Jun 6, 2021

Describe the bug
ksqlDB infers an incorrect type for Struct fields in a stream backed by a topic with a registered Protobuf schema. In the example below, the correct type should have been StringValue (a message with a single string field named value), but it is inferred as VARCHAR(STRING).

In this example, the type happens to be defined in the referenced google/protobuf/wrappers.proto, but I suspect this is true for any Struct type defined in a reference.

To Reproduce
Steps to reproduce the behavior:

  1. ksqlDB version: 0.18.0.
  2. SQL statements run:
    The statements were executed in the following order.

Create a topic

  kafka-topics \
  --bootstrap-server kafka:29092 \
  --topic test_topic \
  --replication-factor 1 \
  --partitions 1 \
  --config cleanup.policy=compact \
  --create

Register the referenced schema paymentEnums.proto with Schema Registry

curl -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json" \
--data '{"schema": "syntax = \"proto3\"; package paymentEnums; enum PaymentStatus { CANCELLED = 0; AUTHORIZED = 1; CAPTURED = 2; PARTIALLY_REFUNDED = 3; REFUNDED = 4; } enum TenderType { VENMO = 0; STRIPE = 1; } enum Action { ORDER_CAPTURE_SUCCESS = 0; ORDER_CAPTURE_FAILURE = 1;}", "schemaType": "PROTOBUF"}' \
http://localhost:8081/subjects/paymentEnums.proto/versions
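Hand-escaping proto text inside a JSON string, as in the --data argument above, is easy to get wrong. Here is a minimal sketch that builds the same request body with json.dumps, which escapes the quotes and newlines automatically. It uses only the Python standard library, and the helper name registration_body is hypothetical. The printed JSON can then serve as the curl --data payload.

```python
import json

# paymentEnums.proto from the report, written as ordinary proto text (no escaping).
PAYMENT_ENUMS_PROTO = """syntax = "proto3";
package paymentEnums;

enum PaymentStatus { CANCELLED = 0; AUTHORIZED = 1; CAPTURED = 2; PARTIALLY_REFUNDED = 3; REFUNDED = 4; }
enum TenderType { VENMO = 0; STRIPE = 1; }
enum Action { ORDER_CAPTURE_SUCCESS = 0; ORDER_CAPTURE_FAILURE = 1; }
"""


def registration_body(schema, references=None):
    """Return a Schema Registry registration body as a JSON string.

    json.dumps escapes the embedded double quotes and newlines, so the
    proto text does not have to be escaped by hand.
    """
    body = {"schema": schema, "schemaType": "PROTOBUF"}
    if references:
        body["references"] = references
    return json.dumps(body)


payload = registration_body(PAYMENT_ENUMS_PROTO)
print(payload)
```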

Register the test_topic-value schema with Schema Registry

curl -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json" \
--data '{"schema": "syntax = \"proto3\"; import \"google/protobuf/wrappers.proto\"; import \"paymentEnums.proto\"; package paymentSerdes; message PaymentTransaction { string id = 1; int64 date = 2; paymentEnums.Action action = 3; paymentEnums.TenderType tender_type = 4; google.protobuf.StringValue reason_code = 5; string order_id = 6; string profile_id = 7; string payment_method_id = 8; google.protobuf.StringValue charge_id = 9; google.protobuf.StringValue refund_id = 10; string payment_intent_id = 11; int64 amount = 12; google.protobuf.StringValue transaction_id = 13; map<string, string> metadata = 14; google.protobuf.StringValue order_item_id = 15; }", "schemaType": "PROTOBUF", "references": [ { "name": "google/protobuf/timestamp.proto", "subject":  "google/protobuf/timestamp.proto", "version": 1 }, { "name": "google/protobuf/wrappers.proto", "subject":  "google/protobuf/wrappers.proto", "version": 1 }, { "name": "paymentEnums.proto", "subject":  "paymentEnums.proto", "version": 1 } ]}' \
http://localhost:8081/subjects/test_topic-value/versions
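The references array should line up with the schema's import statements. The sketch below derives one reference per import. It follows the convention used in the report: each imported file is registered under a subject with the same name, at version 1. It uses only the Python standard library, and the function names are hypothetical. Run against the PaymentTransaction schema, it yields two references. The google/protobuf/timestamp.proto entry in the command above is not imported by that schema.

```python
import json
import re

# The PaymentTransaction schema from the report, as plain proto text.
PAYMENT_TRANSACTION_PROTO = """syntax = "proto3";
import "google/protobuf/wrappers.proto";
import "paymentEnums.proto";
package paymentSerdes;

message PaymentTransaction {
  string id = 1;
  int64 date = 2;
  paymentEnums.Action action = 3;
  paymentEnums.TenderType tender_type = 4;
  google.protobuf.StringValue reason_code = 5;
  string order_id = 6;
  string profile_id = 7;
  string payment_method_id = 8;
  google.protobuf.StringValue charge_id = 9;
  google.protobuf.StringValue refund_id = 10;
  string payment_intent_id = 11;
  int64 amount = 12;
  google.protobuf.StringValue transaction_id = 13;
  map<string, string> metadata = 14;
  google.protobuf.StringValue order_item_id = 15;
}
"""

# Matches `import "file.proto";`, optionally `import public` / `import weak`.
IMPORT_RE = re.compile(r'^\s*import\s+(?:public\s+|weak\s+)?"([^"]+)"\s*;', re.MULTILINE)


def imported_files(proto_text):
    """Return the file names imported by a .proto schema, in order."""
    return IMPORT_RE.findall(proto_text)


def references_for(proto_text, version=1):
    """Build a references array with one entry per import.

    Assumption: each imported file is registered under a subject whose
    name equals the import path.
    """
    return [{"name": f, "subject": f, "version": version}
            for f in imported_files(proto_text)]


body = {
    "schema": PAYMENT_TRANSACTION_PROTO,
    "schemaType": "PROTOBUF",
    "references": references_for(PAYMENT_TRANSACTION_PROTO),
}
print(json.dumps(body, indent=2))
```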

Check that the schemas were registered correctly:

curl -s -XGET localhost:8081/subjects/
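This check can also be scripted. The sketch below compares the subjects named in the references array against a GET /subjects listing; it uses only the Python standard library, and its names are hypothetical. With the listing reported under Actual behaviour, the two google/protobuf subjects do not appear. The sketch only compares names. It does not show how Schema Registry resolved those references when it accepted the registration.

```python
import json

# Response body of `curl -s -XGET localhost:8081/subjects/` as reported
# under "Actual behaviour".
subjects_response = '["test_topic-value","paymentEnums.proto"]'

# Subjects named in the "references" array of the test_topic-value registration.
referenced_subjects = [
    "google/protobuf/timestamp.proto",
    "google/protobuf/wrappers.proto",
    "paymentEnums.proto",
]


def unlisted_references(referenced, subjects_json):
    """Return referenced subjects that do not appear in a GET /subjects listing."""
    listed = set(json.loads(subjects_json))
    return [s for s in referenced if s not in listed]


print(unlisted_references(referenced_subjects, subjects_response))
# prints ['google/protobuf/timestamp.proto', 'google/protobuf/wrappers.proto']
```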

Start a ksqlDB CLI client and execute the following steps:
Set the offset to earliest

SET 'auto.offset.reset'='earliest';

Create the stream

create stream test (key STRING KEY) WITH (
    KAFKA_TOPIC='test_topic',
    KEY_FORMAT='KAFKA',
    VALUE_FORMAT='PROTOBUF'
);

List all the streams

LIST STREAMS;

Describe the stream

DESCRIBE test;

Open separate ksqlDB CLI sessions and watch for processing results.
Client session 1:

SET 'auto.offset.reset'='earliest';
SELECT * FROM KSQL_PROCESSING_LOG EMIT CHANGES;

client session 2:

SET 'auto.offset.reset'='earliest';
PRINT test_topic;

client session 3:

SET 'auto.offset.reset'='earliest';
SELECT * FROM TEST EMIT CHANGES;

Expected behavior

  • The expected type for reason_code, charge_id, refund_id, transaction_id and order_item_id is StringValue, but these fields are inferred as VARCHAR(STRING).
  • The stream should process messages from the source topic.
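The follow-up comment in this thread shows a working version inferring STRUCT<VALUE VARCHAR(STRING)> for a StringValue field. The sketch below parses the "Field | Type" rows of DESCRIBE output and flags which StringValue fields were not inferred as a STRUCT. It assumes the CLI's pipe-separated layout, the excerpt is trimmed from the 0.18.0 output in this report, and the function names are hypothetical.

```python
# Excerpt of the DESCRIBE TEST output from the report (ksqlDB 0.18.0).
DESCRIBE_TEST = """
Name                 : TEST
 Field             | Type
--------------------------------------------------
 KEY               | VARCHAR(STRING)  (key)
 ID                | VARCHAR(STRING)
 REASON_CODE       | VARCHAR(STRING)
 CHARGE_ID         | VARCHAR(STRING)
 REFUND_ID         | VARCHAR(STRING)
 TRANSACTION_ID    | VARCHAR(STRING)
 METADATA          | MAP<STRING, VARCHAR(STRING)>
 ORDER_ITEM_ID     | VARCHAR(STRING)
--------------------------------------------------
"""

# Fields declared as google.protobuf.StringValue in PaymentTransaction.
WRAPPED_FIELDS = ["reason_code", "charge_id", "refund_id", "transaction_id", "order_item_id"]


def parse_describe(text):
    """Map column name -> type from the 'Field | Type' rows of DESCRIBE output."""
    columns = {}
    for line in text.splitlines():
        if "|" not in line:
            continue  # skips the Name line and the ruler lines
        name, _, col_type = line.partition("|")
        name, col_type = name.strip(), col_type.strip()
        if name and name != "Field":
            columns[name] = col_type
    return columns


def non_struct_wrappers(columns, wrapped_fields):
    """Return wrapper-typed fields whose inferred type is not a STRUCT."""
    return [f for f in wrapped_fields
            if not columns.get(f.upper(), "").startswith("STRUCT<")]


print(non_struct_wrappers(parse_describe(DESCRIBE_TEST), WRAPPED_FIELDS))
# prints ['reason_code', 'charge_id', 'refund_id', 'transaction_id', 'order_item_id']
```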

Actual behaviour

  1. list topics:
ksql> list topics;

 Kafka Topic               | Partitions | Partition Replicas
-------------------------------------------------------------
 local_ksql_processing_log | 1          | 1
 test_topic                | 1          | 1
-------------------------------------------------------------
  1. Register schemas
› curl -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json" \
--data '{"schema": "syntax = \"proto3\"; import \"google/protobuf/wrappers.proto\"; import \"paymentEnums.proto\"; package paymentSerdes; message PaymentTransaction { string id = 1; int64 date = 2; paymentEnums.Action action = 3; paymentEnums.TenderType tender_type = 4; google.protobuf.StringValue reason_code = 5; string order_id = 6; string profile_id = 7; string payment_method_id = 8; google.protobuf.StringValue charge_id = 9; google.protobuf.StringValue refund_id = 10; string payment_intent_id = 11; int64 amount = 12; google.protobuf.StringValue transaction_id = 13; map<string, string> metadata = 14; google.protobuf.StringValue order_item_id = 15; }", "schemaType": "PROTOBUF", "references": [ { "name": "google/protobuf/timestamp.proto", "subject":  "google/protobuf/timestamp.proto", "version": 1 }, { "name": "google/protobuf/wrappers.proto", "subject":  "google/protobuf/wrappers.proto", "version": 1 }, { "name": "paymentEnums.proto", "subject":  "paymentEnums.proto", "version": 1 } ]}' \
http://localhost:8081/subjects/test_topic-value/versions
{"id":4}
  1. List all registered subjects
› curl -s -XGET localhost:8081/subjects/

["test_topic-value","paymentEnums.proto"]
  1. Create the stream
ksql> create stream test (key STRING KEY) WITH (
>    KAFKA_TOPIC='test_topic',
>    KEY_FORMAT='KAFKA',
>    VALUE_FORMAT='PROTOBUF'
>);

 Message
----------------
 Stream created
----------------
  1. List all streams
ksql> list streams;

 Stream Name         | Kafka Topic               | Key Format | Value Format | Windowed
--------------------------------------------------------------------------------------
 KSQL_PROCESSING_LOG | local_ksql_processing_log | KAFKA      | JSON         | false
 TEST                | test_topic                | KAFKA      | PROTOBUF     | false
--------------------------------------------------------------------------------------
  1. Describe the stream
ksql> describe test;

Name                 : TEST
 Field             | Type
--------------------------------------------------
 KEY               | VARCHAR(STRING)  (key)
 ID                | VARCHAR(STRING)
 DATE              | BIGINT
 ACTION            | VARCHAR(STRING)
 TENDER_TYPE       | VARCHAR(STRING)
 REASON_CODE       | VARCHAR(STRING)
 ORDER_ID          | VARCHAR(STRING)
 PROFILE_ID        | VARCHAR(STRING)
 PAYMENT_METHOD_ID | VARCHAR(STRING)
 CHARGE_ID         | VARCHAR(STRING)
 REFUND_ID         | VARCHAR(STRING)
 PAYMENT_INTENT_ID | VARCHAR(STRING)
 AMOUNT            | BIGINT
 TRANSACTION_ID    | VARCHAR(STRING)
 METADATA          | MAP<STRING, VARCHAR(STRING)>
 ORDER_ITEM_ID     | VARCHAR(STRING)
--------------------------------------------------
For runtime statistics and query details run: DESCRIBE <Stream,Table> EXTENDED;
  1. Produce some events onto the topic and watch for processing results.
    Client session 1:
ksql> select * from ksql_processing_log emit changes;
+--------------------------------------+--------------------------------------+--------------------------------------+--------------------------------------+
|LOGGER                                |LEVEL                                 |TIME                                  |MESSAGE                               |
+--------------------------------------+--------------------------------------+--------------------------------------+--------------------------------------+

Press CTRL-C to interrupt

client session 2:

ksql> PRINT test_topic;

rowtime: 2021/06/06 01:05:18.915 Z, key: [24f78cb1-c01f-4e98-bbcd-40ff@3832623980245825848/-], value: tender_type: STRIPE reason_code { value: "9" } order_id: "1180a95b-10da-4ce5-9870-b826c036050f" profile_id: "6961cd3d-31c9-4525-aeef-5256a87759ef" payment_method_id: "b6563578-6b0e-4ff0-8056-06d0a4fea81d" charge_id { value: "16ee3df1-f0ba-4941-9231-d3af98f87537" } refund_id { value: "1b2217dc-2380-4114-9b36-0cb18eead93e" } payment_intent_id: "qa-journal-009" amount: 1000 transaction_id { value: "qa-journal-009" }, partition: 0
rowtime: 2021/06/06 01:05:18.933 Z, key: [33d7b2ec-565c-4bfa-827b-f85f@3474589991733179745/-], value: tender_type: STRIPE reason_code { value: "10" } order_id: "6269da81-0244-42ec-87b9-40faebd1c82f" profile_id: "02b326a4-c112-4877-8d4e-1b59d8cfedfe" payment_method_id: "55ee5549-3e45-45af-8e0e-2c9a0d11c28d" charge_id { value: "948023ac-7103-4cf4-bd76-89c624c62694" } refund_id { value: "dd31270b-ca84-462b-9bd0-7bff3c5b76b7" } payment_intent_id: "qa-journal-0010" amount: 1000 transaction_id { value: "qa-journal-0010" }, partition: 0

Press CTRL-C to interrupt

client session 3:

ksql> SELECT * FROM TEST EMIT CHANGES;

+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+
|KEY     |ID      |DATE    |ACTION  |TENDER_T|REASON_C|ORDER_ID|PROFILE_|PAYMENT_|CHARGE_I|REFUND_I|PAYMENT_|AMOUNT  |TRANSACT|METADATA|ORDER_IT|
|        |        |        |        |YPE     |ODE     |        |ID      |METHOD_I|D       |D       |INTENT_I|        |ION_ID  |        |EM_ID   |
|        |        |        |        |        |        |        |        |D       |        |        |D       |        |        |        |        |
+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+

Press CTRL-C to interrupt

Additional context
Google wrapper proto file:
https://github.com/protocolbuffers/protobuf/blob/master/src/google/protobuf/wrappers.proto

I also used this documentation to check whether nested fields are supported in ksqlDB:
https://docs.ksqldb.io/en/0.8.x-ksqldb/operate-and-deploy/installation/server-config/avro-schema/#supported-functionality

mikebin commented Jun 7, 2021

Also tested and reproduced the issue with a trimmed-down schema:

syntax = "proto3";
package com.acme;

import "google/protobuf/wrappers.proto";

option java_package = "com.acme";

message MyMessage {
  string value = 1;
  google.protobuf.StringValue wrappedValue = 2;
}

Register google/protobuf/wrappers.proto under its own subject:

curl -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json" \
--data '{"schemaType":"PROTOBUF","schema":"syntax = \"proto3\";\npackage google.protobuf;\n\noption csharp_namespace = \"Google.Protobuf.WellKnownTypes\";\noption cc_enable_arenas = true;\noption go_package = \"google.golang.org/protobuf/types/known/wrapperspb\";\noption java_package = \"com.google.protobuf\";\noption java_outer_classname = \"WrappersProto\";\noption java_multiple_files = true;\noption objc_class_prefix = \"GPB\";\n\nmessage DoubleValue {\n  // The double value.\n  double value = 1;\n}\nmessage FloatValue {\n  // The float value.\n  float value = 1;\n}\nmessage Int64Value {\n  // The int64 value.\n  int64 value = 1;\n}\nmessage UInt64Value {\n  // The uint64 value.\n  uint64 value = 1;\n}\nmessage Int32Value {\n  // The int32 value.\n  int32 value = 1;\n}\nmessage UInt32Value {\n  // The uint32 value.\n  uint32 value = 1;\n}\nmessage BoolValue {\n  // The bool value.\n  bool value = 1;\n}\nmessage StringValue {\n  // The string value.\n  string value = 1;\n}\nmessage BytesValue {\n  // The bytes value.\n  bytes value = 1;\n}\n"}' \
http://localhost:8081/subjects/google%2Fprotobuf%2Fwrappers.proto/versions
{"id":1}
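Because the subject name google/protobuf/wrappers.proto contains slashes, the command above percent-encodes them in the URL path. A minimal Python sketch of building that URL with urllib.parse.quote follows; the helper name versions_url is hypothetical.

```python
from urllib.parse import quote


def versions_url(base_url, subject):
    """Build the /subjects/{subject}/versions URL for a subject name.

    quote(..., safe="") percent-encodes '/' as %2F, so a subject such as
    google/protobuf/wrappers.proto stays a single path segment.
    """
    return f"{base_url}/subjects/{quote(subject, safe='')}/versions"


print(versions_url("http://localhost:8081", "google/protobuf/wrappers.proto"))
# prints http://localhost:8081/subjects/google%2Fprotobuf%2Fwrappers.proto/versions
```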

Register the MyMessage schema under the test-value subject, referencing wrappers.proto:

curl -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json" \
--data '{
  "schemaType": "PROTOBUF",
  "references": [
    {
      "name": "google/protobuf/wrappers.proto",
      "subject": "google/protobuf/wrappers.proto",
      "version": 1
    }
  ],
  "schema": "syntax = \"proto3\";\n\npackage com.acme;\noption java_package = \"com.acme\";\n\nimport \"google/protobuf/wrappers.proto\";\n\nmessage MyMessage {\nstring value = 1;\ngoogle.protobuf.StringValue wrappedValue = 2;\n}\n"
}'  \
http://localhost:8081/subjects/test-value/versions
{"id":2}

Produce a test record:

kafka-protobuf-console-producer --bootstrap-server localhost:9092 --property value.schema.id=2 --topic test <<EOF
{"value":"hi","wrappedValue":"there"}
EOF
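The producer input gives wrappedValue as a plain string. In the proto3 JSON mapping, wrapper types use the same JSON representation as the primitive they wrap. The serialized record still carries a StringValue submessage, which is why a working ksqlDB shows it as {VALUE=there}. The sketch below models that expansion only; it is not the console producer's code, and the helper name expand_wrappers is hypothetical.

```python
import json

# Fields of MyMessage whose type is a google.protobuf wrapper message.
WRAPPER_FIELDS = {"wrappedValue"}


def expand_wrappers(record, wrapper_fields):
    """Turn proto3-JSON wrapper values (bare primitives) into {"value": ...} structs.

    In the proto3 JSON mapping a wrapper such as StringValue is written as
    the wrapped primitive itself, so "there" stands for a StringValue whose
    value field is "there". Missing or null wrapper fields are left as they are.
    """
    expanded = dict(record)
    for field in wrapper_fields:
        if expanded.get(field) is not None:
            expanded[field] = {"value": expanded[field]}
    return expanded


record = json.loads('{"value":"hi","wrappedValue":"there"}')
print(expand_wrappers(record, WRAPPER_FIELDS))
# prints {'value': 'hi', 'wrappedValue': {'value': 'there'}}
```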

Wrapper types work fine with ksqlDB 6.1.1, and with standalone ksqlDB through 0.15.0 running against SR 6.1.1. In those versions, ksqlDB infers a STRUCT for the wrapped type:

ksql> create stream s with (kafka_topic='test', value_format='protobuf');

 Message
----------------
 Stream created
----------------

ksql> describe s;

Name                 : S
 Field        | Type
----------------------------------------------
 VALUE        | VARCHAR(STRING)
 WRAPPEDVALUE | STRUCT<VALUE VARCHAR(STRING)>
----------------------------------------------

ksql> select * from s emit changes;
+--------------------+--------------------+
|VALUE               |WRAPPEDVALUE        |
+--------------------+--------------------+
|hi                  |{VALUE=there}       |

In ksqlDB 0.17.0 and 0.18.0, the wrapped field is created as a primitive type, which results in an error at processing time:

ksql> create stream s with (kafka_topic='test', value_format='protobuf');

 Message
----------------
 Stream created
----------------

ksql> describe s;

Name                 : S
 Field        | Type
--------------------------------
 VALUE        | VARCHAR(STRING)
 WRAPPEDVALUE | VARCHAR(STRING)
--------------------------------

ksql> select * from s emit changes;
<no output>
ksql> select * from ksql_processing_log emit changes;
+--------------------------------+--------------------------------+--------------------------------+--------------------------------+
|LOGGER                          |LEVEL                           |TIME                            |MESSAGE                         |
+--------------------------------+--------------------------------+--------------------------------+--------------------------------+
|processing.8849308567551831337.K|ERROR                           |1623044082059                   |{TYPE=0, DESERIALIZATIONERROR={T|
|sqlTopic.Source.deserializer    |                                |                                |ARGET=value, ERRORMESSAGE=Error |
|                                |                                |                                |deserializing message from topic|
|                                |                                |                                |: test, RECORDB64=null, CAUSE=[I|
|                                |                                |                                |nvalid class for string type, ex|
|                                |                                |                                |pecting String or CharSequence b|
|                                |                                |                                |ut found class com.google.protob|
|                                |                                |                                |uf.DynamicMessage], topic=test},|
|                                |                                |                                | RECORDPROCESSINGERROR=null, PRO|
|                                |                                |                                |DUCTIONERROR=null, SERIALIZATION|
|                                |                                |                                |ERROR=null, KAFKASTREAMSTHREADER|
|                                |                                |                                |ROR=null}                       |

spena (Member) commented Jun 7, 2021

This seems to be a bug in the Protobuf converter. It is reported here: https://confluentinc.atlassian.net/browse/DGS-1726

spena added the streaming-engine label (Tickets owned by the ksqlDB Streaming Team) on Jun 8, 2021
spena (Member) commented Jun 14, 2021

I just noticed that the link posted above is not public. For more context: the issue is caused by a Schema Registry bug that stopped the Protobuf converter from converting wrapper types to structs. This is a regression in Schema Registry.

Update: the issue in SR is now fixed. The fix should be available in ksqlDB master as soon as CP artifacts with the SR bug fix are ready.
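The regression can be pictured as a change in how a wrapper field type maps to a ksqlDB column type. The sketch below is a simplified, hypothetical model, not the converter's actual code. The wrapper-to-scalar table follows google/protobuf/wrappers.proto. The scalar-to-ksqlDB table is an illustrative subset, assumed for this example.

```python
# Wrapper message -> wrapped proto3 scalar, per google/protobuf/wrappers.proto.
WRAPPERS = {
    "google.protobuf.StringValue": "string",
    "google.protobuf.Int64Value": "int64",
    "google.protobuf.Int32Value": "int32",
    "google.protobuf.BoolValue": "bool",
    "google.protobuf.DoubleValue": "double",
}

# Scalar -> ksqlDB column type (illustrative subset, an assumption of this sketch).
SCALARS = {
    "string": "VARCHAR(STRING)",
    "int64": "BIGINT",
    "int32": "INTEGER",
    "bool": "BOOLEAN",
    "double": "DOUBLE",
}


def ksql_column_type(proto_type, wrappers_as_structs=True):
    """Infer a ksqlDB column type for a proto field type (simplified model).

    wrappers_as_structs=True models the behaviour shown for 0.15.0
    (wrapper -> STRUCT<VALUE ...>); False models the 0.17.0/0.18.0
    regression, where the wrapper collapses to its primitive type.
    """
    if proto_type in WRAPPERS:
        inner = SCALARS[WRAPPERS[proto_type]]
        return f"STRUCT<VALUE {inner}>" if wrappers_as_structs else inner
    return SCALARS[proto_type]


print(ksql_column_type("google.protobuf.StringValue"))
print(ksql_column_type("google.protobuf.StringValue", wrappers_as_structs=False))
```

Under the regressed mapping the column is declared VARCHAR(STRING), while the deserialized value is still a message. This is consistent with the "Invalid class for string type ... DynamicMessage" error in the processing log above.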

vcrfxia added the P0 label (Denotes must-have for a given milestone) and removed the needs-triage label on Jun 15, 2021
colinhicks (Member) commented:
The fix for this issue will be included in the next ksqlDB standalone and cloud release: 0.19.0.

colinhicks added this to the 0.19.0 milestone on Jun 30, 2021
cdadia (Author) commented Jul 20, 2021

@colinhicks I see that 0.19 is out:
https://docs.ksqldb.io/en/latest/operate-and-deploy/changelog/

but I don't see the fix in the changelog notes, and this issue is still open. Did it not make 0.19, or is it planned for a fix version of 0.19?

colinhicks (Member) commented:
Hi @cdadia, thanks for pinging about this. Since the team addressed the issue in an upstream dependency and not directly in the ksql codebase, the workflow was a bit different in this case. My intention was to close this at the time of the official release announcement for 0.19, slated for today. We will also add it to the changelog, which currently comprises only changes from PRs into this repository.

cdadia (Author) commented Jul 20, 2021

@colinhicks Thanks. I have validated that the use case outlined in the description of this ticket is fixed with 0.19 release.

colinhicks (Member) commented:
Great. Thanks for verifying!
