ImagicTheCat/Luaseq

Luaseq

Luaseq is an abstraction over Lua coroutines (a.k.a. cooperative threads or VM threads [1]) to nicely build and synchronize asynchronous operations.

An asynchronous operation relative to a system thread can be synchronous relative to a coroutine.
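To illustrate that statement, here is a minimal sketch in plain Lua that does not use Luaseq at all. The `pending` queue and the `later` and `fetch_value` helpers are hypothetical stand-ins for an event loop and an asynchronous API; the point is only that the coroutine body reads as sequential code while the main thread keeps running.

```lua
-- Hypothetical stand-in for an event loop: queued callbacks run later.
local pending = {}
local function later(callback) pending[#pending + 1] = callback end

-- Looks synchronous from inside a coroutine: yields until the callback fires.
local function fetch_value(value)
  local co = coroutine.running()
  later(function() assert(coroutine.resume(co, value)) end)
  return coroutine.yield()
end

local log = {}
local co = coroutine.create(function()
  log[#log + 1] = "got " .. fetch_value("a")
  log[#log + 1] = "got " .. fetch_value("b")
end)
assert(coroutine.resume(co)) -- runs until the first yield
log[#log + 1] = "main continues"

-- Drain the queue, as an event loop would.
local i = 1
while pending[i] do
  pending[i]()
  i = i + 1
end
```

The main thread logs its line before either value arrives, yet inside the coroutine the two fetches are ordinary sequential calls.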

Install

🔥
PUC Lua 5.1 is not supported: it lacks xpcall argument passing and the ability to yield across pcall.

Concept

Task

A task represents an asynchronous operation; it may be a standalone handle or an asynchronous function call wrapped as a coroutine.

Other operations can wait for the task's completion, possibly from multiple coroutines. This may be used to design complex asynchronous dependencies.

Mutex

A mutex (mutual exclusion) is useful even for cooperative VM threads (coroutines). While the fundamental operations are not executed in parallel, higher-level asynchronous operations can semantically overlap.

⚠️
VM thread mutexes, like OS thread mutexes, must be used carefully to avoid deadlocks (in this case, between coroutines).

Semaphore

A semaphore for coroutines is also useful to implement some synchronization patterns.

Here the semaphore terminology is about managing resource units: demand, supply and units, instead of POSIX's wait, post and value.

API

⚠️

To prevent coroutine resume errors from being silently handled, they are propagated to the caller (e.g. task completion or mutex unlocking) and will interrupt resuming of the other waiting coroutines. Instead of catching errors from the resuming side, it is probably better to catch them from the coroutines themselves, where it matters.

This is already handled by the async() API which propagates the errors to the task handle.

ℹ️
Resume errors are recursively propagated using debug.traceback, which may result in multiple stack tracebacks.

⚠️
Task wait/completion, mutex lock/unlocking or semaphore demand/supply, as with callbacks, may transfer the execution to "third-party" code; thus the execution state must be carefully analyzed.

Task

A task is a table where the array part is the list of waiting coroutines/callbacks.

When done, a field is added:

task.r

packed task values (ok, …​) (see table.pack())

Luaseq.async(f, …​)

Asynchronous operation.

No arguments: create a standalone task handle.

With arguments: create a task wrapping an asynchronous function call. I.e. it executes the passed function as a coroutine, like a detached job.

f

function

…​

arguments

Return the created task.

task:wait([callback])

Wait for task completion (still usable when done).

No arguments (sync): yield the current coroutine if the task is not done yet. It returns the task return values or propagates the task error.

With arguments (async):

callback(task)

called when the task is done (completion or error)
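The two forms of task:wait can be contrasted with a small sketch. It assumes Luaseq is reachable through `require("Luaseq")`, that the callback form may be called outside a coroutine, and that `async(f)` starts running `f` right away (as the examples in this README suggest). It uses task:complete, documented further down; the `seen` table is only there for illustration.

```lua
local Luaseq = require("Luaseq") -- assumption: Luaseq is on package.path
local async = Luaseq.async

local task = async() -- standalone handle
local seen = {}

-- async form: register a callback, called once the task is done
task:wait(function(t) seen[#seen + 1] = "callback" end)

-- sync form: yields the coroutine created by async(f) until the task is done
async(function()
  local a, b = task:wait()
  seen[#seen + 1] = "coroutine got " .. a .. " and " .. b
end)

task:complete("x", "y") -- resumes the waiters in task:wait order
```

The sync form keeps the waiting code linear, while the callback form fits places where yielding is not possible.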

task(ok, …​)

Task return (completion or termination).

Waiting coroutines/callbacks are resumed in the same order as the task:wait calls. Calling the task again once it is done will throw an error.

(ok, …​)

Common soft error handling interface. When ok is truthy, …​ holds the return values, otherwise an error message.
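This (ok, ...) shape is the same one pcall returns. The following sketch uses only standard Lua 5.2+ functions (no Luaseq calls) to show how such a tuple can be packed, stored and unpacked again; the `check` helper is hypothetical and not part of the library.

```lua
-- Hypothetical helper: turn an (ok, ...) tuple back into returns or an error.
local function check(ok, ...)
  if ok then return ... end
  error((...), 0)
end

-- Success: ok is true, followed by the return values.
local r = table.pack(pcall(function(a, b) return a + b, a * b end, 2, 3))
local sum, product = check(table.unpack(r, 1, r.n))

-- Failure: ok is false, followed by the error message.
local failed = table.pack(pcall(error, "boom", 0))
```

Packing with table.pack keeps the count in the `n` field, so trailing nil values survive the round trip.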

task:complete(…​)

Complete task (equivalent to task(true, …​)).

…​

task return values

task:error(err)

Terminate task with an error (equivalent to task(false, err)).

err

error message

task:done()

Check if the task is done (completed or terminated with an error). Return boolean.
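As a sketch of how task:error, task:wait and task:done fit together, the snippet below terminates a standalone task with an error and catches it from the waiting coroutine, which is where the API notes above suggest handling errors. It assumes Luaseq is on `package.path` and that `async(f)` starts `f` immediately; the exact text of the propagated error is not assumed.

```lua
local Luaseq = require("Luaseq") -- assumption: Luaseq is on package.path
local async = Luaseq.async

local task = async() -- standalone handle, terminated manually below
local result

async(function()
  -- task:wait() propagates the task error, so pcall catches it here
  -- (yielding across pcall is why PUC Lua 5.1 is not supported).
  local ok, err = pcall(function() return task:wait() end)
  result = {ok = ok, err = err}
end)

local before = task:done() -- nothing has completed the task yet
task:error("connection lost")
local after = task:done()
```

After the call to task:error, `result.ok` is false and `result.err` carries the error raised by task:wait.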

Mutex

A mutex is a table where the array part is the list of locking coroutines, the first being the active one followed by the waiting ones.

mutex.locks

number of active thread locks

mutex.reentrant

present (true) if the mutex is reentrant

Luaseq.mutex([mode])

Create a mutex.

mode

"reentrant" to make the mutex reentrant

mutex:lock()

Lock the mutex.

mutex:unlock()

Unlock the mutex.

Waiting coroutines are resumed in the same order as the mutex:lock calls.

mutex:locked()

Check if the mutex is locked. Return boolean.
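A short sketch of the locking order, assuming Luaseq is on `package.path` and that `async(f)` runs `f` immediately. The `gate` task and the `order` table are illustrative names; `gate` only serves to make the first coroutine yield while it holds the lock.

```lua
local Luaseq = require("Luaseq") -- assumption: Luaseq is on package.path
local async = Luaseq.async

local m = Luaseq.mutex()
local gate = async() -- standalone task used to pause the first holder
local order = {}

async(function()
  m:lock()
  order[#order + 1] = "A locked"
  gate:wait() -- yield while holding the lock
  order[#order + 1] = "A unlocking"
  m:unlock()
end)

async(function()
  m:lock() -- A still holds the mutex, so this coroutine yields
  order[#order + 1] = "B locked"
  m:unlock()
end)

local held_before = m:locked()
gate:complete() -- let A finish; its unlock resumes B
local held_after = m:locked()
```

B only enters its critical section after A unlocks, even though both coroutines were started back to back.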

Semaphore

A semaphore is a table where the array part is the list of demanding/waiting coroutines.

semaphore.units

amount of available units

Luaseq.semaphore(units)

Create a semaphore.

units

initial amount of units, must be >= 0

semaphore:supply()

Supply a unit.

Waiting coroutines are resumed in the same order as the demand calls, one per supply call.

semaphore:demand()

Demand a unit.

Yield the current coroutine if no unit is available.
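The demand/supply pairing can be sketched with a single unit and two coroutines. This assumes Luaseq is on `package.path` and that `async(f)` runs `f` immediately; the `log` table and the coroutine names are illustrative.

```lua
local Luaseq = require("Luaseq") -- assumption: Luaseq is on package.path
local async = Luaseq.async

local sem = Luaseq.semaphore(1)
local log = {}

for _, name in ipairs({"first", "second"}) do
  async(function()
    sem:demand() -- "second" yields here: the only unit is already taken
    log[#log + 1] = name
  end)
end

local count_before = #log -- only "first" got a unit so far
sem:supply() -- hands a unit to the waiting coroutine
local count_after = #log
```

Neither coroutine supplies its unit back in this sketch, so a third demand would wait until another supply call.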

Examples

Example 1. Basic usage

If we have an asynchronous process, like fetching a URL:

local Luaseq = require("Luaseq")
local async = Luaseq.async

-- Create the async download function.
function download(url)
  local task = async() -- create task
  http_request(url, function(ok, content_or_error)
    task(ok, content_or_error) -- not simplified for clarity
  end)
  return task:wait() -- wait for the returned values
end

-- Download 10 URLs one after another (synchronously, from the coroutine's point of view).
local download_task = async(function()
  for i=1,10 do
    local content = download("http://foo.bar/"..i..".txt")
    print(content)
  end
end)

Example 2. Mutex

If we have an asynchronous process which saves data to a SQL database:

local Luaseq = require("Luaseq")
local async = Luaseq.async

local txn = Luaseq.mutex()

-- Save the state of something using a transaction.
-- query() could be asynchronous too.
function save(thing)
  txn:lock()
  query("START TRANSACTION")
  query("UPDATE ...")
  some_async_task()
  query("UPDATE ...")
  some_async_task()
  query("UPDATE ...")
  query("COMMIT")
  txn:unlock()
end

Now save(thing) can be called from concurrent tasks (concurrent, not truly parallel) without corrupting the transaction.
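If a query raises an error between lock and unlock, the mutex stays locked and later callers wait forever. One possible guard is a small wrapper that always unlocks. This is a sketch under assumptions: `with_lock` is a hypothetical helper and not part of Luaseq, Lua 5.2+ is assumed for table.pack and table.unpack, and the failing function stands in for a real query.

```lua
local Luaseq = require("Luaseq") -- assumption: Luaseq is on package.path

local txn = Luaseq.mutex()

-- Hypothetical helper (not part of Luaseq): run f while holding the mutex
-- and always unlock, even if f raises an error.
local function with_lock(mutex, f, ...)
  mutex:lock()
  local r = table.pack(pcall(f, ...))
  mutex:unlock()
  if not r[1] then error(r[2], 0) end -- re-raise after unlocking
  return table.unpack(r, 2, r.n)
end

local ok, still_locked
Luaseq.async(function()
  ok = pcall(with_lock, txn, function() error("query failed") end)
  still_locked = txn:locked()
end)
```

Here the error still reaches the caller, but the mutex is released first, so other tasks calling save(thing) are not blocked.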

Example 3. Semaphore

If we have work to queue, but only 4 processing units are available:

local Luaseq = require("Luaseq")
local async = Luaseq.async

local UNITS = 4
local sem = Luaseq.semaphore(UNITS)

local function some_async_operation(i, callback)
  -- ...
end

-- release the claimed unit when done
local function finished() sem:supply() end

local task = async(function()
  -- do all the work
  for i=1,1e3 do
    sem:demand() -- claim a unit
    some_async_operation(i, finished)
  end
  -- reclaim all units: wait for the end of processing
  for i=1,UNITS do sem:demand() end
end)

1. With the exception of VM threads which are not coroutines, e.g. the main thread.
