
MongoToS3Operator failed when running with a single query (not an aggregate pipeline) #15679

@amatellanes

Description


Apache Airflow version: 2.0.2

What happened:

MongoToS3Operator failed when running with a single query (not an aggregate pipeline):

Traceback (most recent call last):
  File "/home/airflow//bin/airflow", line 8, in <module>
    sys.exit(main())
  File "/home/airflow//lib/python3.8/site-packages/airflow/__main__.py", line 40, in main
    args.func(args)
  File "/home/airflow//lib/python3.8/site-packages/airflow/cli/cli_parser.py", line 48, in command
    return func(*args, **kwargs)
  File "/home/airflow//lib/python3.8/site-packages/airflow/utils/cli.py", line 89, in wrapper
    return f(*args, **kwargs)
  File "/home/airflow//lib/python3.8/site-packages/airflow/cli/commands/task_command.py", line 385, in task_test
    ti.run(ignore_task_deps=True, ignore_ti_state=True, test_mode=True)
  File "/home/airflow//lib/python3.8/site-packages/airflow/utils/session.py", line 70, in wrapper
    return func(*args, session=session, **kwargs)
  File "/home/airflow//lib/python3.8/site-packages/airflow/models/taskinstance.py", line 1413, in run
    self._run_raw_task(
  File "/home/airflow//lib/python3.8/site-packages/airflow/utils/session.py", line 67, in wrapper
    return func(*args, **kwargs)
  File "/home/airflow//lib/python3.8/site-packages/airflow/models/taskinstance.py", line 1138, in _run_raw_task
    self._prepare_and_execute_task_with_callbacks(context, task)
  File "/home/airflow//lib/python3.8/site-packages/airflow/models/taskinstance.py", line 1311, in _prepare_and_execute_task_with_callbacks
    result = self._execute_task(context, task_copy)
  File "/home/airflow//lib/python3.8/site-packages/airflow/models/taskinstance.py", line 1341, in _execute_task
    result = task_copy.execute(context=context)
  File "/home/airflow//lib/python3.8/site-packages/airflow/providers/amazon/aws/transfers/mongo_to_s3.py", line 116, in execute
    results = MongoHook(self.mongo_conn_id).find(
  File "/home/airflow//lib/python3.8/site-packages/airflow/providers/mongo/hooks/mongo.py", line 144, in find
    return collection.find(query, **kwargs)
  File "/home/airflow//lib/python3.8/site-packages/pymongo/collection.py", line 1523, in find
    return Cursor(self, *args, **kwargs)
TypeError: __init__() got an unexpected keyword argument 'allowDiskUse'
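
The traceback points at the non-pipeline branch of the operator: when mongo_query is a plain dict, MongoToS3Operator still forwards allowDiskUse to MongoHook.find, and the hook passes all keyword arguments straight through to pymongo's Collection.find. pymongo's find() hands unknown keyword arguments to Cursor.__init__, which does not accept the camelCase allowDiskUse name (newer pymongo releases only expose a snake_case allow_disk_use option), hence the TypeError. A minimal sketch of the underlying pymongo behavior, assuming pymongo 3.x and a placeholder local connection (database and collection names are hypothetical):

from pymongo import MongoClient

client = MongoClient('mongodb://localhost:27017')  # placeholder URI
collection = client['my_db']['my_mongo_collection']

# aggregate() accepts the camelCase option as an extra command argument:
collection.aggregate([{'$match': {}}], allowDiskUse=True)

# find() forwards unknown keyword arguments to Cursor.__init__,
# which rejects the camelCase name with the TypeError shown above:
collection.find({}, allowDiskUse=True)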

What you expected to happen:

I expected the data from MongoDB to be exported to a file in S3 without errors.

How to reproduce it:

Run the following operator with a plain dict mongo_query (not an aggregate pipeline):

from airflow.models import Variable
from airflow.providers.amazon.aws.transfers.mongo_to_s3 import MongoToS3Operator

export_to_s3 = MongoToS3Operator(
    task_id='export_to_s3',
    mongo_conn_id=Variable.get('mongo_conn_id'),
    s3_conn_id=Variable.get('aws_conn_id'),
    mongo_collection='my_mongo_collection',
    mongo_query={},  # plain dict query, i.e. not a pipeline
    s3_bucket=Variable.get('s3_bucket'),
    s3_key="my_data.json",
    replace=True,
    dag=dag,  # `dag` is the surrounding DAG object
)
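
A possible workaround until the operator is fixed (an assumption based on the operator treating a list-typed mongo_query as an aggregation pipeline): express the empty query as a pipeline, so that allowDiskUse is routed to aggregate() instead of find():

export_to_s3 = MongoToS3Operator(
    task_id='export_to_s3',
    mongo_conn_id=Variable.get('mongo_conn_id'),
    s3_conn_id=Variable.get('aws_conn_id'),
    mongo_collection='my_mongo_collection',
    mongo_query=[{'$match': {}}],  # pipeline form of the empty query
    s3_bucket=Variable.get('s3_bucket'),
    s3_key="my_data.json",
    replace=True,
    dag=dag,
)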
