Skip to content

Commit

Permalink
Enrich Writer by the information for the integration layer
Browse files Browse the repository at this point in the history
Signed-off-by: Jan Supol <jan.supol@oracle.com>
  • Loading branch information
jansupol committed Apr 22, 2020
1 parent 24c2a1e commit 8947cd1
Show file tree
Hide file tree
Showing 9 changed files with 567 additions and 61 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2012, 2017 Oracle and/or its affiliates. All rights reserved.
* Copyright (c) 2012, 2020 Oracle and/or its affiliates. All rights reserved.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License v. 2.0, which is available at
Expand All @@ -16,6 +16,8 @@

package org.glassfish.tyrus.core;

import org.glassfish.tyrus.spi.WriterInfo;

import java.io.IOException;
import java.io.OutputStream;
import java.util.concurrent.ExecutionException;
Expand All @@ -28,6 +30,10 @@
*/
class OutputStreamToAsyncBinaryAdapter extends OutputStream {
private final TyrusWebSocket socket;
private static final WriterInfo BINARY_CONTINUATION_INFO =
new WriterInfo(WriterInfo.MessageType.BINARY_CONTINUATION, WriterInfo.RemoteEndpointType.BASIC);
private static final WriterInfo BINARY_INFO =
new WriterInfo(WriterInfo.MessageType.BINARY, WriterInfo.RemoteEndpointType.BASIC);

public OutputStreamToAsyncBinaryAdapter(TyrusWebSocket socket) {
this.socket = socket;
Expand All @@ -43,7 +49,7 @@ public void write(byte b[], int off, int len) throws IOException {
return;
}

final Future<?> future = socket.sendBinary(b, off, len, false);
final Future<?> future = socket.sendBinary(b, off, len, false, BINARY_CONTINUATION_INFO);
try {
future.get();
} catch (InterruptedException e) {
Expand Down Expand Up @@ -71,6 +77,6 @@ public void flush() throws IOException {

@Override
public void close() throws IOException {
socket.sendBinary(new byte[]{}, true);
socket.sendBinary(new byte[]{}, true, BINARY_INFO);
}
}
82 changes: 59 additions & 23 deletions core/src/main/java/org/glassfish/tyrus/core/ProtocolHandler.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2012, 2017 Oracle and/or its affiliates. All rights reserved.
* Copyright (c) 2012, 2020 Oracle and/or its affiliates. All rights reserved.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License v. 2.0, which is available at
Expand Down Expand Up @@ -48,6 +48,7 @@
import org.glassfish.tyrus.spi.UpgradeRequest;
import org.glassfish.tyrus.spi.UpgradeResponse;
import org.glassfish.tyrus.spi.Writer;
import org.glassfish.tyrus.spi.WriterInfo;

/**
* Tyrus protocol handler.
Expand Down Expand Up @@ -81,6 +82,9 @@ public final class ProtocolHandler {
private volatile MessageEventListener messageEventListener = MessageEventListener.NO_OP;
private volatile SendingFragmentState sendingFragment = SendingFragmentState.IDLE;

private static final WriterInfo CLOSE = new WriterInfo(WriterInfo.MessageType.CLOSE, WriterInfo.RemoteEndpointType.SUPER);
private static final WriterInfo NULL_INFO = new WriterInfo(null, null);

/**
* Synchronizes all public send* (including stream variants) methods.
* <p>
Expand Down Expand Up @@ -238,30 +242,42 @@ public void setMessageEventListener(MessageEventListener messageEventListener) {
* Not message frames - ping/pong/...
*/
/* package */
final Future<Frame> send(TyrusFrame frame) {
return send(frame, null, true);
final Future<Frame> send(TyrusFrame frame, WriterInfo writerInfo) {
return send(frame, null, writerInfo, true);
}

private Future<Frame> send(TyrusFrame frame, CompletionHandler<Frame> completionHandler, Boolean useTimeout) {
return write(frame, completionHandler, useTimeout);
private
Future<Frame> send(TyrusFrame frame, CompletionHandler<Frame> completionHandler, WriterInfo writerInfo, Boolean useTimeout) {
return write(frame, completionHandler, writerInfo, useTimeout);
}

private Future<Frame> send(ByteBuffer frame, CompletionHandler<Frame> completionHandler, Boolean useTimeout) {
return write(frame, completionHandler, useTimeout);
private
Future<Frame> send(ByteBuffer frame, CompletionHandler<Frame> completionHandler, WriterInfo writerInfo, Boolean useTimeout) {
return write(frame, completionHandler, writerInfo, useTimeout);
}

@Deprecated
public Future<Frame> send(byte[] data) {
return send(data, NULL_INFO);
}

public Future<Frame> send(byte[] data, WriterInfo writerInfo) {
lock.lock();
try {
checkSendingFragment();

return send(new BinaryFrame(data, false, true), null, true);
return send(new BinaryFrame(data, false, true), null, writerInfo, true);
} finally {
lock.unlock();
}
}

@Deprecated
public void send(final byte[] data, final SendHandler handler) {
send(data, handler, NULL_INFO);
}

public void send(final byte[] data, final SendHandler handler, WriterInfo writerInfo) {
lock.lock();

try {
Expand All @@ -277,24 +293,34 @@ public void failed(Throwable throwable) {
public void completed(Frame result) {
handler.onResult(new SendResult());
}
}, true);
}, writerInfo, true);
} finally {
lock.unlock();
}
}

@Deprecated
public Future<Frame> send(String data) {
return send(data, NULL_INFO);
}

public Future<Frame> send(String data, WriterInfo writerInfo) {
lock.lock();

try {
checkSendingFragment();
return send(new TextFrame(data, false, true));
return send(new TextFrame(data, false, true), writerInfo);
} finally {
lock.unlock();
}
}

@Deprecated
public void send(final String data, final SendHandler handler) {
send(data, handler, NULL_INFO);
}

public void send(final String data, final SendHandler handler, WriterInfo writerInfo) {
lock.lock();

try {
Expand All @@ -310,7 +336,7 @@ public void failed(Throwable throwable) {
public void completed(Frame result) {
handler.onResult(new SendResult());
}
}, true);
}, writerInfo, true);
} finally {
lock.unlock();
}
Expand All @@ -328,7 +354,7 @@ public Future<Frame> sendRawFrame(ByteBuffer data) {
try {
checkSendingFragment();

return send(data, null, true);
return send(data, null, new WriterInfo(WriterInfo.MessageType.BINARY, WriterInfo.RemoteEndpointType.BROADCAST), true);
} finally {
lock.unlock();
}
Expand Down Expand Up @@ -364,14 +390,19 @@ private void checkSendingFragment() {
}
}

@Deprecated
public Future<Frame> stream(boolean last, byte[] bytes, int off, int len) {
return stream(last, bytes, off, len, NULL_INFO);
}

public Future<Frame> stream(boolean last, byte[] bytes, int off, int len, WriterInfo writerInfo) {
lock.lock();

try {
switch (sendingFragment) {
case SENDING_BINARY:
Future<Frame> frameFuture = send(
new BinaryFrame(Arrays.copyOfRange(bytes, off, off + len), true, last));
new BinaryFrame(Arrays.copyOfRange(bytes, off, off + len), true, last), writerInfo);
if (last) {
sendingFragment = SendingFragmentState.IDLE;
idleCondition.signalAll();
Expand All @@ -381,26 +412,31 @@ public Future<Frame> stream(boolean last, byte[] bytes, int off, int len) {
case SENDING_TEXT:
checkSendingFragment();
sendingFragment = (last ? SendingFragmentState.IDLE : SendingFragmentState.SENDING_BINARY);
return send(new BinaryFrame(Arrays.copyOfRange(bytes, off, off + len), false, last));
return send(new BinaryFrame(Arrays.copyOfRange(bytes, off, off + len), false, last), writerInfo);

default:
// IDLE
sendingFragment = (last ? SendingFragmentState.IDLE : SendingFragmentState.SENDING_BINARY);
return send(new BinaryFrame(Arrays.copyOfRange(bytes, off, off + len), false, last));
return send(new BinaryFrame(Arrays.copyOfRange(bytes, off, off + len), false, last), writerInfo);
}

} finally {
lock.unlock();
}
}

@Deprecated
public Future<Frame> stream(boolean last, String fragment) {
return stream(last, fragment, NULL_INFO);
}

public Future<Frame> stream(boolean last, String fragment, WriterInfo writerInfo) {
lock.lock();

try {
switch (sendingFragment) {
case SENDING_TEXT:
Future<Frame> frameFuture = send(new TextFrame(fragment, true, last));
Future<Frame> frameFuture = send(new TextFrame(fragment, true, last), writerInfo);
if (last) {
sendingFragment = SendingFragmentState.IDLE;
idleCondition.signalAll();
Expand All @@ -410,12 +446,12 @@ public Future<Frame> stream(boolean last, String fragment) {
case SENDING_BINARY:
checkSendingFragment();
sendingFragment = (last ? SendingFragmentState.IDLE : SendingFragmentState.SENDING_TEXT);
return send(new TextFrame(fragment, false, last));
return send(new TextFrame(fragment, false, last), writerInfo);

default:
// IDLE
sendingFragment = (last ? SendingFragmentState.IDLE : SendingFragmentState.SENDING_TEXT);
return send(new TextFrame(fragment, false, last));
return send(new TextFrame(fragment, false, last), writerInfo);
}

} finally {
Expand All @@ -440,15 +476,15 @@ public synchronized Future<Frame> close(final int code, final String reason) {
outgoingCloseFrame = new CloseFrame(closeReason);
}

final Future<Frame> send = send(outgoingCloseFrame, null, false);
final Future<Frame> send = send(outgoingCloseFrame, null, CLOSE, false);

webSocket.onClose(new CloseFrame(closeReason));

return send;
}

private Future<Frame> write(final TyrusFrame frame, final CompletionHandler<Frame> completionHandler,
boolean useTimeout) {
WriterInfo data, boolean useTimeout) {
final Writer localWriter = writer;
final TyrusFuture<Frame> future = new TyrusFuture<Frame>();

Expand All @@ -457,22 +493,22 @@ private Future<Frame> write(final TyrusFrame frame, final CompletionHandler<Fram
}

final ByteBuffer byteBuffer = frame(frame);
localWriter.write(byteBuffer, new CompletionHandlerWrapper(completionHandler, future, frame));
localWriter.write(byteBuffer, new CompletionHandlerWrapper(completionHandler, future, frame), data);
messageEventListener.onFrameSent(frame.getFrameType(), frame.getPayloadLength());

return future;
}

private Future<Frame> write(final ByteBuffer frame, final CompletionHandler<Frame> completionHandler,
boolean useTimeout) {
WriterInfo data, boolean useTimeout) {
final Writer localWriter = writer;
final TyrusFuture<Frame> future = new TyrusFuture<Frame>();

if (localWriter == null) {
throw new IllegalStateException(LocalizationMessages.CONNECTION_NULL());
}

localWriter.write(frame, new CompletionHandlerWrapper(completionHandler, future, null));
localWriter.write(frame, new CompletionHandlerWrapper(completionHandler, future, null), data);

return future;
}
Expand Down

0 comments on commit 8947cd1

Please sign in to comment.