Skip to content

Allow transform expressions on same column as source and sink.#8413

Closed
amrishlal wants to merge 1 commit intoapache:masterfrom
amrishlal:single-transform-column
Closed

Allow transform expressions on same column as source and sink.#8413
amrishlal wants to merge 1 commit intoapache:masterfrom
amrishlal:single-transform-column

Conversation

@amrishlal
Copy link
Contributor

Description

This PR allows for having column names in Pinot that have same name as the corresponding column name in an incoming online or offline (avro) dataset even after applying an Ingestion Transform Function. To do this, we modify ExpressionTransformer.topologicalSort function so that Ingestion Transform function dependency is not considered cyclic if an Ingestion Transform Function has the same column name as both source and sink:

               "transformConfigs": [{
                  "columnName": "jsonColumn",
                  "transformFunction": "JSON_FORMAT(jsonColumn)"
                }]

Note that there is no actual cyclic dependency here since the function can still be safely evaluated without getting into an infinite loop.

  • ExpressionTransformer.java has changes to allow for specifying Ingestion Transform functions where source and sink column names are the same.
  • ExpressionTransformTest.java has unit tests for validating the change.
  • JsonIngestionFromAvroQueriesTest.java was added as a real usecase testcase. This test involves ingesting avro complex type fields into JSON column when an Ingestion Transform function is used to map the avro complex type field to Pinot JSON column of the same name.
  • AvroIngestionSchemaValidator.java changes allow for validating type compatibility between Avro complex type fields and JSON column.

Upgrade Notes

Does this PR prevent a zero down-time upgrade? (Assume upgrade order: Controller, Broker, Server, Minion)

  • Yes (Please label as backward-incompat, and complete the section below on Release Notes)

Does this PR fix a zero-downtime upgrade introduced earlier?

  • Yes (Please label this as backward-incompat, and complete the section below on Release Notes)

Does this PR otherwise need attention when creating release notes? Things to consider:

  • New configuration options
  • Deprecation of configurations
  • Signature changes to public methods/interfaces
  • New plugins added or old plugins removed
  • Yes (Please label this PR as release-notes and complete the section on Release Notes)

Release Notes

Documentation

@codecov-commenter
Copy link

codecov-commenter commented Mar 25, 2022

Codecov Report

Merging #8413 (66f8f7e) into master (37fcfb8) will decrease coverage by 6.72%.
The diff coverage is 32.00%.

@@             Coverage Diff              @@
##             master    #8413      +/-   ##
============================================
- Coverage     70.77%   64.05%   -6.73%     
- Complexity     4278     4281       +3     
============================================
  Files          1655     1610      -45     
  Lines         86607    84739    -1868     
  Branches      13064    12866     -198     
============================================
- Hits          61296    54278    -7018     
- Misses        21068    26540    +5472     
+ Partials       4243     3921     -322     
Flag Coverage Δ
integration1 ?
integration2 ?
unittests1 66.98% <32.00%> (-0.02%) ⬇️
unittests2 14.10% <0.00%> (+0.01%) ⬆️

Flags with carried forward coverage won't be shown. Click here to find out more.

Impacted Files Coverage Δ
...inputformat/avro/AvroIngestionSchemaValidator.java 0.00% <0.00%> (ø)
...local/recordtransformer/ExpressionTransformer.java 100.00% <100.00%> (ø)
...va/org/apache/pinot/core/routing/RoutingTable.java 0.00% <0.00%> (-100.00%) ⬇️
...va/org/apache/pinot/common/config/NettyConfig.java 0.00% <0.00%> (-100.00%) ⬇️
...a/org/apache/pinot/common/metrics/MinionMeter.java 0.00% <0.00%> (-100.00%) ⬇️
...g/apache/pinot/common/metrics/ControllerMeter.java 0.00% <0.00%> (-100.00%) ⬇️
.../apache/pinot/common/metrics/BrokerQueryPhase.java 0.00% <0.00%> (-100.00%) ⬇️
.../apache/pinot/common/metrics/MinionQueryPhase.java 0.00% <0.00%> (-100.00%) ⬇️
...apache/pinot/common/helix/ExtraInstanceConfig.java 0.00% <0.00%> (-100.00%) ⬇️
...ache/pinot/server/access/AccessControlFactory.java 0.00% <0.00%> (-100.00%) ⬇️
... and 381 more

Continue to review full report at Codecov.

Legend - Click here to learn more
Δ = absolute <relative> (impact), ø = not affected, ? = missing data
Powered by Codecov. Last update 37fcfb8...66f8f7e. Read the comment docs.

Copy link
Contributor

@Jackie-Jiang Jackie-Jiang left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's hold a little bit on merging this and have some high level discussion first.

We intentionally reject ingestion transform with same input and output column because it is not idempotent, and can cause unexpected behavior if by any chance the same record is transformed twice. Also, in certain scenarios, the input data might already have the final column generated, and we just skip the transform.
I would be super careful on this change because we need to ensure the record is never transformed twice. Another concern is that if the ingestion transform changes, there is no way to re-generate the derived column because the original values are already changed.
IMO, loose this restriction can easily cause unexpected behavior, and might not worth it.

@amrishlal
Copy link
Contributor Author

amrishlal commented Mar 25, 2022

Let's hold a little bit on merging this and have some high level discussion first.

We intentionally reject ingestion transform with same input and output column because it is not idempotent, and can cause unexpected behavior if by any chance the same record is transformed twice. Also, in certain scenarios, the input data might already have the final column generated, and we just skip the transform. I would be super careful on this change because we need to ensure the record is never transformed twice. Another concern is that if the ingestion transform changes, there is no way to re-generate the derived column because the original values are already changed. IMO, loose this restriction can easily cause unexpected behavior, and might not worth it.

The problem that we are running into is that for GDPR etc., we need to be able to purge records from a segment based on values of a particular field and if we change the name of the field that is being ingested into Pinot, then we loose information that column 'x' in the Pinot table actually came from field 'y' in the Kafka event / avro schema and hence cannot purge records automatically based on orginal avro schema field name 'y' in minion.

Definitely open to suggestions and discussion, but my understanding is that ingestion transform functions are applied only during ingestion where the original field is in kafka/avro and the transformed value goes into Pinot column, so this should be safe right? If you have any particular usecase that may not be safe I can try them out?

@Jackie-Jiang
Copy link
Contributor

The problem that we are running into is that for GDPR etc., we need to be able to purge records from a segment based on values of a particular field and if we change the name of the field that is being ingested into Pinot, then we loose information that column 'x' in the Pinot table actually came from field 'y' in the Kafka event / avro schema and hence cannot purge records automatically based on orginal avro schema field name 'y' in minion.

If you transform the value within column 'x', even if you can find the column, the value is no longer the original value, how do you apply the purge logic?
Also, even if you can modify the value properly, when generating the new segment, it will use the record transformer to process the records again, which will cause the transform twice problem. If the record transform step is skipped, then there is no guarantee that the value type is correct.
Making all transforms idempotent can make it much more robust.

Definitely open to suggestions and discussion, but my understanding is that ingestion transform functions are applied only during ingestion where the original field is in kafka/avro and the transformed value goes into Pinot column, so this should be safe right? If you have any particular usecase that may not be safe I can try them out?

Ingestion transforms can be used during ingestion and also during reload to generate the derived column. Also, on the minion side, the segment can be read as source file, and fed into the ingestion engine again, which may transform the records again. We have to take extra care to make it right if the transform is not idempotent.

@amrishlal
Copy link
Contributor Author

@Jackie-Jiang Let me look into it a bit more. Will ping you offline.

@amrishlal
Copy link
Contributor Author

Closing as #8426 has been merged.

@amrishlal amrishlal closed this Mar 30, 2022
@amrishlal amrishlal deleted the single-transform-column branch March 31, 2022 17:23
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants