Skip to content

Commit

Permalink
use log
Browse files Browse the repository at this point in the history
  • Loading branch information
gengteng committed Jan 30, 2022
1 parent b8f5222 commit 73ffa57
Show file tree
Hide file tree
Showing 5 changed files with 23 additions and 18 deletions.
6 changes: 2 additions & 4 deletions jinshu-comet/src/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -131,9 +131,7 @@ impl ConnectionManager {
let pusher = client_writer.clone();
self.connections
.insert(user_id, Connection::create(user_id, pusher));
self.session_store
.set_ex(user_id, &self.service_uri, 600)
.await?;
self.session_store.store(user_id, &self.service_uri).await?;

let ss = self.session_store.clone();
let mut receiver = self.receiver.clone();
Expand Down Expand Up @@ -213,7 +211,7 @@ impl ConnectionManager {
}
}

if let Err(error) = ss.del(user_id).await {
if let Err(error) = ss.remove(user_id).await {
tracing::warn!(%error, "Failed to remove session");
}

Expand Down
2 changes: 1 addition & 1 deletion jinshu-pusher/src/pusher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ impl Pusher {
#[allow(dead_code)]
pub async fn send(&self, pdu: Pdu) -> anyhow::Result<()> {
let user_id: Uuid = pdu.to.parse()?;
match self.session_store.get(user_id).await? {
match self.session_store.load(user_id).await? {
Some(uri) => {
if let Some(mut client) = self.clients.get_mut(&uri) {
client.push(Request::new(pdu)).await?;
Expand Down
10 changes: 4 additions & 6 deletions jinshu-redis/src/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,21 +27,19 @@ impl SessionStore {
Self { redis }
}

pub async fn set_ex(&self, user_id: Uuid, service_key: &str, ttl: usize) -> crate::Result<()> {
pub async fn store(&self, user_id: Uuid, service_key: &str) -> crate::Result<()> {
let mut conn = self.redis.get().await?;
let _: () = conn
.set_ex(get_user_session_key(user_id), service_key, ttl)
.await?;
let _: () = conn.set(get_user_session_key(user_id), service_key).await?;
Ok(())
}

pub async fn get(&self, user_id: Uuid) -> crate::Result<Option<String>> {
pub async fn load(&self, user_id: Uuid) -> crate::Result<Option<String>> {
let mut conn = self.redis.get().await?;
let endpoint: Option<String> = conn.get(get_user_session_key(user_id)).await?;
Ok(endpoint)
}

pub async fn del(&self, user_id: Uuid) -> crate::Result<()> {
pub async fn remove(&self, user_id: Uuid) -> crate::Result<()> {
let mut conn = self.redis.get().await?;
let _: () = conn.del(get_user_session_key(user_id)).await?;
Ok(())
Expand Down
3 changes: 2 additions & 1 deletion jinshu-sdk/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,8 @@ tokio-util = { version = "0.6", features = ["codec"]}
anyhow = "1"
clap = "3"
mime = "0.3"
tracing = "0.1"
log = "0.4"
env_logger = "0.9"
uuid = { version = "1.0.0-alpha.1", features = ["serde", "v4", "fast-rng"]}

[[bin]]
Expand Down
20 changes: 14 additions & 6 deletions jinshu-sdk/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use clap::Parser;
use futures::{SinkExt, StreamExt};
use jinshu_protocol::{Codec, Content, Endpoint, Message, Pdu, PduCodec, Request, Response};
use jinshu_utils::current_millisecond;
use log::LevelFilter;
use std::time::{Duration, Instant};
use tokio::net::TcpStream;
use tokio::time::sleep;
Expand All @@ -26,6 +27,10 @@ struct Opts {
/// 使用的编码, 0.json | 1.msgpack | 2.cbor | 3.flexbuffers
#[clap(short = 'c', long)]
codec: Codec,

/// 日志级别
#[clap(short = 'l', long, default_value = "INFO")]
log_level: LevelFilter,
}

#[tokio::main]
Expand All @@ -35,8 +40,11 @@ async fn main() -> anyhow::Result<()> {
user_id,
token,
codec,
log_level,
} = Opts::parse();

env_logger::builder().filter_level(log_level).try_init()?;

let socket = TcpStream::connect(addr).await?;
let mut framed = Framed::new(socket, PduCodec::new(codec));

Expand All @@ -46,9 +54,9 @@ async fn main() -> anyhow::Result<()> {

match framed.next().await {
Some(Ok(Pdu::Resp(Response::SignedIn { extension }))) => {
tracing::info!("Sign in ok");
log::info!("Sign in ok");
if let Some(extension) = extension {
tracing::info!("extension: {}", extension);
log::info!("extension: {}", extension);
}
}
Some(Ok(Pdu::Resp(Response::InvalidToken { user_id }))) => {
Expand Down Expand Up @@ -79,9 +87,9 @@ async fn main() -> anyhow::Result<()> {

if let Some(result) = framed.next().await {
let pdu = result?;
tracing::info!("pdu received: {:?} ({}ms)", pdu, now.elapsed().as_millis());
log::info!("pdu received: {:?} ({}ms)", pdu, now.elapsed().as_millis());
} else {
tracing::error!("The connection was closed before receiving Pong");
log::error!("The connection was closed before receiving Pong");
}

if id >= 10 {
Expand Down Expand Up @@ -109,13 +117,13 @@ async fn main() -> anyhow::Result<()> {

if let Some(qr) = framed.next().await {
let pdu = qr?;
tracing::info!(
log::info!(
"message queued: {:?} ({}ms)",
pdu,
now.elapsed().as_millis()
);
} else {
tracing::error!("The connection was closed before receiving Pong");
log::error!("The connection was closed before receiving Pong");
}

sleep(Duration::from_secs(1)).await;
Expand Down

0 comments on commit 73ffa57

Please sign in to comment.