diff --git a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/descriptors/RowtimeValidator.java b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/descriptors/RowtimeValidator.java new file mode 100644 index 0000000000000..1d98ad3efc1bb --- /dev/null +++ b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/descriptors/RowtimeValidator.java @@ -0,0 +1,175 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.descriptors; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.table.sources.tsextractors.ExistingField; +import org.apache.flink.table.sources.tsextractors.StreamRecordTimestamp; +import org.apache.flink.table.sources.tsextractors.TimestampExtractor; +import org.apache.flink.table.sources.wmstrategies.AscendingTimestamps; +import org.apache.flink.table.sources.wmstrategies.BoundedOutOfOrderTimestamps; +import org.apache.flink.table.sources.wmstrategies.PreserveWatermarks; +import org.apache.flink.table.sources.wmstrategies.WatermarkStrategy; +import org.apache.flink.table.utils.EncodingUtils; + +import java.util.HashMap; +import java.util.Map; +import java.util.Optional; +import java.util.function.Consumer; + +import static org.apache.flink.table.descriptors.Rowtime.ROWTIME_TIMESTAMPS_CLASS; +import static org.apache.flink.table.descriptors.Rowtime.ROWTIME_TIMESTAMPS_FROM; +import static org.apache.flink.table.descriptors.Rowtime.ROWTIME_TIMESTAMPS_SERIALIZED; +import static org.apache.flink.table.descriptors.Rowtime.ROWTIME_TIMESTAMPS_TYPE; +import static org.apache.flink.table.descriptors.Rowtime.ROWTIME_TIMESTAMPS_TYPE_VALUE_CUSTOM; +import static org.apache.flink.table.descriptors.Rowtime.ROWTIME_TIMESTAMPS_TYPE_VALUE_FROM_FIELD; +import static org.apache.flink.table.descriptors.Rowtime.ROWTIME_TIMESTAMPS_TYPE_VALUE_FROM_SOURCE; +import static org.apache.flink.table.descriptors.Rowtime.ROWTIME_WATERMARKS_CLASS; +import static org.apache.flink.table.descriptors.Rowtime.ROWTIME_WATERMARKS_DELAY; +import static org.apache.flink.table.descriptors.Rowtime.ROWTIME_WATERMARKS_SERIALIZED; +import static org.apache.flink.table.descriptors.Rowtime.ROWTIME_WATERMARKS_TYPE; +import static org.apache.flink.table.descriptors.Rowtime.ROWTIME_WATERMARKS_TYPE_VALUE_CUSTOM; +import static org.apache.flink.table.descriptors.Rowtime.ROWTIME_WATERMARKS_TYPE_VALUE_FROM_SOURCE; +import static org.apache.flink.table.descriptors.Rowtime.ROWTIME_WATERMARKS_TYPE_VALUE_PERIODIC_ASCENDING; +import static org.apache.flink.table.descriptors.Rowtime.ROWTIME_WATERMARKS_TYPE_VALUE_PERIODIC_BOUNDED; + +/** + * Validator for {@link Rowtime}. + */ +@PublicEvolving +public class RowtimeValidator implements DescriptorValidator { + + private final boolean supportsSourceTimestamps; + private final boolean supportsSourceWatermarks; + private final String prefix; + + public RowtimeValidator(boolean supportsSourceTimestamps, boolean supportsSourceWatermarks) { + this(supportsSourceTimestamps, supportsSourceWatermarks, ""); + } + + public RowtimeValidator(boolean supportsSourceTimestamps, boolean supportsSourceWatermarks, String prefix) { + this.supportsSourceTimestamps = supportsSourceTimestamps; + this.supportsSourceWatermarks = supportsSourceWatermarks; + this.prefix = prefix; + } + + @Override + public void validate(DescriptorProperties properties) { + Consumer timestampExistingField = + s -> properties.validateString(prefix + ROWTIME_TIMESTAMPS_FROM, false, 1); + + Consumer timestampCustom = s -> { + properties.validateString(prefix + ROWTIME_TIMESTAMPS_CLASS, false, 1); + properties.validateString(prefix + ROWTIME_TIMESTAMPS_SERIALIZED, false, 1); + }; + + Map> timestampsValidation = new HashMap<>(); + if (supportsSourceTimestamps) { + timestampsValidation.put(ROWTIME_TIMESTAMPS_TYPE_VALUE_FROM_FIELD, timestampExistingField); + timestampsValidation.put(ROWTIME_TIMESTAMPS_TYPE_VALUE_FROM_SOURCE, DescriptorProperties.noValidation()); + timestampsValidation.put(ROWTIME_TIMESTAMPS_TYPE_VALUE_CUSTOM, timestampCustom); + } else { + timestampsValidation.put(ROWTIME_TIMESTAMPS_TYPE_VALUE_FROM_FIELD, timestampExistingField); + timestampsValidation.put(ROWTIME_TIMESTAMPS_TYPE_VALUE_CUSTOM, timestampCustom); + } + + properties.validateEnum(prefix + ROWTIME_TIMESTAMPS_TYPE, false, timestampsValidation); + + Consumer watermarkPeriodicBounded = + s -> properties.validateLong(prefix + ROWTIME_WATERMARKS_DELAY, false, 0); + + Consumer watermarkCustom = s -> { + properties.validateString(prefix + ROWTIME_WATERMARKS_CLASS, false, 1); + properties.validateString(prefix + ROWTIME_WATERMARKS_SERIALIZED, false, 1); + }; + + Map> watermarksValidation = new HashMap<>(); + if (supportsSourceWatermarks) { + watermarksValidation.put(ROWTIME_WATERMARKS_TYPE_VALUE_PERIODIC_ASCENDING, DescriptorProperties.noValidation()); + watermarksValidation.put(ROWTIME_WATERMARKS_TYPE_VALUE_PERIODIC_BOUNDED, watermarkPeriodicBounded); + watermarksValidation.put(ROWTIME_WATERMARKS_TYPE_VALUE_FROM_SOURCE, DescriptorProperties.noValidation()); + watermarksValidation.put(ROWTIME_WATERMARKS_TYPE_VALUE_CUSTOM, watermarkCustom); + } else { + watermarksValidation.put(ROWTIME_WATERMARKS_TYPE_VALUE_PERIODIC_ASCENDING, DescriptorProperties.noValidation()); + watermarksValidation.put(ROWTIME_WATERMARKS_TYPE_VALUE_PERIODIC_BOUNDED, watermarkPeriodicBounded); + watermarksValidation.put(ROWTIME_WATERMARKS_TYPE_VALUE_CUSTOM, watermarkCustom); + } + + properties.validateEnum(prefix + ROWTIME_WATERMARKS_TYPE, false, watermarksValidation); + } + + // utilities + + public static Optional> getRowtimeComponents( + DescriptorProperties properties, String prefix) { + // create timestamp extractor + TimestampExtractor extractor; + Optional t = properties.getOptionalString(prefix + ROWTIME_TIMESTAMPS_TYPE); + if (!t.isPresent()) { + return Optional.empty(); + } + + switch (t.get()) { + case ROWTIME_TIMESTAMPS_TYPE_VALUE_FROM_FIELD: + String field = properties.getString(prefix + ROWTIME_TIMESTAMPS_FROM); + extractor = new ExistingField(field); + break; + case ROWTIME_TIMESTAMPS_TYPE_VALUE_FROM_SOURCE: + extractor = StreamRecordTimestamp.INSTANCE; + break; + case ROWTIME_TIMESTAMPS_TYPE_VALUE_CUSTOM: + Class clazz = properties.getClass( + prefix + ROWTIME_TIMESTAMPS_CLASS, TimestampExtractor.class); + extractor = EncodingUtils.decodeStringToObject( + properties.getString(prefix + ROWTIME_TIMESTAMPS_SERIALIZED), + clazz); + break; + default: + throw new RuntimeException("Unsupported rowtime timestamps type: " + t.get()); + } + + // create watermark strategy + WatermarkStrategy strategy; + String s = properties.getString(prefix + ROWTIME_WATERMARKS_TYPE); + switch (s) { + case ROWTIME_WATERMARKS_TYPE_VALUE_PERIODIC_ASCENDING: + strategy = new AscendingTimestamps(); + break; + case ROWTIME_WATERMARKS_TYPE_VALUE_PERIODIC_BOUNDED: + long delay = properties.getLong(prefix + ROWTIME_WATERMARKS_DELAY); + strategy = new BoundedOutOfOrderTimestamps(delay); + break; + case ROWTIME_WATERMARKS_TYPE_VALUE_FROM_SOURCE: + strategy = PreserveWatermarks.INSTANCE; + break; + case ROWTIME_WATERMARKS_TYPE_VALUE_CUSTOM: + Class clazz = properties.getClass( + prefix + ROWTIME_WATERMARKS_CLASS, WatermarkStrategy.class); + strategy = EncodingUtils.decodeStringToObject( + properties.getString(prefix + ROWTIME_WATERMARKS_SERIALIZED), + clazz); + break; + default: + throw new RuntimeException("Unsupported rowtime timestamps type: " + s); + } + + return Optional.of(new Tuple2<>(extractor, strategy)); + } +} diff --git a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/descriptors/SchemaValidator.java b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/descriptors/SchemaValidator.java new file mode 100644 index 0000000000000..2ad92c1c8f943 --- /dev/null +++ b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/descriptors/SchemaValidator.java @@ -0,0 +1,287 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.descriptors; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.common.typeutils.CompositeType; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.table.api.TableException; +import org.apache.flink.table.api.TableSchema; +import org.apache.flink.table.api.ValidationException; +import org.apache.flink.table.factories.TableFormatFactory; +import org.apache.flink.table.sources.RowtimeAttributeDescriptor; +import org.apache.flink.table.sources.tsextractors.TimestampExtractor; +import org.apache.flink.table.sources.wmstrategies.WatermarkStrategy; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; + +import static java.lang.String.format; +import static org.apache.flink.table.descriptors.Rowtime.ROWTIME; +import static org.apache.flink.table.descriptors.Rowtime.ROWTIME_TIMESTAMPS_CLASS; +import static org.apache.flink.table.descriptors.Rowtime.ROWTIME_TIMESTAMPS_FROM; +import static org.apache.flink.table.descriptors.Rowtime.ROWTIME_TIMESTAMPS_SERIALIZED; +import static org.apache.flink.table.descriptors.Rowtime.ROWTIME_TIMESTAMPS_TYPE; +import static org.apache.flink.table.descriptors.Rowtime.ROWTIME_TIMESTAMPS_TYPE_VALUE_FROM_FIELD; +import static org.apache.flink.table.descriptors.Rowtime.ROWTIME_WATERMARKS_CLASS; +import static org.apache.flink.table.descriptors.Rowtime.ROWTIME_WATERMARKS_DELAY; +import static org.apache.flink.table.descriptors.Rowtime.ROWTIME_WATERMARKS_SERIALIZED; +import static org.apache.flink.table.descriptors.Rowtime.ROWTIME_WATERMARKS_TYPE; +import static org.apache.flink.table.descriptors.Schema.SCHEMA; +import static org.apache.flink.table.descriptors.Schema.SCHEMA_FROM; +import static org.apache.flink.table.descriptors.Schema.SCHEMA_NAME; +import static org.apache.flink.table.descriptors.Schema.SCHEMA_PROCTIME; +import static org.apache.flink.table.descriptors.Schema.SCHEMA_TYPE; + +/** + * Validator for {@link Schema}. + */ +@PublicEvolving +public class SchemaValidator implements DescriptorValidator { + + private final boolean isStreamEnvironment; + private final boolean supportsSourceTimestamps; + private final boolean supportsSourceWatermarks; + + public SchemaValidator(boolean isStreamEnvironment, boolean supportsSourceTimestamps, + boolean supportsSourceWatermarks) { + this.isStreamEnvironment = isStreamEnvironment; + this.supportsSourceTimestamps = supportsSourceTimestamps; + this.supportsSourceWatermarks = supportsSourceWatermarks; + } + + @Override + public void validate(DescriptorProperties properties) { + Map names = properties.getIndexedProperty(SCHEMA, SCHEMA_NAME); + Map types = properties.getIndexedProperty(SCHEMA, SCHEMA_TYPE); + + if (names.isEmpty() && types.isEmpty()) { + throw new ValidationException( + format("Could not find the required schema in property '%s'.", SCHEMA)); + } + + boolean proctimeFound = false; + + for (int i = 0; i < Math.max(names.size(), types.size()); i++) { + properties.validateString(SCHEMA + "." + i + "." + SCHEMA_NAME, false, 1); + properties.validateType(SCHEMA + "." + i + "." + SCHEMA_TYPE, false, false); + properties.validateString(SCHEMA + "." + i + "." + SCHEMA_FROM, true, 1); + // either proctime or rowtime + String proctime = SCHEMA + "." + i + "." + SCHEMA_PROCTIME; + String rowtime = SCHEMA + "." + i + "." + ROWTIME; + if (properties.containsKey(proctime)) { + // check the environment + if (!isStreamEnvironment) { + throw new ValidationException( + format("Property '%s' is not allowed in a batch environment.", proctime)); + } + // check for only one proctime attribute + else if (proctimeFound) { + throw new ValidationException("A proctime attribute must only be defined once."); + } + // check proctime + properties.validateBoolean(proctime, false); + proctimeFound = properties.getBoolean(proctime); + // no rowtime + properties.validatePrefixExclusion(rowtime); + } else if (properties.hasPrefix(rowtime)) { + // check rowtime + RowtimeValidator rowtimeValidator = new RowtimeValidator( + supportsSourceTimestamps, + supportsSourceWatermarks, + SCHEMA + "." + i + "."); + rowtimeValidator.validate(properties); + // no proctime + properties.validateExclusion(proctime); + } + } + } + + /** + * Returns keys for a {@link TableFormatFactory#supportedProperties()} method that + * are accepted for schema derivation using {@code deriveFormatFields(DescriptorProperties)}. + */ + public static List getSchemaDerivationKeys() { + List keys = new ArrayList<>(); + + // schema + keys.add(SCHEMA + ".#." + SCHEMA_TYPE); + keys.add(SCHEMA + ".#." + SCHEMA_NAME); + keys.add(SCHEMA + ".#." + SCHEMA_FROM); + + // time attributes + keys.add(SCHEMA + ".#." + SCHEMA_PROCTIME); + keys.add(SCHEMA + ".#." + ROWTIME_TIMESTAMPS_TYPE); + keys.add(SCHEMA + ".#." + ROWTIME_TIMESTAMPS_FROM); + keys.add(SCHEMA + ".#." + ROWTIME_TIMESTAMPS_CLASS); + keys.add(SCHEMA + ".#." + ROWTIME_TIMESTAMPS_SERIALIZED); + keys.add(SCHEMA + ".#." + ROWTIME_WATERMARKS_TYPE); + keys.add(SCHEMA + ".#." + ROWTIME_WATERMARKS_CLASS); + keys.add(SCHEMA + ".#." + ROWTIME_WATERMARKS_SERIALIZED); + keys.add(SCHEMA + ".#." + ROWTIME_WATERMARKS_DELAY); + + return keys; + } + + /** + * Finds the proctime attribute if defined. + */ + public static Optional deriveProctimeAttribute(DescriptorProperties properties) { + Map names = properties.getIndexedProperty(SCHEMA, SCHEMA_NAME); + + for (int i = 0; i < names.size(); i++) { + Optional isProctime = properties.getOptionalBoolean(SCHEMA + "." + i + "." + SCHEMA_PROCTIME); + if (isProctime.isPresent() && isProctime.get()) { + return Optional.of(names.get(SCHEMA + "." + i + "." + SCHEMA_NAME)); + } + } + return Optional.empty(); + } + + /** + * Finds the rowtime attributes if defined. + */ + public static List deriveRowtimeAttributes( + DescriptorProperties properties) { + + Map names = properties.getIndexedProperty(SCHEMA, SCHEMA_NAME); + + List attributes = new ArrayList<>(); + + // check for rowtime in every field + for (int i = 0; i < names.size(); i++) { + Optional> rowtimeComponents = RowtimeValidator + .getRowtimeComponents(properties, SCHEMA + "." + i + "."); + int index = i; + // create descriptor + rowtimeComponents.ifPresent(tuple2 -> attributes.add(new RowtimeAttributeDescriptor( + properties.getString(SCHEMA + "." + index + "." + SCHEMA_NAME), + tuple2.f0, + tuple2.f1)) + ); + } + + return attributes; + } + + /** + * Derives the table schema for a table sink. A sink ignores a proctime attribute and + * needs to track the origin of a rowtime field. + * + * @deprecated This method combines two separate concepts of table schema and field mapping. + * This should be split into two methods once we have support for + * the corresponding interfaces (see FLINK-9870). + */ + @Deprecated + public static TableSchema deriveTableSinkSchema(DescriptorProperties properties) { + TableSchema.Builder builder = TableSchema.builder(); + + TableSchema schema = properties.getTableSchema(SCHEMA); + + for (int i = 0; i < schema.getFieldCount(); i++) { + TypeInformation t = schema.getFieldTypes()[i]; + String n = schema.getFieldNames()[i]; + boolean isProctime = properties + .getOptionalBoolean(SCHEMA + "." + i + "." + SCHEMA_PROCTIME) + .orElse(false); + String tsType = SCHEMA + "." + i + "." + ROWTIME_TIMESTAMPS_TYPE; + boolean isRowtime = properties.containsKey(tsType); + if (!isProctime && !isRowtime) { + // check for a aliasing + String fieldName = properties.getOptionalString(SCHEMA + "." + i + "." + SCHEMA_FROM) + .orElse(n); + builder.field(fieldName, t); + } + // only use the rowtime attribute if it references a field + else if (isRowtime) { + switch (properties.getString(tsType)) { + case ROWTIME_TIMESTAMPS_TYPE_VALUE_FROM_FIELD: + String field = properties.getString(SCHEMA + "." + i + "." + ROWTIME_TIMESTAMPS_FROM); + builder.field(field, t); + break; + // other timestamp strategies require a reverse timestamp extractor to + // insert the timestamp into the output + default: + throw new TableException(format("Unsupported rowtime type '%s' for sink" + + " table schema. Currently only '%s' is supported for table sinks.", + t, ROWTIME_TIMESTAMPS_TYPE_VALUE_FROM_FIELD)); + } + } + } + + return builder.build(); + } + + /** + * Finds a table source field mapping. + * + * @param properties The properties describing a schema. + * @param inputType The input type that a connector and/or format produces. This parameter + * can be used to resolve a rowtime field against an input field. + */ + public static Map deriveFieldMapping( + DescriptorProperties properties, Optional> inputType) { + + Map mapping = new HashMap<>(); + + TableSchema schema = properties.getTableSchema(SCHEMA); + + List columnNames = new ArrayList<>(); + inputType.ifPresent(t -> columnNames.addAll(Arrays.asList(((CompositeType) t).getFieldNames()))); + + // add all source fields first because rowtime might reference one of them + columnNames.forEach(name -> mapping.put(name, name)); + + // add all schema fields first for implicit mappings + Arrays.stream(schema.getFieldNames()).forEach(name -> mapping.put(name, name)); + + Map names = properties.getIndexedProperty(SCHEMA, SCHEMA_NAME); + + for (int i = 0; i < names.size(); i++) { + String name = properties.getString(SCHEMA + "." + i + "." + SCHEMA_NAME); + Optional source = properties.getOptionalString(SCHEMA + "." + i + "." + SCHEMA_FROM); + if (source.isPresent()) { + // add explicit mapping + mapping.put(name, source.get()); + } else { // implicit mapping or time + boolean isProctime = properties + .getOptionalBoolean(SCHEMA + "." + i + "." + SCHEMA_PROCTIME) + .orElse(false); + boolean isRowtime = properties + .containsKey(SCHEMA + "." + i + "." + ROWTIME_TIMESTAMPS_TYPE); + // remove proctime/rowtime from mapping + if (isProctime || isRowtime) { + mapping.remove(name); + } + // check for invalid fields + else if (!columnNames.contains(name)) { + throw new ValidationException(format("Could not map the schema field '%s' to a field " + + "from source. Please specify the source field from which it can be derived.", name)); + } + } + } + + return mapping; + } +} diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/descriptors/FileSystem.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/descriptors/FileSystem.java new file mode 100644 index 0000000000000..291c0f946ba01 --- /dev/null +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/descriptors/FileSystem.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.descriptors; + +import org.apache.flink.annotation.PublicEvolving; + +import java.util.Map; + +import static org.apache.flink.table.descriptors.FileSystemValidator.CONNECTOR_PATH; +import static org.apache.flink.table.descriptors.FileSystemValidator.CONNECTOR_TYPE_VALUE; + +/** + * Connector descriptor for a file system. + */ +@PublicEvolving +public class FileSystem extends ConnectorDescriptor { + + private String path = null; + + public FileSystem() { + super(CONNECTOR_TYPE_VALUE, 1, true); + } + + /** + * Sets the path to a file or directory in a file system. + * + * @param path the path a file or directory + */ + public FileSystem path(String path) { + this.path = path; + return this; + } + + @Override + protected Map toConnectorProperties() { + DescriptorProperties properties = new DescriptorProperties(); + if (path != null) { + properties.putString(CONNECTOR_PATH, path); + } + return properties.asMap(); + } +} diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/descriptors/FileSystemValidator.scala b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/descriptors/FileSystemValidator.java similarity index 54% rename from flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/descriptors/FileSystemValidator.scala rename to flink-table/flink-table-common/src/main/java/org/apache/flink/table/descriptors/FileSystemValidator.java index 4d1b7deee2aab..d0e2e1b011173 100644 --- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/descriptors/FileSystemValidator.scala +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/descriptors/FileSystemValidator.java @@ -16,26 +16,23 @@ * limitations under the License. */ -package org.apache.flink.table.descriptors +package org.apache.flink.table.descriptors; -import org.apache.flink.table.descriptors.ConnectorDescriptorValidator.CONNECTOR_TYPE -import org.apache.flink.table.descriptors.FileSystemValidator.{CONNECTOR_PATH, CONNECTOR_TYPE_VALUE} +import org.apache.flink.annotation.PublicEvolving; /** - * Validator for [[FileSystem]]. - */ -class FileSystemValidator extends ConnectorDescriptorValidator { - - override def validate(properties: DescriptorProperties): Unit = { - super.validate(properties) - properties.validateValue(CONNECTOR_TYPE, CONNECTOR_TYPE_VALUE, false) - properties.validateString(CONNECTOR_PATH, false, 1) - } -} - -object FileSystemValidator { + * Validator for {@link FileSystem}. + */ +@PublicEvolving +public class FileSystemValidator extends ConnectorDescriptorValidator { - val CONNECTOR_TYPE_VALUE = "filesystem" - val CONNECTOR_PATH = "connector.path" + public static final String CONNECTOR_TYPE_VALUE = "filesystem"; + public static final String CONNECTOR_PATH = "connector.path"; + @Override + public void validate(DescriptorProperties properties) { + super.validate(properties); + properties.validateValue(CONNECTOR_TYPE, CONNECTOR_TYPE_VALUE, false); + properties.validateString(CONNECTOR_PATH, false, 1); + } } diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/descriptors/FileSystem.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/descriptors/FileSystem.scala deleted file mode 100644 index 77cf27b39ada5..0000000000000 --- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/descriptors/FileSystem.scala +++ /dev/null @@ -1,64 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.table.descriptors - -import java.util - -import org.apache.flink.table.descriptors.FileSystemValidator.{CONNECTOR_PATH, CONNECTOR_TYPE_VALUE} - -/** - * Connector descriptor for a file system. - */ -class FileSystem extends ConnectorDescriptor(CONNECTOR_TYPE_VALUE, 1, true) { - - private var path: Option[String] = None - - /** - * Sets the path to a file or directory in a file system. - * - * @param path the path a file or directory - */ - def path(path: String): FileSystem = { - this.path = Some(path) - this - } - - override protected def toConnectorProperties: util.Map[String, String] = { - val properties = new DescriptorProperties() - - path.foreach(properties.putString(CONNECTOR_PATH, _)) - - properties.asMap() - } -} - -/** - * Connector descriptor for a file system. - */ -object FileSystem { - - /** - * Connector descriptor for a file system. - * - * @deprecated Use `new FileSystem()`. - */ - @deprecated - def apply(): FileSystem = new FileSystem() - -} diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/descriptors/RowtimeValidator.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/descriptors/RowtimeValidator.scala deleted file mode 100644 index a3b6e8d2bce04..0000000000000 --- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/descriptors/RowtimeValidator.scala +++ /dev/null @@ -1,156 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.table.descriptors - -import org.apache.flink.table.descriptors.Rowtime._ -import org.apache.flink.table.sources.tsextractors.{ExistingField, StreamRecordTimestamp, TimestampExtractor} -import org.apache.flink.table.sources.wmstrategies.{AscendingTimestamps, BoundedOutOfOrderTimestamps, PreserveWatermarks, WatermarkStrategy} -import org.apache.flink.table.util.JavaScalaConversionUtil.toJava -import org.apache.flink.table.utils.EncodingUtils - -import scala.collection.JavaConverters._ - -/** - * Validator for [[Rowtime]]. - */ -class RowtimeValidator( - supportsSourceTimestamps: Boolean, - supportsSourceWatermarks: Boolean, - prefix: String = "") - extends DescriptorValidator { - - override def validate(properties: DescriptorProperties): Unit = { - val timestampExistingField = (_: String) => { - properties.validateString( - prefix + ROWTIME_TIMESTAMPS_FROM, false, 1) - } - - val timestampCustom = (_: String) => { - properties.validateString( - prefix + ROWTIME_TIMESTAMPS_CLASS, false, 1) - properties.validateString( - prefix + ROWTIME_TIMESTAMPS_SERIALIZED, false, 1) - } - - val timestampsValidation = if (supportsSourceTimestamps) { - Map( - ROWTIME_TIMESTAMPS_TYPE_VALUE_FROM_FIELD -> toJava(timestampExistingField), - ROWTIME_TIMESTAMPS_TYPE_VALUE_FROM_SOURCE -> DescriptorProperties.noValidation(), - ROWTIME_TIMESTAMPS_TYPE_VALUE_CUSTOM -> toJava(timestampCustom)) - } else { - Map( - ROWTIME_TIMESTAMPS_TYPE_VALUE_FROM_FIELD -> toJava(timestampExistingField), - ROWTIME_TIMESTAMPS_TYPE_VALUE_CUSTOM -> toJava(timestampCustom)) - } - - properties.validateEnum( - prefix + ROWTIME_TIMESTAMPS_TYPE, - false, - timestampsValidation.asJava - ) - - val watermarkPeriodicBounded = (_: String) => { - properties.validateLong( - prefix + ROWTIME_WATERMARKS_DELAY, false, 0) - } - - val watermarkCustom = (_: String) => { - properties.validateString( - prefix + ROWTIME_WATERMARKS_CLASS, false, 1) - properties.validateString( - prefix + ROWTIME_WATERMARKS_SERIALIZED, false, 1) - } - - val watermarksValidation = if (supportsSourceWatermarks) { - Map( - ROWTIME_WATERMARKS_TYPE_VALUE_PERIODIC_ASCENDING -> DescriptorProperties.noValidation(), - ROWTIME_WATERMARKS_TYPE_VALUE_PERIODIC_BOUNDED -> toJava(watermarkPeriodicBounded), - ROWTIME_WATERMARKS_TYPE_VALUE_FROM_SOURCE -> DescriptorProperties.noValidation(), - ROWTIME_WATERMARKS_TYPE_VALUE_CUSTOM -> toJava(watermarkCustom)) - } else { - Map( - ROWTIME_WATERMARKS_TYPE_VALUE_PERIODIC_ASCENDING -> DescriptorProperties.noValidation(), - ROWTIME_WATERMARKS_TYPE_VALUE_PERIODIC_BOUNDED -> toJava(watermarkPeriodicBounded), - ROWTIME_WATERMARKS_TYPE_VALUE_CUSTOM -> toJava(watermarkCustom)) - } - - properties.validateEnum( - prefix + ROWTIME_WATERMARKS_TYPE, - false, - watermarksValidation.asJava - ) - } -} - -object RowtimeValidator { - - // utilities - - def getRowtimeComponents(properties: DescriptorProperties, prefix: String) - : Option[(TimestampExtractor, WatermarkStrategy)] = { - - // create timestamp extractor - val t = properties.getOptionalString(prefix + ROWTIME_TIMESTAMPS_TYPE) - if (!t.isPresent) { - return None - } - val extractor: TimestampExtractor = t.get() match { - - case ROWTIME_TIMESTAMPS_TYPE_VALUE_FROM_FIELD => - val field = properties.getString(prefix + ROWTIME_TIMESTAMPS_FROM) - new ExistingField(field) - - case ROWTIME_TIMESTAMPS_TYPE_VALUE_FROM_SOURCE => - StreamRecordTimestamp.INSTANCE - - case ROWTIME_TIMESTAMPS_TYPE_VALUE_CUSTOM => - val clazz = properties.getClass( - prefix + ROWTIME_TIMESTAMPS_CLASS, - classOf[TimestampExtractor]) - EncodingUtils.decodeStringToObject( - properties.getString(prefix + ROWTIME_TIMESTAMPS_SERIALIZED), - clazz) - } - - // create watermark strategy - val s = properties.getString(prefix + ROWTIME_WATERMARKS_TYPE) - val strategy: WatermarkStrategy = s match { - - case ROWTIME_WATERMARKS_TYPE_VALUE_PERIODIC_ASCENDING => - new AscendingTimestamps() - - case ROWTIME_WATERMARKS_TYPE_VALUE_PERIODIC_BOUNDED => - val delay = properties.getLong(prefix + ROWTIME_WATERMARKS_DELAY) - new BoundedOutOfOrderTimestamps(delay) - - case ROWTIME_WATERMARKS_TYPE_VALUE_FROM_SOURCE => - PreserveWatermarks.INSTANCE - - case ROWTIME_WATERMARKS_TYPE_VALUE_CUSTOM => - val clazz = properties.getClass( - prefix + ROWTIME_WATERMARKS_CLASS, - classOf[WatermarkStrategy]) - EncodingUtils.decodeStringToObject( - properties.getString(prefix + ROWTIME_WATERMARKS_SERIALIZED), - clazz) - } - - Some((extractor, strategy)) - } -} diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/descriptors/SchemaValidator.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/descriptors/SchemaValidator.scala deleted file mode 100644 index f5769bf5bbafa..0000000000000 --- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/descriptors/SchemaValidator.scala +++ /dev/null @@ -1,275 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.table.descriptors - -import java.util -import java.util.Optional - -import org.apache.flink.api.common.typeinfo.TypeInformation -import org.apache.flink.api.common.typeutils.CompositeType -import org.apache.flink.table.api.{TableException, TableSchema, ValidationException} -import org.apache.flink.table.descriptors.Rowtime._ -import org.apache.flink.table.descriptors.Schema._ -import org.apache.flink.table.sources.RowtimeAttributeDescriptor -import org.apache.flink.table.util.JavaScalaConversionUtil.{toJava, toScala} - -import scala.collection.JavaConverters._ -import scala.collection.mutable - -/** - * Validator for [[Schema]]. - */ -class SchemaValidator( - isStreamEnvironment: Boolean, - supportsSourceTimestamps: Boolean, - supportsSourceWatermarks: Boolean) - extends DescriptorValidator { - - override def validate(properties: DescriptorProperties): Unit = { - val names = properties.getIndexedProperty(SCHEMA, SCHEMA_NAME) - val types = properties.getIndexedProperty(SCHEMA, SCHEMA_TYPE) - - if (names.isEmpty && types.isEmpty) { - throw new ValidationException( - s"Could not find the required schema in property '$SCHEMA'.") - } - - var proctimeFound = false - - for (i <- 0 until Math.max(names.size, types.size)) { - properties - .validateString(s"$SCHEMA.$i.$SCHEMA_NAME", false, 1) - properties - .validateType(s"$SCHEMA.$i.$SCHEMA_TYPE", false, false) - properties - .validateString(s"$SCHEMA.$i.$SCHEMA_FROM", true, 1) - // either proctime or rowtime - val proctime = s"$SCHEMA.$i.$SCHEMA_PROCTIME" - val rowtime = s"$SCHEMA.$i.$ROWTIME" - if (properties.containsKey(proctime)) { - // check the environment - if (!isStreamEnvironment) { - throw new ValidationException( - s"Property '$proctime' is not allowed in a batch environment.") - } - // check for only one proctime attribute - else if (proctimeFound) { - throw new ValidationException("A proctime attribute must only be defined once.") - } - // check proctime - properties.validateBoolean(proctime, false) - proctimeFound = properties.getBoolean(proctime) - // no rowtime - properties.validatePrefixExclusion(rowtime) - } else if (properties.hasPrefix(rowtime)) { - // check rowtime - val rowtimeValidator = new RowtimeValidator( - supportsSourceTimestamps, - supportsSourceWatermarks, - s"$SCHEMA.$i.") - rowtimeValidator.validate(properties) - // no proctime - properties.validateExclusion(proctime) - } - } - } -} - -object SchemaValidator { - - /** - * Returns keys for a - * [[org.apache.flink.table.factories.TableFormatFactory.supportedProperties()]] method that - * are accepted for schema derivation using [[deriveFormatFields(DescriptorProperties)]]. - */ - def getSchemaDerivationKeys: util.List[String] = { - val keys = new util.ArrayList[String]() - - // schema - keys.add(SCHEMA + ".#." + SCHEMA_TYPE) - keys.add(SCHEMA + ".#." + SCHEMA_NAME) - keys.add(SCHEMA + ".#." + SCHEMA_FROM) - - // time attributes - keys.add(SCHEMA + ".#." + SCHEMA_PROCTIME) - keys.add(SCHEMA + ".#." + ROWTIME_TIMESTAMPS_TYPE) - keys.add(SCHEMA + ".#." + ROWTIME_TIMESTAMPS_FROM) - keys.add(SCHEMA + ".#." + ROWTIME_TIMESTAMPS_CLASS) - keys.add(SCHEMA + ".#." + ROWTIME_TIMESTAMPS_SERIALIZED) - keys.add(SCHEMA + ".#." + ROWTIME_WATERMARKS_TYPE) - keys.add(SCHEMA + ".#." + ROWTIME_WATERMARKS_CLASS) - keys.add(SCHEMA + ".#." + ROWTIME_WATERMARKS_SERIALIZED) - keys.add(SCHEMA + ".#." + ROWTIME_WATERMARKS_DELAY) - - keys - } - - // utilities - - /** - * Finds the proctime attribute if defined. - */ - def deriveProctimeAttribute(properties: DescriptorProperties): Optional[String] = { - val names = properties.getIndexedProperty(SCHEMA, SCHEMA_NAME) - - for (i <- 0 until names.size) { - val isProctime = toScala( - properties.getOptionalBoolean(s"$SCHEMA.$i.$SCHEMA_PROCTIME")) - isProctime.foreach { isSet => - if (isSet) { - return toJava(names.asScala.get(s"$SCHEMA.$i.$SCHEMA_NAME")) - } - } - } - toJava(None) - } - - /** - * Finds the rowtime attributes if defined. - */ - def deriveRowtimeAttributes(properties: DescriptorProperties) - : util.List[RowtimeAttributeDescriptor] = { - - val names = properties.getIndexedProperty(SCHEMA, SCHEMA_NAME) - - var attributes = new mutable.ArrayBuffer[RowtimeAttributeDescriptor]() - - // check for rowtime in every field - for (i <- 0 until names.size) { - RowtimeValidator - .getRowtimeComponents(properties, s"$SCHEMA.$i.") - .foreach { case (extractor, strategy) => - // create descriptor - attributes += new RowtimeAttributeDescriptor( - properties.getString(s"$SCHEMA.$i.$SCHEMA_NAME"), - extractor, - strategy) - } - } - - attributes.asJava - } - - /** - * Derives the table schema for a table sink. A sink ignores a proctime attribute and - * needs to track the origin of a rowtime field. - * - * @deprecated This method combines two separate concepts of table schema and field mapping. - * This should be split into two methods once we have support for - * the corresponding interfaces (see FLINK-9870). - */ - @deprecated - def deriveTableSinkSchema(properties: DescriptorProperties): TableSchema = { - val builder = TableSchema.builder() - - val schema = properties.getTableSchema(SCHEMA) - - schema.getFieldNames.zip(schema.getFieldTypes).zipWithIndex.foreach { case ((n, t), i) => - val isProctime = properties - .getOptionalBoolean(s"$SCHEMA.$i.$SCHEMA_PROCTIME") - .orElse(false) - val tsType = s"$SCHEMA.$i.$ROWTIME_TIMESTAMPS_TYPE" - val isRowtime = properties.containsKey(tsType) - if (!isProctime && !isRowtime) { - // check for a aliasing - val fieldName = properties.getOptionalString(s"$SCHEMA.$i.$SCHEMA_FROM") - .orElse(n) - builder.field(fieldName, t) - } - // only use the rowtime attribute if it references a field - else if (isRowtime) { - properties.getString(tsType) match { - case ROWTIME_TIMESTAMPS_TYPE_VALUE_FROM_FIELD => - val field = properties.getString(s"$SCHEMA.$i.$ROWTIME_TIMESTAMPS_FROM") - builder.field(field, t) - - // other timestamp strategies require a reverse timestamp extractor to - // insert the timestamp into the output - case t@_ => - throw new TableException( - s"Unsupported rowtime type '$t' for sink table schema. Currently " + - s"only '$ROWTIME_TIMESTAMPS_TYPE_VALUE_FROM_FIELD' is supported for table sinks.") - } - } - } - - builder.build() - } - - /** - * Finds a table source field mapping. - * - * @param properties The properties describing a schema. - * @param inputType The input type that a connector and/or format produces. This parameter - * can be used to resolve a rowtime field against an input field. - */ - def deriveFieldMapping( - properties: DescriptorProperties, - inputType: Optional[TypeInformation[_]]) - : util.Map[String, String] = { - - val mapping = mutable.Map[String, String]() - - val schema = properties.getTableSchema(SCHEMA) - - val columnNames = toScala(inputType) match { - case Some(composite: CompositeType[_]) => composite.getFieldNames.toSeq - case _ => Seq[String]() - } - - // add all source fields first because rowtime might reference one of them - columnNames.foreach(name => mapping.put(name, name)) - - // add all schema fields first for implicit mappings - schema.getFieldNames.foreach { name => - mapping.put(name, name) - } - - val names = properties.getIndexedProperty(SCHEMA, SCHEMA_NAME) - - for (i <- 0 until names.size) { - val name = properties.getString(s"$SCHEMA.$i.$SCHEMA_NAME") - toScala(properties.getOptionalString(s"$SCHEMA.$i.$SCHEMA_FROM")) match { - - // add explicit mapping - case Some(source) => - mapping.put(name, source) - - // implicit mapping or time - case None => - val isProctime = properties - .getOptionalBoolean(s"$SCHEMA.$i.$SCHEMA_PROCTIME") - .orElse(false) - val isRowtime = properties - .containsKey(s"$SCHEMA.$i.$ROWTIME_TIMESTAMPS_TYPE") - // remove proctime/rowtime from mapping - if (isProctime || isRowtime) { - mapping.remove(name) - } - // check for invalid fields - else if (!columnNames.contains(name)) { - throw new ValidationException(s"Could not map the schema field '$name' to a field " + - s"from source. Please specify the source field from which it can be derived.") - } - } - } - - mapping.toMap.asJava - } -} diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/sinks/CsvTableSinkFactoryBase.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/sinks/CsvTableSinkFactoryBase.scala index 1365eae38542f..3aa114ea51867 100644 --- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/sinks/CsvTableSinkFactoryBase.scala +++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/sinks/CsvTableSinkFactoryBase.scala @@ -70,10 +70,7 @@ abstract class CsvTableSinkFactoryBase extends TableFactory { // validate new FileSystemValidator().validate(params) new OldCsvValidator().validate(params) - new SchemaValidator( - isStreaming, - supportsSourceTimestamps = false, - supportsSourceWatermarks = false).validate(params) + new SchemaValidator(isStreaming, false, false).validate(params) // build val formatSchema = params.getTableSchema(FORMAT_FIELDS) diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/sources/CsvTableSourceFactoryBase.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/sources/CsvTableSourceFactoryBase.scala index 41c9698a8465d..a7a48bb5d01f9 100644 --- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/sources/CsvTableSourceFactoryBase.scala +++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/sources/CsvTableSourceFactoryBase.scala @@ -75,10 +75,7 @@ abstract class CsvTableSourceFactoryBase extends TableFactory { // validate new FileSystemValidator().validate(params) new OldCsvValidator().validate(params) - new SchemaValidator( - isStreaming, - supportsSourceTimestamps = false, - supportsSourceWatermarks = false).validate(params) + new SchemaValidator(isStreaming, false, false).validate(params) // build val csvTableSourceBuilder = new CsvTableSource.Builder diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/descriptors/FileSystemTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/descriptors/FileSystemTest.scala index 1162694a01dc7..d232a0d1db68a 100644 --- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/descriptors/FileSystemTest.scala +++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/descriptors/FileSystemTest.scala @@ -40,7 +40,7 @@ class FileSystemTest extends DescriptorTestBase { // ---------------------------------------------------------------------------------------------- override def descriptors(): util.List[Descriptor] = { - util.Arrays.asList(FileSystem().path("/myfile")) + util.Arrays.asList(new FileSystem().path("/myfile")) } override def validator(): DescriptorValidator = { diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/descriptors/RowtimeTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/descriptors/RowtimeTest.scala index 0f51c765e9ffb..8bd327cd406da 100644 --- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/descriptors/RowtimeTest.scala +++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/descriptors/RowtimeTest.scala @@ -71,7 +71,7 @@ class RowtimeTest extends DescriptorTestBase { } override def validator(): DescriptorValidator = { - new RowtimeValidator(supportsSourceTimestamps = true, supportsSourceWatermarks = false) + new RowtimeValidator(true, false) } override def properties(): util.List[util.Map[String, String]] = { diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/descriptors/SchemaTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/descriptors/SchemaTest.scala index 8bf97099697aa..3ab1a67b02174 100644 --- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/descriptors/SchemaTest.scala +++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/descriptors/SchemaTest.scala @@ -61,10 +61,7 @@ class SchemaTest extends DescriptorTestBase { } override def validator(): DescriptorValidator = { - new SchemaValidator( - isStreamEnvironment = true, - supportsSourceTimestamps = true, - supportsSourceWatermarks = true) + new SchemaValidator(true, true, true) } override def properties(): util.List[util.Map[String, String]] = { diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/descriptors/TableDescriptorTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/descriptors/TableDescriptorTest.scala index df4d3fc798f82..555a030521d6c 100644 --- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/descriptors/TableDescriptorTest.scala +++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/descriptors/TableDescriptorTest.scala @@ -56,7 +56,7 @@ class TableDescriptorTest extends TableTestBase { // schema.field("proctime", Types.SQL_TIMESTAMP).proctime() //} - val connector = FileSystem() + val connector = new FileSystem() .path("/path/to/csv") val format = OldCsv() diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/utils/CommonTestData.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/utils/CommonTestData.scala index 0d633fea07798..b5ada5db2d518 100644 --- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/utils/CommonTestData.scala +++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/utils/CommonTestData.scala @@ -67,7 +67,7 @@ object CommonTestData { ) val tempFilePath1 = writeToTempFile(csvRecord1.mkString("\n"), "csv-test1", "tmp") - val connDesc1 = FileSystem().path(tempFilePath1) + val connDesc1 = new FileSystem().path(tempFilePath1) val formatDesc1 = OldCsv() .field("a", Types.INT) .field("b", Types.LONG) @@ -106,7 +106,7 @@ object CommonTestData { ) val tempFilePath2 = writeToTempFile(csvRecord2.mkString("\n"), "csv-test2", "tmp") - val connDesc2 = FileSystem().path(tempFilePath2) + val connDesc2 = new FileSystem().path(tempFilePath2) val formatDesc2 = OldCsv() .field("d", Types.INT) .field("e", Types.LONG) @@ -131,7 +131,7 @@ object CommonTestData { } val tempFilePath3 = writeToTempFile("", "csv-test3", "tmp") - val connDesc3 = FileSystem().path(tempFilePath3) + val connDesc3 = new FileSystem().path(tempFilePath3) val formatDesc3 = OldCsv() .field("x", Types.INT) .field("y", Types.LONG) diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/utils/InMemoryTableFactory.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/utils/InMemoryTableFactory.scala index 28f68d522f371..6de7525f84f3b 100644 --- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/utils/InMemoryTableFactory.scala +++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/utils/InMemoryTableFactory.scala @@ -51,10 +51,7 @@ class InMemoryTableFactory(terminationCount: Int) params.putProperties(properties) // validate - new SchemaValidator( - isStreamEnvironment = true, - supportsSourceTimestamps = true, - supportsSourceWatermarks = true).validate(params) + new SchemaValidator(true, true, true).validate(params) val tableSchema = SchemaValidator.deriveTableSinkSchema(params) @@ -71,10 +68,7 @@ class InMemoryTableFactory(terminationCount: Int) params.putProperties(properties) // validate - new SchemaValidator( - isStreamEnvironment = true, - supportsSourceTimestamps = true, - supportsSourceWatermarks = true).validate(params) + new SchemaValidator(true, true, true).validate(params) val tableSchema = params.getTableSchema(SCHEMA)