Source-Sink issues #97

Closed
puiuvlad opened this issue Nov 6, 2014 · 10 comments

@puiuvlad

puiuvlad commented Nov 6, 2014

Hi Peter,

Indeed, reading from a sink and writing to a local chronicle on a blocking thread, and then reading from the local chronicle (non-blocking) in a tight loop, seems like a good idea.
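
For readers following along, the threading pattern described above can be sketched without Chronicle at all. This is only an illustration under assumptions: a `LinkedBlockingQueue` stands in for the remote sink, a `ConcurrentLinkedQueue` stands in for the local chronicle, and `PumpSketch` and `run` are hypothetical names that do not appear in the attached test case.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class PumpSketch {
    // Moves `count` records from a blocking source to a local queue on a
    // separate thread, while the caller polls the local queue in a tight loop.
    static List<Long> run(int count) {
        // Stand-in for the remote sink (assumption, not Chronicle API).
        BlockingQueue<Long> remote = new LinkedBlockingQueue<>();
        // Stand-in for the local chronicle (assumption, not Chronicle API).
        ConcurrentLinkedQueue<Long> local = new ConcurrentLinkedQueue<>();

        Thread pump = new Thread(() -> {
            try {
                for (int i = 0; i < count; i++) {
                    local.add(remote.take()); // blocking read, then local append
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "sink-pump");
        pump.start();

        // Publish some records on the "remote" side.
        for (long i = 0; i < count; i++) {
            remote.add(1000 + i);
        }

        // Tight, non-blocking loop on the local side.
        List<Long> seen = new ArrayList<>();
        while (seen.size() < count) {
            Long next = local.poll(); // returns null when nothing is available yet
            if (next != null) {
                seen.add(next);
            }
        }

        try {
            pump.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(run(10));
    }
}
```

Only the pump thread ever blocks; the consumer spins on `poll()` and never waits on the network side.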

However, I am having trouble implementing it: it seems that in certain circumstances VolatileExcerptTailer.nextIndex() is reading two records when only one is published on its port (the second record may come from another sink on a different port?).

I have built a simplified test case that exemplifies this issue (see the attached file). The application consists of three processes: P1, P2 and P3. One needs to start P2 and P3 first in any order and then P1. The flow is as follows:

(1) in a loop, P1 writes a record to chr1 and waits to read a record from chr5.
(2) P2 reads a record from chr1 and writes it to a source chr2 on port2. On a separate thread, a NonBlockingSink reads from a source on port5 and writes to chr5.
(3) in its NonBlockingSink, on a separate thread, P3 reads from a source on port3 (same as port2) and writes to chr3. On its main thread, in a tight loop, P3 reads from chr3 and writes to source chr4 on port4 (same as port5).

P1, P2, and P3 are all running on the same machine, but the intent is to have P1 and P2 on one host and P3 on a different host (that's why P2 and P3 communicate via source/sink pairs).

P1 is supposed to send one record, wait until it reads it coming back, send another record and so on, for a total of 10 records. However, it receives only 1 record and then blocks.

I added some tracing to ChronicleSink.VolatileExcerptTailer.nextIndex() (see below) and I get the following trace in P3 (but a similar trace is obtained in P2):

excerptSize=20 Zamolxis/127.0.0.1:15311
receivedIndex=0 Zamolxis/127.0.0.1:15311 Thread-1
positionAddr=188696728
reading from sink 15311
read from sink 15311 { long=1000 string=foo bar int=0 }
wrote to local { long=1000 string=foo bar int=0 }
excerptSize=1000 Zamolxis/127.0.0.1:15311
receivedIndex=8029748840875687936 Zamolxis/127.0.0.1:15311 Thread-1

Notice the second excerptSize=1000 (I only send small messages) and receivedIndex=8029748840875687936 (while the first index was 0).

Am I doing something wrong? I am using version 3.2.6-SNAPSHOT of chronicle.

Thanks,
Vladimir

            int excerptSize = buffer.getInt();
            long receivedIndex = buffer.getLong();

            System.out.println("excerptSize="+excerptSize+" "+address);

            switch (excerptSize) {
                case ChronicleTcp.IN_SYNC_LEN:
                case ChronicleTcp.PADDED_LEN:
                case ChronicleTcp.SYNC_IDX_LEN:
                    return false;
            }

            System.out.println("receivedIndex="+receivedIndex+" "+address+" "+Thread.currentThread().getName());

            if (excerptSize > 128 << 20 || excerptSize < 0) {
                throw new StreamCorruptedException("Size was " + excerptSize);
            }

            if(buffer.remaining() < excerptSize) {
                if(!connector.read(excerptSize)) {
                    return false;
                }
            }

            index = receivedIndex;
            positionAddr = startAddr + buffer.position();
            limitAddr = positionAddr + excerptSize;
            lastSize = excerptSize;
            finished = false;
            System.out.println("positionAddr="+positionAddr);
@puiuvlad
Author

puiuvlad commented Nov 6, 2014

test.artifact.zip is on Google groups:

https://groups.google.com/forum/#!topic/java-chronicle/xA5jC00XfP8

@puiuvlad
Author

puiuvlad commented Nov 6, 2014

One more thing: when I change all IndexedChronicles to VanillaChronicles, P3 never gets any message from P2 on its sink. Am I doing something wrong?

@lburgazzoli
Contributor

A question about the use case: is P2 supposed to do something, or is it only a proxy that exposes the Chronicle-Queues used by P1 to P3?

@puiuvlad
Author

puiuvlad commented Nov 7, 2014

P2 is a concentrator of messages from multiple P1s.


@lburgazzoli
Contributor

The problem comes from Msg#read(ExcerptTailer), which does not call ExcerptTailer.finish(). That call is required because it moves the cursor to the next entry.

I'll make VolatileExcerptTailer.nextIndex() automatically call finish() if needed.
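
To illustrate why a missing cursor advance produces the odd trace values, here is a minimal sketch that uses only `java.nio.ByteBuffer`, not Chronicle code. The layout is an assumption chosen for illustration: little-endian byte order, a header of `int` size plus `long` index, and a 20-byte payload made of a `long`, a one-byte string length, the ASCII text `foo bar`, and an `int`. The names `FinishCursorSketch`, `twoRecords` and `readTwoHeaders` are hypothetical.

```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.charset.StandardCharsets;

public class FinishCursorSketch {
    // Builds two records: [int size][long index][20-byte payload] each.
    static ByteBuffer twoRecords() {
        ByteBuffer buf = ByteBuffer.allocate(64).order(ByteOrder.LITTLE_ENDIAN);
        for (long index = 0; index < 2; index++) {
            buf.putInt(20);        // payload size: 8 + 1 + 7 + 4 bytes
            buf.putLong(index);    // record index
            buf.putLong(1000L);    // payload: long field
            buf.put((byte) 7);     // payload: assumed one-byte string length
            buf.put("foo bar".getBytes(StandardCharsets.US_ASCII));
            buf.putInt(0);         // payload: int field
        }
        buf.flip();
        return buf;
    }

    // Reads two headers and returns {size0, index0, size1, index1}.
    // When `finish` is false, the position is never moved past the payload.
    static long[] readTwoHeaders(boolean finish) {
        ByteBuffer buf = twoRecords();
        long[] out = new long[4];
        for (int i = 0; i < 2; i++) {
            int size = buf.getInt();
            long index = buf.getLong();
            out[2 * i] = size;
            out[2 * i + 1] = index;
            // The payload is decoded with an absolute read, which leaves
            // the buffer position where the header ended.
            buf.getLong(buf.position());
            if (finish) {
                // Equivalent of finish(): skip the payload so the next
                // header read starts at the next record.
                buf.position(buf.position() + size);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        long[] bad = readTwoHeaders(false);
        long[] good = readTwoHeaders(true);
        System.out.println("without finish: size=" + bad[2] + " index=" + bad[3]);
        // prints "without finish: size=1000 index=8029748840875687936"
        System.out.println("with finish:    size=" + good[2] + " index=" + good[3]);
        // prints "with finish:    size=20 index=1"
    }
}
```

Under these assumptions the second header is decoded from the first record's payload: the size comes from the low four bytes of the `long` 1000, and the index is 0x6F6F660700000000, that is, four zero bytes followed by the length byte 7 and the characters `foo`. These are the same values that appear in the trace from P3.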

@puiuvlad
Author

It works now with both IndexedChronicle and VanillaChronicle.

Thanks a lot Luca!

@puiuvlad puiuvlad reopened this Nov 11, 2014
@puiuvlad
Author

There is still a minor issue: with VanillaChronicle, when you start with no chronicle files (e.g., after deleting the ../chr/h1 and ../chr/h2 directories), P1 sends one message to P2 and does not receive anything back from P3. If you stop all processes and restart them, it works. It's as if some logic fails when no chronicle exists yet.

@lburgazzoli
Contributor

I will have a look

@lburgazzoli
Contributor

The problem is located in the NonBlockingSink constructor:

_sink = new ChronicleSink(host, port);
_tailer = _sink.createTailer();
_tailer.toEnd();

The call to _tailer.toEnd() seems to be the source of the problem; I'm going to check it.

A few notes:

@lburgazzoli lburgazzoli self-assigned this Nov 26, 2014
@lburgazzoli
Contributor

This should have been solved by 3.3.0-SNAPSHOT
