Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
Merge pull request #250 from Tungstenium/scheduler-rework
scripting/lua: Rework handling of coroutines and promises
  • Loading branch information
blattersturm committed Jul 2, 2019
2 parents 050fa31 + f5e7709 commit eff4de7
Showing 1 changed file with 67 additions and 128 deletions.
195 changes: 67 additions & 128 deletions data/shared/citizen/scripting/lua/scheduler.lua
@@ -1,18 +1,38 @@
local threads = {}
local curThread
local curThreadIndex

--[[
Thread handling
]]
local function resumeThread(coro) -- Internal utility
if coroutine.status(coro) == "dead" then
threads[coro] = nil
return false
end

local ok, wakeTimeOrErr = coroutine.resume(coro)

if ok then
local thread = threads[coro]
if thread then
thread.wakeTime = wakeTimeOrErr or 0
end
else
Citizen.Trace("Error resuming coroutine: " .. debug.traceback(coro, wakeTimeOrErr) .. "\n")
end
-- Return not finished
return coroutine.status(coro) ~= "dead"
end

function Citizen.CreateThread(threadFunction)
table.insert(threads, {
coroutine = coroutine.create(threadFunction),
threads[coroutine.create(threadFunction)] = {
wakeTime = 0
})
}
end

function Citizen.Wait(msec)
curThread.wakeTime = GetGameTimer() + msec

coroutine.yield()
coroutine.yield(GetGameTimer() + msec)
end

-- legacy alias (and to prevent people from calling the game's function)
Expand All @@ -21,156 +41,75 @@ CreateThread = Citizen.CreateThread

function Citizen.CreateThreadNow(threadFunction)
local coro = coroutine.create(threadFunction)

local t = {
coroutine = coro,
threads[coro] = {
wakeTime = 0
}

-- add new thread and save old thread
local oldThread = curThread
curThread = t

local result, err = coroutine.resume(coro)

local resumedThread = curThread
-- restore last thread
curThread = oldThread

if err then
error('Failed to execute thread: ' .. debug.traceback(coro, err))
end

if resumedThread and coroutine.status(coro) ~= 'dead' then
table.insert(threads, t)
end

return coroutine.status(coro) ~= 'dead'
return resumeThread(coro)
end

local inNext

function Citizen.Await(promise)
if not curThread then
local coro = coroutine.running()
if not coro then
error("Current execution context is not in the scheduler, you should use CreateThread / SetTimeout or Event system (AddEventHandler) to be able to Await")
end

-- Remove current thread from the pool (avoid resume from the loop)
if curThreadIndex then
table.remove(threads, curThreadIndex)
end

curThreadIndex = nil
local resumableThread = curThread

inNext = true
local nextResult
local nextErr
local resolved

promise:next(function (result)
-- was already resolved? then resolve instantly
if inNext then
nextResult = result
resolved = true

return
end

-- Reattach thread
table.insert(threads, resumableThread)

curThread = resumableThread
curThreadIndex = #threads
-- Indicates if the promise has already been resolved or rejected
-- This is a hack since the API does not expose its state
local isDone = false
local result, err
promise = promise:next(function(...)
isDone = true
result = {...}
end,function(error)
isDone = true
err = error
end)

local result, err = coroutine.resume(resumableThread.coroutine, result)
if not isDone then
local threadData = threads[coro]
threads[coro] = nil

if err then
error('Failed to resume thread: ' .. debug.traceback(resumableThread.coroutine, err))
local function reattach()
threads[coro] = threadData
resumeThread(coro)
end

return result
end, function (err)
if err then
-- if already rejected, handle rejection instantly
if inNext then
nextErr = err
resolved = true

return
end

-- resume with error
local result, coroErr = coroutine.resume(resumableThread.coroutine, nil, err)

if coroErr then
Citizen.Trace('Await failure: ' .. debug.traceback(resumableThread.coroutine, coroErr, 2))
end
end
end)

inNext = false

if resolved then
if nextErr then
error(nextErr)
end

return nextResult
promise:next(reattach, reattach)
Citizen.Wait(0)
end

curThread = nil
local result, err = coroutine.yield()


if err then
error(err)
end

return result
end

-- SetTimeout
local timeouts = {}
return table.unpack(result)
end

function Citizen.SetTimeout(msec, callback)
table.insert(threads, {
coroutine = coroutine.create(callback),
local coro = coroutine.create(callback)
threads[coro] = {
wakeTime = GetGameTimer() + msec
})
}
end

SetTimeout = Citizen.SetTimeout

Citizen.SetTickRoutine(function()
local curTime = GetGameTimer()

for i = #threads, 1, -1 do
local thread = threads[i]

for coro, thread in pairs(threads) do
if curTime >= thread.wakeTime then
curThread = thread
curThreadIndex = i

local status = coroutine.status(thread.coroutine)

if status == 'dead' then
table.remove(threads, i)
else
local result, err = coroutine.resume(thread.coroutine)

if not result then
Citizen.Trace("Error resuming coroutine: " .. debug.traceback(thread.coroutine, err) .. "\n")

table.remove(threads, i)
end
end
resumeThread(coro)
end
end

curThread = nil
curThreadIndex = nil
end)

--[[
Event handling
]]

local alwaysSafeEvents = {
["playerDropped"] = true,
["playerConnecting"] = true
Expand Down Expand Up @@ -568,7 +507,7 @@ end

-- RPC INVOCATION
InvokeRpcEvent = function(source, ref, args)
if not curThread then
if not coroutine.running() then
error('RPC delegates can only be invoked from a thread.')
end

Expand Down Expand Up @@ -628,7 +567,7 @@ local funcref_mt = {
local rvs = msgpack.unpack(rv)

-- handle async retvals from refs
if rvs and type(rvs[1]) == 'table' and rawget(rvs[1], '__cfx_async_retval') and curThread then
if rvs and type(rvs[1]) == 'table' and rawget(rvs[1], '__cfx_async_retval') and coroutine.running() then
local p = promise.new()

rvs[1].__cfx_async_retval(function(r, e)
Expand Down

0 comments on commit eff4de7

Please sign in to comment.