Skip to content

Commit

Permalink
[FLINK-24728][tests] Add tests to ensure SQL file sink closes all created files
Browse files Browse the repository at this point in the history

This closes #18073
  • Loading branch information
tsreaper committed Dec 17, 2021
1 parent 069d629 commit 11d2470
Show file tree
Hide file tree
Showing 4 changed files with 114 additions and 11 deletions.
Expand Up @@ -19,15 +19,21 @@
package org.apache.flink.testutils;

import org.apache.flink.core.fs.FSDataInputStream;
import org.apache.flink.core.fs.FSDataOutputStream;
import org.apache.flink.core.fs.FileStatus;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.FileSystemFactory;
import org.apache.flink.core.fs.Path;
import org.apache.flink.core.fs.local.LocalDataOutputStream;
import org.apache.flink.core.fs.local.LocalFileStatus;
import org.apache.flink.core.fs.local.LocalFileSystem;
import org.apache.flink.util.Preconditions;

import java.io.IOException;
import java.net.URI;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

/**
* A test file system. This also has a service entry in the test resources, to be loaded during
Expand All @@ -37,28 +43,44 @@ public class TestFileSystem extends LocalFileSystem {

public static final String SCHEME = "test";

// Number of (input) streams opened; atomic so concurrently opening readers are counted safely.
private static final AtomicInteger streamOpenCounter = new AtomicInteger(0);

// Current number of created but not-yet-closed (output) streams, keyed by file path.
private static final Map<Path, Integer> currentUnclosedOutputStream = new ConcurrentHashMap<>();

/** Returns how many times an input stream has been opened via {@code open(...)}. */
public static int getNumtimeStreamOpened() {
    return streamOpenCounter.get();
}

/** Resets the input-stream open counter to zero; call between tests to isolate counts. */
public static void resetStreamOpenCounter() {
    streamOpenCounter.set(0);
}

/** Returns how many output streams created for {@code path} are still open (0 if none). */
public static int getNumberOfUnclosedOutputStream(Path path) {
    Integer openCount = currentUnclosedOutputStream.get(path);
    return openCount == null ? 0 : openCount;
}

@Override
public FSDataInputStream open(Path f, int bufferSize) throws IOException {
    // Record the open so tests can assert how often files were read.
    streamOpenCounter.incrementAndGet();
    return super.open(f, bufferSize);
}

@Override
public FSDataInputStream open(Path f) throws IOException {
    // Record the open so tests can assert how often files were read.
    streamOpenCounter.incrementAndGet();
    return super.open(f);
}

/**
 * Creates an output stream and tracks it as unclosed for {@code filePath} until its
 * wrapper's {@code close()} runs.
 */
@Override
public FSDataOutputStream create(final Path filePath, final WriteMode overwrite)
        throws IOException {
    // One more unclosed output stream for this path (merge == compute with null -> 1, v -> v+1).
    currentUnclosedOutputStream.merge(filePath, 1, Integer::sum);
    final LocalDataOutputStream delegate = (LocalDataOutputStream) super.create(filePath, overwrite);
    return new TestOutputStream(delegate, filePath);
}

@Override
public FileStatus getFileStatus(Path f) throws IOException {
LocalFileStatus status = (LocalFileStatus) super.getFileStatus(f);
Expand All @@ -82,6 +104,51 @@ public URI getUri() {

// ------------------------------------------------------------------------

/**
 * Output-stream wrapper that forwards all operations to a {@link LocalDataOutputStream}
 * and decrements the per-path unclosed-stream counter when closed.
 */
private static final class TestOutputStream extends FSDataOutputStream {

    private final LocalDataOutputStream delegate;
    private final Path path;

    private TestOutputStream(LocalDataOutputStream delegate, Path path) {
        this.delegate = delegate;
        this.path = path;
    }

    @Override
    public long getPos() throws IOException {
        return delegate.getPos();
    }

    @Override
    public void write(int b) throws IOException {
        delegate.write(b);
    }

    @Override
    public void write(byte[] b, int off, int len) throws IOException {
        delegate.write(b, off, len);
    }

    @Override
    public void flush() throws IOException {
        delegate.flush();
    }

    @Override
    public void sync() throws IOException {
        delegate.sync();
    }

    @Override
    public void close() throws IOException {
        // Remove the bookkeeping entry entirely once the last stream for this path closes;
        // a missing entry (null count) means close-without-create and fails fast via checkNotNull.
        currentUnclosedOutputStream.compute(
                path,
                (p, count) -> {
                    int open = Preconditions.checkNotNull(count);
                    return open == 1 ? null : open - 1;
                });
        delegate.close();
    }
}

// ------------------------------------------------------------------------

public static final class TestFileSystemFactory implements FileSystemFactory {

@Override
Expand Down
Expand Up @@ -49,6 +49,8 @@ trait FileSystemITCaseBase {

/** Extra connector properties for the format under test; empty by default. */
def formatProperties(): Array[String] = Array.empty[String]

/** File-system scheme used when building table paths; subclasses override it (e.g. "test"). */
def getScheme: String = "file"

def tableEnv: TableEnvironment

def check(sqlQuery: String, expectedResult: Seq[Row]): Unit
Expand All @@ -59,7 +61,7 @@ trait FileSystemITCaseBase {
}

def open(): Unit = {
resultPath = fileTmpFolder.newFolder().toURI.toString
resultPath = fileTmpFolder.newFolder().toURI.getPath
BatchTableEnvUtil.registerCollection(
tableEnv,
"originalT",
Expand All @@ -76,7 +78,7 @@ trait FileSystemITCaseBase {
| c as b + 1
|) partitioned by (a, b) with (
| 'connector' = 'filesystem',
| 'path' = '$resultPath',
| 'path' = '$getScheme://$resultPath',
| ${formatProperties().mkString(",\n")}
|)
""".stripMargin
Expand All @@ -90,7 +92,7 @@ trait FileSystemITCaseBase {
| b bigint
|) with (
| 'connector' = 'filesystem',
| 'path' = '$resultPath',
| 'path' = '$getScheme://$resultPath',
| ${formatProperties().mkString(",\n")}
|)
""".stripMargin
Expand All @@ -102,7 +104,7 @@ trait FileSystemITCaseBase {
| x decimal(10, 0), y int
|) with (
| 'connector' = 'filesystem',
| 'path' = '$resultPath',
| 'path' = '$getScheme://$resultPath',
| ${formatProperties().mkString(",\n")}
|)
""".stripMargin
Expand All @@ -114,7 +116,7 @@ trait FileSystemITCaseBase {
| x decimal(3, 2), y int
|) with (
| 'connector' = 'filesystem',
| 'path' = '$resultPath',
| 'path' = '$getScheme://$resultPath',
| ${formatProperties().mkString(",\n")}
|)
""".stripMargin
Expand Down Expand Up @@ -256,7 +258,7 @@ trait FileSystemITCaseBase {
"partition(a='1', b='1') select x, y from originalT where a=1 and b=1").await()

// create hidden partition dir
assertTrue(new File(new Path(resultPath + "/a=1/.b=2").toUri).mkdir())
assertTrue(new File(new Path("file:" + resultPath + "/a=1/.b=2").toUri).mkdir())

check(
"select x, y from partitionedTable",
Expand Down
Expand Up @@ -18,6 +18,12 @@

package org.apache.flink.table.planner.runtime.batch.sql

import org.apache.flink.core.fs.Path
import org.apache.flink.testutils.TestFileSystem

import org.junit.After
import org.junit.Assert.assertEquals

/**
* Test for file system table factory with testcsv format.
*/
/** Batch file system connector test using the testcsv format on the counting [[TestFileSystem]]. */
class FileSystemTestCsvITCase extends BatchFileSystemITCaseBase {

  override def formatProperties(): Array[String] = {
    super.formatProperties() ++ Seq("'format' = 'testcsv'")
  }

  // Route table paths through TestFileSystem so unclosed output streams are tracked.
  override def getScheme: String = "test"

  @After
  def close(): Unit = {
    // Every output stream the sink created for resultPath must be closed by the end of the test.
    val path = new Path(resultPath)
    assertEquals(
      s"File $resultPath is not closed",
      0,
      TestFileSystem.getNumberOfUnclosedOutputStream(path))
  }
}
Expand Up @@ -18,6 +18,12 @@

package org.apache.flink.table.planner.runtime.stream.sql

import org.apache.flink.core.fs.Path
import org.apache.flink.testutils.TestFileSystem

import org.junit.After
import org.junit.Assert.assertEquals

import scala.collection.Seq

/**
/** Streaming file system connector test using the testcsv format on the counting [[TestFileSystem]]. */
class StreamFileSystemTestCsvITCase extends StreamFileSystemITCaseBase {

  override def formatProperties(): Array[String] = {
    super.formatProperties() ++ Seq("'format' = 'testcsv'")
  }

  // Route table paths through TestFileSystem so unclosed output streams are tracked.
  override def getScheme: String = "test"

  @After
  def close(): Unit = {
    // Every output stream the sink created for resultPath must be closed by the end of the test.
    val path = new Path(resultPath)
    assertEquals(
      s"File $resultPath is not closed",
      0,
      TestFileSystem.getNumberOfUnclosedOutputStream(path))
  }
}

0 comments on commit 11d2470

Please sign in to comment.