
translateHashJoin should allow Joiner subclasses #46

Closed

themodernlife opened this issue Jan 6, 2016 · 4 comments

@themodernlife

When running Scalding jobs on Flink (more info here: http://themodernlife.github.io/scala/hadoop/hdfs/sclading/flink/streaming/realtime/2015/12/20/running-scalding-jobs-on-apache-flink/) I noticed that the only join type that works is

pipe.joinWithSmaller(...)

Unfortunately a lot of jobs are going to use something like

pipe.joinWithTiny(...)

Currently this fails on Flink with:

Caused by: cascading.flow.planner.PlannerException: [net.themodernlife.Word...] could not build flow from assembly: [HashJoin does only support InnerJoin and LeftJoin.]
    at cascading.flow.planner.FlowPlanner.handleExceptionDuringPlanning(FlowPlanner.java:749)
    at cascading.flow.planner.FlowPlanner.buildFlow(FlowPlanner.java:209)
    at cascading.flow.FlowConnector.connect(FlowConnector.java:456)
    at com.dataartisans.flink.cascading.FlinkConnector.connect(FlinkConnector.java:86)
    at com.twitter.scalding.ExecutionContext$class.buildFlow(ExecutionContext.scala:74)
    at com.twitter.scalding.ExecutionContext$$anon$1.buildFlow(ExecutionContext.scala:165)
    at com.twitter.scalding.Job$$anonfun$buildFlow$1.apply(Job.scala:225)
    at com.twitter.scalding.Job$$anonfun$buildFlow$1.apply(Job.scala:225)
    at scala.util.Success.flatMap(Try.scala:231)
    at com.twitter.scalding.Job.buildFlow(Job.scala:225)
    at com.twitter.scalding.Job.run(Job.scala:295)
    at com.twitter.scalding.Tool.start$1(Tool.scala:126)
    at com.twitter.scalding.Tool.run(Tool.scala:142)
    at com.twitter.scalding.Tool.run(Tool.scala:70)
    at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:70)
    at com.twitter.scalding.Tool$.main(Tool.scala:150)
    ... 12 more
Caused by: cascading.flow.FlowException: HashJoin does only support InnerJoin and LeftJoin.
    at com.dataartisans.flink.cascading.planner.FlinkFlowStep.translateHashJoin(FlinkFlowStep.java:882)
    at com.dataartisans.flink.cascading.planner.FlinkFlowStep.buildFlinkProgram(FlinkFlowStep.java:309)
    at com.dataartisans.flink.cascading.planner.FlinkFlowStep.createFlowStepJob(FlinkFlowStep.java:126)
    at com.dataartisans.flink.cascading.planner.FlinkFlowStep.createFlowStepJob(FlinkFlowStep.java:96)
    at cascading.flow.planner.BaseFlowStep.getCreateFlowStepJob(BaseFlowStep.java:913)
    at cascading.flow.BaseFlow.initializeNewJobsMap(BaseFlow.java:1318)
    at cascading.flow.BaseFlow.initialize(BaseFlow.java:244)
    at cascading.flow.planner.FlowPlanner.buildFlow(FlowPlanner.java:203)
    ... 26 more

The reason is that in the Scalding code for joinWithTiny you have this:

def joinWithTiny(fs: (Fields, Fields), that: Pipe) = {
    val intersection = asSet(fs._1).intersect(asSet(fs._2))
    if (intersection.isEmpty) {
      new HashJoin(assignName(pipe), fs._1, assignName(that), fs._2, WrappedJoiner(new InnerJoin))
    } else {
      val (renamedThat, newJoinFields, temp) = renameCollidingFields(that, fs._2, intersection)
      (new HashJoin(assignName(pipe), fs._1, assignName(renamedThat), newJoinFields, WrappedJoiner(new InnerJoin)))
        .discard(temp)
    }
  }

Notice how the Joiner has been wrapped in a WrappedJoiner? In Cascading-Flink you have this:

    private DataSet<Tuple> translateHashJoin(List<DataSet<Tuple>> inputs, FlowNode node) {

        HashJoin hashJoin = (HashJoin) getCommonSuccessor(node.getSourceElements(), node);
        Joiner joiner = hashJoin.getJoiner();

        int numJoinInputs = hashJoin.isSelfJoin() ? hashJoin.getNumSelfJoins() + 1 : inputs.size();

        Fields[] inputFields = new Fields[numJoinInputs];
        Fields[] keyFields = new Fields[numJoinInputs];
        String[][] flinkKeys = new String[numJoinInputs][];

        List<DataSet<Tuple>> joinInputs = computeSpliceInputsFieldsKeys(hashJoin, node, inputs, inputFields, keyFields, flinkKeys);

        if(keyFields[0].isNone()) {
            // Cartesian product
            return translateInnerCrossProduct(node, joinInputs);
        }
        else if(joiner.getClass().equals(InnerJoin.class)) {
            // inner join with keys
            return translateInnerHashJoin(node, joinInputs, inputFields, keyFields, flinkKeys);
        }
        else if (joiner.getClass().equals(LeftJoin.class)) {
            return translateLeftHashJoin(node, joinInputs, inputFields, keyFields, flinkKeys);
        }
        else {
            throw new FlowException("HashJoin does only support InnerJoin and LeftJoin.");
        }
    }

I thought it would be as simple as replacing joiner.getClass().equals(InnerJoin.class) with InnerJoin.class.isAssignableFrom(joiner.getClass()), but that's not the case, since WrappedJoiner extends Joiner rather than InnerJoin (or LeftJoin).
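For illustration, even a subclass-friendly instanceof check wouldn't help here (a sketch against the method above, not actual Cascading-Flink code):

// Sketch: instanceof accepts InnerJoin/LeftJoin and their subclasses,
// but Scalding's WrappedJoiner extends Joiner directly, so it still
// falls through to the FlowException branch.
if (joiner instanceof InnerJoin) {
    return translateInnerHashJoin(node, joinInputs, inputFields, keyFields, flinkKeys);
}
else if (joiner instanceof LeftJoin) {
    return translateLeftHashJoin(node, joinInputs, inputFields, keyFields, flinkKeys);
}
else {
    throw new FlowException("HashJoin does only support InnerJoin and LeftJoin.");
}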

Not sure if there is a simple solution, but I wanted to file the bug in case anyone has any ideas!

@fhueske
Contributor

fhueske commented Jan 7, 2016

Hi @themodernlife, thanks for reporting this issue. I read your blog post and I'm happy that you are trying to run Scalding on Flink. This is very exciting and good validation for Cascading-Flink.

Cascading on MR or Tez uses its own implementation for joins. This implementation is generic and capable of handling all supported join types (inner, left outer, right outer, full outer, and custom joins) by handing the result of a full outer join to the Joiner class. For example, an InnerJoin discards all results with an empty left or right side.
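As a toy illustration of that contract (plain Java, not Cascading's actual code): the engine produces null-padded, full outer pairings per key, and the Joiner's semantics decide which pairings survive.

import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

// Toy illustration (not Cascading code): enumerate the null-padded
// full outer pairings of two keyed inputs, then apply InnerJoin
// semantics by dropping pairings where either side is missing.
public class JoinerSemanticsSketch {
    public static void main(String[] args) {
        Map<String, String> left = Map.of("a", "L1", "b", "L2");
        Map<String, String> right = Map.of("b", "R1", "c", "R2");
        Set<String> keys = new TreeSet<>(left.keySet());
        keys.addAll(right.keySet());
        for (String key : keys) {
            String l = left.get(key);   // null when the left side is absent
            String r = right.get(key);  // null when the right side is absent
            if (l != null && r != null) { // InnerJoin: keep only complete pairs
                System.out.println(key + ": " + l + ", " + r);
            }
        }
    }
}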

Cascading-Flink leverages Flink's internal join implementations, which are only available for inner and left joins. Therefore, Cascading-Flink needs to check the Joiner type to make sure that the Joiner is called with all tuple pairs that will be in the join result. Hence the restriction to InnerJoin and LeftJoin.

I think this issue can be solved by adding a Scalding dependency to Cascading-Flink and checking the type of the wrapped joiner if a WrappedJoiner is found.

Please report any other problems you find when using Cascading-Flink, with Scalding or without.
Thanks again, Fabian

@themodernlife
Author

@fhueske can I suggest a different approach? Depending on Scalding (and its associated dependencies) might be a no-go for Cascading Java users.

What if instead you added a configuration parameter, something like flink.joiner.strict (true by default), which throws an exception if the join types don't match but skips the type check when false?

I think this way Scalding users could say "trust me, I know what I'm doing".
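Roughly something like this (a hypothetical sketch; the flink.joiner.strict key and this whole branch are my invention, and config is assumed to be the step's Flink Configuration):

// Hypothetical sketch of the proposed escape hatch. The key
// "flink.joiner.strict" does not exist in Cascading-Flink today.
boolean strictJoiners = config.getBoolean("flink.joiner.strict", true);
if (joiner instanceof InnerJoin) {
    return translateInnerHashJoin(node, joinInputs, inputFields, keyFields, flinkKeys);
}
else if (joiner instanceof LeftJoin) {
    return translateLeftHashJoin(node, joinInputs, inputFields, keyFields, flinkKeys);
}
else if (!strictJoiners) {
    // "Trust me, I know what I'm doing": assume inner join semantics
    // for unknown Joiner types instead of failing the plan.
    return translateInnerHashJoin(node, joinInputs, inputFields, keyFields, flinkKeys);
}
else {
    throw new FlowException("HashJoin does only support InnerJoin and LeftJoin.");
}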

@fhueske
Contributor

fhueske commented Jan 19, 2016

Hi @themodernlife, sorry for the late reply.
The Flink connector needs to know the type of the wrapped joiner in order to make the right choice when translating the join into Flink's native joins. Hence, I'm not sure a configuration switch would do the job here.

I just pushed a fix that extracts the wrapped joiner via Java reflection, without adding a Scalding dependency. I'm not experienced with Scalding, but I tested it with a Java mock-up. Please let me know if that solves your problem. Also, let me know if you run into any other issues.
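In spirit, the unwrapping looks something like this (a minimal sketch, assuming Scalding's wrapper lives at com.twitter.scalding.WrappedJoiner and holds its delegate in a field named "joiner"; the actual fix may differ in detail):

import java.lang.reflect.Field;
import cascading.pipe.joiner.Joiner;

class JoinerUnwrapper {
    // Detect Scalding's WrappedJoiner by class name, so no compile-time
    // Scalding dependency is needed, and extract the delegate Joiner.
    // The field name "joiner" is an assumption about Scalding internals.
    static Joiner unwrap(Joiner joiner) {
        if ("com.twitter.scalding.WrappedJoiner".equals(joiner.getClass().getName())) {
            try {
                Field field = joiner.getClass().getDeclaredField("joiner");
                field.setAccessible(true);
                return (Joiner) field.get(joiner);
            } catch (NoSuchFieldException | IllegalAccessException e) {
                // Fall through: keep the original joiner and let the
                // existing type check reject it as before.
            }
        }
        return joiner;
    }
}

translateHashJoin would then run its InnerJoin/LeftJoin checks against unwrap(hashJoin.getJoiner()) instead of the raw joiner.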

Thanks, Fabian

@fhueske
Contributor

fhueske commented Feb 24, 2016

I ran a Scalding job with a WrappedJoiner and the reflection approach is working.
I'll close this issue. Please reopen if the issue is not resolved.

@fhueske fhueske closed this as completed Feb 24, 2016