Skip to content

Conversation

@godfreyhe
Copy link
Contributor

No description provided.

@fhueske
Copy link
Contributor

fhueske commented Nov 29, 2016

Thanks for fixing this bug @godfreyhe!
+1 to merge

@fhueske
Copy link
Contributor

fhueske commented Nov 29, 2016

Merging

@asfgit asfgit closed this in 0bb6847 Nov 29, 2016
liuyuzhong pushed a commit to liuyuzhong/flink that referenced this pull request Dec 2, 2016
liuyuzhong pushed a commit to liuyuzhong/flink that referenced this pull request Dec 5, 2016
static-max pushed a commit to static-max/flink that referenced this pull request Dec 13, 2016
joseprupi pushed a commit to joseprupi/flink that referenced this pull request Feb 12, 2017
hequn8128 pushed a commit to hequn8128/flink that referenced this pull request Jun 22, 2017
@lccbiluox2
Copy link

我有一个程序

public class ControlEvent extends Row {
    private String id;
    private String name;

    /**
     * Create a new Row instance.
     *
     * @param arity The number of fields in the Row
     */
    public ControlEvent(int arity) {
        super(arity);
    }
}

大概是这样的

public class FilterLabel extends CoProcessFunction<Row, ControlEvent,Row> {

    @Override
    public void processElement1(Row value, Context ctx, Collector<Row> out) throws Exception {
        Row newRow = useNewRule(value);
        System.out.println("测试两个out是否为一个对象1:"+out.toString());
        out.collect(newRow);
    }

    /**
     * 使用新的规则
     * @return
     */
    private Row useNewRule(Row value) {
        // 查询数据库
        // 加载新的规则,生产新的row
        return value;
    }

    @Override
    public void processElement2(ControlEvent value, Context ctx, Collector<Row> out) throws Exception {
        reloadRule();
        System.out.println("测试两个out是否为一个对象2:"+out.toString());
        // 控制流加载配置后往后面传递
        out.collect(value);
    }

    /**
     * 重新加载规则,并且预编译
     * @throws InterruptedException
     */
    private void reloadRule() throws InterruptedException {
//        System.out.println("进行重新加载配置操作-开始");
//        Thread.sleep(1000L);
//        System.out.println("进行重新加载配置操作-结束");
    }
}

image

这里原本设计A B两条线是断开的,不能往下传递,因此需要广播,广播到每个节点,这次想修改就是想打通AB两条线,到下面的算子并且由processElement2方法去操作,但是实际测试


processElement1(Row value, Context ctx, Collector<Row> out) 
processElement2(ControlEvent value, Context ctx, Collector<Row> out) 

这两个的out是一个对象,因此会发送到一个流中,而且CoProcessFunction定义就是两个输入,一个输出,输出的类型必须一致


public abstract class CoProcessFunction<IN1, IN2, OUT> 

所以上述如果直接输出是到了一个流中了如图CD。想有个算法是这样的


public abstract class xxxProcessFunction<IN1, IN2, OUT1, OUT2> 

但是发现没有

@lccbiluox2
Copy link

然后还报错这个

java.lang.RuntimeException: Row arity of from does not match serializers.
  at org.apache.flink.api.java.typeutils.runtime.RowSerializer.copy(RowSerializer.java:86)
  at org.apache.flink.api.java.typeutils.runtime.RowSerializer.copy(RowSerializer.java:44)
  at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:635)
  at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:612)
  at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:592)
  at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:727)
  at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:705)
  at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
  at com.dbapp.baas.engine.common.etl.filterlabel.FilterLabel.processElement2(FilterLabel.java:241)
  at com.dbapp.baas.engine.common.etl.filterlabel.FilterLabel.processElement2(FilterLabel.java:29)
  at org.apache.flink.streaming.api.operators.co.CoProcessOperator.processElement2(CoProcessOperator.java:77)
  at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor.processInput(StreamTwoInputProcessor.java:286)
  at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:279)
  at org.apache.flink.streaming.runtime.tasks.StreamTask.run(StreamTask.java:301)
  at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:406)
  at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
  at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
  at java.lang.Thread.run(Thread.java:745)

找不到原因呀

参考:https://blog.csdn.net/qq_21383435/article/details/105647242

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

5 participants