Skip to content

Commit

Permalink
refactor(grpc plugins) extract common grpc code
Browse files Browse the repository at this point in the history
common functions for:

- loading a .proto file
  - from common subdirectories
  - installs "well-known types" transcoding (only Timestamp for now)
  - applies a function to analize each defined RPC method.
- (un)framing protobuf messages in a data stream.
  • Loading branch information
javierguerragiraldez committed Oct 12, 2021
1 parent d4d670f commit 0be37e0
Show file tree
Hide file tree
Showing 4 changed files with 157 additions and 159 deletions.
1 change: 1 addition & 0 deletions kong-2.6.0-0.rockspec
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,7 @@ build = {
["kong.status"] = "kong/status/init.lua",

["kong.tools.dns"] = "kong/tools/dns.lua",
["kong.tools.grpc"] = "kong/tools/grpc.lua",
["kong.tools.utils"] = "kong/tools/utils.lua",
["kong.tools.timestamp"] = "kong/tools/timestamp.lua",
["kong.tools.stream_api"] = "kong/tools/stream_api.lua",
Expand Down
143 changes: 32 additions & 111 deletions kong/plugins/grpc-gateway/deco.lua
Original file line number Diff line number Diff line change
@@ -1,18 +1,13 @@
-- Copyright (c) Kong Inc. 2020

package.loaded.lua_pack = nil -- BUG: why?
require "lua_pack"
local cjson = require "cjson"
local protoc = require "protoc"
local pb = require "pb"
local pl_path = require "pl.path"
local date = require "date"
local grpc_tools = require "kong.tools.grpc"
local grpc_frame = grpc_tools.frame
local grpc_unframe = grpc_tools.unframe

local setmetatable = setmetatable

local bpack = string.pack -- luacheck: ignore string
local bunpack = string.unpack -- luacheck: ignore string

local ngx = ngx
local re_gsub = ngx.re.gsub
local re_match = ngx.re.match
Expand Down Expand Up @@ -80,46 +75,6 @@ local function parse_options_path(path)
return path_regex, match_groups
end

local function safe_set_type_hook(type, dec, enc)
if not pcall(pb.hook, type) then
ngx.log(ngx.NOTICE, "no type '" .. type .. "' defined")
return
end

if not pb.hook(type) then
pb.hook(type, dec)
end

if not pb.encode_hook(type) then
pb.encode_hook(type, enc)
end
end

local function set_hooks()
pb.option("enable_hooks")
local epoch = date.epoch()

safe_set_type_hook(
".google.protobuf.Timestamp",
function (t)
if type(t) ~= "table" then
error(string.format("expected table, got (%s)%q", type(t), tostring(t)))
end

return date(t.seconds):fmt("${iso}")
end,
function (t)
if type(t) ~= "string" then
error (string.format("expected time string, got (%s)%q", type(t), tostring(t)))
end

local ds = date(t) - epoch
return {
seconds = ds:spanseconds(),
nanos = ds:getticks() * 1000,
}
end)
end

-- parse, compile and load .proto file
-- returns a table mapping valid request URLs to input/output types
Expand All @@ -130,52 +85,37 @@ local function get_proto_info(fname)
return info
end

local dir, name = pl_path.splitpath(pl_path.abspath(fname))
local p = protoc.new()
p:addpath("/usr/include")
p:addpath("/usr/local/opt/protobuf/include/")
p:addpath("/usr/local/kong/lib/")
p:addpath("kong")

p.include_imports = true
p:addpath(dir)
p:loadfile(name)
set_hooks()
local parsed = p:parsefile(name)

info = {}

for _, srvc in ipairs(parsed.service) do
for _, mthd in ipairs(srvc.method) do
local options_bindings = {
safe_access(mthd, "options", "options", "google.api.http"),
safe_access(mthd, "options", "options", "google.api.http", "additional_bindings")
}
for _, options in ipairs(options_bindings) do
for http_method, http_path in pairs(options) do
http_method = http_method:lower()
if valid_method[http_method] then
local preg, grp, err = parse_options_path(http_path)
if err then
ngx.log(ngx.ERR, "error ", err, "parsing options path ", http_path)
else
if not info[http_method] then
info[http_method] = {}
end
table.insert(info[http_method], {
regex = preg,
varnames = grp,
rewrite_path = ("/%s.%s/%s"):format(parsed.package, srvc.name, mthd.name),
input_type = mthd.input_type,
output_type = mthd.output_type,
body_variable = options.body,
})
grpc_tools.each_method(fname, function(parsed, srvc, mthd)
local options_bindings = {
safe_access(mthd, "options", "options", "google.api.http"),
safe_access(mthd, "options", "options", "google.api.http", "additional_bindings")
}
for _, options in ipairs(options_bindings) do
for http_method, http_path in pairs(options) do
http_method = http_method:lower()
if valid_method[http_method] then
local preg, grp, err = parse_options_path(http_path)
if err then
ngx.log(ngx.ERR, "error ", err, "parsing options path ", http_path)
else
if not info[http_method] then
info[http_method] = {}
end
table.insert(info[http_method], {
regex = preg,
varnames = grp,
rewrite_path = ("/%s.%s/%s"):format(parsed.package, srvc.name, mthd.name),
input_type = mthd.input_type,
output_type = mthd.output_type,
body_variable = options.body,
})
end
end
end
end
end
end)

_proto_info[fname] = info
return info
Expand Down Expand Up @@ -228,25 +168,6 @@ function deco.new(method, path, protofile)
}, deco)
end


local function frame(ftype, msg)
return bpack("C>I", ftype, #msg) .. msg
end

local function unframe(body)
if not body or #body <= 5 then
return nil, body
end

local pos, ftype, sz = bunpack(body, "C>I") -- luacheck: ignore ftype
local frame_end = pos + sz - 1
if frame_end > #body then
return nil, body
end

return body:sub(pos, frame_end), body:sub(frame_end + 1)
end

--[[
// Set value `v` at `path` in table `t`
// Path contains value address in dot-syntax. For example:
Expand Down Expand Up @@ -313,8 +234,8 @@ function deco:upstream(body)
if not err then
for k, v in pairs(args) do
--[[
// According to [spec](https://github.com/googleapis/googleapis/blob/master/google/api/http.proto#L113)
// non-repeated message fields are supported.
// According to [spec](https://github.com/googleapis/googleapis/blob/master/google/api/http.proto#L113)
// non-repeated message fields are supported.
//
// For example: `GET /v1/messages/123456?revision=2&sub.subfield=foo`
// translates into `payload = { sub = { subfield = "foo" }}`
Expand All @@ -323,7 +244,7 @@ function deco:upstream(body)
end
end
end
body = frame(0x0, pb.encode(self.endpoint.input_type, payload))
body = grpc_frame(0x0, pb.encode(self.endpoint.input_type, payload))

return body
end
Expand All @@ -333,14 +254,14 @@ function deco:downstream(chunk)
local body = (self.downstream_body or "") .. chunk

local out, n = {}, 1
local msg, body = unframe(body)
local msg, body = grpc_unframe(body)

while msg do
msg = encode_json(pb.decode(self.endpoint.output_type, msg))

out[n] = msg
n = n + 1
msg, body = unframe(body)
msg, body = grpc_unframe(body)
end

self.downstream_body = body
Expand Down
63 changes: 15 additions & 48 deletions kong/plugins/grpc-web/deco.lua
Original file line number Diff line number Diff line change
@@ -1,16 +1,13 @@
-- Copyright (c) Kong Inc. 2020

require"lua_pack"
local cjson = require "cjson"
local protoc = require "protoc"
local pb = require "pb"
local pl_path = require "pl.path"
local grpc_tools = require "kong.tools.grpc"
local grpc_frame = grpc_tools.frame
local grpc_unframe = grpc_tools.unframe

local setmetatable = setmetatable

local bpack=string.pack -- luacheck: ignore string
local bunpack=string.unpack -- luacheck: ignore string

local ngx = ngx
local decode_base64 = ngx.decode_base64
local encode_base64 = ngx.encode_base64
Expand Down Expand Up @@ -62,26 +59,15 @@ local function get_proto_info(fname)
return info
end

local dir, name = pl_path.splitpath(pl_path.abspath(fname))
local p = protoc.new()
p.include_imports = true
p:addpath(dir)
local parsed = p:parsefile(name)

info = {}

for _, srvc in ipairs(parsed.service) do
for _, mthd in ipairs(srvc.method) do
info[("/%s.%s/%s"):format(parsed.package, srvc.name, mthd.name)] = {
mthd.input_type,
mthd.output_type,
}
end
end
grpc_tools.each_method(fname, function(parsed, srvc, mthd)
info[("/%s.%s/%s"):format(parsed.package, srvc.name, mthd.name)] = {
mthd.input_type,
mthd.output_type,
}
end)

_proto_info[fname] = info

p:loadfile(name)
return info
end

Expand Down Expand Up @@ -130,25 +116,6 @@ function deco.new(mimetype, path, protofile)
end


local function frame(ftype, msg)
return bpack("C>I", ftype, #msg) .. msg
end

local function unframe(body)
if not body or #body <= 5 then
return nil, body
end

local pos, ftype, sz = bunpack(body, "C>I") -- luacheck: ignore ftype
local frame_end = pos + sz - 1
if frame_end > #body then
return nil, body
end

return body:sub(pos, frame_end), body:sub(frame_end + 1)
end


function deco:upstream(body)
if self.text_encoding == "base64" then
body = decode_base64(body)
Expand All @@ -157,10 +124,10 @@ function deco:upstream(body)
if self.msg_encoding == "json" then
local msg = body
if self.framing == "grpc" then
msg = unframe(body)
msg = grpc_unframe(body)
end

body = frame(0x0, pb.encode(self.input_type, decode_json(msg)))
body = grpc_frame(0x0, pb.encode(self.input_type, decode_json(msg)))
end

return body
Expand All @@ -172,17 +139,17 @@ function deco:downstream(chunk)
local body = (self.downstream_body or "") .. chunk

local out, n = {}, 1
local msg, body = unframe(body)
local msg, body = grpc_unframe(body)

while msg do
msg = encode_json(pb.decode(self.output_type, msg))
if self.framing == "grpc" then
msg = frame(0x0, msg)
msg = grpc_frame(0x0, msg)
end

out[n] = msg
n = n + 1
msg, body = unframe(body)
msg, body = grpc_unframe(body)
end

self.downstream_body = body
Expand All @@ -198,7 +165,7 @@ end


function deco:frame(ftype, msg)
local f = frame(ftype, msg)
local f = grpc_frame(ftype, msg)

if self.text_encoding == "base64" then
f = ngx.encode_base64(f)
Expand Down

0 comments on commit 0be37e0

Please sign in to comment.