-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathsession.rs
133 lines (116 loc) · 3.6 KB
/
session.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
use std::time::{Duration, Instant};
use bitflags::bitflags;
use bytes::Bytes;
use tokio::io::AsyncWriteExt;
use tokio::net::tcp::{OwnedReadHalf, OwnedWriteHalf};
use tokio::net::TcpStream;
use tokio_stream::StreamExt;
use tokio_util::codec::FramedRead;
use redis_codec_core::req_decoder::ReqDecoder;
use redis_codec_core::resp_decoder::ResFramedData;
use redis_proxy_common::ReqPkt;
use crate::connection::Connection;
bitflags! {
    /// Bit set describing the modes a client session can be in.
    ///
    /// Each named flag occupies one bit of the underlying `u32`; `None` is the
    /// value with every bit cleared.
    #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
    pub struct SessionFlags: u32 {
        /// No bits set.
        const None = 0b00;
        /// Bit 0. Presumably set while the client is inside a transaction
        /// (the identifier's spelling is kept as-is because it is public API).
        const InTrasactions = 0b01;
        /// Bit 1. Presumably set while the client is in subscribe mode.
        const InSubscribe = 0b10;
    }
}
/// Client attributes carried by a `Session` (stored in its `attrs` field).
///
/// See https://redis.io/commands/client-list/ for more details.
pub struct SessionAttrs {
/// Flag bits currently set for this client.
pub flags: SessionFlags,
//pub db: u8,
/// Subscription counter; `Session::in_subscribe` reports `true` while it is
/// greater than zero.
/// NOTE(review): presumably the number of channels/patterns the client is
/// subscribed to; confirm against the code that updates it.
pub sub: usize,
/// Second subscription counter, also consulted by `Session::in_subscribe`.
/// NOTE(review): presumably counts sharded-channel subscriptions; confirm
/// against the code that updates it.
pub ssub: usize,
}
impl SessionAttrs {
pub fn reset(&mut self) {
self.flags = SessionFlags::None;
self.sub = 0;
self.ssub = 0;
}
}
/// State for one downstream TCP connection: the two halves of the socket,
/// the client attributes, optional `Connection`s, and per-request
/// timing/size bookkeeping.
pub struct Session {
/// Read half of the downstream socket, framed by `ReqDecoder` so that it
/// yields `ReqPkt` items.
downstream_reader: FramedRead<OwnedReadHalf, ReqDecoder>,
/// Write half of the downstream socket; responses are written here.
downstream_writer: OwnedWriteHalf,
/// Flags and subscription counters for this client.
pub attrs: SessionAttrs,
/// Starts as `None` in `Session::new`.
/// NOTE(review): presumably the connection to the upstream server once one
/// has been attached; confirm with the code that assigns it.
pub upstream_conn: Option<Connection>,
/// Starts as `None` in `Session::new`.
/// NOTE(review): purpose is not visible in this file (possibly a secondary
/// "double-write" connection); confirm with the code that assigns it.
pub dw_conn: Option<Connection>,
/// Start time of the current request; `init_from_req` takes it from the
/// decoder's `req_start()`.
pub req_start: Instant,
/// Initialised to the construction time in `Session::new`.
/// NOTE(review): presumably the moment the upstream exchange began; it is
/// updated outside this file.
pub(crate) upstream_start: Instant,
/// Starts at zero. NOTE(review): presumably time spent on the upstream
/// round trip; it is updated outside this file.
pub upstream_elapsed: Duration,
/// Starts at zero. NOTE(review): presumably time spent obtaining the
/// upstream connection; it is updated outside this file.
pub upstream_conn_elapsed: Duration,
/// Starts as `true`. NOTE(review): presumably cleared when a response is
/// an error; it is updated outside this file.
pub res_is_ok: bool,
/// Size of the current request, copied from `ReqPkt::bytes_total` by
/// `init_from_req`.
pub req_size: usize,
/// Reset to zero by `init_from_req`. NOTE(review): presumably accumulates
/// the response size; it is updated outside this file.
pub res_size: usize,
}
impl Session {
pub fn new(stream: TcpStream) -> Self {
let (r, w) = stream.into_split();
let r = FramedRead::new(r, ReqDecoder::new());
Session {
downstream_reader: r,
downstream_writer: w,
attrs: SessionAttrs {
flags: SessionFlags::None,
sub: 0, ssub: 0,
},
upstream_conn: None,
dw_conn: None,
req_start: Instant::now(),
upstream_start: Instant::now(),
upstream_elapsed: Default::default(),
upstream_conn_elapsed: Default::default(),
res_is_ok: true,
req_size: 0,
res_size: 0,
}
}
pub fn init_from_req(&mut self, req_pkt: &ReqPkt) {
self.req_size = req_pkt.bytes_total;
self.res_size = 0;
self.req_start = self.downstream_reader.decoder().req_start();
}
#[inline]
pub fn insert_client_flags(&mut self, flags: SessionFlags) {
self.attrs.flags.insert(flags);
}
#[inline]
pub fn remove_client_flags(&mut self, flags: SessionFlags) {
self.attrs.flags.remove(flags);
}
#[inline]
pub fn contains_client_flags(&self, flags: SessionFlags) -> bool {
self.attrs.flags.contains(flags)
}
#[inline]
pub fn in_subscribe(&self) -> bool {
self.attrs.sub > 0
|| self.attrs.ssub > 0
}
#[inline]
pub async fn send_resp_to_downstream(&mut self, data: Bytes) -> anyhow::Result<()> {
self.downstream_writer.write_all(&data).await?;
Ok(())
}
#[inline]
pub async fn read_req_pkt(&mut self) -> Option<anyhow::Result<ReqPkt>> {
self.downstream_reader.next().await
}
#[inline]
pub async fn write_downstream_batch(&mut self, bytes: Vec<ResFramedData>) -> anyhow::Result<()> {
for data in bytes {
self.downstream_writer.write(&data.data).await?;
}
self.downstream_writer.flush().await?;
Ok(())
}
#[inline]
pub async fn write_downstream(&mut self, data: &[u8]) -> anyhow::Result<()> {
self.downstream_writer.write_all(data).await?;
Ok(())
}
}