Hey there!
I'm researching schema evolution in Pinot.
I'm using Pinot, Kafka, and the Avro schema registry.
Messages in Kafka are sent in Avro format.
I created a Pinot real-time table based on this Avro schema:
{"namespace": "some.domen", "type": "record", "name": "Product", "fields": [ {"type": "double", "name": "productID"}, {"name": "name", "type": "string"}, {"name": "published_date", "type": ["string", "null"]}, {"name": "cost", "type": "double"}, {"name": "timestamp", "type": "long"} ] }
First I converted the Avro schema into a Pinot schema, then created the Pinot table using this config file:
{ "tableName": "product", "tableType": "REALTIME", "segmentsConfig": { "timeColumnName": "timestamp", "timeType": "MILLISECONDS", "schemaName": "product", "replicasPerPartition": "1" }, "tenants": {}, "tableIndexConfig": { "loadMode": "MMAP", "streamConfigs": { "streamType": "kafka", "stream.kafka.consumer.type": "lowlevel", "stream.kafka.topic.name": "product-topic", "stream.kafka.decoder.class.name": "org.apache.pinot.plugin.inputformat.avro.confluent.KafkaConfluentSchemaRegistryAvroMessageDecoder", "stream.kafka.consumer.factory.class.name": "org.apache.pinot.plugin.stream.kafka20.KafkaConsumerFactory", "stream.kafka.zk.broker.url": "localhost:2181/kafka", "stream.kafka.broker.list": "localhost:9092", "realtime.segment.flush.threshold.time": "3600000", "realtime.segment.flush.threshold.size": "50000", "stream.kafka.consumer.prop.auto.offset.reset": "smallest", "stream.kafka.decoder.prop.schema.registry.rest.url": "http://localhost:8081" } }, "metadata": { "customConfigs": {} } }
After that I added a new column, 'country', to the Avro schema, converted it into the Pinot schema as well, and updated the existing schema:
./bin/pinot-admin.sh UpdateSchema -schemaFile /tmp/schemas/product.json -exec
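The updated /tmp/schemas/product.json differs from the original only by one extra dimension field spec, roughly like this (sketch; I did not set an explicit defaultNullValue):

```json
{
  "dimensionFieldSpecs": [
    {"name": "country", "dataType": "STRING"}
  ]
}
```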
Then I executed:
curl -X POST "http://localhost:9000/segments/product/reload?type=REALTIME" -H "accept: application/json"
The new column was added successfully, and I see null values in the country column for existing rows.
But new records arriving with a non-empty country field are also being ingested with null values in that column.
Is this a bug? Is it possible to ingest new data from Kafka in the new schema format without reloading Pinot segments?
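For context, my mental model of Avro's reader-schema resolution (which I assume the decoder follows; this is a toy illustration in plain Python, not Pinot's actual decoder code) is that fields present in the writer schema but absent from the reader schema are simply dropped:

```python
# Toy sketch of Avro reader-schema resolution: the reader keeps only the
# fields its own schema declares, falling back to defaults for fields the
# writer did not supply. Field names here mirror my Product record.

def resolve(record, reader_fields):
    """Resolve a decoded record against a reader schema.

    reader_fields maps each reader-schema field name to its default value.
    """
    out = {}
    for name, default in reader_fields.items():
        # Take the writer's value if present, else the reader's default.
        out[name] = record.get(name, default)
    return out

# A producer writes with the NEW schema (which has 'country')...
new_record = {"productID": 1.0, "name": "widget", "country": "DE"}

# ...but a reader still on the OLD schema never sees 'country':
old_reader = {"productID": None, "name": None}
print(resolve(new_record, old_reader))   # {'productID': 1.0, 'name': 'widget'}

# A reader on the NEW schema keeps it:
new_reader = {"productID": None, "name": None, "country": None}
print(resolve(new_record, new_reader))
```

If something like this is what happens inside the consuming segment until it picks up the new table schema, that would explain the nulls I'm seeing.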