Skip to content

Commit

Permalink
Add SalesforceApexRestOperator (#18819)
Browse files Browse the repository at this point in the history
  • Loading branch information
mariotaddeucci committed Oct 8, 2021
1 parent 42dc076 commit e9a72a4
Show file tree
Hide file tree
Showing 9 changed files with 220 additions and 5 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from datetime import datetime

from airflow import DAG
from airflow.providers.salesforce.operators.salesforce_apex_rest import SalesforceApexRestOperator

with DAG(
dag_id="salesforce_apex_rest_operator_dag",
schedule_interval=None,
start_date=datetime(2021, 1, 1),
catchup=False,
) as dag:

# [START howto_salesforce_apex_rest_operator]
payload = {"activity": [{"user": "12345", "action": "update page", "time": "2014-04-21T13:00:15Z"}]}

apex_operator = SalesforceApexRestOperator(
task_id="apex_task", method='POST', endpoint='User/Activity', payload=payload
)
# [END howto_salesforce_apex_rest_operator]
2 changes: 0 additions & 2 deletions airflow/providers/salesforce/hooks/salesforce.py
Original file line number Diff line number Diff line change
Expand Up @@ -201,8 +201,6 @@ def get_available_fields(self, obj: str) -> List[str]:
:return: the names of the fields.
:rtype: list(str)
"""
self.get_conn()

obj_description = self.describe_object(obj)

return [field['name'] for field in obj_description['fields']]
Expand Down
66 changes: 66 additions & 0 deletions airflow/providers/salesforce/operators/salesforce_apex_rest.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from airflow.models import BaseOperator
from airflow.providers.salesforce.hooks.salesforce import SalesforceHook


class SalesforceApexRestOperator(BaseOperator):
"""
Execute a APEX Rest API action
.. seealso::
For more information on how to use this operator, take a look at the guide:
:ref:`howto/operator:SalesforceApexRestOperator`
:param endpoint: The REST endpoint for the request.
:type endpoint: str
:param method: HTTP method for the request (default GET)
:type method: str
:param payload: A dict of parameters to send in a POST / PUT request
:type payload: str
:param salesforce_conn_id: The :ref:`Salesforce Connection id <howto/connection:SalesforceHook>`.
:type salesforce_conn_id: str
"""

def __init__(
self,
*,
endpoint: str,
method: str = 'GET',
payload: dict = None,
salesforce_conn_id: str = 'salesforce_default',
**kwargs,
) -> None:
super().__init__(**kwargs)
self.endpoint = endpoint
self.method = method
self.payload = payload
self.salesforce_conn_id = salesforce_conn_id

def execute(self, context: dict) -> dict:
"""
Makes an HTTP request to an APEX REST endpoint and pushes results to xcom.
:param context: The task context during execution.
:type context: dict
:return: Apex response
:rtype: dict
"""
sf_hook = SalesforceHook(salesforce_conn_id=self.salesforce_conn_id)
conn = sf_hook.get_conn()
result = conn.apexecute(action=self.endpoint, method=self.method, data=self.payload)
if self.do_xcom_push:
return result
3 changes: 3 additions & 0 deletions airflow/providers/salesforce/provider.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -35,12 +35,15 @@ additional-dependencies:
integrations:
- integration-name: Salesforce
external-doc-url: https://www.salesforce.com/
how-to-guide:
- /docs/apache-airflow-providers-salesforce/operators/salesforce_apex_rest.rst
logo: /integration-logos/salesforce/Salesforce.png
tags: [service]

operators:
- integration-name: Salesforce
python-modules:
- airflow.providers.salesforce.operators.salesforce_apex_rest
- airflow.providers.salesforce.operators.tableau_refresh_workbook

sensors:
Expand Down
1 change: 1 addition & 0 deletions docs/apache-airflow-providers-salesforce/index.rst
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ Content
:caption: Guides

Connection types <connections/salesforce>
Operators <operators/index>

.. toctree::
:maxdepth: 1
Expand Down
26 changes: 26 additions & 0 deletions docs/apache-airflow-providers-salesforce/operators/index.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
.. Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
.. http://www.apache.org/licenses/LICENSE-2.0
.. Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
Salesforce Operators
====================

.. toctree::
:maxdepth: 1

salesforce_apex_rest
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
.. Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
.. http://www.apache.org/licenses/LICENSE-2.0
.. Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
.. _howto/operator:SalesforceApexRestOperator:

SalesforceApexRestOperator
==========================

Use the :class:`~airflow.providers.salesforce.operators.salesforce_apex_rest.SalesforceApexRestOperator` to execute Apex Rest.


Using the Operator
^^^^^^^^^^^^^^^^^^
You can also use this library to call custom Apex methods:

This would call the endpoint ``https://<instance>.salesforce.com/services/apexrest/User/Activity`` with ``payload`` as
the body content encoded with ``json.dumps``

.. exampleinclude:: /../../airflow/providers/salesforce/example_dags/example_salesforce_apex_rest.py
:language: python
:start-after: [START howto_salesforce_apex_rest_operator]
:end-before: [END howto_salesforce_apex_rest_operator]

You can read more about Apex on the
`Force.com Apex Code Developer's Guide <https://developer.salesforce.com/docs/atlas.en-us.apexcode.meta/apexcode/apex_dev_guide.htm>`__.
4 changes: 1 addition & 3 deletions tests/providers/salesforce/hooks/test_salesforce.py
Original file line number Diff line number Diff line change
Expand Up @@ -289,17 +289,15 @@ def test_describe_object(self, mock_salesforce):
mock_salesforce.return_value.__getattr__(obj).describe.assert_called_once_with()
assert obj_description == mock_salesforce.return_value.__getattr__(obj).describe.return_value

@patch("airflow.providers.salesforce.hooks.salesforce.SalesforceHook.get_conn")
@patch(
"airflow.providers.salesforce.hooks.salesforce.SalesforceHook.describe_object",
return_value={"fields": [{"name": "field_1"}, {"name": "field_2"}]},
)
def test_get_available_fields(self, mock_describe_object, mock_get_conn):
def test_get_available_fields(self, mock_describe_object):
obj = "obj_name"

available_fields = self.salesforce_hook.get_available_fields(obj)

mock_get_conn.assert_called_once_with()
mock_describe_object.assert_called_once_with(obj)
assert available_fields == ["field_1", "field_2"]

Expand Down
49 changes: 49 additions & 0 deletions tests/providers/salesforce/operators/test_salesforce_apex_rest.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

import unittest
from unittest.mock import Mock, patch

from airflow.providers.salesforce.operators.salesforce_apex_rest import SalesforceApexRestOperator


class TestSalesforceApexRestOperator(unittest.TestCase):
"""
Test class for SalesforceApexRestOperator
"""

@patch('airflow.providers.salesforce.operators.salesforce_apex_rest.SalesforceHook.get_conn')
def test_execute_salesforce_apex_rest(self, mock_get_conn):
"""
Test execute apex rest
"""

endpoint = 'User/Activity'
method = 'POST'
payload = {"activity": [{"user": "12345", "action": "update page", "time": "2014-04-21T13:00:15Z"}]}

mock_get_conn.return_value.apexecute = Mock()

operator = SalesforceApexRestOperator(
task_id='task', endpoint=endpoint, method=method, payload=payload
)

operator.execute(context={})

mock_get_conn.return_value.apexecute.assert_called_once_with(
action=endpoint, method=method, data=payload
)

0 comments on commit e9a72a4

Please sign in to comment.