Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion build.savant
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
restifyVersion = "4.1.2"
testngVersion = "7.7.1"

project(group: "io.fusionauth", name: "java-http", version: "0.1.13", licenses: ["ApacheV2_0"]) {
project(group: "io.fusionauth", name: "java-http", version: "0.1.14-RC.6", licenses: ["ApacheV2_0"]) {
workflow {
fetch {
// Dependency resolution order:
Expand Down
4 changes: 2 additions & 2 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
<modelVersion>4.0.0</modelVersion>
<groupId>io.fusionauth</groupId>
<artifactId>java-http</artifactId>
<version>0.1.13</version>
<version>0.1.14-RC.4</version>
<packaging>jar</packaging>

<name>Java HTTP library (client and server)</name>
Expand Down Expand Up @@ -170,4 +170,4 @@
</build>
</profile>
</profiles>
</project>
</project>
Original file line number Diff line number Diff line change
Expand Up @@ -40,12 +40,15 @@ public ChunkedBodyProcessor(NonBlockingByteBufferOutputStream outputStream) {

@Override
public ByteBuffer[] currentBuffers() {
ByteBuffer buffer = outputStream.readableBuffer();
if (buffer != null && currentBuffers[1] != buffer) {
buildChunk(buffer);
// If the current chunk still has data, write it
if (currentBuffers[1] != null && (currentBuffers[0].hasRemaining() || currentBuffers[1].hasRemaining() || currentBuffers[2].hasRemaining())) {
return currentBuffers;
}

// Otherwise, get the next chunk if any is ready to go
ByteBuffer buffer = outputStream.readableBuffer();
if (buffer != null) {
buildChunk(buffer);
return currentBuffers;
}

Expand All @@ -62,7 +65,7 @@ public ByteBuffer[] currentBuffers() {

@Override
public boolean isComplete() {
return outputStream.isClosed();
return outputStream.isClosed() && !Final[0].hasRemaining();
}

private void buildChunk(ByteBuffer buffer) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,10 @@ public ContentLengthBodyProcessor(NonBlockingByteBufferOutputStream outputStream

@Override
public ByteBuffer[] currentBuffers() {
if (currentBuffers[0] != null && currentBuffers[0].hasRemaining()) {
return currentBuffers;
}

currentBuffers[0] = outputStream.readableBuffer();
return currentBuffers[0] != null ? currentBuffers : null;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ public void clear() {

/**
* Flushes and then marks the stream closed. The flush must occur first so that the readers have access to the buffers before they are
* aware of the streams closure.
* aware of the stream's closure.
*/
@Override
public void close() {
Expand All @@ -83,31 +83,26 @@ public void flush() {
}
}

public boolean hasReadableBuffer() {
return buffers.peek() != null;
}

public boolean isClosed() {
return closed;
return buffers.isEmpty() && closed;
}

public boolean isEmpty() {
return !used;
}

/**
* Used by the reader side (the selector/processor) so that bytes can be read from the worker thread and written back to the client.
* Used by the reader side (the selector/processor) so that bytes can be read from the worker thread and written back to the client. This
* method should only be called once per buffer. It pops the buffer off the queue.
*
* @return A ByteBuffer that is used to read bytes that will be written back to the client or null if there aren't any buffers ready yet.
*/
public ByteBuffer readableBuffer() {
while (buffers.peek() != null) {
ByteBuffer head = buffers.peek();
if (head.hasRemaining()) {
return head;
}

// Throw out the head node
buffers.poll();
}

return null;
return buffers.poll();
}

@Override
Expand Down Expand Up @@ -155,7 +150,7 @@ public void write(byte[] b, int off, int len) {
private void addBuffer(boolean notify) {
currentBuffer.flip();
if (!buffers.offer(currentBuffer)) {
throw new IllegalStateException("The LinkedBlockingQueue is borked. It should never reject an offer() operation.");
throw new IllegalStateException("The ConcurrentLinkedQueue is borked. It should never reject an offer() operation.");
}

currentBuffer = null;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2022, FusionAuth, All Rights Reserved
* Copyright (c) 2022-2023, FusionAuth, All Rights Reserved
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -64,7 +64,7 @@ public synchronized ByteBuffer[] currentBuffer() {
if (state == ResponseState.Preamble || state == ResponseState.Expect) {
// We can't write the preamble under normal conditions if the worker thread is still working. Expect handling is different and the
// client is waiting for a pre-canned response
if (state != ResponseState.Expect && outputStream.readableBuffer() == null && !outputStream.isClosed()) {
if (state != ResponseState.Expect && !outputStream.hasReadableBuffer() && !outputStream.isClosed()) {
return null;
}

Expand Down
17 changes: 16 additions & 1 deletion src/main/java/io/fusionauth/http/server/HTTPServerThread.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2022, FusionAuth, All Rights Reserved
* Copyright (c) 2022-2023, FusionAuth, All Rights Reserved
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -28,6 +28,7 @@
import java.security.GeneralSecurityException;
import java.time.Duration;
import java.util.List;
import java.util.Map;

import io.fusionauth.http.ParseException;
import io.fusionauth.http.log.Logger;
Expand Down Expand Up @@ -242,6 +243,18 @@ private void cleanup() {
var client = (SocketChannel) key.channel();
try {
logger.debug("Closing client connection [{}] due to inactivity", client.getRemoteAddress().toString());

StringBuilder threadDump = new StringBuilder();
for (Map.Entry<Thread, StackTraceElement[]> entry : Thread.getAllStackTraces().entrySet()) {
threadDump.append(entry.getKey()).append(" ").append(entry.getKey().getState()).append("\n");
for (StackTraceElement ste : entry.getValue()) {
threadDump.append("\tat ").append(ste).append("\n");
}
threadDump.append("\n");
}

logger.debug("Thread dump from server side.\n" + threadDump);

} catch (IOException e) {
// Ignore because we are just debugging
}
Expand All @@ -265,6 +278,7 @@ private void read(SelectionKey key) throws IOException {
if (buffer != null) {
int num = client.read(buffer);
if (num < 0) {
logger.debug("Client terminated the connection. Num bytes is [{}]. Closing connection", num);
state = processor.close(true);
} else {
logger.debug("Read [{}] bytes from client", num);
Expand Down Expand Up @@ -299,6 +313,7 @@ private void write(SelectionKey key) throws IOException {
}

if (num < 0) {
logger.debug("Client refused bytes or terminated the connection. Num bytes is [{}]. Closing connection", num);
state = processor.close(true);
} else {
if (num > 0) {
Expand Down
Loading