Skip to content

Commit

Permalink
[FLINK-1319] [core] Add static code analysis for user code
Browse files Browse the repository at this point in the history
This closes #729.
  • Loading branch information
twalthr authored and uce committed Jun 8, 2015
1 parent d433ba9 commit c854d52
Show file tree
Hide file tree
Showing 33 changed files with 4,916 additions and 83 deletions.
@@ -0,0 +1,52 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.api.common;

/**
* Specifies to which extent user-defined functions are analyzed in order
* to give the Flink optimizer an insight of UDF internals and inform
* the user about common implementation mistakes.
*
* The analyzer gives hints about:
* - ForwardedFields semantic properties
* - Warnings if static fields are modified by a Function
* - Warnings if a FilterFunction modifies its input objects
* - Warnings if a Function returns null
* - Warnings if a tuple access uses a wrong index
* - Information about the number of object creations (for manual optimization)
*/
public enum CodeAnalysisMode {

/**
* Code analysis does not take place.
*/
DISABLE,

/**
* Hints for improvement of the program are printed to the log.
*/
HINT,

/**
* The program will be automatically optimized with knowledge from code
* analysis.
*/
OPTIMIZE;

}
Expand Up @@ -44,6 +44,10 @@
* handling <i>generic types</i> and <i>POJOs</i>. This is usually only needed
* when the functions return not only the types declared in their signature, but
* also subclasses of those types.</li>
* <li>The {@link CodeAnalysisMode} of the program: Enable hinting/optimizing or disable
* the "static code analyzer". The static code analyzer pre-interprets user-defined functions in order to
* get implementation insights for program improvements that can be printed to the log or
* automatically applied.</li>
* </ul>
*/
public class ExecutionConfig implements Serializable {
Expand Down Expand Up @@ -78,6 +82,8 @@ public class ExecutionConfig implements Serializable {

private boolean forceAvro = false;

private CodeAnalysisMode codeAnalysisMode = CodeAnalysisMode.DISABLE;

/** If set to true, progress updates are printed to System.out during execution */
private boolean printProgressDuringExecution = true;

Expand Down Expand Up @@ -316,6 +322,26 @@ public ExecutionConfig disableObjectReuse() {
public boolean isObjectReuseEnabled() {
return objectReuse;
}

/**
* Sets the {@link CodeAnalysisMode} of the program. Specifies to which extent user-defined
* functions are analyzed in order to give the Flink optimizer an insight of UDF internals
* and inform the user about common implementation mistakes. The static code analyzer pre-interprets
* user-defined functions in order to get implementation insights for program improvements
* that can be printed to the log, automatically applied, or disabled.
*
* @param codeAnalysisMode see {@link CodeAnalysisMode}
*/
public void setCodeAnalysisMode(CodeAnalysisMode codeAnalysisMode) {
this.codeAnalysisMode = codeAnalysisMode;
}

/**
* Returns the {@link CodeAnalysisMode} of the program.
*/
public CodeAnalysisMode getCodeAnalysisMode() {
return codeAnalysisMode;
}

/**
* Enables the printing of progress update messages to {@code System.out}
Expand Down
4 changes: 2 additions & 2 deletions flink-java/pom.xml
Expand Up @@ -60,10 +60,10 @@ under the License.

<dependency>
<groupId>org.ow2.asm</groupId>
<artifactId>asm</artifactId>
<artifactId>asm-all</artifactId>
<version>${asm.version}</version>
</dependency>

<dependency>
<groupId>com.twitter</groupId>
<artifactId>chill_${scala.binary.version}</artifactId>
Expand Down
Expand Up @@ -991,6 +991,9 @@ public void postVisit(org.apache.flink.api.common.operators.Operator<?> visitabl
LOG.debug("Registered Kryo default Serializers: {}", Joiner.on(',').join(config.getDefaultKryoSerializers()));
LOG.debug("Registered Kryo default Serializers Classes {}", Joiner.on(',').join(config.getDefaultKryoSerializerClasses()));
LOG.debug("Registered POJO types: {}", Joiner.on(',').join(config.getRegisteredPojoTypes()));

// print information about static code analysis
LOG.debug("Static code analysis mode: {}", config.getCodeAnalysisMode());
}

return plan;
Expand Down
4 changes: 4 additions & 0 deletions flink-java/src/main/java/org/apache/flink/api/java/Utils.java
Expand Up @@ -29,8 +29,10 @@
import java.lang.reflect.Modifier;
import java.util.List;
import org.apache.flink.api.common.functions.RichFlatMapFunction;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;
import static org.apache.flink.api.java.functions.FunctionAnnotation.SkipCodeAnalysis;


public class Utils {
Expand Down Expand Up @@ -70,6 +72,7 @@ public static void getContainedGenericTypes(CompositeType typeInfo, List<Generic
}
}

@SkipCodeAnalysis
public static class CountHelper<T> extends RichFlatMapFunction<T, Long> {

private static final long serialVersionUID = 1L;
Expand All @@ -93,6 +96,7 @@ public void close() throws Exception {
}
}

@SkipCodeAnalysis
public static class CollectHelper<T> extends RichFlatMapFunction<T, T> {

private static final long serialVersionUID = 1L;
Expand Down
Expand Up @@ -369,7 +369,22 @@ public class FunctionAnnotation {
public @interface ReadFieldsSecond {
String[] value();
}


/**
* The SkipCodeAnalysis annotation declares that a function will not be analyzed by Flink's
* code analysis capabilities independent of the configured {@link org.apache.flink.api.common.CodeAnalysisMode}.
*
* If this annotation is not present the static code analyzer pre-interprets user-defined
* functions in order to get implementation insights for program improvements that can be
* printed to the log as hints, automatically applied, or disabled (see
* {@link org.apache.flink.api.common.ExecutionConfig}).
*
*/
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
public @interface SkipCodeAnalysis {
}

/**
* Private constructor to prevent instantiation. This class is intended only as a container.
*/
Expand Down

0 comments on commit c854d52

Please sign in to comment.