How to XREAD last element #19

Open
sedyh opened this issue Feb 28, 2024 · 6 comments

@sedyh

sedyh commented Feb 28, 2024

As far as I understand, the gtrs consumer reads data via XREAD. I would like to be able to read the last element of the stream, but as far as I can tell this is not possible right now. Is that correct?
redis/redis#7388

Reads all records:

NewConsumer[Event](ctx, rdb, StreamIDs{"my-stream": "0-0"})

Reads only new records:

NewConsumer[Event](ctx, rdb, StreamIDs{"my-stream": "$"})
@sedyh
Author

sedyh commented Mar 12, 2024

Hello, any update on this?

@dranikpg
Owner

Hi. Sorry this got lost in the pile of incoming notifications 😓

Wow, this is a really new feature 😮 Currently, gtrs code is as follows:

for {
  entry = read(lastId)
  chan <- entry
  lastId = entry.Id
}

So it works both when you specify $ and when you specify a concrete ID. In theory it should also work with +, since after reading the last entry you get its ID and then continue reading from it.


> that's right, IIUC the user will use + only on the first call, and then use $ on later calls. the first call will only block if the stream is empty.

That said, I also don't understand this comment from oranagra. If I use $ after +, I could skip an entry that was added between the + read and the $ read 🤷🏻
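
To make the concern about mixing + and $ concrete, here is a minimal Go sketch against a hypothetical in-memory stand-in for a stream. `memStream` and its methods are made up for illustration and are not gtrs or Redis APIs, and plain integers stand in for real `<ms>-<seq>` IDs. It compares continuing from the ID of the entry just read (what the loop above does) with continuing from $.

```go
package main

import "fmt"

// entry is a simplified stream entry. Real Redis IDs are "<ms>-<seq>" strings;
// an increasing integer stands in for them here (assumption for illustration).
type entry struct {
	ID    int
	Value string
}

// memStream is a hypothetical in-memory stand-in for a Redis stream.
type memStream struct{ entries []entry }

// add appends a value and returns its ID.
func (s *memStream) add(v string) int {
	id := len(s.entries) + 1
	s.entries = append(s.entries, entry{ID: id, Value: v})
	return id
}

// last models a read with "+": the newest entry, if any.
func (s *memStream) last() (entry, bool) {
	if len(s.entries) == 0 {
		return entry{}, false
	}
	return s.entries[len(s.entries)-1], true
}

// after models a read with a concrete ID: entries strictly newer than id.
func (s *memStream) after(id int) []entry {
	var out []entry
	for _, e := range s.entries {
		if e.ID > id {
			out = append(out, e)
		}
	}
	return out
}

func main() {
	s := &memStream{}
	s.add("a")
	s.add("b")

	first, _ := s.last() // first call with "+" reads "b"
	s.add("c")           // a writer appends before our next call

	// Continuing from the ID we just read still delivers "c".
	fmt.Println("from last ID:", s.after(first.ID))

	// "$" resolves to the newest ID at call time, so "c" is never delivered.
	dollar, _ := s.last()
	fmt.Println("from $:", s.after(dollar.ID))
}
```

Under these assumptions, feeding the returned entry ID into the next read cannot skip an entry, whereas switching to $ can.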


Can you please try it out? If it doesn't work, I'll publish a fix once we know what the issue is 🙂

@sedyh
Author

sedyh commented Mar 18, 2024

Hello, sorry for the long delay. I'll try it today or tomorrow.
Do you mean I can just specify a stream ID like "+-$" and it would work?

@sedyh
Author

sedyh commented Mar 28, 2024

> So it works both when you specify $ and id-id. Theoretically it should also work with +, as after reading the last entry you'll fetch its id and then continue reading from it. Can you please try it out? If not, I'll publish a fix to what the issue is.

The problem is that "+" doesn't work in XREAD, only in XRANGE/XREVRANGE. So I can read either all records ("0-0") or only new records ("$"). Any other special ID results in: ERR Invalid stream ID specified as stream command argument.

@sedyh
Author

sedyh commented Mar 28, 2024

I guess "+" support in XREAD was added later, so you probably need to update your dependencies.
redis/redis#13117

I also know I can pass the last ID from XREVRANGE to the gtrs consumer, but I'm not sure how to make that ID inclusive (i.e. have the consumer deliver the entry with that ID once it starts listening).

@sedyh
Author

sedyh commented Apr 11, 2024

This is the only way I could get it to work right now.
@dranikpg Please check whether redis/redis#13117 has any impact on the go-redis dependency.

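// Needs the standard packages context, fmt, strconv and strings.
stream := "main"
id := LastMessageID(ctx, rdb, stream)
consumer := gtrs.NewConsumer[Data](ctx, rdb, gtrs.StreamIDs{stream: id})

// LastMessageID returns an ID shortly before the newest entry of the stream,
// so that a consumer started from it (XREAD is exclusive) receives that entry.
// It falls back to "$" (new entries only) on error or when the stream is empty.
func LastMessageID(ctx context.Context, client redis.Cmdable, stream string) string {
	messages, err := client.XRevRangeN(ctx, stream, "+", "-", 1).Result()
	if err != nil {
		return "$"
	}

	// If there are no messages, we can only wait for the next one.
	if len(messages) == 0 {
		return "$"
	}

	// Entry IDs have the form "<milliseconds>-<sequence>"; take the first part.
	msPart, _, _ := strings.Cut(messages[0].ID, "-")
	ms, err := strconv.ParseUint(msPart, 10, 64)
	if err != nil {
		return "$"
	}
	if ms == 0 {
		// Nothing can precede millisecond 0, so read from the start.
		return "0-0"
	}

	// We are looking for the closest entry before this one.
	// We assume that no one will have time to send 100,000 records in 1 ms.
	return fmt.Sprintf("%d-%d", ms-1, 99999)
}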
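
The workaround above approximates "the ID just before the last entry". Assuming stream IDs are pairs of unsigned 64-bit integers written as `<ms>-<seq>`, and that XREAD returns only entries with an ID greater than the one given, the exact predecessor can be computed instead. The sketch below is one way to do that; `prevStreamID` is a hypothetical helper, not part of gtrs or go-redis, and it has not been checked against a live server.

```go
package main

import (
	"fmt"
	"math"
	"strconv"
	"strings"
)

// prevStreamID returns the largest stream ID strictly smaller than id,
// where id has the form "<ms>-<seq>" with both parts unsigned 64-bit integers.
// Hypothetical helper for illustration only.
func prevStreamID(id string) (string, error) {
	msStr, seqStr, ok := strings.Cut(id, "-")
	if !ok {
		return "", fmt.Errorf("malformed stream ID %q", id)
	}
	ms, err := strconv.ParseUint(msStr, 10, 64)
	if err != nil {
		return "", fmt.Errorf("bad milliseconds part in %q: %w", id, err)
	}
	seq, err := strconv.ParseUint(seqStr, 10, 64)
	if err != nil {
		return "", fmt.Errorf("bad sequence part in %q: %w", id, err)
	}
	switch {
	case seq > 0:
		// Same millisecond, previous sequence number.
		return fmt.Sprintf("%d-%d", ms, seq-1), nil
	case ms > 0:
		// Borrow from the milliseconds part; the sequence wraps to its maximum.
		return fmt.Sprintf("%d-%d", ms-1, uint64(math.MaxUint64)), nil
	default:
		return "", fmt.Errorf("%q has no predecessor", id)
	}
}

func main() {
	for _, id := range []string{"1710000000000-3", "1710000000000-0"} {
		prev, err := prevStreamID(id)
		fmt.Println(id, "->", prev, err)
	}
}
```

Passing the result as the start ID would make the consumer's first delivered entry the one returned by XREVRANGE, without relying on an assumption about how many records fit into one millisecond.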
