Evaluate schema transform expressions during ingestion #5238

Merged — 17 commits, Apr 15, 2020

Conversation

@npawar (Contributor) commented on Apr 10, 2020

This PR adds support for executing transform functions defined in the schema. The transformation happens during ingestion. A detailed design doc listing all the changes and the design: Column Transformation during ingestion in Pinot.

The main changes are:

  1. A transformFunction can be written in the schema for any fieldSpec using Groovy. The convention is:
    "transformFunction": "FunctionType({function}, argument1, argument2,...argumentN)"
    For example:
    "transformFunction" : "Groovy({firstName + ' ' + lastName}, firstName, lastName)"
  2. The RecordReader provides the Pinot schema to the SourceFieldNamesExtractor utility to get the source field names.
  3. A RecordExtractor interface is introduced, one per input format. The RecordReader passes the source field names and the source record to the RecordExtractor, which produces the destination GenericRow.
  4. The ExpressionTransformer creates an ExpressionEvaluator for each transform function and executes the functions (see the sketch after this list).
  5. The ExpressionTransformer runs before all other RecordTransformers, so that every other transformer sees the real values.
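To make the Groovy convention above concrete, here is a minimal sketch of how such an evaluator could work, assuming a helper that compiles the script body once and binds the named argument columns for every record. The class and method names are illustrative only, not the PR's actual ExpressionEvaluator API:

```java
import groovy.lang.Binding;
import groovy.lang.GroovyShell;
import groovy.lang.Script;

import java.util.Map;

// Illustrative only: evaluates a Groovy transform such as
// Groovy({firstName + ' ' + lastName}, firstName, lastName).
public class GroovyTransformSketch {
  private final Script _script;
  private final String[] _arguments;

  public GroovyTransformSketch(String groovyBody, String... arguments) {
    // Compile once, e.g. groovyBody = "firstName + ' ' + lastName"
    _script = new GroovyShell().parse(groovyBody);
    _arguments = arguments;  // e.g. "firstName", "lastName"
  }

  // sourceValues holds the extracted source columns for one record.
  public Object evaluate(Map<String, Object> sourceValues) {
    Binding binding = new Binding();
    for (String argument : _arguments) {
      binding.setVariable(argument, sourceValues.get(argument));
    }
    _script.setBinding(binding);
    return _script.run();
  }
}
```

For the fullName example above, this would be constructed with the body "firstName + ' ' + lastName" and the arguments "firstName" and "lastName", and evaluated once per extracted row.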

I'll be happy to break this down into smaller PRs if this is getting too big to review.
I'm finding it hard to split, though, because the AvroRecordExtractor is already used in the realtime decoders.

Pending

  1. Add transform functions to an integration test
  2. JMH benchmarks

Some open questions

  1. We no longer know the data type of the source fields. This is a problem for CSV and JSON.
    CSV
    a) Everything is read as a String, right up until the DataTypeTransformer. The function has to handle the type conversion itself.
    b) An MV column with a single value cannot be distinguished from a single-value column. The function has to handle this.
    c) All empty values become null values. A genuine "" cannot be distinguished from null for String columns.
    JSON
    a) INT/LONG cannot be distinguished from DOUBLE/FLOAT until the DataTypeTransformer.
  2. What should we do if any of the inputs to the transform function is null? Currently, the transform is skipped. But should we make it the responsibility of the function to handle this?
  3. The KafkaJSONDecoder needs to create a JSONRecordExtractor by default, but the stream-ingestion module cannot access the JSONRecordExtractor in the input-format module. We did not face this problem with Avro, because everything lives in input-format.
  4. Before the ExpressionTransformer, the GenericRow contains only source columns. After the ExpressionTransformer, it contains source + destination columns, all the way up to indexing. Should we introduce a transformer that creates a new GenericRow with only the destination columns, to avoid the memory consumed by the extra columns?

@mcvsubbu (Contributor) left a comment

Yes, splitting it into a couple of PRs may help -- since you offered. Perhaps start with the record extractor, but you decide.
Regarding your open questions:
1. We face this problem because JSON and CSV don't have any schema. Should we introduce the concept of an input schema? Should we pick the schema from the first record (what if it has null for some field, and we only find the field later)? Should we dictate that the first record MUST have all the fields they ever expect to see in the input, and take that to be the schema? Just adding some ideas for consideration. There is some related discussion in PR #4968.
2. Can we assume Pinot defaults if the input to a transform function is null? This will work until we support more input types than Pinot itself supports -- far-fetched, I think.
3. ?
4. Since we are clearing and re-using the GenericRow, it should be OK, I think.

@kishoreg (Member) left a comment

This is amazing! Minor comments, mostly around refactoring.

@Jackie-Jiang (Contributor) left a comment

It will be nice if we can completely decouple the schema from RecordReader and RecordExtractor, which will make future development much easier.

@npawar (Contributor, Author) commented on Apr 13, 2020

> It will be nice if we can completely decouple the schema from RecordReader and RecordExtractor, which will make future development much easier.

It's already decoupled from RecordExtractor. Do you mean pulling the schema up even more, so that the RecordReader also doesn't need the Schema? What would we achieve by doing that?

@Jackie-Jiang (Contributor) commented on Apr 13, 2020

> It will be nice if we can completely decouple the schema from RecordReader and RecordExtractor, which will make future development much easier.
>
> It's already decoupled from RecordExtractor. Do you mean pulling the schema up even more, so that the RecordReader also doesn't need the Schema? What would we achieve by doing that?

@npawar RecordExtractor is for stream ingestion, and RecordReader is for batch ingestion. Think of users trying to add a new record reader: they don't need to understand what the schema is; they only need to know which fields should be read.
This might be a bigger change, so we can add a TODO and address it separately.
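Purely as an illustration of this suggestion (the names below are hypothetical, not existing Pinot interfaces), a decoupled reader could be initialized with plain field names instead of a Schema:

```java
import java.io.Closeable;
import java.io.File;
import java.io.IOException;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch of a schema-agnostic reader: it only knows which
// source fields to read; schema and transform logic are applied later
// in the ingestion pipeline.
public interface SchemaAgnosticRecordReader extends Closeable {

  // Initialize with the input file and the source field names to extract.
  void init(File dataFile, Set<String> sourceFields) throws IOException;

  boolean hasNext();

  // Raw values for the next record, keyed by source field name.
  Map<String, Object> next() throws IOException;
}
```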

@Jackie-Jiang (Contributor) left a comment

For the tests, can we hardcode more than one record to catch problems such as the ORC case?

@mcvsubbu (Contributor) left a comment

Partial

@npawar (Contributor, Author) commented on Apr 14, 2020

> It will be nice if we can completely decouple the schema from RecordReader and RecordExtractor, which will make future development much easier.
>
> It's already decoupled from RecordExtractor. Do you mean pulling the schema up even more, so that the RecordReader also doesn't need the Schema? What would we achieve by doing that?
>
> @npawar RecordExtractor is for stream ingestion, and RecordReader is for batch ingestion. Think of users trying to add a new record reader: they don't need to understand what the schema is; they only need to know which fields should be read.
> This might be a bigger change, so we can add a TODO and address it separately.

As per the offline sync-up, StreamMessageDecoder is the entry point for realtime and RecordReader is the entry point for batch; the RecordExtractor is expected to be common to both of them. There is a picture of this in the design doc linked in the description.
I've added a TODO in the RecordReader class to further pull out the Schema. For consistency, we should then do the same in StreamMessageDecoder as well. Since this is a bigger change, I will leave it out of the scope of this PR.

@npawar (Contributor, Author) commented on Apr 14, 2020

> For the tests, can we hardcode more than one record to catch problems such as the ORC case?

The RecordExtractor tests already cover more than one record.

@Jackie-Jiang (Contributor) left a comment

LGTM otherwise, great work

@mayankshriv (Contributor) commented

We missed discussing the pros/cons of using Groovy in the design doc, apologies. One thing to note is that opening the door to executing external scripts/code is a security risk. For example, we need to protect against malicious intent such as System.exit(), while (true), forced OOM, etc. Some of these can be prevented by using Protected/Privileged mode in Groovy. But we should consider whether there are easier alternatives that don't require inventing another DSL.
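As background on what restricting Groovy could look like (an assumption for illustration, not something this PR implements), Groovy ships a SecureASTCustomizer that can reject constructs at compile time; note that on its own it does not stop infinite loops, memory exhaustion, or calls like System.exit() unless receivers and expressions are also restricted:

```java
import groovy.lang.GroovyShell;
import org.codehaus.groovy.control.CompilerConfiguration;
import org.codehaus.groovy.control.customizers.SecureASTCustomizer;

import java.util.Collections;

// Sketch: build a GroovyShell whose scripts may not define methods or use imports.
public final class RestrictedGroovyShellSketch {

  public static GroovyShell create() {
    SecureASTCustomizer customizer = new SecureASTCustomizer();
    customizer.setMethodDefinitionAllowed(false);                 // no method definitions in scripts
    customizer.setImportsWhitelist(Collections.emptyList());      // reject all imports
    customizer.setStarImportsWhitelist(Collections.emptyList());  // reject star imports
    customizer.setIndirectImportCheckEnabled(true);               // also check fully-qualified usages

    CompilerConfiguration config = new CompilerConfiguration();
    config.addCompilationCustomizers(customizer);
    return new GroovyShell(config);
  }
}
```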

@npawar force-pushed the expression_parser_evaluation branch from e12c035 to 2570bf7 on April 14, 2020 at 22:14
@Jackie-Jiang (Contributor) left a comment

LGTM

@npawar (Contributor, Author) commented on Apr 15, 2020

> We missed discussing the pros/cons of using Groovy in the design doc, apologies. One thing to note is that opening the door to executing external scripts/code is a security risk. For example, we need to protect against malicious intent such as System.exit(), while (true), forced OOM, etc. Some of these can be prevented by using Protected/Privileged mode in Groovy. But we should consider whether there are easier alternatives that don't require inventing another DSL.

Let's continue the discussion on the design doc. We can always remove the Groovy function type if we decide against it; the rest of the changes would remain the same.
Regarding Protected/Privileged mode: can you point me to some docs? I can't seem to find what you're referring to.

@npawar (Contributor, Author) commented on Apr 15, 2020

#5135

@npawar (Contributor, Author) commented on Apr 15, 2020

> Yes, splitting it into a couple of PRs may help -- since you offered. Perhaps start with the record extractor, but you decide.
> Regarding your open questions:
> 1. We face this problem because JSON and CSV don't have any schema. Should we introduce the concept of an input schema? Should we pick the schema from the first record (what if it has null for some field, and we only find the field later)? Should we dictate that the first record MUST have all the fields they ever expect to see in the input, and take that to be the schema? Just adding some ideas for consideration. There is some related discussion in PR #4968.

An input schema seems like the cleanest solution, but that's more configuration and more scope for user error. Giving it more thought, I think the current implementation will work fine, because most often the same input format is used for all pushes to the same table. The problem arises only if someone keeps switching between input formats for the data.

> 2. Can we assume Pinot defaults if the input to a transform function is null? This will work until we support more input types than Pinot itself supports -- far-fetched, I think.

Right now, I think the best way forward is to let the function handle it. The function definition would only need to change if someone pushes data with different input formats to the same schema (e.g. they use Avro once and it succeeds, but then they use CSV and the function fails), which is not a common practical case.
