Skip to content

Commit

Permalink
Read RESP data type from request and echo it back to client
Browse files Browse the repository at this point in the history
  • Loading branch information
dheerajgopi committed Aug 27, 2024
1 parent db9328b commit 30a65f7
Show file tree
Hide file tree
Showing 6 changed files with 210 additions and 6 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ edition = "2021"

[dependencies]
anyhow = "1.0.86"
bytes = "1.6.0"
tokio = { version = "1.38.0", features = [
"rt-multi-thread",
"macros",
Expand Down
5 changes: 3 additions & 2 deletions src/main.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
// src/main.rs

// Include the server module defined in server.rs
mod resp;
mod server;

use crate::server::Server;
Expand All @@ -24,7 +25,7 @@ async fn main() -> Result<()> {
Ok(tcp_listener) => {
info!("TCP listener started on port 6379");
tcp_listener
},
}
// If there is an error, panic and print the error message
// This could happen if the port is already in use, for example
Err(e) => panic!("Could not bind the TCP listener to {}. Err: {}", &addr, e),
Expand All @@ -40,4 +41,4 @@ async fn main() -> Result<()> {
// This Ok(()) is technically unreachable as server.run() loops infinitely,
// but it's needed to satisfy the Result return type of main()
Ok(())
}
}
22 changes: 22 additions & 0 deletions src/resp/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
pub mod types;

/// Represents errors that can occur during RESP parsing.
#[derive(Debug)]
pub enum RespError {
/// Represents an error in parsing a bulk string, with an error message.
InvalidBulkString(String),
/// Represents an error in parsing a simple string, with an error message.
InvalidSimpleString(String),
/// Represents any other error with a descriptive message.
Other(String),
}

impl std::fmt::Display for RespError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
RespError::Other(msg) => msg.as_str().fmt(f),
RespError::InvalidBulkString(msg) => msg.as_str().fmt(f),
RespError::InvalidSimpleString(msg) => msg.as_str().fmt(f),
}
}
}
163 changes: 163 additions & 0 deletions src/resp/types.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,163 @@
/// This enum is a wrapper for the different data types in RESP.
#[derive(Clone, Debug)]
pub enum RespType {
/// Refer <https://redis.io/docs/latest/develop/reference/protocol-spec/#simple-strings>
SimpleString(String),
/// Refer <https://redis.io/docs/latest/develop/reference/protocol-spec/#bulk-strings>
BulkString(String),
/// Refer <https://redis.io/docs/latest/develop/reference/protocol-spec/#simple-errors>
SimpleError(String),
}

use super::RespError;
use bytes::{Bytes, BytesMut};

impl RespType {
/// Parse the given bytes into its respective RESP type and return the parsed RESP value and
/// the number of bytes read from the buffer.
///
/// More details on the parsing logic is available at
/// <https://redis.io/docs/latest/develop/reference/protocol-spec/#resp-protocol-description>.
///
/// # Errors
/// Error will be returned in the following scenarios:
/// - If first byte is an invalid character.
/// - If the parsing fails due to encoding issues etc.
pub fn parse(buffer: BytesMut) -> Result<(RespType, usize), RespError> {
let c = buffer[0] as char;
return match c {
'$' => Self::parse_bulk_string(buffer),
'+' => Self::parse_simple_string(buffer),
_ => Err(RespError::Other(String::from("Invalid RESP data type"))),
};
}

/// Parse the given bytes into a BulkString RESP value. This will return the parsed RESP
/// value and the number of bytes read from the buffer.
///
/// Example BulkString: `$5\r\nhello\r\n`
///
/// # BulkString Parts:
/// ```
/// $ | 5 | \r\n | hello | \r\n
/// identifier | string length in bytes | CRLF | string value | CRLF
/// ```
///
/// # Parsing Logic:
/// - The buffer is read until CRLF characters ("\r\n") are encountered.
/// - That slice of bytes are then parsed into an int. That will be the string length in bytes (let's say `bulkstr_len`)
/// - `bulkstr_len` number of bytes are read from the buffer again from where it was stopped previously.
/// - This 2nd slice of bytes is then parsed into an UTF-8 string.
///
/// Note: The first byte in the buffer is skipped since it's just an identifier for the
/// RESP type and is not the part of the actual value itself.
pub fn parse_bulk_string(buffer: BytesMut) -> Result<(RespType, usize), RespError> {
// read until CRLF and parse length
let (bulkstr_len, bytes_consumed) =
if let Some((buf_data, len)) = Self::read_till_crlf(&buffer[1..]) {
let bulkstr_len = Self::parse_usize_from_buf(buf_data)?;
(bulkstr_len, len + 1)
} else {
return Err(RespError::InvalidBulkString(String::from(
"Invalid value for bulk string",
)));
};

// validate if buffer contains the complete string data based on
// the length parsed in the previous step.
let bulkstr_end_idx = bytes_consumed + bulkstr_len as usize;
if bulkstr_end_idx >= buffer.len() {
return Err(RespError::InvalidBulkString(String::from(
"Invalid value for bulk string length",
)));
}

// convert raw bytes into UTF-8 string.
let bulkstr = String::from_utf8(buffer[bytes_consumed..bulkstr_end_idx].to_vec());

match bulkstr {
Ok(bs) => Ok((RespType::BulkString(bs), bulkstr_end_idx + 2)),
Err(_) => Err(RespError::InvalidBulkString(String::from(
"Bulk string value is not a valid UTF-8 string",
))),
}
}

/// Parse the given bytes into a SimpleString RESP value. This will return the parsed RESP
/// value and the number of bytes read from the buffer.
///
/// Example SimpleString: `+OK\r\n`
///
/// # SimpleString Parts:
/// ```
/// + | OK | \r\n
/// identifier | string value | CRLF
/// ```
///
/// # Parsing Logic:
/// - The buffer is read until CRLF characters ("\r\n") are encountered. That slice of bytes are then
/// parsed into an UTF-8 string.
pub fn parse_simple_string(buffer: BytesMut) -> Result<(RespType, usize), RespError> {
// read until CRLF and parse the bytes into an UTF-8 string.
if let Some((buf_data, len)) = Self::read_till_crlf(&buffer[1..]) {
let utf8_str = String::from_utf8(buf_data.to_vec());

return match utf8_str {
Ok(simple_str) => Ok((RespType::SimpleString(simple_str), len + 1)),
Err(_) => {
return Err(RespError::InvalidSimpleString(String::from(
"Simple string value is not a valid UTF-8 string",
)))
}
};
}

Err(RespError::InvalidSimpleString(String::from(
"Invalid value for simple string",
)))
}

/// Convert the RESP value into its byte values.
pub fn to_bytes(&self) -> Bytes {
return match self {
RespType::SimpleString(ss) => Bytes::from_iter(format!("+{}\r\n", ss).into_bytes()),
RespType::BulkString(bs) => {
let bulkstr_bytes = format!("${}\r\n{}\r\n", bs.chars().count(), bs).into_bytes();
Bytes::from_iter(bulkstr_bytes)
}
RespType::SimpleError(es) => Bytes::from_iter(format!("-{}\r\n", es).into_bytes()),
};
}

// Read the bytes till reaching CRLF ("\r\n")
fn read_till_crlf(buf: &[u8]) -> Option<(&[u8], usize)> {
for i in 1..buf.len() {
if buf[i - 1] == b'\r' && buf[i] == b'\n' {
return Some((&buf[0..(i - 1)], i + 1));
}
}

None
}

// Parse usize from bytes. The number is provided in string format.
// So convert raw bytes into UTF-8 string and then convert the string
// into usize.
fn parse_usize_from_buf(buf: &[u8]) -> Result<usize, RespError> {
let utf8_str = String::from_utf8(buf.to_vec());
let parsed_int = match utf8_str {
Ok(s) => {
let int = s.parse::<usize>();
match int {
Ok(n) => Ok(n),
Err(_) => Err(RespError::Other(String::from(
"Invalid value for an integer",
))),
}
}
Err(_) => Err(RespError::Other(String::from("Invalid UTF-8 string"))),
};

parsed_int
}
}
24 changes: 20 additions & 4 deletions src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,18 @@
// anyhow provides the Error and Result types for convenient error handling
use anyhow::{Error, Result};

use bytes::BytesMut;
// log crate provides macros for logging at various levels (error, warn, info, debug, trace)
use log::error;

use tokio::{
// AsyncWriteExt trait provides asynchronous write methods like write_all
io::AsyncWriteExt,
io::{AsyncReadExt, AsyncWriteExt},
net::{TcpListener, TcpStream},
};

use crate::resp::types::RespType;

/// The Server struct holds the tokio TcpListener which listens for
/// incoming TCP connections.
#[derive(Debug)]
Expand Down Expand Up @@ -44,8 +47,21 @@ impl Server {
// Spawn a new asynchronous task to handle the connection.
// This allows the server to handle multiple connections concurrently.
tokio::spawn(async move {
// Write a "Hello!" message to the client.
if let Err(e) = &mut sock.write_all("Hello!".as_bytes()).await {
// read the TCP message and move the raw bytes into a buffer
let mut buffer = BytesMut::with_capacity(512);
if let Err(e) = sock.read_buf(&mut buffer).await {
panic!("Error reading request: {}", e);
}

// Try parsing the RESP data from the bytes in the buffer.
// If parsing fails return the error message as a RESP SimpleError data type.
let resp_data = match RespType::parse(buffer) {
Ok((data, _)) => data,
Err(e) => RespType::SimpleError(format!("{}", e)),
};

// Echo the RESP message back to the client.
if let Err(e) = &mut sock.write_all(&resp_data.to_bytes()[..]).await {
// Log the error and panic if there is an issue writing the response.
error!("{}", e);
panic!("Error writing response")
Expand All @@ -70,4 +86,4 @@ impl Server {
}
}
}
}
}

0 comments on commit 30a65f7

Please sign in to comment.