Skip to content

Commit

Permalink
Merge pull request #483 from bennofs/fix-example-async-close
Browse files Browse the repository at this point in the history
Make close of asyncOutputStream synchronous in example
  • Loading branch information
wetneb committed Mar 6, 2020
2 parents c09530d + ff86bed commit 8bc243d
Showing 1 changed file with 44 additions and 26 deletions.
Expand Up @@ -20,12 +20,7 @@
* #L%
*/

import java.io.BufferedOutputStream;
import java.io.Closeable;
import java.io.IOException;
import java.io.OutputStream;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;
import java.io.*;

import org.apache.commons.compress.compressors.gzip.GzipCompressorOutputStream;
import org.apache.commons.compress.compressors.gzip.GzipParameters;
Expand Down Expand Up @@ -90,23 +85,23 @@ public static void main(String[] args) throws IOException {
* Print some basic documentation about this program.
*/
private static void printDocumentation() {
System.out
.println("********************************************************************");
System.out.println("*** Wikidata Toolkit: RDF Serialization Example");
System.out.println("*** ");
System.out
.println("*** This program will download dumps from Wikidata and serialize the data in a RDF format.");
System.out
.println("*** Downloading may take some time initially. After that, files");
System.out
.println("*** are stored on disk and are used until newer dumps are available.");
System.out
.println("*** You can delete files manually when no longer needed (see ");
System.out
.println("*** message below for the directory where dump files are found).");
System.out
.println("********************************************************************");
}
System.out
.println("********************************************************************");
System.out.println("*** Wikidata Toolkit: RDF Serialization Example");
System.out.println("*** ");
System.out
.println("*** This program will download dumps from Wikidata and serialize the data in a RDF format.");
System.out
.println("*** Downloading may take some time initially. After that, files");
System.out
.println("*** are stored on disk and are used until newer dumps are available.");
System.out
.println("*** You can delete files manually when no longer needed (see ");
System.out
.println("*** message below for the directory where dump files are found).");
System.out
.println("********************************************************************");
}

/**
* Creates a separate thread for writing into the given output stream and
Expand All @@ -128,7 +123,7 @@ public static OutputStream asynchronousOutputStream(
final int SIZE = 1024 * 1024 * 10;
final PipedOutputStream pos = new PipedOutputStream();
final PipedInputStream pis = new PipedInputStream(pos, SIZE);
new Thread(() -> {
final Thread worker = new Thread(() -> {
try {
byte[] bytes = new byte[SIZE];
for (int len; (len = pis.read(bytes)) > 0;) {
Expand All @@ -140,8 +135,31 @@ public static OutputStream asynchronousOutputStream(
close(pis);
close(outputStream);
}
}, "async-output-stream").start();
return pos;
}, "async-output-stream");
return new SyncCloseOutputStream(pos, worker);
}


/**
* Helper class that joins a thread on a call to close, to ensure that the output stream has really been closed.
*/
private static final class SyncCloseOutputStream extends FilterOutputStream {
private final Thread worker;

public SyncCloseOutputStream(OutputStream out, Thread worker) {
super(out);
this.worker = worker;
}

@Override
public void close() throws IOException {
super.close();
try {
worker.join();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}

/**
Expand Down

0 comments on commit 8bc243d

Please sign in to comment.