Skip to content
This repository has been archived by the owner on Mar 26, 2023. It is now read-only.
/ rio Public archive

Commit

Permalink
Merge pull request #16 from cqfn/15
Browse files Browse the repository at this point in the history
Using whitebox subscriber
  • Loading branch information
g4s8 committed Jul 28, 2020
2 parents 1de0413 + f3c59c1 commit 8713c2e
Show file tree
Hide file tree
Showing 7 changed files with 127 additions and 16 deletions.
4 changes: 4 additions & 0 deletions .gitattributes
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
* text=auto eol=lf
*.java ident
*.xml ident
*.png binary
2 changes: 1 addition & 1 deletion .github/workflows/maven.yml
Original file line number Diff line number Diff line change
Expand Up @@ -27,4 +27,4 @@ jobs:
restore-keys: |
${{ runner.os }}-jdk-${{ matrix.java }}-maven-
- name: Build it with Maven
run: mvn -B verify
run: mvn -B verify -Pqulice
2 changes: 1 addition & 1 deletion .pdd
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
--source=.
--verbose
--exclude target/**/*
--exclude src/test/resources/**/*
--exclude src/test/resources-binary/**/*
--exclude .idea/**/*
--rule min-words:15
--rule min-estimate:15
Expand Down
17 changes: 17 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,10 @@ OTHER DEALINGS IN THE SOFTWARE.
<directory>${basedir}/src/test/resources</directory>
<filtering>false</filtering>
</testResource>
<testResource>
<directory>${basedir}/src/test/resources-binary</directory>
<filtering>false</filtering>
</testResource>
</testResources>
<plugins>
<plugin>
Expand Down Expand Up @@ -230,6 +234,19 @@ OTHER DEALINGS IN THE SOFTWARE.
</executions>
</plugin>
</plugins>
<pluginManagement>
<plugins>
<plugin>
<groupId>com.qulice</groupId>
<artifactId>qulice-maven-plugin</artifactId>
<configuration>
<excludes combine.children="append">
<exclude>checkstyle:/src/test/resources-binary/.*</exclude>
</excludes>
</configuration>
</plugin>
</plugins>
</pluginManagement>
</build>
<profiles>
<profile>
Expand Down
118 changes: 104 additions & 14 deletions src/test/java/org/cqfn/rio/file/WriteSubscriberTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,8 @@
import java.util.Locale;
import java.util.concurrent.Executors;
import org.reactivestreams.Subscriber;
import org.reactivestreams.tck.SubscriberBlackboxVerification;
import org.reactivestreams.Subscription;
import org.reactivestreams.tck.SubscriberWhiteboxVerification;
import org.reactivestreams.tck.TestEnvironment;
import org.testng.SkipException;
import org.testng.annotations.BeforeClass;
Expand All @@ -46,14 +47,15 @@
*
* @since 1.0
* @checkstyle MagicNumberCheck (500 lines)
* @checkstyle AnonInnerLengthCheck (500 lines)
*/
@SuppressWarnings(
{
"PMD.TestClassWithoutTestCases", "PMD.OnlyOneReturn",
"PMD.JUnit4TestShouldUseBeforeAnnotation"
}
)
public final class WriteSubscriberTest extends SubscriberBlackboxVerification<ByteBuffer> {
public final class WriteSubscriberTest extends SubscriberWhiteboxVerification<ByteBuffer> {

/**
* Ctor.
Expand All @@ -62,33 +64,121 @@ public WriteSubscriberTest() {
super(new TestEnvironment());
}

@BeforeClass
public void setUp() {
if (System.getProperty("os.name").toLowerCase(Locale.US).contains("win")) {
throw new SkipException("Disabled for windows");
}
}

@Override
public Subscriber<ByteBuffer> createSubscriber() {
public Subscriber<ByteBuffer> createSubscriber(
final WhiteboxSubscriberProbe<ByteBuffer> probe
) {
final Path tmp;
try {
tmp = Files.createTempFile(this.getClass().getSimpleName(), ".tmp");
tmp.toFile().deleteOnExit();
return new WriteSubscriber(
FileChannel.open(tmp, Collections.singleton(StandardOpenOption.WRITE)),
WriteGreed.SINGLE,
Executors.newCachedThreadPool()
return new SubscriberWithProbe<>(
new WriteSubscriber(
FileChannel.open(tmp, Collections.singleton(StandardOpenOption.WRITE)),
WriteGreed.SINGLE,
Executors.newCachedThreadPool()
),
probe
);
} catch (final IOException err) {
throw new UncheckedIOException(err);
}
}

@BeforeClass
public void setUp() {
if (System.getProperty("os.name").toLowerCase(Locale.US).contains("win")) {
throw new SkipException("Disabled for windows");
}
}

@Override
public ByteBuffer createElement(final int element) {
final byte[] arr = new byte[1024];
Arrays.fill(arr, (byte) element);
return ByteBuffer.wrap(arr);
}

/**
* Subscriber with probe.
* @param <T> Subscriber type
* @since 0.2
*/
private static class SubscriberWithProbe<T> implements Subscriber<T> {

/**
* Target subscriber.
*/
private final Subscriber<T> target;

/**
* Test probe.
*/
private final WhiteboxSubscriberProbe<T> probe;

/**
* Ctor.
* @param target Subscriber
* @param probe For test
*/
SubscriberWithProbe(final Subscriber<T> target,
final WhiteboxSubscriberProbe<T> probe) {
this.target = target;
this.probe = probe;
}

@Override
public void onSubscribe(final Subscription subscription) {
this.target.onSubscribe(subscription);
this.probe.registerOnSubscribe(new ProbePuppet(subscription));
}

@Override
public void onNext(final T next) {
this.target.onNext(next);
this.probe.registerOnNext(next);
}

@Override
public void onError(final Throwable err) {
this.target.onError(err);
this.probe.registerOnError(err);
}

@Override
public void onComplete() {
this.target.onComplete();
this.probe.registerOnComplete();
}
}

/**
* Puppet for subscriber probe.
* @since 0.2
*/
private static class ProbePuppet implements SubscriberPuppet {

/**
* Actual subscription.
*/
private final Subscription subscription;

/**
* New puppet.
* @param subscription Of subscriber
*/
ProbePuppet(final Subscription subscription) {
this.subscription = subscription;
}

@Override
public void triggerRequest(final long elements) {
this.subscription.request(elements);
}

@Override
public void signalCancel() {
this.subscription.cancel();
}
}
}
File renamed without changes.

0 comments on commit 8713c2e

Please sign in to comment.