Skip to content

HeRedBo/order-consumer

Folders and files

NameName
Last commit message
Last commit date

Latest commit

 

History

6 Commits
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 

Repository files navigation

order-consumer

Go-Search 订单数据同步消费者 — 基于 Kafka 监听订单变更,近实时同步至 Elasticsearch 8

Go Kafka Elasticsearch Redis License


order-consumerGo-Search 项目的订单数据同步微服务,作为 shop-main 的 Kafka 消费者。 当订单数据发生变更时,shop-main 将变更消息推送至 Kafka,order-consumer 消费 shop-order Topic 后通过 BulkProcessor 近实时写入 Elasticsearch 8,保证订单搜索数据的时效性。


✨ 服务亮点

📨 Consumer Group 消费

基于 Sarama Consumer Group 实现,支持多分区并行消费,Handler 返回值控制位移提交,消费可靠

⚡ BulkProcessor 批量写入

ES 写入基于 BulkProcessor 异步批量提交,兼顾吞吐与延迟,避免逐条写入的性能瓶颈

🔍 拼音 + Ngram 搜索

订单号支持 Edge Ngram 前缀匹配,商品名称支持 Ngram 模糊搜索与拼音搜索,覆盖多种检索场景

🛑 优雅关闭

ShutdownHook 信号监听,按序关闭 Kafka Consumer → ES → Redis,确保资源安全释放


📡 数据同步流程

graph LR
    subgraph Producer[shop-main]
        P1[订单下单]
        P2[订单更新]
        P3[订单删除]
    end

    subgraph MQ[Kafka]
        T1[shop-order Topic]
    end

    subgraph Consumer[order-consumer]
        H[OrderMsgHandler]
        D{"operation?"}
    end

    subgraph ES[Elasticsearch 8]
        IDX[order 索引<br/>拼音+Ngram搜索]
    end

    P1 -->|"OrderMsg{operation:create}"| T1
    P2 -->|"OrderMsg{operation:update}"| T1
    P3 -->|"OrderMsg{operation:delete}"| T1
    T1 --> H
    H --> D
    D -->|"create/update"| IDX
    D -->|"delete"| IDX
Loading
操作类型 说明 ES 操作
create 订单创建 BulkCreate 写入文档
update 订单更新 BulkCreate 覆盖文档
delete 订单删除 Delete 删除文档

🗂️ 订单索引结构

字段类型说明
order_id / order_id_suffixtext (edge_ngram)订单号全文 / 后4位,支持前缀搜索
text (edge_ngram)快速检索订单号尾号
namestext (ngram + pinyin)商品名称,支持拼音/Ngram 模糊搜索
product_idskeyword关联商品 ID 列表
uidkeyword用户 ID
order_statuskeyword订单状态
refund_statuskeyword退款状态
shipping_typekeyword配送方式
pay_typekeyword支付类型
pay_time / create_time / update_timedate支付/创建/更新时间

索引特性:

  • Edge Ngram 分词 — 订单号支持前缀匹配搜索
  • Ngram + 拼音分词 — 商品名称支持模糊搜索与拼音搜索
  • 索引默认按 update_time 降序排列
  • 慢查询日志 — 查询 > 500ms / 索引 > 1s 记录警告

🧩 项目结构

order-consumer/
├── conf/                       # 配置管理
│   ├── config.yml              #   主配置文件(Kafka / ES / Redis)
│   ├── config.go               #   Viper 配置解析
│   └── constant.go             #   常量定义
├── consumer/                   # 消费者逻辑
│   ├── order_consumer.go       #   Kafka Consumer Group + 消息处理
│   └── order_index.go          #   订单索引模型定义
├── global/                     # 全局变量
│   ├── global.go               #   全局配置与变量
│   ├── constant.go             #   常量(操作类型、索引名)
│   └── logger.go               #   日志初始化
├── sql/                        # ES 索引 Mapping
│   └── shop-order.json         #   订单索引 Mapping 定义
├── doc/img/                    # 架构图
├── go.mod
└── main.go                     # 入口文件(优雅关闭)

🛠️ 技术栈

分类技术用途
语言开发语言
消息队列Consumer Group 消费 shop-order Topic
搜索引擎BulkProcessor 批量写入订单索引
缓存缓存中间件
日志结构化日志,文件轮转
配置YAML 配置解析
基础组件Go-Search pkg 组件库按需引入

🚀 快速开始

环境要求

依赖版本说明
>= 1.23开发语言
2.x+消息队列,必须
8.x搜索引擎,必须
>= 4.0缓存,必须

部署步骤

① 获取项目

git clone https://github.com/HeRedBo/order-consumer.git
cd order-consumer && go mod tidy

② 修改配置 — 编辑 conf/config.yml

kafka:
  hosts: ["127.0.0.1:9092"]
  order-topic: shop-order       # 消费 Topic
elasticsearch:
  host: "https://127.0.0.1:9200"
redis:
  host: "127.0.0.1:6379"

③ 创建 ES 索引

# 使用 sql/shop-order.json 中的 Mapping 定义创建订单索引

④ 启动服务

go run main.go

服务启动后自动从 Kafka 消费消息并同步至 ES,当订单下单/更新/删除时实时触发。


🔗 关联项目

项目 说明
go-search 项目总入口
pkg 核心基础设施库
shop-main 核心业务微服务(消息生产者)
shop-search-api 搜索 API 微服务
product-consumer 商品数据同步消费者

Apache-2.0 License

About

order-consumer

Resources

Stars

Watchers

Forks

Releases

No releases published

Packages

 
 
 

Contributors

Languages