Skip to content

Commit

Permalink
hvsock: Add new command Upgrade <port_num> to shake hand
Browse files Browse the repository at this point in the history
This commit add a simple handshake interaction like WebSocket:
A new command called Upgrade <port_num>\n to request connect port.
hybrid vsock response 101\n (Switching Protocol) if connected.
hybrid vsock response 504\n (Service Unavailable) if guest has not
listened on the port.

Signed-off-by: Hui Zhu <teawater@antfin.com>
  • Loading branch information
teawater committed Dec 3, 2019
1 parent 28b18c8 commit d77b8a2
Show file tree
Hide file tree
Showing 2 changed files with 53 additions and 17 deletions.
38 changes: 34 additions & 4 deletions src/devices/src/virtio/vsock/csm/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,8 @@ pub struct VsockConnection<S: Read + Write + AsRawFd> {
/// Instant when this connection should be scheduled for immediate termination, due to some
/// timeout condition having been fulfilled.
expiry: Option<Instant>,
/// If this true, Reply the connection status before transfer data or close this connection.
need_reply: bool,
}

impl<S> VsockChannel for VsockConnection<S>
Expand Down Expand Up @@ -304,9 +306,34 @@ where
// Next up: receiving a response / confirmation for a host-initiated connection.
// We'll move to an Established state, and pass on the good news through the host
// stream.
ConnState::LocalInit if pkt.op() == uapi::VSOCK_OP_RESPONSE => {
self.expiry = None;
self.state = ConnState::Established;
ConnState::LocalInit if pkt.op() == uapi::VSOCK_OP_RESPONSE || pkt.op() == uapi::VSOCK_OP_RST => {
let mut is_response = false;
if pkt.op() == uapi::VSOCK_OP_RESPONSE {
is_response = true;
}
if self.need_reply {
self.need_reply = false;
let mut reply = b"503\n";
if is_response {
reply = b"101\n";
}
if let Err(err) = self.send_bytes(reply) {
// If we can't write to the host stream, that's an unrecoverable error, so
// we'll terminate this connection.
warn!(
"vsock: error writing to local stream (lp={}, pp={}): {:?}",
self.local_port, self.peer_port, err
);
if is_response {
self.kill();
}
return Ok(());
}
}
if is_response {
self.expiry = None;
self.state = ConnState::Established;
}
}

// The peer wants to shut down an established connection. If they have nothing
Expand Down Expand Up @@ -478,6 +505,7 @@ where
last_fwd_cnt_to_peer: Wrapping(0),
pending_rx: PendingRxSet::from(PendingRx::Response),
expiry: None,
need_reply: false,
}
}

Expand All @@ -488,6 +516,7 @@ where
peer_cid: u64,
local_port: u32,
peer_port: u32,
need_reply: bool,
) -> Self {
Self {
local_cid,
Expand All @@ -504,6 +533,7 @@ where
last_fwd_cnt_to_peer: Wrapping(0),
pending_rx: PendingRxSet::from(PendingRx::Request),
expiry: None,
need_reply: need_reply,
}
}

Expand Down Expand Up @@ -756,7 +786,7 @@ mod tests {
PEER_BUF_ALLOC,
),
ConnState::LocalInit => VsockConnection::<TestStream>::new_local_init(
stream, LOCAL_CID, PEER_CID, LOCAL_PORT, PEER_PORT,
stream, LOCAL_CID, PEER_CID, LOCAL_PORT, PEER_PORT, false,
),
ConnState::Established => {
let mut conn = VsockConnection::<TestStream>::new_peer_init(
Expand Down
32 changes: 19 additions & 13 deletions src/devices/src/virtio/vsock/unix/muxer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -227,20 +227,19 @@ impl VsockChannel for VsockMuxer {
return Ok(());
}

// Alright, everything looks in order - forward this packet to its owning connection.
let mut res: VsockResult<()> = Ok(());
self.apply_conn_mutation(conn_key, |conn| {
res = conn.send_pkt(pkt);
});

// Right, we know where to send this packet, then (to `conn_key`).
// However, if this is an RST, we have to forcefully terminate the connection, so
// there's no point in forwarding it the packet.
if pkt.op() == uapi::VSOCK_OP_RST {
self.remove_connection(conn_key);
return Ok(());
}

// Alright, everything looks in order - forward this packet to its owning connection.
let mut res: VsockResult<()> = Ok(());
self.apply_conn_mutation(conn_key, |conn| {
res = conn.send_pkt(pkt);
});

res
}

Expand Down Expand Up @@ -381,8 +380,8 @@ impl VsockMuxer {
Some(EpollListener::LocalStream(_)) => {
if let Some(EpollListener::LocalStream(mut stream)) = self.remove_listener(fd) {
Self::read_local_stream_port(&mut stream)
.and_then(|peer_port| Ok((self.allocate_local_port(), peer_port)))
.and_then(|(local_port, peer_port)| {
.and_then(|(peer_port, need_reply)| Ok((self.allocate_local_port(), peer_port, need_reply)))
.and_then(|(local_port, peer_port, need_reply)| {
self.add_connection(
ConnMapKey {
local_port,
Expand All @@ -394,6 +393,7 @@ impl VsockMuxer {
self.cid,
local_port,
peer_port,
need_reply,
),
)
})
Expand All @@ -409,8 +409,8 @@ impl VsockMuxer {
}
}

/// Parse a host "connect" command, and extract the destination vsock port.
fn read_local_stream_port(stream: &mut UnixStream) -> Result<u32> {
/// Parse a host "connect" and "upgrade" command, and extract the destination vsock port.
fn read_local_stream_port(stream: &mut UnixStream) -> Result<(u32, bool)> {
let mut buf = [0u8; 32];

// This is the minimum number of bytes that we should be able to read, when parsing a
Expand All @@ -437,19 +437,25 @@ impl VsockMuxer {
.map_err(|_| Error::InvalidPortRequest)?
.split_whitespace();

word_iter
let mut need_reply = false;
let port = word_iter
.next()
.ok_or(Error::InvalidPortRequest)
.and_then(|word| {
if word.to_lowercase() == "connect" {
Ok(())
} else if word.to_lowercase() == "upgrade" {
need_reply = true;
Ok(())
} else {
Err(Error::InvalidPortRequest)
}
})
.and_then(|_| word_iter.next().ok_or(Error::InvalidPortRequest))
.and_then(|word| word.parse::<u32>().map_err(|_| Error::InvalidPortRequest))
.map_err(|_| Error::InvalidPortRequest)
.map_err(|_| Error::InvalidPortRequest)?;

Ok((port, need_reply))
}

/// Add a new connection to the active connection pool.
Expand Down

0 comments on commit d77b8a2

Please sign in to comment.