Skip to content
This repository has been archived by the owner on May 6, 2022. It is now read-only.

Commit

Permalink
Create DataExchangeFilter and pipeline filter processing for #26
Browse files Browse the repository at this point in the history
  • Loading branch information
hdsdi3g committed Jan 13, 2021
1 parent 8980441 commit 6052355
Show file tree
Hide file tree
Showing 2 changed files with 91 additions and 2 deletions.
43 changes: 43 additions & 0 deletions src/main/java/tv/hd3g/transfertfiles/DataExchangeFilter.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
/*
* This file is part of transfertfiles.
*
* This program is free software; you can redistribute it and/or modify
* it under the terms of the GNU Lesser General Public License as published by
* the Free Software Foundation; either version 3 of the License, or
* any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Lesser General Public License for more details.
*
* Copyright (C) hdsdi3g for hd3g.tv 2021
*
*/
package tv.hd3g.transfertfiles;

import java.nio.ByteBuffer;
import java.util.List;

@FunctionalInterface
public interface DataExchangeFilter {

/**
* @param dataSource in read-only
* @return empty for "ignore" filter/data collector filter
* null for stop data transfert operation
* returned ByteBuffers can be re-used here (processed internaly in read-ony)
*/
List<ByteBuffer> applyDataFilter(ByteBuffer dataSource);

/**
* Next applyDataFilter call will be the last
*/
default void close() {
}

default String getFilterName() {
return null;
}

}
50 changes: 48 additions & 2 deletions src/main/java/tv/hd3g/transfertfiles/DataExchangeInOutStream.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,12 @@
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.stream.Collectors;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
Expand All @@ -34,13 +39,15 @@ public class DataExchangeInOutStream {
private final InternalInputStream internalInputStream;
private final InternalOutputStream internalOutputStream;

private final ByteBuffer buffer;
private final List<DataExchangeFilter> filters;
private final ByteBuffer buffer;// TODO create write buffer + buffer queue
private volatile boolean stopped;
private volatile boolean inWrite;
private volatile boolean inWrite;// TODO replace by consume queue aproach

public DataExchangeInOutStream(final int bufferSize) {
internalInputStream = new InternalInputStream();
internalOutputStream = new InternalOutputStream();
filters = Collections.synchronizedList(new ArrayList<>());
stopped = false;
inWrite = true;
buffer = ByteBuffer.allocate(bufferSize);
Expand Down Expand Up @@ -74,6 +81,8 @@ public int read(final byte[] b, final int off, final int len) throws IOException
return -1;
}

// TODO wait queue items and for next item all this:

final var toRead = Math.min(buffer.remaining(), len);
if (toRead == 0) {
inWrite = true;
Expand Down Expand Up @@ -145,6 +154,7 @@ public void write(final byte[] b, final int off, final int len) throws IOExcepti
while (inWrite == false) {
Thread.onSpinWait();
}
// TODO wait queue empty for next read, or with 1 item ? Or more ?
buffer.clear();

final var writedOnLoop = Math.min(toWrite, buffer.remaining());
Expand Down Expand Up @@ -183,6 +193,38 @@ public void close() throws IOException {

}

private class StoppedByFilter extends RuntimeException {
StoppedByFilter(final DataExchangeFilter filter) {
super(Optional.ofNullable(filter.getFilterName()).orElse(filter.getClass().getName()));
}
}

private void processFilters() {// TODO catch StoppedByFilter
if (internalOutputStream.closed) {
filters.forEach(DataExchangeFilter::close);
}

var nextBuffers = List.of(buffer.asReadOnlyBuffer());
for (var pos = 0; pos < filters.size(); pos++) {
final var currentFilter = filters.get(pos);
nextBuffers = nextBuffers.stream()
.map(currentBuffer -> {
final var filterResult = currentFilter.applyDataFilter(currentBuffer);
if (filterResult == null) {
throw new StoppedByFilter(currentFilter);
} else if (filterResult.isEmpty()) {
return List.of(currentBuffer);
}
return filterResult;
})
.flatMap(List::stream)
.map(ByteBuffer::asReadOnlyBuffer)
.collect(Collectors.toUnmodifiableList());
}

// XXX push nextBuffers to main read list
}

public int getBufferSize() {
return buffer.capacity();
}
Expand All @@ -209,4 +251,8 @@ public synchronized boolean isStopped() {
public synchronized void stop() {
stopped = true;
}

public List<DataExchangeFilter> getFilters() {// TODO test
return filters;
}
}

0 comments on commit 6052355

Please sign in to comment.