Skip to content

farseer-go/queue

Repository files navigation

queue 本地队列

包:"github.com/farseer-go/queue"

模块:queue.Module

codecov Build

概述

在我们的生产环境中,我们每秒钟、每分钟都会产生非常多的执行日志。

如果每个产生的日志都被写到ES、数据库一次,就会给IO带来更大的压力。

而这些日志数据在一定程度上是允许被延迟的。

理想的做法是积累到一定时间或一定数量,然后再批量写入。

这时使用本地队列是最合适的,因为它足够轻量,不需要搭建服务端的消息队列中间件。

1、生产消息

本着farseer-go极简、优雅风格,使用queue组件也是非常简单的:

func Push(queueName string, message any)
  • queueName:队列名称
  • message:消息内容

演示:

func main() {
    fs.Initialize[queue.Module]("queue生产消息演示")

    for i := 0; i < 100; i++ {
        queue.Push("test", i)
    }
}

2、消费

函数定义:

type queueSubscribeFunc func(subscribeName string, lstMessage collections.ListAny, remainingCount int)
func Subscribe(queueName string, subscribeName string, pullCount int, fn queueSubscribeFunc)
  • queueName:队列名称
  • subscribeName:订阅名称
  • pullCount:每次拉取的数量
  • fn:回调函数
  • lstMessage:本次拉取消息的集合
  • remainingCount:队列中剩余的数量

如果有多个不同的subscribeName订阅者,订阅同一个队列时,则他们的消费是独立的,互相不会影响彼此的进度。

演示:

func main() {
    fs.Initialize[queue.Module]("queue生产消息演示")

    // 消费test队列,每次只拉2条记录
    queue.Subscribe("test", "A", 2, consumer)

    // 消费逻辑
    func consumer(subscribeName string, lstMessage collections.ListAny, remainingCount int) {
        var lst collections.List[int]
        lstMessage.MapToList(&lst)
        flog.Info(lst.Count())
    }
}