Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use AsynchronousFileChannel for npipe #1408

Merged
merged 3 commits into from
Jun 4, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -4,101 +4,77 @@
import com.sun.jna.win32.StdCallLibrary;
import com.sun.jna.win32.W32APIOptions;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.RandomAccessFile;
import java.net.Socket;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousByteChannel;
import java.nio.channels.AsynchronousCloseException;
import java.nio.channels.AsynchronousFileChannel;
import java.nio.channels.Channels;
import java.nio.channels.CompletionHandler;
import java.nio.file.NoSuchFileException;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;

class NamedPipeSocket extends Socket {

private final String socketFileName;

private RandomAccessFile file;

private InputStream is;

private OutputStream os;
private AsynchronousFileByteChannel channel;

NamedPipeSocket(String socketFileName) {
this.socketFileName = socketFileName;
}

@Override
public void close() throws IOException {
if (file != null) {
file.close();
file = null;
if (channel != null) {
channel.close();
}
}

@Override
public void connect(SocketAddress endpoint) {
public void connect(SocketAddress endpoint) throws IOException {
connect(endpoint, 0);
}

@Override
public void connect(SocketAddress endpoint, int timeout) {
public void connect(SocketAddress endpoint, int timeout) throws IOException {
long startedAt = System.currentTimeMillis();
timeout = Math.max(timeout, 10_000);
while (true) {
try {
file = new RandomAccessFile(socketFileName, "rw");
channel = new AsynchronousFileByteChannel(
AsynchronousFileChannel.open(
Paths.get(socketFileName),
StandardOpenOption.READ,
StandardOpenOption.WRITE
)
);
break;
} catch (FileNotFoundException e) {
} catch (NoSuchFileException e) {
if (System.currentTimeMillis() - startedAt >= timeout) {
throw new RuntimeException(e);
} else {
Kernel32.INSTANCE.WaitNamedPipe(socketFileName, 100);
}
}
}

is = new InputStream() {
@Override
public int read(byte[] bytes, int off, int len) throws IOException {
return file.read(bytes, off, len);
}

@Override
public int read() throws IOException {
return file.read();
}

@Override
public int read(byte[] bytes) throws IOException {
return file.read(bytes);
}
};

os = new OutputStream() {
@Override
public void write(byte[] bytes, int off, int len) throws IOException {
file.write(bytes, off, len);
}

@Override
public void write(int value) throws IOException {
file.write(value);
}

@Override
public void write(byte[] bytes) throws IOException {
file.write(bytes);
}
};
}

@Override
public InputStream getInputStream() {
return is;
return Channels.newInputStream(channel);
}

@Override
public OutputStream getOutputStream() {
return os;
return Channels.newOutputStream(channel);
}

interface Kernel32 extends StdCallLibrary {
Expand All @@ -108,4 +84,75 @@ interface Kernel32 extends StdCallLibrary {
@SuppressWarnings("checkstyle:methodname")
boolean WaitNamedPipe(String lpNamedPipeName, int nTimeOut);
}

private static class AsynchronousFileByteChannel implements AsynchronousByteChannel {
private final AsynchronousFileChannel fileChannel;

AsynchronousFileByteChannel(AsynchronousFileChannel fileChannel) {
this.fileChannel = fileChannel;
}

@Override
public <A> void read(ByteBuffer dst, A attachment, CompletionHandler<Integer, ? super A> handler) {
fileChannel.read(dst, 0, attachment, new CompletionHandler<Integer, A>() {
@Override
public void completed(Integer read, A attachment) {
handler.completed(read > 0 ? read : -1, attachment);
}

@Override
public void failed(Throwable exc, A attachment) {
if (exc instanceof AsynchronousCloseException) {
handler.completed(-1, attachment);
return;
}
handler.failed(exc, attachment);
}
});
}

@Override
public Future<Integer> read(ByteBuffer dst) {
CompletableFutureHandler future = new CompletableFutureHandler();
fileChannel.read(dst, 0, null, future);
return future;
}

@Override
public <A> void write(ByteBuffer src, A attachment, CompletionHandler<Integer, ? super A> handler) {
fileChannel.write(src, 0, attachment, handler);
}

@Override
public Future<Integer> write(ByteBuffer src) {
return fileChannel.write(src, 0);
}

@Override
public void close() throws IOException {
fileChannel.close();
}

@Override
public boolean isOpen() {
return fileChannel.isOpen();
}

private static class CompletableFutureHandler extends CompletableFuture<Integer> implements CompletionHandler<Integer, Object> {

@Override
public void completed(Integer read, Object attachment) {
complete(read > 0 ? read : -1);
}

@Override
public void failed(Throwable exc, Object attachment) {
if (exc instanceof AsynchronousCloseException) {
complete(-1);
return;
}
completeExceptionally(exc);
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,159 @@
package com.github.dockerjava.okhttp;

import com.sun.jna.Native;
import com.sun.jna.win32.StdCallLibrary;
import com.sun.jna.win32.W32APIOptions;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.Socket;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousByteChannel;
import java.nio.channels.AsynchronousCloseException;
import java.nio.channels.AsynchronousFileChannel;
import java.nio.channels.Channels;
import java.nio.channels.CompletionHandler;
import java.nio.file.NoSuchFileException;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;

class NamedPipeSocket extends Socket {

private final String socketFileName;

private AsynchronousFileByteChannel channel;

NamedPipeSocket(String socketFileName) {
this.socketFileName = socketFileName;
}

@Override
public void close() throws IOException {
if (channel != null) {
channel.close();
}
}

@Override
public void connect(SocketAddress endpoint) throws IOException {
connect(endpoint, 0);
}

@Override
public void connect(SocketAddress endpoint, int timeout) throws IOException {
long startedAt = System.currentTimeMillis();

timeout = Math.max(timeout, 10_000);
while (true) {
try {
channel = new AsynchronousFileByteChannel(
AsynchronousFileChannel.open(
Paths.get(socketFileName),
StandardOpenOption.READ,
StandardOpenOption.WRITE
)
);
break;
} catch (NoSuchFileException e) {
if (System.currentTimeMillis() - startedAt >= timeout) {
throw new RuntimeException(e);
} else {
Kernel32.INSTANCE.WaitNamedPipe(socketFileName, 100);
}
}
}
}

@Override
public InputStream getInputStream() {
return Channels.newInputStream(channel);
}

@Override
public OutputStream getOutputStream() {
return Channels.newOutputStream(channel);
}

interface Kernel32 extends StdCallLibrary {

Kernel32 INSTANCE = Native.load("kernel32", Kernel32.class, W32APIOptions.DEFAULT_OPTIONS);

@SuppressWarnings("checkstyle:methodname")
boolean WaitNamedPipe(String lpNamedPipeName, int nTimeOut);
}

private static class AsynchronousFileByteChannel implements AsynchronousByteChannel {
private final AsynchronousFileChannel fileChannel;

AsynchronousFileByteChannel(AsynchronousFileChannel fileChannel) {
this.fileChannel = fileChannel;
}

@Override
public <A> void read(ByteBuffer dst, A attachment, CompletionHandler<Integer, ? super A> handler) {
fileChannel.read(dst, 0, attachment, new CompletionHandler<Integer, A>() {
@Override
public void completed(Integer read, A attachment) {
handler.completed(read > 0 ? read : -1, attachment);
}

@Override
public void failed(Throwable exc, A attachment) {
if (exc instanceof AsynchronousCloseException) {
handler.completed(-1, attachment);
return;
}
handler.failed(exc, attachment);
}
});
}

@Override
public Future<Integer> read(ByteBuffer dst) {
CompletableFutureHandler future = new CompletableFutureHandler();
fileChannel.read(dst, 0, null, future);
return future;
}

@Override
public <A> void write(ByteBuffer src, A attachment, CompletionHandler<Integer, ? super A> handler) {
fileChannel.write(src, 0, attachment, handler);
}

@Override
public Future<Integer> write(ByteBuffer src) {
return fileChannel.write(src, 0);
}

@Override
public void close() throws IOException {
fileChannel.close();
}

@Override
public boolean isOpen() {
return fileChannel.isOpen();
}

private static class CompletableFutureHandler extends CompletableFuture<Integer> implements CompletionHandler<Integer, Object> {

@Override
public void completed(Integer read, Object attachment) {
complete(read > 0 ? read : -1);
}

@Override
public void failed(Throwable exc, Object attachment) {
if (exc instanceof AsynchronousCloseException) {
complete(-1);
return;
}
completeExceptionally(exc);
}
}
}
}