4 changes: 2 additions & 2 deletions docs/dev/table/sourceSinks.md
@@ -548,7 +548,7 @@ An `OrcTableSource` is created as shown below:
Configuration config = new Configuration();

OrcTableSource orcTableSource = OrcTableSource.builder()
// path to ORC file(s)
// path to ORC file(s). NOTE: By default, directories are recursively scanned.
.path("file:///path/to/data")
// schema of ORC files
.forOrcSchema("struct<name:string,addresses:array<struct<street:string,zip:smallint>>>")
@@ -566,7 +566,7 @@ OrcTableSource orcTableSource = OrcTableSource.builder()
val config = new Configuration()

val orcTableSource = OrcTableSource.builder()
// path to ORC file(s)
// path to ORC file(s). NOTE: By default, directories are recursively scanned.
.path("file:///path/to/data")
// schema of ORC files
.forOrcSchema("struct<name:string,addresses:array<struct<street:string,zip:smallint>>>")
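For context, here is a minimal sketch of how the new two-argument `path(...)` overload introduced by this change could be used to switch the recursive scan off. It mirrors the Java example above; the path, schema, and Hadoop `Configuration` contents are illustrative only.

```java
Configuration config = new Configuration();

OrcTableSource orcTableSource = OrcTableSource.builder()
  // path to ORC file(s); the second argument disables recursive directory enumeration
  .path("file:///path/to/data", false)
  // schema of ORC files
  .forOrcSchema("struct<name:string,addresses:array<struct<street:string,zip:smallint>>>")
  // Hadoop configuration used to read the ORC files
  .withConfiguration(config)
  .build();
```

With the single-argument `path(...)` variant, recursive enumeration stays enabled, which is exactly what the NOTE added above documents.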
OrcRowInputFormat.java
@@ -393,6 +393,25 @@ private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundE
}
}

// --------------------------------------------------------------------------------------------
// Getter methods for tests
// --------------------------------------------------------------------------------------------

@VisibleForTesting
Configuration getConfiguration() {
return conf;
}

@VisibleForTesting
int getBatchSize() {
return batchSize;
}

@VisibleForTesting
String getSchema() {
return schema.toString();
}

// --------------------------------------------------------------------------------------------
// Classes to define predicates
// --------------------------------------------------------------------------------------------
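The getters above are package-private and marked `@VisibleForTesting`, so only tests living in the same package can call them. As a minimal sketch, assuming static imports from `org.junit.Assert` and a `format` variable holding an `OrcRowInputFormat` configured elsewhere in the test, assertions against the configured state could look as follows (the expected values are illustrative):

```java
// 'format' is assumed to be an OrcRowInputFormat configured earlier in the test.
assertEquals(1000, format.getBatchSize());
assertEquals(
  "struct<name:string,addresses:array<struct<street:string,zip:smallint>>>",
  format.getSchema());
assertNotNull(format.getConfiguration());
```

The test added at the bottom of this change follows the same idea, but obtains the input format through `OrcTableSource#getDataSet`.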
OrcTableSource.java
@@ -92,6 +92,8 @@ public class OrcTableSource
private final Configuration orcConfig;
// the number of rows to read in a batch
private final int batchSize;
// flag whether the path is recursively enumerated
private final boolean recursiveEnumeration;

// type information of the data returned by the InputFormat
private final RowTypeInfo typeInfo;
@@ -107,13 +109,15 @@ public class OrcTableSource
* @param orcSchema The schema of the ORC files as TypeDescription.
* @param orcConfig The configuration to read the ORC files.
* @param batchSize The number of Rows to read in a batch, default is 1000.
* @param recursiveEnumeration Flag whether the path should be recursively enumerated or not.
*/
private OrcTableSource(String path, TypeDescription orcSchema, Configuration orcConfig, int batchSize) {
this(path, orcSchema, orcConfig, batchSize, null, null);
private OrcTableSource(String path, TypeDescription orcSchema, Configuration orcConfig, int batchSize, boolean recursiveEnumeration) {
this(path, orcSchema, orcConfig, batchSize, recursiveEnumeration, null, null);
}

private OrcTableSource(String path, TypeDescription orcSchema, Configuration orcConfig,
int batchSize, int[] selectedFields, Predicate[] predicates) {
int batchSize, boolean recursiveEnumeration,
int[] selectedFields, Predicate[] predicates) {

Preconditions.checkNotNull(path, "Path must not be null.");
Preconditions.checkNotNull(orcSchema, "OrcSchema must not be null.");
@@ -123,6 +127,7 @@ private OrcTableSource(String path, TypeDescription orcSchema, Configuration orc
this.orcSchema = orcSchema;
this.orcConfig = orcConfig;
this.batchSize = batchSize;
this.recursiveEnumeration = recursiveEnumeration;
this.selectedFields = selectedFields;
this.predicates = predicates;

@@ -146,6 +151,7 @@ private OrcTableSource(String path, TypeDescription orcSchema, Configuration orc
@Override
public DataSet<Row> getDataSet(ExecutionEnvironment execEnv) {
OrcRowInputFormat orcIF = buildOrcInputFormat();
orcIF.setNestedFileEnumeration(recursiveEnumeration);
if (selectedFields != null) {
orcIF.selectFields(selectedFields);
}
@@ -175,7 +181,7 @@ public TableSchema getTableSchema() {
@Override
public TableSource<Row> projectFields(int[] selectedFields) {
// create a copy of the OrcTableSource with new selected fields
return new OrcTableSource(path, orcSchema, orcConfig, batchSize, selectedFields, predicates);
return new OrcTableSource(path, orcSchema, orcConfig, batchSize, recursiveEnumeration, selectedFields, predicates);
}

@Override
@@ -190,7 +196,7 @@ public TableSource<Row> applyPredicate(List<Expression> predicates) {
}
}

return new OrcTableSource(path, orcSchema, orcConfig, batchSize, selectedFields, orcPredicates.toArray(new Predicate[]{}));
return new OrcTableSource(path, orcSchema, orcConfig, batchSize, recursiveEnumeration, selectedFields, orcPredicates.toArray(new Predicate[]{}));
}

@Override
@@ -405,8 +411,11 @@ public static class Builder {

private int batchSize = 0;

private boolean recursive = true;

/**
* Sets the path of the ORC file(s).
* If the path specifies a directory, it will be recursively enumerated.
*
* @param path The path of the ORC file(s).
* @return The builder.
@@ -418,6 +427,21 @@ public Builder path(String path) {
return this;
}

/**
* Sets the path of the ORC file(s).
*
* @param path The path of the ORC file(s).
* @param recursive Flag whether the path should be recursively enumerated or not.
* @return The builder.
*/
public Builder path(String path, boolean recursive) {
Preconditions.checkNotNull(path, "Path must not be null.");
Preconditions.checkArgument(!path.isEmpty(), "Path must not be empty.");
this.path = path;
this.recursive = recursive;
return this;
}

/**
* Sets the ORC schema of the files to read as a String.
*
@@ -483,7 +507,7 @@ public OrcTableSource build() {
// set default batch size
this.batchSize = DEFAULT_BATCH_SIZE;
}
return new OrcTableSource(this.path, this.schema, this.config, this.batchSize);
return new OrcTableSource(this.path, this.schema, this.config, this.batchSize, this.recursive);
}

}
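Stepping back from the diff, the sketch below shows how the finished `OrcTableSource` might be plugged into a batch Table program. The environment setup, table name, and query are assumptions based on the Table API of this Flink generation and are not part of this change.

```java
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);

// Register the ORC-backed source; directory enumeration behaves as configured on the builder.
tableEnv.registerTableSource("OrcTable", orcTableSource);

// Query the registered table; the planner may push projections down via projectFields().
Table result = tableEnv.sqlQuery("SELECT name FROM OrcTable");
```

Predicate push-down, when the planner applies it, reaches the source through `applyPredicate(...)` in the same way.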
OrcTableSourceTest.java
@@ -22,6 +22,7 @@
import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.typeutils.MapTypeInfo;
@@ -37,6 +38,7 @@
import org.apache.flink.table.expressions.ResolvedFieldReference;
import org.apache.flink.types.Row;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.ql.io.sarg.PredicateLeaf;
import org.junit.Test;
import org.mockito.ArgumentCaptor;
@@ -233,6 +235,56 @@ public void testApplyPredicate() throws Exception {
assertFalse(orc.isFilterPushedDown());
}

@Test
public void testBuilder() throws Exception {

// validate path, schema, and recursive enumeration default (enabled)
OrcTableSource orc1 = OrcTableSource.builder()
.path(getPath(TEST_FILE_NESTED))
.forOrcSchema(TEST_SCHEMA_NESTED)
.build();

DataSet<Row> rows1 = orc1.getDataSet(ExecutionEnvironment.createLocalEnvironment());
OrcRowInputFormat orcIF1 = (OrcRowInputFormat) ((DataSource) rows1).getInputFormat();
assertEquals(true, orcIF1.getNestedFileEnumeration());
assertEquals(getPath(TEST_FILE_NESTED), orcIF1.getFilePath().toString());
assertEquals(TEST_SCHEMA_NESTED, orcIF1.getSchema());

// validate recursive enumeration disabled
OrcTableSource orc2 = OrcTableSource.builder()
.path(getPath(TEST_FILE_NESTED), false)
.forOrcSchema(TEST_SCHEMA_NESTED)
.build();

DataSet<Row> rows2 = orc2.getDataSet(ExecutionEnvironment.createLocalEnvironment());
OrcRowInputFormat orcIF2 = (OrcRowInputFormat) ((DataSource) rows2).getInputFormat();
assertEquals(false, orcIF2.getNestedFileEnumeration());

// validate Hadoop configuration
Configuration conf = new Configuration();
conf.set("testKey", "testValue");
OrcTableSource orc3 = OrcTableSource.builder()
.path(getPath(TEST_FILE_NESTED))
.forOrcSchema(TEST_SCHEMA_NESTED)
.withConfiguration(conf)
.build();

DataSet<Row> rows3 = orc3.getDataSet(ExecutionEnvironment.createLocalEnvironment());
OrcRowInputFormat orcIF3 = (OrcRowInputFormat) ((DataSource) rows3).getInputFormat();
assertEquals(conf, orcIF3.getConfiguration());

// validate batch size
OrcTableSource orc4 = OrcTableSource.builder()
.path(getPath(TEST_FILE_NESTED))
.forOrcSchema(TEST_SCHEMA_NESTED)
.withBatchSize(987)
.build();

DataSet<Row> rows4 = orc4.getDataSet(ExecutionEnvironment.createLocalEnvironment());
OrcRowInputFormat orcIF4 = (OrcRowInputFormat) ((DataSource) rows4).getInputFormat();
assertEquals(987, orcIF4.getBatchSize());
}

private String getPath(String fileName) {
return getClass().getClassLoader().getResource(fileName).getPath();
}