Skip to content
This repository has been archived by the owner on Mar 26, 2023. It is now read-only.
/ rio Public archive

Commit

Permalink
Merge pull request #11 from g4s8/8
Browse files Browse the repository at this point in the history
Fixed qulice warnings
  • Loading branch information
g4s8 committed Jul 23, 2020
2 parents b7eedc8 + f0fe7ab commit 0099546
Show file tree
Hide file tree
Showing 24 changed files with 301 additions and 110 deletions.
40 changes: 40 additions & 0 deletions src/main/java/wtf/g4s8/rio/file/CloseChanOnError.java
Original file line number Diff line number Diff line change
@@ -1,3 +1,27 @@
/*
* MIT License
*
* Copyright (c) 2020 g4s8
*
* Permission is hereby granted, free of charge, to any person obtaining
* a copy of this software and associated documentation files
* (the "Software"), to deal in the Software without restriction,
* including without limitation the rights * to use, copy, modify,
* merge, publish, distribute, sublicense, and/or sell copies of the Software,
* and to permit persons to whom the Software is furnished to do so,
* subject to the following conditions:
*
* The above copyright notice and this permission notice shall be
* included in all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE,
* ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR
* OTHER DEALINGS IN THE SOFTWARE.
*/
package wtf.g4s8.rio.file;

import com.jcabi.log.Logger;
Expand All @@ -10,25 +34,41 @@
*/
final class CloseChanOnError implements Runnable {

/**
* Decorated job to run.
*/
private final Runnable origin;

/**
* Channel to close.
*/
private final FileChannel chan;

/**
* Wraps runnable with close channel action on exit.
* @param origin Runnable to decorate
* @param chan Channel to close
*/
CloseChanOnError(final Runnable origin, final FileChannel chan) {
this.origin = origin;
this.chan = chan;
}

@Override
@SuppressWarnings("PMD.AvoidCatchingThrowable")
public void run() {
try {
this.origin.run();
// @checkstyle IllegalCatchCheck (1 line)
} catch (final Throwable err) {
this.close();
throw err;
}
}

/**
* Close channel.
*/
private void close() {
if (this.chan.isOpen()) {
try {
Expand Down
26 changes: 26 additions & 0 deletions src/main/java/wtf/g4s8/rio/file/ErrorOnException.java
Original file line number Diff line number Diff line change
@@ -1,3 +1,27 @@
/*
* MIT License
*
* Copyright (c) 2020 g4s8
*
* Permission is hereby granted, free of charge, to any person obtaining
* a copy of this software and associated documentation files
* (the "Software"), to deal in the Software without restriction,
* including without limitation the rights * to use, copy, modify,
* merge, publish, distribute, sublicense, and/or sell copies of the Software,
* and to permit persons to whom the Software is furnished to do so,
* subject to the following conditions:
*
* The above copyright notice and this permission notice shall be
* included in all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE,
* ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR
* OTHER DEALINGS IN THE SOFTWARE.
*/
package wtf.g4s8.rio.file;

/**
Expand Down Expand Up @@ -28,9 +52,11 @@ final class ErrorOnException implements Runnable {
}

@Override
@SuppressWarnings("PMD.AvoidCatchingThrowable")
public void run() {
try {
this.runnable.run();
// @checkstyle IllegalCatchCheck (1 line)
} catch (final Throwable exx) {
this.sub.onError(exx);
}
Expand Down
6 changes: 4 additions & 2 deletions src/main/java/wtf/g4s8/rio/file/File.java
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,6 @@ public Publisher<ByteBuffer> content(final ExecutorService exec) {
return new ReadFlow(this.path, Buffers.Standard.K8, exec);
}


/**
* File's content.
* @param buf Buffers policy
Expand Down Expand Up @@ -141,7 +140,10 @@ public CompletionStage<Void> write(final Publisher<ByteBuffer> data, final Write
* @param exec Executor service to perform IO operations
* @param opts Options
* @return Future
* @checkstyle ReturnCountCheck (20 lines)
* @checkstyle ParameterNumberCheck (7 lines)
*/
@SuppressWarnings("PMD.OnlyOneReturn")
public CompletionStage<Void> write(final Publisher<ByteBuffer> data,
final WriteGreed greed,
final ExecutorService exec,
Expand Down Expand Up @@ -170,7 +172,7 @@ public CompletionStage<Void> write(final Publisher<ByteBuffer> data,
* @param src User specified options
* @return Fixed options
*/
private static OpenOption[] writeOpts(final OpenOption[] src) {
private static OpenOption[] writeOpts(final OpenOption... src) {
final OpenOption[] opts;
if (src.length == 0) {
opts = new OpenOption[]{StandardOpenOption.WRITE, StandardOpenOption.CREATE};
Expand Down
4 changes: 2 additions & 2 deletions src/main/java/wtf/g4s8/rio/file/ReadFlow.java
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ public void cancel() {
private final Buffers buffers;

/**
* Executor service for IO operations
* Executor service for IO operations.
*/
private final ExecutorService exec;

Expand All @@ -90,7 +90,7 @@ public void subscribe(final Subscriber<? super ByteBuffer> subscriber) {
try {
chan = FileChannel.open(this.path, StandardOpenOption.READ);
} catch (final IOException err) {
subscriber.onSubscribe(DUMMY);
subscriber.onSubscribe(ReadFlow.DUMMY);
subscriber.onError(err);
return;
}
Expand Down
11 changes: 9 additions & 2 deletions src/main/java/wtf/g4s8/rio/file/ReadRequest.java
Original file line number Diff line number Diff line change
Expand Up @@ -33,11 +33,14 @@
/**
* Read request.
* @since 0.1
* @checkstyle ExecutableStatementCountCheck (500 lines)
*/
@SuppressWarnings("PMD.MissingStaticMethodInNonInstantiatableClass")
abstract class ReadRequest {

/**
* Flow subscriber.
* @checkstyle VisibilityModifierCheck (5 lines)
*/
protected final ReadSubscriberState<? super ByteBuffer> sub;

Expand All @@ -53,7 +56,7 @@ private ReadRequest(final ReadSubscriberState<? super ByteBuffer> sub) {
* Process file channel.
* @param channel Channel to process
*/
abstract void process(final FileChannel channel);
abstract void process(FileChannel channel);

/**
* Next request.
Expand All @@ -74,6 +77,7 @@ static final class Next extends ReadRequest {
/**
* New read request.
* @param sub Subscriber
* @param buffers Buffer allocation strategy
* @param count Amount of requests
*/
Next(final ReadSubscriberState<? super ByteBuffer> sub, final Buffers buffers,
Expand All @@ -83,9 +87,11 @@ static final class Next extends ReadRequest {
this.count = count;
}

// @checkstyle ReturnCountCheck (50 lines)
@Override
@SuppressWarnings({"PMD.OnlyOneReturn", "PMD.AvoidCatchingGenericException"})
void process(final FileChannel channel) {
for (int i = 0; i < this.count; ++i) {
for (int cnt = 0; cnt < this.count; ++cnt) {
if (this.sub.done()) {
return;
}
Expand All @@ -109,6 +115,7 @@ void process(final FileChannel channel) {
if (read >= 0) {
try {
this.sub.onNext(buf);
// @checkstyle IllegalCatchCheck (1 line)
} catch (final Exception exx) {
try {
channel.close();
Expand Down
3 changes: 2 additions & 1 deletion src/main/java/wtf/g4s8/rio/file/ReadSubscriberState.java
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@

/**
* Read subscriber wrapper which remembers current state.
* @param <T> Subscriber target type
* @since 0.1
*/
final class ReadSubscriberState<T> implements Subscriber<T> {
Expand All @@ -48,7 +49,7 @@ final class ReadSubscriberState<T> implements Subscriber<T> {
* Decorates subscriber.
* @param origin Subscriber to decorate
*/
public ReadSubscriberState(final Subscriber<T> origin) {
ReadSubscriberState(final Subscriber<T> origin) {
this.origin = origin;
this.completed = new AtomicBoolean();
}
Expand Down
12 changes: 8 additions & 4 deletions src/main/java/wtf/g4s8/rio/file/ReadSubscription.java
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,9 @@ final class ReadSubscription implements Subscription {
*/
private final Buffers buffers;


/**
* Tasks queue.
*/
private final ReadTaskQueue queue;

/**
Expand All @@ -66,10 +68,12 @@ public void request(final long count) {
}
if (count <= 0) {
this.queue.clear();
this.sub.onError(new IllegalArgumentException(String.format("Requested %d items", count)));
return;
this.sub.onError(
new IllegalArgumentException(String.format("Requested %d items", count))
);
} else {
this.queue.accept(new ReadRequest.Next(this.sub, this.buffers, count));
}
this.queue.accept(new ReadRequest.Next(this.sub, this.buffers, count));
}

@Override
Expand Down
4 changes: 3 additions & 1 deletion src/main/java/wtf/g4s8/rio/file/ReadTaskQueue.java
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
/**
* Read loop for read requests.
* @since 0.1
* @checkstyle NestedIfDepthCheck (500 lines)
*/
final class ReadTaskQueue implements Runnable {

Expand Down Expand Up @@ -69,8 +70,9 @@ final class ReadTaskQueue implements Runnable {
* @param sub Subscriber
* @param channel File channel
* @param exec Executor service to process tasks
* @checkstyle MagicNumberCheck (10 lines)
*/
public ReadTaskQueue(final ReadSubscriberState<? super ByteBuffer> sub,
ReadTaskQueue(final ReadSubscriberState<? super ByteBuffer> sub,
final FileChannel channel, final Executor exec) {
this.queue = new SpscUnboundedArrayQueue<>(128);
this.sub = sub;
Expand Down
12 changes: 9 additions & 3 deletions src/main/java/wtf/g4s8/rio/file/WriteGreed.java
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ public interface WriteGreed {
/**
* Request next chunks from subscription.
* @param sub Subscription to request
* @return True if reuqested successfully
*/
boolean request(Subscription sub);

Expand Down Expand Up @@ -83,6 +84,7 @@ final class Constant implements WriteGreed {
* @param amount Amount to request
* @param shift Request items before shifted amount was processed
*/
@SuppressWarnings("PMD.ConstructorOnlyInitializesOrCallOtherConstructors")
public Constant(final long amount, final long shift) {
if (shift >= amount) {
throw new IllegalArgumentException("Shift should be less than amount");
Expand All @@ -94,13 +96,17 @@ public Constant(final long amount, final long shift) {

@Override
public boolean request(final Subscription sub) {
// @checkstyle AvoidInlineConditionalsCheck (1 line)
final long pos = this.cnt.updateAndGet(prev -> prev <= this.shift ? this.amount : prev);
final boolean result;
if (pos == this.amount) {
sub.request(this.amount);
return true;
result = true;
} else {
this.cnt.decrementAndGet();
result = false;
}
this.cnt.decrementAndGet();
return false;
return result;
}
}
}
22 changes: 12 additions & 10 deletions src/main/java/wtf/g4s8/rio/file/WriteRequest.java
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,12 @@
* Request to write.
* @since 0.1
*/
@SuppressWarnings("PMD.MissingStaticMethodInNonInstantiatableClass")
abstract class WriteRequest {

/**
* Write future.
* @checkstyle VisibilityModifierCheck (5 lines)
*/
protected final CompletableFuture<Void> future;

Expand Down Expand Up @@ -71,7 +73,7 @@ static final class Next extends WriteRequest {
* @param future Future
* @param target Buffer
*/
public Next(final CompletableFuture<Void> future, final ByteBuffer target) {
Next(final CompletableFuture<Void> future, final ByteBuffer target) {
super(future);
this.target = target;
}
Expand Down Expand Up @@ -112,7 +114,7 @@ static final class Complete extends WriteRequest {
* New complete signal.
* @param future Write future
*/
public Complete(final CompletableFuture<Void> future) {
Complete(final CompletableFuture<Void> future) {
super(future);
}

Expand Down Expand Up @@ -149,11 +151,16 @@ static final class Error extends WriteRequest {
* @param future Write future
* @param err Error
*/
public Error(final CompletableFuture<Void> future, final Throwable err) {
Error(final CompletableFuture<Void> future, final Throwable err) {
super(future);
this.err = err;
}

@Override
public String toString() {
return String.format("Error: %s", this.err.getMessage());
}

@Override
void process(final FileChannel chan) {
try {
Expand All @@ -166,24 +173,19 @@ void process(final FileChannel chan) {
}
this.future.completeExceptionally(this.err);
}

@Override
public String toString() {
return String.format("Error: %s", this.err.getMessage());
}
}

/**
* Init request.
* @since 0.2
*/
public static final class Init extends WriteRequest {
static final class Init extends WriteRequest {

/**
* Ctor.
* @param future Write future
*/
public Init(final CompletableFuture<Void> future) {
Init(final CompletableFuture<Void> future) {
super(future);
}

Expand Down

0 comments on commit 0099546

Please sign in to comment.