Flink: Splitting Streams #87

Open
AronChung opened this issue Jun 24, 2020 · 0 comments
Labels
Flink, Flink学习 (Flink study)

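Stream splitting

Splitting with filter

Drawback of splitting with filter:
every output stream needs its own filter, so the original stream is traversed once for each stream we want to extract.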

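import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.util.ArrayList;
import java.util.List;

public class FilterSplitExample {

    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Build the data source
        List<Tuple3<Integer, Integer, Integer>> data = new ArrayList<>();
        data.add(new Tuple3<>(0, 1, 0));
        data.add(new Tuple3<>(0, 1, 1));
        data.add(new Tuple3<>(0, 2, 2));
        data.add(new Tuple3<>(0, 1, 3));
        data.add(new Tuple3<>(1, 2, 5));
        data.add(new Tuple3<>(1, 2, 9));
        data.add(new Tuple3<>(1, 2, 11));
        data.add(new Tuple3<>(1, 2, 13));

        DataStreamSource<Tuple3<Integer, Integer, Integer>> items = env.fromCollection(data);

        // Each filter sees every record of items and keeps only the ones it matches
        SingleOutputStreamOperator<Tuple3<Integer, Integer, Integer>> zeroStream = items.filter((FilterFunction<Tuple3<Integer, Integer, Integer>>) value -> value.f0 == 0);
        SingleOutputStreamOperator<Tuple3<Integer, Integer, Integer>> oneStream = items.filter((FilterFunction<Tuple3<Integer, Integer, Integer>>) value -> value.f0 == 1);

        // Print the results: zeroStream to stdout, oneStream to stderr
        zeroStream.print();
        oneStream.printToErr();

        // Submit the job
        String jobName = "user defined streaming source";
        env.execute(jobName);
    }
}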
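
To make the drawback concrete, here is a minimal plain-Java sketch (no Flink classes) that counts how often a record is inspected when each output has its own filter, compared with routing every record in a single pass. The class and method names are hypothetical and only model the idea; they say nothing about how Flink schedules the operators.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Predicate;

// Plain-Java model of the cost of filter-based splitting; not Flink code.
class FilterPassCount {

    // Same eight records as the example above, stored as {f0, f1, f2}.
    static List<int[]> sampleRecords() {
        return Arrays.asList(
                new int[]{0, 1, 0}, new int[]{0, 1, 1}, new int[]{0, 2, 2}, new int[]{0, 1, 3},
                new int[]{1, 2, 5}, new int[]{1, 2, 9}, new int[]{1, 2, 11}, new int[]{1, 2, 13});
    }

    // One full traversal per predicate, the way each filter sees every record.
    static int evaluationsWithFilters(List<int[]> records, List<Predicate<int[]>> predicates) {
        int evaluations = 0;
        for (Predicate<int[]> predicate : predicates) {
            for (int[] record : records) {
                evaluations++;
                predicate.test(record);
            }
        }
        return evaluations;
    }

    // Single traversal: each record is inspected once and routed to one bucket.
    static int evaluationsWithSinglePass(List<int[]> records, List<int[]> zeros, List<int[]> ones) {
        int evaluations = 0;
        for (int[] record : records) {
            evaluations++;
            if (record[0] == 0) {
                zeros.add(record);
            } else if (record[0] == 1) {
                ones.add(record);
            }
        }
        return evaluations;
    }

    public static void main(String[] args) {
        List<int[]> records = sampleRecords();
        Predicate<int[]> isZero = r -> r[0] == 0;
        Predicate<int[]> isOne = r -> r[0] == 1;
        // 2 predicates x 8 records = 16 inspections
        System.out.println(evaluationsWithFilters(records, Arrays.asList(isZero, isOne)));
        // 8 inspections for the same two outputs
        System.out.println(evaluationsWithSinglePass(records, new ArrayList<>(), new ArrayList<>()));
    }
}
```

With N output streams the filter approach inspects every record N times, which is the cost the two approaches below avoid.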

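Splitting with split

Define an OutputSelector in the split operator and override its select method to tag each record, then call select on the returned SplitStream to pick out the records that carry a given tag.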

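import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.collector.selector.OutputSelector;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SplitStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.util.ArrayList;
import java.util.List;

public class SplitExample {

    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Build the data source
        List<Tuple3<Integer, Integer, Integer>> data = new ArrayList<>();
        data.add(new Tuple3<>(0, 1, 0));
        data.add(new Tuple3<>(0, 1, 1));
        data.add(new Tuple3<>(0, 2, 2));
        data.add(new Tuple3<>(0, 1, 3));
        data.add(new Tuple3<>(1, 2, 5));
        data.add(new Tuple3<>(1, 2, 9));
        data.add(new Tuple3<>(1, 2, 11));
        data.add(new Tuple3<>(1, 2, 13));

        DataStreamSource<Tuple3<Integer, Integer, Integer>> items = env.fromCollection(data);

        // split is deprecated in favor of side outputs
        SplitStream<Tuple3<Integer, Integer, Integer>> splitStream = items.split(new OutputSelector<Tuple3<Integer, Integer, Integer>>() {
            @Override
            public Iterable<String> select(Tuple3<Integer, Integer, Integer> value) {
                // Tag each record with the name of the output it belongs to
                List<String> tags = new ArrayList<>();
                if (value.f0 == 0) {
                    tags.add("zeroStream");
                } else if (value.f0 == 1) {
                    tags.add("oneStream");
                }
                return tags;
            }
        });

        // Select records by tag and print them: zeroStream to stdout, oneStream to stderr
        splitStream.select("zeroStream").print();
        splitStream.select("oneStream").printToErr();

        // Submit the job
        String jobName = "user defined streaming source";
        env.execute(jobName);
    }
}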
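
The tag-then-select idea can also be modelled without Flink. The sketch below is plain Java: TaggedSplit is a hypothetical stand-in, not Flink's SplitStream, and the extra "f1IsTwo" tag is an assumption added here only to show that a selector returning a collection of names can put one record under more than one tag.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

// Hypothetical plain-Java model of tagging records and selecting by tag.
class TaggedSplit<T> {
    private final List<T> records;
    private final Function<T, List<String>> selector;

    TaggedSplit(List<T> records, Function<T, List<String>> selector) {
        this.records = records;
        this.selector = selector;
    }

    // Returns the records whose tag list contains the requested name.
    List<T> select(String name) {
        List<T> selected = new ArrayList<>();
        for (T record : records) {
            if (selector.apply(record).contains(name)) {
                selected.add(record);
            }
        }
        return selected;
    }
}

class TaggedSplitDemo {

    // Same eight records as the example above, stored as {f0, f1, f2}.
    static TaggedSplit<int[]> sample() {
        List<int[]> records = Arrays.asList(
                new int[]{0, 1, 0}, new int[]{0, 1, 1}, new int[]{0, 2, 2}, new int[]{0, 1, 3},
                new int[]{1, 2, 5}, new int[]{1, 2, 9}, new int[]{1, 2, 11}, new int[]{1, 2, 13});
        return new TaggedSplit<>(records, r -> {
            List<String> tags = new ArrayList<>();
            if (r[0] == 0) {
                tags.add("zeroStream");
            } else if (r[0] == 1) {
                tags.add("oneStream");
            }
            if (r[1] == 2) {
                tags.add("f1IsTwo"); // illustrative second tag on the same record
            }
            return tags;
        });
    }

    public static void main(String[] args) {
        TaggedSplit<int[]> split = sample();
        System.out.println(split.select("zeroStream").size()); // 4
        System.out.println(split.select("oneStream").size());  // 4
        System.out.println(split.select("f1IsTwo").size());    // 5
    }
}
```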

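Unlike filter, a stream produced by the split operator cannot be split a second time. For example, applying split again to the zeroStream or oneStream above makes the console report the following exception: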

Exception in thread "main" java.lang.IllegalStateException: Consecutive multiple splits are not supported. Splits are deprecated. Please use side-outputs.

Reason: the split method is deprecated, and the recommendation is to split streams with the newer side outputs instead.

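Splitting with side outputs

Side outputs are the newest and most recommended way Flink provides to split a stream. The steps are:

  1. Define an OutputTag
  2. Call one of the following functions to split the data
  • ProcessFunction
  • KeyedProcessFunction
  • CoProcessFunction
  • KeyedCoProcessFunction
  • ProcessWindowFunction
  • ProcessAllWindowFunction

For example: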

ProcessFunction
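import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

import java.util.ArrayList;
import java.util.List;

public class SideOutputExample {

    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Build the data source
        List<Tuple3<Integer, Integer, Integer>> data = new ArrayList<>();
        data.add(new Tuple3<>(0, 1, 0));
        data.add(new Tuple3<>(0, 1, 1));
        data.add(new Tuple3<>(0, 2, 2));
        data.add(new Tuple3<>(0, 1, 3));
        data.add(new Tuple3<>(1, 2, 5));
        data.add(new Tuple3<>(1, 2, 9));
        data.add(new Tuple3<>(1, 2, 11));
        data.add(new Tuple3<>(1, 2, 13));

        DataStreamSource<Tuple3<Integer, Integer, Integer>> items = env.fromCollection(data);

        // Step 1: define one OutputTag per side output
        OutputTag<Tuple3<Integer, Integer, Integer>> zeroStream = new OutputTag<Tuple3<Integer, Integer, Integer>>("zeroStream") {};
        OutputTag<Tuple3<Integer, Integer, Integer>> oneStream = new OutputTag<Tuple3<Integer, Integer, Integer>>("oneStream") {};

        // Step 2: route each record to a side output inside a ProcessFunction
        SingleOutputStreamOperator<Tuple3<Integer, Integer, Integer>> processStream = items.process(new ProcessFunction<Tuple3<Integer, Integer, Integer>, Tuple3<Integer, Integer, Integer>>() {
            @Override
            public void processElement(Tuple3<Integer, Integer, Integer> value, Context ctx, Collector<Tuple3<Integer, Integer, Integer>> out) throws Exception {
                // Nothing is emitted to the main output (out); records go to side outputs only
                if (value.f0 == 0) {
                    ctx.output(zeroStream, value);
                } else if (value.f0 == 1) {
                    ctx.output(oneStream, value);
                }
            }
        });

        DataStream<Tuple3<Integer, Integer, Integer>> zeroSideOutput = processStream.getSideOutput(zeroStream);
        DataStream<Tuple3<Integer, Integer, Integer>> oneSideOutput = processStream.getSideOutput(oneStream);

        // Print the results: zeroStream to stdout, oneStream to stderr
        zeroSideOutput.print();
        oneSideOutput.printToErr();

        // Submit the job
        String jobName = "user defined streaming source";
        env.execute(jobName);
    }
}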
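
Both OutputTag declarations in the example above end with `{}`. That makes each tag an anonymous subclass, so the element type survives erasure and Flink can derive type information from it. A minimal plain-Java sketch of that trick follows; TypedTag is a hypothetical class written for illustration, not Flink's OutputTag, and Flink's own implementation may differ in detail.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.List;

// Hypothetical stand-in for a typed tag; it must be subclassed, usually anonymously.
abstract class TypedTag<T> {
    private final String id;
    private final Type elementType;

    protected TypedTag(String id) {
        this.id = id;
        // The direct (anonymous) subclass records its generic superclass,
        // for example TypedTag<List<Integer>>, which reflection can read back.
        ParameterizedType superType = (ParameterizedType) getClass().getGenericSuperclass();
        this.elementType = superType.getActualTypeArguments()[0];
    }

    String id() {
        return id;
    }

    Type elementType() {
        return elementType;
    }
}

class TypedTagDemo {
    public static void main(String[] args) {
        // The trailing {} creates the anonymous subclass that carries the type argument.
        TypedTag<List<Integer>> tag = new TypedTag<List<Integer>>("zeroStream") {};
        System.out.println(tag.id() + " carries " + tag.elementType().getTypeName());
    }
}
```

Without the `{}`, the object would be a plain instance of the generic class and the type argument would not be recoverable at runtime.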

Note: a stream split with side outputs can be split again, as many times as needed, without raising an exception.
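
As a rough illustration of splitting in two levels, the plain-Java sketch below routes the sample records by f0 and then routes the resulting zero bucket again by f1. The route helper and the bucket names "f1IsOne" and "f1IsOther" are hypothetical. In Flink the second level would presumably be another process call on zeroSideOutput with its own OutputTags.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Plain-Java model of splitting a stream and then splitting one output again.
class TwoLevelSplit {

    // Same eight records as the example above, stored as {f0, f1, f2}.
    static List<int[]> sampleRecords() {
        return Arrays.asList(
                new int[]{0, 1, 0}, new int[]{0, 1, 1}, new int[]{0, 2, 2}, new int[]{0, 1, 3},
                new int[]{1, 2, 5}, new int[]{1, 2, 9}, new int[]{1, 2, 11}, new int[]{1, 2, 13});
    }

    // Hypothetical helper: sends each record to the bucket named by the router, in one pass.
    static Map<String, List<int[]>> route(List<int[]> records, Function<int[], String> router) {
        Map<String, List<int[]>> outputs = new LinkedHashMap<>();
        for (int[] record : records) {
            outputs.computeIfAbsent(router.apply(record), name -> new ArrayList<>()).add(record);
        }
        return outputs;
    }

    public static void main(String[] args) {
        // First level: split by f0
        Map<String, List<int[]>> first =
                route(sampleRecords(), r -> r[0] == 0 ? "zeroStream" : "oneStream");
        // Second level: split the zero bucket again, this time by f1
        Map<String, List<int[]>> second =
                route(first.get("zeroStream"), r -> r[1] == 1 ? "f1IsOne" : "f1IsOther");

        System.out.println(first.get("zeroStream").size()); // 4
        System.out.println(second.get("f1IsOne").size());   // 3
        System.out.println(second.get("f1IsOther").size()); // 1
    }
}
```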

@AronChung AronChung added the Flink Flink学习 label Jun 24, 2020