[FLINK-3700] [core] Remove Guava dependency from flink-core
Almost all Guava functionality used within flink-core has corresponding
utils in Flink's own codebase or in the JDK library.

This replaces the Guava code as follows:
  - Preconditions calls by Flink's Preconditions class (see the sketch below)
  - Collection utils by simple Java Collection calls
  - Iterators by Flink's UnionIterator
  - Files by simple util methods around java.nio.file.Files
  - InetAddresses IPv6 encoding code has been adapted into Flink's NetUtils (with attribution comments)
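To make the first replacement concrete, here is a minimal sketch of the swap
(ExampleSink is a hypothetical class for illustration, not part of this commit;
it assumes, as the diffs below bear out, that org.apache.flink.util.Preconditions
exposes the same checkNotNull/checkArgument signatures as Guava's class):

    // Before: import static com.google.common.base.Preconditions.checkNotNull;
    // After: only the import changes, every call site stays identical
    import static org.apache.flink.util.Preconditions.checkNotNull;

    public class ExampleSink {
        private final String path;

        public ExampleSink(String path) {
            // checkNotNull returns its non-null argument, so it can guard an assignment
            this.path = checkNotNull(path, "The path may not be null.");
        }
    }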

Some util classes were moved from flink-runtime to flink-core.

This closes #1854
StephanEwen authored and fhueske committed Apr 15, 2016
1 parent 1b93b32 commit 760a0d9
Showing 82 changed files with 534 additions and 327 deletions.
@@ -21,7 +21,7 @@
 import backtype.storm.topology.TopologyBuilder;
 import backtype.storm.tuple.Fields;

-import org.apache.flink.runtime.util.MathUtils;
+import org.apache.flink.util.MathUtils;
 import org.apache.flink.storm.api.FlinkLocalCluster;
 import org.apache.flink.storm.api.FlinkTopology;
 import org.apache.flink.storm.tests.operators.FiniteRandomSpout;

6 changes: 0 additions & 6 deletions flink-core/pom.xml
@@ -70,12 +70,6 @@ under the License.
 		<artifactId>${shading-artifact.name}</artifactId>
 		<version>${project.version}</version>
 	</dependency>

-	<dependency>
-		<groupId>com.google.guava</groupId>
-		<artifactId>guava</artifactId>
-		<version>${guava.version}</version>
-	</dependency>
-
 	<!-- test depedencies -->

@@ -18,9 +18,6 @@

 package org.apache.flink.api.common;

-import static com.google.common.base.Preconditions.checkArgument;
-import static com.google.common.base.Preconditions.checkNotNull;
-
 import java.util.ArrayList;
 import java.util.Calendar;
 import java.util.Collection;
@@ -43,6 +40,8 @@
 import org.apache.flink.util.Visitable;
 import org.apache.flink.util.Visitor;

+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkArgument;
 /**
  * This class represents Flink programs, in the form of dataflow plans.
  *

@@ -20,8 +20,8 @@

 import org.apache.flink.annotation.Internal;

-import static com.google.common.base.Preconditions.checkArgument;
-import static com.google.common.base.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkArgument;

 /**
  * Encapsulates task-specific information: name, index of subtask, parallelism and attempt number.

@@ -23,7 +23,6 @@
 import java.util.Map;
 import java.util.concurrent.Future;

-import com.google.common.base.Preconditions;
 import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.TaskInfo;
@@ -45,6 +44,8 @@
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.core.fs.Path;

+import static org.apache.flink.util.Preconditions.checkNotNull;
+
 /**
  * A standalone implementation of the {@link RuntimeContext}, created by runtime UDF operators.
  */
@@ -66,11 +67,11 @@ public AbstractRuntimeUDFContext(TaskInfo taskInfo,
 							ExecutionConfig executionConfig,
 							Map<String, Accumulator<?,?>> accumulators,
 							Map<String, Future<Path>> cpTasks) {
-		this.taskInfo = Preconditions.checkNotNull(taskInfo);
+		this.taskInfo = checkNotNull(taskInfo);
 		this.userCodeClassLoader = userCodeClassLoader;
 		this.executionConfig = executionConfig;
-		this.distributedCache = new DistributedCache(Preconditions.checkNotNull(cpTasks));
-		this.accumulators = Preconditions.checkNotNull(accumulators);
+		this.distributedCache = new DistributedCache(checkNotNull(cpTasks));
+		this.accumulators = checkNotNull(accumulators);
 	}

 	@Override

@@ -18,12 +18,7 @@

 package org.apache.flink.api.common.io;

-import java.io.IOException;
-import java.util.ArrayList;
-
 import org.apache.flink.annotation.Public;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 import org.apache.flink.api.common.io.statistics.BaseStatistics;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
@@ -33,7 +28,12 @@
 import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.fs.Path;

-import com.google.common.base.Charsets;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.nio.charset.Charset;
+import java.util.ArrayList;

 /**
  * Base implementation for input formats that split the input at a delimiter into records.
@@ -53,6 +53,9 @@ public abstract class DelimitedInputFormat<OT> extends FileInputFormat<OT> {
 	 * The log.
 	 */
 	private static final Logger LOG = LoggerFactory.getLogger(DelimitedInputFormat.class);
+
+	/** The default charset to convert strings to bytes */
+	private static final Charset UTF_8_CHARSET = Charset.forName("UTF-8");

 	/**
 	 * The default read buffer size = 1MB.
@@ -185,7 +188,7 @@ public void setDelimiter(char delimiter) {
 	}

 	public void setDelimiter(String delimiter) {
-		this.delimiter = delimiter.getBytes(Charsets.UTF_8);
+		this.delimiter = delimiter.getBytes(UTF_8_CHARSET);
 	}

 	public int getLineLengthLimit() {
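A note on the UTF_8_CHARSET constant introduced above: it simply replaces Guava's
Charsets.UTF_8 with a field resolved once via Charset.forName("UTF-8"). On Java 7+
the JDK ships the same constant, so an equivalent (standalone sketch, not code from
this commit) would be:

    import java.nio.charset.Charset;
    import java.nio.charset.StandardCharsets;

    public class CharsetSketch {
        public static void main(String[] args) {
            Charset viaLookup = Charset.forName("UTF-8");    // what this commit uses
            Charset viaConstant = StandardCharsets.UTF_8;    // JDK constant since Java 7
            System.out.println(viaLookup.equals(viaConstant));   // true
            System.out.println("|".getBytes(viaLookup).length);  // 1 -- delimiter as bytes
        }
    }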
@@ -18,22 +18,10 @@

 package org.apache.flink.api.common.io;

-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-import com.google.common.base.Preconditions;
 import org.apache.flink.annotation.Public;
 import org.apache.flink.api.common.io.compression.DeflateInflaterInputStreamFactory;
 import org.apache.flink.api.common.io.compression.GzipInflaterInputStreamFactory;
 import org.apache.flink.api.common.io.compression.InflaterInputStreamFactory;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 import org.apache.flink.api.common.io.statistics.BaseStatistics;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
@@ -45,6 +33,20 @@
 import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.fs.Path;

+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
 /**
  * The base class for {@link RichInputFormat}s that read from files. For specific input types the
  * {@link #nextRecord(Object)} and {@link #reachedEnd()} methods need to be implemented.
@@ -143,7 +145,7 @@ protected static InflaterInputStreamFactory<?> getInflaterInputStreamFactory(Str
 	 * @return the extension of the file name or {@code null} if there is no extension.
 	 */
 	protected static String extractFileExtension(String fileName) {
-		Preconditions.checkNotNull(fileName);
+		checkNotNull(fileName);
 		int lastPeriodIndex = fileName.lastIndexOf('.');
 		if (lastPeriodIndex < 0){
 			return null;
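For context on the extractFileExtension change above: only the no-extension branch is
visible in the hunk, so the positive case is inferred from the javadoc rather than shown
(hedged examples):

    // extractFileExtension("README")         -> null ('.' absent; the branch shown above)
    // extractFileExtension("part-01.csv.gz") -> the text after the last '.', i.e. "gz",
    //                                           since lastIndexOf('.') picks the final period
    // extractFileExtension(null)             -> NullPointerException from checkNotNull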
@@ -16,20 +16,16 @@
  * limitations under the License.
  */

-
 package org.apache.flink.api.common.io;

-import com.google.common.base.Charsets;
-import com.google.common.base.Preconditions;
-import com.google.common.primitives.Ints;
-
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.core.fs.FileInputSplit;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.types.parser.FieldParser;
 import org.apache.flink.types.parser.StringParser;
 import org.apache.flink.types.parser.StringValueParser;
 import org.apache.flink.util.InstantiationUtil;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

@@ -41,13 +37,20 @@
 import java.util.Map;
 import java.util.TreeMap;

+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
 @Internal
 public abstract class GenericCsvInputFormat<OT> extends DelimitedInputFormat<OT> {

-	private static final Logger LOG = LoggerFactory.getLogger(GenericCsvInputFormat.class);
-
 	private static final long serialVersionUID = 1L;

+	private static final Logger LOG = LoggerFactory.getLogger(GenericCsvInputFormat.class);
+
+	/** The default charset to convert strings to bytes */
+	private static final Charset UTF_8_CHARSET = Charset.forName("UTF-8");
+
 	private static final Class<?>[] EMPTY_TYPES = new Class<?>[0];

 	private static final boolean[] EMPTY_INCLUDED = new boolean[0];
@@ -127,7 +130,7 @@ public void setCommentPrefix(char commentPrefix) {
 	}

 	public void setCommentPrefix(String commentPrefix) {
-		setCommentPrefix(commentPrefix, Charsets.UTF_8);
+		setCommentPrefix(commentPrefix, UTF_8_CHARSET);
 	}

 	public void setCommentPrefix(String commentPrefix, String charsetName) throws IllegalCharsetNameException, UnsupportedCharsetException {
@@ -171,7 +174,7 @@ public void setFieldDelimiter(char delimiter) {
 	}

 	public void setFieldDelimiter(String delimiter) {
-		this.fieldDelim = delimiter.getBytes(Charsets.UTF_8);
+		this.fieldDelim = delimiter.getBytes(UTF_8_CHARSET);
 	}

 	public boolean isLenient() {
@@ -247,9 +250,9 @@ protected void setFieldTypesGeneric(Class<?> ... fieldTypes) {
 	}

 	protected void setFieldsGeneric(int[] sourceFieldIndices, Class<?>[] fieldTypes) {
-		Preconditions.checkNotNull(sourceFieldIndices);
-		Preconditions.checkNotNull(fieldTypes);
-		Preconditions.checkArgument(sourceFieldIndices.length == fieldTypes.length,
+		checkNotNull(sourceFieldIndices);
+		checkNotNull(fieldTypes);
+		checkArgument(sourceFieldIndices.length == fieldTypes.length,
 			"Number of field indices and field types must match.");

 		for (int i : sourceFieldIndices) {
@@ -258,7 +261,7 @@
 			}
 		}

-		int largestFieldIndex = Ints.max(sourceFieldIndices);
+		int largestFieldIndex = max(sourceFieldIndices);
 		this.fieldIncluded = new boolean[largestFieldIndex + 1];
 		ArrayList<Class<?>> types = new ArrayList<Class<?>>();

@@ -280,8 +283,8 @@ protected void setFieldsGeneric(int[] sourceFieldIndices, Class<?>[] fieldTypes)
 	}

 	protected void setFieldsGeneric(boolean[] includedMask, Class<?>[] fieldTypes) {
-		Preconditions.checkNotNull(includedMask);
-		Preconditions.checkNotNull(fieldTypes);
+		checkNotNull(includedMask);
+		checkNotNull(fieldTypes);

 		ArrayList<Class<?>> types = new ArrayList<Class<?>>();

@@ -530,4 +533,14 @@ protected static void checkForMonotonousOrder(int[] positions, Class<?>[] types)
 			lastPos = positions[i];
 		}
 	}
+
+	private static int max(int[] ints) {
+		checkArgument(ints.length > 0);
+
+		int max = ints[0];
+		for (int i = 1; i < ints.length; i++) {
+			max = Math.max(max, ints[i]);
+		}
+		return max;
+	}
 }
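The private max helper added at the end of this file replaces Guava's Ints.max one for
one: both return the largest element and reject an empty array (Ints.max with an
IllegalArgumentException; the helper via checkArgument, which in Flink's Preconditions
likewise throws IllegalArgumentException). A usage sketch with illustrative values:

    int[] sourceFieldIndices = {3, 0, 7, 2};
    int largestFieldIndex = max(sourceFieldIndices);  // 7, same result as Ints.max(...)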
@@ -16,7 +16,6 @@
  * limitations under the License.
  */

-
 package org.apache.flink.api.common.operators;

 import java.util.List;
@@ -39,7 +38,7 @@
 import org.apache.flink.types.Nothing;
 import org.apache.flink.util.Visitor;

-import com.google.common.base.Preconditions;
+import static org.apache.flink.util.Preconditions.checkNotNull;

 /**
  * Operator for nodes that act as data sinks, storing the data they receive.
@@ -66,7 +65,7 @@ public class GenericDataSinkBase<IN> extends Operator<Nothing> {
 	public GenericDataSinkBase(OutputFormat<IN> f, UnaryOperatorInformation<IN, Nothing> operatorInfo, String name) {
 		super(operatorInfo, name);

-		Preconditions.checkNotNull(f, "The OutputFormat may not be null.");
+		checkNotNull(f, "The OutputFormat may not be null.");
 		this.formatWrapper = new UserCodeObjectWrapper<OutputFormat<IN>>(f);
 	}

@@ -79,8 +78,7 @@ public GenericDataSinkBase(OutputFormat<IN> f, UnaryOperatorInformation<IN, Noth
 	 */
 	public GenericDataSinkBase(UserCodeWrapper<? extends OutputFormat<IN>> f, UnaryOperatorInformation<IN, Nothing> operatorInfo, String name) {
 		super(operatorInfo, name);
-		Preconditions.checkNotNull(f, "The OutputFormat class may not be null.");
-		this.formatWrapper = f;
+		this.formatWrapper = checkNotNull(f, "The OutputFormat class may not be null.");
 	}

 	// --------------------------------------------------------------------------------------------
@@ -100,8 +98,7 @@ public Operator<IN> getInput() {
 	 * @param input The operator to use as the input.
 	 */
 	public void setInput(Operator<IN> input) {
-		Preconditions.checkNotNull(input, "The input may not be null.");
-		this.input = input;
+		this.input = checkNotNull(input, "The input may not be null.");
 	}

 	/**
@@ -112,7 +109,7 @@ public void setInput(Operator<IN> input) {
 	 */
 	@Deprecated
 	public void setInputs(Operator<IN>... inputs) {
-		Preconditions.checkNotNull(inputs, "The inputs may not be null.");
+		checkNotNull(inputs, "The inputs may not be null.");
 		this.input = Operator.createUnionCascade(inputs);
 	}

@@ -124,7 +121,7 @@ public void setInputs(Operator<IN>... inputs) {
 	 */
 	@Deprecated
 	public void setInputs(List<Operator<IN>> inputs) {
-		Preconditions.checkNotNull(inputs, "The inputs may not be null.");
+		checkNotNull(inputs, "The inputs may not be null.");
 		this.input = Operator.createUnionCascade(inputs);
 	}

@@ -136,7 +133,7 @@ public void setInputs(List<Operator<IN>> inputs) {
 	 */
 	@Deprecated
 	public void addInput(Operator<IN>... inputs) {
-		Preconditions.checkNotNull(inputs, "The input may not be null.");
+		checkNotNull(inputs, "The input may not be null.");
 		this.input = Operator.createUnionCascade(this.input, inputs);
 	}

@@ -149,7 +146,7 @@ public void addInput(Operator<IN>... inputs) {
 	@SuppressWarnings("unchecked")
 	@Deprecated
 	public void addInputs(List<? extends Operator<IN>> inputs) {
-		Preconditions.checkNotNull(inputs, "The inputs may not be null.");
+		checkNotNull(inputs, "The inputs may not be null.");
 		this.input = createUnionCascade(this.input, (Operator<IN>[]) inputs.toArray(new Operator[inputs.size()]));
 	}

@@ -259,7 +256,7 @@ public int compare(IN o1, IN o2) {
 		format.configure(this.parameters);

 		if(format instanceof RichOutputFormat){
-			((RichOutputFormat) format).setRuntimeContext(ctx);
+			((RichOutputFormat<?>) format).setRuntimeContext(ctx);
 		}
 		format.open(0, 1);
 		for (IN element : inputData) {
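One pattern worth calling out in this file: several check-then-assign pairs collapse
into single statements (the second constructor and setInput above). This is sound
because Flink's Preconditions.checkNotNull, like Guava's, returns its argument when
the check passes -- the assignments in the diff would not compile otherwise (a minimal
sketch of the idiom, not code from the commit):

    // Before -- two statements:
    //     Preconditions.checkNotNull(input, "The input may not be null.");
    //     this.input = input;
    // After -- one statement, identical semantics:
    this.input = checkNotNull(input, "The input may not be null.");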
