Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/Processors/Formats/Impl/AvroRowInputFormat.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1327,7 +1327,7 @@ const AvroDeserializer & AvroConfluentRowInputFormat::getOrCreateDeserializer(Sc
auto schema = schema_registry->getSchema(schema_id);
AvroDeserializer deserializer(
output.getHeader(), schema, format_settings.avro.allow_missing_fields, format_settings.null_as_default, format_settings);
it = deserializer_cache.emplace(schema_id, deserializer).first;
it = deserializer_cache.emplace(schema_id, std::move(deserializer)).first;
}
return it->second;
}
Expand Down
6 changes: 6 additions & 0 deletions src/Processors/Formats/Impl/AvroRowInputFormat.h
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,12 @@ class AvroDeserializer
public:
AvroDeserializer(const Block & header, avro::ValidSchema schema, bool allow_missing_fields, bool null_as_default_, const FormatSettings & settings_);
AvroDeserializer(DataTypePtr data_type, const std::string & column_name, avro::ValidSchema schema, bool allow_missing_fields, bool null_as_default_, const FormatSettings & settings_);

AvroDeserializer(const AvroDeserializer &) = delete;
AvroDeserializer & operator=(const AvroDeserializer &) = delete;
AvroDeserializer(AvroDeserializer &&) = default;
AvroDeserializer & operator=(AvroDeserializer &&) = delete;

void deserializeRow(MutableColumns & columns, avro::Decoder & decoder, RowReadExtension & ext) const;

using DeserializeFn = std::function<bool(IColumn & column, avro::Decoder & decoder)>;
Expand Down
41 changes: 41 additions & 0 deletions tests/integration/test_format_avro_confluent/test.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,47 @@ def test_select(started_cluster):
]


def test_select_skip_symbolic(started_cluster):
# type: (ClickHouseCluster) -> None

reg_url = "http://localhost:{}".format(started_cluster.schema_registry_port)
schema_registry_client = CachedSchemaRegistryClient({"url": reg_url})
serializer = MessageSerializer(schema_registry_client)

schema = avro.schema.make_avsc_object(
{
"name": "Node",
"type": "record",
"fields": [
{"name": "value", "type": "long"},
{"name": "next", "type": ["null", "Node"]},
],
}
)

record = {"value": 0, "next": {"value": 1, "next": {"value": 2, "next": None}}}
data = serializer.encode_record_with_schema(
"test_subject_skip_symbolic", schema, record
)

instance = started_cluster.instances["dummy"] # type: ClickHouseInstance
schema_registry_url = "http://{}:{}".format(
started_cluster.schema_registry_host, started_cluster.schema_registry_port
)
settings = {"format_avro_schema_registry_url": schema_registry_url}
run_query(
instance,
"create table avro_data_skip_symbolic(value Int64) engine = Memory()",
)
run_query(
instance,
"insert into avro_data_skip_symbolic format AvroConfluent",
data,
settings,
)
assert run_query(instance, "select value from avro_data_skip_symbolic").strip() == "0"


def test_select_auth(started_cluster):
# type: (ClickHouseCluster) -> None

Expand Down