From 6052355c333b8340da40dc2a18d21cae76476b94 Mon Sep 17 00:00:00 2001 From: hdsdi3g Date: Wed, 13 Jan 2021 13:19:47 +0100 Subject: [PATCH] Create DataExchangeFilter and pipeline filter processing for #26 --- .../transfertfiles/DataExchangeFilter.java | 43 ++++++++++++++++ .../DataExchangeInOutStream.java | 50 ++++++++++++++++++- 2 files changed, 91 insertions(+), 2 deletions(-) create mode 100644 src/main/java/tv/hd3g/transfertfiles/DataExchangeFilter.java diff --git a/src/main/java/tv/hd3g/transfertfiles/DataExchangeFilter.java b/src/main/java/tv/hd3g/transfertfiles/DataExchangeFilter.java new file mode 100644 index 0000000..d7f250b --- /dev/null +++ b/src/main/java/tv/hd3g/transfertfiles/DataExchangeFilter.java @@ -0,0 +1,43 @@ +/* + * This file is part of transfertfiles. + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU Lesser General Public License as published by + * the Free Software Foundation; either version 3 of the License, or + * any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Lesser General Public License for more details. + * + * Copyright (C) hdsdi3g for hd3g.tv 2021 + * + */ +package tv.hd3g.transfertfiles; + +import java.nio.ByteBuffer; +import java.util.List; + +@FunctionalInterface +public interface DataExchangeFilter { + + /** + * @param dataSource in read-only + * @return empty for "ignore" filter/data collector filter + * null for stop data transfert operation + * returned ByteBuffers can be re-used here (processed internaly in read-ony) + */ + List applyDataFilter(ByteBuffer dataSource); + + /** + * Next applyDataFilter call will be the last + */ + default void close() { + } + + default String getFilterName() { + return null; + } + +} diff --git a/src/main/java/tv/hd3g/transfertfiles/DataExchangeInOutStream.java b/src/main/java/tv/hd3g/transfertfiles/DataExchangeInOutStream.java index 773ef04..864a23b 100644 --- a/src/main/java/tv/hd3g/transfertfiles/DataExchangeInOutStream.java +++ b/src/main/java/tv/hd3g/transfertfiles/DataExchangeInOutStream.java @@ -20,7 +20,12 @@ import java.io.InputStream; import java.io.OutputStream; import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; import java.util.Objects; +import java.util.Optional; +import java.util.stream.Collectors; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -34,13 +39,15 @@ public class DataExchangeInOutStream { private final InternalInputStream internalInputStream; private final InternalOutputStream internalOutputStream; - private final ByteBuffer buffer; + private final List filters; + private final ByteBuffer buffer;// TODO create write buffer + buffer queue private volatile boolean stopped; - private volatile boolean inWrite; + private volatile boolean inWrite;// TODO replace by consume queue aproach public DataExchangeInOutStream(final int bufferSize) { internalInputStream = new InternalInputStream(); internalOutputStream = new InternalOutputStream(); + filters = Collections.synchronizedList(new ArrayList<>()); stopped = false; inWrite = true; buffer = ByteBuffer.allocate(bufferSize); @@ -74,6 +81,8 @@ public int read(final byte[] b, final int off, final int len) throws IOException return -1; } + // TODO wait queue items and for next item all this: + final var toRead = Math.min(buffer.remaining(), len); if (toRead == 0) { inWrite = true; @@ -145,6 +154,7 @@ public void write(final byte[] b, final int off, final int len) throws IOExcepti while (inWrite == false) { Thread.onSpinWait(); } + // TODO wait queue empty for next read, or with 1 item ? Or more ? buffer.clear(); final var writedOnLoop = Math.min(toWrite, buffer.remaining()); @@ -183,6 +193,38 @@ public void close() throws IOException { } + private class StoppedByFilter extends RuntimeException { + StoppedByFilter(final DataExchangeFilter filter) { + super(Optional.ofNullable(filter.getFilterName()).orElse(filter.getClass().getName())); + } + } + + private void processFilters() {// TODO catch StoppedByFilter + if (internalOutputStream.closed) { + filters.forEach(DataExchangeFilter::close); + } + + var nextBuffers = List.of(buffer.asReadOnlyBuffer()); + for (var pos = 0; pos < filters.size(); pos++) { + final var currentFilter = filters.get(pos); + nextBuffers = nextBuffers.stream() + .map(currentBuffer -> { + final var filterResult = currentFilter.applyDataFilter(currentBuffer); + if (filterResult == null) { + throw new StoppedByFilter(currentFilter); + } else if (filterResult.isEmpty()) { + return List.of(currentBuffer); + } + return filterResult; + }) + .flatMap(List::stream) + .map(ByteBuffer::asReadOnlyBuffer) + .collect(Collectors.toUnmodifiableList()); + } + + // XXX push nextBuffers to main read list + } + public int getBufferSize() { return buffer.capacity(); } @@ -209,4 +251,8 @@ public synchronized boolean isStopped() { public synchronized void stop() { stopped = true; } + + public List getFilters() {// TODO test + return filters; + } }