Client not responding #13

Closed · dbstratta opened this issue Nov 17, 2021 · 21 comments

@dbstratta commented Nov 17, 2021

Description

Hi! Thanks for the awesome library.

I'm having a problem with the Redis client getting stuck before sending the command to the Redis server (it happens with any command).
This happens inside a Stream, but I haven't tried to replicate it outside of one.

The pattern is random: sometimes it runs for tens of stream values before getting stuck, and other times it gets stuck on the first value of the stream.

What I tried

I tried to track down where it gets stuck to the best of my ability, and I think it's at wait_for_response(rx) in src/utils.rs, but I'm not sure.

When I set a default timeout it gets hit; otherwise the call waits forever.
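
For reference, I set the default timeout via fred's global setting (value in milliseconds, same as in the full repro later in this thread):

fred::globals::set_default_command_timeout(1000);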

I also ran redis-cli monitor on the Redis server and confirmed that the client gets stuck before sending the command.

Reproducibility

The minimal code to reproduce it is below (UPDATE: this doesn't reproduce the issue, see #13 (comment)):

use std::time::Duration;

use futures::StreamExt;
use fred::prelude::*;
use fred::pool::StaticRedisPool;

#[tokio::main]
async fn main() {
  let pool = StaticRedisPool::new(RedisConfig::default(), 5).unwrap();
  pool.connect(Some(ReconnectPolicy::default()));
  pool.wait_for_connect().await.unwrap();

  let stream = tokio_stream::wrappers::IntervalStream::new(tokio::time::interval(Duration::from_millis(100)))
    .then(move |_| {
      let pool = pool.clone();

      async move {
        let value: Option<String> = pool.get("key").await.unwrap(); // This call never responds.

        value
      }
    });

  futures::pin_mut!(stream);

  while let Some(value) = stream.next().await {
    println!("{:?}", value);
  }
}

Cargo.toml:

[package]
edition = "2021"

[dependencies]
fred = { version = "4.2.1", default-features = false, features = [
    "pool-prefer-active",
    "ignore-auth-error",
] }
futures = { version = "0.3.17", default-features = false }
tokio = { version = "1.13.0", features = ["rt", "macros", "time"] }
tokio-stream = "0.1.8"

OS and versions

Rust: 1.56.0
fred: 4.2.1
Redis: 6.2.6 inside Docker
OS: Ubuntu 21.04

aembke added the bug label Nov 17, 2021
@aembke (Owner) commented Nov 17, 2021

Thanks for filing a great bug repro, @dbstratta.

I tried to repro this locally, but unfortunately I wasn't able to. That said, I had to make some changes to your code, including a change to the type of Stream that is used.

This was the code I used:

use std::time::Duration;
use futures::stream::StreamExt;
use futures::pin_mut;
use fred::prelude::*;
use fred::pool::StaticRedisPool;

#[tokio::main]
async fn main() -> Result<(), RedisError> {
  let pool_config = RedisConfig {
    server: ServerConfig::default_centralized(),
    ..Default::default()
  };

  let pool = StaticRedisPool::new(pool_config, 5)?;
  pool.connect(Some(ReconnectPolicy::default()));
  pool.wait_for_connect().await?;

  let mut stream = tokio_stream::wrappers::IntervalStream::new(tokio::time::interval(Duration::from_millis(100)))
    .then(move |_| {
      let pool = pool.clone();

      async move {
        let value: Option<String> = pool.get("key").await?; // This call never responds.

        Ok::<_, RedisError>(value)
      }
    });
  pin_mut!(stream);

  for value in stream.next().await {
    println!("{:?}", value);
  }

  Ok(())
}

The most notable change is the switch from the StreamExt trait in tokio_stream to the StreamExt trait in the futures crate. When fred returns a Stream, it's the Stream trait from the futures crate rather than from the tokio_stream crate. I'm not too familiar with the differences between what's provided in tokio_stream vs futures, but that is likely the culprit.

I'll do a little digging to see what the differences are and whether those traits are compatible, but in the meantime can you try your code using the futures::StreamExt trait instead?
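
To spell out the suggested change (both traits provide then and next, but they are distinct traits):

// the tokio_stream trait:
// use tokio_stream::StreamExt;
// the futures trait:
use futures::stream::StreamExt;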

@dbstratta (Author) commented Nov 17, 2021

Thank you for your quick reply!

Yes, some changes were needed in the repro code; sorry about that. I updated it with your changes.

I tried to reproduce it with the updated example and couldn't either. Earlier I was using a slightly modified version of it inside an async_graphql subscription handler.

I'm going to look into how async_graphql handles streams, but I'm positive it's the Redis client that is not responding.

@aembke (Owner) commented Nov 17, 2021

I just took a look at the tokio-stream docs and one thing jumped out at me here: https://docs.rs/tokio-stream/0.1.8/tokio_stream/#iterating-over-a-stream

It might be worth trying to use while let instead of for ... in, or switching to the Stream trait from futures which doesn't seem to have that limitation.
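
Here's a minimal sketch of that pattern with the futures StreamExt trait (a toy stream just for illustration):

use futures::pin_mut;
use futures::stream::{self, StreamExt};

#[tokio::main]
async fn main() {
  // Streams don't implement IntoIterator, so a `for` loop won't drain them;
  // poll with `while let` instead.
  let stream = stream::iter(vec![1, 2, 3]);
  pin_mut!(stream);

  while let Some(value) = stream.next().await {
    println!("{}", value);
  }
}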

@dbstratta (Author) commented:

You're right. I updated the example above with while let.

@aembke (Owner) commented Nov 17, 2021

This is a good callout, though. I'm going to update the docs to make it clearer which Stream trait is returned, since this seems like a really easy hole to fall into. I'm not sure why there are several different Stream options, but the docs should make it clearer which ones fred uses and how to avoid issues like this.

@dbstratta (Author) commented Nov 17, 2021

The Stream trait from tokio_stream is just a re-export from futures, although the StreamExt traits are different.

@dbstratta (Author) commented:

I took a look at the async_graphql stream code and couldn't find anything suspicious.

I kept trying things and noticed that when I disable pipelining, the client doesn't get stuck.
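
For reference, I disable it through the pipeline flag on RedisConfig, e.g.:

use fred::prelude::*;

fn main() {
  // Same config as before, but with pipelining turned off.
  let config = RedisConfig {
    pipeline: false,
    ..Default::default()
  };
  assert!(!config.pipeline);
}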

@aembke (Owner) commented Nov 17, 2021

Huh, that's odd. I've been running your updated repro code (only adding an import for pin_mut) in a script for the last few hours and haven't been able to repro it (with pipelining enabled).

Can you try adding RUST_LOG=fred=trace to your repro and send over the trace logs?

@dbstratta (Author) commented:

I couldn't reproduce it with the code from the original comment either, although it keeps happening in the async_graphql subscription if I enable pipelining.

I'll send the logs later today.

@dbstratta (Author) commented:

Here is a log leading up to a halt. I used a StaticRedisPool of 2 clients to keep the logs less verbose, but it happens with more clients too. I also logged the timeout errors (the timeout was 1 second, which is never reached under normal circumstances on my machine).

[2021-11-18T03:41:18Z TRACE fred::multiplexer] fred-Qtwg1IWsBq: Skip waiting on cluster sync.
[2021-11-18T03:41:18Z DEBUG fred::multiplexer::utils] fred-Qtwg1IWsBq: Writing command DEL to localhost:6379
[2021-11-18T03:41:18Z TRACE fred::protocol::connection] fred-Qtwg1IWsBq: Sending command and flushing the sink.
[2021-11-18T03:41:18Z TRACE fred::protocol::codec] fred-Qtwg1IWsBq: Encoded 43 bytes to 127.0.0.1:6379. Buffer len: 43
[2021-11-18T03:41:18Z TRACE fred::protocol::codec] fred-Qtwg1IWsBq: Recv 4 bytes from 127.0.0.1:6379.
[2021-11-18T03:41:18Z TRACE fred::protocol::codec] fred-Qtwg1IWsBq: Parsed 4 bytes from 127.0.0.1:6379
[2021-11-18T03:41:18Z TRACE fred::multiplexer::responses] fred-Qtwg1IWsBq: Processing response from localhost:6379 to DEL with frame kind Integer
[2021-11-18T03:41:18Z TRACE fred::multiplexer::responses] fred-Qtwg1IWsBq: Writing to multiplexer sender to unblock command loop.
[2021-11-18T03:41:18Z TRACE fred::multiplexer::responses] fred-Qtwg1IWsBq: Responding to caller for DEL
[2021-11-18T03:41:18Z TRACE fred::protocol::codec] fred-Qtwg1IWsBq: Recv 0 bytes from 127.0.0.1:6379.
[2021-11-18T03:41:19Z TRACE fred::multiplexer::commands] fred-et96uUyZey: Recv command on multiplexer GET. Buffer len: 0
[2021-11-18T03:41:19Z TRACE fred::multiplexer] fred-et96uUyZey: Skip waiting on cluster sync.
[2021-11-18T03:41:19Z DEBUG fred::multiplexer::utils] fred-et96uUyZey: Writing command GET to localhost:6379
[2021-11-18T03:41:19Z TRACE fred::protocol::connection] fred-et96uUyZey: Sending command without flushing the sink.
Redis(Redis Error - kind: Timeout, details: Request timed out.)
[2021-11-18T03:41:20Z TRACE fred::multiplexer::commands] fred-Qtwg1IWsBq: Recv command on multiplexer GET. Buffer len: 0
[2021-11-18T03:41:20Z TRACE fred::multiplexer] fred-Qtwg1IWsBq: Skip waiting on cluster sync.
[2021-11-18T03:41:20Z DEBUG fred::multiplexer::utils] fred-Qtwg1IWsBq: Writing command GET to localhost:6379
[2021-11-18T03:41:20Z TRACE fred::protocol::connection] fred-Qtwg1IWsBq: Sending command and flushing the sink.
[2021-11-18T03:41:20Z TRACE fred::protocol::codec] fred-Qtwg1IWsBq: Encoded 38 bytes to 127.0.0.1:6379. Buffer len: 38
[2021-11-18T03:41:20Z TRACE fred::protocol::codec] fred-Qtwg1IWsBq: Recv 5 bytes from 127.0.0.1:6379.
[2021-11-18T03:41:20Z TRACE fred::protocol::codec] fred-Qtwg1IWsBq: Parsed 5 bytes from 127.0.0.1:6379
[2021-11-18T03:41:20Z TRACE fred::multiplexer::responses] fred-Qtwg1IWsBq: Processing response from localhost:6379 to GET with frame kind Null
[2021-11-18T03:41:20Z TRACE fred::multiplexer::responses] fred-Qtwg1IWsBq: Writing to multiplexer sender to unblock command loop.
[2021-11-18T03:41:20Z TRACE fred::multiplexer::responses] fred-Qtwg1IWsBq: Responding to caller for GET
[2021-11-18T03:41:20Z TRACE fred::protocol::codec] fred-Qtwg1IWsBq: Recv 0 bytes from 127.0.0.1:6379.
[2021-11-18T03:41:20Z TRACE fred::multiplexer::commands] fred-Qtwg1IWsBq: Recv command on multiplexer SET. Buffer len: 0
[2021-11-18T03:41:20Z TRACE fred::multiplexer] fred-Qtwg1IWsBq: Skip waiting on cluster sync.
[2021-11-18T03:41:20Z DEBUG fred::multiplexer::utils] fred-Qtwg1IWsBq: Writing command SET to localhost:6379
[2021-11-18T03:41:20Z TRACE fred::protocol::connection] fred-Qtwg1IWsBq: Sending command and flushing the sink.
[2021-11-18T03:41:20Z TRACE fred::protocol::codec] fred-Qtwg1IWsBq: Encoded 109 bytes to 127.0.0.1:6379. Buffer len: 109
[2021-11-18T03:41:20Z TRACE fred::protocol::codec] fred-Qtwg1IWsBq: Recv 5 bytes from 127.0.0.1:6379.
[2021-11-18T03:41:20Z TRACE fred::protocol::codec] fred-Qtwg1IWsBq: Parsed 5 bytes from 127.0.0.1:6379
[2021-11-18T03:41:20Z TRACE fred::multiplexer::responses] fred-Qtwg1IWsBq: Processing response from localhost:6379 to SET with frame kind SimpleString
[2021-11-18T03:41:20Z TRACE fred::multiplexer::responses] fred-Qtwg1IWsBq: Writing to multiplexer sender to unblock command loop.
[2021-11-18T03:41:20Z TRACE fred::multiplexer::responses] fred-Qtwg1IWsBq: Responding to caller for SET
[2021-11-18T03:41:20Z TRACE fred::protocol::codec] fred-Qtwg1IWsBq: Recv 0 bytes from 127.0.0.1:6379.
[2021-11-18T03:41:20Z TRACE fred::multiplexer::commands] fred-Qtwg1IWsBq: Recv command on multiplexer SET. Buffer len: 0
[2021-11-18T03:41:20Z TRACE fred::multiplexer] fred-Qtwg1IWsBq: Skip waiting on cluster sync.
[2021-11-18T03:41:20Z DEBUG fred::multiplexer::utils] fred-Qtwg1IWsBq: Writing command SET to localhost:6379
[2021-11-18T03:41:20Z TRACE fred::protocol::connection] fred-Qtwg1IWsBq: Sending command and flushing the sink.
[2021-11-18T03:41:20Z TRACE fred::protocol::codec] fred-Qtwg1IWsBq: Encoded 171 bytes to 127.0.0.1:6379. Buffer len: 171
[2021-11-18T03:41:20Z TRACE fred::protocol::codec] fred-Qtwg1IWsBq: Recv 5 bytes from 127.0.0.1:6379.
[2021-11-18T03:41:20Z TRACE fred::protocol::codec] fred-Qtwg1IWsBq: Parsed 5 bytes from 127.0.0.1:6379
[2021-11-18T03:41:20Z TRACE fred::multiplexer::responses] fred-Qtwg1IWsBq: Processing response from localhost:6379 to SET with frame kind SimpleString
[2021-11-18T03:41:20Z TRACE fred::multiplexer::responses] fred-Qtwg1IWsBq: Writing to multiplexer sender to unblock command loop.
[2021-11-18T03:41:20Z TRACE fred::multiplexer::responses] fred-Qtwg1IWsBq: Responding to caller for SET
[2021-11-18T03:41:20Z TRACE fred::protocol::codec] fred-Qtwg1IWsBq: Recv 0 bytes from 127.0.0.1:6379.
[2021-11-18T03:41:20Z TRACE fred::multiplexer::commands] fred-Qtwg1IWsBq: Recv command on multiplexer GET. Buffer len: 0
[2021-11-18T03:41:20Z TRACE fred::multiplexer] fred-Qtwg1IWsBq: Skip waiting on cluster sync.
[2021-11-18T03:41:20Z DEBUG fred::multiplexer::utils] fred-Qtwg1IWsBq: Writing command GET to localhost:6379
[2021-11-18T03:41:20Z TRACE fred::protocol::connection] fred-Qtwg1IWsBq: Sending command without flushing the sink.
Redis(Redis Error - kind: Timeout, details: Request timed out.)

After this point there were no more logs and no more communication with the Redis server.

I started the stream again, and the first batch of logs from a client handling the next request contained a WARN log:

[2021-11-18T03:51:31Z TRACE fred::multiplexer::commands] fred-et96uUyZey: Recv command on multiplexer GET. Buffer len: 0
[2021-11-18T03:51:31Z TRACE fred::multiplexer] fred-et96uUyZey: Skip waiting on cluster sync.
[2021-11-18T03:51:31Z DEBUG fred::multiplexer::utils] fred-et96uUyZey: Writing command GET to localhost:6379
[2021-11-18T03:51:31Z TRACE fred::protocol::connection] fred-et96uUyZey: Sending command and flushing the sink.
[2021-11-18T03:51:31Z TRACE fred::protocol::codec] fred-et96uUyZey: Encoded 38 bytes to 127.0.0.1:6379. Buffer len: 38
[2021-11-18T03:51:31Z TRACE fred::protocol::codec] fred-et96uUyZey: Encoded 38 bytes to 127.0.0.1:6379. Buffer len: 76
[2021-11-18T03:51:31Z TRACE fred::protocol::codec] fred-et96uUyZey: Recv 10 bytes from 127.0.0.1:6379.
[2021-11-18T03:51:31Z TRACE fred::protocol::codec] fred-et96uUyZey: Parsed 5 bytes from 127.0.0.1:6379
[2021-11-18T03:51:31Z TRACE fred::multiplexer::responses] fred-et96uUyZey: Processing response from localhost:6379 to GET with frame kind Null
[2021-11-18T03:51:31Z TRACE fred::multiplexer::responses] fred-et96uUyZey: Writing to multiplexer sender to unblock command loop.
[2021-11-18T03:51:31Z TRACE fred::multiplexer::responses] fred-et96uUyZey: Responding to caller for GET
[2021-11-18T03:51:31Z WARN  fred::multiplexer::responses] fred-et96uUyZey: Failed to respond to caller.

Let me know if I can provide you with anything else.

@aembke (Owner) commented Nov 18, 2021

Ah, OK, that's interesting. The beginning of the first set of logs you sent probably contains the same WARN log as well, which would explain things.

The way this works under the hood is that each time you send a command, fred creates a RedisCommand struct with a field called tx, which is a tokio::sync::oneshot::Sender. The receiver half is awaited by the future returned to the caller in most async command functions. The client writes the response to the Sender half when the server responds.

That log line is emitted when the client receives an error calling send on the oneshot Sender. This happens when the receiver half is dropped before a message can be sent, which would seem to indicate that the future driving the command is dropped before the command receives a response.

Is it possible that in your code the future returned from pool.get is being dropped? Or maybe it's being dropped indirectly because the outer stream is dropped, or the stream is not being polled? I've often seen this kind of issue originate far up the call stack from where the error first appears, since it's often an outer future in a long chain of nested futures that is being dropped. Is there any chance you see a note: futures do nothing unless you .await or poll them message from the compiler?
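
To illustrate the failure mode with a minimal sketch (not fred's actual code):

use tokio::sync::oneshot;

#[tokio::main]
async fn main() {
  // fred-style request plumbing: the caller awaits `rx` while the client
  // responds through `tx`.
  let (tx, rx) = oneshot::channel::<&str>();

  // Simulate the caller's future (and with it the receiver half) being
  // dropped before the response arrives.
  drop(rx);

  // send() now fails, which is exactly when fred emits the
  // "Failed to respond to caller." WARN log.
  if tx.send("response").is_err() {
    println!("Failed to respond to caller.");
  }
}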

@aembke (Owner) commented Nov 18, 2021

Oh.... https://github.com/tokio-rs/tokio/releases/tag/tokio-1.13.1. That's unfortunate.

Can you try changing your tokio version to 1.14?

@dbstratta (Author) commented:

What a coincidence! 😂

I updated tokio to 1.14.0, though, and the problem kept happening. Regarding your previous comment, I'm not getting the note: futures do nothing unless you .await or poll them message.

What makes me think the pool.get(...) future is not being dropped is that if it were, I'd expect a similar problem when I disable pipelining, but I don't see one. What do you think?

@aembke (Owner) commented Nov 18, 2021

Ah damn, that's too bad. Yeah, I can't imagine why pipelining would affect this: from the logs you showed, the request is sent and the response is received even when pipelining is enabled, and all the non-pipeline logic happens before that point. I also have a test harness that covers pretty much everything in the client and runs all the tests several times with both pipelined and non-pipelined clients, but who knows, anything is possible.

At this point I'm most concerned that we can't find a minimal repro that shows this happening. I'm sorry to ask, but would it be possible to see more of your code (even if it uses graphql)? I'm happy to chat over a less public forum if you want to keep some of that code private, but at this point I think I need some form of repro to dig further into this.

@dbstratta (Author) commented:

Sure, I'll send you a repro with the graphql code tomorrow.

@dbstratta (Author) commented Nov 19, 2021

Here's the minimal code I could reproduce the issue with:

use std::time::Duration;

use actix_web::{
    get, guard, post, web, App, HttpRequest, HttpResponse, HttpServer, Responder, Result,
};
use async_graphql::http::{playground_source, GraphQLPlaygroundConfig};
use async_graphql::{Context, EmptyMutation, Object, Subscription};
use async_graphql_actix_web::{GraphQLRequest, GraphQLResponse, GraphQLSubscription};
use fred::pool::StaticRedisPool;
use fred::prelude::*;
use futures::{Stream, StreamExt};

const GRAPHQL_ENDPOINT_URL: &str = "/graphql";

type Schema = async_graphql::Schema<Query, EmptyMutation, Subscription>;

#[actix_web::main]
async fn main() {
    fred::globals::set_default_command_timeout(1000);

    let redis_pool = StaticRedisPool::new(
        RedisConfig {
            server: ServerConfig::new_centralized("127.0.0.1", 6379),
            pipeline: true,

            ..Default::default()
        },
        2,
    )
    .unwrap();

    redis_pool.connect(Some(ReconnectPolicy::default()));
    redis_pool.wait_for_connect().await.unwrap();

    HttpServer::new(move || {
        App::new()
            .app_data(web::Data::new(
                Schema::build(Query, EmptyMutation, Subscription)
                    .data(redis_pool.clone())
                    .finish(),
            ))
            .service(
                web::resource(GRAPHQL_ENDPOINT_URL)
                    .guard(guard::Header("upgrade", "websocket"))
                    .route(web::get().to(graphql_ws)),
            )
            .service(playground)
            .service(graphql)
    })
    .bind(("127.0.0.1", 4000))
    .unwrap()
    .run()
    .await
    .unwrap();
}

#[post("/graphql")]
async fn graphql(graphql_request: GraphQLRequest, schema: web::Data<Schema>) -> GraphQLResponse {
    schema.execute(graphql_request.into_inner()).await.into()
}

async fn graphql_ws(
    request: HttpRequest,
    schema: web::Data<Schema>,
    payload: web::Payload,
) -> Result<HttpResponse> {
    GraphQLSubscription::new(Schema::clone(&*schema)).start(&request, payload)
}

#[get("/graphql")]
async fn playground() -> impl Responder {
    HttpResponse::Ok()
        .content_type("text/html")
        .body(playground_source(
            GraphQLPlaygroundConfig::new(GRAPHQL_ENDPOINT_URL)
                .subscription_endpoint(GRAPHQL_ENDPOINT_URL),
        ))
}

pub struct Subscription;

#[Subscription]
impl Subscription {
    async fn tick(&self, ctx: &Context<'_>) -> impl Stream<Item = Option<bool>> {
        let redis_pool = ctx.data_unchecked::<StaticRedisPool>().clone();

        tokio_stream::wrappers::IntervalStream::new(tokio::time::interval(Duration::from_millis(
            100,
        )))
        .then(move |_| {
            let redis_pool = redis_pool.clone();

            async move {
                let value = redis_pool.get::<Option<bool>, _>("key").await.unwrap();

                value
            }
        })
    }
}

pub struct Query;

#[Object]
impl Query {
    async fn hello(&self) -> &str {
        "world"
    }
}

With Cargo.toml:

[package]
name = "repro"
version = "0.0.1"
edition = "2021"

[dependencies]
actix-web = { version = "= 4.0.0-beta.11", default-features = false }
async-graphql = { version = "3.0.4" }
async-graphql-actix-web = { version = "3.0.4" }
fred = { version = "4.2.1", default-features = false, features = [
    "pool-prefer-active",
    "ignore-auth-error",
] }
futures = { version = "0.3.17", default-features = false }
tokio = { version = "1.14.0", features = ["rt", "macros", "time"] }
tokio-stream = "0.1.8"

Go to http://localhost:4000/graphql, enter the following text in the playground, and start the subscription (with the play button):

subscription {
  tick
}

That will start the stream, and it will begin responding (in the right panel of the playground) with:

{
  "data": {
    "tick": null
  }
}

Let it stay subscribed for 10 seconds, stop it (with the stop button in the playground), and start it again for another 10 seconds. Repeat this until it crashes because of the fred timeout (it should happen around the 5th time or so).

[Screenshot of the playground for reference.]

@aembke (Owner) commented Nov 30, 2021

Hi @dbstratta

First off, thanks a lot for taking the time to write all that up.

I tried your example and was able to repro the issue with fred 4.2.1 after a few tries, but I also had to change the actix version from 4.0.0-beta.11 to 4.0.0-beta.13, since apparently some intermediate imports in the dependency chain had changed. I doubt that affects anything, but I figured I'd call it out.
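
For reference, that's just this line in your Cargo.toml (keeping the same formatting):

actix-web = { version = "= 4.0.0-beta.13", default-features = false }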

This morning I released fred 4.2.2, which addressed #14. I had hoped it would also address this issue, since both issues exercised the same static pool code paths, but unfortunately that doesn't seem to be the case. As I was writing this comment I saw a repro even on 4.2.2 with your code above, so I'll keep looking and update you when I know more.

Possibly unrelated - is it normal to see this in the console?

{
  "error": "Could not connect to websocket endpoint ws://localhost:4000/graphql. Please check if the endpoint url is correct."
}

I'll sometimes see that after starting the subscriber on the client, but it happens without any issues/panics from fred.

@aembke (Owner) commented Nov 30, 2021

Hi @dbstratta

I have some good news and some bad news, depending on how you look at it. The good news is that I was able to repro this issue without fred in play at all; the bad news is that I think this is an actix issue. Given that they're still in beta for 4.x that may be somewhat expected, but it unfortunately means I can't help too much here.

The logs from your original repro showed that the receiver half of the oneshot sender used by fred (which is returned to the caller) was being dropped before fred could send a response. I modified your repro to simulate how fred works, but using vanilla tokio tasks and channels with fake data. I also removed all the timeout management logic, as well as tokio-stream, since I didn't think there were any issues there that could cause this.

This was the minimal repro case I could come up with:

use actix_web::{get, guard, post, web, App, HttpRequest, HttpResponse, HttpServer, Responder, Result};
use async_graphql::http::{playground_source, GraphQLPlaygroundConfig};
use async_graphql::{Context, EmptyMutation, Object, Subscription};
use async_graphql_actix_web::{GraphQLRequest, GraphQLResponse, GraphQLSubscription};
use fred::error::RedisError; // I still use fred's error types, but that's it
use futures::stream::repeat;
use futures::{Stream, StreamExt};
use tokio::sync::oneshot::channel;

const GRAPHQL_ENDPOINT_URL: &str = "/graphql";
type Schema = async_graphql::Schema<Query, EmptyMutation, Subscription>;

#[derive(Clone)]
struct RedisClient;

impl RedisClient {
  async fn simulate_redis(&self) -> Result<Option<bool>, RedisError> {
    // this is essentially how fred works under the hood for all requests
    let (tx, rx) = channel();
    tokio::spawn(async move {
      if let Err(e) = tx.send(Ok(None)) {
        println!("Error sending fake response: {:?}", e);
      }
    });

    rx.await?
  }
}

#[actix_web::main]
async fn main() {
  pretty_env_logger::init();
  let client = RedisClient;

  HttpServer::new(move || {
    App::new()
      .app_data(web::Data::new(
        Schema::build(Query, EmptyMutation, Subscription)
          .data(client.clone())
          .finish(),
      ))
      .service(
        web::resource(GRAPHQL_ENDPOINT_URL)
          .guard(guard::Header("upgrade", "websocket"))
          .route(web::get().to(graphql_ws)),
      )
      .service(playground)
      .service(graphql)
  })
  .bind(("127.0.0.1", 4000))
  .unwrap()
  .run()
  .await
  .unwrap();
}

#[post("/graphql")]
async fn graphql(graphql_request: GraphQLRequest, schema: web::Data<Schema>) -> GraphQLResponse {
  schema.execute(graphql_request.into_inner()).await.into()
}

async fn graphql_ws(request: HttpRequest, schema: web::Data<Schema>, payload: web::Payload) -> Result<HttpResponse> {
  GraphQLSubscription::new(Schema::clone(&*schema)).start(&request, payload)
}

#[get("/graphql")]
async fn playground() -> impl Responder {
  HttpResponse::Ok().content_type("text/html").body(playground_source(
    GraphQLPlaygroundConfig::new(GRAPHQL_ENDPOINT_URL).subscription_endpoint(GRAPHQL_ENDPOINT_URL),
  ))
}

pub struct Subscription;

#[Subscription]
impl Subscription {
  async fn tick(&self, ctx: &Context<'_>) -> impl Stream<Item = Option<bool>> {
    let pool = ctx.data_unchecked::<RedisClient>().clone();

    repeat(0).scan(pool, move |pool, _| {
      let pool = pool.clone();
      async move { pool.simulate_redis().await.ok() }
    })
  }
}

pub struct Query;

#[Object]
impl Query {
  async fn hello(&self) -> &str {
    "world"
  }
}

This log line in particular is what shows the error occurring:

println!("Error sending fake response: {:?}", e);

I was able to repro this consistently after making these changes, which points to actix most likely dropping the simulate_redis future. I've actually had something very similar happen in a different project, and it was due to some strange behaviour in how select! was being used.

I'm not too familiar with actix, so unfortunately I can't be much help here in the short term, but I'll see if I can slim the repro down even further and submit an issue there. Hopefully this is something they already know about or can give guidance on.
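
For what it's worth, here's a minimal sketch (not actix's actual code) of how select! can drop a pending future, which matches the dropped-receiver symptom above:

use tokio::sync::oneshot;
use tokio::time::{sleep, Duration};

#[tokio::main]
async fn main() {
  let (tx, rx) = oneshot::channel::<&str>();

  tokio::select! {
    // The timer wins the race...
    _ = sleep(Duration::from_millis(10)) => {
      println!("timer branch completed first");
    }
    // ...so this branch, and the `rx` it owns, are dropped here.
    response = rx => {
      println!("got response: {:?}", response);
    }
  }

  // Any later attempt to respond now fails, just like the WARN log above.
  assert!(tx.send("too late").is_err());
}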

@dbstratta (Author) commented:

@aembke Thank you so much for your help!!! Good to know that fred isn't the problem here. I'll also try to slim down the repro to find the root cause and open an issue on the corresponding project. I'll link it here to let you know.

Possibly unrelated - is it normal to see this in the console?

{
 "error": "Could not connect to websocket endpoint ws://localhost:4000/graphql. Please check if the endpoint url is correct."
}

I don't remember getting it, but I'll pay attention to it next time.

@aembke (Owner) commented Dec 1, 2021

No problem. Feel free to tag me if you do open up an issue, and I'll do the same if I'm able to get a smaller repro working.

The first two issues since I open sourced this library have both been related to getting it to work with actix, and while I don't personally have a lot of experience with it, I keep hearing great things. It was genuinely compelling to see how easily you could spin up a non-trivial API in the repro here. That's probably a sign that I should get more familiar with it and add some examples of how to get things working well with fred + actix. In the meantime I'm happy to help file or debug this in actix as well, since it seems several people are trying to use these two libraries together.

aembke removed the bug label Dec 1, 2021
@aembke (Owner) commented Dec 1, 2021

I'm going to close this out in the meantime, but feel free to add to this as you learn more, and I'll do the same.

aembke closed this as completed Dec 1, 2021