
Handle bad records in "data contract" mode #135

Closed
adrianbr opened this issue Feb 17, 2023 · 3 comments

adrianbr commented Feb 17, 2023

We need to be able to enforce schemas. We should be able to set a pipeline to a "fixed schema" mode or "data contract" mode.

In this mode, the schema persisted at the destination becomes the "valid" schema. If there is no schema yet, then a schema is created by parsing some data. This would enable someone who wants to track data to generate a schema from an event and then enforce it.

When attempting to load records into the frozen schema, the pipeline should divert bad records into a "bad records" table with the following info:

  • load id or timestamp
  • record as string if possible; if not, truncated to what is possible.
  • stack trace as string. If too long, truncate to what is possible, keeping the useful part of the trace.

Once the feature is implemented, please also create documentation or an example.
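
To make the request concrete, here is a minimal sketch of what one row in such a bad-records table could hold. The field names and layout are hypothetical, not an existing dlt API:

```python
# Hypothetical layout of one row in the proposed "bad records" table.
# Field names are illustrative; truncation limits depend on the destination.
bad_record_row = {
    "load_id": "1676642719.1234",  # load id (or a load timestamp)
    "table_name": "events",        # table the record was destined for
    # the offending record serialized to a string, truncated if too long
    "record": '{"user_id": "abc", "value": "oops"}',
    # stack trace as a string, truncated while keeping the useful part
    "error": "ValueError: cannot coerce 'oops' to bigint",
}
```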

@rudolfix
Collaborator

May be related to #40.

IMO generating the contract should happen during development, not on the production system. So I'd suggest that a complete frozen schema is bundled with a source after being generated from an event by an engineer.

The bad data table should probably also include the table and field names.

What are the bad records?

  • invalid data types on existing records?
  • new columns and tables?


rudolfix commented Sep 1, 2023

I'd like to describe the behavior we implement here better (and also take some community feedback):
Where the contract applies
How schema evolves and which types of contracts we want to implement now:

  1. a new table appears - now
  2. a new column appears - now
  3. a data type conflict - new variant column appears - now
  4. data breaks custom validators (i.e. regexes or enums) - later

The interface
How we apply contracts

  • global/schema level - in the @source decorator (and maybe directly in the schema)
  • resource/table level - in @resource decorator
  • global overrides - in pipeline.run and pipeline.normalize (same behavior as write_disposition)
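
As a rough, non-authoritative sketch of how those three levels could look in code: the @dlt.source and @dlt.resource decorators, dlt.pipeline and pipeline.run exist in dlt today, while the schema_contract argument name and its values are assumptions standing in for whatever the setting ends up being called.

```python
import dlt

# `schema_contract` below is a placeholder name for the proposed setting.

@dlt.resource(schema_contract="freeze")   # resource/table level
def users():
    yield {"id": 1, "name": "alice"}

@dlt.source(schema_contract="evolve")     # global/schema level
def my_source():
    return users()

pipeline = dlt.pipeline(pipeline_name="contract_demo", destination="duckdb")

# global override at run time, same idea as overriding write_disposition
pipeline.run(my_source(), schema_contract="freeze")
```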

Contract settings should be stored in the schema. You can drop the sealed property and add the new settings. Settings will be stored at the schema and table level. A schema engine upgrade is required.

We can also implement hooks, but not in this ticket.

Contract modes
@sh-rp could you specify the current behavior?

Following community feedback I'd add a mode where we do not allow a variant column to appear. My take is that we have three types of contracts, so we should accept three (slightly different) types of settings (sketched after this list):

  • for new tables (evolve, raise, drop etc.)
  • for new columns (the ones we have)
  • for data type conflicts (variant, raise, drop)
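
For illustration, those three settings could be carried as a single mapping, roughly like this (the key names are assumptions; the values are just the options listed above):

```python
# Illustrative only: one entry per contract type discussed above.
contract_settings = {
    "tables": "evolve",      # new tables: evolve / raise / drop ...
    "columns": "raise",      # new columns: the modes we already have
    "data_type": "variant",  # data type conflicts: variant / raise / drop
}
```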

Contract vs schema inference
How do the contracts appear? People can of course define columns and table names, but we also want to support schemas inferred from the data. Users should be able to define such behavior, i.e. if we allow new tables but do not allow new columns, we basically allow the schema to be inferred once from data and freeze it on the next run.
@sh-rp WDYT?


sh-rp commented Sep 3, 2023

The current PR has a global setting that can be set to the following values:

  • evolve: The current standard behavior, adapt the schema of the destination to the incoming data.
  • freeze-and-trim: Freeze the schema, and trim additional fields and subtables of incoming rows. This will still store all the incoming rows but will discard fields and subtables unknown to the schema.
  • freeze-and-raise: If any of the incoming rows do not fit the current schema, fail the current load. This will not store any data in the destination for this load and also not change the schema.
  • freeze-and-discard: Freeze the schema and completely discard any row that does not fit the current schema. Rows that fit the schema will be stored in the destination.

Right now our PR will evolve the schema in the first run if there is only an empty schema present, regardless of the schema mode.
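
To spell out how the four modes treat a row with unknown columns, here is a plain-Python sketch of the behavior described above (not dlt code, just the semantics):

```python
# Plain-Python illustration of the four modes; not dlt code.
def apply_contract(row: dict, known_columns: set, mode: str):
    extra = set(row) - known_columns
    if not extra or mode == "evolve":
        return row  # evolve: keep the row, the destination schema grows
    if mode == "freeze-and-trim":
        # keep the row but drop fields (and, analogously, subtables) unknown to the schema
        return {k: v for k, v in row.items() if k in known_columns}
    if mode == "freeze-and-discard":
        return None  # drop the whole row; rows that fit the schema are kept
    if mode == "freeze-and-raise":
        # fail the load: nothing is stored and the schema does not change
        raise ValueError(f"columns not in schema: {extra}")
    raise ValueError(f"unknown mode: {mode}")
```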

Based on what you wrote, @rudolfix, I would specify settings that can be set at the source or resource level, as well as a global override, as follows:

For new tables and new columns that are encountered, we can specify one of the above settings for each.

For new column variants that are encountered, we could use the same settings: evolve allows new variants, and the other settings work the same way, so variants are either ignored, the whole load fails, or rows that would produce variants are discarded completely. We could think about an option for automatic casting: say we have a column that is a string, an incoming integer for this column would be converted to a string. But this will not work in many other cases, say a column is a datetime and some random string comes in...
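
The casting idea in plain Python terms (nothing dlt-specific), showing one case where coercion works and one where it cannot:

```python
from datetime import datetime

# Casting an incoming int into an existing string column is safe:
coerced = str(42)  # -> "42"

# But casting an arbitrary string into an existing datetime column is not:
try:
    datetime.fromisoformat("some random string")
except ValueError:
    # the value cannot be coerced; the row would have to be discarded or fail the load
    pass
```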

We could also have a specific setting that allows for controlling the creation of new subtables, but I am not sure whether this is needed.
