Skip to content

Commit

Permalink
[FLINK-13286][table-common] Port FileSystem and FileSystemValidator t…
Browse files Browse the repository at this point in the history
…o common
  • Loading branch information
JingsongLi committed Jul 19, 2019
1 parent d0b04b6 commit c4c1a18
Show file tree
Hide file tree
Showing 6 changed files with 77 additions and 86 deletions.
@@ -0,0 +1,58 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.table.descriptors;

import org.apache.flink.annotation.PublicEvolving;

import java.util.Map;

import static org.apache.flink.table.descriptors.FileSystemValidator.CONNECTOR_PATH;
import static org.apache.flink.table.descriptors.FileSystemValidator.CONNECTOR_TYPE_VALUE;

/**
* Connector descriptor for a file system.
*/
@PublicEvolving
public class FileSystem extends ConnectorDescriptor {

private String path = null;

public FileSystem() {
super(CONNECTOR_TYPE_VALUE, 1, true);
}

/**
* Sets the path to a file or directory in a file system.
*
* @param path the path a file or directory
*/
public FileSystem path(String path) {
this.path = path;
return this;
}

@Override
protected Map<String, String> toConnectorProperties() {
DescriptorProperties properties = new DescriptorProperties();
if (path != null) {
properties.putString(CONNECTOR_PATH, path);
}
return properties.asMap();
}
}
Expand Up @@ -16,26 +16,23 @@
* limitations under the License.
*/

package org.apache.flink.table.descriptors
package org.apache.flink.table.descriptors;

import org.apache.flink.table.descriptors.ConnectorDescriptorValidator.CONNECTOR_TYPE
import org.apache.flink.table.descriptors.FileSystemValidator.{CONNECTOR_PATH, CONNECTOR_TYPE_VALUE}
import org.apache.flink.annotation.PublicEvolving;

/**
* Validator for [[FileSystem]].
*/
class FileSystemValidator extends ConnectorDescriptorValidator {

override def validate(properties: DescriptorProperties): Unit = {
super.validate(properties)
properties.validateValue(CONNECTOR_TYPE, CONNECTOR_TYPE_VALUE, false)
properties.validateString(CONNECTOR_PATH, false, 1)
}
}

object FileSystemValidator {
* Validator for {@link FileSystem}.
*/
@PublicEvolving
public class FileSystemValidator extends ConnectorDescriptorValidator {

val CONNECTOR_TYPE_VALUE = "filesystem"
val CONNECTOR_PATH = "connector.path"
public static final String CONNECTOR_TYPE_VALUE = "filesystem";
public static final String CONNECTOR_PATH = "connector.path";

@Override
public void validate(DescriptorProperties properties) {
super.validate(properties);
properties.validateValue(CONNECTOR_TYPE, CONNECTOR_TYPE_VALUE, false);
properties.validateString(CONNECTOR_PATH, false, 1);
}
}

This file was deleted.

Expand Up @@ -40,7 +40,7 @@ class FileSystemTest extends DescriptorTestBase {
// ----------------------------------------------------------------------------------------------

override def descriptors(): util.List[Descriptor] = {
util.Arrays.asList(FileSystem().path("/myfile"))
util.Arrays.asList(new FileSystem().path("/myfile"))
}

override def validator(): DescriptorValidator = {
Expand Down
Expand Up @@ -56,7 +56,7 @@ class TableDescriptorTest extends TableTestBase {
// schema.field("proctime", Types.SQL_TIMESTAMP).proctime()
//}

val connector = FileSystem()
val connector = new FileSystem()
.path("/path/to/csv")

val format = OldCsv()
Expand Down
Expand Up @@ -67,7 +67,7 @@ object CommonTestData {
)
val tempFilePath1 = writeToTempFile(csvRecord1.mkString("\n"), "csv-test1", "tmp")

val connDesc1 = FileSystem().path(tempFilePath1)
val connDesc1 = new FileSystem().path(tempFilePath1)
val formatDesc1 = OldCsv()
.field("a", Types.INT)
.field("b", Types.LONG)
Expand Down Expand Up @@ -106,7 +106,7 @@ object CommonTestData {
)
val tempFilePath2 = writeToTempFile(csvRecord2.mkString("\n"), "csv-test2", "tmp")

val connDesc2 = FileSystem().path(tempFilePath2)
val connDesc2 = new FileSystem().path(tempFilePath2)
val formatDesc2 = OldCsv()
.field("d", Types.INT)
.field("e", Types.LONG)
Expand All @@ -131,7 +131,7 @@ object CommonTestData {
}

val tempFilePath3 = writeToTempFile("", "csv-test3", "tmp")
val connDesc3 = FileSystem().path(tempFilePath3)
val connDesc3 = new FileSystem().path(tempFilePath3)
val formatDesc3 = OldCsv()
.field("x", Types.INT)
.field("y", Types.LONG)
Expand Down

0 comments on commit c4c1a18

Please sign in to comment.