Skip to content

HeRedBo/product-consumer

Folders and files

NameName
Last commit message
Last commit date

Latest commit

 

History

6 Commits
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 

Repository files navigation

product-consumer

Go-Search 商品数据同步消费者 — 基于 Kafka 监听商品变更,近实时同步至 Elasticsearch 8

Go Kafka Elasticsearch Redis License


product-consumerGo-Search 项目的商品数据同步微服务,作为 shop-main 的 Kafka 消费者。 当商品数据发生变更时,shop-main 将变更消息推送至 Kafka,product-consumer 消费 shop-product Topic 后通过 BulkProcessor 近实时写入 Elasticsearch 8,保证商品搜索数据与业务状态实时一致。


✨ 服务亮点

📨 Consumer Group 消费

基于 Sarama Consumer Group 实现,支持多分区并行消费,Handler 返回值控制位移提交,消费可靠

🔄 上下架联动

根据 operation + is_show 双字段联动判断,商品更新时自动决定 ES 写入还是删除,搜索结果实时一致

🗂️ Routing 策略

使用 cate_id 作为 ES Routing 键,同分类商品路由到同一分片,显著提升分类查询性能

⚡ BulkProcessor 批量写入

ES 写入基于 BulkProcessor 异步批量提交,兼顾写入吞吐与延迟,避免逐条写入瓶颈


📡 数据同步流程

graph LR
    subgraph Producer[shop-main]
        P1[商品上架]
        P2[商品更新]
        P3[商品下架/删除]
    end

    subgraph MQ[Kafka]
        T1[shop-product Topic]
    end

    subgraph Consumer[product-consumer]
        H[MsgHandler]
        D{"operation + is_show?"}
    end

    subgraph ES[Elasticsearch 8]
        IDX[product 索引<br/>cate_id Routing]
    end

    P1 -->|"ProductMsg{operation:create, is_show:1}"| T1
    P2 -->|"ProductMsg{operation:update, is_show}"| T1
    P3 -->|"ProductMsg{operation:un_sale/delete}"| T1
    T1 --> H
    H --> D
    D -->|"create/on_sale 或 update+is_show=1"| IDX
    D -->|"un_sale/delete 或 update+is_show=0"| IDX
Loading
操作类型 is_show 说明 ES 操作
create / on_sale 1 商品创建/上架 BulkCreate 写入文档
update 1 商品更新且上架 BulkCreate 覆盖文档
update 0 商品更新且下架 DeleteRefresh 删除文档
un_sale / delete 商品下架/删除 DeleteRefresh 删除文档

🗂️ 商品索引结构

字段类型说明
idint64商品 ID
store_namestring商品名称
store_infostring商品简介
keywordstring关键字
cate_idint分类 ID(Routing 键,优化分类查询)
pricefloat64商品价格
sales / fictiint32真实销量 / 虚拟销量
is_hot / is_benefit / is_best / is_newint8热卖 / 优惠 / 精品 / 新品标签
is_postage / is_goodint8包邮 / 优品推荐标签
descriptionstring产品详情描述
create_time / update_timedate创建时间 / 更新时间

🧩 项目结构

product-consumer/
├── conf/                           # 配置管理
│   ├── config.yml                  #   主配置文件(Kafka / ES / Redis)
│   ├── config.go                   #   Viper 配置解析
│   └── constant.go                 #   常量定义
├── internal/
│   └── consumer/                   # 消费者逻辑
│       ├── product_consumer.go     #   Kafka Consumer Group + 消息处理
│       └── product_index.go        #   商品索引模型定义
├── global/                         # 全局变量
│   ├── global.go                   #   全局配置与变量
│   ├── constant.go                 #   常量(操作类型、索引名)
│   └── logger.go                   #   日志初始化
├── sql/                            # ES 索引 Mapping(商品索引 JSON)
├── doc/img/                        # 架构图
├── runtime/                        # 运行时资源
├── go.mod
└── main.go                         # 入口文件(优雅关闭)

🛠️ 技术栈

分类技术用途
语言开发语言
消息队列Consumer Group 消费 shop-product Topic
搜索引擎BulkProcessor 批量写入商品索引,cate_id Routing
缓存缓存中间件
日志结构化日志,文件轮转
配置YAML 配置解析
基础组件Go-Search pkg 组件库按需引入

🚀 快速开始

环境要求

依赖版本说明
>= 1.23开发语言
2.x+消息队列,必须
8.x搜索引擎,必须
>= 4.0缓存,必须

部署步骤

① 获取项目

git clone https://github.com/HeRedBo/product-consumer.git
cd product-consumer && go mod tidy

② 修改配置 — 编辑 conf/config.yml

kafka:
  hosts: ["127.0.0.1:9092"]
  product-topic: shop-product   # 消费 Topic
elasticsearch:
  host: "https://127.0.0.1:9200"
redis:
  host: "127.0.0.1:6379"

③ 创建 ES 索引

# 使用 sql/ 目录下的 JSON Mapping 定义创建商品索引

④ 启动服务

go run main.go

服务启动后自动从 Kafka 消费消息并同步至 ES,当商品上架/更新/下架时实时触发。


🔗 关联项目

项目 说明
go-search 项目总入口
pkg 核心基础设施库
shop-main 核心业务微服务(消息生产者)
shop-search-api 搜索 API 微服务
order-consumer 订单数据同步消费者

Apache-2.0 License

About

海量数据高并发场景,构建Go+ES8企业级搜索微服务 队列消耗服务代码

Resources

Stars

Watchers

Forks

Releases

No releases published

Packages

 
 
 

Contributors

Languages