Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Generate external transform wrappers using a script #29834

Merged
merged 72 commits into from
Feb 22, 2024

Conversation

ahmedabu98
Copy link
Contributor

@ahmedabu98 ahmedabu98 commented Dec 20, 2023

Implementing a script that generates wrappers for external SchemaTransforms, according to Option #3 in the following design doc: https://s.apache.org/autogen-wrappers

The script's workflow takes place in setup.py, which can be invoked for local setup or for building the SDK for a Beam release. Files are generated in a subdirectory that is ignored by git. From there, the wrappers can be imported into relevant __init__.py files.

Wrappers are generated along with any documentation provided from the underlying SchemaTransform, and is in compliance with existing linting and pydoc rules.

With the expansion service YAML config, the following transform YAML config is generated:

Transform YAML config
- default_service: sdks:java:io:expansion-service:shadowJar
  description: 'Outputs a PCollection of Beam Rows, each containing a single INT64
    number called "value". The count is produced from the given "start"value and either
    up to the given "end" or until 2^63 - 1.

    To produce an unbounded PCollection, simply do not specify an "end" value. Unbounded
    sequences can specify a "rate" for output elements.

    In all cases, the sequence of numbers is generated in parallel, so there is no
    inherent ordering between the generated values'
  destinations:
    python: apache_beam/io
  fields:
    end:
      description: The maximum number to generate (exclusive). Will be an unbounded
        sequence if left unspecified.
      nullable: true
      type: numpy.int64
    rate:
      description: Specifies the rate to generate a given number of elements per a
        given number of seconds. Applicable only to unbounded sequences.
      nullable: true
      type: Row(seconds=typing.Union[numpy.int64, NoneType], elements=<class 'numpy.int64'>)
    start:
      description: The minimum number to generate (inclusive).
      nullable: false
      type: numpy.int64
  identifier: beam:schematransform:org.apache.beam:generate_sequence:v1
  name: GenerateSequence

From this config, external transform wrappers are created and stored in appropriate modules. For example, our config gives us the following apache_beam/transforms/xlang/io.py file:

GenerateSequence wrapper
# NOTE: This file contains autogenerated external transform(s)
# and should not be edited by hand.
# Refer to gen_xlang_wrappers.py for more info.

"""Cross-language transforms in this module can be imported from the
:py:mod:`apache_beam.io` package."""

# pylint:disable=line-too-long

from apache_beam.transforms.external import BeamJarExpansionService
from apache_beam.transforms.external_transform_provider import ExternalTransform


class GenerateSequence(ExternalTransform):
  """
  Outputs a PCollection of Beam Rows, each containing a single INT64 number
  called "value". The count is produced from the given "start" value and either
  up to the given "end" or until 2^63 - 1.
  To produce an unbounded PCollection, simply do not specify an "end" value.
  Unbounded sequences can specify a "rate" for output elements.
  In all cases, the sequence of numbers is generated in parallel, so there is no
  inherent ordering between the generated values
  """
  identifier = "beam:schematransform:org.apache.beam:generate_sequence:v1"

  def __init__(self, start, end=None, rate=None, expansion_service=None):
    """
    :param start: (numpy.int64)
      The minimum number to generate (inclusive). 
    :param end: (numpy.int64)
      The maximum number to generate (exclusive). Will be an unbounded
      sequence if left unspecified. 
    :param rate: (Row(seconds=typing.Union[numpy.int64, NoneType], elements=<class 'numpy.int64'>))
      Specifies the rate to generate a given number of elements per a given
      number of seconds. Applicable only to unbounded sequences. 
    """
    self.default_expansion_service = BeamJarExpansionService(
        "sdks:java:io:expansion-service:shadowJar")
    super().__init__(
        start=start, end=end, rate=rate, expansion_service=expansion_service)

Including documentation for how the script is used, and unit tests for different parts of the script.

Adding a gradle command ./gradlew generateExternalTransformsConfig to build the configs (it takes care of building the relevant expansion jars beforehand). Once th

Also adding a PreCommit test that generates the transform config from scratch and compares it with the existing one. This serves to indicate whether a change will render the existing config out of sync. To resolve, one would re-generate the config (with ./gradlew generateExternalTransformsConfig) and commit the changes.

@ahmedabu98 ahmedabu98 changed the title Generate Python wrappers for external transforms in setup.py Generate external transform wrappers using a script Dec 20, 2023
@ahmedabu98 ahmedabu98 marked this pull request as ready for review December 22, 2023 00:11
Copy link
Contributor

Assigning reviewers. If you would like to opt out of this review, comment assign to next reviewer:

R: @jrmccluskey for label python.
R: @damccorm for label build.

Available commands:

  • stop reviewer notifications - opt out of the automated review tooling
  • remind me after tests pass - tag the comment author after tests pass
  • waiting on author - shift the attention set back to the author (any comment or push by the author will return the attention set to the reviewers)

The PR bot will only process comments in the main thread (not review comments).

@damccorm
Copy link
Contributor

R: @robertwb @tvalentyn @chamikaramj

(manually requesting to make the review bot happy)

Copy link
Contributor

Stopping reviewer notifications for this pull request: review requested by someone other than the bot, ceding control

// setup test env
def serviceArgs = project.project(':sdks:python').mapToArgString(expansionServiceOpts)
executable 'sh'
args '-c', ". ${project.ext.envdir}/bin/activate && $pythonDir/scripts/run_expansion_services.sh stop --group_id ${project.name} && $pythonDir/scripts/run_expansion_services.sh start $serviceArgs"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we don't do this now, could you at least file a bug and drop a TODO.

sdks/python/setup.py Outdated Show resolved Hide resolved
sdks/standard_expansion_services.yaml Outdated Show resolved Hide resolved
@robertwb
Copy link
Contributor

robertwb commented Feb 12, 2024 via email

@tvalentyn
Copy link
Contributor

I think we need to put the import back in the try:, except: pass block?

That will work, alternatively, you can have dedicated functions: to generate config, and to generate wrappers, and make necessary imports only in functions where imports are required; you might have to add # pylint: disable=g-import-not-at-top.

@ahmedabu98
Copy link
Contributor Author

ahmedabu98 commented Feb 14, 2024

@tvalentyn this is already the case, the steps are in different functions. e.g. we're only importing apache_beam when using the function to generate transform configs.
But this function is unusable in a clean setup (one that doesn't have generated modules) unless we make imports like from apache_beam.transforms.xlang.io import * optional.

…lly starting each one beforehand; skip existing transforms from gcp service config; cleanup precommit test config
@github-actions github-actions bot added the gcp label Feb 15, 2024
@ahmedabu98
Copy link
Contributor Author

The newly added precommit that performs a transform config sync check (beam_PreCommit_Xlang_Generated_Transforms) is running green on my fork: https://github.com/ahmedabu98/beam/actions/runs/7933231703/

Will merge after current tests pass

@ahmedabu98 ahmedabu98 merged commit 11f9bce into apache:master Feb 22, 2024
108 checks passed
Abacn added a commit to Abacn/beam that referenced this pull request Feb 29, 2024
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

7 participants