Flink---CEP (Complex Event Processing) #88

Open
AronChung opened this issue Jun 24, 2020 · 0 comments

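What kinds of problems does Flink CEP solve?
For example:

  • Detecting fraudulent transactions among a large volume of order transactions
  • Finding users in website access logs who try to brute-force logins with scripts or tools
  • Spotting parcels in a delivery network that have been held up for a long time without being signed for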

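Program structure

A CEP program has two main steps:

  1. Define the pattern
  2. Match the pattern and handle the results

Example from the official documentation: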

DataStream<Event> input = ...
Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(
        new SimpleCondition<Event>() {
            @Override
            public boolean filter(Event event) {
                return event.getId() == 42;
            }
        }
    ).next("middle").subtype(SubEvent.class).where(
        new SimpleCondition<SubEvent>() {
            @Override
            public boolean filter(SubEvent subEvent) {
                return subEvent.getVolume() >= 10.0;
            }
        }
    ).followedBy("end").where(
         new SimpleCondition<Event>() {
            @Override
            public boolean filter(Event event) {
                return event.getName().equals("end");
            }
         }
    );
PatternStream<Event> patternStream = CEP.pattern(input, pattern);
DataStream<Alert> result = patternStream.process(
    new PatternProcessFunction<Event, Alert>() {
        @Override
        public void processMatch(
                Map<String, List<Event>> pattern,
                Context ctx,
                Collector<Alert> out) throws Exception {
            out.collect(createAlertFrom(pattern));
        }
    });
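  • Step 1: define a Pattern. The pattern here first matches an event whose id is 42, requires the very next event (next, strict contiguity) to be a SubEvent with a volume of at least 10.0, and then looks for a later event (followedBy, other events may occur in between) whose name equals "end".
  • Step 2: match the pattern and raise an alert. The pattern is applied to the input stream, and each time a sequence of events matches it, an Alert is emitted.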
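
To make the difference between next (strict contiguity) and followedBy (relaxed contiguity) concrete, here is a minimal plain-Java sketch that does not use the Flink API. The class ContiguityDemo and its methods are hypothetical helpers made up for illustration. They only mimic the two contiguity rules on a list of strings and ignore everything else the CEP library handles, such as state, time, and keying.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Predicate;

// Hypothetical helper, not part of Flink: mimics two contiguity rules on a plain list.
class ContiguityDemo {

    // Strict contiguity (like next): b must be the element directly after a.
    static List<String> strict(List<String> events, Predicate<String> a, Predicate<String> b) {
        List<String> matches = new ArrayList<>();
        for (int i = 0; i + 1 < events.size(); i++) {
            if (a.test(events.get(i)) && b.test(events.get(i + 1))) {
                matches.add(events.get(i) + "->" + events.get(i + 1));
            }
        }
        return matches;
    }

    // Relaxed contiguity (like followedBy): elements between a and b that do not
    // satisfy b are skipped, and each a is paired with the first later b.
    static List<String> relaxed(List<String> events, Predicate<String> a, Predicate<String> b) {
        List<String> matches = new ArrayList<>();
        for (int i = 0; i < events.size(); i++) {
            if (!a.test(events.get(i))) {
                continue;
            }
            for (int j = i + 1; j < events.size(); j++) {
                if (b.test(events.get(j))) {
                    matches.add(events.get(i) + "->" + events.get(j));
                    break;
                }
            }
        }
        return matches;
    }

    public static void main(String[] args) {
        List<String> events = Arrays.asList("a1", "x", "b1", "a2", "b2");
        Predicate<String> isA = e -> e.startsWith("a");
        Predicate<String> isB = e -> e.startsWith("b");
        System.out.println("strict:  " + strict(events, isA, isB));   // [a2->b2]
        System.out.println("relaxed: " + relaxed(events, isA, isB));  // [a1->b1, a2->b2]
    }
}
```

With the sample input, the unrelated event "x" between "a1" and "b1" breaks the strict match but not the relaxed one, which is the same distinction the pattern above draws between its "middle" and "end" stages.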

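Hands-on example

We simulate user search data from an e-commerce site as the input source, find the users who search for the same product repeatedly, and emit an alert message for each of them.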

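import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.cep.CEP;
import org.apache.flink.cep.PatternSelectFunction;
import org.apache.flink.cep.PatternStream;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.SimpleCondition;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.util.List;
import java.util.Map;

// The class name is illustrative; the original post shows only the main method.
public class CepSearchAlertJob {
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        // Search records: (user, product, sequence number of that search)
        DataStreamSource<Tuple3<String, String, Long>> source = env.fromElements(
                Tuple3.of("Marry", "coat", 1L),
                Tuple3.of("Marry", "hat", 1L),
                Tuple3.of("Marry", "hat", 2L),
                Tuple3.of("Marry", "hat", 3L),
                Tuple3.of("Ming", "clothes", 1L),
                Tuple3.of("Marry", "shoes", 1L),
                Tuple3.of("Marry", "shoes", 2L),
                Tuple3.of("LiLei", "hat", 1L),
                Tuple3.of("LiLei", "hat", 2L),
                Tuple3.of("LiLei", "hat", 3L)
        );
        // Define the pattern: a hat search directly followed by another hat search
        Pattern<Tuple3<String, String, Long>, Tuple3<String, String, Long>> pattern = Pattern
                .<Tuple3<String, String, Long>>begin("start")
                .where(new SimpleCondition<Tuple3<String, String, Long>>() {
                    @Override
                    public boolean filter(Tuple3<String, String, Long> value) throws Exception {
                        return value.f1.equals("hat");
                    }
                }) // alternative to the "middle" stage below: .timesOrMore(3);
                .next("middle")
                .where(new SimpleCondition<Tuple3<String, String, Long>>() {
                    @Override
                    public boolean filter(Tuple3<String, String, Long> value) throws Exception {
                        return value.f1.equals("hat");
                    }
                });

        // Key by user name so that the pattern is matched per user.
        // A key selector replaces the deprecated positional keyBy(0).
        KeyedStream<Tuple3<String, String, Long>, String> keyedStream = source.keyBy(value -> value.f0);
        // Note: from Flink 1.12 on, CEP uses event time by default. These records carry no
        // timestamps, so if no alerts appear, call patternStream.inProcessingTime() or
        // assign timestamps and watermarks to the source first.
        PatternStream<Tuple3<String, String, Long>> patternStream = CEP.pattern(keyedStream, pattern);
        SingleOutputStreamOperator<String> matchStream = patternStream.select(
                new PatternSelectFunction<Tuple3<String, String, Long>, String>() {
                    @Override
                    public String select(Map<String, List<Tuple3<String, String, Long>>> pattern) throws Exception {
                        // Report the second search of the matched pair
                        List<Tuple3<String, String, Long>> middle = pattern.get("middle");
                        return middle.get(0).f0 + ":" + middle.get(0).f2 + ":" + "searched for hats twice in a row!";
                    }
                });
        matchStream.printToErr();
        env.execute("execute cep");
    }
}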
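
As a rough cross-check on what the job above is meant to report, the following plain-Java sketch (no Flink dependency) replays the same sample records, groups them by user, and lists every pair of directly consecutive hat searches. The class ConsecutiveSearchCheck, the Search holder, the English product names, and the "user:sequence" result format are all assumptions made for illustration. The sketch imitates keying by user plus a start/next pattern and says nothing about how Flink performs the matching internally.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper, not part of Flink: per-user check for consecutive searches.
class ConsecutiveSearchCheck {

    // Minimal stand-in for the Tuple3<user, product, sequence> records.
    static final class Search {
        final String user;
        final String product;
        final long seq;

        Search(String user, String product, long seq) {
            this.user = user;
            this.product = product;
            this.seq = seq;
        }
    }

    // The sample records from the example, with product names in English.
    static List<Search> sample() {
        return Arrays.asList(
                new Search("Marry", "coat", 1L), new Search("Marry", "hat", 1L),
                new Search("Marry", "hat", 2L), new Search("Marry", "hat", 3L),
                new Search("Ming", "clothes", 1L), new Search("Marry", "shoes", 1L),
                new Search("Marry", "shoes", 2L), new Search("LiLei", "hat", 1L),
                new Search("LiLei", "hat", 2L), new Search("LiLei", "hat", 3L));
    }

    // Returns "user:seq" for the second search of every directly consecutive pair.
    static List<String> alerts(List<Search> searches, String product) {
        // Group per user while preserving arrival order (a stand-in for keyBy).
        Map<String, List<Search>> byUser = new LinkedHashMap<>();
        for (Search s : searches) {
            byUser.computeIfAbsent(s.user, k -> new ArrayList<>()).add(s);
        }
        List<String> result = new ArrayList<>();
        for (List<Search> perUser : byUser.values()) {
            for (int i = 0; i + 1 < perUser.size(); i++) {
                Search first = perUser.get(i);
                Search second = perUser.get(i + 1);
                if (first.product.equals(product) && second.product.equals(product)) {
                    result.add(second.user + ":" + second.seq);
                }
            }
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(alerts(sample(), "hat")); // [Marry:2, Marry:3, LiLei:2, LiLei:3]
    }
}
```

Three hat searches in a row produce two overlapping pairs per user, which is why Marry and LiLei each appear twice while Ming, who never searched for hats, does not appear at all.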
@AronChung AronChung added the Flink Flink学习 label Jun 24, 2020
@AronChung AronChung added this to Flink in My Blog Jan 18, 2021