Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/bin/cli.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
//! A Redis CLI application to interact with the Redis server.
//! A Redis CLI application.

use tokio::sync::mpsc;

Expand Down
15 changes: 5 additions & 10 deletions src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,13 +62,12 @@ impl Client {

self.conn.write_frame(&frame).await?;

// todo: read response from the server and return to the client
match self.read_response().await? {
Some(data) => {
let resp = String::from_utf8(data.to_vec()).unwrap();
Ok(resp)
}
None => Err(wrap_error(RedisError::Other("Unknown error".to_string()))),
None => Err(wrap_error(RedisError::Other("Unknown error".into()))),
}
}

Expand All @@ -93,14 +92,10 @@ impl Client {
async fn read_response(&mut self) -> Result<Option<Bytes>> {
match self.conn.read_frame().await? {
Some(Frame::SimpleString(data)) => Ok(Some(Bytes::from(data))),
Some(Frame::SimpleError(data)) => Err(wrap_error(RedisError::Other(data))),
Some(Frame::BulkString(data)) => Ok(Some(Bytes::from(data))),
Some(_) => Err(wrap_error(RedisError::Other(
"Unknown frame type: not implemented".to_string(),
))),
None => Err(wrap_error(RedisError::Other(
"Error reading frame".to_string(),
))),
Some(Frame::SimpleError(data)) => Err(wrap_error(RedisError::Other(data.into()))),
Some(Frame::BulkString(data)) => Ok(Some(data)),
Some(_) => unimplemented!(),
None => Err(wrap_error(RedisError::Other("Unknown error".into()))),
}
}
}
17 changes: 14 additions & 3 deletions src/cmd.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
//! Redis commands.

use bytes::Bytes;

use crate::Frame;

/// A trait for all Redis commands.
Expand All @@ -8,11 +10,13 @@ pub trait Command {
fn into_stream(self) -> Frame;
}

/// A Redis PING command.
///
/// Useful for testing whether a connection is still alive, or to measure latency.
pub struct Ping {
msg: Option<String>,
}

/// Implements the Redis Ping command.
impl Ping {
/// Creates a new Ping command.
///
Expand All @@ -23,6 +27,12 @@ impl Ping {
/// # Returns
///
/// A new Ping command
///
/// # Examples
///
/// ```ignore
/// let ping = Ping::new(Some("hello".into()));
/// ```
pub fn new(msg: Option<String>) -> Self {
Self { msg }
}
Expand All @@ -32,10 +42,11 @@ impl Command for Ping {
/// Converts the ping command into a Frame to be transimitted over the stream.
fn into_stream(self) -> Frame {
let mut frame: Frame = Frame::array();
frame.push_bulk_str("ping".into());
frame.push_frame_to_array(Frame::BulkString("ping".into()));

// do not push the message if it is None
if let Some(msg) = self.msg {
frame.push_bulk_str(msg);
frame.push_frame_to_array(Frame::BulkString(Bytes::from(msg)));
}

frame
Expand Down
34 changes: 16 additions & 18 deletions src/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use crate::Frame;
use crate::RedisError;
use crate::Result;
use crate::error::wrap_error;
use bytes::Buf;
use bytes::{Bytes, BytesMut};
use std::io::Cursor;
use tokio::io::{AsyncReadExt, AsyncWriteExt, BufWriter};
Expand Down Expand Up @@ -57,7 +58,9 @@ impl Connection {
if self.buffer.is_empty() {
return Ok(None);
} else {
return Err(wrap_error(RedisError::Other("Unknown error".to_string())));
return Err(wrap_error(RedisError::Other(
"Connection reset by peer".into(),
)));
}
}
}
Expand Down Expand Up @@ -97,25 +100,20 @@ impl Connection {
/// None if the Frame is incomplete and more data is needed.
/// An error if the Frame is invalid.
async fn try_parse_frame(&mut self) -> Result<Option<Frame>> {
let mut buf: Cursor<&[u8]> = Cursor::new(&self.buffer[..]);
let mut cursor: Cursor<&[u8]> = Cursor::new(&self.buffer[..]);

match Frame::check(&mut buf).await {
// Ok means we can parse a complete frame
Ok(()) => {
let len = buf.position() as usize;

let bytes = self.buffer.split_to(len).freeze();

// once we have read the frame, we can advance the buffer
println!("try_parse_frame: len={len}, bytes={bytes:?}");

Ok(Some(Frame::deserialize(bytes).await?))
match Frame::try_parse(&mut cursor) {
Ok(frame) => {
self.buffer.advance(cursor.position() as usize);
Ok(Some(frame))
}
Err(err) => {
if let Some(RedisError::IncompleteFrame) = err.downcast_ref::<RedisError>() {
Ok(None)
} else {
Err(err)
}
}
Err(err) => match &*err {
// IncompleteFrame means we need to read more data
RedisError::IncompleteFrame => Ok(None),
_ => Err(err),
},
}
}
}
43 changes: 28 additions & 15 deletions src/error.rs
Original file line number Diff line number Diff line change
@@ -1,30 +1,24 @@
//! Custom error handling for Redis client and a specialized Result type
//! used as the return type for Redis operations.
//!
//! todo: implement From trait for RedisError so that we can capture more built in e

use std::{error, fmt, io, result, sync};
use std::{error, fmt, io, result};

/// Represents errors that can occur when working with Redis.
#[derive(Debug)]
pub enum RedisError {
/// An I/O error that occurred while working with a Redis connection.
Io(io::Error),
/// An incomplete frame was received when reading from the socket.
IncompleteFrame,
/// An invalid frame was received when reading from the socket. According to RESP3 spec.
InvalidFrame,
Other(String),
}

impl From<io::Error> for RedisError {
fn from(err: io::Error) -> Self {
RedisError::Io(err)
}
/// Generic error type.
Other(Error),
}

impl fmt::Display for RedisError {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
match self {
RedisError::Io(err) => write!(f, "IO error: {}", err),
RedisError::IncompleteFrame => write!(f, "incomplete frame"),
RedisError::InvalidFrame => write!(f, "invalid frame"),
RedisError::Other(s) => write!(f, "other error: {}", s),
Expand All @@ -35,12 +29,31 @@ impl fmt::Display for RedisError {
// Implement std::error::Error for RedisError.
impl error::Error for RedisError {}

type Error = sync::Arc<RedisError>;
impl From<io::Error> for RedisError {
fn from(err: io::Error) -> Self {
RedisError::Other(err.into())
}
}

impl From<String> for RedisError {
fn from(val: String) -> Self {
RedisError::Other(val.into())
}
}

/// Helper function to wrap errors into Arc.
pub fn wrap_error<E: Into<RedisError>>(err: E) -> Error {
sync::Arc::new(err.into())
impl From<&str> for RedisError {
fn from(val: &str) -> Self {
RedisError::Other(val.into())
}
}

/// Boxed generic error types.
type Error = Box<dyn std::error::Error + Send + Sync>;

/// A specialized `Result` type for Redis operations.
pub type Result<T> = result::Result<T, Error>;

/// Helper function to wrap errors into Box.
pub fn wrap_error<E: Into<RedisError>>(err: E) -> Error {
Box::new(err.into())
}
Loading