From c4c1a184388439efb33ff7d130f3dba348b41194 Mon Sep 17 00:00:00 2001 From: JingsongLi Date: Thu, 18 Jul 2019 20:59:40 +0800 Subject: [PATCH] [FLINK-13286][table-common] Port FileSystem and FileSystemValidator to common --- .../flink/table/descriptors/FileSystem.java | 58 +++++++++++++++++ .../descriptors/FileSystemValidator.java} | 31 ++++----- .../flink/table/descriptors/FileSystem.scala | 64 ------------------- .../table/descriptors/FileSystemTest.scala | 2 +- .../descriptors/TableDescriptorTest.scala | 2 +- .../table/runtime/utils/CommonTestData.scala | 6 +- 6 files changed, 77 insertions(+), 86 deletions(-) create mode 100644 flink-table/flink-table-common/src/main/java/org/apache/flink/table/descriptors/FileSystem.java rename flink-table/{flink-table-planner/src/main/scala/org/apache/flink/table/descriptors/FileSystemValidator.scala => flink-table-common/src/main/java/org/apache/flink/table/descriptors/FileSystemValidator.java} (54%) delete mode 100644 flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/descriptors/FileSystem.scala diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/descriptors/FileSystem.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/descriptors/FileSystem.java new file mode 100644 index 00000000000000..291c0f946ba019 --- /dev/null +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/descriptors/FileSystem.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.descriptors; + +import org.apache.flink.annotation.PublicEvolving; + +import java.util.Map; + +import static org.apache.flink.table.descriptors.FileSystemValidator.CONNECTOR_PATH; +import static org.apache.flink.table.descriptors.FileSystemValidator.CONNECTOR_TYPE_VALUE; + +/** + * Connector descriptor for a file system. + */ +@PublicEvolving +public class FileSystem extends ConnectorDescriptor { + + private String path = null; + + public FileSystem() { + super(CONNECTOR_TYPE_VALUE, 1, true); + } + + /** + * Sets the path to a file or directory in a file system. + * + * @param path the path a file or directory + */ + public FileSystem path(String path) { + this.path = path; + return this; + } + + @Override + protected Map toConnectorProperties() { + DescriptorProperties properties = new DescriptorProperties(); + if (path != null) { + properties.putString(CONNECTOR_PATH, path); + } + return properties.asMap(); + } +} diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/descriptors/FileSystemValidator.scala b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/descriptors/FileSystemValidator.java similarity index 54% rename from flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/descriptors/FileSystemValidator.scala rename to flink-table/flink-table-common/src/main/java/org/apache/flink/table/descriptors/FileSystemValidator.java index 4d1b7deee2aab9..d0e2e1b0111739 100644 --- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/descriptors/FileSystemValidator.scala +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/descriptors/FileSystemValidator.java @@ -16,26 +16,23 @@ * limitations under the License. */ -package org.apache.flink.table.descriptors +package org.apache.flink.table.descriptors; -import org.apache.flink.table.descriptors.ConnectorDescriptorValidator.CONNECTOR_TYPE -import org.apache.flink.table.descriptors.FileSystemValidator.{CONNECTOR_PATH, CONNECTOR_TYPE_VALUE} +import org.apache.flink.annotation.PublicEvolving; /** - * Validator for [[FileSystem]]. - */ -class FileSystemValidator extends ConnectorDescriptorValidator { - - override def validate(properties: DescriptorProperties): Unit = { - super.validate(properties) - properties.validateValue(CONNECTOR_TYPE, CONNECTOR_TYPE_VALUE, false) - properties.validateString(CONNECTOR_PATH, false, 1) - } -} - -object FileSystemValidator { + * Validator for {@link FileSystem}. + */ +@PublicEvolving +public class FileSystemValidator extends ConnectorDescriptorValidator { - val CONNECTOR_TYPE_VALUE = "filesystem" - val CONNECTOR_PATH = "connector.path" + public static final String CONNECTOR_TYPE_VALUE = "filesystem"; + public static final String CONNECTOR_PATH = "connector.path"; + @Override + public void validate(DescriptorProperties properties) { + super.validate(properties); + properties.validateValue(CONNECTOR_TYPE, CONNECTOR_TYPE_VALUE, false); + properties.validateString(CONNECTOR_PATH, false, 1); + } } diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/descriptors/FileSystem.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/descriptors/FileSystem.scala deleted file mode 100644 index 77cf27b39ada55..00000000000000 --- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/descriptors/FileSystem.scala +++ /dev/null @@ -1,64 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.table.descriptors - -import java.util - -import org.apache.flink.table.descriptors.FileSystemValidator.{CONNECTOR_PATH, CONNECTOR_TYPE_VALUE} - -/** - * Connector descriptor for a file system. - */ -class FileSystem extends ConnectorDescriptor(CONNECTOR_TYPE_VALUE, 1, true) { - - private var path: Option[String] = None - - /** - * Sets the path to a file or directory in a file system. - * - * @param path the path a file or directory - */ - def path(path: String): FileSystem = { - this.path = Some(path) - this - } - - override protected def toConnectorProperties: util.Map[String, String] = { - val properties = new DescriptorProperties() - - path.foreach(properties.putString(CONNECTOR_PATH, _)) - - properties.asMap() - } -} - -/** - * Connector descriptor for a file system. - */ -object FileSystem { - - /** - * Connector descriptor for a file system. - * - * @deprecated Use `new FileSystem()`. - */ - @deprecated - def apply(): FileSystem = new FileSystem() - -} diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/descriptors/FileSystemTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/descriptors/FileSystemTest.scala index 1162694a01dc73..d232a0d1db68ad 100644 --- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/descriptors/FileSystemTest.scala +++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/descriptors/FileSystemTest.scala @@ -40,7 +40,7 @@ class FileSystemTest extends DescriptorTestBase { // ---------------------------------------------------------------------------------------------- override def descriptors(): util.List[Descriptor] = { - util.Arrays.asList(FileSystem().path("/myfile")) + util.Arrays.asList(new FileSystem().path("/myfile")) } override def validator(): DescriptorValidator = { diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/descriptors/TableDescriptorTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/descriptors/TableDescriptorTest.scala index df4d3fc798f82f..555a030521d6cf 100644 --- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/descriptors/TableDescriptorTest.scala +++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/descriptors/TableDescriptorTest.scala @@ -56,7 +56,7 @@ class TableDescriptorTest extends TableTestBase { // schema.field("proctime", Types.SQL_TIMESTAMP).proctime() //} - val connector = FileSystem() + val connector = new FileSystem() .path("/path/to/csv") val format = OldCsv() diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/utils/CommonTestData.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/utils/CommonTestData.scala index 0d633fea07798b..b5ada5db2d518e 100644 --- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/utils/CommonTestData.scala +++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/utils/CommonTestData.scala @@ -67,7 +67,7 @@ object CommonTestData { ) val tempFilePath1 = writeToTempFile(csvRecord1.mkString("\n"), "csv-test1", "tmp") - val connDesc1 = FileSystem().path(tempFilePath1) + val connDesc1 = new FileSystem().path(tempFilePath1) val formatDesc1 = OldCsv() .field("a", Types.INT) .field("b", Types.LONG) @@ -106,7 +106,7 @@ object CommonTestData { ) val tempFilePath2 = writeToTempFile(csvRecord2.mkString("\n"), "csv-test2", "tmp") - val connDesc2 = FileSystem().path(tempFilePath2) + val connDesc2 = new FileSystem().path(tempFilePath2) val formatDesc2 = OldCsv() .field("d", Types.INT) .field("e", Types.LONG) @@ -131,7 +131,7 @@ object CommonTestData { } val tempFilePath3 = writeToTempFile("", "csv-test3", "tmp") - val connDesc3 = FileSystem().path(tempFilePath3) + val connDesc3 = new FileSystem().path(tempFilePath3) val formatDesc3 = OldCsv() .field("x", Types.INT) .field("y", Types.LONG)