Skip to content

Commit

Permalink
use type impl trait feature
Browse files Browse the repository at this point in the history
Signed-off-by: iGxnon <igxnon@gmail.com>
  • Loading branch information
iGxnon committed Feb 6, 2024
1 parent 6ab5653 commit c53c842
Show file tree
Hide file tree
Showing 7 changed files with 399 additions and 192 deletions.
233 changes: 129 additions & 104 deletions src/client/handler/offline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use std::task::{ready, Context, Poll};

use bytes::{Bytes, BytesMut};
use futures::{Sink, Stream};
use log::{debug, error, info, warn};
use log::debug;
use pin_project_lite::pin_project;

use crate::errors::CodecError;
Expand All @@ -18,23 +18,49 @@ pub(crate) struct Config {
protocol_version: u8,
}

pub(crate) trait HandleOffline: Sized {
fn handle_offline(self, server_addr: SocketAddr, config: Config) -> OfflineHandler<Self>;
}

impl<F> HandleOffline for F
where
F: Stream<Item = (Packet<Frames<BytesMut>>, SocketAddr)>
+ Sink<(Packet<Frames<Bytes>>, SocketAddr), Error = CodecError>,
{
fn handle_offline(self, server_addr: SocketAddr, config: Config) -> OfflineHandler<Self> {
OfflineHandler {
frame: self,
state: State::SendOpenConnectionRequest1(Packet::Unconnected(
unconnected::Packet::OpenConnectionRequest1 {
magic: (),
protocol_version: config.protocol_version,
mtu: config.mtu,
},
)),
mtu: config.mtu,
server_addr,
config,
}
}
}

pin_project! {
pub(crate) struct OfflineHandler<F> {
#[pin]
frame: F,
state: OfflineState,
connected: bool,
state: State,
mtu: u16,
server_addr: SocketAddr,
handshaking: bool,
config: Config,
}
}

enum OfflineState {
Listening,
SendingPrepare(Option<(Packet<Frames<Bytes>>, SocketAddr)>),
SendingFlush,
enum State {
SendOpenConnectionRequest1(Packet<Frames<Bytes>>),
WaitOpenConnectionReply1,
SendOpenConnectionRequest2(Packet<Frames<Bytes>>),
WaitOpenConnectionReply2,
Connected,
}

impl<F> Stream for OfflineHandler<F>
Expand All @@ -48,120 +74,119 @@ where
let mut this = self.project();
loop {
match this.state {
OfflineState::Listening => {
if !*this.connected && !*this.handshaking {
*this.state = OfflineState::SendingPrepare(Some((
Packet::Unconnected(unconnected::Packet::OpenConnectionRequest1 {
magic: (),
protocol_version: this.config.protocol_version,
mtu: this.config.mtu,
}),
*this.server_addr,
)));
*this.handshaking = true;
continue;
}
}
OfflineState::SendingPrepare(pack) => {
State::SendOpenConnectionRequest1(pack) => {
if let Err(err) = ready!(this.frame.as_mut().poll_ready(cx)) {
error!("[offline] send error: {err}");
*this.state = OfflineState::Listening;
debug!(
"[offline] SendingOpenConnectionRequest1 poll_ready error: {err}, retrying"
);
continue;
}
if let Err(err) = this.frame.as_mut().start_send(pack.take().unwrap()) {
error!("[offline] send error: {err}");
*this.state = OfflineState::Listening;
if let Err(err) = this
.frame
.as_mut()
.start_send((pack.clone(), *this.server_addr))
{
debug!(
"[offline] SendingOpenConnectionRequest1 start_send error: {err}, retrying"
);
continue;
}
*this.state = OfflineState::SendingFlush;
continue;
}
OfflineState::SendingFlush => {
if let Err(err) = ready!(this.frame.as_mut().poll_flush(cx)) {
error!("[offline] send error: {err}");
debug!(
"[offline] SendingOpenConnectionRequest1 poll_flush error: {err}, retrying"
);
continue;
}
*this.state = OfflineState::Listening;
*this.state = State::WaitOpenConnectionReply1;
}
}

let Some((packet, addr)) = ready!(this.frame.as_mut().poll_next(cx)) else {
return Poll::Ready(None);
};

let pack = match packet {
Packet::Unconnected(pack) => pack,
Packet::Connected(pack) => {
if *this.connected {
return Poll::Ready(Some((
pack,
Peer {
addr: *this.server_addr,
mtu: *this.mtu,
},
)));
State::WaitOpenConnectionReply1 => {
let Some((pack, addr)) = ready!(this.frame.as_mut().poll_next(cx)) else {
return Poll::Ready(None);
};
if addr != *this.server_addr {
continue;
}
debug!("[offline] ignore connected packet from unconnected server");
*this.handshaking = false;
continue;
let next = match pack {
Packet::Unconnected(unconnected::Packet::OpenConnectionReply1 {
mtu,
..
}) => {
*this.mtu = mtu;
Packet::Unconnected(unconnected::Packet::OpenConnectionRequest2 {
magic: (),
server_address: *this.server_addr,
mtu,
client_guid: this.config.client_guid,
})
}
_ => continue,
};
*this.state = State::SendOpenConnectionRequest2(next);
}
};

let resp = match pack {
unconnected::Packet::OpenConnectionReply1 {
use_encryption,
mtu,
..
} => {
if use_encryption {
debug!("use_encryption enabled, failed connection");
*this.handshaking = false;
State::SendOpenConnectionRequest2(pack) => {
if let Err(err) = ready!(this.frame.as_mut().poll_ready(cx)) {
debug!(
"[offline] SendOpenConnectionRequest2 poll_ready error: {err}, retrying"
);
continue;
}
*this.mtu = mtu;
unconnected::Packet::OpenConnectionRequest2 {
magic: (),
server_address: addr,
mtu,
client_guid: this.config.client_guid,
if let Err(err) = this
.frame
.as_mut()
.start_send((pack.clone(), *this.server_addr))
{
debug!(
"[offline] SendOpenConnectionRequest2 start_send error: {err}, retrying"
);
continue;
}
}
unconnected::Packet::OpenConnectionReply2 { mtu, .. } => {
*this.mtu = mtu;
*this.connected = true;
*this.handshaking = false;
debug!("connected to the server");
continue;
}
unconnected::Packet::IncompatibleProtocol {
server_protocol, ..
} => {
if !*this.connected {
*this.handshaking = false;
info!("failed to connect to server, got incompatible protocol error, server protocol: {server_protocol}");
if let Err(err) = ready!(this.frame.as_mut().poll_flush(cx)) {
debug!(
"[offline] SendOpenConnectionRequest2 poll_flush error: {err}, retrying"
);
continue;
}
continue;
*this.state = State::WaitOpenConnectionReply2;
}
unconnected::Packet::AlreadyConnected { .. } => {
if !*this.connected {
*this.handshaking = false;
info!("failed to connect to server, got already connected error");
State::WaitOpenConnectionReply2 => {
let Some((pack, addr)) = ready!(this.frame.as_mut().poll_next(cx)) else {
return Poll::Ready(None);
};
if addr != *this.server_addr {
continue;
}
continue;
}
unconnected::Packet::ConnectionRequestFailed { .. } => {
info!("failed to handshake with server, got connection request failed");
continue;
match pack {
Packet::Unconnected(unconnected::Packet::OpenConnectionReply2 {
mtu,
..
}) => {
*this.mtu = mtu;
}
_ => continue,
};
*this.state = State::Connected;
}
_ => {
warn!(
"received a package({:?}) that should not be received on the client.",
pack.pack_type()
);
continue;
State::Connected => {
let Some((pack, addr)) = ready!(this.frame.as_mut().poll_next(cx)) else {
return Poll::Ready(None);
};
if addr != *this.server_addr {
continue;
}
match pack {
Packet::Connected(pack) => {
return Poll::Ready(Some((
pack,
Peer {
addr,
mtu: *this.mtu,
},
)))
}
_ => continue,
};
}
};

*this.state = OfflineState::SendingPrepare(Some((Packet::Unconnected(resp), addr)));
}
}
}
}

0 comments on commit c53c842

Please sign in to comment.