From bc9f9f3daffe47426f22f2b944f860b8cd2889da Mon Sep 17 00:00:00 2001
From: James Xu
Date: Thu, 4 May 2017 11:44:14 +0800
Subject: [PATCH 1/4] Support TextIO as SQL source/sink
---
dsls/sql/pom.xml | 5 +
.../sql/schema/text/BeamTextCSVTable.java | 66 +++++
.../schema/text/BeamTextCSVTableIOReader.java | 116 +++++++++
.../schema/text/BeamTextCSVTableIOWriter.java | 76 ++++++
.../dsls/sql/schema/text/BeamTextTable.java | 45 ++++
.../dsls/sql/schema/text/package-info.java | 22 ++
.../sql/schema/text/BeamTextCSVTableTest.java | 232 ++++++++++++++++++
7 files changed, 562 insertions(+)
create mode 100644 dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/text/BeamTextCSVTable.java
create mode 100644 dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/text/BeamTextCSVTableIOReader.java
create mode 100644 dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/text/BeamTextCSVTableIOWriter.java
create mode 100644 dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/text/BeamTextTable.java
create mode 100644 dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/text/package-info.java
create mode 100644 dsls/sql/src/test/java/org/apache/beam/dsls/sql/schema/text/BeamTextCSVTableTest.java
diff --git a/dsls/sql/pom.xml b/dsls/sql/pom.xml
index 6139adafb4ed..15692e9a559c 100644
--- a/dsls/sql/pom.xml
+++ b/dsls/sql/pom.xml
@@ -204,5 +204,10 @@
hamcrest-alltest
+
+ org.apache.commons
+ commons-csv
+ 1.4
+
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/text/BeamTextCSVTable.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/text/BeamTextCSVTable.java
new file mode 100644
index 000000000000..b9e6b816529f
--- /dev/null
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/text/BeamTextCSVTable.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.dsls.sql.schema.text;
+
+import org.apache.beam.dsls.sql.schema.BeamSQLRow;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.values.PBegin;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PDone;
+import org.apache.calcite.rel.type.RelProtoDataType;
+import org.apache.commons.csv.CSVFormat;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * {@code BeamTextCSVTable} is a {@code BeamTextTable} which formatted in CSV.
+ *
+ *
+ * {@link CSVFormat} itself has many dialects, check its javadoc for more info.
+ *
+ */
+public class BeamTextCSVTable extends BeamTextTable {
+ private static final Logger LOG = LoggerFactory
+ .getLogger(BeamTextCSVTable.class);
+
+ private CSVFormat csvFormat;
+
+ /**
+ * CSV table with {@link CSVFormat#DEFAULT DEFAULT} format.
+ */
+ public BeamTextCSVTable(RelProtoDataType protoDataType, String filePattern) {
+ this(protoDataType, filePattern, CSVFormat.DEFAULT);
+ }
+
+ public BeamTextCSVTable(RelProtoDataType protoDataType, String filePattern,
+ CSVFormat csvFormat) {
+ super(protoDataType, filePattern);
+ this.csvFormat = csvFormat;
+ }
+
+ @Override
+ public PTransform super PBegin, PCollection> buildIOReader() {
+ return new BeamTextCSVTableIOReader(beamSqlRecordType, filePattern, csvFormat);
+ }
+
+ @Override
+ public PTransform super PCollection, PDone> buildIOWriter() {
+ return new BeamTextCSVTableIOWriter(beamSqlRecordType, filePattern, csvFormat);
+ }
+}
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/text/BeamTextCSVTableIOReader.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/text/BeamTextCSVTableIOReader.java
new file mode 100644
index 000000000000..e9dcefbd146d
--- /dev/null
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/text/BeamTextCSVTableIOReader.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.dsls.sql.schema.text;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.io.StringReader;
+
+import org.apache.beam.dsls.sql.schema.BeamSQLRecordType;
+import org.apache.beam.dsls.sql.schema.BeamSQLRow;
+import org.apache.beam.sdk.io.TextIO;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.values.PBegin;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.commons.csv.CSVFormat;
+import org.apache.commons.csv.CSVParser;
+import org.apache.commons.csv.CSVRecord;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * IOReader for {@code BeamTextCSVTable}.
+ */
+public class BeamTextCSVTableIOReader
+ extends PTransform>
+ implements Serializable {
+ private static final Logger LOG = LoggerFactory.getLogger(BeamTextCSVTableIOReader.class);
+ private String filePattern;
+ protected BeamSQLRecordType beamSqlRecordType;
+ protected CSVFormat csvFormat;
+
+ public BeamTextCSVTableIOReader(BeamSQLRecordType beamSqlRecordType, String filePattern,
+ CSVFormat csvFormat) {
+ this.filePattern = filePattern;
+ this.beamSqlRecordType = beamSqlRecordType;
+ this.csvFormat = csvFormat;
+ }
+
+ @Override
+ public PCollection expand(PBegin input) {
+ return input.apply("decodeRecord", TextIO.Read.from(filePattern))
+ .apply(ParDo.of(new DoFn() {
+ @ProcessElement
+ public void processElement(ProcessContext ctx) {
+ String str = ctx.element();
+
+ StringReader reader = new StringReader(str);
+ CSVRecord rawRecord = null;
+ try {
+ CSVParser parser = csvFormat.parse(reader);
+ rawRecord = parser.getRecords().get(0);
+ } catch (IOException e) {
+ LOG.error("error record: " + str, e);
+ }
+
+ BeamSQLRow row = new BeamSQLRow(beamSqlRecordType);
+ if (rawRecord.size() != beamSqlRecordType.size()) {
+ LOG.error("invalid record: {}, expect {} fields, but actually {}",
+ str, beamSqlRecordType.size(), rawRecord.size());
+ } else {
+ for (int idx = 0; idx < beamSqlRecordType.size(); idx++) {
+ String raw = rawRecord.get(idx);
+ addFieldWithAutoTypeCasting(row, idx, raw);
+ }
+ ctx.output(row);
+ }
+ }
+ }));
+ }
+
+ public void addFieldWithAutoTypeCasting(BeamSQLRow row, int idx, String raw) {
+ switch (row.getDataType().getFieldsType().get(idx)) {
+ case TINYINT:
+ row.addField(idx, Byte.valueOf(raw));
+ break;
+ case SMALLINT:
+ row.addField(idx, Short.valueOf(raw));
+ break;
+ case INTEGER:
+ row.addField(idx, Integer.valueOf(raw));
+ break;
+ case BIGINT:
+ row.addField(idx, Long.valueOf(raw));
+ break;
+ case FLOAT:
+ row.addField(idx, Float.valueOf(raw));
+ break;
+ case DOUBLE:
+ row.addField(idx, Double.valueOf(raw));
+ break;
+ case VARCHAR:
+ row.addField(idx, raw);
+ break;
+ default:
+ row.addField(idx, raw);
+ }
+ }
+}
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/text/BeamTextCSVTableIOWriter.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/text/BeamTextCSVTableIOWriter.java
new file mode 100644
index 000000000000..0653972f854c
--- /dev/null
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/text/BeamTextCSVTableIOWriter.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.dsls.sql.schema.text;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.io.StringWriter;
+
+import org.apache.beam.dsls.sql.schema.BeamSQLRecordType;
+import org.apache.beam.dsls.sql.schema.BeamSQLRow;
+import org.apache.beam.sdk.io.TextIO;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PDone;
+import org.apache.commons.csv.CSVFormat;
+import org.apache.commons.csv.CSVPrinter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * IOWriter for {@code BeamTextCSVTable}.
+ */
+public class BeamTextCSVTableIOWriter extends PTransform, PDone>
+ implements Serializable {
+ private static final Logger LOG = LoggerFactory.getLogger(BeamTextCSVTableIOWriter.class);
+
+ private String filePattern;
+ protected BeamSQLRecordType beamSqlRecordType;
+ protected CSVFormat csvFormat;
+
+ public BeamTextCSVTableIOWriter(BeamSQLRecordType beamSqlRecordType, String filePattern,
+ CSVFormat csvFormat) {
+ this.filePattern = filePattern;
+ this.beamSqlRecordType = beamSqlRecordType;
+ this.csvFormat = csvFormat;
+ }
+
+ @Override public PDone expand(PCollection input) {
+ return input.apply("encodeRecord", ParDo.of(new DoFn() {
+
+ @ProcessElement public void processElement(ProcessContext ctx) {
+ BeamSQLRow row = ctx.element();
+ StringWriter writer = new StringWriter();
+
+ try (CSVPrinter printer = csvFormat.print(writer)) {
+ for (int i = 0; i < row.size(); i++) {
+ printer.print(row.getFieldValue(i).toString());
+ }
+ printer.println();
+ } catch (IOException e) {
+ LOG.error("invalid record: " + row, e);
+ }
+
+ ctx.output(writer.toString());
+ }
+ })).apply(TextIO.Write.to(filePattern));
+ }
+}
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/text/BeamTextTable.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/text/BeamTextTable.java
new file mode 100644
index 000000000000..335376194cac
--- /dev/null
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/text/BeamTextTable.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.dsls.sql.schema.text;
+
+import java.io.Serializable;
+
+import org.apache.beam.dsls.sql.schema.BaseBeamTable;
+import org.apache.beam.dsls.sql.schema.BeamIOType;
+import org.apache.calcite.rel.type.RelProtoDataType;
+
+/**
+ * {@code BeamTextTable} represents a text file/directory(backed by {@code TextIO}).
+ */
+public abstract class BeamTextTable extends BaseBeamTable implements Serializable {
+ protected String filePattern;
+ protected BeamTextTable(RelProtoDataType protoRowType) {
+ super(protoRowType);
+ }
+
+ protected BeamTextTable(RelProtoDataType protoDataType, String filePattern) {
+ super(protoDataType);
+ this.filePattern = filePattern;
+ }
+
+ @Override
+ public BeamIOType getSourceType() {
+ return BeamIOType.BOUNDED;
+ }
+}
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/text/package-info.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/text/package-info.java
new file mode 100644
index 000000000000..f48f2fe5c250
--- /dev/null
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/text/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Table schema for text files.
+ */
+package org.apache.beam.dsls.sql.schema.text;
diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/schema/text/BeamTextCSVTableTest.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/schema/text/BeamTextCSVTableTest.java
new file mode 100644
index 000000000000..343ec170488a
--- /dev/null
+++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/schema/text/BeamTextCSVTableTest.java
@@ -0,0 +1,232 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.dsls.sql.schema.text;
+
+import static org.junit.Assert.assertTrue;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.io.PrintStream;
+import java.io.Serializable;
+import java.nio.file.FileVisitResult;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.SimpleFileVisitor;
+import java.nio.file.attribute.BasicFileAttributes;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Objects;
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+import org.apache.beam.dsls.sql.schema.BeamSQLRecordType;
+import org.apache.beam.dsls.sql.schema.BeamSQLRecordTypeCoder;
+import org.apache.beam.dsls.sql.schema.BeamSQLRow;
+import org.apache.beam.dsls.sql.schema.BeamSqlRowCoder;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.coders.CoderRegistry;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeFactory;
+import org.apache.calcite.rel.type.RelProtoDataType;
+import org.apache.calcite.sql.type.SqlTypeName;
+import org.apache.commons.csv.CSVFormat;
+import org.apache.commons.csv.CSVPrinter;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+/**
+ * Tests for {@code BeamTextCSVTable}.
+ */
+public class BeamTextCSVTableTest {
+
+ /**
+ * testData.
+ *
+ *
+ * The types of the csv fields are:
+ * integer,bigint,float,double,string
+ *
+ */
+ private static List
*/
- private static List testData = new ArrayList() {{
- add(new Object[] { 1, 1L, 1.1F, 1.1, "james" });
- add(new Object[] { 2, 2L, 2.2F, 2.2, "bond" });
+ private static Object[] data1 = new Object[] { 1, 1L, 1.1F, 1.1, "james" };
+ private static Object[] data2 = new Object[] { 2, 2L, 2.2F, 2.2, "bond" };
+
+ private static List testData = Arrays.asList(data1, data2);
+ private static List testDataRows = new ArrayList() {{
+ for (Object[] data : testData) {
+ add(buildRow(data));
+ }
}};
private static ConcurrentLinkedQueue actualData = new ConcurrentLinkedQueue<>();
@@ -81,34 +81,27 @@ public class BeamTextCSVTableTest {
private static File readerSourceFile;
private static File writerTargetFile;
-
@Test public void testBuildIOReader() {
- pipeline.apply(
- new BeamTextCSVTable(buildRowType(), readerSourceFile.getAbsolutePath()).buildIOReader())
- .apply(ParDo.of(new TeeFn()));
+ PCollection rows = pipeline.apply(
+ new BeamTextCSVTable(buildRowType(), readerSourceFile.getAbsolutePath()).buildIOReader());
+ PAssert.that(rows).containsInAnyOrder(testDataRows);
pipeline.run();
-
- equalsIgnoreOrder(testData, actualData);
}
@Test public void testBuildIOWriter() {
// reader from a source file, then write into a target file
pipeline.apply(
new BeamTextCSVTable(buildRowType(), readerSourceFile.getAbsolutePath()).buildIOReader())
- .apply(ParDo.of(new TeeFn())).apply(
- new BeamTextCSVTable(buildRowType(), writerTargetFile.getAbsolutePath()).buildIOWriter());
+ .apply(new BeamTextCSVTable(buildRowType(), writerTargetFile.getAbsolutePath())
+ .buildIOWriter());
pipeline.run();
- // read from the target file
- actualData.clear();
-
- pipeline2.apply(
- new BeamTextCSVTable(buildRowType(), writerTargetFile.getAbsolutePath()).buildIOReader())
- .apply(ParDo.of(new TeeFn()));
- pipeline2.run();
+ PCollection rows = pipeline2.apply(
+ new BeamTextCSVTable(buildRowType(), writerTargetFile.getAbsolutePath()).buildIOReader());
// confirm the two reads match
- equalsIgnoreOrder(testData, actualData);
+ PAssert.that(rows).containsInAnyOrder(testDataRows);
+ pipeline2.run();
}
@BeforeClass public static void setUp() throws IOException {
@@ -159,63 +152,28 @@ private static void writeToStreamAndClose(List rows, OutputStream outp
}
}
- /**
- * Keep a copy and forward.
- */
- private static class TeeFn extends DoFn implements Serializable {
-
- @ProcessElement public void processElement(ProcessContext ctx) {
- Object[] row = new Object[5];
- for (int i = 0; i < ctx.element().size(); i++) {
- row[i] = ctx.element().getFieldValue(i);
- }
- actualData.add(row);
- ctx.output(ctx.element());
- }
- }
-
- /**
- * Compares two list of objects ignores their order difference.
- *
- *