Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FLINK-24728][tests] Add tests to ensure SQL file sink closes all created files #18073

Merged
merged 1 commit into from Dec 17, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -19,15 +19,21 @@
package org.apache.flink.testutils;

import org.apache.flink.core.fs.FSDataInputStream;
import org.apache.flink.core.fs.FSDataOutputStream;
import org.apache.flink.core.fs.FileStatus;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.FileSystemFactory;
import org.apache.flink.core.fs.Path;
import org.apache.flink.core.fs.local.LocalDataOutputStream;
import org.apache.flink.core.fs.local.LocalFileStatus;
import org.apache.flink.core.fs.local.LocalFileSystem;
import org.apache.flink.util.Preconditions;

import java.io.IOException;
import java.net.URI;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

/**
* A test file system. This also has a service entry in the test resources, to be loaded during
Expand All @@ -37,28 +43,44 @@ public class TestFileSystem extends LocalFileSystem {

public static final String SCHEME = "test";

// Number of (input) streams opened since the last reset.
// NOTE: the stale non-atomic `int streamOpenCounter` declaration from the old
// revision was removed — keeping both declarations would not compile.
private static final AtomicInteger streamOpenCounter = new AtomicInteger(0);

// Per-path count of output streams that were created but not yet closed.
private static final Map<Path, Integer> currentUnclosedOutputStream = new ConcurrentHashMap<>();

/** Returns how many input streams have been opened since the last counter reset. */
public static int getNumtimeStreamOpened() {
    // The duplicate `return streamOpenCounter;` left over from the old revision
    // was removed: a second return is an unreachable statement in Java.
    return streamOpenCounter.get();
}

/** Resets the input-stream open counter to zero. */
public static void resetStreamOpenCounter() {
    // The stale `streamOpenCounter = 0;` from the old revision was removed:
    // the field is a final AtomicInteger and cannot be reassigned.
    streamOpenCounter.set(0);
}

/** Returns the number of output streams created for {@code path} that have not been closed yet. */
public static int getNumberOfUnclosedOutputStream(Path path) {
    Integer unclosed = currentUnclosedOutputStream.get(path);
    return unclosed == null ? 0 : unclosed;
}

/** Opens the file and counts the open; delegates to the local file system. */
@Override
public FSDataInputStream open(Path f, int bufferSize) throws IOException {
    // The stale `streamOpenCounter++;` from the old revision was removed:
    // ++ is not applicable to an AtomicInteger and double-counting was wrong anyway.
    streamOpenCounter.incrementAndGet();
    return super.open(f, bufferSize);
}

/** Opens the file and counts the open; delegates to the local file system. */
@Override
public FSDataInputStream open(Path f) throws IOException {
    // The stale `streamOpenCounter++;` from the old revision was removed:
    // ++ is not applicable to an AtomicInteger and double-counting was wrong anyway.
    streamOpenCounter.incrementAndGet();
    return super.open(f);
}

/**
 * Creates an output stream for {@code filePath}, wrapping it so the per-path
 * unclosed-stream count is maintained until {@link TestOutputStream#close()} runs.
 */
@Override
public FSDataOutputStream create(final Path filePath, final WriteMode overwrite)
        throws IOException {
    LocalDataOutputStream stream = (LocalDataOutputStream) super.create(filePath, overwrite);
    // Count the stream only after creation succeeded; incrementing before the
    // create call would permanently inflate the count when create throws.
    currentUnclosedOutputStream.compute(filePath, (k, v) -> v == null ? 1 : v + 1);
    return new TestOutputStream(stream, filePath);
}

@Override
public FileStatus getFileStatus(Path f) throws IOException {
LocalFileStatus status = (LocalFileStatus) super.getFileStatus(f);
Expand All @@ -82,6 +104,51 @@ public URI getUri() {

// ------------------------------------------------------------------------

/**
 * Output stream that forwards every operation to a {@link LocalDataOutputStream}
 * and decrements the per-path unclosed-stream count exactly once on close.
 */
private static final class TestOutputStream extends FSDataOutputStream {

    private final LocalDataOutputStream delegate;
    private final Path filePath;

    private TestOutputStream(LocalDataOutputStream delegate, Path filePath) {
        this.delegate = delegate;
        this.filePath = filePath;
    }

    @Override
    public long getPos() throws IOException {
        return delegate.getPos();
    }

    @Override
    public void write(int b) throws IOException {
        delegate.write(b);
    }

    @Override
    public void write(byte[] b, int off, int len) throws IOException {
        delegate.write(b, off, len);
    }

    @Override
    public void flush() throws IOException {
        delegate.flush();
    }

    @Override
    public void sync() throws IOException {
        delegate.sync();
    }

    @Override
    public void close() throws IOException {
        // Remove the map entry when the last stream for this path closes;
        // a null count here means close() ran more often than create().
        currentUnclosedOutputStream.compute(
                filePath, (k, v) -> Preconditions.checkNotNull(v) == 1 ? null : v - 1);
        delegate.close();
    }
}

// ------------------------------------------------------------------------

public static final class TestFileSystemFactory implements FileSystemFactory {

@Override
Expand Down
Expand Up @@ -49,6 +49,8 @@ trait FileSystemITCaseBase {

def formatProperties(): Array[String] = Array()

// File system scheme used for the sink path; subclasses override this to
// route writes through a different (e.g. counting) file system implementation.
def getScheme: String = "file"

def tableEnv: TableEnvironment

def check(sqlQuery: String, expectedResult: Seq[Row]): Unit
Expand All @@ -59,7 +61,7 @@ trait FileSystemITCaseBase {
}

def open(): Unit = {
resultPath = fileTmpFolder.newFolder().toURI.toString
resultPath = fileTmpFolder.newFolder().toURI.getPath
BatchTableEnvUtil.registerCollection(
tableEnv,
"originalT",
Expand All @@ -76,7 +78,7 @@ trait FileSystemITCaseBase {
| c as b + 1
|) partitioned by (a, b) with (
| 'connector' = 'filesystem',
| 'path' = '$resultPath',
| 'path' = '$getScheme://$resultPath',
| ${formatProperties().mkString(",\n")}
|)
""".stripMargin
Expand All @@ -90,7 +92,7 @@ trait FileSystemITCaseBase {
| b bigint
|) with (
| 'connector' = 'filesystem',
| 'path' = '$resultPath',
| 'path' = '$getScheme://$resultPath',
| ${formatProperties().mkString(",\n")}
|)
""".stripMargin
Expand All @@ -102,7 +104,7 @@ trait FileSystemITCaseBase {
| x decimal(10, 0), y int
|) with (
| 'connector' = 'filesystem',
| 'path' = '$resultPath',
| 'path' = '$getScheme://$resultPath',
| ${formatProperties().mkString(",\n")}
|)
""".stripMargin
Expand All @@ -114,7 +116,7 @@ trait FileSystemITCaseBase {
| x decimal(3, 2), y int
|) with (
| 'connector' = 'filesystem',
| 'path' = '$resultPath',
| 'path' = '$getScheme://$resultPath',
| ${formatProperties().mkString(",\n")}
|)
""".stripMargin
Expand Down Expand Up @@ -256,7 +258,7 @@ trait FileSystemITCaseBase {
"partition(a='1', b='1') select x, y from originalT where a=1 and b=1").await()

// create hidden partition dir
assertTrue(new File(new Path(resultPath + "/a=1/.b=2").toUri).mkdir())
assertTrue(new File(new Path("file:" + resultPath + "/a=1/.b=2").toUri).mkdir())

check(
"select x, y from partitionedTable",
Expand Down
Expand Up @@ -18,6 +18,12 @@

package org.apache.flink.table.planner.runtime.batch.sql

import org.apache.flink.core.fs.Path
import org.apache.flink.testutils.TestFileSystem

import org.junit.After
import org.junit.Assert.assertEquals

/**
* Test for file system table factory with testcsv format.
*/
Expand All @@ -26,4 +32,15 @@ class FileSystemTestCsvITCase extends BatchFileSystemITCaseBase {
/** Extends the base connector properties with the testcsv format. */
override def formatProperties(): Array[String] = {
  super.formatProperties() :+ "'format' = 'testcsv'"
}

// Use the "test" scheme so writes go through TestFileSystem, which counts
// created-but-unclosed output streams for the close() check below.
override def getScheme: String = "test"

/** After each test, asserts that every output stream created for the sink path was closed. */
@After
def close(): Unit = {
  val unclosed = TestFileSystem.getNumberOfUnclosedOutputStream(new Path(resultPath))
  assertEquals(s"File $resultPath is not closed", 0, unclosed)
}
}
Expand Up @@ -18,6 +18,12 @@

package org.apache.flink.table.planner.runtime.stream.sql

import org.apache.flink.core.fs.Path
import org.apache.flink.testutils.TestFileSystem

import org.junit.After
import org.junit.Assert.assertEquals

import scala.collection.Seq

/**
Expand All @@ -28,4 +34,15 @@ class StreamFileSystemTestCsvITCase extends StreamFileSystemITCaseBase {
/** Extends the base connector properties with the testcsv format. */
override def formatProperties(): Array[String] = {
  super.formatProperties() :+ "'format' = 'testcsv'"
}

// Use the "test" scheme so writes go through TestFileSystem, which counts
// created-but-unclosed output streams for the close() check below.
override def getScheme: String = "test"

/** After each test, asserts that every output stream created for the sink path was closed. */
@After
def close(): Unit = {
  val unclosed = TestFileSystem.getNumberOfUnclosedOutputStream(new Path(resultPath))
  assertEquals(s"File $resultPath is not closed", 0, unclosed)
}
}