diff --git a/src/bin/cli.rs b/src/bin/cli.rs index da42d9a..7ac5079 100644 --- a/src/bin/cli.rs +++ b/src/bin/cli.rs @@ -1,4 +1,4 @@ -//! A Redis CLI application to interact with the Redis server. +//! A Redis CLI application. use tokio::sync::mpsc; diff --git a/src/client.rs b/src/client.rs index 7dd0680..12ab12c 100644 --- a/src/client.rs +++ b/src/client.rs @@ -62,13 +62,12 @@ impl Client { self.conn.write_frame(&frame).await?; - // todo: read response from the server and return to the client match self.read_response().await? { Some(data) => { let resp = String::from_utf8(data.to_vec()).unwrap(); Ok(resp) } - None => Err(wrap_error(RedisError::Other("Unknown error".to_string()))), + None => Err(wrap_error(RedisError::Other("Unknown error".into()))), } } @@ -93,14 +92,10 @@ impl Client { async fn read_response(&mut self) -> Result> { match self.conn.read_frame().await? { Some(Frame::SimpleString(data)) => Ok(Some(Bytes::from(data))), - Some(Frame::SimpleError(data)) => Err(wrap_error(RedisError::Other(data))), - Some(Frame::BulkString(data)) => Ok(Some(Bytes::from(data))), - Some(_) => Err(wrap_error(RedisError::Other( - "Unknown frame type: not implemented".to_string(), - ))), - None => Err(wrap_error(RedisError::Other( - "Error reading frame".to_string(), - ))), + Some(Frame::SimpleError(data)) => Err(wrap_error(RedisError::Other(data.into()))), + Some(Frame::BulkString(data)) => Ok(Some(data)), + Some(_) => unimplemented!(), + None => Err(wrap_error(RedisError::Other("Unknown error".into()))), } } } diff --git a/src/cmd.rs b/src/cmd.rs index cd03985..6e3d2f0 100644 --- a/src/cmd.rs +++ b/src/cmd.rs @@ -1,5 +1,7 @@ //! Redis commands. +use bytes::Bytes; + use crate::Frame; /// A trait for all Redis commands. @@ -8,11 +10,13 @@ pub trait Command { fn into_stream(self) -> Frame; } +/// A Redis PING command. +/// +/// Useful for testing whether a connection is still alive, or to measure latency. pub struct Ping { msg: Option, } -/// Implements the Redis Ping command. impl Ping { /// Creates a new Ping command. /// @@ -23,6 +27,12 @@ impl Ping { /// # Returns /// /// A new Ping command + /// + /// # Examples + /// + /// ```ignore + /// let ping = Ping::new(Some("hello".into())); + /// ``` pub fn new(msg: Option) -> Self { Self { msg } } @@ -32,10 +42,11 @@ impl Command for Ping { /// Converts the ping command into a Frame to be transimitted over the stream. fn into_stream(self) -> Frame { let mut frame: Frame = Frame::array(); - frame.push_bulk_str("ping".into()); + frame.push_frame_to_array(Frame::BulkString("ping".into())); + // do not push the message if it is None if let Some(msg) = self.msg { - frame.push_bulk_str(msg); + frame.push_frame_to_array(Frame::BulkString(Bytes::from(msg))); } frame diff --git a/src/connection.rs b/src/connection.rs index 4aa0634..8f31e12 100644 --- a/src/connection.rs +++ b/src/connection.rs @@ -2,6 +2,7 @@ use crate::Frame; use crate::RedisError; use crate::Result; use crate::error::wrap_error; +use bytes::Buf; use bytes::{Bytes, BytesMut}; use std::io::Cursor; use tokio::io::{AsyncReadExt, AsyncWriteExt, BufWriter}; @@ -57,7 +58,9 @@ impl Connection { if self.buffer.is_empty() { return Ok(None); } else { - return Err(wrap_error(RedisError::Other("Unknown error".to_string()))); + return Err(wrap_error(RedisError::Other( + "Connection reset by peer".into(), + ))); } } } @@ -97,25 +100,20 @@ impl Connection { /// None if the Frame is incomplete and more data is needed. /// An error if the Frame is invalid. async fn try_parse_frame(&mut self) -> Result> { - let mut buf: Cursor<&[u8]> = Cursor::new(&self.buffer[..]); + let mut cursor: Cursor<&[u8]> = Cursor::new(&self.buffer[..]); - match Frame::check(&mut buf).await { - // Ok means we can parse a complete frame - Ok(()) => { - let len = buf.position() as usize; - - let bytes = self.buffer.split_to(len).freeze(); - - // once we have read the frame, we can advance the buffer - println!("try_parse_frame: len={len}, bytes={bytes:?}"); - - Ok(Some(Frame::deserialize(bytes).await?)) + match Frame::try_parse(&mut cursor) { + Ok(frame) => { + self.buffer.advance(cursor.position() as usize); + Ok(Some(frame)) + } + Err(err) => { + if let Some(RedisError::IncompleteFrame) = err.downcast_ref::() { + Ok(None) + } else { + Err(err) + } } - Err(err) => match &*err { - // IncompleteFrame means we need to read more data - RedisError::IncompleteFrame => Ok(None), - _ => Err(err), - }, } } } diff --git a/src/error.rs b/src/error.rs index e31a40d..6c44b50 100644 --- a/src/error.rs +++ b/src/error.rs @@ -1,30 +1,24 @@ //! Custom error handling for Redis client and a specialized Result type //! used as the return type for Redis operations. +//! +//! todo: implement From trait for RedisError so that we can capture more built in e -use std::{error, fmt, io, result, sync}; +use std::{error, fmt, io, result}; /// Represents errors that can occur when working with Redis. #[derive(Debug)] pub enum RedisError { - /// An I/O error that occurred while working with a Redis connection. - Io(io::Error), /// An incomplete frame was received when reading from the socket. IncompleteFrame, /// An invalid frame was received when reading from the socket. According to RESP3 spec. InvalidFrame, - Other(String), -} - -impl From for RedisError { - fn from(err: io::Error) -> Self { - RedisError::Io(err) - } + /// Generic error type. + Other(Error), } impl fmt::Display for RedisError { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { match self { - RedisError::Io(err) => write!(f, "IO error: {}", err), RedisError::IncompleteFrame => write!(f, "incomplete frame"), RedisError::InvalidFrame => write!(f, "invalid frame"), RedisError::Other(s) => write!(f, "other error: {}", s), @@ -35,12 +29,31 @@ impl fmt::Display for RedisError { // Implement std::error::Error for RedisError. impl error::Error for RedisError {} -type Error = sync::Arc; +impl From for RedisError { + fn from(err: io::Error) -> Self { + RedisError::Other(err.into()) + } +} + +impl From for RedisError { + fn from(val: String) -> Self { + RedisError::Other(val.into()) + } +} -/// Helper function to wrap errors into Arc. -pub fn wrap_error>(err: E) -> Error { - sync::Arc::new(err.into()) +impl From<&str> for RedisError { + fn from(val: &str) -> Self { + RedisError::Other(val.into()) + } } +/// Boxed generic error types. +type Error = Box; + /// A specialized `Result` type for Redis operations. pub type Result = result::Result; + +/// Helper function to wrap errors into Box. +pub fn wrap_error>(err: E) -> Error { + Box::new(err.into()) +} diff --git a/src/frame.rs b/src/frame.rs index c81b51e..861862c 100644 --- a/src/frame.rs +++ b/src/frame.rs @@ -3,25 +3,33 @@ use crate::{RedisError, Result, error::wrap_error}; use bytes::{Buf, Bytes, BytesMut}; -use std::io::BufRead; +use std::io::{BufRead, Cursor}; + +#[derive(Debug, PartialEq)] +pub struct BigInt { + sign: bool, + data: Vec, +} /// Frame represents a single RESP data transmit unit over the socket. +/// +/// more on the RESP protocol can be found [here](https://redis.io/topics/protocol) #[derive(Debug, PartialEq)] pub enum Frame { - /// [Simple strings](https://redis.io/docs/latest/develop/reference/protocol-spec/#simple-strings) SimpleString(String), - /// [Simple errors](https://redis.io/docs/latest/develop/reference/protocol-spec/#simple-errors) SimpleError(String), - /// [Integers](https://redis.io/docs/latest/develop/reference/protocol-spec/#integers) Integer(i64), - /// [Bulk strings](https://redis.io/docs/latest/develop/reference/protocol-spec/#bulk-strings) - BulkString(String), - /// [Arrays](https://redis.io/docs/latest/develop/reference/protocol-spec/#arrays) + BulkString(Bytes), Array(Vec), - /// [Nulls](https://redis.io/docs/latest/develop/reference/protocol-spec/#nulls) Null, - /// [Booleans](https://redis.io/docs/latest/develop/reference/protocol-spec/#booleans) Boolean(bool), + Double(f64), + BigNumber(BigInt), + BulkError(Bytes), + Map(Vec<(Frame, Frame)>), + Attribute, + Set(Vec), + Push, } impl Frame { @@ -30,24 +38,28 @@ impl Frame { Frame::Array(Vec::new()) } - /// A utility method to push a new BulkString Frame into an Array Frame. + /// A utility method to push a Frame into an Array Frame. /// /// # Arguments /// - /// * `item` - A string to be pushed into the Array Frame + /// * `frame` - A Frame to be pushed into the Array /// /// # Panics /// /// This method will panic if the Frame is not an Array - pub fn push_bulk_str(&mut self, item: String) { + pub fn push_frame_to_array(&mut self, frame: Frame) { match self { - Frame::Array(vec) => vec.push(Frame::BulkString(item)), + Frame::Array(vec) => vec.push(frame), _ => unimplemented!(), } } /// Serializes a Frame into a bytes buffer. /// + /// The returned value is a smart pointer only counting reference. It is cheap to clone. + /// Caller can get the underlying slice by calling `as_slice` or `as_ref` on the returned value. + /// It is almost 0 cost to get the slice. + /// /// # Returns /// /// A Result containing the serialized bytes buffer @@ -56,7 +68,9 @@ impl Frame { Frame::SimpleString(val) => { let mut buf = BytesMut::with_capacity(val.len() + 3); + // + indicates it is a simple string buf.extend_from_slice(b"+"); + // encode the string value buf.extend_from_slice(val.as_bytes()); buf.extend_from_slice(b"\r\n"); @@ -65,19 +79,35 @@ impl Frame { Frame::SimpleError(val) => { let mut buf = BytesMut::with_capacity(val.len() + 3); + // - indicates it is an error buf.extend_from_slice(b"-"); + // encode the error message buf.extend_from_slice(val.as_bytes()); buf.extend_from_slice(b"\r\n"); Ok(buf.freeze()) } + Frame::Integer(val) => { + let mut buf = BytesMut::with_capacity(20); + + // : indicates it is an integer + buf.extend_from_slice(b":"); + // encode the integer value + buf.extend_from_slice(val.to_string().as_bytes()); + buf.extend_from_slice(b"\r\n"); + + Ok(buf.freeze()) + } Frame::BulkString(val) => { let mut buf = BytesMut::with_capacity(val.len() + 5); + // * indicates it is a bulk string buf.extend_from_slice(b"$"); + // encode the length of the binary string buf.extend_from_slice(val.len().to_string().as_bytes()); buf.extend_from_slice(b"\r\n"); - buf.extend_from_slice(val.as_bytes()); + // encode the binary string + buf.extend_from_slice(val.as_ref()); buf.extend_from_slice(b"\r\n"); Ok(buf.freeze()) @@ -85,122 +115,155 @@ impl Frame { Frame::Array(frame_vec) => { let mut buf = BytesMut::new(); + // * indicates it is an array buf.extend_from_slice(b"*"); + // encode the number of elements in the array buf.extend_from_slice(frame_vec.len().to_string().as_bytes()); buf.extend_from_slice(b"\r\n"); + // encode each element in the array for frame in frame_vec { buf.extend_from_slice(&Box::pin(frame.serialize()).await?); } Ok(buf.freeze()) } + Frame::Null => { + let mut buf = BytesMut::with_capacity(3); + + // _ indicates it is a null + buf.extend_from_slice(b"_\r\n"); + + Ok(buf.freeze()) + } + Frame::Boolean(val) => { + todo!("Boolean serialization is not implemented yet {:?}", val) + } + Frame::Double(val) => { + todo!("Double serialization is not implemented yet {:?}", val) + } + Frame::BulkError(val) => { + todo!("BulkError serialization is not implemented yet {:?}", val) + } _ => unimplemented!(), } } /// Deserializes from the buffer into a Frame. /// + /// The method reads from the buffer and parses it into a Frame. + /// /// # Arguments /// - /// * `bytes` - A buffer containing the serialized Frame + /// * `buf` - An immutable read buffer containing the serialized Frame /// /// # Returns /// /// A Result containing the deserialized Frame - pub async fn deserialize(bytes: Bytes) -> Result { - // todo: implement deserialization - match bytes[0] { - b'+' => { - // Simple string, slicing to ignore the leading + and ending CRLF char - let bytes = &bytes[1..bytes.len() - 2]; - Ok(Frame::SimpleString( - String::from_utf8(bytes.to_vec()).unwrap(), - )) - } - b'-' => { - // Simple error, slicing to ignore the leading - and ending CRLF char - let bytes = &bytes[1..bytes.len() - 2]; - Ok(Frame::SimpleError( - String::from_utf8(bytes.to_vec()).unwrap(), - )) - } - b'$' => { - // Bulk string, slicing to ignore the leading $ and ending CRLF char - let bytes = &bytes[1..]; - let mut reader = bytes.reader(); - - let mut buf_str1 = String::new(); - let mut buf_str2 = String::new(); - - let _ = reader.read_line(&mut buf_str1).unwrap(); - let _ = reader.read_line(&mut buf_str2).unwrap(); - - Ok(Frame::BulkString( - buf_str2.trim_end_matches("\r\n").to_string(), - )) - } - _ => unimplemented!(), - } + pub async fn deserialize(buf: Bytes) -> Result { + // the cursor is almost zero cost as it is just a smart ptr to the buffer + Frame::try_parse(&mut Cursor::new(&buf[..])) } - /// Checks whether the buffer contains a complete RESP frame starting from the current position. + /// Tries parsing a Frame from the buffer. /// - /// # Arguments - /// * `buf` - A mutable buffer with a cursor to be checked + /// This method wraps the input with a cursor to track the current version as we need to make resursive calls. + /// Using a cursor avoids the need to split the buffer or passing an additional parameter. /// /// # Returns /// - /// * `Ok(())` if the buffer contains a complete frame + /// * `Ok(usize)` if the buffer contains a complete frame, the number of bytes needed to parse the frame /// * `Err(RedisError::IncompleteFrame)` if the buffer contains an incomplete frame /// * `Err(RedisError::InvalidFrame)` if the buffer contains an invalid frame - pub async fn check(buf: &mut impl Buf) -> Result<()> { - if buf.remaining() == 0 { + pub fn try_parse(cursor: &mut Cursor<&[u8]>) -> Result { + if !cursor.has_remaining() { return Err(wrap_error(RedisError::IncompleteFrame)); } - match buf.get_u8() { - // simple string, simple error - b'+' | b'-' => { - let mut reader = buf.reader(); - - let mut buf_str = String::new(); - - let _ = reader.read_line(&mut buf_str).unwrap(); + match cursor.get_u8() { + b'+' => { + // Simple string + let mut buf = String::new(); + let _ = cursor.read_line(&mut buf).unwrap(); + + if buf.ends_with("\r\n") { + Ok(Frame::SimpleString( + buf.trim_end_matches("\r\n").to_string(), + )) + } else { + // fixme: there maybe edge cases here + // we need to guarantee there's no more \r\n in the buffer + Err(wrap_error(RedisError::IncompleteFrame)) + } + } + b'-' => { + // Simple error + let mut buf = String::new(); + let _ = cursor.read_line(&mut buf).unwrap(); - if buf_str.ends_with("\r\n") { - Ok(()) + if buf.ends_with("\r\n") { + Ok(Frame::SimpleError(buf.trim_end_matches("\r\n").to_string())) } else { // fixme: there maybe edge cases here + // we need to guarantee there's no more \r\n in the buffer + Err(wrap_error(RedisError::IncompleteFrame)) + } + } + b':' => { + // Integer + let mut buf = String::new(); + let _ = cursor.read_line(&mut buf).unwrap(); + + // todo: check whether it is a valid integer + if buf.ends_with("\r\n") { + Ok(Frame::Integer( + buf.trim_end_matches("\r\n").parse::().unwrap(), + )) + } else { Err(wrap_error(RedisError::IncompleteFrame)) } } - // bulk string b'$' => { - let mut reader = buf.reader(); - - let mut buf_str1 = String::new(); - let mut buf_str2 = String::new(); + // Bulk string + let mut buf = String::new(); + // read the length of the bulk string + let _ = cursor.read_line(&mut buf).unwrap(); - let _ = reader.read_line(&mut buf_str1).unwrap(); - let _ = reader.read_line(&mut buf_str2).unwrap(); - - // both lines should end with CRLF - // an example RESP encodes bulk string: - // $\r\n\r\n - if !buf_str1.ends_with("\r\n") || !buf_str2.ends_with("\r\n") { + if !buf.ends_with("\r\n") { return Err(wrap_error(RedisError::IncompleteFrame)); } - let length = buf_str1.trim_end_matches("\r\n").parse::().unwrap(); + let len = buf.trim_end_matches("\r\n").parse::().unwrap(); + + buf.clear(); + let _ = cursor.read_line(&mut buf).unwrap(); - if length == buf_str2.len() - 2 { - Ok(()) + // -2 because \r\n + if len == buf.len() - 2 { + Ok(Frame::BulkString(Bytes::from( + buf.trim_end_matches("\r\n").to_string(), + ))) } else { Err(wrap_error(RedisError::InvalidFrame)) } } - _ => Err(wrap_error(RedisError::InvalidFrame)), + b'*' => { + // Array + let mut buf = String::new(); + let _ = cursor.read_line(&mut buf).unwrap(); + + let len = buf.trim_end_matches("\r\n").parse::().unwrap(); + + let mut frame_vec: Vec<_> = Vec::with_capacity(len); + + for _ in 0..len { + frame_vec.push(Frame::try_parse(cursor)?); + } + + Ok(Frame::Array(frame_vec)) + } + b'_' => Ok(Frame::Null), + _ => unimplemented!(), } } } @@ -209,6 +272,7 @@ impl Frame { mod tests { use super::*; + /// Tests the serialization of a simple string frame. #[tokio::test] async fn test_serialize_simple_string() { let frame = Frame::SimpleString("OK".to_string()); @@ -217,6 +281,7 @@ mod tests { assert_eq!(bytes, Bytes::from_static(b"+OK\r\n")); } + /// Tests the serialization of a simple error frame. #[tokio::test] async fn test_serialize_simple_error() { let frame = Frame::SimpleError("ERR".to_string()); @@ -225,6 +290,85 @@ mod tests { assert_eq!(bytes, Bytes::from_static(b"-ERR\r\n")); } + /// Tests the serialization of an integer frame. + #[tokio::test] + async fn test_serialize_integer() { + // positive integer + let frame = Frame::Integer(123_i64); + let bytes = frame.serialize().await.unwrap(); + + assert_eq!(bytes, Bytes::from_static(b":123\r\n")); + + // negative integer + let frame = Frame::Integer(-123_i64); + let bytes = frame.serialize().await.unwrap(); + + assert_eq!(bytes, Bytes::from_static(b":-123\r\n")); + } + + /// Tests the serialization of a bulk string frame. + #[tokio::test] + async fn test_serialize_bulk_string() { + let frame = Frame::BulkString(Bytes::from_static(b"Hello Redis")); + let bytes = frame.serialize().await.unwrap(); + + assert_eq!(bytes, Bytes::from_static(b"$11\r\nHello Redis\r\n")); + + // empty bulk string + let frame = Frame::BulkString(Bytes::from_static(b"")); + let bytes = frame.serialize().await.unwrap(); + + assert_eq!(bytes, Bytes::from_static(b"$0\r\n\r\n")); + } + + /// Tests the serailization of an array frame. + #[tokio::test] + async fn test_serialize_array() { + let mut frame = Frame::array(); + frame.push_frame_to_array(Frame::BulkString(Bytes::from_static(b"Hello"))); + frame.push_frame_to_array(Frame::BulkString(Bytes::from_static(b"Redis"))); + + let bytes = frame.serialize().await.unwrap(); + + assert_eq!( + bytes, + Bytes::from_static(b"*2\r\n$5\r\nHello\r\n$5\r\nRedis\r\n") + ); + + // empty array + let frame = Frame::array(); + let bytes = frame.serialize().await.unwrap(); + + assert_eq!(bytes, Bytes::from_static(b"*0\r\n")); + + // nested array + let mut frame: Frame = Frame::array(); + let mut nested_frame = Frame::array(); + nested_frame.push_frame_to_array(Frame::BulkString(Bytes::from_static(b"Hello"))); + nested_frame.push_frame_to_array(Frame::BulkString(Bytes::from_static(b"Redis"))); + + if let Frame::Array(vec) = &mut frame { + vec.push(nested_frame); + } + + let bytes = frame.serialize().await.unwrap(); + + assert_eq!( + bytes, + Bytes::from_static(b"*1\r\n*2\r\n$5\r\nHello\r\n$5\r\nRedis\r\n") + ); + } + + /// Tests the serialization of a null frame. + #[tokio::test] + async fn test_serialize_null() { + let frame = Frame::Null; + let bytes = frame.serialize().await.unwrap(); + + assert_eq!(bytes, Bytes::from_static(b"_\r\n")); + } + + /// Tests the deserialization of a simple string frame. #[tokio::test] async fn test_deserialize_simple_string() { let bytes = Bytes::from_static(b"+OK\r\n"); @@ -234,6 +378,7 @@ mod tests { assert_eq!(frame, Frame::SimpleString("OK".to_string())); } + /// Tests the deserialization of a simple error frame. #[tokio::test] async fn test_deserialize_simple_error() { let bytes = Bytes::from_static(b"-ERR\r\n"); @@ -243,43 +388,82 @@ mod tests { assert_eq!(frame, Frame::SimpleError("ERR".to_string())); } + /// Tests the deserialization of an integer frame. #[tokio::test] - async fn test_check_empty_buffer() { - use std::io::Cursor; - // a mutable buffer with the same underlying data to be shared across tests - let buf = BytesMut::new(); + async fn test_deserialize_integer() { + // positive integer + let bytes = Bytes::from_static(b":123\r\n"); + + let frame = Frame::deserialize(bytes).await.unwrap(); - let mut buf_cursor = Cursor::new(&buf[..]); + assert_eq!(frame, Frame::Integer(123_i64)); + + // negative integer + let bytes = Bytes::from_static(b":-123\r\n"); + + let frame = Frame::deserialize(bytes).await.unwrap(); - // empty buffer sould result in an error - assert!(Frame::check(&mut buf_cursor).await.is_err()); + assert_eq!(frame, Frame::Integer(-123_i64)); } + /// Tests the deserialization of a bulk string frame. #[tokio::test] - async fn test_check_incomplete_frame() { - use std::io::Cursor; - // a mutable buffer with the same underlying data to be shared across tests - let mut buf = BytesMut::new(); + async fn test_deserialize_bulk_string() { + let bytes = Bytes::from_static(b"$11\r\nHello Redis\r\n"); - buf.extend_from_slice(b"+OK"); + let frame = Frame::deserialize(bytes).await.unwrap(); + + assert_eq!(frame, Frame::BulkString(Bytes::from_static(b"Hello Redis"))); - let mut buf_cursor = Cursor::new(&buf[..]); + let bytes = Bytes::from_static(b"$0\r\n\r\n"); + + let frame = Frame::deserialize(bytes).await.unwrap(); - // an incomplete frame should result in an error - assert!(Frame::check(&mut buf_cursor).await.is_err()); + assert_eq!(frame, Frame::BulkString(Bytes::from_static(b""))); } + /// Tests deseaialization of an array frame. #[tokio::test] - async fn test_check_complete_frame() { - use std::io::Cursor; - // a mutable buffer with the same underlying data to be shared across tests - let mut buf = BytesMut::new(); + async fn test_deserialize_array() { + let bytes = Bytes::from_static(b"*2\r\n$5\r\nHello\r\n$5\r\nRedis\r\n"); + + let frame = Frame::deserialize(bytes).await.unwrap(); + + let mut expected_frame = Frame::array(); + expected_frame.push_frame_to_array(Frame::BulkString(Bytes::from_static(b"Hello"))); + expected_frame.push_frame_to_array(Frame::BulkString(Bytes::from_static(b"Redis"))); - buf.extend_from_slice(b"+OK\r\n"); + assert_eq!(frame, expected_frame); - let mut buf_cursor = Cursor::new(&buf[..]); + // empty array + let bytes = Bytes::from_static(b"*0\r\n"); + + let frame = Frame::deserialize(bytes).await.unwrap(); + + assert_eq!(frame, Frame::array()); + + // nested array + let bytes = Bytes::from_static(b"*1\r\n*2\r\n$5\r\nHello\r\n$5\r\nRedis\r\n"); + + let frame = Frame::deserialize(bytes).await.unwrap(); + + let mut expected_frame = Frame::array(); + let mut nested_frame = Frame::array(); + nested_frame.push_frame_to_array(Frame::BulkString(Bytes::from_static(b"Hello"))); + nested_frame.push_frame_to_array(Frame::BulkString(Bytes::from_static(b"Redis"))); + + expected_frame.push_frame_to_array(nested_frame); + + assert_eq!(frame, expected_frame); + } + + /// Tests the deserialization of a null frame. + #[tokio::test] + async fn test_deserialize_null() { + let bytes = Bytes::from_static(b"_\r\n"); + + let frame = Frame::deserialize(bytes).await.unwrap(); - // an incomplete frame should result in an error - assert!(Frame::check(&mut buf_cursor).await.is_ok()); + assert_eq!(frame, Frame::Null); } } diff --git a/src/lib.rs b/src/lib.rs index 88fcab7..a2fad21 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,6 +1,14 @@ //! An asynchronous Redis client library for Rust. //! -//! # Basic Usage +//! # Basic usage +//! +//! ## Example +//! +//! ```ignore +//! use async_redis::Client; +//! ``` +//! +//! # TLS/SSL //! //! # Connection pooling //! @@ -11,11 +19,11 @@ //! //! # Pipelining //! -//! # Transactions +//! # Transaction //! //! # Pub/Sub //! -//! # RESP3 support +//! # RESP3 //! //! This library supports the Redis Serialization Protocol (RESP) version 3 //! introduced in Redis 6.0.