Permalink
Fetching contributors…
Cannot retrieve contributors at this time
308 lines (266 sloc) 6.05 KB
local skynet = require "skynet"
local socket = require "socket"
local socketchannel = require "socketchannel"
local table = table
local string = string
local assert = assert
local redis = {}
local command = {}
local meta = {
__index = command,
-- DO NOT close channel in __gc
}
---------- redis response
local redcmd = {}
redcmd[36] = function(fd, data) -- '$'
local bytes = tonumber(data)
if bytes < 0 then
return true,nil
end
local firstline = fd:read(bytes+2)
return true,string.sub(firstline,1,-3)
end
redcmd[43] = function(fd, data) -- '+'
return true,data
end
redcmd[45] = function(fd, data) -- '-'
return false,data
end
redcmd[58] = function(fd, data) -- ':'
-- todo: return string later
return true, tonumber(data)
end
local function read_response(fd)
local result = fd:readline "\r\n"
local firstchar = string.byte(result)
local data = string.sub(result,2)
return redcmd[firstchar](fd,data)
end
redcmd[42] = function(fd, data) -- '*'
local n = tonumber(data)
if n < 0 then
return true, nil
end
local bulk = {}
local noerr = true
for i = 1,n do
local ok, v = read_response(fd)
if not ok then
noerr = false
end
bulk[i] = v
end
return noerr, bulk
end
-------------------
function command:disconnect()
self[1]:close()
setmetatable(self, nil)
end
-- msg could be any type of value
local function make_cache(f)
return setmetatable({}, {
__mode = "kv",
__index = f,
})
end
local header_cache = make_cache(function(t,k)
local s = "\r\n$" .. k .. "\r\n"
t[k] = s
return s
end)
local command_cache = make_cache(function(t,cmd)
local s = "\r\n$"..#cmd.."\r\n"..cmd:upper()
t[cmd] = s
return s
end)
local count_cache = make_cache(function(t,k)
local s = "*" .. k
t[k] = s
return s
end)
local function compose_message(cmd, msg)
local t = type(msg)
local lines = {}
if t == "table" then
lines[1] = count_cache[#msg+1]
lines[2] = command_cache[cmd]
local idx = 3
for _,v in ipairs(msg) do
v= tostring(v)
lines[idx] = header_cache[#v]
lines[idx+1] = v
idx = idx + 2
end
lines[idx] = "\r\n"
else
msg = tostring(msg)
lines[1] = "*2"
lines[2] = command_cache[cmd]
lines[3] = header_cache[#msg]
lines[4] = msg
lines[5] = "\r\n"
end
return lines
end
local function redis_login(auth, db)
if auth == nil and db == nil then
return
end
return function(so)
if auth then
so:request(compose_message("AUTH", auth), read_response)
end
if db then
so:request(compose_message("SELECT", db), read_response)
end
end
end
function redis.connect(db_conf)
local channel = socketchannel.channel {
host = db_conf.host,
port = db_conf.port or 6379,
auth = redis_login(db_conf.auth, db_conf.db),
nodelay = true,
}
-- try connect first only once
channel:connect(true)
return setmetatable( { channel }, meta )
end
setmetatable(command, { __index = function(t,k)
local cmd = string.upper(k)
local f = function (self, v, ...)
if type(v) == "table" then
return self[1]:request(compose_message(cmd, v), read_response)
else
return self[1]:request(compose_message(cmd, {v, ...}), read_response)
end
end
t[k] = f
return f
end})
local function read_boolean(so)
local ok, result = read_response(so)
return ok, result ~= 0
end
function command:exists(key)
local fd = self[1]
return fd:request(compose_message ("EXISTS", key), read_boolean)
end
function command:sismember(key, value)
local fd = self[1]
return fd:request(compose_message ("SISMEMBER", {key, value}), read_boolean)
end
local function compose_table(lines, msg)
local tinsert = table.insert
tinsert(lines, count_cache[#msg])
for _,v in ipairs(msg) do
v = tostring(v)
tinsert(lines,header_cache[#v])
tinsert(lines,v)
end
tinsert(lines, "\r\n")
return lines
end
function command:pipeline(ops,resp)
assert(ops and #ops > 0, "pipeline is null")
local fd = self[1]
local cmds = {}
for _, cmd in ipairs(ops) do
compose_table(cmds, cmd)
end
if resp then
return fd:request(cmds, function (fd)
for i=1, #ops do
local ok, out = read_response(fd)
table.insert(resp, {ok = ok, out = out})
end
return true, resp
end)
else
return fd:request(cmds, function (fd)
local ok, out
for i=1, #ops do
ok, out = read_response(fd)
end
-- return last response
return ok,out
end)
end
end
--- watch mode
local watch = {}
local watchmeta = {
__index = watch,
__gc = function(self)
self.__sock:close()
end,
}
local function watch_login(obj, auth)
return function(so)
if auth then
so:request(compose_message("AUTH", auth), read_response)
end
for k in pairs(obj.__psubscribe) do
so:request(compose_message ("PSUBSCRIBE", k))
end
for k in pairs(obj.__subscribe) do
so:request(compose_message("SUBSCRIBE", k))
end
end
end
function redis.watch(db_conf)
local obj = {
__subscribe = {},
__psubscribe = {},
}
local channel = socketchannel.channel {
host = db_conf.host,
port = db_conf.port or 6379,
auth = watch_login(obj, db_conf.auth),
nodelay = true,
}
obj.__sock = channel
-- try connect first only once
channel:connect(true)
return setmetatable( obj, watchmeta )
end
function watch:disconnect()
self.__sock:close()
setmetatable(self, nil)
end
local function watch_func( name )
local NAME = string.upper(name)
watch[name] = function(self, ...)
local so = self.__sock
for i = 1, select("#", ...) do
local v = select(i, ...)
so:request(compose_message(NAME, v))
end
end
end
watch_func "subscribe"
watch_func "psubscribe"
watch_func "unsubscribe"
watch_func "punsubscribe"
function watch:message()
local so = self.__sock
while true do
local ret = so:response(read_response)
local type , channel, data , data2 = ret[1], ret[2], ret[3], ret[4]
if type == "message" then
return data, channel
elseif type == "pmessage" then
return data2, data, channel
elseif type == "subscribe" then
self.__subscribe[channel] = true
elseif type == "psubscribe" then
self.__psubscribe[channel] = true
elseif type == "unsubscribe" then
self.__subscribe[channel] = nil
elseif type == "punsubscribe" then
self.__psubscribe[channel] = nil
end
end
end
return redis