Skip to content

Commit

Permalink
HttpHook: Use request factory and respect defaults (#14701)
Browse files Browse the repository at this point in the history
Use Request's session.request factory for HTTP request initiation, this will use
environment variables and sensible defaults for requests.

Also use verify option only if it is provided to run method, as requests library
already defaults to True.

Our organization uses firewalls and custom SSL certificates to communicate
between systems, this can be achieved via `CURL_CA_BUNDLE` and
`REQUESTS_CA_BUNDLE` environment variables.  Requests library takes both into
account and uses them as default value for verify option when sending request to
remote system.

Current implementation is setting verify to True, which overwrites defaults and
as results requests can not be made due to SSL verification issues. This PR is
fixing the problem.
  • Loading branch information
ngaranko committed May 4, 2021
1 parent cf6324e commit ca432ee
Show file tree
Hide file tree
Showing 3 changed files with 93 additions and 9 deletions.
25 changes: 16 additions & 9 deletions airflow/providers/http/hooks/http.py
Original file line number Diff line number Diff line change
Expand Up @@ -176,16 +176,23 @@ def run_and_check(
"""
extra_options = extra_options or {}

settings = session.merge_environment_settings(
prepped_request.url,
proxies=extra_options.get("proxies", {}),
stream=extra_options.get("stream", False),
verify=extra_options.get("verify"),
cert=extra_options.get("cert"),
)

# Send the request.
send_kwargs = {
"timeout": extra_options.get("timeout"),
"allow_redirects": extra_options.get("allow_redirects", True),
}
send_kwargs.update(settings)

try:
response = session.send(
prepped_request,
stream=extra_options.get("stream", False),
verify=extra_options.get("verify", True),
proxies=extra_options.get("proxies", {}),
cert=extra_options.get("cert"),
timeout=extra_options.get("timeout"),
allow_redirects=extra_options.get("allow_redirects", True),
)
response = session.send(prepped_request, **send_kwargs)

if extra_options.get('check_response', True):
self.check_response(response)
Expand Down
74 changes: 74 additions & 0 deletions tests/providers/http/hooks/test_http.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,9 @@
# specific language governing permissions and limitations
# under the License.
import json
import os
import unittest
from collections import OrderedDict
from unittest import mock

import pytest
Expand Down Expand Up @@ -279,5 +281,77 @@ def match_obj1(request):
# will raise NoMockAddress exception if obj1 != request.json()
HttpHook(method=method).run('v1/test', json=obj1)

@mock.patch('airflow.providers.http.hooks.http.requests.Session.send')
def test_verify_set_to_true_by_default(self, mock_session_send):
with mock.patch(
'airflow.hooks.base.BaseHook.get_connection', side_effect=get_airflow_connection_with_port
):
self.get_hook.run('/some/endpoint')
mock_session_send.assert_called_once_with(
mock.ANY,
allow_redirects=True,
cert=None,
proxies=OrderedDict(),
stream=False,
timeout=None,
verify=True,
)

@mock.patch('airflow.providers.http.hooks.http.requests.Session.send')
@mock.patch.dict(os.environ, {"REQUESTS_CA_BUNDLE": "/tmp/test.crt"})
def test_requests_ca_bundle_env_var(self, mock_session_send):
with mock.patch(
'airflow.hooks.base.BaseHook.get_connection', side_effect=get_airflow_connection_with_port
):

self.get_hook.run('/some/endpoint')

mock_session_send.assert_called_once_with(
mock.ANY,
allow_redirects=True,
cert=None,
proxies=OrderedDict(),
stream=False,
timeout=None,
verify='/tmp/test.crt',
)

@mock.patch('airflow.providers.http.hooks.http.requests.Session.send')
@mock.patch.dict(os.environ, {"REQUESTS_CA_BUNDLE": "/tmp/test.crt"})
def test_verify_respects_requests_ca_bundle_env_var(self, mock_session_send):
with mock.patch(
'airflow.hooks.base.BaseHook.get_connection', side_effect=get_airflow_connection_with_port
):

self.get_hook.run('/some/endpoint', extra_options={'verify': True})

mock_session_send.assert_called_once_with(
mock.ANY,
allow_redirects=True,
cert=None,
proxies=OrderedDict(),
stream=False,
timeout=None,
verify='/tmp/test.crt',
)

@mock.patch('airflow.providers.http.hooks.http.requests.Session.send')
@mock.patch.dict(os.environ, {"REQUESTS_CA_BUNDLE": "/tmp/test.crt"})
def test_verify_false_parameter_overwrites_set_requests_ca_bundle_env_var(self, mock_session_send):
with mock.patch(
'airflow.hooks.base.BaseHook.get_connection', side_effect=get_airflow_connection_with_port
):
self.get_hook.run('/some/endpoint', extra_options={'verify': False})

mock_session_send.assert_called_once_with(
mock.ANY,
allow_redirects=True,
cert=None,
proxies=OrderedDict(),
stream=False,
timeout=None,
verify=False,
)


send_email_test = mock.Mock()
3 changes: 3 additions & 0 deletions tests/providers/http/sensors/test_http.py
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,9 @@ def prepare_request(self, request):
self.response._content += ('/' + request.params['date']).encode('ascii', 'ignore')
return self.response

def merge_environment_settings(self, _url, **kwargs):
return kwargs


class TestHttpOpSensor(unittest.TestCase):
def setUp(self):
Expand Down

0 comments on commit ca432ee

Please sign in to comment.