Skip to content

BlackGlory/mq

Repository files navigation

MQ

一个Web友好的自托管ad-hoc微服务, 提供基于HTTP的单接收者持久化消息队列.

Install

从源代码运行

可以使用环境变量 MQ_HOSTMQ_PORT 决定服务器监听的地址和端口, 默认值为localhost和8080.

git clone https://github.com/BlackGlory/mq
cd mq
yarn install
yarn build
yarn bundle
yarn --silent start

Docker

docker run \
  --detach \
  --publish 8080:8080 \
  blackglory/mq

从源代码构建

git clone https://github.com/BlackGlory/mq
cd mq
yarn install
yarn docker:build

Recipes

docker-compose.yml
version: '3.8'

services:
  mq:
    image: 'blackglory/mq'
    restart: always
    environment:
      - MQ_ADMIN_PASSWORD=password
    volumes:
      - 'mq-data:/data'
    ports:
      - '8080:8080'

volumes:
  mq-data:

API

所有API中的namespace都需要满足此正则表达式: `^[a-zA-Z0-9.-_]{0,255}$`

Public

producer

draft

POST /mq/<namespace>/messages

申请往特定消息队列放入消息, namespace用于标识消息队列. 该动作创建一个drafting状态的message, 返回此message的id.

需要按以下格式发送Payload:

{
  priority: number | null // 数字越小, 优先级越高, 0是最高优先级, null是最低优先级.
}

一个处于drafting状态的消息会在设定的超时时间后被删除, 需要生产者手动调用set以避免超时.

Example
curl
curl \
  --request POST \
  --header 'Content-Type: application/json' \
  --data '{ "priority": null }' \
  "http://localhost:8080/mq/$namespace/messages"
JavaScript
await fetch(`http://localhost:8080/mq/${namespace}/messages`, {
  method: 'POST'
, headers: { 'Content-Type': 'application/json' }
, body: JSON.stringify({ priority: null })
}).then(res => res.text())
<<set>>

PUT /mq/<namespace>/messages/<id>

发送消息正文并确认入列, 当消息的状态是drafting时, 转为waiting状态. 若消息的状态是waiting, 可以再次调用此操作对消息内容进行更新.

当消息不存在时, 会返回404状态码. 当消息的状态不满足条件时(不是drafting, waiting中的任何一个), 会返回409状态码. 当消息payload与其他已有消息重复时(需要将配置中的 unique 设为 true), 会返回409状态码.

Example
curl
curl \
  --request PUT \
  --data "$msg" \
  "http://localhost:8080/mq/$namespace/messages/$id"
JavaScript
await fetch(`http://localhost:8080/mq/${namespace}/messages/${id}`, {
  method: 'PUT'
, body: msg
})

consumer

order

GET /mq/<namespace>/messages

从特定消息队列取出消息, 优先级最高且最早的消息会先被取出. 返回message id. 如果没有可用的消息, 则会阻塞直到有可用的消息返回. 如果消息队列遭到clear, 则会以404状态码中断阻塞.

该操作会使消息从waiting状态转为ordered状态. 一个处于ordered状态的消息会在设定的超时时间后以waiting状态重新入列, 优先级不会改变. 需要消费者手动调用get以避免超时.

Example
curl
curl "http://localhost:8080/mq/$namespace/messages"
JavaScript
await fetch(`http://localhost:8080/mq/${namespace}/messages`).then(res => res.text())
<<get>>

GET /mq/<namespace>/messages/<id>

获取特定队列的指定消息, 仅当消息的状态为ordered时, 消息会从ordered状态转为active状态. 一个处于active状态的消息会在设定的超时时间后以waiting状态重新入列, 优先级不会改变. 需要消费者手动调用complete, failabandon以避免超时.

此操作是幂等的, 可以多次调用.

响应头 X-MQ-Priority 显示该消息的优先级(number | null), 可用于在手动重新入列时使用. 响应头 X-MQ-State 显示该消息在此请求响应后的状态(string), 可能为drafting, waiting, active, failed中的一个.

当消息不存在时, 会返回404状态码. 当消息的状态不满足条件时(不是waiting, ordered, active, failed中的任何一个), 会返回409状态码.

Example
curl
curl "http://localhost:8080/mq/$namespace/messages/$id"
JavaScript
await fetch(`http://localhost:8080/mq/${namespace}/messages/${id}`).then(res => res.text())
<<abandon>>

DELETE /mq/<namespace>/messages/<id>

废弃此消息, 消息将不会纳入统计结果.

此操作是幂等的, 若遇到网络错误, 可以再次调用.

当消息不存在时, 会返回404.

Example
curl
curl \
  --request DELETE \
  "http://localhost:8080/mq/$namespace/messages/$id"
JavaScript
await fetch(`http://localhost:8080/mq/${namespace}/messages/${id}`, {
  method: 'DELETE'
})
<<complete>>

PATCH /mq/<namespace>/messages/<id>/complete

当消息处于active状态时表示消息被消耗完毕, 消息将被统计为已完成的消息(completed).

此操作是幂等的, 若遇到网络错误, 可以再次调用.

当消息不存在时, 将返回404状态码. 当消息的状态不满足条件时(不是active), 将返回409状态码.

Example
curl
curl \
  --request PATCH \
  "http://localhost:8080/mq/$namespace/messages/$id/complete"
JavaScript
await fetch(`http://localhost:8080/mq/${namespace}/messages/${id}/complete`, {
  method: 'PATCH'
})
<<fail>>

PATCH /mq/<namespace>/messages/<id>/fail

当消息处于active状态时表示消息在消耗过程中失败, 消息将从active状态转为failed状态, 同时被统计为已失败的消息(failed).

此操作是幂等的, 若遇到网络错误, 可以再次调用.

当消息不存在时, 将返回404状态码. 当消息的状态不满足条件时(不是active), 将返回409状态码.

Example
curl
curl \
  --request PATCH \
  "http://localhost:8080/mq/$namespace/messages/$id/fail"
JavaScript
await fetch(`http://localhost:8080/mq/${namespace}/messages/${id}/fail`, {
  method: 'PATCH'
})
renew

PATCH /mq/<namespace>/messages/<id>/renew

当消息处于failed状态时表示将消息重新入列, 消息将从failed状态转为waiting状态.

此操作是幂等的, 若遇到网络错误, 可以再次调用.

当消息不存在时, 将返回404状态码. 当消息的状态不满足条件时(不是failed), 将返回409状态码.

Example
curl
curl \
  --request PATCH \
  "http://localhost:8080/mq/$namespace/messages/$id/renew"
JavaScript
await fetch(`http://localhost:8080/mq/${namespace}/messages/${id}/renew`, {
  method: 'PATCH'
})
get all failed message ids

GET /mq/<namespace>/failed-messages

列出所有处于failed状态的消息id. 返回 Array<string>.

此操作支持返回ndjson格式的响应, 需要 Accept: application/x-ndjson 请求头.

Example
curl
curl "http://localhost:8080/mq/$namespace/failed-messages"
JavaScript
await fetch(`http://localhost:8080/mq/${namespace}/failed-messages`).then(res => res.json())
abandon all failed messages

DELETE /mq/<namespace>/failed-messages

将所有failed状态的消息废弃.

Example
curl
curl \
  --request DELETE \
  "http://localhost:8080/mq/$namespace/failed-messages"
JavaScript
await fetch(`http://localhost:8080/mq/${namespace}/failed-messages`, {
  method: 'POST'
})
renew all failed messages

PATCH /mq/<namespace>/failed-messages/renew

将所有failed状态的消息以FIFO的顺序转为waiting状态.

Example
curl
curl \
  --request PATCH \
  "http://localhost:8080/mq/$namespace/failed-messages/renew"
JavaScript
await fetch(`http://localhost:8080/mq/${namespace}/failed-messages/renew`, {
  method: 'POST'
})

<<clear>>

DELETE /mq/<namespace>

清空队列内的所有消息和统计信息.

Example
curl
curl \
  --request DELETE \
  "http://localhost:8080/mq/$namespace"
JavaScript
await fetch(`http://localhost:8080/mq/${namespace}`, {
  method: 'DELETE'
})

stats

GET /mq/<namespace>/stats

获取统计信息, 查看当前队列中对应状态的消息个数. 由于completed状态的消息不会保留, 因此completed的值是由计数器统计的. 除非调用clear, 否则completed的数值将只会增长不会减少.

{
  namespace: string
  drafting: number
  waiting: number
  ordered: number
  active: number
  completed: number
  failed: number
}
Example
curl
curl "http://localhost:8080/mq/$namespace/stats"
JavaScript
await fetch(`http://localhost:8080/mq/${namespace}/stats`).then(res => res.json())

get all namespaces

GET /mq

获取所有有统计信息的队列namespace.

返回 Array<string>.

此操作支持返回ndjson格式的响应, 需要 Accept: application/x-ndjson 请求头.

Example
curl
curl 'http://localhost:8080/mq'
JavaScript
await fetch('http://localhost:8080/mq').then(res => res.json())
Example
curl
curl 'http://localhost:8080/metrics'
JavaScript
await fetch('http://localhost:8080/metrics').then(res => res.json())

Private

队列配置

{
  unique: boolean | null // 队列是否对消息自动去重, null表示继承全局设置
  draftingTimeout: number | null // 允许处于draft状态的秒数, null表示继承全局设置
  orderedTimeout: number | null // 允许处于ordered状态的秒数, null表示继承全局设置
  activeTimeout: number | null // 允许处于active状态的秒数, null表示继承全局设置
  concurrency: number | null // 允许派发的并发任务数, null表示继承全局设置
}

可用以下环境变量作为全局设置:

  • MQ_UNIQUE, 默认为 false.
  • MQ_DRAFTING_TIMEOUT, 默认为60秒.
  • MQ_ORDERED_TIMEOUT, 默认为60秒.
  • MQ_ACTIVE_TIMEOUT, 默认为300秒.
  • MQ_CONCURRENCY, 默认为无限.
获取所有具有配置的namespace

GET /admin/mq-with-config

返回由JSON表示的字符串数组 string[].

Example
curl
curl \
  --header "Authorization: Bearer $ADMIN_PASSWORD" \
  "http://localhost:8080/admin/mq-with-config"
fetch
await fetch('http://localhost:8080/admin/mq-with-config', {
  headers: {
    'Authorization': `Bearer ${adminPassword}`
  }
}).then(res => res.json())
获取特定队列的配置

GET /admin/mq/<namespace>/config

返回JSON:

{
  unique: boolean | null
  draftingTimeout: number | null
  orderedTimeout: number | null
  activeTimeout: number | null
  concurrency: number | null
}
Example
curl
curl \
  --header "Authorization: Bearer $ADMIN_PASSWORD" \
  "http://localhost:8080/admin/mq/$namespace/config"
fetch
await fetch(`http://localhost:8080/admin/mq/${namespace}/config`, {
  headers: {
    'Authorization': `Bearer ${adminPassword}`
  }
}).then(res => res.json())
设置配置

PUT /admin/mq/<namespace>/config/unique PUT /admin/mq/<namespace>/config/drafting-timeout PUT /admin/mq/<namespace>/config/ordered-timeout PUT /admin/mq/<namespace>/config/active-timeout PUT /admin/mq/<namespace>/config/concurrency

Payload必须为对应的null以外的JSON值.

Example
curl
curl \
  --request PUT \
  --header "Authorization: Bearer $ADMIN_PASSWORD" \
  --header "Content-Type: application/json" \
  --data "$UNIQUE" \
  "http://localhost:8080/admin/mq/$namespace/config/unique"
fetch
await fetch(`http://localhost:8080/admin/mq/${namespace}/config/unique`, {
  method: 'PUT'
, headers: {
    'Authorization': `Bearer ${adminPassword}`
  , 'Content-Type': 'application/json'
  }
, body: JSON.stringify(unique)
})
移除配置

DELETE /admin/mq/<namespace>/config/unique DELETE /admin/mq/<namespace>/config/drafting-timeout DELETE /admin/mq/<namespace>/config/ordered-timeout DELETE /admin/mq/<namespace>/config/active-timeout DELETE /admin/mq/<namespace>/config/concurrency

Example
curl
curl \
  --request DELETE \
  --header "Authorization: Bearer $ADMIN_PASSWORD" \
  "http://localhost:8080/admin/mq/$namespace/config/unique"
fetch
await fetch(`http://localhost:8080/admin/mq/${namespace}/config/unique`, {
  method: 'DELETE'
})

限制Payload大小

设置环境变量 MQ_PAYLOAD_LIMIT 可限制服务接受的单个请求的Payload字节数, 默认值为1048576(1MB).

设置环境变量 MQ_SET_PAYLOAD_LIMIT 可限制set接受的单个请求的Payload字节数, 默认值继承自 MQ_PAYLOAD_LIMIT.