Mabo-IoT/beam-demo

overview

python-beam-flink

issues

  1. In the current demo setup, Flink 1.8 must be installed locally; Beam does not support the latest release, Flink 1.9.
  2. Passing FlinkRunner as the runner has problems; the job service still has to be started by hand:
    • Run ./gradlew :runners:flink:1.8:job-server:runShadow -PflinkMasterUrl=localhost:8081 to start the job service.
    • Then pass PortableRunner as the --runner argument.
  3. Only Flink 1.8 is supported; 1.9 and later are not.
  4. This command fails: python -m apache_beam.examples.wordcount --input /home/heathkang/Projects/Proj-mv/Beam/beam-demo/test.txt --output /home/heathkang/Projects/Proj-mv/Beam/beam-demo/handle.txt --runner PortableRunner --environment_type=LOOPBACK --job_endpoint=localhost:8099
  5. This command works, but only with --experiments=beam_fn_api: python -m apache_beam.examples.wordcount --input /home/heathkang/Projects/Proj-mv/Beam/beam-demo/test.txt --output /home/heathkang/Projects/Proj-mv/Beam/beam-demo/handle.txt --runner FlinkRunner --environment_type=LOOPBACK --experiments=beam_fn_api --flink_master_url=localhost:8081
  6. The Python Kafka IO calls into the Java library (most runners are written in Java), so an Expansion Service is required. The Flink job server defaults to localhost:8097, which means the Java expansion service has to be started with the command from item 2.
  7. Windows are used together with triggers.
  8. The apachebeam/java_sdk image may be needed; in practice it is required, because it provides some of the services.
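
The working invocation from item 5 can be wrapped in a small helper so the flags are not retyped each time. This is only a sketch: the function name `flink_wordcount_command` and its defaults are made up for illustration, and the flags are copied from the notes above rather than checked against other Beam versions.

```python
import sys


def flink_wordcount_command(input_path, output_path, flink_master="localhost:8081"):
    """Build the argv for the wordcount run that worked in item 5."""
    return [
        sys.executable, "-m", "apache_beam.examples.wordcount",
        "--input", input_path,
        "--output", output_path,
        "--runner", "FlinkRunner",
        "--environment_type=LOOPBACK",
        # According to the notes above, the run only succeeds with this flag.
        "--experiments=beam_fn_api",
        f"--flink_master_url={flink_master}",
    ]
```

The returned list can be passed to `subprocess.run(..., check=True)` once a local Flink 1.8 cluster is listening on the given address.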

nextsteps

  1. Test with Kafka data to see whether it works end to end.
  2. Then see whether metrics can be set up to inspect the current state of the data.

Program-model

Pipeline: PCollection => Transform => PCollection => Transform...

pipeline

The overall data flow, from input to output.
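
A minimal sketch of such a flow with the Python SDK, assuming `apache_beam` is installed. It runs on the default local runner rather than Flink, and the names `extract_words` and `run` are invented for this example.

```python
import re


def extract_words(line):
    """Split one line of text into lowercase words."""
    return re.findall(r"[a-z']+", line.lower())


def run(lines):
    """Count words in `lines` and print (word, count) pairs."""
    # Imported here so extract_words can be used and tested without Beam.
    import apache_beam as beam

    # Leaving the `with` block runs the pipeline and waits for it to finish.
    with beam.Pipeline() as pipeline:
        (
            pipeline
            | "Input" >> beam.Create(lines)            # input -> PCollection
            | "Split" >> beam.FlatMap(extract_words)   # Transform -> PCollection
            | "Pair" >> beam.Map(lambda word: (word, 1))
            | "Count" >> beam.CombinePerKey(sum)
            | "Output" >> beam.Map(print)              # output
        )
```

Calling `run(["the cat sat", "the dog sat"])` prints one pair per distinct word; the order of the printed pairs is not fixed.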

PCollection

The data inside a pipeline; a PCollection can be bounded or unbounded.

unbounded

Unbounded data is usually divided up with windows.

window

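  • fixed window: windows of a fixed duration, e.g. one every 5 minutes
  • sliding window: overlapping windows, e.g. a new 5-minute window starting every 10 s
  • session window: windows separated by gaps of inactivity, tracked per key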
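
The window types above can be modelled in plain Python to see which windows a timestamp lands in. This is a sketch of the assignment arithmetic only, not Beam's implementation; it assumes integer timestamps in seconds, a zero offset, and a single key for the session case. In the Python SDK the corresponding window functions are `FixedWindows`, `SlidingWindows` and `Sessions` from `apache_beam.transforms.window`.

```python
def fixed_window(timestamp, size):
    """Return the [start, end) fixed window containing timestamp."""
    start = timestamp - timestamp % size
    return (start, start + size)


def sliding_windows(timestamp, size, period):
    """Return every [start, end) window of length `size`, started every
    `period` seconds, that contains timestamp."""
    last_start = timestamp - timestamp % period
    return [
        (start, start + size)
        for start in range(last_start, timestamp - size, -period)
    ]


def session_windows(timestamps, gap):
    """Merge the timestamps of one key into sessions: an event joins the
    current session if it arrives less than `gap` after the previous event."""
    windows = []
    for ts in sorted(timestamps):
        if windows and ts < windows[-1][1]:
            windows[-1] = (windows[-1][0], ts + gap)  # extend the session
        else:
            windows.append((ts, ts + gap))            # start a new session
    return windows
```

With the sliding example from the list (`size=300`, `period=10`), every element belongs to 30 overlapping windows.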

trigger

Conditions that decide when a window's results are emitted.
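
As a toy illustration of one such condition, the sketch below fires a pane each time `n` new elements have arrived in a window. It is a plain-Python model, not Beam code: the function name and the `accumulating` flag are invented here, real runners may fire with more than `n` elements per pane, and whatever is left over when the window closes is not modelled. The Python SDK's own triggers live in `apache_beam.transforms.trigger`.

```python
def panes_after_count(elements, n, accumulating=False):
    """Return the panes emitted by a repeated 'after n elements' trigger.

    accumulating=False: each pane holds only the elements since the last firing.
    accumulating=True:  each pane holds everything seen so far in the window.
    """
    panes = []
    buffered = []
    since_last_fire = 0
    for element in elements:
        buffered.append(element)
        since_last_fire += 1
        if since_last_fire == n:
            panes.append(list(buffered))  # the trigger fires
            since_last_fire = 0
            if not accumulating:
                buffered.clear()
    return panes
```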

Transform

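  • ParDo: similar to map
    • Custom ParDo logic can be written by subclassing beam.DoFn
  • GroupByKey: collects the different values that share the same key
    • ("cat", 1), ("cat", 2), ("dog", 3) => ("cat", [1, 2]), ("dog", [3])
  • CoGroupByKey: collects the different attributes that share the same key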
    • emails_list = [
          ('amy', 'amy@example.com'),
          ('carl', 'carl@example.com'),
          ('julia', 'julia@example.com'),
          ('carl', 'carl@email.com'),
      ]
      phones_list = [
          ('amy', '111-222-3333'),
          ('james', '222-333-4444'),
          ('amy', '333-444-5555'),
          ('carl', '444-555-6666'),
      ]
      
      # Result of applying CoGroupByKey to the two lists:
      [
          ('amy', {
              'emails': ['amy@example.com'],
              'phones': ['111-222-3333', '333-444-5555']}),
          ('carl', {
              'emails': ['carl@email.com', 'carl@example.com'],
              'phones': ['444-555-6666']}),
          ('james', {
              'emails': [],
              'phones': ['222-333-4444']}),
          ('julia', {
              'emails': ['julia@example.com'],
              'phones': []}),
      ]
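  • Combine: similar to reduce; aggregates a collection into one value, e.g. a running sum
  • Flatten: merges several collections of the same type into a single collection
  • Partition: similar to split; divides a collection into several collections according to a function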
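
What each transform in the list computes can be written down in plain Python. The functions below are a model of the semantics only, with names chosen for this sketch; they are not the Beam API, and a real runner does not promise any particular ordering of values.

```python
from collections import defaultdict
from functools import reduce


def par_do(fn, elements):
    """ParDo: apply fn to every element; fn returns zero or more outputs."""
    return [out for element in elements for out in fn(element)]


def group_by_key(pairs):
    """GroupByKey: collect all values that share a key."""
    grouped = defaultdict(list)
    for key, value in pairs:
        grouped[key].append(value)
    return dict(grouped)


def co_group_by_key(**tagged_pairs):
    """CoGroupByKey: join several keyed collections, one value list per tag."""
    keys = {key for pairs in tagged_pairs.values() for key, _ in pairs}
    result = {key: {tag: [] for tag in tagged_pairs} for key in keys}
    for tag, pairs in tagged_pairs.items():
        for key, value in pairs:
            result[key][tag].append(value)
    return result


def combine(fn, elements):
    """Combine: reduce a collection to one value with a binary function."""
    return reduce(fn, elements)


def flatten(*collections):
    """Flatten: merge several collections of the same type into one."""
    return [element for collection in collections for element in collection]


def partition(fn, elements, n):
    """Partition: split into n collections; fn(element, n) picks the index."""
    parts = [[] for _ in range(n)]
    for element in elements:
        parts[fn(element, n)].append(element)
    return parts
```

For example, `co_group_by_key(emails=emails_list, phones=phones_list)` on the two lists above gives, for 'james', an empty 'emails' list and one phone number, as in the result shown earlier.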

metric

Beam provides metrics for inspecting the state of a running computation.
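
A sketch of how a counter might be used for this with the Python SDK, assuming `apache_beam` is installed and the default local runner. The namespace `beam_demo`, the metric name `malformed_lines` and the helper names are invented for the example, and whether committed values are reported depends on the runner.

```python
def parse_reading(line):
    """Return the line as a float, or None if it is not a number."""
    try:
        return float(line)
    except ValueError:
        return None


def run(lines):
    """Parse `lines` in a pipeline and return the counter values by name."""
    # Imported here so parse_reading can be used and tested without Beam.
    import apache_beam as beam
    from apache_beam.metrics import Metrics, MetricsFilter

    malformed = Metrics.counter("beam_demo", "malformed_lines")

    def parse_and_count(line):
        value = parse_reading(line)
        if value is None:
            malformed.inc()  # count the bad line and drop it
            return []
        return [value]

    pipeline = beam.Pipeline()
    _ = pipeline | beam.Create(lines) | beam.FlatMap(parse_and_count)
    result = pipeline.run()
    result.wait_until_finish()

    # Query the finished job for the counter defined above.
    query = result.metrics().query(MetricsFilter().with_name("malformed_lines"))
    return {c.key.metric.name: c.committed for c in query["counters"]}
```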
