-
-
Notifications
You must be signed in to change notification settings - Fork 807
/
session.rs
196 lines (175 loc) · 6.34 KB
/
session.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
//! `ChatSession` is an actor that manages a peer's TCP connection and
//! proxies commands from the peer to `ChatServer`.
use std::{
io, net,
str::FromStr,
time::{Duration, Instant},
};
use actix::{prelude::*, spawn};
use tokio::{
io::{split, WriteHalf},
net::{TcpListener, TcpStream},
};
use tokio_util::codec::FramedRead;
use crate::{
codec::{ChatCodec, ChatRequest, ChatResponse},
server::{self, ChatServer},
};
/// Text message that the chat server sends to a session.
///
/// The session's `Handler<Message>` implementation forwards the wrapped
/// string to the peer.
#[derive(Message)]
#[rtype(result = "()")]
pub struct Message(pub String);
/// `ChatSession` actor is responsible for TCP peer communication.
///
/// It holds the write side of one peer connection together with the address
/// of the chat server that it relays requests to.
pub struct ChatSession {
/// Unique session id. Starts at 0 in `new` and is overwritten with the value
/// the chat server returns for `Connect`.
id: usize,
/// Address of the chat server actor.
addr: Addr<ChatServer>,
/// Time of the last ping received from the client. The client must send a
/// ping at least once every 10 seconds, otherwise the connection is dropped.
hb: Instant,
/// Name of the room the session is currently joined to.
room: String,
/// Framed writer that sends `ChatResponse` values over the write half of the
/// TCP stream using `ChatCodec`.
framed: actix::io::FramedWrite<ChatResponse, WriteHalf<TcpStream>, ChatCodec>,
}
impl Actor for ChatSession {
    /// The session runs in a plain actor `Context`.
    type Context = Context<Self>;

    /// Starts the heartbeat timer and registers the session with the chat
    /// server.
    fn started(&mut self, ctx: &mut Self::Context) {
        // Kick off the heartbeat loop as soon as the session starts.
        self.hb(ctx);

        // Register with the chat server. `wait` makes the context hold back
        // every other event until the registration future has resolved, so
        // no request is handled before the session id is known.
        let connect = server::Connect {
            addr: ctx.address().recipient(),
        };
        self.addr
            .send(connect)
            .into_actor(self)
            .then(|reply, session, ctx| {
                if let Ok(id) = reply {
                    session.id = id;
                } else {
                    // The chat server did not answer; the session is useless.
                    ctx.stop();
                }
                actix::fut::ready(())
            })
            .wait(ctx);
    }

    /// Tells the chat server that this session is going away, then lets the
    /// actor stop.
    fn stopping(&mut self, _: &mut Self::Context) -> Running {
        let notice = server::Disconnect { id: self.id };
        self.addr.do_send(notice);
        Running::Stop
    }
}
/// Write-side handler for the session's `FramedWrite`. No methods are
/// overridden, so the trait's default behaviour is used as-is.
// NOTE(review): presumably `actix::io::FramedWrite` requires this impl on the
// owning actor — confirm against the actix documentation.
impl actix::io::WriteHandler<io::Error> for ChatSession {}
/// Receives the decoded `ChatRequest` frames read from the peer.
impl StreamHandler<Result<ChatRequest, io::Error>> for ChatSession {
    /// Dispatches a single request coming from the client.
    ///
    /// Anything that is not one of the requests handled below (including a
    /// read error) stops the session.
    fn handle(&mut self, msg: Result<ChatRequest, io::Error>, ctx: &mut Context<Self>) {
        match msg {
            // A ping from the peer refreshes the heartbeat timestamp.
            Ok(ChatRequest::Ping) => self.hb = Instant::now(),

            Ok(ChatRequest::Message(message)) => {
                // Forward the text to the chat server for the current room.
                println!("Peer message: {message}");
                let outgoing = server::Message {
                    id: self.id,
                    msg: message,
                    room: self.room.clone(),
                };
                self.addr.do_send(outgoing);
            }

            Ok(ChatRequest::Join(name)) => {
                println!("Join to room: {name}");
                // Remember the room locally, tell the server, then confirm
                // the join to the peer.
                self.room.clone_from(&name);
                let join = server::Join {
                    id: self.id,
                    name: name.clone(),
                };
                self.addr.do_send(join);
                self.framed.write(ChatResponse::Joined(name));
            }

            Ok(ChatRequest::List) => {
                // Ask the chat server for the room list. `wait` pauses all
                // other events in the context, so the actor receives no new
                // messages until the list has come back.
                println!("List rooms");
                self.addr
                    .send(server::ListRooms)
                    .into_actor(self)
                    .then(|reply, session, _| {
                        if let Ok(rooms) = reply {
                            session.framed.write(ChatResponse::Rooms(rooms));
                        } else {
                            println!("Something is wrong");
                        }
                        actix::fut::ready(())
                    })
                    .wait(ctx);
            }

            _ => ctx.stop(),
        }
    }
}
/// Handler for Message, chat server sends this message, we just send string to
/// peer
impl Handler<Message> for ChatSession {
type Result = ();
fn handle(&mut self, msg: Message, _: &mut Context<Self>) {
// send message to peer
self.framed.write(ChatResponse::Message(msg.0));
}
}
/// Helper methods
impl ChatSession {
    /// Creates a session that is not yet registered with the chat server.
    ///
    /// The id starts at 0, the room defaults to `"main"` and the heartbeat
    /// timestamp is set to the moment of construction.
    ///
    /// * `addr` - address of the chat server actor.
    /// * `framed` - framed writer over the write half of the peer's stream.
    pub fn new(
        addr: Addr<ChatServer>,
        framed: actix::io::FramedWrite<ChatResponse, WriteHalf<TcpStream>, ChatCodec>,
    ) -> ChatSession {
        ChatSession {
            id: 0,
            addr,
            hb: Instant::now(),
            room: "main".to_owned(),
            framed,
        }
    }

    /// Sends a ping to the client every second and stops the session when
    /// the client's last ping is more than 10 seconds old.
    fn hb(&self, ctx: &mut Context<Self>) {
        ctx.run_interval(Duration::from_secs(1), |act, ctx| {
            if act.hb.elapsed() > Duration::from_secs(10) {
                // Heartbeat timed out.
                println!("Client heartbeat failed, disconnecting!");

                // Stopping the actor runs `Actor::stopping`, which already
                // sends `server::Disconnect`; sending it here as well would
                // notify the chat server twice.
                ctx.stop();

                // Do not ping a peer we have just decided to drop.
                return;
            }

            act.framed.write(ChatResponse::Ping);
        });
    }
}
/// Define TCP server that will accept incoming TCP connection and create
/// chat actors.
pub fn tcp_server(_s: &str, server: Addr<ChatServer>) {
// Create server listener
let addr = net::SocketAddr::from_str("127.0.0.1:12345").unwrap();
spawn(async move {
let listener = TcpListener::bind(&addr).await.unwrap();
while let Ok((stream, _)) = listener.accept().await {
let server = server.clone();
ChatSession::create(|ctx| {
let (r, w) = split(stream);
ChatSession::add_stream(FramedRead::new(r, ChatCodec), ctx);
ChatSession::new(server, actix::io::FramedWrite::new(w, ChatCodec, ctx))
});
}
});
}