[Bug]: MLTransform drops elements if they are already transformed before. #29600

Closed
1 of 16 tasks
AnandInguva opened this issue Dec 4, 2023 · 0 comments · Fixed by #29542
Labels
awaiting triage, bug, done & done, P2, python

AnandInguva commented Dec 4, 2023

What happened?

When the input PCollection contains duplicate elements, MLTransform outputs each distinct transformed element only once and drops the remaining duplicates. This is not the expected behavior.

Note: MLTransform is an experimental feature in versions 2.50.0 through 2.52.0. Because of this bug, avoid using MLTransform with those versions if your data contains identical elements.

The fix, PR #29542, will be included in 2.53.0.
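To make the version note above concrete, here is a minimal sketch of a guard that checks whether a Beam version string falls in the affected range. The helper names `parse_version` and `is_affected` are hypothetical and not part of Beam. The sketch assumes every release from 2.50.0 up to, but not including, 2.53.0 is affected, and it treats pre-release builds such as `2.53.0.dev` like their final release, which may not hold for development snapshots.

```python
import re

# Range taken from the note above: affected from 2.50.0, fixed in 2.53.0.
FIRST_AFFECTED = (2, 50, 0)
FIXED_IN = (2, 53, 0)


def parse_version(version):
    """Return the leading (major, minor, patch) of a version string as ints."""
    match = re.match(r"(\d+)\.(\d+)\.(\d+)", version)
    if match is None:
        raise ValueError(f"unrecognized version string: {version!r}")
    return tuple(int(part) for part in match.groups())


def is_affected(version):
    """True if `version` is at least 2.50.0 and older than 2.53.0.

    Assumption: any suffix after the patch number (e.g. ".dev") is ignored.
    """
    return FIRST_AFFECTED <= parse_version(version) < FIXED_IN


if __name__ == "__main__":
    for v in ("2.49.0", "2.50.0", "2.52.0", "2.53.0"):
        print(v, is_affected(v))
```

In a pipeline script, this could be called as `is_affected(beam.__version__)` before building an MLTransform step, for example to raise an error or log a warning when the input may contain identical elements.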


Simple repro:

import tempfile

import apache_beam as beam
from apache_beam.ml.transforms.base import MLTransform
from apache_beam.ml.transforms.tft import ComputeAndApplyVocabulary

# 'Beam' appears twice, so the input contains one duplicate element.
data = [
    {'x': 'I'},
    {'x': 'love'},
    {'x': 'Beam'},
    {'x': 'Beam'},
    {'x': 'is'},
    {'x': 'awesome'},
]
# Temporary directory where MLTransform writes its artifacts.
artifact_location = tempfile.mkdtemp()
compute_and_apply_vocabulary_fn = ComputeAndApplyVocabulary(columns=['x'])
with beam.Pipeline() as p:
  transformed_data = (
      p
      | beam.Create(data)
      | MLTransform(write_artifact_location=artifact_location).with_transform(
          compute_and_apply_vocabulary_fn)
      # Print each transformed row; six rows are expected, one per input.
      | beam.Map(print))

Expected output

Row(x=array([4]))
Row(x=array([1]))
Row(x=array([0]))
Row(x=array([0]))
Row(x=array([2]))
Row(x=array([3]))

Actual output

Row(x=array([4]))
Row(x=array([1]))
Row(x=array([0]))
Row(x=array([2]))
Row(x=array([3]))
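
Because a PCollection is unordered, comparing the two listings line by line can mislead. A rough sketch of a multiset comparison in plain Python follows. The vocabulary indices are copied from the expected and actual output above, and `missing_elements` is a hypothetical helper, not a Beam API.

```python
from collections import Counter

# Vocabulary indices copied from the "Expected output" and "Actual output"
# listings in this issue.
expected = [4, 1, 0, 0, 2, 3]
actual = [4, 1, 0, 2, 3]


def missing_elements(expected_items, actual_items):
    """Multiset difference: items (with counts) in expected but not in actual."""
    return Counter(expected_items) - Counter(actual_items)


missing = missing_elements(expected, actual)
print(len(expected), len(actual))  # 6 5
print(dict(missing))  # {0: 1}
```

The difference shows that exactly one copy of index 0, the duplicated 'Beam' element, is missing from the actual output.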

Issue Priority

Priority: 2 (default / most bugs should be filed as P2)

Issue Components

  • Component: Python SDK
  • Component: Java SDK
  • Component: Go SDK
  • Component: Typescript SDK
  • Component: IO connector
  • Component: Beam YAML
  • Component: Beam examples
  • Component: Beam playground
  • Component: Beam katas
  • Component: Website
  • Component: Spark Runner
  • Component: Flink Runner
  • Component: Samza Runner
  • Component: Twister2 Runner
  • Component: Hazelcast Jet Runner
  • Component: Google Cloud Dataflow Runner
AnandInguva added this to the 2.53.0 Release milestone on Dec 4, 2023
damccorm added the done & done label on Dec 5, 2023