Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,11 @@ hs_err_pid*
/target/
/bin/

# jcstress / jqwik test outputs (generated in repo root)
/results/
/jcstress-results-*.bin.gz
/.jqwik-database

# IDEA files
/.idea/
*.iml
Expand Down
4 changes: 4 additions & 0 deletions CLAUDE.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,10 @@ mvn test -Dtest=StreamBufferTest#testSimpleRoundTrip
mvn org.pitest:pitest-maven:mutationCoverage
```

`mvn test` also runs:
- **jqwik properties** (`StreamBufferProperties`) — picked up by Surefire as a JUnit 5 engine.
- **jcstress** tests under `net.ladenthin.streambuffer.jcstress` — executed in a forked JVM via `exec-maven-plugin` in the `test` phase (`-m quick` mode).

## Architecture

`StreamBuffer` is a single-class Java library (`net.ladenthin.streambuffer`) that connects an `OutputStream` and `InputStream` through a dynamic FIFO queue — solving the fixed-buffer and cross-thread-deadlock limitations of Java's `PipedInputStream`/`PipedOutputStream`.
Expand Down
33 changes: 33 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ SPDX-License-Identifier: Apache-2.0
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<java.version>1.8</java.version>
<java.test.version>21</java.test.version>
<jcstress.version>0.16</jcstress.version>
<jqwik.version>1.9.2</jqwik.version>
<project.build.outputTimestamp>${git.commit.time}</project.build.outputTimestamp>
</properties>
<inceptionYear>2014</inceptionYear>
Expand Down Expand Up @@ -95,6 +97,18 @@ SPDX-License-Identifier: Apache-2.0
<version>1.37</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>net.jqwik</groupId>
<artifactId>jqwik</artifactId>
<version>${jqwik.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.openjdk.jcstress</groupId>
<artifactId>jcstress-core</artifactId>
<version>${jcstress.version}</version>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
Expand Down Expand Up @@ -224,6 +238,25 @@ SPDX-License-Identifier: Apache-2.0
<mainClass>org.openjdk.jmh.Main</mainClass>
<classpathScope>test</classpathScope>
</configuration>
<executions>
<execution>
<id>jcstress</id>
<phase>test</phase>
<goals><goal>exec</goal></goals>
<configuration>
<executable>${java.home}/bin/java</executable>
<classpathScope>test</classpathScope>
<arguments>
<argument>-classpath</argument>
<classpath/>
<argument>org.openjdk.jcstress.Main</argument>
<argument>-v</argument>
<argument>-m</argument>
<argument>default</argument>
</arguments>
</configuration>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.pitest</groupId>
Expand Down
146 changes: 146 additions & 0 deletions src/test/java/net/ladenthin/streambuffer/StreamBufferProperties.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,146 @@
// SPDX-FileCopyrightText: 2014-2026 Bernard Ladenthin <bernard.ladenthin@gmail.com>
//
// SPDX-License-Identifier: Apache-2.0
package net.ladenthin.streambuffer;

import net.jqwik.api.ForAll;
import net.jqwik.api.Property;
import net.jqwik.api.constraints.IntRange;
import net.jqwik.api.constraints.Size;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.List;

public class StreamBufferProperties {

@Property
boolean writeAllThenReadAvailableYieldsConcatenation(
@ForAll @Size(min = 0, max = 32) List<@Size(min = 0, max = 256) byte[]> chunks
) throws IOException {
StreamBuffer sb = new StreamBuffer();
OutputStream os = sb.getOutputStream();
InputStream is = sb.getInputStream();

ByteArrayOutputStream expected = new ByteArrayOutputStream();
for (byte[] chunk : chunks) {
os.write(chunk);
expected.write(chunk);
}

int available = is.available();
if (available != expected.size()) return false;
if (available == 0) return true;

byte[] read = new byte[available];
int n = is.read(read, 0, available);
if (n != available) return false;

return java.util.Arrays.equals(read, expected.toByteArray());
}

@Property
boolean readChunkSizeDoesNotAffectContent(
@ForAll @Size(min = 1, max = 1024) byte[] payload,
@ForAll @IntRange(min = 1, max = 64) int writeChunk,
@ForAll @IntRange(min = 1, max = 64) int readChunk
) throws IOException {
StreamBuffer sb = new StreamBuffer();
OutputStream os = sb.getOutputStream();
InputStream is = sb.getInputStream();

for (int off = 0; off < payload.length; off += writeChunk) {
int len = Math.min(writeChunk, payload.length - off);
os.write(payload, off, len);
}

byte[] out = new byte[payload.length];
int read = 0;
while (read < payload.length) {
int len = Math.min(readChunk, payload.length - read);
int n = is.read(out, read, len);
if (n <= 0) return false;
read += n;
}

return java.util.Arrays.equals(payload, out);
}

@Property
boolean counterAccountingIsConsistent(
@ForAll @Size(min = 0, max = 16) List<@Size(min = 0, max = 128) byte[]> writes,
@ForAll @IntRange(min = 0, max = 16) int readChunk
) throws IOException {
StreamBuffer sb = new StreamBuffer();
OutputStream os = sb.getOutputStream();
InputStream is = sb.getInputStream();

long expectedWritten = 0;
for (byte[] w : writes) {
os.write(w);
expectedWritten += w.length;
}

long expectedRead = 0;
if (readChunk > 0) {
byte[] buf = new byte[readChunk];
while (is.available() > 0) {
int n = is.read(buf, 0, Math.min(readChunk, is.available()));
if (n <= 0) break;
expectedRead += n;
}
}

if (sb.getTotalBytesWritten() != expectedWritten) return false;
if (sb.getTotalBytesRead() != expectedRead) return false;
return sb.getTotalBytesWritten() == sb.getTotalBytesRead() + is.available();
}

@Property
boolean safeWriteIsolatesFromExternalMutation(
@ForAll @Size(min = 1, max = 256) byte[] payload
) throws IOException {
StreamBuffer sb = new StreamBuffer();
sb.setSafeWrite(true);
OutputStream os = sb.getOutputStream();
InputStream is = sb.getInputStream();

byte[] expected = payload.clone();
os.write(payload);
// Mutate the source after writing; safeWrite=true must have cloned on write.
java.util.Arrays.fill(payload, (byte) 0x7F);

byte[] read = new byte[expected.length];
int n = is.read(read, 0, expected.length);
if (n != expected.length) return false;
return java.util.Arrays.equals(read, expected);
}

@Property
boolean trimPreservesContentAcrossMaxBufferElements(
@ForAll @Size(min = 1, max = 64) List<@Size(min = 0, max = 64) byte[]> chunks,
@ForAll @IntRange(min = 0, max = 16) int maxBufferElements
) throws IOException {
StreamBuffer sb = new StreamBuffer();
sb.setMaxBufferElements(maxBufferElements);
OutputStream os = sb.getOutputStream();
InputStream is = sb.getInputStream();

ByteArrayOutputStream expected = new ByteArrayOutputStream();
for (byte[] chunk : chunks) {
os.write(chunk);
expected.write(chunk);
}

int available = is.available();
if (available != expected.size()) return false;
if (available == 0) return true;

byte[] read = new byte[available];
int n = is.read(read, 0, available);
if (n != available) return false;
return java.util.Arrays.equals(read, expected.toByteArray());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
// SPDX-FileCopyrightText: 2014-2026 Bernard Ladenthin <bernard.ladenthin@gmail.com>
//
// SPDX-License-Identifier: Apache-2.0
package net.ladenthin.streambuffer.jcstress;

import net.ladenthin.streambuffer.StreamBuffer;
import org.openjdk.jcstress.annotations.Actor;
import org.openjdk.jcstress.annotations.Description;
import org.openjdk.jcstress.annotations.Expect;
import org.openjdk.jcstress.annotations.JCStressTest;
import org.openjdk.jcstress.annotations.Mode;
import org.openjdk.jcstress.annotations.Outcome;
import org.openjdk.jcstress.annotations.Signal;
import org.openjdk.jcstress.annotations.State;

import java.io.IOException;
import java.io.InputStream;

@JCStressTest(Mode.Termination)
@Description("A reader blocked in read() must be unblocked when close() is invoked.")
@Outcome(id = "TERMINATED", expect = Expect.ACCEPTABLE, desc = "close() unblocked the reader")
@Outcome(id = "STALE", expect = Expect.FORBIDDEN, desc = "Reader stuck after close()")
@State
public class CloseDuringReadRace {

private final StreamBuffer sb = new StreamBuffer();
private final InputStream is = sb.getInputStream();

@Actor
public void reader() {
try {
is.read();
} catch (IOException ignored) {
// acceptable: close races may surface as IOException in some paths
}
}

@Signal
public void closer() throws IOException {
sb.close();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
// SPDX-FileCopyrightText: 2014-2026 Bernard Ladenthin <bernard.ladenthin@gmail.com>
//
// SPDX-License-Identifier: Apache-2.0
package net.ladenthin.streambuffer.jcstress;

import net.ladenthin.streambuffer.StreamBuffer;
import org.openjdk.jcstress.annotations.Actor;
import org.openjdk.jcstress.annotations.Arbiter;
import org.openjdk.jcstress.annotations.Description;
import org.openjdk.jcstress.annotations.Expect;
import org.openjdk.jcstress.annotations.JCStressTest;
import org.openjdk.jcstress.annotations.Outcome;
import org.openjdk.jcstress.annotations.State;
import org.openjdk.jcstress.infra.results.Z_Result;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.Arrays;

@JCStressTest
@Description("Two concurrent writers must each appear contiguously in the FIFO; bytes must not interleave.")
@Outcome(id = "true", expect = Expect.ACCEPTABLE, desc = "Both payloads intact in some order")
@Outcome(id = "false", expect = Expect.FORBIDDEN, desc = "Torn / interleaved write")
@State
public class ConcurrentWriteRace {

private static final byte[] A = new byte[]{1, 2};
private static final byte[] B = new byte[]{3, 4};

private final StreamBuffer sb = new StreamBuffer();
private final OutputStream os = sb.getOutputStream();
private final InputStream is = sb.getInputStream();

@Actor
public void writerA() {
try { os.write(A); } catch (IOException ignored) { }

Check warning on line 37 in src/test/java/net/ladenthin/streambuffer/jcstress/ConcurrentWriteRace.java

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Remove this block of code, fill it in, or add a comment explaining why it is empty.

See more on https://sonarcloud.io/project/issues?id=bernardladenthin_streambuffer&issues=AZ5VIhizm5Gz0TQXN8yE&open=AZ5VIhizm5Gz0TQXN8yE&pullRequest=74
}

@Actor
public void writerB() {
try { os.write(B); } catch (IOException ignored) { }

Check warning on line 42 in src/test/java/net/ladenthin/streambuffer/jcstress/ConcurrentWriteRace.java

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Remove this block of code, fill it in, or add a comment explaining why it is empty.

See more on https://sonarcloud.io/project/issues?id=bernardladenthin_streambuffer&issues=AZ5VIhizm5Gz0TQXN8yF&open=AZ5VIhizm5Gz0TQXN8yF&pullRequest=74
}

@Arbiter
public void check(Z_Result r) {
try {
int available = is.available();
byte[] all = new byte[available];
int n = (available == 0) ? 0 : is.read(all, 0, available);
byte[] got = Arrays.copyOf(all, Math.max(n, 0));
r.r1 = Arrays.equals(got, new byte[]{1, 2, 3, 4})
|| Arrays.equals(got, new byte[]{3, 4, 1, 2});
} catch (IOException e) {
r.r1 = false;
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
// SPDX-FileCopyrightText: 2014-2026 Bernard Ladenthin <bernard.ladenthin@gmail.com>
//
// SPDX-License-Identifier: Apache-2.0
package net.ladenthin.streambuffer.jcstress;

import net.ladenthin.streambuffer.StreamBuffer;
import org.openjdk.jcstress.annotations.Actor;
import org.openjdk.jcstress.annotations.Description;
import org.openjdk.jcstress.annotations.Expect;
import org.openjdk.jcstress.annotations.JCStressTest;
import org.openjdk.jcstress.annotations.Mode;
import org.openjdk.jcstress.annotations.Outcome;
import org.openjdk.jcstress.annotations.Signal;
import org.openjdk.jcstress.annotations.State;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;

@JCStressTest(Mode.Termination)
@Description("A reader blocked in read() must be unblocked when a writer publishes a byte.")
@Outcome(id = "TERMINATED", expect = Expect.ACCEPTABLE, desc = "write() unblocked the reader")
@Outcome(id = "STALE", expect = Expect.FORBIDDEN, desc = "Reader stuck after write()")
@State
public class WriteUnblocksReadRace {

private final StreamBuffer sb = new StreamBuffer();
private final InputStream is = sb.getInputStream();
private final OutputStream os = sb.getOutputStream();

@Actor
public void reader() {
try {
is.read();
} catch (IOException ignored) {
// not expected on this path, but tolerated
}
}

@Signal
public void writer() throws IOException {
os.write(0x42);
}
}
Loading