Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 5 additions & 3 deletions apisix/core/ctx.lua
Original file line number Diff line number Diff line change
Expand Up @@ -143,14 +143,16 @@ do
var_x_forwarded_proto = true,
}

-- sort in alphabetical
local apisix_var_names = {
balancer_ip = true,
balancer_port = true,
consumer_name = true,
mqtt_client_id = true,
route_id = true,
route_name = true,
service_id = true,
service_name = true,
consumer_name = true,
balancer_ip = true,
balancer_port = true,
}

local mt = {
Expand Down
14 changes: 7 additions & 7 deletions apisix/stream/plugins/mqtt-proxy.lua
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ local schema = {
protocol_name = {type = "string"},
protocol_level = {type = "integer"},
upstream = {
description = "Deprecated. We should configure upstream outside of the plugin",
type = "object",
properties = {
ip = {type = "string"}, -- deprecated, use "host" instead
Expand Down Expand Up @@ -57,13 +58,7 @@ local _M = {


function _M.check_schema(conf)
local ok, err = core.schema.check(schema, conf)

if not ok then
return false, err
end

return true
return core.schema.check(schema, conf)
end


Expand Down Expand Up @@ -185,6 +180,11 @@ function _M.preread(conf, ctx)

core.log.info("mqtt client id: ", res.client_id)

-- when client id is missing, fallback to balance by client IP
if res.client_id ~= "" then
ctx.mqtt_client_id = res.client_id
end

if not conf.upstream then
return
end
Expand Down
1 change: 1 addition & 0 deletions docs/en/latest/apisix-variable.md
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ List in alphabetical order:
| graphql_name | the [operation name](https://graphql.org/learn/queries/#operation-name) of GraphQL | HeroComparison |
| graphql_operation | the operation type of GraphQL | mutation |
| graphql_root_fields | the top level fields of GraphQL | ["hero"] |
| mqtt_client_id | the client id in MQTT protocol | |
| route_id | id of `route` | |
| route_name | name of `route` | |
| service_id | id of `service` | |
Expand Down
32 changes: 32 additions & 0 deletions docs/en/latest/plugins/mqtt-proxy.md
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,38 @@ curl http://127.0.0.1:9080/apisix/admin/stream_routes/1 -H 'X-API-KEY: edd1c9f03

In case Docker is used in combination with MacOS `host.docker.internal` is the right parameter for `host`.

This plugin exposes a variable `mqtt_client_id`, and we can use it to load balance via the client id. For example:

```shell
curl http://127.0.0.1:9080/apisix/admin/stream_routes/1 -H 'X-API-KEY: edd1c9f034335f136f87ad84b625c8f1' -X PUT -d '
{
"plugins": {
"mqtt-proxy": {
"protocol_name": "MQTT",
"protocol_level": 4
}
},
"upstream": {
"type": "chash",
"key": "mqtt_client_id",
"nodes": [
{
"host": "127.0.0.1",
"port": 1995,
"weight": 1
},
{
"host": "127.0.0.2",
"port": 1995,
"weight": 1
}
]
}
}'
```

MQTT connections with different client ID will be forwarded to different node via the consistent hash algorithm. If the client ID is missing, we will balance via client IP instead.

## Delete Plugin

```shell
Expand Down
32 changes: 32 additions & 0 deletions docs/zh/latest/plugins/mqtt-proxy.md
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,38 @@ curl http://127.0.0.1:9080/apisix/admin/stream_routes/1 -H 'X-API-KEY: edd1c9f03

在 Docker 与 MacOS 结合使用的情况下,`host.docker.internal` 是 `host` 的正确参数。

这个插件暴露了一个变量 `mqtt_client_id`,我们可以用它来通过客户端 ID 进行负载均衡。比如说:

```shell
curl http://127.0.0.1:9080/apisix/admin/stream_routes/1 -H 'X-API-KEY: edd1c9f034335f136f87ad84b625c8f1' -X PUT -d '
{
"plugins": {
"mqtt-proxy": {
"protocol_name": "MQTT",
"protocol_level": 4
}
},
"upstream": {
"type": "chash",
"key": "mqtt_client_id",
"nodes": [
{
"host": "127.0.0.1",
"port": 1995,
"weight": 1
},
{
"host": "127.0.0.2",
"port": 1995,
"weight": 1
}
]
}
}'
```

不同客户端 ID 的 MQTT 连接将通过一致性哈希算法被转发到不同的节点。如果客户端 ID 为空,我们将通过客户端 IP 进行均衡。

#### 禁用插件

当你想去掉插件的时候,很简单,在插件的配置中把对应的 json 配置删除即可,无须重启服务,即刻生效:
Expand Down
94 changes: 94 additions & 0 deletions t/stream-plugin/mqtt-proxy.t
Original file line number Diff line number Diff line change
Expand Up @@ -398,3 +398,97 @@ qr/mqtt client id: \S+/
mqtt client id: clint-111
--- no_error_log
[error]



=== TEST 17: balance with mqtt_client_id
--- config
location /t {
content_by_lua_block {
local t = require("lib.test_admin").test
local code, body = t('/apisix/admin/stream_routes/1',
ngx.HTTP_PUT,
[[{
"remote_addr": "127.0.0.1",
"server_port": 1985,
"plugins": {
"mqtt-proxy": {
"protocol_name": "MQTT",
"protocol_level": 5
}
},
"upstream": {
"type": "chash",
"key": "mqtt_client_id",
"nodes": [
{
"host": "0.0.0.0",
"port": 1995,
"weight": 1
},
{
"host": "127.0.0.1",
"port": 1995,
"weight": 1
}
]
}
}]]
)

if code >= 300 then
ngx.status = code
end
ngx.say(body)
}
}
--- request
GET /t
--- response_body
passed
--- no_error_log
[error]



=== TEST 18: hit route with empty id
--- stream_request eval
"\x10\x0d\x00\x04\x4d\x51\x54\x54\x05\x02\x00\x3c\x00\x00\x00"
--- stream_response
hello world
--- grep_error_log eval
qr/(mqtt client id: \w+|proxy request to \S+)/
--- grep_error_log_out
proxy request to 127.0.0.1:1995
--- no_error_log
[error]



=== TEST 19: hit route with different client id, part 1
--- stream_request eval
"\x10\x0e\x00\x04\x4d\x51\x54\x54\x05\x02\x00\x3c\x00\x00\x01\x66"
--- stream_response
hello world
--- grep_error_log eval
qr/(mqtt client id: \w+|proxy request to \S+)/
--- grep_error_log_out
mqtt client id: f
proxy request to 0.0.0.0:1995
--- no_error_log
[error]



=== TEST 20: hit route with different client id, part 2
--- stream_request eval
"\x10\x0e\x00\x04\x4d\x51\x54\x54\x05\x02\x00\x3c\x00\x00\x01\x67"
--- stream_response
hello world
--- grep_error_log eval
qr/(mqtt client id: \w+|proxy request to \S+)/
--- grep_error_log_out
mqtt client id: g
proxy request to 127.0.0.1:1995
--- no_error_log
[error]