13 changes: 12 additions & 1 deletion flink-end-to-end-tests/flink-batch-sql-test/pom.xml
@@ -35,13 +35,24 @@
<packaging>jar</packaging>

<dependencies>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-java-bridge</artifactId>
<version>${project.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-files</artifactId>
<version>${project.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-csv</artifactId>
<version>${project.version}</version>
<scope>provided</scope>
</dependency>
</dependencies>

<build>
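The two new provided-scope modules bring in the 'filesystem' connector and the 'csv' format that the rewritten test program below declares through the Table API. As a quick orientation only (not part of this change), both are wired up via Java SPI, so once they are on the classpath their factories can be looked up by identifier. A minimal sketch, assuming the standard FactoryUtil discovery API:

import org.apache.flink.table.factories.DynamicTableSinkFactory;
import org.apache.flink.table.factories.FactoryUtil;

public class FactoryDiscoverySketch {
    public static void main(String[] args) {
        // Resolves the 'filesystem' connector factory contributed by flink-connector-files.
        DynamicTableSinkFactory sinkFactory =
                FactoryUtil.discoverFactory(
                        Thread.currentThread().getContextClassLoader(),
                        DynamicTableSinkFactory.class,
                        "filesystem");
        System.out.println("Discovered: " + sinkFactory.getClass().getName());
    }
}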
@@ -19,17 +19,17 @@
package org.apache.flink.sql.tests;

import org.apache.flink.api.common.io.InputFormat;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.io.IteratorInputFormat;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.connector.file.table.FileSystemConnectorOptions;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.api.TableDescriptor;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.api.internal.TableEnvironmentInternal;
import org.apache.flink.table.sinks.CsvTableSink;
import org.apache.flink.table.sources.InputFormatTableSource;
import org.apache.flink.table.types.DataType;
import org.apache.flink.types.Row;
@@ -62,13 +62,17 @@ public static void main(String[] args) throws Exception {
.registerTableSourceInternal("table1", new GeneratorTableSource(10, 100, 60, 0));
((TableEnvironmentInternal) tEnv)
.registerTableSourceInternal("table2", new GeneratorTableSource(5, 0.2f, 60, 5));
((TableEnvironmentInternal) tEnv)
.registerTableSinkInternal(
"sinkTable",
new CsvTableSink(outputPath)
.configure(
new String[] {"f0", "f1"},
new TypeInformation[] {Types.INT, Types.SQL_TIMESTAMP}));
tEnv.createTemporaryTable(
"sinkTable",
TableDescriptor.forConnector("filesystem")
.schema(
Schema.newBuilder()
.column("f0", DataTypes.INT())
.column("f1", DataTypes.TIMESTAMP(3))
.build())
.option(FileSystemConnectorOptions.PATH, outputPath)
.format("csv")
.build());

TableResult result = tEnv.executeSql(sqlStatement);
// wait for the job to finish
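For orientation, the TableDescriptor registration above is roughly equivalent to declaring the sink in DDL. A sketch under the assumption that outputPath holds the value of the -outputPath argument (illustrative only; the program keeps the programmatic form):

tEnv.executeSql(
        "CREATE TEMPORARY TABLE sinkTable ("
                + "  f0 INT,"
                + "  f1 TIMESTAMP(3)"
                + ") WITH ("
                + "  'connector' = 'filesystem',"
                + "  'path' = '" + outputPath + "',"
                + "  'format' = 'csv'"
                + ")");

Because the job runs with -p 2, the filesystem sink writes multiple files under outputPath, which is presumably why the updated shell script below concatenates and sorts them before hashing the result.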
15 changes: 9 additions & 6 deletions flink-end-to-end-tests/test-scripts/test_batch_sql.sh
@@ -24,7 +24,7 @@ source "$(dirname "$0")"/common.sh

TEST_PROGRAM_JAR=${END_TO_END_DIR}/flink-batch-sql-test/target/BatchSQLTestProgram.jar

OUTPUT_FILE_PATH="${TEST_DATA_DIR}/out/result/results.csv"
OUTPUT_FILE_PATH="${TEST_DATA_DIR}/out/result"

function sqlJobQuery() {
local tumbleWindowSizeSeconds=10
@@ -72,11 +72,14 @@ set_config_key "taskmanager.numberOfTaskSlots" "1"
start_cluster

# The task has a total of 2 x (1 + 1 + 1 + 1) + 1 = 9 slots
$FLINK_DIR/bin/flink run -p 2 $TEST_PROGRAM_JAR -outputPath "file://${OUTPUT_FILE_PATH}" -sqlStatement \
$FLINK_DIR/bin/flink run -p 2 $TEST_PROGRAM_JAR -outputPath "${OUTPUT_FILE_PATH}" -sqlStatement \
"INSERT INTO sinkTable $(sqlJobQuery)"

# Concat result
cat ${OUTPUT_FILE_PATH}/* | sort > ${OUTPUT_FILE_PATH}/result.csv

# check result:
#1980,1970-01-01 00:00:00.0
#1980,1970-01-01 00:00:20.0
#1980,1970-01-01 00:00:40.0
check_result_hash "BatchSQL" "${OUTPUT_FILE_PATH}" "c7ccd2c3a25c3e06616806cf6aecaa66"
#1980,"1970-01-01 00:00:00"
#1980,"1970-01-01 00:00:20"
#1980,"1970-01-01 00:00:40"
check_result_hash "BatchSQL" "${OUTPUT_FILE_PATH}/result.csv" "f4d69894b075e4bf30b3c7a1190bee70"

This file was deleted.

@@ -18,64 +18,47 @@

package org.apache.flink.table.planner.catalog;

import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.api.TableDescriptor;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.catalog.Catalog;
import org.apache.flink.table.catalog.CatalogTableImpl;
import org.apache.flink.table.catalog.ObjectPath;
import org.apache.flink.table.planner.plan.metadata.FlinkRelMetadataQuery;
import org.apache.flink.table.planner.utils.TableTestUtil;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.test.WithTableEnvironment;

import org.apache.flink.shaded.guava30.com.google.common.collect.ImmutableSet;

import org.apache.calcite.rel.RelNode;
import org.apache.calcite.util.ImmutableBitSet;
import org.junit.Before;
import org.junit.Test;

import java.util.HashMap;
import java.util.Map;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.parallel.Execution;
import org.junit.jupiter.api.parallel.ExecutionMode;

import static org.apache.flink.table.api.DataTypes.BIGINT;
import static org.apache.flink.table.api.DataTypes.INT;
import static org.apache.flink.table.api.DataTypes.STRING;
import static org.assertj.core.api.Assertions.assertThat;

/** Test for Catalog constraints. */
@Execution(ExecutionMode.CONCURRENT)
public class CatalogConstraintTest {

private String databaseName = "default_database";

private TableEnvironment tEnv;
private Catalog catalog;

@Before
public void setup() {
EnvironmentSettings settings = EnvironmentSettings.newInstance().inBatchMode().build();
tEnv = TableEnvironment.create(settings);
catalog = tEnv.getCatalog(tEnv.getCurrentCatalog()).orElse(null);
assertThat(catalog).isNotNull();
}

@Test
public void testWithPrimaryKey() throws Exception {
TableSchema tableSchema =
TableSchema.builder()
.fields(
new String[] {"a", "b", "c"},
new DataType[] {
DataTypes.STRING(),
DataTypes.BIGINT().notNull(),
DataTypes.INT()
})
.primaryKey("b")
.build();
Map<String, String> properties = buildCatalogTableProperties(tableSchema);

catalog.createTable(
new ObjectPath(databaseName, "T1"),
new CatalogTableImpl(tableSchema, properties, ""),
false);
@WithTableEnvironment(executionMode = RuntimeExecutionMode.BATCH)
public void testWithPrimaryKey(TableEnvironment tEnv) {
tEnv.createTable(
"T1",
TableDescriptor.forConnector("filesystem")
.schema(
Schema.newBuilder()
.column("a", STRING())
.column("b", BIGINT().notNull())
.column("c", INT())
.primaryKey("b")
.build())
.option("path", "/path/to/csv")
.format("testcsv")
.build());

RelNode t1 = TableTestUtil.toRelNode(tEnv.sqlQuery("select * from T1"));
FlinkRelMetadataQuery mq =
@@ -84,38 +67,24 @@ public void testWithPrimaryKey() throws Exception {
}

@Test
public void testWithoutPrimaryKey() throws Exception {
TableSchema tableSchema =
TableSchema.builder()
.fields(
new String[] {"a", "b", "c"},
new DataType[] {
DataTypes.BIGINT(), DataTypes.STRING(), DataTypes.INT()
})
.build();
Map<String, String> properties = buildCatalogTableProperties(tableSchema);

catalog.createTable(
new ObjectPath(databaseName, "T1"),
new CatalogTableImpl(tableSchema, properties, ""),
false);
@WithTableEnvironment(executionMode = RuntimeExecutionMode.BATCH)
public void testWithoutPrimaryKey(TableEnvironment tEnv) {
tEnv.createTable(
"T1",
TableDescriptor.forConnector("filesystem")
.schema(
Schema.newBuilder()
.column("a", BIGINT())
.column("b", STRING())
.column("c", INT())
.build())
.option("path", "/path/to/csv")
.format("testcsv")
.build());

RelNode t1 = TableTestUtil.toRelNode(tEnv.sqlQuery("select * from T1"));
FlinkRelMetadataQuery mq =
FlinkRelMetadataQuery.reuseOrCreate(t1.getCluster().getMetadataQuery());
assertThat(mq.getUniqueKeys(t1)).isEqualTo(ImmutableSet.of());
}

private Map<String, String> buildCatalogTableProperties(TableSchema tableSchema) {
Map<String, String> properties = new HashMap<>();
properties.put("connector.type", "filesystem");
properties.put("connector.property-version", "1");
properties.put("connector.path", "/path/to/csv");

properties.put("format.type", "csv");
properties.put("format.property-version", "1");
properties.put("format.field-delimiter", ";");

return properties;
}
}
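For comparison, the programmatic primaryKey("b") declaration used in testWithPrimaryKey corresponds to a NOT ENFORCED primary-key constraint in DDL form. A sketch with the same illustrative placeholder path (Flink stores such constraints as metadata only, which is what the unique-key derivation in this test relies on):

tEnv.executeSql(
        "CREATE TABLE T1 ("
                + "  a STRING,"
                + "  b BIGINT NOT NULL,"
                + "  c INT,"
                + "  PRIMARY KEY (b) NOT ENFORCED"
                + ") WITH ("
                + "  'connector' = 'filesystem',"
                + "  'path' = '/path/to/csv',"
                + "  'format' = 'testcsv'"
                + ")");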
@@ -113,13 +113,9 @@ public void testGetStatsFromCatalogForConnectorCatalogTable() throws Exception {
@Test
public void testGetStatsFromCatalogForCatalogTableImpl() throws Exception {
Map<String, String> properties = new HashMap<>();
properties.put("connector.type", "filesystem");
properties.put("connector.property-version", "1");
properties.put("connector.path", "/path/to/csv");

properties.put("format.type", "csv");
properties.put("format.property-version", "1");
properties.put("format.field-delimiter", ";");
properties.put("connector", "filesystem");
properties.put("path", "/path/to/csv");
properties.put("format", "testcsv");

catalog.createTable(
new ObjectPath(databaseName, "T1"),

This file was deleted.
