Skip to content

Commit

Permalink
Kuba openfaas sync call (#13356)
Browse files Browse the repository at this point in the history
* adds invoke_function() - synchronous call in addition to existing asynchronous call

* adds invoke_function() - synchronous call in addition to existing asynchronous call

* airflow faas upgrade

* passing body consistently without json=body

* removing return() statement that must be causing static checks to fail

* removing return() statement that must be causing static checks to fail

* resolving static check issues

* resolving static check issues

* resolving static check issues

* resolving static check issues
  • Loading branch information
kubatyszko committed Jan 1, 2021
1 parent 57143d6 commit abf34b8
Show file tree
Hide file tree
Showing 2 changed files with 44 additions and 2 deletions.
18 changes: 16 additions & 2 deletions airflow/providers/openfaas/hooks/openfaas.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ class OpenFaasHook(BaseHook):

GET_FUNCTION = "/system/function/"
INVOKE_ASYNC_FUNCTION = "/async-function/"
INVOKE_FUNCTION = "/function/"
DEPLOY_FUNCTION = "/system/functions"
UPDATE_FUNCTION = "/system/functions"

Expand Down Expand Up @@ -68,16 +69,29 @@ def deploy_function(self, overwrite_function_if_exist: bool, body: Dict[str, Any
self.log.info("Function deployed %s", self.function_name)

def invoke_async_function(self, body: Dict[str, Any]) -> None:
"""Invoking function"""
"""Invoking function asynchronously"""
url = self.get_conn().host + self.INVOKE_ASYNC_FUNCTION + self.function_name
self.log.info("Invoking function %s", url)
self.log.info("Invoking function asynchronously %s", url)
response = requests.post(url, body)
if response.ok:
self.log.info("Invoked %s", self.function_name)
else:
self.log.error("Response status %d", response.status_code)
raise AirflowException('failed to invoke function')

def invoke_function(self, body: Dict[str, Any]) -> None:
"""Invoking function synchronously, will block until function completes and returns"""
url = self.get_conn().host + self.INVOKE_FUNCTION + self.function_name
self.log.info("Invoking function synchronously %s", url)
response = requests.post(url, body)
if response.ok:
self.log.info("Invoked %s", self.function_name)
self.log.info("Response code %s", response.status_code)
self.log.info("Response %s", response.text)
else:
self.log.error("Response status %d", response.status_code)
raise AirflowException('failed to invoke function')

def update_function(self, body: Dict[str, Any]) -> None:
"""Update OpenFaaS function"""
url = self.get_conn().host + self.UPDATE_FUNCTION
Expand Down
28 changes: 28 additions & 0 deletions tests/providers/openfaas/hooks/test_openfaas.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
class TestOpenFaasHook(unittest.TestCase):
GET_FUNCTION = "/system/function/"
INVOKE_ASYNC_FUNCTION = "/async-function/"
INVOKE_FUNCTION = "/function/"
DEPLOY_FUNCTION = "/system/functions"
UPDATE_FUNCTION = "/system/functions"

Expand Down Expand Up @@ -88,6 +89,33 @@ def test_update_function_false(self, mock_get_connection, m):
self.hook.update_function({})
self.assertIn('failed to update ' + FUNCTION_NAME, str(context.exception))

@mock.patch.object(BaseHook, 'get_connection')
@requests_mock.mock()
def test_invoke_function_false(self, mock_get_connection, m):
m.post(
"http://open-faas.io" + self.INVOKE_FUNCTION + FUNCTION_NAME,
json=self.mock_response,
status_code=400,
)
mock_connection = Connection(host="http://open-faas.io")
mock_get_connection.return_value = mock_connection

with self.assertRaises(AirflowException) as context:
self.hook.invoke_function({})
self.assertIn('failed to invoke function', str(context.exception))

@mock.patch.object(BaseHook, 'get_connection')
@requests_mock.mock()
def test_invoke_function_true(self, mock_get_connection, m):
m.post(
"http://open-faas.io" + self.INVOKE_FUNCTION + FUNCTION_NAME,
json=self.mock_response,
status_code=200,
)
mock_connection = Connection(host="http://open-faas.io")
mock_get_connection.return_value = mock_connection
self.assertEqual(self.hook.invoke_function({}), None)

@mock.patch.object(BaseHook, 'get_connection')
@requests_mock.mock()
def test_invoke_async_function_false(self, mock_get_connection, m):
Expand Down

0 comments on commit abf34b8

Please sign in to comment.