diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableSink.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableSink.java
index a4e301232071e..1cac9e085879a 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableSink.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableSink.java
@@ -77,6 +77,7 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
+import java.util.UUID;
 
 import static org.apache.flink.table.filesystem.FileSystemOptions.SINK_ROLLING_POLICY_FILE_SIZE;
 import static org.apache.flink.table.filesystem.FileSystemOptions.SINK_ROLLING_POLICY_TIME_INTERVAL;
@@ -141,9 +142,10 @@ public final DataStreamSink consumeDataStream(DataStream dataStream) {
                     isCompressed);
             String extension = Utilities.getFileExtension(jobConf, isCompressed,
                     (HiveOutputFormat) hiveOutputFormatClz.newInstance());
-            extension = extension == null ? "" : extension;
             OutputFileConfig outputFileConfig = OutputFileConfig.builder()
-                    .withPartSuffix(extension).build();
+                    .withPartPrefix("part-" + UUID.randomUUID().toString())
+                    .withPartSuffix(extension == null ? "" : extension)
+                    .build();
             if (isBounded) {
                 FileSystemOutputFormat.Builder builder = new FileSystemOutputFormat.Builder<>();
                 builder.setPartitionComputer(new HiveRowPartitionComputer(
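Side note on the change above: the sink previously set only a file-extension suffix, so part files written by two different insert jobs could end up with identical names in the same directory. Below is a minimal, self-contained sketch of the new naming scheme, using only the `OutputFileConfig` builder API that the patch itself calls; the class, method, and argument values are illustrative, not part of the patch:

```java
import org.apache.flink.streaming.api.functions.sink.filesystem.OutputFileConfig;

import java.util.UUID;

public class UniquePrefixSketch {

    // Mirrors the patched HiveTableSink: one random prefix per sink instance.
    static OutputFileConfig newOutputFileConfig(String extension) {
        return OutputFileConfig.builder()
                .withPartPrefix("part-" + UUID.randomUUID().toString())
                .withPartSuffix(extension == null ? "" : extension)
                .build();
    }

    public static void main(String[] args) {
        // Two independent sink instances (e.g. two successive INSERT jobs)
        // get different prefixes, so their part files cannot collide:
        System.out.println(newOutputFileConfig(null).getPartPrefix());
        System.out.println(newOutputFileConfig(".deflate").getPartPrefix());
    }
}
```

Note that the null-to-empty normalization of the extension simply moves inline into the `withPartSuffix` call, so prefix and suffix are now built in one place.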
diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveTableSinkITCase.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveTableSinkITCase.java
index a73bce96a2a02..7592a33b7d9e5 100644
--- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveTableSinkITCase.java
+++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveTableSinkITCase.java
@@ -45,6 +45,8 @@
 import org.apache.flink.table.types.utils.TypeConversions;
 import org.apache.flink.types.Row;
 
+import org.apache.flink.shaded.guava18.com.google.common.collect.Lists;
+
 import com.klarna.hiverunner.HiveShell;
 import com.klarna.hiverunner.annotations.HiveSQL;
 import org.apache.hadoop.hive.conf.HiveConf;
@@ -60,6 +62,7 @@
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
+import java.util.Comparator;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -228,6 +231,25 @@ public void testWriteNullValues() throws Exception {
         }
     }
 
+    @Test
+    public void testBatchAppend() {
+        TableEnvironment tEnv = HiveTestUtils.createTableEnvWithBlinkPlannerBatchMode(SqlDialect.HIVE);
+        tEnv.registerCatalog(hiveCatalog.getName(), hiveCatalog);
+        tEnv.useCatalog(hiveCatalog.getName());
+        tEnv.executeSql("create database db1");
+        tEnv.useDatabase("db1");
+        try {
+            tEnv.executeSql("create table append_table (i int, j int)");
+            TableEnvUtil.execInsertSqlAndWaitResult(tEnv, "insert into append_table select 1, 1");
+            TableEnvUtil.execInsertSqlAndWaitResult(tEnv, "insert into append_table select 2, 2");
+            ArrayList<Row> rows = Lists.newArrayList(tEnv.executeSql("select * from append_table").collect());
+            rows.sort(Comparator.comparingInt(o -> (int) o.getField(0)));
+            Assert.assertEquals(Arrays.asList(Row.of(1, 1), Row.of(2, 2)), rows);
+        } finally {
+            tEnv.executeSql("drop database db1 cascade");
+        }
+    }
+
     @Test(timeout = 120000)
     public void testDefaultSerPartStreamingWrite() throws Exception {
         testStreamingWrite(true, false, true, this::checkSuccessFiles);
@@ -253,6 +275,34 @@ public void testNonPartStreamingMrWrite() throws Exception {
         testStreamingWrite(false, true, false, (p) -> {});
     }
 
+    @Test(timeout = 120000)
+    public void testStreamingAppend() throws Exception {
+        testStreamingWrite(false, false, false, (p) -> {
+            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+            env.setParallelism(1);
+            StreamTableEnvironment tEnv = HiveTestUtils.createTableEnvWithBlinkPlannerStreamMode(env);
+            tEnv.registerCatalog(hiveCatalog.getName(), hiveCatalog);
+            tEnv.useCatalog(hiveCatalog.getName());
+
+            TableEnvUtil.execInsertSqlAndWaitResult(
+                    tEnv,
+                    "insert into db1.sink_table select 6,'a','b','2020-05-03','12'");
+
+            assertBatch("db1.sink_table", Arrays.asList(
+                    "1,a,b,2020-05-03,7",
+                    "1,a,b,2020-05-03,7",
+                    "2,p,q,2020-05-03,8",
+                    "2,p,q,2020-05-03,8",
+                    "3,x,y,2020-05-03,9",
+                    "3,x,y,2020-05-03,9",
+                    "4,x,y,2020-05-03,10",
+                    "4,x,y,2020-05-03,10",
+                    "5,x,y,2020-05-03,11",
+                    "5,x,y,2020-05-03,11",
+                    "6,a,b,2020-05-03,12"));
+        });
+    }
+
     private void checkSuccessFiles(String path) {
         File basePath = new File(path, "d=2020-05-03");
         Assert.assertEquals(5, basePath.list().length);
@@ -317,6 +367,18 @@ private void testStreamingWrite(
                 tEnv.sqlQuery("select * from my_table"),
                 "sink_table");
 
+        assertBatch("db1.sink_table", Arrays.asList(
+                "1,a,b,2020-05-03,7",
+                "1,a,b,2020-05-03,7",
+                "2,p,q,2020-05-03,8",
+                "2,p,q,2020-05-03,8",
+                "3,x,y,2020-05-03,9",
+                "3,x,y,2020-05-03,9",
+                "4,x,y,2020-05-03,10",
+                "4,x,y,2020-05-03,10",
+                "5,x,y,2020-05-03,11",
+                "5,x,y,2020-05-03,11"));
+
         // using batch table env to query.
         List<String> results = new ArrayList<>();
         TableEnvironment batchTEnv = HiveTestUtils.createTableEnvWithBlinkPlannerBatchMode();
@@ -346,6 +408,19 @@ private void testStreamingWrite(
         }
     }
 
+    private void assertBatch(String table, List<String> expected) {
+        // using batch table env to query.
+        List<String> results = new ArrayList<>();
+        TableEnvironment batchTEnv = HiveTestUtils.createTableEnvWithBlinkPlannerBatchMode();
+        batchTEnv.registerCatalog(hiveCatalog.getName(), hiveCatalog);
+        batchTEnv.useCatalog(hiveCatalog.getName());
+        batchTEnv.executeSql("select * from " + table).collect()
+                .forEachRemaining(r -> results.add(r.toString()));
+        results.sort(String::compareTo);
+        expected.sort(String::compareTo);
+        Assert.assertEquals(expected, results);
+    }
+
     private RowTypeInfo createHiveDestTable(String dbName, String tblName, TableSchema tableSchema, int numPartCols) throws Exception {
         CatalogTable catalogTable = createHiveCatalogTable(tableSchema, numPartCols);
         hiveCatalog.createTable(new ObjectPath(dbName, tblName), catalogTable, false);
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/FileSystemITCaseBase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/FileSystemITCaseBase.scala
index 3b7fd46286efe..12981e94bfcf7 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/FileSystemITCaseBase.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/FileSystemITCaseBase.scala
@@ -236,6 +236,43 @@ trait FileSystemITCaseBase {
         row(19, 3, "x19")
       ))
   }
+
+  @Test
+  def testInsertAppend(): Unit = {
+    tableEnv.sqlUpdate("insert into partitionedTable select x, y, a, b from originalT")
+    tableEnv.execute("test1")
+
+    tableEnv.sqlUpdate("insert into partitionedTable select x, y, a, b from originalT")
+    tableEnv.execute("test2")
+
+    check(
+      "select y, b, x from partitionedTable where a=3",
+      Seq(
+        row(17, 1, "x17"),
+        row(18, 2, "x18"),
+        row(19, 3, "x19"),
+        row(17, 1, "x17"),
+        row(18, 2, "x18"),
+        row(19, 3, "x19")
+      ))
+  }
+
+  @Test
+  def testInsertOverwrite(): Unit = {
+    tableEnv.sqlUpdate("insert overwrite partitionedTable select x, y, a, b from originalT")
+    tableEnv.execute("test1")
+
+    tableEnv.sqlUpdate("insert overwrite partitionedTable select x, y, a, b from originalT")
+    tableEnv.execute("test2")
+
+    check(
+      "select y, b, x from partitionedTable where a=3",
+      Seq(
+        row(17, 1, "x17"),
+        row(18, 2, "x18"),
+        row(19, 3, "x19")
+      ))
+  }
 }
 
 object FileSystemITCaseBase {
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/StreamFileSystemITCaseBase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/StreamFileSystemITCaseBase.scala
index 2a61be84becdb..5b78fee1044a8 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/StreamFileSystemITCaseBase.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/StreamFileSystemITCaseBase.scala
@@ -26,7 +26,7 @@ import org.apache.flink.table.planner.runtime.utils.{StreamingTestBase, TestSinkUtil}
 import org.apache.flink.types.Row
 
 import org.junit.Assert.assertEquals
-import org.junit.Before
+import org.junit.{Before, Test}
 
 import scala.collection.Seq
 
@@ -55,4 +55,8 @@ abstract class StreamFileSystemITCaseBase extends StreamingTestBase with FileSystemITCaseBase {
       expectedResult.map(TestSinkUtil.rowToString(_)).sorted,
       sink.getAppendResults.sorted)
   }
+
+  // Streaming mode does not support overwrite
+  @Test
+  override def testInsertOverwrite(): Unit = {}
 }
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/FileSystemTableSink.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/FileSystemTableSink.java
index 977aa3f92342c..b6127461f6e29 100644
--- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/FileSystemTableSink.java
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/FileSystemTableSink.java
@@ -35,6 +35,7 @@
 import org.apache.flink.streaming.api.datastream.DataStreamSink;
 import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
 import org.apache.flink.streaming.api.functions.sink.filesystem.BucketAssigner;
+import org.apache.flink.streaming.api.functions.sink.filesystem.OutputFileConfig;
 import org.apache.flink.streaming.api.functions.sink.filesystem.PartFileInfo;
 import org.apache.flink.streaming.api.functions.sink.filesystem.RollingPolicy;
 import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
@@ -63,6 +64,7 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
+import java.util.UUID;
 
 import static org.apache.flink.table.filesystem.FileSystemOptions.SINK_PARTITION_COMMIT_POLICY_KIND;
 import static org.apache.flink.table.filesystem.FileSystemOptions.SINK_ROLLING_POLICY_FILE_SIZE;
@@ -126,6 +128,9 @@ public final DataStreamSink consumeDataStream(DataStream dataStream) {
                 partitionKeys.toArray(new String[0]));
 
         EmptyMetaStoreFactory metaStoreFactory = new EmptyMetaStoreFactory(path);
+        OutputFileConfig outputFileConfig = OutputFileConfig.builder()
+                .withPartPrefix("part-" + UUID.randomUUID().toString())
+                .build();
 
         if (isBounded) {
             FileSystemOutputFormat.Builder builder = new FileSystemOutputFormat.Builder<>();
@@ -137,6 +142,7 @@ public final DataStreamSink consumeDataStream(DataStream dataStream) {
             builder.setOverwrite(overwrite);
             builder.setStaticPartitions(staticPartitions);
             builder.setTempPath(toStagingPath());
+            builder.setOutputFileConfig(outputFileConfig);
             return dataStream.writeUsingOutputFormat(builder.build())
                     .setParallelism(dataStream.getParallelism());
         } else {
@@ -155,12 +161,14 @@ public final DataStreamSink consumeDataStream(DataStream dataStream) {
                 bucketsBuilder = StreamingFileSink.forRowFormat(
                         path, new ProjectionEncoder((Encoder) writer, computer))
                         .withBucketAssigner(assigner)
+                        .withOutputFileConfig(outputFileConfig)
                         .withRollingPolicy(rollingPolicy);
             } else {
                 //noinspection unchecked
                 bucketsBuilder = StreamingFileSink.forBulkFormat(
                         path, new ProjectionBulkFactory((BulkWriter.Factory) writer, computer))
                         .withBucketAssigner(assigner)
+                        .withOutputFileConfig(outputFileConfig)
                         .withRollingPolicy(rollingPolicy);
             }
             return createStreamingSink(
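The table sink threads the same config through both execution modes: `setOutputFileConfig(...)` on the bounded `FileSystemOutputFormat.Builder`, and `withOutputFileConfig(...)` on the two `StreamingFileSink` builders. For a standalone picture of the streaming half, here is a hedged sketch against the plain `StreamingFileSink` API; the path and encoder are placeholders, not taken from the patch:

```java
import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.functions.sink.filesystem.OutputFileConfig;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;

import java.util.UUID;

public class StreamingSinkSketch {

    public static StreamingFileSink<String> build() {
        // Same idea as the patch: a per-sink-instance random prefix.
        OutputFileConfig outputFileConfig = OutputFileConfig.builder()
                .withPartPrefix("part-" + UUID.randomUUID().toString())
                .build();
        return StreamingFileSink
                .forRowFormat(new Path("/tmp/output"), new SimpleStringEncoder<String>())
                .withOutputFileConfig(outputFileConfig)
                .build();
    }
}
```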
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/PartitionLoader.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/PartitionLoader.java
index 9e0b98d7fa624..9a8f8f6cec7a0 100644
--- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/PartitionLoader.java
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/PartitionLoader.java
@@ -103,7 +103,7 @@ private void overwrite(Path destDir) throws Exception {
     }
 
     /**
-     * Moves files from srcDir to destDir. Delete files in destDir first when overwrite.
+     * Moves files from srcDir to destDir.
      */
     private void renameFiles(List<Path> srcDirs, Path destDir) throws Exception {
         for (Path srcDir : srcDirs) {
@@ -113,12 +113,7 @@ private void renameFiles(List<Path> srcDirs, Path destDir) throws Exception {
             for (FileStatus srcFile : srcFiles) {
                 Path srcPath = srcFile.getPath();
                 Path destPath = new Path(destDir, srcPath.getName());
-                int count = 1;
-                while (!fs.rename(srcPath, destPath)) {
-                    String name = srcPath.getName() + "_copy_" + count;
-                    destPath = new Path(destDir, name);
-                    count++;
-                }
+                fs.rename(srcPath, destPath);
             }
         }
     }
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/PartitionTempFileManager.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/PartitionTempFileManager.java
index b00bed314e299..8e225ffd4e247 100644
--- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/PartitionTempFileManager.java
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/PartitionTempFileManager.java
@@ -97,7 +97,7 @@ public Path createPartitionDir(String... partitions) {
     }
 
     private String newFileName() {
-        return String.format("%s%s-%s-file-%d%s",
+        return String.format("%s-%s-%s-file-%d%s",
                 outputFileConfig.getPartPrefix(), checkpointName(checkpointId),
                 taskName(taskNumber), nameCounter++, outputFileConfig.getPartSuffix());
     }
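Putting the pieces together: the format-string fix above adds the separator that was missing between the prefix and the checkpoint name, so a staged part file now looks like `<prefix>-<checkpoint>-<task>-file-<n><suffix>`. A small sketch of the resulting name follows; the `"cp-"` and `"task-"` forms are assumptions about the `checkpointName()`/`taskName()` helpers, which this diff does not show:

```java
import java.util.UUID;

public class PartFileNameSketch {
    public static void main(String[] args) {
        String partPrefix = "part-" + UUID.randomUUID().toString();
        String partSuffix = "";   // e.g. ".deflate" for a compressed Hive format
        long checkpointId = 1;
        int taskNumber = 0;
        int nameCounter = 0;
        // Same format string as the fixed newFileName(); "cp-"/"task-" are assumed.
        String name = String.format("%s-%s-%s-file-%d%s",
                partPrefix, "cp-" + checkpointId, "task-" + taskNumber, nameCounter, partSuffix);
        System.out.println(name);
        // e.g. part-<uuid>-cp-1-task-0-file-0 : unique across jobs thanks to the
        // UUID, which is why renameFiles() above no longer needs the "_copy_N" loop.
    }
}
```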