Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

server: call the webhook asynchronously in event triggers #5352

Merged
merged 2 commits into from
Jul 10, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

(Add entries here in the order of: server, console, cli, docs, others)

- server: process events generated by the event triggers asynchronously (close #5189) (#5352)
- console: display line number that error originated from in GraphQL editor (close #4849) (#4942)

## `v1.3.0-beta.4`
Expand Down
7 changes: 5 additions & 2 deletions server/src-lib/Hasura/Eventing/EventTrigger.hs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ module Hasura.Eventing.EventTrigger
) where


import Control.Concurrent.Async (wait, withAsync)
import Control.Concurrent.Async (async, link, wait, withAsync)
import Control.Concurrent.Extended (sleep)
import Control.Concurrent.STM.TVar
import Control.Monad.Catch (MonadMask, bracket_)
Expand Down Expand Up @@ -189,10 +189,13 @@ processEventQueue logger logenv httpMgr pool getSchemaCache eeCtx@EventEngineCtx
eventsNext <- withAsync popEventsBatch $ \eventsNextA -> do
-- process approximately in order, minding HASURA_GRAPHQL_EVENTS_HTTP_POOL_SIZE:
forM_ events $ \event -> do
runReaderT (withEventEngineCtx eeCtx $ (processEvent event)) (logger, httpMgr)
t <- async $ runReaderT (withEventEngineCtx eeCtx $ (processEvent event)) (logger, httpMgr)
-- removing an event from the _eeCtxLockedEvents after the event has
-- been processed
removeEventFromLockedEvents (eId event) leEvents
link t

-- return when next batch ready; some 'processEvent' threads may be running.
wait eventsNextA

let lenEvents = length events
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
type: bulk
args:
- type: run_sql
args:
sql: |
create table hge_tests.test_t1(
c1 int,
c2 text
);
- type: track_table
args:
schema: hge_tests
name: test_t1
- type: create_event_trigger
args:
name: t1_timeout_long
table:
schema: hge_tests
name: test_t1
insert:
columns: "*"
update:
columns: "*"
delete:
columns: "*"
webhook: http://127.0.0.1:5592/timeout_long
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
type: bulk
args:
- type: delete_event_trigger
args:
name: t1_timeout_long
- type: run_sql
args:
sql: |
drop table hge_tests.test_t1;
34 changes: 34 additions & 0 deletions server/tests-py/test_events.py
Original file line number Diff line number Diff line change
Expand Up @@ -588,3 +588,37 @@ def test_basic(self, hge_ctx, evts_webhook):
assert st_code == 200, resp
st_code, resp = hge_ctx.v1q_f('queries/event_triggers/manual_events/disabled.yaml')
assert st_code == 400, resp

@usefixtures('per_method_tests_db_state')
class TestEventsAsynchronousExecution(object):

@classmethod
def dir(cls):
return 'queries/event_triggers/async_execution'

def test_async_execution(self,hge_ctx,evts_webhook):
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

"""
A test to check if the events generated by the graphql-engine are
processed asynchronously. This test measures the time taken to process
all the events and that time should definitely be lesser than the time
taken if the events were to be executed sequentially.
This test inserts 5 rows and the webhook(/timeout_long) takes
~5 seconds to process one request. So, if the graphql-engine
were to process the events sequentially it will take 5 * 5 = 25 seconds.
Theorotically, all the events should have been processed in ~5 seconds,
adding a 5 seconds buffer to the comparision, so that this test
doesn't flake in the CI.
"""
table = {"schema": "hge_tests", "name": "test_t1"}

payload = range(1,6)
rows = list(map(lambda x: {"c1": x, "c2": "hello"}, payload))
st_code, resp = insert_many(hge_ctx, table, rows)
start_time = time.perf_counter()
assert st_code == 200, resp
for i in range(1,6):
_ = evts_webhook.get_event(7) # webhook takes 5 seconds to process a request
end_time = time.perf_counter()
time_elapsed = end_time - start_time
assert time_elapsed < 10