Problem with ringbuffer.readManyAsync and minSize is 0 #6787

Closed
pveentjer opened this issue Nov 19, 2015 · 0 comments

pveentjer (Member) commented Nov 19, 2015

Hello,

I'm a bit stumped by the non-blocking behaviour of the readManyAsync method (Hazelcast 3.5).

If I want a non-blocking call (minCount = 0):

ReadResultSet rs = rb.readManyAsync(sequence, 0, 10, null).get();

the ReadResultSet NEVER contains any items, even though the ringbuffer is not empty.

If I make the readManyAsync call blocking with a minCount > 0, then it works fine.

ReadResultSet rs = rb.readManyAsync(sequence, 1, 10, null).get();

(If I want the reading thread to stay interruptible, I can also add a timeout on the get, and that works fine too):

ReadResultSet rs = rb.readManyAsync(seq.get(), 1, 10, null).get(500, TimeUnit.MILLISECONDS);
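For reference, the timed get above can be wrapped so that a timeout is not treated as an error. This is only a sketch using the plain java.util.concurrent API; the class TimedGet and the method getOrDefault are hypothetical names of mine, not part of Hazelcast, and a CompletableFuture stands in for the future that readManyAsync returns.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical helper, not part of Hazelcast.
final class TimedGet {
    private TimedGet() {}

    /** Waits up to timeoutMillis for the future and returns fallback if nothing arrives in time. */
    static <T> T getOrDefault(Future<T> future, long timeoutMillis, T fallback) {
        try {
            return future.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            // Nothing arrived in time: the caller can re-check its running flag and retry.
            return fallback;
        } catch (InterruptedException e) {
            // Restore the interrupt flag so the calling loop can notice it and stop.
            Thread.currentThread().interrupt();
            return fallback;
        } catch (ExecutionException e) {
            // The asynchronous operation itself failed.
            throw new IllegalStateException(e.getCause());
        }
    }

    public static void main(String[] args) {
        // Stand-in for a read that never completes.
        CompletableFuture<String> pending = new CompletableFuture<>();
        System.out.println(getOrDefault(pending, 50, "timed out"));

        // Stand-in for a read that already has a result.
        Future<String> done = CompletableFuture.completedFuture("string1");
        System.out.println(getOrDefault(done, 50, "timed out"));
    }
}
```

In the reader loop this would let the thread wake up every 500 ms, check the running flag, and call readManyAsync again.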

But if I specify a minCount of 0, it never reads anything.
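To make clear what I expected, here is a toy in-memory model of the semantics I had in mind. It is NOT Hazelcast's implementation; the class ToyRingbuffer and its methods are hypothetical and only illustrate my assumption that minCount = 0 means "return immediately with whatever is available, up to maxCount".

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of the expected read semantics; hypothetical, not Hazelcast code.
final class ToyRingbuffer<E> {
    private final List<E> items = new ArrayList<>();

    /** Appends an item and returns its sequence (0-based in this toy model). */
    synchronized long add(E item) {
        items.add(item);
        notifyAll(); // wake up readers waiting for minCount items
        return items.size() - 1;
    }

    /** Reads between minCount and maxCount items starting at startSequence. */
    synchronized List<E> readMany(long startSequence, int minCount, int maxCount) {
        try {
            // Block only while fewer than minCount items are available.
            // With minCount == 0 this condition is false right away, so the call never waits.
            while (Math.max(0L, items.size() - startSequence) < minCount) {
                wait();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for items", e);
        }
        int from = (int) Math.min(startSequence, items.size());
        int to = (int) Math.min(startSequence + maxCount, items.size());
        return new ArrayList<>(items.subList(from, to));
    }

    public static void main(String[] args) {
        ToyRingbuffer<String> rb = new ToyRingbuffer<>();
        rb.add("string1");
        System.out.println(rb.readMany(0, 0, 10)); // one item is available, no waiting
        System.out.println(rb.readMany(1, 0, 10)); // nothing new yet, still no waiting
    }
}
```

Under this assumption, the first non-blocking read in my test should already return "string1".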

I've attached a minimal test case, but I'm really wondering what I'm missing here. Any help appreciated :-)

RingbufferTest.java

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;

import org.junit.Test;

import com.hazelcast.core.Hazelcast;
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.ringbuffer.ReadResultSet;
import com.hazelcast.ringbuffer.Ringbuffer;


public class RingbufferTest {

    @Test
    public void test() throws InterruptedException, ExecutionException {
        HazelcastInstance hz = Hazelcast.newHazelcastInstance();
        Ringbuffer<String> rb = hz.getRingbuffer("testrb");


        final StringBuffer result = new StringBuffer();
        final AtomicLong seq = new AtomicLong(rb.headSequence());
        final AtomicBoolean running = new AtomicBoolean(true);

        Thread t = new Thread(new Runnable(){

            @Override
            public void run() {
                while(running.get()){
                    System.out.print("readManyAsync...");
                    ReadResultSet<String> rs;
                    try {
                        // non-blocking read: minCount = 0, maxCount = 10, no filter
                        rs = rb.readManyAsync(seq.get(), 0, 10, null).get();
                        rs.forEach((str) ->{
                            result.append(str);
                            // stop the thread
                            running.set(false);
                        });
                        System.out.println(rs.readCount());
                        seq.addAndGet(rs.readCount());
                        Thread.sleep(500);
                    } catch (Exception e) {
                        e.printStackTrace();
                        fail(e.getMessage());
                    } 
                }

            }

        });
        rb.add("string1");
        assertEquals(1,rb.size());

        t.start();

        Thread.sleep(5000);
        System.out.print("Stopping...");
        // stop the Thread
        running.set(false);
        Thread.sleep(1000);
        assertTrue(result.length() > 0);
        hz.shutdown();

    }

}