Skip to content
This repository

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
  • 3 commits
  • 2 files changed
  • 0 comments
  • 1 contributor
263  src/org/jgroups/util/BlockingInputStream.java
... ...
@@ -0,0 +1,263 @@
  1
+package org.jgroups.util;
  2
+
  3
+import org.jgroups.annotations.GuardedBy;
  4
+
  5
+import java.io.IOException;
  6
+import java.io.InputStream;
  7
+import java.util.concurrent.locks.Condition;
  8
+import java.util.concurrent.locks.Lock;
  9
+import java.util.concurrent.locks.ReentrantLock;
  10
+
  11
+/**
  12
+ * Bounded input stream. A consumer reads bytes until the end of the stream is reached, or the input stream is closed.
  13
+ * The producer writes bytes to the tail and blocks if the capacity has been reached (until the consumer reads more bytes).
  14
+ * <p/>
  15
+ * This class is for only 1 producer and 1 consumer; multiple producers/consumers will most likely yield incorrect
  16
+ * results !
  17
+ * <p/>
  18
+ * Note that the implementation of this class is optimized for reading and adding a few K at a time; performance will
  19
+ * be suboptimal if single bytes are added and read.
  20
+ * @author Bela Ban
  21
+ * @since 2.12.2
  22
+ */
  23
+public class BlockingInputStream extends InputStream {
  24
+    /** Set to true when {@link #close()} is called */
  25
+    protected boolean         closed=false;
  26
+
  27
+    /** The transfer buffer */
  28
+    protected final byte[]    buf;
  29
+
  30
+    /** Index into buf at which the next bytes will be read. Has to be between 0 and buf.length-1 */
  31
+    protected int             read_pos=0;
  32
+
  33
+    /** Index into buf at which bytes will be written. Has to be between 0 and buf.length-1 */
  34
+    protected int             write_pos=0;
  35
+
  36
+    protected final Lock      lock=new ReentrantLock();
  37
+
  38
+    /** Signalled when the buf becomes 'writeable'; ie. is not full anymore */
  39
+    protected final Condition not_full=lock.newCondition();
  40
+
  41
+    /** Signalled when the buf becomes 'readable'; ie. is not empty anymore */
  42
+    protected final Condition not_empty=lock.newCondition();
  43
+
  44
+
  45
+    public BlockingInputStream() {
  46
+        this(100000);
  47
+    }
  48
+
  49
+    public BlockingInputStream(int capacity) {
  50
+        buf=new byte[capacity];
  51
+    }
  52
+
  53
+
  54
+
  55
+
  56
+
  57
+    /** {@inheritDoc} */
  58
+    public int read() throws IOException {
  59
+        lock.lock();
  60
+        try {
  61
+            while(true) {
  62
+                if(read_pos < write_pos) {
  63
+                    int retval=buf[read_pos++];
  64
+                    not_full.signal();
  65
+                    return retval;
  66
+                }
  67
+                if(closed)
  68
+                    return -1; // EOF
  69
+                try {
  70
+                    not_empty.await();
  71
+                }
  72
+                catch(InterruptedException e) {
  73
+                }
  74
+            }
  75
+        }
  76
+        finally {
  77
+            lock.unlock();
  78
+        }
  79
+    }
  80
+
  81
+
  82
+    /** {@inheritDoc} */
  83
+    public int read(byte[] b) throws IOException {
  84
+        return read(b, 0, b.length);
  85
+    }
  86
+
  87
+    /** {@inheritDoc} */
  88
+    public int read(byte[] b, int off, int len) throws IOException {
  89
+        sanityCheck(b, off, len);
  90
+
  91
+        lock.lock();
  92
+        try {
  93
+            while(true) {
  94
+                if(read_pos < write_pos) {
  95
+                    int bytes_to_read=Math.min(len, size());
  96
+                    System.arraycopy(buf, read_pos, b, off, bytes_to_read);
  97
+                    read_pos+=bytes_to_read;
  98
+                    not_full.signal();
  99
+                    return bytes_to_read;
  100
+                }
  101
+                if(closed)
  102
+                    return -1; // EOF
  103
+                try {
  104
+                    not_empty.await();
  105
+                }
  106
+                catch(InterruptedException e) {
  107
+                }
  108
+            }
  109
+        }
  110
+        finally {
  111
+            lock.unlock();
  112
+        }
  113
+    }
  114
+
  115
+    /**
  116
+     * Appends bytes to the end of the stream
  117
+     * @param buf
  118
+     * @throws IOException
  119
+     * @see #write(byte[],int,int) 
  120
+     */
  121
+    public void write(byte[] buf) throws IOException {
  122
+        if(buf != null)
  123
+            write(buf, 0, buf.length);
  124
+    }
  125
+
  126
+    /**
  127
+     * Appends bytes to the end of the stream. If the number of bytes to be written is greater than the remaining
  128
+     * capacity, write() will block until the bytes can be added, or the stream is closed.
  129
+     * <p/>
  130
+     * This method will try to append partial buffers to the stream, e.g. if the remaining capacity is 2K, but the
  131
+     * length of the buffer is 5K, 2K will be written and then write() will block until the remaining 3K can be
  132
+     * added.
  133
+     * @param buf The buffer to be added to the end of the stream
  134
+     * @param offset The offset within buf at which bytes are read
  135
+     * @param length The number of bytes to be added
  136
+     * @throws IOException
  137
+     */
  138
+    public void write(byte[] buf, int offset, int length) throws IOException {
  139
+        if(buf == null) return;
  140
+        sanityCheck(buf, offset, length);
  141
+
  142
+        lock.lock();
  143
+        try {
  144
+            if(closed) return;
  145
+            int bytes_written=0;
  146
+            while(bytes_written < length && !closed) {
  147
+
  148
+                if(read_pos > 0 && length - bytes_written > remaining())
  149
+                    compact();
  150
+
  151
+                if(remaining() > 0) {
  152
+                    int bytes_to_write=Math.min(length-bytes_written, remaining());
  153
+                    System.arraycopy(buf, offset+bytes_written, this.buf, write_pos, bytes_to_write);
  154
+                    write_pos+=bytes_to_write;
  155
+                    bytes_written+=bytes_to_write;
  156
+                    not_empty.signal();
  157
+                }
  158
+                else {
  159
+                    try {
  160
+                        not_full.await();
  161
+                    }
  162
+                    catch(InterruptedException e) {
  163
+                    }
  164
+                }
  165
+            }
  166
+        }
  167
+        finally {
  168
+            lock.unlock();
  169
+        }
  170
+    }
  171
+
  172
+
  173
+
  174
+
  175
+    public long skip(long n) throws IOException {
  176
+        throw new IOException("skip() not supported");
  177
+    }
  178
+
  179
+    /** {@inheritDoc} */
  180
+    public int available() throws IOException {
  181
+        lock.lock();
  182
+        try {
  183
+            return size();
  184
+        }
  185
+        finally {
  186
+            lock.unlock();
  187
+        }
  188
+    }
  189
+
  190
+
  191
+    public int capacity() {
  192
+        return buf.length;
  193
+    }
  194
+
  195
+    /**
  196
+     * Closes the stream. Writes to a closed stream will fail, reads will successfully read the bytes that are already
  197
+     * in the buffer and then return -1 (EOF)
  198
+     * @throws IOException
  199
+     */
  200
+    public void close() throws IOException {
  201
+        lock.lock();
  202
+        try {
  203
+            if(closed) return;
  204
+            closed=true;
  205
+            not_empty.signal();
  206
+            not_full.signal();
  207
+        }
  208
+        finally {
  209
+            lock.unlock();
  210
+        }
  211
+    }
  212
+
  213
+    public String toString() {
  214
+        return size() + "/" + capacity() + ", size=" + size() + ", remaining=" + remaining() + (closed? " (closed)" : "");
  215
+    }
  216
+
  217
+    @GuardedBy("lock")
  218
+    protected int size() {
  219
+        return write_pos-read_pos;
  220
+    }
  221
+
  222
+    @GuardedBy("lock")
  223
+    protected int remaining() {
  224
+        return buf.length - write_pos;
  225
+    }
  226
+
  227
+
  228
+
  229
+    /**
  230
+     *  Moves the bytes between [read_pos and write_pos] read_pos bytes to the left, such that the new read_pos is 0
  231
+     *  and the write_pos is write_pos - read_pos. Lock must be held.
  232
+     */
  233
+    @GuardedBy("lock")
  234
+    protected void compact() {
  235
+        if(read_pos == 0)
  236
+            return;
  237
+        if(size() == 0) {
  238
+            read_pos=write_pos=0;
  239
+            return;
  240
+        }
  241
+        int length=write_pos - read_pos;
  242
+        System.arraycopy(buf, read_pos, buf, 0, length);
  243
+        write_pos-=read_pos;
  244
+        read_pos=0;
  245
+        not_full.signal();
  246
+    }
  247
+
  248
+
  249
+    /**
  250
+     * Verifies that length doesn't exceed a buffer's length
  251
+     * @param buf
  252
+     * @param offset
  253
+     * @param length
  254
+     */
  255
+    protected static void sanityCheck(byte[] buf, int offset, int length) {
  256
+        if(buf == null) throw new NullPointerException("buffer is null");
  257
+        if(offset + length > buf.length)
  258
+            throw new ArrayIndexOutOfBoundsException("length (" + length + ") + offset (" + offset +
  259
+                                                       ") > buf.length (" + buf.length + ")");
  260
+    }
  261
+
  262
+
  263
+}
241  tests/junit-functional/org/jgroups/tests/BlockingInputStreamTest.java
... ...
@@ -0,0 +1,241 @@
  1
+package org.jgroups.tests;
  2
+
  3
+import org.jgroups.Global;
  4
+import org.jgroups.util.BlockingInputStream;
  5
+import org.jgroups.util.Util;
  6
+import org.testng.annotations.Test;
  7
+
  8
+import java.io.IOException;
  9
+import java.io.InputStream;
  10
+import java.util.concurrent.CountDownLatch;
  11
+
  12
+/**
  13
+ * Tests {@link org.jgroups.util.BlockingInputStream}
  14
+ * @author Bela Ban
  15
+ */
  16
+@Test(groups=Global.FUNCTIONAL,sequential=false)
  17
+public class BlockingInputStreamTest {
  18
+
  19
+    
  20
+    public void testCreation() throws IOException {
  21
+        BlockingInputStream in=new BlockingInputStream(2000);
  22
+        System.out.println("in = " + in);
  23
+        assert in.available() == 0 && in.capacity() == 2000;
  24
+
  25
+        in.write(new byte[]{'b', 'e', 'l', 'a'});
  26
+        System.out.println("in = " + in);
  27
+        assert in.available() == 4 && in.capacity() == 2000;
  28
+    }
  29
+
  30
+
  31
+    public void testRead() throws IOException {
  32
+        final BlockingInputStream in=new BlockingInputStream(100);
  33
+        byte[] input=new byte[]{'B', 'e', 'l', 'a'};
  34
+        in.write(input);
  35
+        in.close();
  36
+
  37
+        assert in.available() == 4;
  38
+        for(int i=0; i < input.length; i++) {
  39
+            int b=in.read();
  40
+            assert b == input[i];
  41
+        }
  42
+        int b=in.read();
  43
+        assert b == -1;
  44
+    }
  45
+
  46
+
  47
+    public void testBlockingReadAndClose() throws IOException {
  48
+        final BlockingInputStream in=new BlockingInputStream(100);
  49
+        final CountDownLatch latch=new CountDownLatch(1);
  50
+        byte[] buf=new byte[100];
  51
+        
  52
+        new Closer(latch, in, 1000L).start(); // closes input stream after 1 sec
  53
+        latch.countDown();
  54
+        int num=in.read(buf, 0, buf.length);
  55
+        assert num == -1 : " expected -1 (EOF) but got " + num;
  56
+    }
  57
+
  58
+    
  59
+    public void testBlockingWriteAndClose() throws IOException {
  60
+        final BlockingInputStream in=new BlockingInputStream(3);
  61
+        final CountDownLatch latch=new CountDownLatch(1);
  62
+        byte[] buf=new byte[]{'B', 'e', 'l', 'a'};
  63
+
  64
+        new Closer(latch, in, 1000L).start(); // closes input stream after 1 sec
  65
+        latch.countDown();
  66
+        in.write(buf, 0, buf.length);
  67
+    }
  68
+
  69
+
  70
+    public void testReadOnClosedInputStream() throws IOException {
  71
+        final BlockingInputStream in=new BlockingInputStream(100);
  72
+        in.close();
  73
+        byte[] buf=new byte[100];
  74
+        int num=in.read(buf, 0, buf.length);
  75
+        assert num == -1 : " expected -1 (EOF) but got " + num;
  76
+    }
  77
+
  78
+    
  79
+    public void testWriteCloseRead() throws IOException {
  80
+        final BlockingInputStream in=new BlockingInputStream(100);
  81
+        for(int i=1; i <= 5; i++) {
  82
+            byte[] buf=("Hello world " + i).getBytes();
  83
+            in.write(buf);
  84
+        }
  85
+        in.close();
  86
+
  87
+        int size=in.available();
  88
+        byte[] buf=new byte[100];
  89
+        int num=in.read(buf);
  90
+        assert num == size;
  91
+    }
  92
+
  93
+
  94
+    public void testWriteCloseRead2() throws IOException {
  95
+        final BlockingInputStream in=new BlockingInputStream(100);
  96
+        StringBuilder sb=new StringBuilder();
  97
+        for(int i=1; i <=10; i++)
  98
+            sb.append("Hello world " + i);
  99
+        byte[] buffer=sb.toString().getBytes();
  100
+        new Writer(in, buffer).start();
  101
+
  102
+        Util.sleep(500);
  103
+        int size=in.available();
  104
+        byte[] buf=new byte[200];
  105
+        int num=in.read(buf);
  106
+        assert num == size;
  107
+    }
  108
+
  109
+    
  110
+    public void testSimpleTransfer() throws IOException {
  111
+        final BlockingInputStream in=new BlockingInputStream(100);
  112
+        byte[] buffer=new byte[500];
  113
+        for(int i=0; i < buffer.length; i++)
  114
+            buffer[i]=(byte)(i % 2 == 0? 0 : 1);
  115
+        new Writer(in, buffer).start();
  116
+        
  117
+        byte[] tmp=new byte[500];
  118
+        int offset=0;
  119
+        while(true) {
  120
+            int bytes=in.read(tmp, offset, tmp.length - offset);
  121
+            if(bytes == -1)
  122
+                break;
  123
+            offset+=bytes;
  124
+        }
  125
+        System.out.println("read " + offset + " bytes");
  126
+        assert offset == 500 : "offset is " + offset + " but expected 500";
  127
+        for(int i=0; i < tmp.length; i++) {
  128
+            if(i % 2 == 0)
  129
+                assert tmp[i] == 0;
  130
+            else
  131
+                assert tmp[i] == 1;
  132
+        }
  133
+    }
  134
+
  135
+
  136
+    public void testLargeTransfer() throws IOException {
  137
+        final BlockingInputStream in=new BlockingInputStream(2048);
  138
+        final byte[] buffer=generateBuffer(100000);
  139
+        new Writer(in, buffer).start();
  140
+        byte[] tmp=new byte[buffer.length];
  141
+        int offset=0;
  142
+        while(true) {
  143
+            int bytes=in.read(tmp, offset, tmp.length - offset);
  144
+            if(bytes == -1)
  145
+                break;
  146
+            offset+=bytes;
  147
+        }
  148
+        System.out.println("read " + offset + " bytes");
  149
+        assert offset == buffer.length : "offset is " + offset + " but expected " + buffer.length;
  150
+        System.out.print("Verifying that the buffers are the same: ");
  151
+        for(int i=0; i < tmp.length; i++)
  152
+            assert buffer[i] == tmp[i];
  153
+        System.out.println("OK");
  154
+    }
  155
+
  156
+
  157
+    public void testWriteExceedingCapacity() throws IOException {
  158
+        final BlockingInputStream in=new BlockingInputStream(10);
  159
+        new Thread() {
  160
+            public void run() {
  161
+                byte[] tmp=new byte[20];
  162
+                int num=0;
  163
+                try {
  164
+                    while(true) {
  165
+                        int read=in.read(tmp);
  166
+                        if(read == -1)
  167
+                            break;
  168
+                        num+=read;
  169
+                    }
  170
+                    System.out.println("read " + num + " bytes");
  171
+                }
  172
+                catch(IOException e) {
  173
+                    e.printStackTrace();
  174
+                }
  175
+            }
  176
+        }.start();
  177
+
  178
+        byte[] buffer=new byte[15];
  179
+        try {
  180
+            in.write(buffer);
  181
+        }
  182
+        finally {
  183
+            Util.close(in);
  184
+        }
  185
+    }
  186
+
  187
+
  188
+    protected byte[] generateBuffer(int size) {
  189
+        byte[] buf=new byte[size];
  190
+        for(int i=0; i < buf.length; i++)
  191
+            buf[i]=(byte)(Util.random(size) % Byte.MAX_VALUE);
  192
+        return buf;
  193
+    }
  194
+
  195
+
  196
+    protected static final class Closer extends Thread {
  197
+        protected final CountDownLatch latch;
  198
+        protected final InputStream in;
  199
+        protected final long timeout;
  200
+
  201
+        public Closer(CountDownLatch latch, InputStream in, long timeout) {
  202
+            this.latch=latch;
  203
+            this.in=in;
  204
+            this.timeout=timeout;
  205
+        }
  206
+
  207
+        public void run() {
  208
+            try {
  209
+                latch.await();
  210
+                Util.sleep(timeout);
  211
+                in.close();
  212
+            }
  213
+            catch(Exception e) {
  214
+                e.printStackTrace();
  215
+            }
  216
+        }
  217
+    }
  218
+
  219
+    protected static final class Writer extends Thread {
  220
+        protected final BlockingInputStream in;
  221
+        protected final byte[] buffer;
  222
+
  223
+        public Writer(BlockingInputStream in, byte[] buffer) {
  224
+            this.in=in;
  225
+            this.buffer=buffer;
  226
+        }
  227
+
  228
+        public void run() {
  229
+            try {
  230
+                in.write(buffer);
  231
+            }
  232
+            catch(IOException e) {
  233
+            }
  234
+            finally {
  235
+                Util.close(in);
  236
+            }
  237
+        }
  238
+    }
  239
+
  240
+
  241
+}

No commit comments for this range

Something went wrong with that request. Please try again.