[Feature Request]: documentation on how to override connector logic so it is reflected when executing a pipeline #23457

@IOR88

Description

What would you like to happen?

Hi, I am using the mongodbio connector and needed to override its write logic to use UpdateOne instead of ReplaceOne.

In the main function that holds the pipeline logic I added something like the line below:

beam.io.mongodbio._MongoSink.write = my_custom_function

and this is my pipeline logic:

import argparse
import logging

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from pymongo import MongoClient, UpdateOne


def custom_write(self, documents):
    """
    Replacement for beam.io.mongodbio._MongoSink.write that merges data
    instead of replacing the whole document.
    :param self:
    :param documents:
    :return:
    """
    if self.client is None:
        self.client = MongoClient(host=self.uri, **self.spec)
    requests = []
    for doc in documents:
        # Match the document on its _id field; if it is not found in the
        # collection, insert a new one, otherwise merge the given fields
        # into the existing document ($set) instead of replacing it.
        requests.append(
            UpdateOne(
                filter={"_id": doc.get("_id", None)},
                update={"$set": doc},
                upsert=True))
    resp = self.client[self.db][self.coll].bulk_write(requests)
    beam.io.mongodbio._LOGGER.debug(
        "BulkWrite to MongoDB result in nModified:%d, nUpserted:%d, "
        "nMatched:%d, Errors:%s" % (
            resp.modified_count,
            resp.upserted_count,
            resp.matched_count,
            resp.bulk_api_result.get("writeErrors"),
        ))


beam.io.mongodbio._MongoSink.write = custom_write


def main(argv=None):
    parser = argparse.ArgumentParser()

    known_args, pipeline_args = parser.parse_known_args(argv)

    pipeline_options = PipelineOptions(pipeline_args)

    with beam.Pipeline(options=pipeline_options) as pipeline:

        (pipeline
         | "QueryTable" >> beam.io.ReadFromBigQuery(query="")
         | "TransformingData" >> beam.Map()
         | "WritingToMongo" >> beam.io.WriteToMongoDB(uri=uri,
                                                      db=db,
                                                      coll=coll,
                                                      batch_size=limit)
         )


if __name__ == '__main__':
    logging.getLogger().setLevel(logging.INFO)
    main()

This works when I use the DirectRunner, but it doesn't work when using the DataflowRunner.

I am using my own Docker image with the dependencies already installed.
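
I suspect the patch only runs in the process that executes the main module (the launcher), while on Dataflow the actual writes happen in separate worker processes that never execute that line. Since I control the image, one idea would be to bake the patch into a sitecustomize.py placed on the interpreter's sys.path; CPython's site module imports sitecustomize automatically at startup, so the patch would run in every worker process too. A minimal sketch (my_package.mongo_patch is a hypothetical module of mine that defines custom_write; the site-packages path depends on the base image):

# sitecustomize.py, copied into the custom image onto sys.path,
# for example into the interpreter's site-packages directory.
# CPython imports sitecustomize automatically at interpreter
# startup, so this patch would run in every worker process as
# well, not only on the launcher machine.
import apache_beam.io.mongodbio as mongodbio

# hypothetical module baked into my image that defines custom_write
from my_package.mongo_patch import custom_write

mongodbio._MongoSink.write = custom_write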

My question is: how can I make a change to an existing connector so that it is reflected when the pipeline executes? In other words, how can I dynamically replace some logic or functions?
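
An alternative I am considering, to avoid patching connector internals at all, is to replace WriteToMongoDB with my own DoFn, since a DoFn is pickled together with its code and shipped to the workers. A minimal sketch under that assumption (UpsertToMongoDB is my own hypothetical class; it assumes pymongo is installed on the workers, which it is in my image):

import apache_beam as beam
from pymongo import MongoClient, UpdateOne


class UpsertToMongoDB(beam.DoFn):
    """Bulk-upserts documents with UpdateOne instead of ReplaceOne."""

    def __init__(self, uri, db, coll, batch_size=100):
        self.uri = uri
        self.db = db
        self.coll = coll
        self.batch_size = batch_size
        self.client = None
        self.batch = []

    def setup(self):
        # Runs once per worker process, so the connection is created
        # on the worker itself, never on the launcher machine.
        self.client = MongoClient(host=self.uri)

    def start_bundle(self):
        self.batch = []

    def process(self, doc):
        # Upsert on _id and merge fields with $set, as in custom_write.
        self.batch.append(
            UpdateOne(
                filter={"_id": doc.get("_id", None)},
                update={"$set": doc},
                upsert=True))
        if len(self.batch) >= self.batch_size:
            self._flush()

    def finish_bundle(self):
        if self.batch:
            self._flush()

    def _flush(self):
        self.client[self.db][self.coll].bulk_write(self.batch)
        self.batch = []

    def teardown(self):
        if self.client is not None:
            self.client.close()

The write step in the pipeline would then become:

| 'WritingToMongo' >> beam.ParDo(UpsertToMongoDB(uri, db, coll, batch_size=limit))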

Issue Priority

Priority: 2

Issue Component

Component: io-py-mongodb
