Skip to content

Commit

Permalink
[FLINK-1245] [Java API] Introduce TypeHints for Java API operators
Browse files Browse the repository at this point in the history
Also contains fixes by sewen@apache.org
 - Make MissingTypeInfo optional in TypeExtractor (by default still throws exception)
 - Simplified deferred evaluation of type dependend code by making evaluations lazy
 - Add call location function names to MissingTypeInfo error messages.
 - Improvements on other error messages.

This closes #270
  • Loading branch information
twalthr authored and StephanEwen committed Jan 8, 2015
1 parent 06503c8 commit d8dbaee
Show file tree
Hide file tree
Showing 35 changed files with 1,198 additions and 390 deletions.
Expand Up @@ -27,8 +27,7 @@
import org.apache.flink.api.java.ExecutionEnvironment;

/**
* Wordcount for placing at least something into the jar file.
*
* WordCount for placing at least something into the jar file.
*/
public class WordCount {

Expand Down Expand Up @@ -98,7 +97,7 @@ public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
// *************************************************************************

private static boolean fileOutput = false;
private static boolean verbose = false;

private static String textPath;
private static String outputPath;

Expand All @@ -111,7 +110,7 @@ private static boolean parseParameters(String[] args) {
textPath = args[0];
outputPath = args[1];
} else if(args.length == 4 && (args[0].startsWith("-v") || args[0].startsWith("--verbose"))) { // cli line: program {optArg} {optVal} {textPath} {outputPath}
verbose = Boolean.valueOf(args[1]);
Boolean.valueOf(args[1]); // parse verbosity flag
textPath = args[2];
outputPath = args[3];
} else {
Expand Down
Expand Up @@ -56,8 +56,7 @@ public class DualInputSemanticProperties extends SemanticProperties {


public DualInputSemanticProperties() {
super();
this.init();
init();
}

/**
Expand Down Expand Up @@ -251,15 +250,24 @@ public FieldSet getReadFields2() {
*/
@Override
public void clearProperties() {
this.init();
super.clearProperties();
init();
}

@Override
public boolean isEmpty() {
return super.isEmpty() &&
(forwardedFields1 == null || forwardedFields1.isEmpty()) &&
(forwardedFields2 == null || forwardedFields2.isEmpty()) &&
(readFields1 == null || readFields1.size() == 0) &&
(readFields2 == null || readFields2.size() == 0);
}


private void init() {
this.forwardedFields1 = new HashMap<Integer,FieldSet>();
this.forwardedFields2 = new HashMap<Integer,FieldSet>();
this.readFields1 = null;
this.readFields2 = null;
}

}
Expand Up @@ -30,11 +30,10 @@ public abstract class SemanticProperties implements Serializable {

private static final long serialVersionUID = 1L;

/**
* Set of fields that are written in the destination record(s).
*/
/** Set of fields that are written in the destination record(s).*/
private FieldSet writtenFields;


/**
* Adds, to the existing information, field(s) that are written in
* the destination record(s).
Expand Down Expand Up @@ -71,10 +70,10 @@ public FieldSet getWrittenFields() {
* Clears the object.
*/
public void clearProperties() {
this.init();
this.writtenFields = null;
}

private void init() {
this.writtenFields = null;
public boolean isEmpty() {
return this.writtenFields == null || this.writtenFields.size() == 0;
}
}
Expand Up @@ -27,23 +27,18 @@
* Container for the semantic properties associated to a single input operator.
*/
public class SingleInputSemanticProperties extends SemanticProperties {

private static final long serialVersionUID = 1L;

/**
* Mapping from fields in the source record(s) to fields in the destination
* record(s).
*/
/**Mapping from fields in the source record(s) to fields in the destination record(s). */
private Map<Integer,FieldSet> forwardedFields;

/**
* Set of fields that are read in the source record(s).
*/
/** Set of fields that are read in the source record(s).*/
private FieldSet readFields;


public SingleInputSemanticProperties() {
super();
this.init();
init();
}

/**
Expand Down Expand Up @@ -140,8 +135,15 @@ public FieldSet getReadFields() {
*/
@Override
public void clearProperties() {
this.init();
super.clearProperties();
init();
}

@Override
public boolean isEmpty() {
return super.isEmpty() &&
(forwardedFields == null || forwardedFields.isEmpty()) &&
(readFields == null || readFields.size() == 0);
}

private void init() {
Expand Down Expand Up @@ -206,5 +208,10 @@ public void addWrittenFields(FieldSet writtenFields) {
public void setWrittenFields(FieldSet writtenFields) {
throw new UnsupportedOperationException();
}

@Override
public boolean isEmpty() {
return false;
}
}
}

0 comments on commit d8dbaee

Please sign in to comment.