IndexedChronicle TCP replication not working #101

Closed
postol opened this issue Nov 20, 2014 · 1 comment

postol commented Nov 20, 2014

Below is the code for Master.java and Slave.java. The master creates an IndexedChronicle wrapped in a ChronicleSource and starts publishing data. The slave creates its own IndexedChronicle wrapped in a ChronicleSink and listens for data. Everything works until the slave is stopped after it has received some data and is then started again. In that scenario the slave cannot reconnect and resume message processing, apparently because the initial index check is always off by one. See the exception below:

2014-11-18 18:13:41.700 [INFO] - Connected to localhost/127.0.0.1:7777
2014-11-18 18:13:41.716 [INFO] - Lost connection to localhost/127.0.0.1:7777 retrying
java.io.StreamCorruptedException: Expected index 41 but got 40
    at net.openhft.chronicle.tcp.ChronicleSink$PersistentIndexedSinkExcerpt.readNextExcerpt(ChronicleSink.java:491)
    at net.openhft.chronicle.tcp.ChronicleSink$PersistentIndexedSinkExcerpt.readNextExcerpt(ChronicleSink.java:484)
    at net.openhft.chronicle.tcp.ChronicleSink$AbstractPersistentSinkExcerpt.readNext(ChronicleSink.java:391)
    at net.openhft.chronicle.tcp.ChronicleSink$AbstractPersistentSinkExcerpt.nextIndex(ChronicleSink.java:346)
    at org.dett.tests.chronicle.Slave$Reader.run(Slave.java:31)
    at java.lang.Thread.run(Thread.java:745)
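
To make the suspected off-by-one concrete, here is a toy model of how a mismatch like "Expected index 41 but got 40" could arise. It is only a sketch under assumptions, not the actual ChronicleSink logic: the class `IndexHandshakeSketch` and the methods `expectedNextIndex` and `checkIndex` are hypothetical, and it assumes the restarted sink expects the index after the last one it stored while the source replays inclusively from that stored index.

```java
import java.io.StreamCorruptedException;

public class IndexHandshakeSketch {

    // Hypothetical model: after a restart the sink expects the index
    // following the last one it stored locally.
    public static long expectedNextIndex(long lastLocalIndex) {
        return lastLocalIndex + 1;
    }

    // Hypothetical check that mirrors the message format seen in the stack trace.
    public static void checkIndex(long expected, long received) throws StreamCorruptedException {
        if (received != expected) {
            throw new StreamCorruptedException(
                    "Expected index " + expected + " but got " + received);
        }
    }

    public static void main(String[] args) {
        // Assumption: the sink stored indices 0..40 before it was stopped.
        long lastLocalIndex = 40;
        long expected = expectedNextIndex(lastLocalIndex);

        // Assumption: the source replays inclusively from the index the sink sent.
        long firstReplayed = lastLocalIndex;
        try {
            checkIndex(expected, firstReplayed);
            System.out.println("Resumed at index " + firstReplayed);
        } catch (StreamCorruptedException e) {
            System.out.println(e.getMessage()); // prints "Expected index 41 but got 40"
        }
    }
}
```

If the two sides agreed on whether the subscribed index is inclusive or exclusive, the check in this model would pass. Which side is actually off in the library is not established here.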

Steps to reproduce:

  1. start master
  2. start slave
  3. hit enter in master process to start publishing data
  4. stop/kill slave
  5. start slave again

If we change IndexedChronicle to VanillaChronicle everything seems to work as expected.

I'm testing on Windows 8.1 x64. I tried both the latest version published to Maven Central (3.2.5) and the current snapshot (3.2.6-SNAPSHOT).

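import java.io.IOException;

import net.openhft.chronicle.Chronicle;
import net.openhft.chronicle.ExcerptAppender;
import net.openhft.chronicle.IndexedChronicle;
import net.openhft.chronicle.tcp.ChronicleSource;

// ConsoleUtils is not shown in this report; it appears to be a local helper that waits for <enter>.
public class Master {

    public static void main(String[] args) throws IOException, InterruptedException {
        // Publish the local IndexedChronicle over TCP on port 7777.
        final Chronicle source = new ChronicleSource(new IndexedChronicle("./master"), 7777);
        ExcerptAppender excerpt = source.createAppender();
        ConsoleUtils.waitForEnter("Hit <enter> to start sending ...");
        for (int i = 1; i <= 1000; i++) {
            // use a size which will cause mis-alignment.
            excerpt.startExcerpt();
            excerpt.writeLong(i);
            excerpt.append(' ');
            excerpt.append(i);
            excerpt.append('\n');
            excerpt.finish();
            // Roughly ten messages per second.
            Thread.sleep(100);
        }
        System.out.println("Finished writing messages ...");
        ConsoleUtils.waitForEnter("Hit <enter> to close source ...");
        source.close();
    }
}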

import java.io.IOException;
import java.util.concurrent.atomic.AtomicBoolean;

import net.openhft.chronicle.Chronicle;
import net.openhft.chronicle.ExcerptTailer;
import net.openhft.chronicle.IndexedChronicle;
import net.openhft.chronicle.tcp.ChronicleSink;

public class Slave {

    private static final int PORT = 7777;
    private static final AtomicBoolean reading = new AtomicBoolean(true);

    public static void main(String[] args) throws IOException, InterruptedException {
        Thread t = new Thread(new Reader());
        t.start();
        ConsoleUtils.waitForEnter("Hit <enter> to stop reading ...");
        reading.set(false);
        ConsoleUtils.waitForEnter("Hit <enter> to terminate ...");
    }

    private static class Reader implements Runnable {
        @Override
        public void run() {
            try {
                // Replicate from the master into a separate local IndexedChronicle.
                Chronicle sink = new ChronicleSink(new IndexedChronicle("./slave"), "localhost", PORT);
                ExcerptTailer excerpt = sink.createTailer();
                while (reading.get()) {
                    // Busy-poll for the next excerpt, re-checking the stop flag on every pass.
                    if (!excerpt.nextIndex()) {
                        continue;
                    }
                    long n = excerpt.readLong();
                    System.out.println(String.format("Message %s received", n));
                    excerpt.finish();
                }
                System.out.println("Reading stopped ...");
                sink.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
}
lburgazzoli self-assigned this Nov 29, 2014

lburgazzoli (Contributor) commented

This should be fixed in 3.3.0-SNAPSHOT.
