Skip to content

实时功能使用文档

yuyang edited this page Mar 21, 2023 · 8 revisions

使用步骤

  1. 确保正确拉起了kafka集群,并创建了topic
    1. 快速拉起一个kafka集群用于实时测试: https://kafka.apache.org/documentation.html#quickstart
  2. cluster配置中打开实时功能
    1. table中的cluster配置需要增加:"realtime": true,参考:example/cases/realtime/config/online_config/table/0/clusters/in0_cluster.json
    2. 实时流程读取原始ha3格式的文档,并经过processor处理后再build,因此在线的table中需要增加processor的相关配置,参考:example/cases/realtime/config/online_config/table/0
  3. 增加实时信息配置文件
    1. bs build时,通过--realtimeInfo将实时参数传入,在runtimedata对应generation中会生成一份realtime_info.json(已经存在realtime_info.json不会重新生成覆盖)
/home/havenask/havenask/ha3_install/usr/local/bin/bs startjob -c /home/havenask/havenask/example/cases/realtime/config/offline_config -n in0 -j local -m full -d /home/havenask/havenask/example/data/test.data -w /home/havenask/havenask/example/cases/realtime/workdir -i ./runtimedata -p 1 --documentformat=ha3 --realtimeInfo="{
    \"realtime_mode\":\"realtime_service_rawdoc_rt_build_mode\",
    \"data_table\" : \"in0\",
    \"type\":\"plugin\",
    \"module_name\":\"kafka\",
    \"module_path\":\"libbs_raw_doc_reader_plugin.so\",
    \"topic_name\":\"quickstart-events\",
    \"bootstrap.servers\":\"localhost:9092\",
    \"src_signature\":\"16113477091138423397\",
    \"realtime_process_rawdoc\": \"true\"
}
"

参数含义

  • realtime_mode: 实时模式
  • data_table: 表名
  • type": 数据源类型
  • module_name": 插件模块名称
  • module_path: 插件加载路径
  • topic_name: 实时数据对应的kafka topic名字
  • bootstrap.servers: kafka的bootstrap配置
  • src_signature: 数据源签名
  • realtime_process_rawdoc: 是否需要process原始文档
  • kafka_start_timestamp: (可选参数)起始消费时间戳(us),默认从topic的起始开始消费

快速开始

使用example/cases/realtime快速使用实时功能

  1. 参考使用步骤1,拉起kafka集群,创建topic
  2. 修改example/cases/realtime/config/realtime_info.json中的topic_namebootstrap.servers
  3. build索引,拉起引擎
cd ~/havenask/example
python build_demo_data.py /ha3_install/ realtime
python start_demo_searcher.py /ha3_install/ realtime
  1. 推送ha3格式的实时文档到topic后查询,查看新增文档。写入实时topic客户端见下文
# 推送文档后查询
python ~/havenask/example/common/curl_http.py 45800 "select * from in0"

其他

实时写入客户端

  • java版本的HASH函数实现代码: aios/plugin_platform/bs_reader_plugins/util/HashAlgorithm.java 实时消费消息时,会按照消息key的hash值做range过滤,所以producer的partitioner要使用相同的hash函数和划分partition的策略
hashId = getHashId(key);
partId = getPartitionId(hashId, kafkaPartitionCnt);

Note

  • 实时索引只会build比全量和增量更新的消息
  • producer的partitioner策略需要保持一致,否则消息会被过滤
  • 开启实时功能后,正确初始化后日志中会打印kafka reader init succuss,没有这条日志请检查参数配置
Clone this wiki locally