How do I stream text data using actix_web and tokio? #3060
Answered
by
timelessnesses
timelessnesses
asked this question in
Q&A
-
I want to stream the records of the table in the database and here's what I got so far. #[actix_web::get("/list")]
async fn listing(app: actix_web::web::Data<crate::routes::types::States>) -> impl actix_web::Responder {
match (app.postgres_db.is_some(), app.sqlite3_db.is_some()) {
(true, false) => {
match app.postgres_db.as_ref().as_ref().unwrap().query("SELECT * FROM redirect", &[]).await {
Ok(rows) => {
let (mut tx, rx) = tokio::sync::mpsc::channel::<String>(10);
tx.send("Listing all of available redirects!\n".to_owned()).await;
actix_web::rt::spawn(async move {
for row in rows {
let data = format!("URL: {}, ID: {}, Accessed: {}\n", row.get("URL"), row.get("ID"), row.get("accessed"));
if tx.send(data).await.is_err() {
break
}
}
});
return actix_web::HttpResponse::Ok().streaming(rx);
}
}
}
}
} I got the trait `futures::Stream` is not implemented for `tokio::sync::mpsc::Receiver<Result<std::string::String, std::string::String>>` Is there anyway to stream text data using tokio or there's simpler way to do this? |
Beta Was this translation helpful? Give feedback.
Answered by
timelessnesses
Jul 2, 2023
Replies: 1 comment 1 reply
-
nevermind I already got it! let (tx, mut rx) = tokio::sync::mpsc::channel::<
Result<actix_web::web::Bytes, actix_web::Error>,
>(10);
actix_web::rt::spawn(async move {
match tx
.send(Ok(actix_web::web::Bytes::from(
"List of redirect URLs!\n".to_owned().into_bytes(),
)))
.await
{
Ok(_) => {}
Err(e) => {
log::error!("Something went wrong when streaming data!: {}", e);
return;
}
}
if rows.len() == 0 {
match tx
.send(Ok(actix_web::web::Bytes::from(
"None!".to_owned().into_bytes(),
)))
.await
{
Ok(_) => {}
Err(e) => {
log::error!("Something went wrong when streaming data!: {}", e);
return;
}
}
}
for row in rows {
let id: String = row.id;
let url: String = row.url;
let accessed: i64 = row.accessed;
let data = format!("ID: {}, URL: {}, Accessed: {}", id, url, accessed)
.into_bytes();
match tx.send(Ok(actix_web::web::Bytes::from(data))).await {
Ok(_) => {}
Err(_) => break,
}
}
});
return actix_web::HttpResponse::Ok().streaming(async_stream::stream! {
while let Some(item) = rx.recv().await {
match item {
Ok(data) => yield Ok(data),
Err(err) => yield Err(actix_web::Error::from(err))
}
}
}); |
Beta Was this translation helpful? Give feedback.
1 reply
Answer selected by
timelessnesses
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
nevermind I already got it!