Add template_fields support to SalesforceBulkOperator #62375

@felipearcaro-buildium

Apache Airflow Provider(s)

salesforce

Versions of Apache Airflow Providers

apache-airflow-providers-salesforce 5.12.2

Apache Airflow version

2.11.0

Operating System

Windows 11

Deployment

Astronomer

Deployment details

No response

What happened

When using Jinja templating to pass data from a previous task to SalesforceBulkOperator:

def get_salesforce_records():
    return [
        {
            'Id': 1,
            'Name': 'example1',
            'Status': 'Active',
        },
        {
            'Id': 2,
            'Name': 'example2',
            'Status': 'Active',
        }
    ]

get_records = PythonOperator(
    task_id='get_records',
    python_callable=get_salesforce_records
)

upsert_to_salesforce = SalesforceBulkOperator(
    task_id='upsert_to_salesforce',
    object_name='Account',
    operation='insert',
    payload="{{ ti.xcom_pull(task_ids='get_records') }}"
)

# The XCom only exists once get_records has run
get_records >> upsert_to_salesforce

The payload parameter receives the literal string {{ ti.xcom_pull(task_ids='get_records') }} instead of the actual XCom value, causing the operation to fail.

What you think should happen instead

The Jinja template should be rendered at runtime, as it is for the templated fields of other Airflow operators. The operator should declare template_fields covering its key parameters: payload, object_name, operation, and external_id_field.
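To illustrate the behavior described above, here is a toy model of the rendering step. It is not Airflow's implementation: render_fields, PlainOperator, and TemplatedOperator are hypothetical stand-ins, and a simple regex replaces Jinja. It only shows the rule that attributes missing from template_fields are left untouched, and that a rendered value comes back as a string.

```python
import re

# Toy stand-in for the template engine: matches "{{ name }}" placeholders.
_PLACEHOLDER = re.compile(r"\{\{\s*(\w+)\s*\}\}")


def render_fields(task, context):
    """Render only the attributes named in task.template_fields."""
    for name in getattr(task, "template_fields", ()):
        value = getattr(task, name)
        if isinstance(value, str):
            # Substitution produces text, so the result is always a string.
            rendered = _PLACEHOLDER.sub(lambda m: str(context[m.group(1)]), value)
            setattr(task, name, rendered)


class PlainOperator:
    template_fields = ()  # payload is not listed, so it is never rendered

    def __init__(self, payload):
        self.payload = payload


class TemplatedOperator(PlainOperator):
    template_fields = ("payload",)  # payload is now rendered


context = {"records": [{"Name": "example1"}]}
plain = PlainOperator("{{ records }}")
templated = TemplatedOperator("{{ records }}")
render_fields(plain, context)
render_fields(templated, context)

print(repr(plain.payload))      # the template text, unchanged
print(repr(templated.payload))  # a string holding the list's repr
```

In this sketch the first operator keeps the template text, which mirrors what SalesforceBulkOperator currently receives, while the second gets a string representation of the list rather than the list itself.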

How to reproduce

Create a DAG containing the following two tasks, then trigger it:

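from airflow.operators.python import PythonOperator
from airflow.providers.salesforce.operators.bulk import SalesforceBulkOperator


def get_salesforce_records():
    # The return value is pushed to XCom automatically
    return [
        {
            'Id': 1,
            'Name': 'example1',
            'Status': 'Active',
        },
        {
            'Id': 2,
            'Name': 'example2',
            'Status': 'Active',
        }
    ]

get_records = PythonOperator(
    task_id='get_records',
    python_callable=get_salesforce_records
)

upsert_to_salesforce = SalesforceBulkOperator(
    task_id='upsert_to_salesforce',
    object_name='Account',
    operation='insert',
    payload="{{ ti.xcom_pull(task_ids='get_records') }}"
)

# The XCom only exists once get_records has run
get_records >> upsert_to_salesforce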

Anything else

When I added template_fields to a custom operator extending SalesforceBulkOperator, I noticed that templated parameters are rendered as strings rather than as their original Python types (the default Jinja rendering behavior). This required additional parsing logic in the execute() method:

# Module-level imports needed by the method below
import ast
import json

def execute(self, context):
    # Convert the templated string back to a Python object
    if isinstance(self.payload, str):
        try:
            # Try ast.literal_eval first - handles Python string representation with single quotes
            # e.g., "[{'key': 'value'}]" from XCom
            self.payload = ast.literal_eval(self.payload)
        except (ValueError, SyntaxError):
            # Fallback to json.loads for valid JSON strings (with double quotes)
            self.payload = json.loads(self.payload)

    return super().execute(context)
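The same parsing logic can be pulled into a standalone helper so it can be tested without Airflow. This is a minimal sketch using only the standard library; parse_payload is a hypothetical name, not part of the provider.

```python
import ast
import json


def parse_payload(value):
    """Return value as a Python object, parsing it first if it is a string."""
    if not isinstance(value, str):
        # Already a native object (e.g. a list of dicts): pass it through.
        return value
    try:
        # Handles Python reprs such as "[{'key': 'value'}]".
        return ast.literal_eval(value)
    except (ValueError, SyntaxError):
        # Handles JSON that is not a Python literal, e.g. true/false/null.
        return json.loads(value)


records = parse_payload("[{'Id': 1, 'Name': 'example1'}]")
flags = parse_payload('[{"Active": true}]')
print(records, flags)
```

A string that is neither a Python literal nor valid JSON still raises json.JSONDecodeError here, so the task fails loudly instead of sending a bad payload. Setting render_template_as_native_obj=True on the DAG is another way to get native types from templates, though that changes rendering for every task in the DAG.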

Are you willing to submit PR?

  • Yes I am willing to submit a PR!
