[FLINK-18130][hive][fs-connector] File name conflict for different jobs in filesystem/hive sink #12485

Merged
2 commits merged on Jun 9, 2020
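For context on the diff below: the conflict in FLINK-18130 is avoided by giving each job a random part-file prefix through OutputFileConfig, in both the Hive sink and the filesystem sink. A minimal sketch of the pattern, assuming only Flink's OutputFileConfig builder API (the helper name and the extension parameter are illustrative, not taken from the PR):

import org.apache.flink.streaming.api.functions.sink.filesystem.OutputFileConfig;

import java.util.UUID;

final class UniquePartPrefixSketch {

	// Each job builds its own OutputFileConfig, so part files written by two
	// different jobs into the same directory can no longer collide on name.
	static OutputFileConfig newOutputFileConfig(String extension) {
		return OutputFileConfig.builder()
				.withPartPrefix("part-" + UUID.randomUUID())         // unique per job
				.withPartSuffix(extension == null ? "" : extension)  // may be empty
				.build();
	}
}

The same config is then passed to the batch FileSystemOutputFormat builder and to the StreamingFileSink row/bulk builders, as the hunks below show.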
@@ -77,6 +77,7 @@
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;

import static org.apache.flink.table.filesystem.FileSystemOptions.SINK_ROLLING_POLICY_FILE_SIZE;
import static org.apache.flink.table.filesystem.FileSystemOptions.SINK_ROLLING_POLICY_TIME_INTERVAL;
@@ -141,9 +142,10 @@ public final DataStreamSink consumeDataStream(DataStream dataStream) {
isCompressed);
String extension = Utilities.getFileExtension(jobConf, isCompressed,
(HiveOutputFormat<?, ?>) hiveOutputFormatClz.newInstance());
extension = extension == null ? "" : extension;
OutputFileConfig outputFileConfig = OutputFileConfig.builder()
.withPartSuffix(extension).build();
.withPartPrefix("part-" + UUID.randomUUID().toString())
.withPartSuffix(extension == null ? "" : extension)
.build();
if (isBounded) {
FileSystemOutputFormat.Builder<Row> builder = new FileSystemOutputFormat.Builder<>();
builder.setPartitionComputer(new HiveRowPartitionComputer(
@@ -45,6 +45,8 @@
import org.apache.flink.table.types.utils.TypeConversions;
import org.apache.flink.types.Row;

import org.apache.flink.shaded.guava18.com.google.common.collect.Lists;

import com.klarna.hiverunner.HiveShell;
import com.klarna.hiverunner.annotations.HiveSQL;
import org.apache.hadoop.hive.conf.HiveConf;
@@ -60,6 +62,7 @@
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
@@ -228,6 +231,25 @@ public void testWriteNullValues() throws Exception {
}
}

@Test
public void testBatchAppend() {
TableEnvironment tEnv = HiveTestUtils.createTableEnvWithBlinkPlannerBatchMode(SqlDialect.HIVE);
tEnv.registerCatalog(hiveCatalog.getName(), hiveCatalog);
tEnv.useCatalog(hiveCatalog.getName());
tEnv.executeSql("create database db1");
tEnv.useDatabase("db1");
try {
tEnv.executeSql("create table append_table (i int, j int)");
TableEnvUtil.execInsertSqlAndWaitResult(tEnv, "insert into append_table select 1, 1");
TableEnvUtil.execInsertSqlAndWaitResult(tEnv, "insert into append_table select 2, 2");
ArrayList<Row> rows = Lists.newArrayList(tEnv.executeSql("select * from append_table").collect());
rows.sort(Comparator.comparingInt(o -> (int) o.getField(0)));
Assert.assertEquals(Arrays.asList(Row.of(1, 1), Row.of(2, 2)), rows);
} finally {
tEnv.executeSql("drop database db1 cascade");
}
}

@Test(timeout = 120000)
public void testDefaultSerPartStreamingWrite() throws Exception {
testStreamingWrite(true, false, true, this::checkSuccessFiles);
@@ -253,6 +275,34 @@ public void testNonPartStreamingMrWrite() throws Exception {
testStreamingWrite(false, true, false, (p) -> {});
}

@Test(timeout = 120000)
public void testStreamingAppend() throws Exception {
testStreamingWrite(false, false, false, (p) -> {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
StreamTableEnvironment tEnv = HiveTestUtils.createTableEnvWithBlinkPlannerStreamMode(env);
tEnv.registerCatalog(hiveCatalog.getName(), hiveCatalog);
tEnv.useCatalog(hiveCatalog.getName());

TableEnvUtil.execInsertSqlAndWaitResult(
tEnv,
"insert into db1.sink_table select 6,'a','b','2020-05-03','12'");

assertBatch("db1.sink_table", Arrays.asList(
"1,a,b,2020-05-03,7",
"1,a,b,2020-05-03,7",
"2,p,q,2020-05-03,8",
"2,p,q,2020-05-03,8",
"3,x,y,2020-05-03,9",
"3,x,y,2020-05-03,9",
"4,x,y,2020-05-03,10",
"4,x,y,2020-05-03,10",
"5,x,y,2020-05-03,11",
"5,x,y,2020-05-03,11",
"6,a,b,2020-05-03,12"));
});
}

private void checkSuccessFiles(String path) {
File basePath = new File(path, "d=2020-05-03");
Assert.assertEquals(5, basePath.list().length);
@@ -317,6 +367,18 @@ private void testStreamingWrite(
tEnv.sqlQuery("select * from my_table"),
"sink_table");

assertBatch("db1.sink_table", Arrays.asList(
"1,a,b,2020-05-03,7",
"1,a,b,2020-05-03,7",
"2,p,q,2020-05-03,8",
"2,p,q,2020-05-03,8",
"3,x,y,2020-05-03,9",
"3,x,y,2020-05-03,9",
"4,x,y,2020-05-03,10",
"4,x,y,2020-05-03,10",
"5,x,y,2020-05-03,11",
"5,x,y,2020-05-03,11"));

// using batch table env to query.
List<String> results = new ArrayList<>();
TableEnvironment batchTEnv = HiveTestUtils.createTableEnvWithBlinkPlannerBatchMode();
@@ -346,6 +408,19 @@ private void testStreamingWrite(
}
}

private void assertBatch(String table, List<String> expected) {
// using batch table env to query.
List<String> results = new ArrayList<>();
TableEnvironment batchTEnv = HiveTestUtils.createTableEnvWithBlinkPlannerBatchMode();
batchTEnv.registerCatalog(hiveCatalog.getName(), hiveCatalog);
batchTEnv.useCatalog(hiveCatalog.getName());
batchTEnv.executeSql("select * from " + table).collect()
.forEachRemaining(r -> results.add(r.toString()));
results.sort(String::compareTo);
expected.sort(String::compareTo);
Assert.assertEquals(expected, results);
}

private RowTypeInfo createHiveDestTable(String dbName, String tblName, TableSchema tableSchema, int numPartCols) throws Exception {
CatalogTable catalogTable = createHiveCatalogTable(tableSchema, numPartCols);
hiveCatalog.createTable(new ObjectPath(dbName, tblName), catalogTable, false);
@@ -236,6 +236,43 @@ trait FileSystemITCaseBase {
row(19, 3, "x19")
))
}

@Test
def testInsertAppend(): Unit = {
tableEnv.sqlUpdate("insert into partitionedTable select x, y, a, b from originalT")
tableEnv.execute("test1")

tableEnv.sqlUpdate("insert into partitionedTable select x, y, a, b from originalT")
tableEnv.execute("test2")

check(
"select y, b, x from partitionedTable where a=3",
Seq(
row(17, 1, "x17"),
row(18, 2, "x18"),
row(19, 3, "x19"),
row(17, 1, "x17"),
row(18, 2, "x18"),
row(19, 3, "x19")
))
}

@Test
def testInsertOverwrite(): Unit = {
tableEnv.sqlUpdate("insert overwrite partitionedTable select x, y, a, b from originalT")
tableEnv.execute("test1")

tableEnv.sqlUpdate("insert overwrite partitionedTable select x, y, a, b from originalT")
tableEnv.execute("test2")

check(
"select y, b, x from partitionedTable where a=3",
Seq(
row(17, 1, "x17"),
row(18, 2, "x18"),
row(19, 3, "x19")
))
}
}

object FileSystemITCaseBase {
@@ -26,7 +26,7 @@ import org.apache.flink.table.planner.runtime.utils.{StreamingTestBase, TestSink
import org.apache.flink.types.Row

import org.junit.Assert.assertEquals
import org.junit.Before
import org.junit.{Before, Test}

import scala.collection.Seq

@@ -55,4 +55,8 @@ abstract class StreamFileSystemITCaseBase extends StreamingTestBase with FileSys
expectedResult.map(TestSinkUtil.rowToString(_)).sorted,
sink.getAppendResults.sorted)
}

// Streaming mode does not support overwrite
@Test
override def testInsertOverwrite(): Unit = {}
}
@@ -35,6 +35,7 @@
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
import org.apache.flink.streaming.api.functions.sink.filesystem.BucketAssigner;
import org.apache.flink.streaming.api.functions.sink.filesystem.OutputFileConfig;
import org.apache.flink.streaming.api.functions.sink.filesystem.PartFileInfo;
import org.apache.flink.streaming.api.functions.sink.filesystem.RollingPolicy;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
@@ -63,6 +64,7 @@
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;

import static org.apache.flink.table.filesystem.FileSystemOptions.SINK_PARTITION_COMMIT_POLICY_KIND;
import static org.apache.flink.table.filesystem.FileSystemOptions.SINK_ROLLING_POLICY_FILE_SIZE;
@@ -126,6 +128,9 @@ public final DataStreamSink<RowData> consumeDataStream(DataStream<RowData> dataS
partitionKeys.toArray(new String[0]));

EmptyMetaStoreFactory metaStoreFactory = new EmptyMetaStoreFactory(path);
OutputFileConfig outputFileConfig = OutputFileConfig.builder()
.withPartPrefix("part-" + UUID.randomUUID().toString())
.build();

if (isBounded) {
FileSystemOutputFormat.Builder<RowData> builder = new FileSystemOutputFormat.Builder<>();
@@ -137,6 +142,7 @@ public final DataStreamSink<RowData> consumeDataStream(DataStream<RowData> dataS
builder.setOverwrite(overwrite);
builder.setStaticPartitions(staticPartitions);
builder.setTempPath(toStagingPath());
builder.setOutputFileConfig(outputFileConfig);
return dataStream.writeUsingOutputFormat(builder.build())
.setParallelism(dataStream.getParallelism());
} else {
@@ -155,12 +161,14 @@ public final DataStreamSink<RowData> consumeDataStream(DataStream<RowData> dataS
bucketsBuilder = StreamingFileSink.forRowFormat(
path, new ProjectionEncoder((Encoder<RowData>) writer, computer))
.withBucketAssigner(assigner)
.withOutputFileConfig(outputFileConfig)
.withRollingPolicy(rollingPolicy);
} else {
//noinspection unchecked
bucketsBuilder = StreamingFileSink.forBulkFormat(
path, new ProjectionBulkFactory((BulkWriter.Factory<RowData>) writer, computer))
.withBucketAssigner(assigner)
.withOutputFileConfig(outputFileConfig)
.withRollingPolicy(rollingPolicy);
}
return createStreamingSink(
@@ -103,7 +103,7 @@ private void overwrite(Path destDir) throws Exception {
}

/**
* Moves files from srcDir to destDir. Delete files in destDir first when overwrite.
* Moves files from srcDir to destDir.
*/
private void renameFiles(List<Path> srcDirs, Path destDir) throws Exception {
for (Path srcDir : srcDirs) {
@@ -113,12 +113,7 @@ private void renameFiles(List<Path> srcDirs, Path destDir) throws Exception {
for (FileStatus srcFile : srcFiles) {
Path srcPath = srcFile.getPath();
Path destPath = new Path(destDir, srcPath.getName());
int count = 1;
while (!fs.rename(srcPath, destPath)) {
String name = srcPath.getName() + "_copy_" + count;
destPath = new Path(destDir, name);
count++;
}
fs.rename(srcPath, destPath);
Contributor:
This seems inconsistent with the method's comment: on HDFS, rename does not delete the dest path if it already exists; it leaves both srcPath and destPath unchanged.

Contributor (Author):
The comment is out of date; the overwrite happens in the overwrite(Path) method, not here.
This is now consistent with HadoopRecoverableFsDataOutputStream.commit.
I will update the comment.

}
}
}
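To make the rename semantics discussed in the review thread above concrete, here is a small hypothetical sketch against Hadoop's FileSystem API. The explicit check of the return value is for illustration only (the PR's renameFiles simply calls fs.rename), and the helper name is not from the PR:

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

final class RenameSemanticsSketch {

	static void moveIntoPlace(FileSystem fs, Path srcPath, Path destDir) throws Exception {
		Path destPath = new Path(destDir, srcPath.getName());

		// On HDFS, rename(src, dest) does not overwrite: if dest already exists,
		// the call returns false and both src and dest are left unchanged.
		// With per-job UUID prefixes such collisions should not happen, and any
		// "insert overwrite" cleanup is done beforehand in overwrite(Path).
		if (!fs.rename(srcPath, destPath)) {
			throw new IllegalStateException("Failed to rename " + srcPath + " to " + destPath);
		}
	}
}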
@@ -97,7 +97,7 @@ public Path createPartitionDir(String... partitions) {
}

private String newFileName() {
return String.format("%s%s-%s-file-%d%s",
return String.format("%s-%s-%s-file-%d%s",
outputFileConfig.getPartPrefix(), checkpointName(checkpointId),
taskName(taskNumber), nameCounter++, outputFileConfig.getPartSuffix());
}
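Together with the UUID prefix, the adjusted format string produces part-file names that are unique across jobs. A quick, self-contained illustration; the checkpoint and task name strings below are assumptions made for the example, not taken from the PR:

import java.util.UUID;

final class PartFileNameSketch {

	public static void main(String[] args) {
		String partPrefix = "part-" + UUID.randomUUID(); // per-job prefix from OutputFileConfig
		String checkpointName = "cp-1";                  // assumed rendering of checkpointName(checkpointId)
		String taskName = "task-0";                      // assumed rendering of taskName(taskNumber)
		int nameCounter = 0;
		String partSuffix = ".orc";                      // illustrative suffix; may be empty

		// New format string from the diff above: "%s-%s-%s-file-%d%s"
		String fileName = String.format("%s-%s-%s-file-%d%s",
				partPrefix, checkpointName, taskName, nameCounter, partSuffix);

		// Prints something like:
		// part-550e8400-e29b-41d4-a716-446655440000-cp-1-task-0-file-0.orc
		System.out.println(fileName);
	}
}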