Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Bug?] 6.0.0: Reading concurrently from multiple connections #84

Closed
kika opened this issue Dec 17, 2022 · 12 comments
Closed

[Bug?] 6.0.0: Reading concurrently from multiple connections #84

kika opened this issue Dec 17, 2022 · 12 comments
Assignees
Labels
bug Something isn't working

Comments

@kika
Copy link
Contributor

kika commented Dec 17, 2022

It's either a bug in my DNA (© old programming joke) or in fred.

Redis version - 7.0.5
Platform - Mac
Using Docker and/or Kubernetes - no
Deployment type - centralized

Describe the bug

A brief explanation to why I'm doing things the way the repro is written: I need to read a lot of STREAMs from a Redis Cluster. XREAD is able to read many streams at once, just not in the cluster environment. In cluster environment it can only read from streams that hash into the same slot (not the same node!). So to minimize the number of XREADs I group the channel names by the result of redis_keyslot(), for every slot that has streams I open a connection to the cluster node (not a clustered connection, but a result of the clone_new() from one of split_cluster()) and run XREAD on this connection. And that doesn't work the way I intended. After I run a test case with just a few streams only one stream works as supposed. The others wake up the connection but do not trigger XREADs. If you keep trying to XADD to these "ignored" streams, they suddenly start working (from 1-2 attempts, sometimes more). The original stream that was working stops working. But the data sent to the "ignored" channels to wake them up is lost which probably means that these other XREADs are not even started because otherwise they'd have captured the data due to their "$" wildcard ID. Phew.

To Reproduce

This repro simulates this behavior without the cluster and that's why it looks the way it is. The easiest way to produce the repro is to put it into the examples folder and RUST_LOG=trace cargo run --example bug84. Then from another terminal run redis-cli and issue a few commands:

xadd test0 * v test0
xadd test1 * v test1
xadd test0 * v test2
etc

I had this problem with 5.2.0 and moved to 6.0.0 hoping it's going to be better.

Quite possibly it's my insufficient understanding on how the async Rust works...

Repro itself:

use std::sync::Arc;

use fred::{prelude::*, types::XReadResponse};
use futures::stream::FuturesUnordered;
use tokio_stream::StreamExt;

async fn _read(
        client: Arc<RedisClient>,
        stream_name: &String,
    ) -> Result<Vec<(String, String)>, RedisError> {
        let r: XReadResponse<String, String, String, String> =
            client.xread_map(None, Some(0), stream_name, "$".to_string()).await?;
        let mut result = vec![];
        for (stream_name, records) in r.iter() {
            for (last_id, _record) in records {
                result.push((stream_name.to_string(), last_id.to_string()));
            }
        }
        Ok(result)
    }

#[tokio::main]
async fn main() -> Result<(), RedisError> {
    pretty_env_logger::init();

    let config = RedisConfig::default();
    let mut clients: Vec<Arc<RedisClient>> = vec![];
    let mut stream_names: Vec<String> = vec![];

    for i in 0..3 {
        let c = RedisClient::new(config.clone(), None, None);
        c.connect();
        c.wait_for_connect().await?;
        clients.push(Arc::new(c));
        stream_names.push(format!("test{i}"));
    }

    loop {
        let mut xreads = FuturesUnordered::new();
        for (i, stream_name) in stream_names.iter().enumerate() {
            xreads.push(_read(clients.get(i).unwrap().clone(), stream_name))
        }
        tokio::select! {
            Some(Ok(events)) = xreads.next() => {
                for event in events {
                    println!("Stream {} last ID {}", event.0, event.1);
                }
            }
        }
    }
}
@kika kika added the bug Something isn't working label Dec 17, 2022
@aembke
Copy link
Owner

aembke commented Dec 18, 2022

I think this might be a symptom of how the select macro is used here. If I'm reading this correctly you're selecting over the list of futures, waiting for one of them to complete, and then dropping all the others when the loop continues. So in each loop iteration only one of the futures will ever complete. I suspect there's also a missing await call after the select, but I haven't used that macro often so I may be wrong there.

I'll have some time this week to explore other ways of structuring this, but I suspect it can be fixed with some changes in the app code. There's no shared state between clients, even when they're cloned via clone_new or split_cluster, so I'd be very surprised if it were something in the connection layer.

@kika
Copy link
Contributor Author

kika commented Dec 18, 2022

@aembke thanks for your comment, I'm still digging into this, and it looks like that the problem might be in the "dropping all the others" part. In the real app, the list of streams is dynamically created, as clients add and remove the stream names to the list (through sending commands over the mpsc channel, which is included in the select, so it makes the loop run around).
See this MONITOR log from the app test case (not the repro) but working in single connection mode (not clustered):

"CLIENT" "SETNAME" "fred-U1TVF8giJT" <-- app started and connected
"XREAD" "BLOCK" "0" "STREAMS" "test0" "$" <- first subscription, but where are all the others? (test1 and test2)
"xadd" "test0" "*" "v" "test0" <- this is me sending through CLI
"XREAD" "BLOCK" "0" "STREAMS" "test0" "test1" "test2" "$" "$" "$" <- only now I see the rest of the channels
"xadd" "test0" "*" "v" "test0"
"XREAD" "BLOCK" "0" "STREAMS" "test0" "test1" "test2" "1671377131021-0" "$" "$"
... so it kinda works
"CLIENT" "SETNAME" "fred-hAuRd4L4FW" <- restarted the app
"XREAD" "BLOCK" "0" "STREAMS" "test0" "test1" "$" "$" <- now I see 2 out of 3 channels, test2 is ignored
"xadd" "test2" "*" "v" "test2" <- ignored indeed, nothing happens
"xadd" "test1" "*" "v" "test1" <- this one is not ignored, the connection wakes
"XREAD" "BLOCK" "0" "STREAMS" "test0" "test1" "test2" "$" "$" "$" <- causes the xread to run, but the data I sent above is missing

It appears that something is missing around the cancellation of promises and canceling the blocking XREADs

@kika
Copy link
Contributor Author

kika commented Dec 18, 2022

After writing the above comment I SUDDENLY realized that I have blocking policy in the RedisConfig at default, and set it to Blocking::Interrupt. Surprisingly, there's no change in overall behavior.

Now I sometimes see

DEBUG fred::modules::backchannel] fred-4vzg2X3AyT: Sending CLIENT UNBLOCK (0) on backchannel to 127.0.0.1:6379

and sometimes

ERROR fred::utils] fred-6ZUIwh9OGX: Failed to interrupt blocked connection: Redis Error - kind: Unknown, details: Connection is not blocked.

on two consecutive runs.
But in the first case, I don't see CLIENT UNBLOCK in the MONITOR stream from the server. But this stream does indeed looks different (and better, but not good enough):

"CLIENT" "SETNAME" "fred-4vzg2X3AyT"
"CLIENT" "ID"
"INFO" "server"
"XREAD" "BLOCK" "0" "STREAMS" "test0" "$"
"CLIENT" "SETNAME" "fred-4vzg2X3AyT" <- UNBLOCK looks like reconnect?
"CLIENT" "ID"
"INFO" "server"
"XREAD" "BLOCK" "0" "STREAMS" "test0" "test1" "$" "$" <- but still, no third stream name!

@kika
Copy link
Contributor Author

kika commented Dec 18, 2022

Another spammy comment, but might be helpful. I switched back to RESP3 and now I don't see any attempts to unblock in the trace logs. But here's the interesting part: a log from fred interspersed with my own logging from the app. I see myself sending 3 XREADs to the connection and fred processing only 2, and only 1 flushed down the pipe. Which is basically what I see in the MONITOR stream. Why?

DEBUG fred::multiplexer::commands] fred-0QxHjl5ScK: Starting command processing stream...
TRACE redis_test] Centralized configuration
TRACE fred::protocol::codec] fred-0QxHjl5ScK: Recv 0 bytes from 127.0.0.1:6379 (RESP3).
TRACE redis_test] ============ CacheMgr::run() tick ============
TRACE redis_test] number of xreads to scan: 1
DEBUG redis_test] CacheMgr: received command: Subscribe { name: "test0", resp: Sender { inner: Some(Inner { state: State { is_complete: false, is_closed: false, is_rx_task_set: true, is_tx_task_set: false } }) } }
TRACE redis_test] ============ CacheMgr::run() tick ============
TRACE redis_test] number of xreads to scan: 1
TRACE redis_test] XREAD ["test0"] ["$"] -> fred-0QxHjl5ScK
TRACE fred::interfaces] fred-0QxHjl5ScK: Sending command XREAD (0) to multiplexer.
TRACE fred::multiplexer::commands] fred-0QxHjl5ScK: Recv command: MultiplexerCommand { kind: "Command", command: "XREAD" }
DEBUG redis_test] CacheMgr: received command: Subscribe { name: "test1", resp: Sender { inner: Some(Inner { state: State { is_complete: false, is_closed: false, is_rx_task_set: true, is_tx_task_set: false } }) } }
TRACE fred::protocol::command] fred-0QxHjl5ScK: Pipeline check XREAD: false
TRACE redis_test] ============ CacheMgr::run() tick ============
TRACE redis_test] number of xreads to scan: 1
TRACE fred::multiplexer] fred-0QxHjl5ScK: Writing command 0
TRACE redis_test] XREAD ["test1", "test0"] ["$", "$"] -> fred-0QxHjl5ScK
TRACE fred::interfaces] fred-0QxHjl5ScK: Sending command XREAD (0) to multiplexer.
TRACE fred::multiplexer::utils] fred-0QxHjl5ScK: Writing command 0. Timed out: false, Force flush: false
DEBUG redis_test] CacheMgr: received command: Subscribe { name: "test2", resp: Sender { inner: Some(Inner { state: State { is_complete: false, is_closed: false, is_rx_task_set: true, is_tx_task_set: false } }) } }
TRACE fred::multiplexer::utils] fred-0QxHjl5ScK: Sending command XREAD to 127.0.0.1:6379, ID: 0
TRACE redis_test] ============ CacheMgr::run() tick ============
TRACE redis_test] number of xreads to scan: 1
TRACE redis_protocol::resp3::encode] Attempting to encode Array with total size 64
TRACE redis_protocol::utils] allocating more, len: 0, amt: 64
TRACE redis_protocol::resp3::encode] Attempting to encode Array with total size 64
TRACE redis_test] XREAD ["test1", "test2", "test0"] ["$", "$", "$"] -> fred-0QxHjl5ScK
TRACE redis_protocol::resp3::encode] Attempting to encode BlobString with total size 11
TRACE redis_protocol::resp3::encode] Attempting to encode BlobString with total size 11
TRACE fred::interfaces] fred-0QxHjl5ScK: Sending command XREAD (0) to multiplexer.
TRACE redis_protocol::resp3::encode] Attempting to encode BlobString with total size 7
TRACE redis_protocol::resp3::encode] Attempting to encode BlobString with total size 13
TRACE redis_protocol::resp3::encode] Attempting to encode BlobString with total size 11
TRACE redis_protocol::resp3::encode] Attempting to encode BlobString with total size 7
TRACE fred::protocol::codec] fred-0QxHjl5ScK: Encoded 64 bytes to 127.0.0.1:6379. Buffer len: 64 (RESP3)
TRACE fred::multiplexer::commands] fred-0QxHjl5ScK: Sent command to 127.0.0.1:6379. Flushed: true
DEBUG fred::multiplexer::commands] fred-0QxHjl5ScK: Waiting on multiplexer channel.

@aembke
Copy link
Owner

aembke commented Dec 18, 2022

I wont have time to really dive into this until later in the week, but from a quick glance it looks like what you're trying to do may be inherently prone to race conditions, which could explain why you're seeing inconsistent results. That's not necessarily a bad thing, but just worth considering. The redis protocol is not multiplexed, so when you send a blocking XREAD command it will block the entire client until that call finishes. That may explain why you're seeing fewer XREAD logs than you expect, but it's hard to parse given how you're distributing commands among the clients.

If I were doing this I'd put each client in it's own tokio task with a loop around the mpsc recv and XREAD calls and just avoid all this select logic. I'm sure it's possible to do it that way, but it seems like you want to process each stream concurrently anyways, and since these commands are all blocking then you wont really see any benefit from select'ing over multiple futures since the commands all block each other and run in series under the hood (at least in the context of a single client).

Take all that with a grain of salt though, I haven't had a chance to really dive into this yet.

@kika
Copy link
Contributor Author

kika commented Dec 18, 2022

@aembke thanks, are you saying that even having exactly one XREAD per connection within the same library instance is prone to race conditions? Because that's what I'm trying to achieve here.

Basically, what I'm trying to do is this: lets say streams A and B are destined to use one connection and thus I issue XREAD A B $ $ . Now, while waiting, I want to add stream C to the list, so I want to cancel the XREAD and issue a new XREAD A B C $ $ $ instead.

@kika
Copy link
Contributor Author

kika commented Dec 19, 2022

Here's the minimum repro, much simpler than the original (should I replace it?):

use log::*;
use fred::prelude::*;
use tokio::sync::mpsc;

#[tokio::main]
async fn main() -> Result<(), RedisError> {
    pretty_env_logger::init();

    let mut config = RedisConfig::default();
    config.blocking = Blocking::Interrupt;
    let client = RedisClient::new(config.clone(), None, None);
    let mut stream_names = vec![];
    let mut ids = vec![];
    let (tx, mut rx) = mpsc::unbounded_channel();

    client.connect();
    client.wait_for_connect().await?;

    let jh = tokio::spawn(async move {
        loop {
            tokio::select! {
                Some(new_name) = rx.recv() => {
                    debug!("Adding new stream: {}", new_name);
                    stream_names.push(new_name);
                    ids.push("$");
                }
                Ok(r) = client.xread_map::<String, String, String, String, Vec<&str>, Vec<&str>>(None, Some(0), stream_names.clone(), ids.clone()) => {
                    for s in r.iter() {
                        println!("Result: {:?}", s);
                    }
                }
            }
        }
    });

    let _ = tx.send("test0");
    let _ = tx.send("test1");
    let _ = tx.send("test2");
    let _ = jh.await;
    Ok(())
}

So my problems were twofold: RedisConfig::blocking has to be set to Interrupt for this to even attempt to work and then there's obviously a race condition.

If you introduce a delay after spawn and before first tx.send() then it has a better chance at working. In my app (which is much more involved at the receiving end of the channel) 3 seconds is enough, but I believe it depends on the computer and redis instance.

As you can see, there's nothing illegal happening: one connection, just multiple commands (but only one at a time) interrupting each other. My impression is that if the interrupt happens while XREAD is not fully settled then some internal mechanism in the library locks up. If you send data to the channel it managed to listen to (usually "test0") then this data go missing for 1 or 2 attempts but then the mechanism wakes up, syncs up with itself and then works.

@kika
Copy link
Contributor Author

kika commented Dec 20, 2022

I rewrote the module from my app into the threaded model, one client - one task (expected it to be much more complex and it ended up being the same line count, but arguably harder to read and comprehend) and it has the same problem. I need to cancel the outstanding XREAD to update the list of streams and if I do it too fast it locks up.
As for somewhat nondeterministic behavior it appears that it's tokio::select!() fairness. It randomly choses the arms to run first. If you swap the order of the arms and put biased; before the first arm, it will almost guarantee the wrong behavior.

@aembke
Copy link
Owner

aembke commented Dec 20, 2022

Basically, what I'm trying to do is this: lets say streams A and B are destined to use one connection and thus I issue XREAD A B $ $ . Now, while waiting, I want to add stream C to the list, so I want to cancel the XREAD and issue a new XREAD A B C $ $ $ instead.

This helped a lot, thanks. Apologies, I misunderstood this originally.

Try this:

use fred::prelude::*;
use futures::future::pending;
use std::{collections::HashMap, time::Duration};
use tokio::{sync::mpsc, time::sleep};

#[tokio::main]
async fn main() -> Result<(), RedisError> {
  pretty_env_logger::init();

  let mut config = RedisConfig::default();
  config.blocking = Blocking::Interrupt;
  let client = RedisClient::new(config.clone(), None, None);
  let mut stream_names: Vec<String> = vec![];
  let mut ids = vec![];
  let (tx, mut rx) = mpsc::unbounded_channel();

  client.connect();
  client.wait_for_connect().await?;

  let jh = tokio::spawn(async move {
    loop {
      tokio::select! {
          Some(new_name) = rx.recv() => {
              println!("Adding new stream: {}", new_name);
              stream_names.push(new_name);
              ids.push("$");
          }
          Ok(r) = async {
              if !stream_names.is_empty() {
                  client.xread_map::<String, String, String, String, Vec<String>, Vec<&str>>(None, Some(0), stream_names.clone(), ids.clone()).await
              }else{
                  println!("Skip XREAD.");
                  // return a future that never resolves. the select! macro will cancel the future when `rx` gets a message
                  let _: () = pending().await;
                  Ok(HashMap::new())
              }
          } => {
              for s in r.iter() {
                  println!("Result: {:?}", s);
              }
          }
      }
    }
  });

  let _ = tx.send("test0".into());
  let _ = tx.send("test1".into());
  let _ = tx.send("test2".into());

  for i in 0 .. 50 {
    let _ = tx.send(format!("test{}", i + 3));
    sleep(Duration::from_secs(5)).await;
  }

  let _ = jh.await;
  Ok(())
}

It looks like there were two race conditions here - one in the client in and one in the app code. This PR #87 fixes the one in the client, and the code above adds a check for the other one.

XREAD returns a syntax error if called like that without any arguments, which can result in the app failing to pick up any new records until something writes to tx. The new if/else branch adds a check for that.

@kika
Copy link
Contributor Author

kika commented Dec 20, 2022

Right, sorry, I missed that part, the actual code does have a guard against empty XREAD, it should be written like this:

use log::*;
use fred::prelude::*;
use tokio::sync::mpsc;

#[tokio::main]
async fn main() -> Result<(), RedisError> {
    pretty_env_logger::init();

    let mut config = RedisConfig::default();
    config.blocking = Blocking::Interrupt;
    let client = RedisClient::new(config.clone(), None, None);
    let mut stream_names = vec![];
    let mut ids = vec![];
    let (tx, mut rx) = mpsc::unbounded_channel();

    client.connect();
    client.wait_for_connect().await?;

    let jh = tokio::spawn(async move {
        loop {
            tokio::select! {
                Some(new_name) = rx.recv() => {
                    debug!("Adding new stream: {}", new_name);
                    stream_names.push(new_name);
                    ids.push("$");
                }
                Ok(r) = client.xread_map::<String, String, String, String, Vec<&str>, Vec<&str>>
                                  (None, Some(0), stream_names.clone(), ids.clone()),
                 if stream_names.len() > 0  => {
                    for s in r.iter() {
                        println!("Result: {:?}", s);
                    }
                }
            }
        }
    });

    let _ = tx.send("test0");
    let _ = tx.send("test1");
    let _ = tx.send("test2");
    let _ = jh.await;
    Ok(())
}

@kika
Copy link
Contributor Author

kika commented Dec 20, 2022

Some late night testing with the app test case shows that #87 did the trick, thanks! I'll test more tomorrow, but the hopes are high, thank you very much!

@kika
Copy link
Contributor Author

kika commented Dec 20, 2022

Can't reproduce anymore.
Closed via #87

@kika kika closed this as completed Dec 20, 2022
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
bug Something isn't working
Projects
None yet
Development

No branches or pull requests

2 participants