Cross Worker Events for Nginx in Pure Lua

Name

lua-resty-worker-events - Inter process events for Nginx worker processes

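Table of Contents

  • Name
  • Status
  • Synopsis
  • Description
  • Methods
      • configure
      • configured
      • event_list
      • poll
      • post
      • post_local
      • register
      • unregister
  • Installation
  • TODO
  • Community
  • Bugs and Patches
  • Author
  • Copyright and License
  • See Also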

Status

This library is still under early development.

Synopsis

http {
    lua_package_path "/path/to/lua-resty-worker-events/lib/?.lua;;";

    # the size depends on the number of events to handle:
    lua_shared_dict process_events 1m;

    init_worker_by_lua_block {
        local ev = require "resty.worker.events"

        local handler = function(data, event, source, pid)
            print("received event; source=",source,
                  ", event=",event,
                  ", data=", tostring(data),
                  ", from process ",pid)
        end

        ev.register(handler)

        local ok, err = ev.configure {
            shm = "process_events", -- defined by "lua_shared_dict"
            timeout = 2,            -- life time of event data in shm
            interval = 1,           -- poll interval (seconds)

            wait_interval = 0.010,  -- wait before retry fetching event data
            wait_max = 0.5,         -- max wait time before discarding event
        }
        if not ok then
            ngx.log(ngx.ERR, "failed to start event system: ", err)
            return
        end
    }

    server {
        ...

        # example for polling:
        location = /some/path {

            default_type text/plain;
            content_by_lua_block {
                -- manually call `poll` to stay up to date; this can be used instead of,
                -- or together with, the timer interval. Polling is efficient,
                -- so if staying up-to-date is important, this is preferred.
                require("resty.worker.events").poll()

                -- do regular stuff here

            }
        }
    }
}

Description

Back to TOC

This module provides a way to send events to the other worker processes in an Nginx server. Communication is through a shared memory zone where event data will be stored.

The order of events in all workers is guaranteed to be the same.

The worker process will set up a timer to check for events in the background. The module follows a singleton pattern and hence runs once per worker. If staying up to date is important, though, the interval can be set to a lower frequency and poll can be called upon each request received, which makes sure everything is handled as soon as possible.

The design allows for 3 use cases:

  1. broadcast an event to all worker processes, see post. In this case the order of the events is guaranteed to be the same in all worker processes. Example: a healthcheck running in one worker, but informing all workers of a failed upstream node.
  2. post an event to the local worker only, see post_local.
  3. coalesce external events to a single action. Example: all workers watch external events indicating an in-memory cache needs to be refreshed. When receiving it they all post it with a unique event hash (all workers generate the same hash), see the unique parameter of post. Now only one worker will receive the event, and only once, so only one worker will hit the upstream database to refresh the in-memory data.
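
The three use cases map onto post, post_local, and the unique parameter of post. The following is a minimal sketch, not the module's own code: the source names, event names, and helper function names are hypothetical, and the events module is passed in as `ev` (in a worker that would be `require "resty.worker.events"`, already configured).

```lua
-- Hypothetical source/event names; only post() and post_local() come from the module.

-- 1. Broadcast to all workers: report a failed upstream node.
local function report_upstream_down(ev, node)
    return ev.post("healthcheck", "node_down", { node = node })
end

-- 2. Local worker only: the event never leaves this process.
local function notify_local(ev, data)
    return ev.post_local("my-module", "local_update", data)
end

-- 3. Coalesce: every worker posts with the same `unique` value,
--    so only one worker executes the event.
local function request_cache_refresh(ev, cache_version)
    local unique = "cache-refresh-" .. tostring(cache_version)
    return ev.post("cache", "refresh", { version = cache_version }, unique)
end
```

Each helper returns whatever post or post_local returns, so the caller can still inspect the result as described under poll.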

This module itself will fire two events with source="resty-worker-events":

  • event="started" when the module is first configured (note: the event handler must be registered before calling configure to be able to catch the event)
  • event="stopping" when the worker process exits (based on the premature flag passed to a timer callback)
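
As a rough illustration of the ordering requirement, the sketch below registers a handler for both events before calling configure. The `log` argument and the function names are assumptions made for this example (in a worker, `log` could wrap ngx.log), and `ev` stands for the events module.

```lua
local SOURCE = "resty-worker-events"  -- source name stated above

-- Builds a handler for the module's own events.
-- `log` is a hypothetical logging function supplied by the caller.
local function make_lifecycle_handler(log)
    return function(data, event, source, pid)
        if event == "started" then
            log("event system started")
        elseif event == "stopping" then
            log("worker is exiting")
        end
    end
end

-- Register BEFORE configure() so that the "started" event is not missed.
local function setup(ev, opts, log)
    ev.register(make_lifecycle_handler(log), SOURCE, "started", "stopping")
    return ev.configure(opts)
end
```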

See event_list for using events without hardcoded magic values/strings.

Methods

Back to TOC

configure

syntax: success, err = events.configure(opts)

Will initialize the event listener. The opts parameter is a Lua table with named options:

  • shm: (required) name of the shared memory to use
  • timeout: (optional) timeout of event data stored in shm (in seconds), default 2
  • interval: (optional) interval to poll for events (in seconds), default 1
  • wait_interval: (optional) interval between two tries when a new event id is found, but the data is not available yet (due to asynchronous behaviour of the worker processes)
  • wait_max: (optional) max time to wait for data when event id is found, before discarding the event. This is a fail-safe setting in case something went wrong.

The return value will be true, or nil and an error message.

This method can be called repeatedly to update the settings, except for the shm value which cannot be changed after the initial configuration.

NOTE: the wait_interval delay is implemented with the ngx.sleep function. In contexts where this function is not available (e.g. init_worker), the module falls back to a busy-wait for the delay.

Back to TOC

configured

syntax: is_already_configured = events.configured()

The events module runs as a singleton per worker process. The configured function lets you check whether it is already up and running. A check before starting any dependencies is recommended:

local events = require "resty.worker.events"

local initialization_of_my_module = function()
    assert(events.configured(), "Please configure the 'lua-resty-worker-events' "..
           "module before using my_module")

    -- do initialization here
end

Back to TOC

event_list

syntax: _M.events = events.event_list(sourcename, event1, event2, ...)

Utility function to generate event lists and prevent typos in magic strings. Accessing a non-existing event on the returned table will result in an 'unknown event' error. The first parameter sourcename is a unique name that identifies the event source, which will be available as field _source. All following parameters are the named events generated by the event source.

Example usage:

local ev = require "resty.worker.events"

-- Event source example

local events = ev.event_list(
        "my-module-event-source", -- available as _M.events._source
        "started",                -- available as _M.events.started 
        "event2"                  -- available as _M.events.event2
    )

local raise_event = function(event, data)
    return ev.post(events._source, event, data)
end

-- Post my own 'started' event
raise_event(events.started, nil) -- nil for clarity, no eventdata is passed

-- define my module table
local _M = {
  events = events   -- export events table

  -- implementation goes here
}  
return _M



-- Event client example (a separate file from the event source above)
local ev = require "resty.worker.events"
local mymod = require("some_module")  -- module with an `events` table

-- define a callback and use the source module's events table
local my_callback = function(data, event, source, pid)
    if event == mymod.events.started then  -- 'started' is the event name

        -- handle the 'started' event posted by `some_module`

    elseif event == mymod.events.stoppping then  -- 'stopping' is the event name

        -- the above will throw an error because of the typo in `stoppping`

    end
end

ev.register(my_callback, mymod.events._source)

Back to TOC

poll

syntax: success, err = events.poll()

Will poll for new events and handle them all (call the registered callbacks). The implementation is efficient: it only checks a single shared memory value and returns immediately if no new events are available.

The return value will be true when it handled all events, false if it was already in a polling loop, or nil + error if something went wrong. The false result generally happens when posting an event from an event handler: the handler was called from poll, and the post methods also call poll after posting the event, which would cause a loop. The false result simply means that the event was successfully posted but not handled yet, because other events ahead of it need to be handled first.
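
The three possible outcomes can be told apart with a small helper. This is only a sketch: the function name and the status strings are made up for illustration, while the true / false / nil + error convention is the one described above.

```lua
-- Classify the result of events.poll(), or of post/post_local,
-- which return the result of poll.
local function poll_status(ok, err)
    if ok == true then
        return "handled"      -- all pending events were handled
    elseif ok == false then
        return "in_progress"  -- already inside a polling loop; handled later
    end
    return "error", err       -- nil + error message
end
```

A typical call would be `local status, err = poll_status(ev.poll())`, where `ev` is the events module.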

Back to TOC

post

syntax: success, err = events.post(source, event, data, unique)

Will post a new event. source and event are both strings. data can be anything (including nil) as long as it is (de)serializable by the cjson module.

If the unique parameter is provided, then only one worker will execute the event; the other workers will ignore it. Any follow-up events with the same unique value will also be ignored (for the timeout period specified to configure). The process executing the event will not necessarily be the process posting the event.

Before returning, it will call poll to handle all events up to and including the newly posted event. Check the return value to make sure it completed, see poll.

The return value will be the result from poll.

Note: the worker process sending the event will also receive the event! So if the event source also needs to act upon the event, it should not do so from the event posting code, but only when receiving it.
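
To illustrate the note above, the following sketch keeps all state changes inside the handler and leaves the posting code free of side effects. The "firewall" source, the "block" event, and the `blacklist` table are hypothetical names; `ev` stands for the events module.

```lua
-- Hypothetical per-worker in-memory state.
local blacklist = {}

-- Handler: the only place where the state changes. It runs in every worker,
-- including the worker that posted the event.
local function on_block(data, event, source, pid)
    blacklist[data.ip] = true
end

local function setup(ev)
    ev.register(on_block, "firewall", "block")
end

-- Posting code: does NOT touch `blacklist` itself.
local function block_ip(ev, ip)
    return ev.post("firewall", "block", { ip = ip })
end
```

Because the posting worker receives its own event, updating `blacklist` in `block_ip` as well would apply the change twice in that worker.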

Back to TOC

post_local

syntax: success, err = events.post_local(source, event, data)

The same as post, except that the event will be local to the worker process; it will not be broadcast to other workers. With this method, the data element will not be serialized to JSON.

Before returning, it will call poll to first handle the posted event and then handle all other newly posted events. Check the return value to make sure it completed, see poll.

The return value will be the result from poll.
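
Since local events skip JSON serialization, data can carry values that cjson cannot encode, such as functions. A minimal sketch under that assumption, with a hypothetical "jobs" source and "run" event, and `ev` standing for the events module:

```lua
-- A value that cjson could not serialize: a table holding a function.
local function make_job(name)
    return { name = name, run = function() return "ran " .. name end }
end

local results = {}

-- The handler receives the posted table as-is (no JSON round trip);
-- pid is nil for local events.
local function on_job(data, event, source, pid)
    results[#results + 1] = data.run()
end

local function setup(ev)
    ev.register(on_job, "jobs", "run")
end

local function submit_local(ev, job)
    return ev.post_local("jobs", "run", job)
end
```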

Back to TOC

register

syntax: events.register(callback, source, event1, event2, ...)

Will register a callback function to receive events. If source and event are omitted, the callback will be executed on every event. If source is provided, only events with a matching source will be passed. If one or more event names are given, the callback is invoked only when both source and event match.

The callback should have the following signature;

syntax: callback = function(data, event, source, pid)

The parameters will be the same as the ones provided to post, except for the extra value pid, which will be the pid of the originating worker process, or nil if it was a local event only. Any return value from callback will be discarded. Note: data may be a reference type (e.g. a Lua table). The same value is passed to all callbacks, so do not change the value in your handler unless you know what you are doing!

The return value of register will be true, or it will throw an error if callback is not a function value.

WARNING: event handlers must return quickly. If a handler takes more time than the configured timeout value, events will be dropped!

Note: to receive the process's own started event, the handler must be registered before calling configure.
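
The three filtering levels could be exercised as follows. This is a sketch only: "my-module", "started", and "event2" are example names, `log` is a hypothetical function supplied by the caller, and `ev` stands for the events module.

```lua
local function register_examples(ev, log)
    -- no source, no event: called for every event from every source
    ev.register(function(data, event, source, pid)
        log("any", source, event)
    end)

    -- source only: called for every event posted by "my-module"
    ev.register(function(data, event, source, pid)
        log("source", source, event)
    end, "my-module")

    -- source plus event names: called only for these two events
    ev.register(function(data, event, source, pid)
        log("events", source, event)
    end, "my-module", "started", "event2")
end
```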

Back to TOC

unregister

syntax: events.unregister(callback, source, event1, event2, ...)

Will unregister the callback function and prevent it from receiving further events. The parameters work exactly the same as with register.

The return value will be true if it was removed, false if it was not in the handlers list, or it will throw an error if callback is not a function value.
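
Because unregister takes the same parameters as register, it can be convenient to capture them once. The helper below is a hypothetical convenience wrapper, not part of the module; `ev` stands for the events module.

```lua
local unpack = table.unpack or unpack  -- works on Lua 5.1/LuaJIT and 5.2+

-- Registers `callback` and returns a function that unregisters it again
-- with exactly the same source and event names.
local function subscribe(ev, callback, source, ...)
    local n, names = select("#", ...), { ... }
    ev.register(callback, source, ...)
    return function()
        return ev.unregister(callback, source, unpack(names, 1, n))
    end
end
```

Calling the returned function gives back the result of unregister: true if the callback was removed, false if it was no longer in the handlers list.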

Back to TOC

Installation

Nothing special is required, install like any other pure Lua module. Just make sure its location is in the module search path.

Back to TOC

TODO

Back to TOC

  • activate and implement the first test, after fixing the "stopping" event

Community

Back to TOC

English Mailing List

The openresty-en mailing list is for English speakers.

Back to TOC

Chinese Mailing List

The openresty mailing list is for Chinese speakers.

Back to TOC

Bugs and Patches

Please report bugs or submit patches by

  1. creating a ticket on the GitHub Issue Tracker,
  2. or posting to the OpenResty community.

Back to TOC

Author

Thijs Schreijer <thijs@mashape.com>, Mashape Inc.

Back to TOC

Copyright and License

This module is licensed under the Apache 2.0 license.

Copyright (C) 2016, by Thijs Schreijer, Mashape Inc.

All rights reserved.

Back to TOC

See Also

Back to TOC