
New schema version registered on INSERT VALUES #6091

Closed
mikebin opened this issue Aug 25, 2020 · 8 comments
Labels: bug, P0 (denotes must-have for a given milestone), schema-registry-integration, streaming-engine (tickets owned by the ksqlDB Streaming Team)


mikebin commented Aug 25, 2020

Describe the bug
When using Avro with Schema Registry in ksqlDB, a schema is registered for the CREATE STREAM statement (if it doesn't already exist). When records are then inserted with INSERT VALUES, a second version of the schema is registered, differing only in one minor metadata attribute (no connect.name). The second version seems unnecessary.

To Reproduce
Version: 0.11.0

Create a new Avro-based stream:

create stream test (a varchar) with (kafka_topic='test', value_avro_schema_full_name='test.name', partitions='1', value_format='avro');

View the registered schema:

curl -s http://localhost:8081/subjects/test-value/versions/1 | jq -r '.schema' | jq
{
  "type": "record",
  "name": "name",
  "namespace": "test",
  "fields": [
    {
      "name": "A",
      "type": [
        "null",
        "string"
      ],
      "default": null
    }
  ],
  "connect.name": "test.name"
}

Insert a record:

insert into test (a) values ('hi');

View the new schema version:

curl -s http://localhost:8081/subjects/test-value/versions/2 | jq -r '.schema' | jq
{
  "type": "record",
  "name": "name",
  "namespace": "test",
  "fields": [
    {
      "name": "A",
      "type": [
        "null",
        "string"
      ],
      "default": null
    }
  ]
}

Note that the only difference in the second schema version is the missing connect.name metadata attribute.

Expected behavior
Only one schema version is created when the stream is defined.

Actual behavior
See above.

@vpapavas vpapavas added this to the 0.13.0 milestone Aug 31, 2020
@vpapavas vpapavas added the P0 label Aug 31, 2020
@apurvam apurvam removed this from the 0.13.0 milestone Sep 2, 2020

agavra commented Sep 2, 2020

tl;dr This is a real bug, but I'm going to downgrade it from P0 because it's mostly cosmetic.


To add some background on the issue - we eagerly register schemas when we create streams/tables that don't already have one, to make sure compatibility is maintained going forward. The default behavior for Connect-based serializers, by contrast, is to register the schema when data is first produced.
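
For illustration, here's roughly what that eager registration amounts to, as a minimal sketch against the Schema Registry client API (this is not ksqlDB's actual code path; the subject name and schema string just mirror the repro above):

import io.confluent.kafka.schemaregistry.avro.AvroSchema;
import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;

public class EagerRegistrationSketch {
  public static void main(String[] args) throws Exception {
    SchemaRegistryClient client =
        new CachedSchemaRegistryClient("http://localhost:8081", 100);

    // The value schema derived from: create stream test (a varchar) ...
    AvroSchema schema = new AvroSchema(
        "{\"type\":\"record\",\"name\":\"name\",\"namespace\":\"test\","
            + "\"fields\":[{\"name\":\"A\",\"type\":[\"null\",\"string\"],"
            + "\"default\":null}]}");

    // Registering under <topic>-value before any record is produced pins
    // down version 1 and establishes the compatibility baseline.
    int id = client.register("test-value", schema);
    System.out.println("registered schema id: " + id);
  }
}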

Unfortunately, the serializers seem to use a different mechanism for building the schema: we don't construct it with the basic AvroConverter, but instead generate schemas with the AvroSchemas class.

TBH, I'm not entirely sure why we don't just use the vanilla converter (as we do for both JSON_SR and PROTOBUF formats). The javadoc mentions:

/**
 * Translates KSQL data and schemas to Avro equivalents.
 *
 * <p>Responsible for converting the KSQL schema to a version ready for connect to convert to an
 * avro schema.
 *
 * <p>This includes ensuring field names are valid Avro field names and that nested types do not
 * have name clashes.
 */

I don't know if this is legacy or actually needed. We need to dig more into that, but long story short, this shouldn't cause any real issues - Schema Registry would not let us register the new schema if it were incompatible with the old one, and it only registers the new version on the first INSERT VALUES call.
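
To make that guardrail concrete: a client can ask Schema Registry up front whether a candidate schema is compatible with the latest version under a subject. A hedged sketch (again, not ksqlDB code; the subject and schema follow the repro above):

import io.confluent.kafka.schemaregistry.avro.AvroSchema;
import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;

public class CompatibilityCheckSketch {
  public static void main(String[] args) throws Exception {
    SchemaRegistryClient client =
        new CachedSchemaRegistryClient("http://localhost:8081", 100);

    // Same record shape as version 1 but without connect.name --
    // textually different, yet fully compatible.
    AvroSchema candidate = new AvroSchema(
        "{\"type\":\"record\",\"name\":\"name\",\"namespace\":\"test\","
            + "\"fields\":[{\"name\":\"A\",\"type\":[\"null\",\"string\"],"
            + "\"default\":null}]}");

    // true means registering it would succeed -- which is exactly why the
    // INSERT VALUES path is able to mint version 2.
    boolean ok = client.testCompatibility("test-value", candidate);
    System.out.println("compatible with latest: " + ok);
  }
}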

@agavra agavra added the fix-it-week label and removed the P0 and needs-triage labels Sep 2, 2020
@agavra agavra removed their assignment Sep 2, 2020

mikebin commented Apr 6, 2021

@agavra - would it be correct to say that this issue applies to any serialization that happens after stream creation, not just INSERT VALUES?

Also, would the loss of the Connect metadata attributes have any effect on downstream schema compatibility/validation for data types like decimal, which carry attributes such as precision and scale as special Connect attributes? Or would AvroConverter simply re-derive those Connect metadata attributes from the standard Avro schema attributes when needed? In other words, would any of the dropped Connect metadata attributes matter if a sink connector with AvroConverter were used downstream? I'm thinking no, but wanted to confirm.


agavra commented Apr 7, 2021

would it be correct to say that this issue applies to any serialization that happens after stream creation, not just INSERT VALUES?

What other types of serialization were you thinking of? This might also be a problem for INSERT INTO, but I'm not sure (I can verify if this is important). Otherwise, source tables don't have ksqlDB writing into them, and whoever produces into them by other means can specify their own schema however they'd like.

In other words, would any of these Connect metadata attributes that get dropped have any significance if a sink connector with AvroConverter were used downstream? I'm thinking no, but wanted to confirm.

I believe we correctly account for precision/scale for decimal specifically, but I think there might be some compatibility issues with certain Schema Registry configurations (e.g. see #7174, which has a very similar root cause), not so much on the Connect side of things. That said, this is one of those situations where we'd need to test every combination to check whether a problem could exist - but since we still use the Connect serializers as well as Schema Registry, we should never register a truly incompatible schema.
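
For the decimal case specifically, here's a small sketch of how Connect's Decimal schema comes out of the Avro converter. Hedged: the "connect.decimal.precision" parameter name and the exact output shape are assumptions that depend on the converter version:

import io.confluent.connect.avro.AvroData;
import org.apache.kafka.connect.data.Decimal;
import org.apache.kafka.connect.data.Schema;

public class DecimalSchemaSketch {
  public static void main(String[] args) {
    // Connect models DECIMAL(6,2) as bytes plus scale/precision parameters.
    // ("connect.decimal.precision" is assumed here to be the parameter the
    // converter reads for precision.)
    Schema connectDecimal = Decimal.builder(2)
        .parameter("connect.decimal.precision", "6")
        .build();

    // The converter maps this to Avro's standard decimal logical type, so
    // precision and scale survive as first-class Avro attributes rather
    // than only as Connect metadata.
    AvroData avroData = new AvroData(10);
    System.out.println(avroData.fromConnectSchema(connectDecimal));
    // expected shape: {"type":"bytes","logicalType":"decimal","precision":6,"scale":2,...}
  }
}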


mjsax commented Jan 31, 2022

tl;dr This is a real bug, but I'm going to downgrade it from P0 because it's mostly cosmetic.

Why cosmetic? Seems it actually breaks INSERT INTO VALUES statements. Cf. #7211

Bumping to P0.

@mjsax mjsax added the P0 label Jan 31, 2022

hli21 commented Feb 26, 2022

Below is another test. The following SQL statements create two internal topics: _confluent-ksql-default_query_CTAS_MATERIALIZED_DATA_21-Aggregate-GroupBy-repartition and _confluent-ksql-default_query_CTAS_MATERIALIZED_DATA_21-Aggregate-Aggregate-Materialize-changelog.

create stream data (col1 int key, col2 string key, col3 string) with (kafka_topic='data', format='avro', partitions=1);
create table materialized_data as select col1, col2, count(1) as cnt from data group by col1, col2;
insert into data (col1, col2, col3) values (10, 'b', 'c');

curl -X GET http://localhost:8081/subjects/_confluent-ksql-default_query_CTAS_MATERIALIZED_DATA_21-Aggregate-GroupBy-repartition-key/versions/1

{
  "subject": "_confluent-ksql-default_query_CTAS_MATERIALIZED_DATA_21-Aggregate-GroupBy-repartition-key",
  "version": 1,
  "id": 23,
  "schema": "{\"type\":\"record\",\"name\":\"DataKey\",\"namespace\":\"io.confluent.ksql.avro_schemas\",\"fields\":[{\"name\":\"COL1\",\"type\":[\"null\",\"int\"],\"default\":null},{\"name\":\"COL2\",\"type\":[\"null\",\"string\"],\"default\":null}]}"
}

curl -X GET http://localhost:8081/subjects/_confluent-ksql-default_query_CTAS_MATERIALIZED_DATA_21-Aggregate-Aggregate-Materialize-changelog-key/versions/1

{
  "subject": "_confluent-ksql-default_query_CTAS_MATERIALIZED_DATA_21-Aggregate-Aggregate-Materialize-changelog-key",
  "version": 1,
  "id": 23,
  "schema": "{\"type\":\"record\",\"name\":\"DataKey\",\"namespace\":\"io.confluent.ksql.avro_schemas\",\"fields\":[{\"name\":\"COL1\",\"type\":[\"null\",\"int\"],\"default\":null},{\"name\":\"COL2\",\"type\":[\"null\",\"string\"],\"default\":null}]}"
}

Should the names of the two schemas be "MaterializedDataKey", not "DataKey"?

lihaosky commented:

How we register the schema during insertion for Avro: https://github.com/confluentinc/ksql/blob/master/ksqldb-serde/src/main/java/io/confluent/ksql/serde/avro/KsqlAvroSerdeFactory.java#L171

How we register the schema during creation for Avro: https://github.com/confluentinc/ksql/blob/master/ksqldb-serde/src/main/java/io/confluent/ksql/serde/avro/AvroSchemaTranslator.java#L34

Notice that at creation time there is no avroConfig.put(AvroDataConfig.CONNECT_META_DATA_CONFIG, false);. That's why connect.name appears in the creation-time schema but not in the insertion-time schema.

We should unify the config to be in one place for schema registration during creation and insertion.
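
To see the flag's effect in isolation, here's a minimal sketch using the Connect Avro converter classes directly (not ksqlDB's code; the struct mirrors the stream from the original repro):

import io.confluent.connect.avro.AvroData;
import io.confluent.connect.avro.AvroDataConfig;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;

import java.util.Map;

public class ConnectMetaDataSketch {
  public static void main(String[] args) {
    // Connect-side equivalent of: create stream test (a varchar) ...
    // with value_avro_schema_full_name='test.name'
    Schema connectSchema = SchemaBuilder.struct()
        .name("test.name")
        .field("A", Schema.OPTIONAL_STRING_SCHEMA)
        .build();

    // Creation path: Connect metadata enabled (the default), so the
    // generated Avro schema carries "connect.name": "test.name".
    AvroData withMeta = new AvroData(new AvroDataConfig(
        Map.of(AvroDataConfig.CONNECT_META_DATA_CONFIG, true)));

    // Insertion path: metadata disabled, so connect.name is dropped --
    // a textually different schema, hence the second registered version.
    AvroData withoutMeta = new AvroData(new AvroDataConfig(
        Map.of(AvroDataConfig.CONNECT_META_DATA_CONFIG, false)));

    System.out.println(withMeta.fromConnectSchema(connectSchema));
    System.out.println(withoutMeta.fromConnectSchema(connectSchema));
  }
}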

colinhicks commented:

Thanks for the analysis and details, @lihaosky. Do you happen to have a sense of an LOE for unifying the schema registration config?

lihaosky commented:

Hi @colinhicks, I think it will take about 1.5 weeks to design and implement, plus about one more week for proper testing and fixing broken QTT tests, etc. Since the change may not be backward compatible, we'd need to find ways to fix ~1000 broken QTT tests and do other verifications to make sure it doesn't break anything, which could be time-consuming. So 2.5-3 weeks in total.
