Skip to content

Commit f5e7709

Browse files
committed
scripting/lua: Rework handling of coroutines and promises
Hopefully this can serve as an alternative to #157
1 parent 1341a23 commit f5e7709

File tree

1 file changed

+67
-128
lines changed

1 file changed

+67
-128
lines changed

data/shared/citizen/scripting/lua/scheduler.lua

+67-128
Original file line numberDiff line numberDiff line change
@@ -1,18 +1,38 @@
11
local threads = {}
2-
local curThread
3-
local curThreadIndex
2+
3+
--[[
4+
5+
Thread handling
6+
7+
]]
8+
local function resumeThread(coro) -- Internal utility
9+
if coroutine.status(coro) == "dead" then
10+
threads[coro] = nil
11+
return false
12+
end
13+
14+
local ok, wakeTimeOrErr = coroutine.resume(coro)
15+
16+
if ok then
17+
local thread = threads[coro]
18+
if thread then
19+
thread.wakeTime = wakeTimeOrErr or 0
20+
end
21+
else
22+
Citizen.Trace("Error resuming coroutine: " .. debug.traceback(coro, wakeTimeOrErr) .. "\n")
23+
end
24+
-- Return not finished
25+
return coroutine.status(coro) ~= "dead"
26+
end
427

528
function Citizen.CreateThread(threadFunction)
6-
table.insert(threads, {
7-
coroutine = coroutine.create(threadFunction),
29+
threads[coroutine.create(threadFunction)] = {
830
wakeTime = 0
9-
})
31+
}
1032
end
1133

1234
function Citizen.Wait(msec)
13-
curThread.wakeTime = GetGameTimer() + msec
14-
15-
coroutine.yield()
35+
coroutine.yield(GetGameTimer() + msec)
1636
end
1737

1838
-- legacy alias (and to prevent people from calling the game's function)
@@ -21,156 +41,75 @@ CreateThread = Citizen.CreateThread
2141

2242
function Citizen.CreateThreadNow(threadFunction)
2343
local coro = coroutine.create(threadFunction)
24-
25-
local t = {
26-
coroutine = coro,
44+
threads[coro] = {
2745
wakeTime = 0
2846
}
29-
30-
-- add new thread and save old thread
31-
local oldThread = curThread
32-
curThread = t
33-
34-
local result, err = coroutine.resume(coro)
35-
36-
local resumedThread = curThread
37-
-- restore last thread
38-
curThread = oldThread
39-
40-
if err then
41-
error('Failed to execute thread: ' .. debug.traceback(coro, err))
42-
end
43-
44-
if resumedThread and coroutine.status(coro) ~= 'dead' then
45-
table.insert(threads, t)
46-
end
47-
48-
return coroutine.status(coro) ~= 'dead'
47+
return resumeThread(coro)
4948
end
5049

51-
local inNext
52-
5350
function Citizen.Await(promise)
54-
if not curThread then
51+
local coro = coroutine.running()
52+
if not coro then
5553
error("Current execution context is not in the scheduler, you should use CreateThread / SetTimeout or Event system (AddEventHandler) to be able to Await")
5654
end
5755

58-
-- Remove current thread from the pool (avoid resume from the loop)
59-
if curThreadIndex then
60-
table.remove(threads, curThreadIndex)
61-
end
62-
63-
curThreadIndex = nil
64-
local resumableThread = curThread
65-
66-
inNext = true
67-
local nextResult
68-
local nextErr
69-
local resolved
70-
71-
promise:next(function (result)
72-
-- was already resolved? then resolve instantly
73-
if inNext then
74-
nextResult = result
75-
resolved = true
76-
77-
return
78-
end
79-
80-
-- Reattach thread
81-
table.insert(threads, resumableThread)
82-
83-
curThread = resumableThread
84-
curThreadIndex = #threads
56+
-- Indicates if the promise has already been resolved or rejected
57+
-- This is a hack since the API does not expose its state
58+
local isDone = false
59+
local result, err
60+
promise = promise:next(function(...)
61+
isDone = true
62+
result = {...}
63+
end,function(error)
64+
isDone = true
65+
err = error
66+
end)
8567

86-
local result, err = coroutine.resume(resumableThread.coroutine, result)
68+
if not isDone then
69+
local threadData = threads[coro]
70+
threads[coro] = nil
8771

88-
if err then
89-
error('Failed to resume thread: ' .. debug.traceback(resumableThread.coroutine, err))
72+
local function reattach()
73+
threads[coro] = threadData
74+
resumeThread(coro)
9075
end
9176

92-
return result
93-
end, function (err)
94-
if err then
95-
-- if already rejected, handle rejection instantly
96-
if inNext then
97-
nextErr = err
98-
resolved = true
99-
100-
return
101-
end
102-
103-
-- resume with error
104-
local result, coroErr = coroutine.resume(resumableThread.coroutine, nil, err)
105-
106-
if coroErr then
107-
Citizen.Trace('Await failure: ' .. debug.traceback(resumableThread.coroutine, coroErr, 2))
108-
end
109-
end
110-
end)
111-
112-
inNext = false
113-
114-
if resolved then
115-
if nextErr then
116-
error(nextErr)
117-
end
118-
119-
return nextResult
77+
promise:next(reattach, reattach)
78+
Citizen.Wait(0)
12079
end
121-
122-
curThread = nil
123-
local result, err = coroutine.yield()
124-
80+
12581
if err then
12682
error(err)
12783
end
128-
129-
return result
130-
end
13184

132-
-- SetTimeout
133-
local timeouts = {}
85+
return table.unpack(result)
86+
end
13487

13588
function Citizen.SetTimeout(msec, callback)
136-
table.insert(threads, {
137-
coroutine = coroutine.create(callback),
89+
local coro = coroutine.create(callback)
90+
threads[coro] = {
13891
wakeTime = GetGameTimer() + msec
139-
})
92+
}
14093
end
14194

14295
SetTimeout = Citizen.SetTimeout
14396

14497
Citizen.SetTickRoutine(function()
14598
local curTime = GetGameTimer()
14699

147-
for i = #threads, 1, -1 do
148-
local thread = threads[i]
149-
100+
for coro, thread in pairs(threads) do
150101
if curTime >= thread.wakeTime then
151-
curThread = thread
152-
curThreadIndex = i
153-
154-
local status = coroutine.status(thread.coroutine)
155-
156-
if status == 'dead' then
157-
table.remove(threads, i)
158-
else
159-
local result, err = coroutine.resume(thread.coroutine)
160-
161-
if not result then
162-
Citizen.Trace("Error resuming coroutine: " .. debug.traceback(thread.coroutine, err) .. "\n")
163-
164-
table.remove(threads, i)
165-
end
166-
end
102+
resumeThread(coro)
167103
end
168104
end
169-
170-
curThread = nil
171-
curThreadIndex = nil
172105
end)
173106

107+
--[[
108+
109+
Event handling
110+
111+
]]
112+
174113
local alwaysSafeEvents = {
175114
["playerDropped"] = true,
176115
["playerConnecting"] = true
@@ -568,7 +507,7 @@ end
568507

569508
-- RPC INVOCATION
570509
InvokeRpcEvent = function(source, ref, args)
571-
if not curThread then
510+
if not coroutine.running() then
572511
error('RPC delegates can only be invoked from a thread.')
573512
end
574513

@@ -628,7 +567,7 @@ local funcref_mt = {
628567
local rvs = msgpack.unpack(rv)
629568

630569
-- handle async retvals from refs
631-
if rvs and type(rvs[1]) == 'table' and rawget(rvs[1], '__cfx_async_retval') and curThread then
570+
if rvs and type(rvs[1]) == 'table' and rawget(rvs[1], '__cfx_async_retval') and coroutine.running() then
632571
local p = promise.new()
633572

634573
rvs[1].__cfx_async_retval(function(r, e)

0 commit comments

Comments
 (0)