Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Initial support for Avro output format. #1673

Merged
merged 3 commits into from
Apr 26, 2024
Merged

Initial support for Avro output format. #1673

merged 3 commits into from
Apr 26, 2024

Conversation

ryzhyk
Copy link
Contributor

@ryzhyk ryzhyk commented Apr 25, 2024

Avro connector that encodes output updates according to a user-specified Avro schema. Can optionally post the supplied schema to a schema registry, but does not yet support downloading schema from the registry.

The tricky part of this commit is getting serde to generate precisely the encoding required by the Avro schema, where the same SQL type can be represented in multiple ways. In particular, timestamps can be in microseconds or milliseconds, and decimals are represented as bigints with fixed scale given as part of type definition. To support this, I implemented a special serializer that traverses the Avro schema in parallel with the serialized object and encodes each field according to its schema.

There are currently several limitations, documented in avro/output.rs, including:

  • Currently only supports raw Avro format, i.e., deletes cannot be represented.
  • Only works with user-provided schemas, no support for downloading the schema from the registry yet.
  • Does not support complex schemas with cross-references.

In addition, the connector requires the user to craft an Avro schema that matches the SQL view precisely (or define the view to precisely match an existing schema). In the future, we may want to simplify this workflow by generating an Avro schema from a view definition.

Is this a user-visible change (yes/no): yes

Resolves #1663

@ryzhyk ryzhyk added the adapters Issues related to the adapters crate label Apr 25, 2024
@ryzhyk ryzhyk requested review from gz and snkas April 25, 2024 06:21
Copy link
Contributor

@snkas snkas left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks good, looking forward to also the input variant later on.

A couple of minor comments:

  1. I see the advantage of having the registry in the format (i.e., if there is a schema change in the registry, and a pipeline is restarted, the connector does not need to be explicitly updated). I'm not entirely sure whether this resolution should be happening at adapters-crate level, or at pipeline-manager-crate level when it is converted into its final form that is passed to the pipeline. The latter would have the final deployment configuration be more visible. Possibly out-of-scope for this current PR.

  2. What would be the best place to validate the schema with the table view? This could already happen when a connector with the format is attached to a view for instance, potentially giving off a warning if they are incompatible.

  3. Initially I thought that "schema" might become an extra top-level field next to "format" and "transport", with Avro being one type of schema. As in, that it would just be an indicator "these are the data types in the source/sink" and compare it to the view declaration. I understand now that there is a tight coupling to format: the presence of such a schema effectively implies that the format is Avro binary encoding.

CHANGELOG.md Outdated Show resolved Hide resolved
crates/adapters/Cargo.toml Outdated Show resolved Hide resolved
crates/adapters/src/catalog.rs Show resolved Hide resolved
crates/adapters/src/format/avro/output.rs Show resolved Hide resolved
crates/adapters/src/format/avro/output.rs Outdated Show resolved Hide resolved
crates/adapters/src/format/avro/output.rs Show resolved Hide resolved
/// Avro schema used to encode output records.
///
/// Specified as a string containing schema definition in JSON format.
/// This schema must match precisely the SQL view definition, including
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

which view?
doe the schema json contain the view name?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Whatever view it is connected to. The schema does contain a name (it's just the name of the top-level record type in the schema), but the name doesn't matter for schema compatibility, it can differ from the name of the view.

crates/adapters/src/format/avro/serializer.rs Show resolved Hide resolved
where
T: Serialize,
{
let field_schema_idx = *self.schema.lookup.get(name).ok_or_else(|| {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

could this lookup get cached?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's already a map indexed by name, not a lot to speed up here, without changing the whole design.

// Never constructed, required by `trait Serializer`.
pub struct TupleVariantSerializer;

impl SerializeTupleVariant for TupleVariantSerializer {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should you use a macro to generate these?

}
}

fn serialize_u8(self, v: u8) -> Result<Self::Ok, Self::Error> {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

there is no unsigned value in SQL. So I suspect these are also unreachable.

@ryzhyk
Copy link
Contributor Author

ryzhyk commented Apr 26, 2024

Looks good, looking forward to also the input variant later on.

A couple of minor comments:

1. I see the advantage of having the registry in the format (i.e., if there is a schema change in the registry, and a pipeline is restarted, the connector does not need to be explicitly updated). I'm not entirely sure whether this resolution should be happening at adapters-crate level, or at pipeline-manager-crate level when it is converted into its final form that is passed to the pipeline. The latter would have the final deployment configuration be more visible. Possibly out-of-scope for this current PR.

I think this design is compatible with moving the schema interaction logic to the manager (specifically, to the Avro schema registry service, when we have it) or even the compiler. The way it will work is that the manager will resolve the schema by either retrieving an existing schema by name or id or by generating an Avro schema from SQL and pushing that to the registry. It will then create a connector config with the schema string and schema id (this last bit will need to be added), but without registry URL, to prevent the connector from talking to the registry.

2. What would be the best place to validate the schema with the table view? This could already happen when a connector with the format is attached to a view for instance, potentially giving off a warning if they are incompatible.

Agreed.

3. Initially I thought that "schema" might become an extra top-level field next to "format" and "transport", with Avro being one type of schema. As in, that it would just be an indicator "these are the data types in the source/sink" and compare it to the view declaration. I understand now that there is a tight coupling to format: the presence of such a schema effectively implies that the format is Avro binary encoding.

Yes, these are Avro schemas, in contrast to, e.g., our internal SQL table schemas.

Leonid Ryzhyk added 3 commits April 26, 2024 14:42
Avro connector that encodes output updates according to a user-specified
Avro schema.  Can optionally post the supplied schema to a schema
registry, but does not yet support downloading schema from the registry.

The tricky part of this commit is getting serde to generate precisely the
encoding required by the Avro schema, where the same SQL type can be
represented in multiple ways. In particular, timestamps can be in
microseconds or milliseconds, and decimals are represented as bigints
with fixed scale given as part of type definition.  To support this, I
implemented a special serializer that traverses the Avro schema in
parallel with the serialized object and encodes each field according to
its schema.

There are currently several limitations, documented in `avro/output.rs`,
including:

- Currently only supports raw Avro format, i.e., deletes cannot be represented.
- Only works with user-provided schemas, no support for downloading the
  schema from the registry yet.
- Does not support complex schemas with cross-references.

In addition, the connector requires the user to craft an Avro schema
that matches the SQL view precisely (or define the view to precisely
match an existing schema).  In the future, we may want to simplify this
workflow by generating an Avro schema from a view definition.

Signed-off-by: Leonid Ryzhyk <leonid@feldera.com>
- Feature gate Avro for faster compilation when it's not needed.
- Some comments and typos.

Signed-off-by: Leonid Ryzhyk <leonid@feldera.com>
Avro and Delta table connectors do not currently support deletes, so we
drop them on the floor.  This is sometimes ok, when the delete is
paired with an insert with the same key and the destination supports
upserts, but in general this is not a correct behavior, so we log it as
an error.

Signed-off-by: Leonid Ryzhyk <leonid@feldera.com>
@@ -326,6 +339,31 @@ impl TestStruct2 {
]))
}

pub fn avro_schema() -> &'static str {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the parquet test code has a struct with all the types we support.. maybe we should use that for tests

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we do :)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i moved it here

@gz
Copy link
Collaborator

gz commented Apr 26, 2024

this is more of a general comment, but I don't like how crates like rust-decimal and chrono become more and more invasive in our crates.. IMO for all data we store we should have our own defined types (and then define Into/From for Decimal/Chrono etc.) this means we can evolve these independently and implement traits for them without having to submit upstream patches

@ryzhyk ryzhyk merged commit b76d4a9 into main Apr 26, 2024
5 checks passed
@ryzhyk ryzhyk deleted the avro branch April 26, 2024 22:29
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
adapters Issues related to the adapters crate
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Support for Avro output format
4 participants