Skip to content

Commit

Permalink
server: call the webhook asynchronously in event triggers (#5352)
Browse files Browse the repository at this point in the history
* server: call the webhook asynchronosly in event triggers
  • Loading branch information
codingkarthik committed Jul 10, 2020
1 parent cefe06e commit 0ef5229
Show file tree
Hide file tree
Showing 5 changed files with 75 additions and 2 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

(Add entries here in the order of: server, console, cli, docs, others)

- server: process events generated by the event triggers asynchronously (close #5189) (#5352)
- console: display line number that error originated from in GraphQL editor (close #4849) (#4942)

## `v1.3.0-beta.4`
Expand Down
7 changes: 5 additions & 2 deletions server/src-lib/Hasura/Eventing/EventTrigger.hs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ module Hasura.Eventing.EventTrigger
) where


import Control.Concurrent.Async (wait, withAsync)
import Control.Concurrent.Async (async, link, wait, withAsync)
import Control.Concurrent.Extended (sleep)
import Control.Concurrent.STM.TVar
import Control.Monad.Catch (MonadMask, bracket_)
Expand Down Expand Up @@ -189,10 +189,13 @@ processEventQueue logger logenv httpMgr pool getSchemaCache eeCtx@EventEngineCtx
eventsNext <- withAsync popEventsBatch $ \eventsNextA -> do
-- process approximately in order, minding HASURA_GRAPHQL_EVENTS_HTTP_POOL_SIZE:
forM_ events $ \event -> do
runReaderT (withEventEngineCtx eeCtx $ (processEvent event)) (logger, httpMgr)
t <- async $ runReaderT (withEventEngineCtx eeCtx $ (processEvent event)) (logger, httpMgr)
-- removing an event from the _eeCtxLockedEvents after the event has
-- been processed
removeEventFromLockedEvents (eId event) leEvents
link t

-- return when next batch ready; some 'processEvent' threads may be running.
wait eventsNextA

let lenEvents = length events
Expand Down
26 changes: 26 additions & 0 deletions server/tests-py/queries/event_triggers/async_execution/setup.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
type: bulk
args:
- type: run_sql
args:
sql: |
create table hge_tests.test_t1(
c1 int,
c2 text
);
- type: track_table
args:
schema: hge_tests
name: test_t1
- type: create_event_trigger
args:
name: t1_timeout_long
table:
schema: hge_tests
name: test_t1
insert:
columns: "*"
update:
columns: "*"
delete:
columns: "*"
webhook: http://127.0.0.1:5592/timeout_long
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
type: bulk
args:
- type: delete_event_trigger
args:
name: t1_timeout_long
- type: run_sql
args:
sql: |
drop table hge_tests.test_t1;
34 changes: 34 additions & 0 deletions server/tests-py/test_events.py
Original file line number Diff line number Diff line change
Expand Up @@ -588,3 +588,37 @@ def test_basic(self, hge_ctx, evts_webhook):
assert st_code == 200, resp
st_code, resp = hge_ctx.v1q_f('queries/event_triggers/manual_events/disabled.yaml')
assert st_code == 400, resp

@usefixtures('per_method_tests_db_state')
class TestEventsAsynchronousExecution(object):

@classmethod
def dir(cls):
return 'queries/event_triggers/async_execution'

def test_async_execution(self,hge_ctx,evts_webhook):
"""
A test to check if the events generated by the graphql-engine are
processed asynchronously. This test measures the time taken to process
all the events and that time should definitely be lesser than the time
taken if the events were to be executed sequentially.
This test inserts 5 rows and the webhook(/timeout_long) takes
~5 seconds to process one request. So, if the graphql-engine
were to process the events sequentially it will take 5 * 5 = 25 seconds.
Theorotically, all the events should have been processed in ~5 seconds,
adding a 5 seconds buffer to the comparision, so that this test
doesn't flake in the CI.
"""
table = {"schema": "hge_tests", "name": "test_t1"}

payload = range(1,6)
rows = list(map(lambda x: {"c1": x, "c2": "hello"}, payload))
st_code, resp = insert_many(hge_ctx, table, rows)
start_time = time.perf_counter()
assert st_code == 200, resp
for i in range(1,6):
_ = evts_webhook.get_event(7) # webhook takes 5 seconds to process a request
end_time = time.perf_counter()
time_elapsed = end_time - start_time
assert time_elapsed < 10

0 comments on commit 0ef5229

Please sign in to comment.