diff --git a/.travis.yml b/.travis.yml index f5c76c5..dac7905 100644 --- a/.travis.yml +++ b/.travis.yml @@ -1,62 +1,45 @@ - services: docker - git: submodules: false - sudo: required - language: rust rust: - - stable - - beta - - nightly - +- stable +- beta +- nightly env: global: - - CRATE_NAME=rediSQL + - CRATE_NAME=rediSQL +script: + - cargo build --verbose --all + - cargo test --verbose --all + - cargo build --verbose --all --release matrix: include: - - # Linux - - env: TARGET=aarch64-unknown-linux-gnu - - env: TARGET=arm-unknown-linux-gnueabi - - env: TARGET=armv7-unknown-linux-gnueabihf - - env: TARGET=i686-unknown-linux-gnu - - env: TARGET=i686-unknown-linux-musl - - env: TARGET=mips-unknown-linux-gnu - - env: TARGET=mips64-unknown-linux-gnuabi64 - - env: TARGET=mips64el-unknown-linux-gnuabi64 - - env: TARGET=mipsel-unknown-linux-gnu - - env: TARGET=powerpc-unknown-linux-gnu - - env: TARGET=powerpc64-unknown-linux-gnu - - env: TARGET=powerpc64le-unknown-linux-gnu - - env: TARGET=s390x-unknown-linux-gnu DISABLE_TESTS=1 - - env: TARGET=x86_64-unknown-linux-gnu - dist: trusty - - env: TARGET=x86_64-unknown-linux-musl - - # OSX - - env: TARGET=i686-apple-darwin - os: osx - - env: TARGET=x86_64-apple-darwin - os: osx - - # *BSD - - env: TARGET=i686-unknown-freebsd DISABLE_TESTS=1 - - env: TARGET=x86_64-unknown-freebsd DISABLE_TESTS=1 - - env: TARGET=x86_64-unknown-netbsd DISABLE_TESTS=1 - - # Windows - - env: TARGET=x86_64-pc-windows-gnu - - # Testing other channels - - env: TARGET=x86_64-unknown-linux-gnu - rust: nightly + - env: TARGET=arm-unknown-linux-gnueabi + - env: TARGET=armv7-unknown-linux-gnueabihf + - env: TARGET=x86_64-unknown-linux-gnu cache: cargo - notifications: email: on_success: never +before_deploy: + - ls $TRAVIS_BUILD_DIR/target/release + - echo "${TRAVIS_BUILD_DIR}/target/release/RediSQL_${TRAVIS_TAG}_${TRAVIS_COMMIT:0:6}_${TARGET}_(release|debug).so" + - cp --verbose $TRAVIS_BUILD_DIR/target/release/libredis_sql.so "${TRAVIS_BUILD_DIR}/target/release/RediSQL_${TRAVIS_TAG}_${TRAVIS_COMMIT:0:6}_${TARGET}_release.so" + - cp --verbose $TRAVIS_BUILD_DIR/target/debug/libredis_sql.so "${TRAVIS_BUILD_DIR}/target/release/RediSQL_${TRAVIS_TAG}_${TRAVIS_COMMIT:0:6}_${TARGET}_debug.so" + - ls $TRAVIS_BUILD_DIR/target/release + +deploy: + skip_cleanup: true + provider: releases + api_key: + secure: IFiEcn4ln1fJEKfEUfXtGlq3dV4cl/1r36vMtGnUoOqoSR7yioCsWHszkIM0+KnvD08fC7hMiZSY7K2B5QKbirnbNZ7ha4hEESe+YTQlGETJsrN6EcVTSRJ/jMyY7LE3kHo565R2do5XsFhTz5KymJ562woaAGtYP+qmqj1DBFsDjoxLNzON23o5nnREjKyjDBu4Ie6PNuVm/pMUgjt0Erwhklac2tKvAJUUeT4PTEZ1FEJi3ubYoz0XLbR9wp8Ih3I9YEIOXpaLmoBmo7u8g/wCStSbZSvorssZfIP9uI9Q+VmQj7agMeI4nrm7uOas8uPf/UFFsY41dqKoWhO06XXLAot/PUw3saSpdpIobAtyX9x1SUb2BPs1hPwcoU6Pz/yBntr5xx0yLqdppcWof28t9S4dUeFstAtrkQB0iIjPa7J0JywWVcBQ1iPY1e87QeZKN9GJSf1Gk0B4AhR4ORLskvS8u29w2ZA1ca8PWJ9fokH3JYoYJGE8GtXqMFCW3ENTu953uX2B9f2JntqqYtFX8bk+YxpO/uBY+aAFS59ApCWhQDshx1zMzFLwqSxIpz5Qkgddso50L8Az39Kx1Due0V0FnO4G72q6rzSQayrAhxLdDF3z5xcvbjbsYrSAue5Ly5Y814D5ywKJMsjlGzz4JTNJjSvfChWihHwW61w= + file_glob: true + file: $TRAVIS_BUILD_DIR/target/release/RediSQL_*.so + on: + repo: RedBeardLab/rediSQL + branch: timeout_honor + tags: true diff --git a/Cargo.lock b/Cargo.lock index 063f0ca..3794e60 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1013,19 +1013,19 @@ dependencies = [ [[package]] name = "rediSQL" -version = "1.0.3-rc02" +version = "1.1.0" dependencies = [ "env_logger 0.6.1 (registry+https://github.com/rust-lang/crates.io-index)", "log 0.4.6 (registry+https://github.com/rust-lang/crates.io-index)", - "redisql_lib 0.5.5-rc02", - "sync_engine 0.5.1-rc01", + "redisql_lib 0.6.0", + "sync_engine 0.6.0", "telemetrics 0.1.0", "uuid 0.7.4 (registry+https://github.com/rust-lang/crates.io-index)", ] [[package]] name = "redisql_lib" -version = "0.5.5-rc02" +version = "0.6.0" dependencies = [ "bindgen 0.47.3 (registry+https://github.com/rust-lang/crates.io-index)", "cc 1.0.36 (registry+https://github.com/rust-lang/crates.io-index)", @@ -1254,11 +1254,11 @@ dependencies = [ [[package]] name = "sync_engine" -version = "0.5.1-rc01" +version = "0.6.0" dependencies = [ "cc 1.0.36 (registry+https://github.com/rust-lang/crates.io-index)", "log 0.3.9 (registry+https://github.com/rust-lang/crates.io-index)", - "redisql_lib 0.5.5-rc02", + "redisql_lib 0.6.0", ] [[package]] @@ -1277,7 +1277,7 @@ name = "telemetrics" version = "0.1.0" dependencies = [ "log 0.4.6 (registry+https://github.com/rust-lang/crates.io-index)", - "redisql_lib 0.5.5-rc02", + "redisql_lib 0.6.0", "reqwest 0.9.16 (registry+https://github.com/rust-lang/crates.io-index)", ] diff --git a/Cargo.toml b/Cargo.toml index 071b5ad..6f5a695 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "rediSQL" -version = "1.0.3-rc02" +version = "1.1.0" authors = ["Simone Mosciatti "] edition = "2018" diff --git a/redisql_lib/Cargo.toml b/redisql_lib/Cargo.toml index 5a6950f..e28a209 100644 --- a/redisql_lib/Cargo.toml +++ b/redisql_lib/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "redisql_lib" -version = "0.5.5-rc02" +version = "0.6.0" authors = ["Simone Mosciatti "] edition = "2018" diff --git a/redisql_lib/src/community_statement.rs b/redisql_lib/src/community_statement.rs index 27c2e26..a314139 100644 --- a/redisql_lib/src/community_statement.rs +++ b/redisql_lib/src/community_statement.rs @@ -16,6 +16,7 @@ enum Parameters { Named { index: i32 }, } +#[derive(Clone)] pub struct MultiStatement { stmts: Vec, db: Arc>, @@ -38,12 +39,17 @@ impl<'a> fmt::Display for MultiStatement { } } +#[derive(Clone)] pub struct Statement { + stmt: Arc, +} + +struct InternalStatement { stmt: ptr::NonNull, } -unsafe impl Send for Statement {} -unsafe impl Sync for Statement {} +unsafe impl Send for InternalStatement {} +unsafe impl Sync for InternalStatement {} impl<'a> fmt::Display for Statement { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { @@ -56,10 +62,10 @@ impl<'a> fmt::Display for Statement { } } -impl<'a> Drop for Statement { +impl<'a> Drop for InternalStatement { fn drop(&mut self) { unsafe { - ffi::sqlite3_finalize(self.as_ptr()); + ffi::sqlite3_finalize(self.stmt.as_ptr()); }; } } @@ -120,7 +126,9 @@ pub fn generate_statements( impl Statement { fn from_ptr(stmt: *mut ffi::sqlite3_stmt) -> Self { Statement { - stmt: ptr::NonNull::new(stmt).unwrap(), + stmt: Arc::new(InternalStatement { + stmt: ptr::NonNull::new(stmt).unwrap(), + }), } } fn execute( @@ -139,7 +147,7 @@ impl Statement { ffi::sqlite3_column_count(self.as_ptr()) } as i32; Ok(Cursor::RowsCursor { - stmt: self, + stmt: self.clone(), num_columns, previous_status: ffi::SQLITE_ROW, modified_rows: 0, @@ -154,7 +162,7 @@ impl Statement { rc.get_last_error() } pub fn as_ptr(&self) -> *mut ffi::sqlite3_stmt { - self.stmt.as_ptr() + self.stmt.stmt.as_ptr() } } @@ -403,6 +411,7 @@ fn count_parameters( } } } + fn get_parameter_name( stmt: &Statement, index: i32, diff --git a/redisql_lib/src/redis.rs b/redisql_lib/src/redis.rs index e142823..d08ed55 100644 --- a/redisql_lib/src/redis.rs +++ b/redisql_lib/src/redis.rs @@ -3,9 +3,8 @@ use std; use std::cell::RefCell; use std::clone::Clone; use std::collections::hash_map::Entry; -use std::error; +use std::convert::TryFrom; use std::ffi::{CStr, CString}; -use std::fmt; use std::fs::File; use std::io::{BufReader, Read, Write}; use std::os::raw::{c_char, c_long}; @@ -22,7 +21,10 @@ use crate::redis_type::{ use crate::redisql_error as err; use crate::redisql_error::RediSQLError; -use crate::sqlite::StatementTrait; +use crate::sqlite::{ + Cursor, Entity, QueryResult, RawConnection, SQLite3Error, + StatementTrait, +}; use crate::community_statement::MultiStatement; @@ -33,11 +35,11 @@ use crate::statistics::STATISTICS; #[derive(Clone)] pub struct ReplicationBook { data: Arc>>, - db: Arc>, + db: Arc>, } pub trait StatementCache<'a> { - fn new(db: &Arc>) -> Self; + fn new(db: &Arc>) -> Self; fn is_statement_present(&self, identifier: &str) -> bool; fn insert_new_statement( &mut self, @@ -57,16 +59,16 @@ pub trait StatementCache<'a> { &self, identifier: &str, args: &[&str], - ) -> Result; + ) -> Result; fn query_statement( &self, identifier: &str, args: &[&str], - ) -> Result; + ) -> Result; } impl<'a> StatementCache<'a> for ReplicationBook { - fn new(db: &Arc>) -> Self { + fn new(db: &Arc>) -> Self { ReplicationBook { data: Arc::new(RwLock::new(FnvHashMap::default())), db: Arc::clone(db), @@ -153,7 +155,7 @@ impl<'a> StatementCache<'a> for ReplicationBook { &self, identifier: &str, args: &[&str], - ) -> Result { + ) -> Result { let map = self.data.read().unwrap(); match map.get(identifier) { None => { @@ -167,7 +169,7 @@ impl<'a> StatementCache<'a> for ReplicationBook { stmt.reset(); let stmt = bind_statement(stmt, args)?; let cursor = stmt.execute()?; - Ok(QueryResult::from(cursor)) + Ok(cursor) } Some(&(_, false)) => { let debug = String::from("Not read only statement"); @@ -181,7 +183,7 @@ impl<'a> StatementCache<'a> for ReplicationBook { &self, identifier: &str, args: &[&str], - ) -> Result { + ) -> Result { let map = self.data.read().unwrap(); match map.get(identifier) { None => { @@ -195,8 +197,7 @@ impl<'a> StatementCache<'a> for ReplicationBook { stmt.reset(); let stmt = bind_statement(stmt, args)?; let cursor = stmt.execute()?; - Ok(QueryResult::from(cursor)) - //Ok(cursor_to_query_result(cursor)) + Ok(cursor) } } } @@ -239,7 +240,7 @@ impl<'a> Drop for RedisContextSet<'a> { #[derive(Clone)] pub struct Loop { - db: Arc>, + db: Arc>, replication_book: ReplicationBook, redis_context: ProtectedRedisContext, } @@ -254,7 +255,7 @@ unsafe impl Send for Loop {} pub trait LoopData { fn get_replication_book(&self) -> ReplicationBook; - fn get_db(&self) -> Arc>; + fn get_db(&self) -> Arc>; fn set_rc(&self, ctx: Context) -> RedisContextSet; fn with_contex_set(&self, ctx: Context, f: F) where @@ -265,7 +266,7 @@ impl LoopData for Loop { fn get_replication_book(&self) -> ReplicationBook { self.replication_book.clone() } - fn get_db(&self) -> Arc> { + fn get_db(&self) -> Arc> { Arc::clone(&self.db) } fn set_rc(&self, ctx: Context) -> RedisContextSet { @@ -293,7 +294,7 @@ impl LoopData for Loop { impl Loop { fn new_from_arc( - db: Arc>, + db: Arc>, redis_context: Arc>>>, ) -> Loop { let replication_book = ReplicationBook::new(&db); @@ -307,27 +308,27 @@ impl Loop { } pub trait RedisReply { - fn reply(&self, ctx: &rm::Context) -> i32; + fn reply(&mut self, ctx: &rm::Context) -> i32; } -impl RedisReply for sql::Entity { - fn reply(&self, ctx: &rm::Context) -> i32 { +impl RedisReply for Entity { + fn reply(&mut self, ctx: &rm::Context) -> i32 { match *self { - sql::Entity::Integer { int } => { + Entity::Integer { int } => { rm::ReplyWithLongLong(ctx, int) } - sql::Entity::Float { float } => { + Entity::Float { float } => { rm::ReplyWithDouble(ctx, float) } - sql::Entity::Text { ref text } => { + Entity::Text { ref text } => { rm::ReplyWithStringBuffer(ctx, text.as_bytes()) } - sql::Entity::Blob { ref blob } => { + Entity::Blob { ref blob } => { rm::ReplyWithStringBuffer(ctx, blob.as_bytes()) } - sql::Entity::Null => rm::ReplyWithNull(ctx), - sql::Entity::OK { .. } => QueryResult::OK {}.reply(ctx), - sql::Entity::DONE { modified_rows, .. } => { + Entity::Null => rm::ReplyWithNull(ctx), + Entity::OK { .. } => QueryResult::OK {}.reply(ctx), + Entity::DONE { modified_rows, .. } => { QueryResult::DONE { modified_rows }.reply(ctx) } } @@ -367,37 +368,51 @@ fn reply_with_done( rm::ffi::REDISMODULE_OK } -fn reply_with_array(ctx: &rm::Context, array: Vec) -> i32 { - let len = array.len() as c_long; +fn reply_with_array( + ctx: &rm::Context, + mut array: impl RowFiller, +) -> i32 { unsafe { rm::ffi::RedisModule_ReplyWithArray.unwrap()( ctx.as_ptr(), - len, + rm::ffi::REDISMODULE_POSTPONED_ARRAY_LEN.into(), ); } - for row in array { + let mut row = Vec::new(); + let mut i = 0; + while array.fill_row(&mut row) != None { + i += 1; unsafe { rm::ffi::RedisModule_ReplyWithArray.unwrap()( ctx.as_ptr(), row.len() as c_long, ); } - for entity in row { + + for entity in row.iter_mut() { entity.reply(&ctx); } + + row.clear(); + } + unsafe { + rm::ffi::RedisModule_ReplySetArrayLength.unwrap()( + ctx.as_ptr(), + i, + ); } rm::ffi::REDISMODULE_OK } -impl RedisReply for sql::SQLite3Error { - fn reply(&self, ctx: &Context) -> i32 { +impl RedisReply for SQLite3Error { + fn reply(&mut self, ctx: &Context) -> i32 { let error = format!("{}", self); reply_with_error(ctx.as_ptr(), error) } } impl RedisReply for RediSQLError { - fn reply(&self, ctx: &Context) -> i32 { + fn reply(&mut self, ctx: &Context) -> i32 { let error = format!("{}", self); reply_with_error(ctx.as_ptr(), error) } @@ -437,21 +452,20 @@ fn parse_args( Vec::with_capacity(argc as usize); for i in 0..argc { let redis_str = unsafe { *argv.offset(i as isize) }; - let arg = string_ptr_len(redis_str)?; + let arg = unsafe { string_ptr_len(redis_str)? }; args.push(arg); } Ok(args) } -pub fn string_ptr_len( +unsafe fn string_ptr_len( str: *mut rm::ffi::RedisModuleString, ) -> Result<&'static str, std::str::Utf8Error> { let mut len = 0; - let base = unsafe { + let base = rm::ffi::RedisModule_StringPtrLen.unwrap()(str, &mut len) - as *mut u8 - }; - let slice = unsafe { slice::from_raw_parts(base, len) }; + as *mut u8; + let slice = slice::from_raw_parts(base, len); let s = str::from_utf8(slice)?; Ok(s.trim_end_matches(char::from(0))) } @@ -478,10 +492,12 @@ pub enum Command { Stop, Exec { query: &'static str, + timeout: std::time::Instant, client: BlockedClient, }, Query { query: &'static str, + timeout: std::time::Instant, return_method: ReturnMethod, client: BlockedClient, }, @@ -493,6 +509,7 @@ pub enum Command { ExecStatement { identifier: &'static str, arguments: Vec<&'static str>, + timeout: std::time::Instant, client: BlockedClient, }, UpdateStatement { @@ -507,6 +524,7 @@ pub enum Command { QueryStatement { identifier: &'static str, arguments: Vec<&'static str>, + timeout: std::time::Instant, return_method: ReturnMethod, client: BlockedClient, }, @@ -516,53 +534,256 @@ pub enum Command { }, } -pub enum QueryResult { - OK {}, - DONE { - modified_rows: i32, - }, - Array { - names: Vec, - array: Vec, - }, +struct SQLiteResultIterator<'s> { + num_columns: i32, + previous_status: i32, + stmt: &'s crate::community_statement::Statement, +} + +impl<'s> SQLiteResultIterator<'s> { + fn from_stmt( + stmt: &'s crate::community_statement::Statement, + ) -> Self { + let num_columns = + unsafe { sql::ffi::sqlite3_column_count(stmt.as_ptr()) }; + let previous_status = sql::ffi::SQLITE_ROW; + Self { + num_columns, + previous_status, + stmt, + } + } + fn get_next_row( + &mut self, + row: &mut Vec, + ) -> Option { + row.clear(); + if self.previous_status != sql::ffi::SQLITE_ROW { + return None; + } + for i in 0..self.num_columns { + let entity_value = Entity::new(self.stmt, i); + row.push(entity_value); + } + unsafe { + self.previous_status = + sql::ffi::sqlite3_step(self.stmt.as_ptr()); + }; + Some(self.num_columns as usize) + } +} + +pub trait RowFiller { + fn fill_row(&mut self, row: &mut Vec) -> Option; +} + +impl<'s> RowFiller for SQLiteResultIterator<'s> { + fn fill_row(&mut self, row: &mut Vec) -> Option { + row.clear(); + self.get_next_row(row) + } +} + +impl<'r> RowFiller for std::slice::Chunks<'_, Entity> { + fn fill_row(&mut self, row: &mut Vec) -> Option { + row.clear(); + let r = self.next(); + match r { + None => None, + Some(r) => { + for e in r.iter() { + row.push(e.clone()); + } + Some(r.len()) + } + } + } +} + +impl<'s> Iterator for SQLiteResultIterator<'s> { + type Item = Vec; + fn next(&mut self) -> Option { + if self.previous_status != sql::ffi::SQLITE_ROW { + return None; + } + let mut row = Vec::with_capacity(self.num_columns as usize); + for i in 0..self.num_columns { + let entity_value = Entity::new(self.stmt, i); + row.push(entity_value); + } + unsafe { + self.previous_status = + sql::ffi::sqlite3_step(self.stmt.as_ptr()); + }; + + Some(row) + } } -impl QueryResult { - pub fn reply(self, ctx: &rm::Context) -> i32 { - debug!("Start replying!"); +pub trait Returner { + fn create_data_to_return( + self, + ctx: &Context, + return_method: &ReturnMethod, + timeout: std::time::Instant, + ) -> Box>; +} + +impl Returner for QueryResult { + fn create_data_to_return( + self, + ctx: &Context, + return_method: &ReturnMethod, + timeout: std::time::Instant, + ) -> Box> { + match return_method { + ReturnMethod::Stream { name: stream_name } => { + match self { + QueryResult::Array { + array, + names: columns_names, + } => { + match stream_query_result_array( + ctx, + stream_name, + &columns_names, + array.chunks(columns_names.len()), + timeout, + ) { + Ok(res) => Box::new(Box::new(res)), + Err(e) => Box::new(Box::new(e)), + } + } + _ => Box::new(Box::new(self)), + } + } + _ => Box::new(Box::new(self)), + } + } +} + +impl Returner for RediSQLError { + fn create_data_to_return( + self, + _ctx: &Context, + _return_method: &ReturnMethod, + _timeout: std::time::Instant, + ) -> Box> { + Box::new(Box::new(self)) + } +} + +impl<'s> Returner for Cursor { + fn create_data_to_return( + self, + ctx: &Context, + return_method: &ReturnMethod, + timeout: std::time::Instant, + ) -> Box> { + match self { + Cursor::RowsCursor { + ref stmt, + num_columns, + .. + } => match return_method { + ReturnMethod::Stream { name: stream_name } => { + let mut names = + Vec::with_capacity(num_columns as usize); + for i in 0..num_columns { + let name = unsafe { + CStr::from_ptr( + sql::ffi::sqlite3_column_name( + stmt.as_ptr(), + i, + ), + ) + .to_string_lossy() + .into_owned() + }; + names.push(name); + } + + match stream_query_result_array( + ctx, + stream_name, + &names, + SQLiteResultIterator::from_stmt(stmt), + timeout, + ) { + Ok(res) => Box::new(Box::new(res)), + Err(e) => Box::new(Box::new(e)), + } + } + ReturnMethod::Reply => { + let query_result = + QueryResult::from_cursor_before( + self, timeout, + ); + Box::new(Box::new(query_result)) + } + }, + _ => Box::new(Box::new(self)), + } + } +} + +impl RedisReply for Result { + fn reply(&mut self, ctx: &Context) -> i32 { + match self { + Ok(ok) => ok.reply(ctx), + Err(e) => e.reply(ctx), + } + } +} + +impl<'s> RedisReply for Cursor { + fn reply(&mut self, ctx: &Context) -> i32 { + match self { + Cursor::OKCursor {} => reply_with_ok(ctx.as_ptr()), + Cursor::DONECursor { modified_rows } => { + reply_with_done(ctx.as_ptr(), *modified_rows) + } + Cursor::RowsCursor { stmt, .. } => reply_with_array( + ctx, + SQLiteResultIterator::from_stmt(stmt), + ), + } + } +} + +impl RedisReply for QueryResult { + fn reply(&mut self, ctx: &rm::Context) -> i32 { match self { QueryResult::OK { .. } => reply_with_ok(ctx.as_ptr()), QueryResult::DONE { modified_rows, .. } => { - debug!("QueryResult::DONE"); - reply_with_done(ctx.as_ptr(), modified_rows) + reply_with_done(ctx.as_ptr(), *modified_rows) } - QueryResult::Array { array, .. } => { + QueryResult::Array { array, names } => { debug!("QueryResult::Array"); - reply_with_array(ctx, array) + reply_with_array(ctx, array.chunks(names.len())) } } } } pub fn do_execute( - db: &Arc>, + db: &Arc>, query: &str, -) -> Result { +) -> Result { let stmt = MultiStatement::new(db.clone(), query)?; debug!("do_execute | created statement"); let cursor = stmt.execute()?; debug!("do_execute | statement executed"); - Ok(QueryResult::from(cursor)) + Ok(cursor) } pub fn do_query( - db: &Arc>, + db: &Arc>, query: &str, -) -> Result { +) -> Result { let stmt = MultiStatement::new(db.clone(), query)?; if stmt.is_read_only() { - let cursor = stmt.execute()?; - Ok(QueryResult::from(cursor)) + Ok(stmt.execute()?) } else { let debug = String::from("Not read only statement"); let description = String::from("Statement is not read only but it may modify the database, use `EXEC_STATEMENT` instead.",); @@ -573,9 +794,9 @@ pub fn do_query( /// implements the copy of the source database into the destination one /// it also leak the two DBKeys pub fn do_copy( - source_db: &Arc>, + source_db: &Arc>, destination_loopdata: &L, -) -> Result { +) -> Result { debug!("DoCopy | Start"); let destination_path = { @@ -609,35 +830,13 @@ pub fn do_copy( fn bind_statement<'a>( stmt: &'a MultiStatement, arguments: &[&str], -) -> Result<&'a MultiStatement, sql::SQLite3Error> { +) -> Result<&'a MultiStatement, SQLite3Error> { match stmt.bind_texts(arguments) { Err(e) => Err(e), Ok(_) => Ok(stmt), } } -pub struct RedisError { - pub msg: String, -} - -impl fmt::Display for RedisError { - fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - write!(f, "ERR - {}", self.msg) - } -} - -impl fmt::Debug for RedisError { - fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - write!(f, "{}", self) - } -} - -impl error::Error for RedisError { - fn description(&self) -> &str { - self.msg.as_str() - } -} - /// restore_previous_statements wread the statements written in the database and add them to the /// loopdata datastructure. /// At the moment this function returns `()` no matter if there are errors or not. @@ -648,14 +847,14 @@ impl error::Error for RedisError { fn restore_previous_statements<'a, L: 'a + LoopData>(loopdata: &L) { let saved_statements = get_statement_metadata(loopdata.get_db()); match saved_statements { - Ok(QueryResult::Array { array, .. }) => { - for row in array { + Ok(QueryResult::Array { array, names }) => { + for row in array.chunks(names.len()) { let identifier = match row[1] { - sql::Entity::Text { ref text } => text, + Entity::Text { ref text } => text, _ => continue, }; let statement = match row[2] { - sql::Entity::Text { ref text } => text, + Entity::Text { ref text } => text, _ => continue, }; if let Err(e) = compile_and_insert_statement( @@ -672,62 +871,62 @@ fn restore_previous_statements<'a, L: 'a + LoopData>(loopdata: &L) { fn return_value( client: &BlockedClient, - result: Result, + return_method: &ReturnMethod, + result: Result, + timeout: std::time::Instant, ) { + let ctx = Context::thread_safe(client); + let result = match result { + Ok(res) => { + res.create_data_to_return(&ctx, return_method, timeout) + } + Err(e) => { + e.create_data_to_return(&ctx, return_method, timeout) + } + }; unsafe { rm::ffi::RedisModule_UnblockClient.unwrap()( client.client, - Box::into_raw(Box::new(result)) - as *mut std::os::raw::c_void, + Box::into_raw(result) as *mut std::os::raw::c_void, ); } } -fn return_value_v2( - client: &BlockedClient, - return_method: &ReturnMethod, - result: Result, -) { - match return_method { - ReturnMethod::Reply => return_value(client, result), - ReturnMethod::Stream { name: stream_name } => match result { - Err(_) => return_value(client, result), - Ok(QueryResult::OK {}) => return_value(client, result), - Ok(QueryResult::DONE { .. }) => { - return_value(client, result) - } - Ok(QueryResult::Array { - array: rows, - names: columns_names, - }) => { - let context = Context::thread_safe(client); - let result = stream_query_result_array( - &context, - stream_name, - columns_names.as_slice(), - rows, - ); - return_value(client, result) - } - }, - } -} - -pub fn stream_query_result_array( +pub fn stream_query_result_array( context: &Context, stream_name: &str, columns_names: &[String], - array: Vec, -) -> Result { - let row_last_index = array.len() - 1; - + mut array: A, + timeout: std::time::Instant, +) -> Result +where + A: RowFiller, +{ let mut result = Vec::with_capacity(4); - result.push(sql::Entity::Text { + result.push(Entity::Text { text: stream_name.to_string(), }); + let mut i = 0; + let mut first_stream_index = None; + let mut second_stream_index = None; + + let mut now = std::time::Instant::now(); + + if now > timeout { + return Err(err::RediSQLError::timeout()); + } + let mut lock = context.lock(); - for (i, row) in array.iter().enumerate() { + let mut row = Vec::new(); + while array.fill_row(&mut row) != None { + now = std::time::Instant::now(); + + if now > timeout { + context.release(lock); + return Err(err::RediSQLError::timeout()); + } + if i % 256 == 255 { // avoid that a big results lock the context for too long, should help in // keeping the latency low. @@ -735,39 +934,40 @@ pub fn stream_query_result_array( lock = context.lock(); } let mut xadd = rm::XADDCommand::new(&context, stream_name); + for (j, entity) in row.iter().enumerate() { match entity { - sql::Entity::OK {} | sql::Entity::DONE { .. } => { + Entity::OK {} | Entity::DONE { .. } => { // do nothing } - sql::Entity::Null => { + Entity::Null => { xadd.add_element( &format!("null:{}", &columns_names[j]), "(null)", ); } - sql::Entity::Integer { int } => { + Entity::Integer { int } => { xadd.add_element( &format!("int:{}", &columns_names[j]), &int.to_string(), ); } - sql::Entity::Float { float } => { + Entity::Float { float } => { xadd.add_element( &format!("real:{}", &columns_names[j]), &float.to_string(), ); } - sql::Entity::Text { text } => { + Entity::Text { text } => { xadd.add_element( &format!("text:{}", &columns_names[j]), - text, + &text, ); } - sql::Entity::Blob { blob } => { + Entity::Blob { blob } => { xadd.add_element( &format!("blob:{}", &columns_names[j]), - blob, + &blob, ); } } @@ -777,18 +977,17 @@ pub fn stream_query_result_array( match xadd_result { rm::CallReply::RString { .. } => match i { 0 => { - result.push(sql::Entity::Text { + let stream_index = Entity::Text { text: xadd_result.access_string().unwrap(), - }); + }; + first_stream_index = Some(stream_index.clone()); + second_stream_index = Some(stream_index); } - n if n == row_last_index => { - result.push(sql::Entity::Text { + _ => { + second_stream_index = Some(Entity::Text { text: xadd_result.access_string().unwrap(), }); } - _ => { - // do nothing - } }, rm::CallReply::RError { .. } => { context.release(lock); @@ -802,17 +1001,20 @@ pub fn stream_query_result_array( debug!("XADD result: {:?}", xadd_result); panic!(); } - } + }; + i += 1; } context.release(lock); - if result.len() == 2 { - let start_and_end = result[1].clone(); - result.push(start_and_end); - } - result.push(sql::Entity::Integer { - int: array.len() as i64, - }); + result.push( + first_stream_index + .expect("Not found first index when returning a stream"), + ); + result + .push(second_stream_index.expect( + "Not found second index when returning a stream", + )); + result.push(Entity::Integer { int: i }); Ok(QueryResult::Array { names: vec![ @@ -821,7 +1023,7 @@ pub fn stream_query_result_array( String::from("last_id"), String::from("size"), ], - array: vec![result], + array: result, }) } @@ -834,10 +1036,14 @@ pub fn listen_and_execute<'a, L: 'a + LoopData>( loop { debug!("Loop iteration"); match rx.recv() { - Ok(Command::Exec { query, client }) => { + Ok(Command::Exec { + query, + client, + timeout, + }) => { debug!("Exec | Query = {:?}", query); loopdata.with_contex_set( - Context::thread_safe(&client), + Context::no_client(), |_| { let result = do_execute(&loopdata.get_db(), query); @@ -845,22 +1051,29 @@ pub fn listen_and_execute<'a, L: 'a + LoopData>( Ok(_) => STATISTICS.exec_ok(), Err(_) => STATISTICS.exec_err(), } - return_value(&client, result); + return_value( + &client, + &ReturnMethod::Reply, + result, + timeout, + ); }, ); debug!("Exec | DONE, returning result"); } Ok(Command::Query { query, + timeout, return_method, client, }) => { debug!("Query | Query = {:?}", query); loopdata.with_contex_set( - Context::thread_safe(&client), + Context::no_client(), |_| { let result = do_query(&loopdata.get_db(), query); + match (&return_method, &result) { (ReturnMethod::Reply, Ok(_)) => { STATISTICS.query_ok() @@ -875,10 +1088,11 @@ pub fn listen_and_execute<'a, L: 'a + LoopData>( STATISTICS.query_into_err() } }; - return_value_v2( + return_value( &client, &return_method, result, + timeout, ); }, ); @@ -899,7 +1113,10 @@ pub fn listen_and_execute<'a, L: 'a + LoopData>( Ok(_) => STATISTICS.update_statement_ok(), Err(_) => STATISTICS.update_statement_err(), }; - return_value(&client, result) + let t = std::time::Instant::now() + + std::time::Duration::from_secs(10); + + return_value(&client, &ReturnMethod::Reply, result, t) } Ok(Command::DeleteStatement { identifier, client }) => { debug!( @@ -913,7 +1130,15 @@ pub fn listen_and_execute<'a, L: 'a + LoopData>( Ok(_) => STATISTICS.delete_statement_ok(), Err(_) => STATISTICS.delete_statement_err(), } - return_value(&client, result); + let t = std::time::Instant::now() + + std::time::Duration::from_secs(10); + + return_value( + &client, + &ReturnMethod::Reply, + result, + t, + ); } Ok(Command::CompileStatement { identifier, @@ -931,12 +1156,21 @@ pub fn listen_and_execute<'a, L: 'a + LoopData>( Ok(_) => STATISTICS.create_statement_ok(), Err(_) => STATISTICS.create_statement_err(), } - return_value(&client, result); + let t = std::time::Instant::now() + + std::time::Duration::from_secs(10); + + return_value( + &client, + &ReturnMethod::Reply, + result, + t, + ); } Ok(Command::ExecStatement { identifier, arguments, + timeout, client, }) => { debug!( @@ -944,7 +1178,7 @@ pub fn listen_and_execute<'a, L: 'a + LoopData>( identifier, arguments ); loopdata.with_contex_set( - Context::thread_safe(&client), + Context::no_client(), |_| { let result = loopdata .get_replication_book() @@ -953,7 +1187,13 @@ pub fn listen_and_execute<'a, L: 'a + LoopData>( Ok(_) => STATISTICS.exec_statement_ok(), Err(_) => STATISTICS.exec_statement_err(), } - return_value(&client, result); + + return_value( + &client, + &ReturnMethod::Reply, + result, + timeout, + ); }, ); } @@ -961,10 +1201,11 @@ pub fn listen_and_execute<'a, L: 'a + LoopData>( identifier, arguments, return_method, + timeout, client, }) => { loopdata.with_contex_set( - Context::thread_safe(&client), + Context::no_client(), |_| { let result = loopdata .get_replication_book() @@ -986,10 +1227,12 @@ pub fn listen_and_execute<'a, L: 'a + LoopData>( STATISTICS.query_statement_into_err() } }; - return_value_v2( + + return_value( &client, &return_method, result, + timeout, ); }, ); @@ -999,7 +1242,7 @@ pub fn listen_and_execute<'a, L: 'a + LoopData>( client, }) => { loopdata.with_contex_set( - Context::thread_safe(&client), + Context::no_client(), |_| { debug!("MakeCopy | Doing do_copy"); let destination_loopdata = @@ -1012,7 +1255,15 @@ pub fn listen_and_execute<'a, L: 'a + LoopData>( Ok(_) => STATISTICS.copy_ok(), Err(_) => STATISTICS.copy_err(), }; - return_value(&client, result); + let t = std::time::Instant::now() + + std::time::Duration::from_secs(10); + + return_value( + &client, + &ReturnMethod::Reply, + result, + t, + ); }, ); std::mem::forget(destination); @@ -1056,15 +1307,16 @@ fn compile_and_insert_statement<'a, L: 'a + LoopData>( } } Entry::Occupied(_) => { - let err = RedisError { - msg: String::from( - "Statement already exists, \ - impossible to overwrite it with \ + let err = RediSQLError::new( + "Statement already exists".to_string(), + String::from( + "Impossible to overwrite it with \ this command, try with \ UPDATE_STATEMENT", ), - }; - Err(err::RediSQLError::from(err)) + ); + + Err(err) } } } @@ -1078,7 +1330,7 @@ pub struct DBKey { impl DBKey { pub fn new_from_arc( tx: Sender, - db: Arc>, + db: Arc>, in_memory: bool, redis_context: Arc>>>, ) -> DBKey { @@ -1098,8 +1350,8 @@ impl Drop for DBKey { } pub fn create_metadata_table( - db: Arc>, -) -> Result<(), sql::SQLite3Error> { + db: Arc>, +) -> Result<(), SQLite3Error> { let statement = "CREATE TABLE IF NOT EXISTS RediSQLMetadata(data_type TEXT, key TEXT, value TEXT);"; let stmt = MultiStatement::new(db, statement)?; @@ -1108,11 +1360,11 @@ pub fn create_metadata_table( } pub fn insert_metadata( - db: Arc>, + db: Arc>, data_type: &str, key: &str, value: &str, -) -> Result<(), sql::SQLite3Error> { +) -> Result<(), SQLite3Error> { let statement = "INSERT INTO RediSQLMetadata VALUES(?1, ?2, ?3);"; let stmt = MultiStatement::new(db, statement)?; @@ -1124,8 +1376,8 @@ pub fn insert_metadata( } pub fn enable_foreign_key( - db: Arc>, -) -> Result<(), sql::SQLite3Error> { + db: Arc>, +) -> Result<(), SQLite3Error> { let enable_foreign_key = "PRAGMA foreign_keys = ON;"; match MultiStatement::new(db, enable_foreign_key) { Err(e) => Err(e), @@ -1137,10 +1389,10 @@ pub fn enable_foreign_key( } fn update_statement_metadata( - db: Arc>, + db: Arc>, key: &str, value: &str, -) -> Result<(), sql::SQLite3Error> { +) -> Result<(), SQLite3Error> { let statement = "UPDATE RediSQLMetadata SET value = ?1 WHERE data_type = 'statement' AND key = ?2"; @@ -1152,9 +1404,9 @@ fn update_statement_metadata( } fn remove_statement_metadata( - db: Arc>, + db: Arc>, key: &str, -) -> Result<(), sql::SQLite3Error> { +) -> Result<(), SQLite3Error> { let statement = "DELETE FROM RediSQLMetadata WHERE data_type = 'statement' AND key = ?1"; let stmt = MultiStatement::new(db, statement)?; @@ -1164,28 +1416,26 @@ fn remove_statement_metadata( } fn get_statement_metadata( - db: Arc>, -) -> Result { + db: Arc>, +) -> Result { let statement = "SELECT * FROM RediSQLMetadata WHERE data_type = 'statement';"; let stmt = MultiStatement::new(db, statement)?; let cursor = stmt.execute()?; - Ok(QueryResult::from(cursor)) + QueryResult::try_from(cursor) } fn get_path_metadata( - db: Arc>, -) -> Result { + db: Arc>, +) -> Result { let statement = "SELECT value FROM RediSQLMetadata WHERE data_type = 'path' AND key = 'path';"; let stmt = MultiStatement::new(db, statement)?; let cursor = stmt.execute()?; - Ok(QueryResult::from(cursor)) + QueryResult::try_from(cursor) } -pub fn is_redisql_database( - db: Arc>, -) -> bool { +pub fn is_redisql_database(db: Arc>) -> bool { let query = "SELECT name FROM sqlite_master WHERE type='table' AND name='RediSQLMetadata;"; let query = MultiStatement::new(db, query); @@ -1199,19 +1449,22 @@ pub fn is_redisql_database( return false; }; - match QueryResult::from(cursor.unwrap()) { - QueryResult::Array { .. } => true, - _ => false, + match QueryResult::try_from(cursor.unwrap()) { + Ok(QueryResult::Array { .. }) => true, + Ok(_) => false, + Err(_) => false, } } pub fn get_path_from_db( - db: Arc>, + db: Arc>, ) -> Result { match get_path_metadata(db) { - Err(e) => Err(e.into()), - Ok(QueryResult::Array { array, .. }) => match array[0][0] { - sql::Entity::Text { ref text } => match text { + Err(e) => Err(e), + // we have one big vector of results, else the first element is just [0] and not [0][0] + // it use to be a matrix, is not anymore the case. + Ok(QueryResult::Array { array, .. }) => match array[0] { + Entity::Text { ref text } => match text { t if t.is_empty() => { let err = RediSQLError::new( "Found empty path".to_string(), @@ -1235,16 +1488,16 @@ pub fn get_path_from_db( } pub fn insert_path_metadata( - db: Arc>, + db: Arc>, path: &str, -) -> Result<(), sql::SQLite3Error> { +) -> Result<(), SQLite3Error> { insert_metadata(db, "path", "path", path) } fn update_path_metadata( - db: Arc>, + db: Arc>, value: &str, -) -> Result<(), sql::SQLite3Error> { +) -> Result<(), SQLite3Error> { let statement = "UPDATE RediSQLMetadata SET value = ?1 WHERE data_type = 'path' AND key = 'path'"; @@ -1255,9 +1508,9 @@ fn update_path_metadata( } pub fn make_backup( - conn1: &sql::RawConnection, - conn2: &sql::RawConnection, -) -> Result { + conn1: &RawConnection, + conn2: &RawConnection, +) -> Result { match sql::create_backup(conn1, conn2) { Err(e) => Err(e), Ok(bk) => { @@ -1272,10 +1525,10 @@ pub fn make_backup( } pub fn create_backup( - conn: &sql::RawConnection, + conn: &RawConnection, path: &str, -) -> Result { - match sql::RawConnection::open_connection(path) { +) -> Result { + match RawConnection::open_connection(path) { Err(e) => Err(e), Ok(new_db) => make_backup(conn, &new_db), } @@ -1356,28 +1609,6 @@ pub unsafe fn write_rdb_to_file( Ok(()) } -pub fn with_leaky_db( - ctx: *mut rm::ffi::RedisModuleCtx, - name: &str, - f: F, -) -> i32 -where - F: Fn(&Result) -> i32, -{ - let db = match get_dbkeyptr_from_name(ctx, name) { - Err(err) => Err(err), - Ok(ptr) => Ok(unsafe { ptr.read() }), - }; - let result = f(&db); - debug!("with_leaky_db | go result {}", result); - if db.is_ok() { - debug!("with_leaky_db | forgetting db"); - // Box::into_raw(db); - std::mem::forget(db); - } - result -} - pub fn get_dbkeyptr_from_name( ctx: *mut rm::ffi::RedisModuleCtx, name: &str, @@ -1416,39 +1647,10 @@ pub fn get_dbkey_from_name( Ok(dbkey) } -pub fn with_ch_and_loopdata( - ctx: *mut rm::ffi::RedisModuleCtx, - name: &str, - f: F, -) -> i32 -where - F: Fn(Result<(&Sender, &mut Loop), i32>) -> i32, -{ - let r = get_ch_and_loopdata_from_name(ctx, name); - f(r) -} - -pub fn get_ch_and_loopdata_from_name( - ctx: *mut rm::ffi::RedisModuleCtx, - name: &str, -) -> Result<(&Sender, &mut Loop), i32> { - // here we are intentionally leaking the DBKey, so that it does - // not get destroyed - let db: *mut DBKey = get_dbkeyptr_from_name(ctx, name)?; - let channel = unsafe { &(*db).tx }; - let loopdata = unsafe { &mut (*db).loop_data }; - Ok((channel, loopdata)) -} - -pub fn get_db_channel_from_name( - ctx: *mut rm::ffi::RedisModuleCtx, - name: &str, -) -> Result, i32> { - let db: *mut DBKey = get_dbkeyptr_from_name(ctx, name)?; - let db = unsafe { Box::from_raw(db) }; - let channel = db.tx.clone(); - std::mem::forget(db); - Ok(channel) +pub unsafe fn get_ch_from_dbkeyptr( + db: *mut DBKey, +) -> Sender { + (*db).tx.clone() } pub fn reply_with_error_from_key_type( @@ -1471,7 +1673,7 @@ pub fn reply_with_error_from_key_type( } fn create_statement( - db: Arc>, + db: Arc>, identifier: &str, statement: &str, ) -> Result { @@ -1481,7 +1683,7 @@ fn create_statement( } fn update_statement( - db: &Arc>, + db: &Arc>, identifier: &str, statement: &str, ) -> Result { @@ -1491,22 +1693,13 @@ fn update_statement( } fn remove_statement( - db: &Arc>, + db: &Arc>, identifier: &str, ) -> Result<(), err::RediSQLError> { remove_statement_metadata(Arc::clone(db), identifier) .or_else(|e| Err(err::RediSQLError::from(e))) } -#[allow(non_snake_case)] -pub unsafe fn Replicate( - _ctx: &rm::Context, - _command: &str, - _argv: *mut *mut rm::ffi::RedisModuleString, - _argc: std::os::raw::c_int, -) { -} - pub fn register_function( context: &rm::Context, name: &str, diff --git a/redisql_lib/src/redis_type.rs b/redisql_lib/src/redis_type.rs index 677f2cb..2b6a985 100644 --- a/redisql_lib/src/redis_type.rs +++ b/redisql_lib/src/redis_type.rs @@ -28,6 +28,17 @@ impl Context { thread_safe: false, } } + pub fn no_client() -> Self { + let ctx = unsafe { + ffi::RedisModule_GetThreadSafeContext.unwrap()( + std::ptr::null_mut(), + ) + }; + Context { + ctx, + thread_safe: false, + } + } pub fn as_ptr(&self) -> *mut ffi::RedisModuleCtx { self.ctx } @@ -128,19 +139,19 @@ impl<'a> Drop for RMString<'a> { //array and leak the RMString. //It is to be used as argument to RM_Call when the format includes the `v`, an array of RMString. #[derive(Debug)] -pub struct LeakyArrayOfRMString<'a> { +struct LeakyArrayOfRMString<'a> { array: Vec<*mut ffi::RedisModuleString>, ctx: &'a Context, } impl<'a> LeakyArrayOfRMString<'a> { - pub fn new(ctx: &'a Context) -> LeakyArrayOfRMString { + fn new(ctx: &'a Context) -> LeakyArrayOfRMString { LeakyArrayOfRMString { array: Vec::with_capacity(24), ctx, } } - pub fn push(&mut self, s: &str) { + fn push(&mut self, s: &str) { let ptr = unsafe { ffi::RedisModule_CreateString.unwrap()( self.ctx.as_ptr(), @@ -150,15 +161,12 @@ impl<'a> LeakyArrayOfRMString<'a> { }; self.array.push(ptr); } - pub fn as_ptr(&mut self) -> *mut *mut ffi::RedisModuleString { + fn as_ptr(&mut self) -> *mut *mut ffi::RedisModuleString { self.array.as_mut_ptr() } - pub fn len(&self) -> usize { + fn len(&self) -> usize { self.array.len() } - pub fn is_empty(&self) -> bool { - self.len() == 0 - } } #[derive(Debug)] @@ -288,15 +296,6 @@ pub fn OpenKey( } } -/* -#[allow(non_snake_case)] -pub fn LoadStringBuffer(rdb: *mut rm::ffi::RedisModuleIO, - dimension: &mut usize) - -> { - unsafe { ffi::RedisModule_LoadStringBuffer(rdb, dimension) } -} -*/ - #[allow(non_snake_case)] pub unsafe fn LoadSigned(rdb: *mut ffi::RedisModuleIO) -> i64 { ffi::RedisModule_LoadSigned.unwrap()(rdb) as i64 diff --git a/redisql_lib/src/redisql_error.rs b/redisql_lib/src/redisql_error.rs index caefaaf..36a9a54 100644 --- a/redisql_lib/src/redisql_error.rs +++ b/redisql_lib/src/redisql_error.rs @@ -2,7 +2,6 @@ use std::error; use std::error::Error; use std::fmt; -use crate::redis; use crate::sqlite as sql; pub trait RediSQLErrorTrait: fmt::Display + error::Error {} @@ -13,15 +12,18 @@ pub struct RediSQLError { } impl RediSQLError { - pub fn new( - debug: String, - error_description: String, - ) -> RediSQLError { + pub fn new(debug: String, error_description: String) -> Self { RediSQLError { debug, error_description, } } + pub fn timeout() -> Self { + RediSQLError::new( + "Timeout expired.".to_string(), + "It was impossible to return the whole result before the timeout expired.".to_string(), + ) + } } impl fmt::Debug for RediSQLError { @@ -50,12 +52,3 @@ impl From for RediSQLError { } } } - -impl From for RediSQLError { - fn from(err: redis::RedisError) -> RediSQLError { - RediSQLError { - debug: format!("{}", err), - error_description: err.description().to_owned(), - } - } -} diff --git a/redisql_lib/src/sqlite.rs b/redisql_lib/src/sqlite.rs index 9a1a013..935db11 100644 --- a/redisql_lib/src/sqlite.rs +++ b/redisql_lib/src/sqlite.rs @@ -1,3 +1,4 @@ +use std::convert::TryFrom; use std::error; use std::ffi::{CStr, CString}; use std::fmt; @@ -7,8 +8,6 @@ use std::os::raw::c_char; use std::ptr; use std::sync::{Arc, Mutex}; -use crate::redis::QueryResult; - use crate::redisql_error as err; use crate::community_statement::Statement; @@ -173,7 +172,7 @@ pub trait StatementTrait<'a>: Sized { } } -pub enum EntityType { +enum EntityType { Integer, Float, Text, @@ -183,7 +182,7 @@ pub enum EntityType { // TODO XXX explore it is possible to change these String into &str // and then use Copy instead of Clone -#[derive(Clone)] +#[derive(Clone, Debug)] pub enum Entity { Integer { int: i64 }, Float { float: f64 }, @@ -196,7 +195,7 @@ pub enum Entity { } impl Entity { - fn new(stmt: &Statement, i: i32) -> Entity { + pub fn new(stmt: &Statement, i: i32) -> Entity { match get_entity_type(stmt.as_ptr(), i) { EntityType::Integer => { let int = unsafe { @@ -240,9 +239,7 @@ impl Entity { } } -pub type Row = Vec; - -pub enum Cursor<'a> { +pub enum Cursor { OKCursor, DONECursor { modified_rows: i32, @@ -250,19 +247,19 @@ pub enum Cursor<'a> { RowsCursor { num_columns: i32, previous_status: i32, - stmt: &'a Statement, + stmt: Statement, modified_rows: i32, }, /* ADD empty cursor, it will be the easiest (and maybe * cleaner?) way to manage empty return statements */ } -impl<'a> FromIterator> for Cursor<'a> { - fn from_iter>>( +impl<'a> FromIterator for Cursor { + fn from_iter>( cursors: I, - ) -> Cursor<'a> { + ) -> Cursor { let mut modified = 0; - let mut last: Option> = None; + let mut last: Option = None; for cursor in cursors { match cursor { Cursor::OKCursor {} => { @@ -302,20 +299,37 @@ fn get_entity_type( } } -impl<'a> From> for QueryResult { - fn from(mut cursor: Cursor) -> QueryResult { +pub enum QueryResult { + OK {}, + DONE { + modified_rows: i32, + }, + Array { + names: Vec, + array: Vec, + }, +} + +impl QueryResult { + pub fn from_cursor_before( + mut cursor: Cursor, + timeout: std::time::Instant, + ) -> Result { match cursor { - Cursor::OKCursor {} => QueryResult::OK {}, + Cursor::OKCursor {} => Ok(QueryResult::OK {}), Cursor::DONECursor { modified_rows } => { - QueryResult::DONE { modified_rows } + Ok(QueryResult::DONE { modified_rows }) } - Cursor::RowsCursor { - stmt, + ref stmt, num_columns, ref mut previous_status, .. } => { + let mut now = std::time::Instant::now(); + if now > timeout { + return Err(err::RediSQLError::timeout()); + } let mut result = vec![]; let mut names = Vec::with_capacity(num_columns as usize); @@ -331,22 +345,82 @@ impl<'a> From> for QueryResult { names.push(name); } while *previous_status == ffi::SQLITE_ROW { - let mut row = - Vec::with_capacity(num_columns as usize); + now = std::time::Instant::now(); + if now > timeout { + return Err(err::RediSQLError::timeout()); + } for i in 0..num_columns { let entity_value = Entity::new(stmt, i); - row.push(entity_value); + result.push(entity_value); } unsafe { *previous_status = ffi::sqlite3_step(stmt.as_ptr()); }; + } + match *previous_status { + ffi::SQLITE_INTERRUPT => { + Err(err::RediSQLError::new("Query Interrupted".to_string(), "The query was interrupted, most likely because it runs out of time.".to_string())) + }, + _ => Ok(QueryResult::Array { + names, + array: result, + }), + } + } + } + } +} + +impl TryFrom for QueryResult { + type Error = err::RediSQLError; + fn try_from( + mut cursor: Cursor, + ) -> Result { + match cursor { + Cursor::OKCursor {} => Ok(QueryResult::OK {}), + Cursor::DONECursor { modified_rows } => { + Ok(QueryResult::DONE { modified_rows }) + } - result.push(row); + Cursor::RowsCursor { + ref stmt, + num_columns, + ref mut previous_status, + .. + } => { + let mut result = vec![]; + let mut names = + Vec::with_capacity(num_columns as usize); + for i in 0..num_columns { + let name = unsafe { + CStr::from_ptr(ffi::sqlite3_column_name( + stmt.as_ptr(), + i, + )) + .to_string_lossy() + .into_owned() + }; + names.push(name); + } + while *previous_status == ffi::SQLITE_ROW { + for i in 0..num_columns { + let entity_value = Entity::new(stmt, i); + result.push(entity_value); + } + unsafe { + *previous_status = + ffi::sqlite3_step(stmt.as_ptr()); + }; } - QueryResult::Array { - names, - array: result, + match *previous_status { + ffi::SQLITE_INTERRUPT => { + Err(Self::Error::new("Query Interrupted".to_string(), "The query was interrupted, most likely because it runs out of time.".to_string())) + }, + _ => Ok(QueryResult::Array { + names, + array: result, + }), } } } diff --git a/src/commands.rs b/src/commands.rs index b0aef59..6e28841 100644 --- a/src/commands.rs +++ b/src/commands.rs @@ -4,11 +4,12 @@ use std::sync::mpsc::channel; use std::thread; use redisql_lib::redis::{ - get_dbkey_from_name, reply_with_error_from_key_type, - with_ch_and_loopdata, RedisReply, + get_ch_from_dbkeyptr, get_dbkey_from_name, + get_dbkeyptr_from_name, reply_with_error_from_key_type, + RedisReply, }; use redisql_lib::redis_type::ReplicateVerbatim; -use redisql_lib::sqlite::{get_arc_connection, SQLite3Error}; +use redisql_lib::sqlite::{get_arc_connection, QueryResult}; use redisql_lib::virtual_tables as vtab; use redisql_lib::redis as r; @@ -20,7 +21,7 @@ use redisql_lib::statistics::STATISTICS; const REDISQL_VERSION: Option<&'static str> = option_env!("CARGO_PKG_VERSION"); -extern "C" fn reply_exec( +extern "C" fn reply( ctx: *mut r::rm::ffi::RedisModuleCtx, _argv: *mut *mut r::rm::ffi::RedisModuleString, _argc: ::std::os::raw::c_int, @@ -29,52 +30,11 @@ extern "C" fn reply_exec( let result = unsafe { r::rm::ffi::RedisModule_GetBlockedClientPrivateData.unwrap()( context.as_ptr(), - ) as *mut Result + ) as *mut *mut RedisReply }; - let result: Box> = - unsafe { Box::from_raw(result) }; - match *result { - Ok(query_result) => query_result.reply(&context), - Err(error) => error.reply(&context), - } -} - -extern "C" fn reply_exec_statement( - ctx: *mut r::rm::ffi::RedisModuleCtx, - _argv: *mut *mut r::rm::ffi::RedisModuleString, - _argc: ::std::os::raw::c_int, -) -> i32 { - let context = r::rm::Context::new(ctx); - let result = unsafe { - r::rm::ffi::RedisModule_GetBlockedClientPrivateData.unwrap()( - context.as_ptr(), - ) as *mut Result - }; - let result: Box> = - unsafe { Box::from_raw(result) }; - match *result { - Ok(query_result) => query_result.reply(&context), - Err(error) => error.reply(&context), - } -} - -extern "C" fn reply_create_statement( - ctx: *mut r::rm::ffi::RedisModuleCtx, - _argv: *mut *mut r::rm::ffi::RedisModuleString, - _argc: ::std::os::raw::c_int, -) -> i32 { - let context = r::rm::Context::new(ctx); - let result = unsafe { - r::rm::ffi::RedisModule_GetBlockedClientPrivateData.unwrap()( - context.as_ptr(), - ) as *mut Result - }; - let result: Box> = - unsafe { Box::from_raw(result) }; - match *result { - Ok(query_result) => query_result.reply(&context), - Err(error) => error.reply(&context), - } + let mut result: Box = + unsafe { Box::from_raw(*result) }; + result.reply(&context) } extern "C" fn timeout( @@ -98,7 +58,7 @@ pub extern "C" fn ExecStatement( let context = r::rm::Context::new(ctx); let argvector = match r::create_argument(argv, argc) { Ok(argvector) => argvector, - Err(error) => { + Err(mut error) => { STATISTICS.exec_statement_err(); return error.reply(&context); } @@ -119,54 +79,59 @@ pub extern "C" fn ExecStatement( ) } } - _ => with_ch_and_loopdata( - context.as_ptr(), - argvector[1], - |ch_loopdata| match ch_loopdata { - Err(key_type) => { - STATISTICS.exec_statement_err(); + _ => { + let db = match get_dbkeyptr_from_name( + context.as_ptr(), + argvector[1], + ) { + Ok(db) => db, + Err(e) => { + STATISTICS.exec_err(); + return reply_with_error_from_key_type( + context.as_ptr(), + e, + ); + } + }; - reply_with_error_from_key_type( + let ch = unsafe { get_ch_from_dbkeyptr(db) }; + + let blocked_client = r::rm::BlockedClient { + client: unsafe { + r::rm::ffi::RedisModule_BlockClient.unwrap()( context.as_ptr(), - key_type, + Some(reply), + Some(timeout), + Some(free_privdata), + 10000, ) - } - Ok((ch, _loopdata)) => { - let blocked_client = r::rm::BlockedClient { - client: unsafe { - r::rm::ffi::RedisModule_BlockClient - .unwrap()( - context.as_ptr(), - Some(reply_exec_statement), - Some(timeout), - Some(free_privdata), - 10000, - ) - }, - }; - - let cmd = r::Command::ExecStatement { - identifier: argvector[2], - arguments: argvector[3..].to_vec(), - client: blocked_client, - }; - match ch.send(cmd) { - Ok(()) => { - unsafe { - Replicate( - &context, - "REDISQL.EXEC_STATEMENT.NOW", - argv, - argc, - ); - } - r::rm::ffi::REDISMODULE_OK - } - Err(_) => r::rm::ffi::REDISMODULE_OK, + }, + }; + let t = std::time::Instant::now() + + std::time::Duration::from_secs(10); + + let cmd = r::Command::ExecStatement { + identifier: argvector[2], + arguments: argvector[3..].to_vec(), + client: blocked_client, + timeout: t, + }; + + match ch.send(cmd) { + Ok(()) => { + unsafe { + Replicate( + &context, + "REDISQL.EXEC_STATEMENT.NOW", + argv, + argc, + ); } + r::rm::ffi::REDISMODULE_OK } - }, - ), + Err(_) => r::rm::ffi::REDISMODULE_OK, + } + } } } @@ -180,7 +145,7 @@ pub extern "C" fn QueryStatement( let context = r::rm::Context::new(ctx); let argvector = match r::create_argument(argv, argc) { Ok(argvector) => argvector, - Err(error) => { + Err(mut error) => { STATISTICS.query_statement_err(); return error.reply(&context); } @@ -201,46 +166,50 @@ pub extern "C" fn QueryStatement( ) } } - _ => with_ch_and_loopdata( - context.as_ptr(), - argvector[1], - |ch_loopdata| match ch_loopdata { - Err(key_type) => { - STATISTICS.query_statement_err(); + _ => { + let db = match get_dbkeyptr_from_name( + context.as_ptr(), + argvector[1], + ) { + Ok(db) => db, + Err(e) => { + STATISTICS.exec_err(); + return reply_with_error_from_key_type( + context.as_ptr(), + e, + ); + } + }; + let ch = unsafe { get_ch_from_dbkeyptr(db) }; - reply_with_error_from_key_type( + let blocked_client = r::rm::BlockedClient { + client: unsafe { + r::rm::ffi::RedisModule_BlockClient.unwrap()( context.as_ptr(), - key_type, + Some(reply), + Some(timeout), + Some(free_privdata), + 10000, ) - } - Ok((ch, _loopdata)) => { - let blocked_client = r::rm::BlockedClient { - client: unsafe { - r::rm::ffi::RedisModule_BlockClient - .unwrap()( - context.as_ptr(), - Some(reply_exec_statement), - Some(timeout), - Some(free_privdata), - 10000, - ) - }, - }; - - let cmd = r::Command::QueryStatement { - identifier: argvector[2], - arguments: argvector[3..].to_vec(), - return_method: r::ReturnMethod::Reply, - client: blocked_client, - }; - - match ch.send(cmd) { - Ok(()) => r::rm::ffi::REDISMODULE_OK, - Err(_) => r::rm::ffi::REDISMODULE_OK, - } - } - }, - ), + }, + }; + + let t = std::time::Instant::now() + + std::time::Duration::from_secs(10); + + let cmd = r::Command::QueryStatement { + identifier: argvector[2], + arguments: argvector[3..].to_vec(), + return_method: r::ReturnMethod::Reply, + client: blocked_client, + timeout: t, + }; + + match ch.send(cmd) { + Ok(()) => r::rm::ffi::REDISMODULE_OK, + Err(_) => r::rm::ffi::REDISMODULE_OK, + } + } } } @@ -254,7 +223,7 @@ pub extern "C" fn QueryStatementInto( let context = r::rm::Context::new(ctx); let argvector = match r::create_argument(argv, argc) { Ok(argvector) => argvector, - Err(error) => { + Err(mut error) => { STATISTICS.query_statement_into_err(); return error.reply(&context); } @@ -277,50 +246,51 @@ pub extern "C" fn QueryStatementInto( } _ => { let stream_name = argvector[1]; - let db = argvector[2]; - with_ch_and_loopdata( + let db = match get_dbkeyptr_from_name( context.as_ptr(), - db, - |ch_loopdata| match ch_loopdata { - Err(key_type) => { - STATISTICS.query_statement_into(); + argvector[2], + ) { + Ok(db) => db, + Err(e) => { + STATISTICS.exec_err(); + return reply_with_error_from_key_type( + context.as_ptr(), + e, + ); + } + }; + let ch = unsafe { get_ch_from_dbkeyptr(db) }; - reply_with_error_from_key_type( - context.as_ptr(), - key_type, - ) - } - Ok((ch, _loopdata)) => { - let blocked_client = r::rm::BlockedClient { - client: unsafe { - r::rm::ffi::RedisModule_BlockClient - .unwrap()( - context.as_ptr(), - Some(reply_exec_statement), - Some(timeout), - Some(free_privdata), - 10000, - ) - }, - }; + let blocked_client = r::rm::BlockedClient { + client: unsafe { + r::rm::ffi::RedisModule_BlockClient.unwrap()( + context.as_ptr(), + Some(reply), + Some(timeout), + Some(free_privdata), + 10000, + ) + }, + }; - let cmd = r::Command::QueryStatement { - identifier: argvector[3], - arguments: argvector[4..].to_vec(), - return_method: r::ReturnMethod::Stream { - name: stream_name, - }, - client: blocked_client, - }; + let t = std::time::Instant::now() + + std::time::Duration::from_secs(10); - match ch.send(cmd) { - Ok(()) => r::rm::ffi::REDISMODULE_OK, - Err(_) => r::rm::ffi::REDISMODULE_OK, - } - } + let cmd = r::Command::QueryStatement { + identifier: argvector[3], + arguments: argvector[4..].to_vec(), + return_method: r::ReturnMethod::Stream { + name: stream_name, }, - ) + client: blocked_client, + timeout: t, + }; + + match ch.send(cmd) { + Ok(()) => r::rm::ffi::REDISMODULE_OK, + Err(_) => r::rm::ffi::REDISMODULE_OK, + } } } } @@ -335,63 +305,69 @@ pub extern "C" fn Exec( let context = r::rm::Context::new(ctx); let argvector = match r::create_argument(argv, argc) { Ok(argvector) => argvector, - Err(error) => { + Err(mut error) => { STATISTICS.exec_err(); return error.reply(&context); } }; match argvector.len() { - 3 => with_ch_and_loopdata( - context.as_ptr(), - argvector[1], - |leaky_db| match leaky_db { - Err(key_type) => { + 3 => { + let db = match get_dbkeyptr_from_name( + context.as_ptr(), + argvector[1], + ) { + Ok(db) => db, + Err(e) => { STATISTICS.exec_err(); + return reply_with_error_from_key_type( + context.as_ptr(), + e, + ); + } + }; + let ch = unsafe { get_ch_from_dbkeyptr(db) }; - reply_with_error_from_key_type( + let blocked_client = r::rm::BlockedClient { + client: unsafe { + r::rm::ffi::RedisModule_BlockClient.unwrap()( context.as_ptr(), - key_type, + Some(reply), + Some(timeout), + Some(free_privdata), + 10000, ) - } - Ok((ch, _loopdata)) => { - debug!("Exec | GotDB"); - let blocked_client = r::rm::BlockedClient { - client: unsafe { - r::rm::ffi::RedisModule_BlockClient - .unwrap()( - context.as_ptr(), - Some(reply_exec), - Some(timeout), - Some(free_privdata), - 10000, - ) - }, - }; - debug!("Exec | BlockedClient"); - - let cmd = r::Command::Exec { - query: argvector[2], - client: blocked_client, - }; - debug!("Exec | Create Command"); - match ch.send(cmd) { - Ok(()) => { - unsafe { - Replicate( - &context, - "REDISQL.EXEC.NOW", - argv, - argc, - ); - } - r::rm::ffi::REDISMODULE_OK - } - Err(_) => r::rm::ffi::REDISMODULE_OK, + }, + }; + + let t = std::time::Instant::now() + + std::time::Duration::from_secs(10); + + let cmd = r::Command::Exec { + query: argvector[2], + client: blocked_client, + timeout: t, + }; + + match ch.send(cmd) { + Ok(()) => { + unsafe { + Replicate( + &context, + "REDISQL.EXEC.NOW", + argv, + argc, + ); } + STATISTICS.exec_ok(); + r::rm::ffi::REDISMODULE_OK } - }, - ), + Err(_) => { + STATISTICS.exec_err(); + r::rm::ffi::REDISMODULE_OK + } + } + } n => { let error = CString::new(format!( "Wrong number of arguments, it \ @@ -420,51 +396,55 @@ pub extern "C" fn Query( let context = r::rm::Context::new(ctx); let argvector = match r::create_argument(argv, argc) { Ok(argvector) => argvector, - Err(error) => { + Err(mut error) => { STATISTICS.query_err(); return error.reply(&context); } }; match argvector.len() { - 3 => with_ch_and_loopdata( - context.as_ptr(), - argvector[1], - |ch_loopdata| match ch_loopdata { - Err(key_type) => { - STATISTICS.query_err(); + 3 => { + let db = match get_dbkeyptr_from_name( + context.as_ptr(), + argvector[1], + ) { + Ok(db) => db, + Err(e) => { + STATISTICS.exec_err(); + return reply_with_error_from_key_type( + context.as_ptr(), + e, + ); + } + }; + let ch = unsafe { get_ch_from_dbkeyptr(db) }; - reply_with_error_from_key_type( + let blocked_client = r::rm::BlockedClient { + client: unsafe { + r::rm::ffi::RedisModule_BlockClient.unwrap()( context.as_ptr(), - key_type, + Some(reply), + Some(timeout), + Some(free_privdata), + 10000, ) - } - Ok((ch, _loopdata)) => { - let blocked_client = r::rm::BlockedClient { - client: unsafe { - r::rm::ffi::RedisModule_BlockClient - .unwrap()( - context.as_ptr(), - Some(reply_exec), - Some(timeout), - Some(free_privdata), - 10000, - ) - }, - }; - - let cmd = r::Command::Query { - query: argvector[2], - return_method: r::ReturnMethod::Reply, - client: blocked_client, - }; - match ch.send(cmd) { - Ok(()) => r::rm::ffi::REDISMODULE_OK, - Err(_) => r::rm::ffi::REDISMODULE_OK, - } - } - }, - ), + }, + }; + + let t = std::time::Instant::now() + + std::time::Duration::from_secs(10); + + let cmd = r::Command::Query { + query: argvector[2], + return_method: r::ReturnMethod::Reply, + client: blocked_client, + timeout: t, + }; + match ch.send(cmd) { + Ok(()) => r::rm::ffi::REDISMODULE_OK, + Err(_) => r::rm::ffi::REDISMODULE_OK, + } + } n => { let error = CString::new(format!( "Wrong number of arguments, it \ @@ -493,7 +473,7 @@ pub extern "C" fn QueryInto( let context = r::rm::Context::new(ctx); let argvector = match r::create_argument(argv, argc) { Ok(argvector) => argvector, - Err(error) => { + Err(mut error) => { STATISTICS.query_into_err(); return error.reply(&context); } @@ -502,47 +482,49 @@ pub extern "C" fn QueryInto( match argvector.len() { 4 => { let stream_name = argvector[1]; - let db = argvector[2]; - with_ch_and_loopdata( + + let db = match get_dbkeyptr_from_name( context.as_ptr(), - db, - |ch_loopdata| match ch_loopdata { - Err(key_type) => { - STATISTICS.query_into_err(); + argvector[2], + ) { + Ok(db) => db, + Err(e) => { + STATISTICS.exec_err(); + return reply_with_error_from_key_type( + context.as_ptr(), + e, + ); + } + }; + let ch = unsafe { get_ch_from_dbkeyptr(db) }; - reply_with_error_from_key_type( - context.as_ptr(), - key_type, - ) - } - Ok((ch, _loopdata)) => { - let blocked_client = r::rm::BlockedClient { - client: unsafe { - r::rm::ffi::RedisModule_BlockClient - .unwrap()( - context.as_ptr(), - Some(reply_exec), - Some(timeout), - Some(free_privdata), - 10000, - ) - }, - }; + let blocked_client = r::rm::BlockedClient { + client: unsafe { + r::rm::ffi::RedisModule_BlockClient.unwrap()( + context.as_ptr(), + Some(reply), + Some(timeout), + Some(free_privdata), + 10000, + ) + }, + }; - let cmd = r::Command::Query { - query: argvector[3], - return_method: r::ReturnMethod::Stream { - name: stream_name, - }, - client: blocked_client, - }; - match ch.send(cmd) { - Ok(()) => r::rm::ffi::REDISMODULE_OK, - Err(_) => r::rm::ffi::REDISMODULE_OK, - } - } + let t = std::time::Instant::now() + + std::time::Duration::from_secs(10); + + let cmd = r::Command::Query { + query: argvector[3], + return_method: r::ReturnMethod::Stream { + name: stream_name, }, - ) + client: blocked_client, + timeout: t, + }; + match ch.send(cmd) { + Ok(()) => r::rm::ffi::REDISMODULE_OK, + Err(_) => r::rm::ffi::REDISMODULE_OK, + } } n => { let error = CString::new(format!( @@ -573,7 +555,7 @@ pub extern "C" fn CreateStatement( let context = r::rm::Context::new(ctx); let argvector = match r::create_argument(argv, argc) { Ok(argvector) => argvector, - Err(error) => { + Err(mut error) => { STATISTICS.create_statement_err(); return error.reply(&context); } @@ -581,48 +563,53 @@ pub extern "C" fn CreateStatement( match argvector.len() { 4 => { - with_ch_and_loopdata( + let db = match get_dbkeyptr_from_name( context.as_ptr(), argvector[1], - |ch_loopdata| match ch_loopdata { - Err(key_type) => { - STATISTICS.create_statement_err(); + ) { + Ok(db) => db, + Err(e) => { + STATISTICS.exec_err(); + return reply_with_error_from_key_type( + context.as_ptr(), + e, + ); + } + }; + let ch = unsafe { get_ch_from_dbkeyptr(db) }; - reply_with_error_from_key_type( - context.as_ptr(), - key_type, - ) - } - Ok((ch, _loopdata)) => { - let blocked_client = r::rm::BlockedClient { - client: unsafe { - r::rm::ffi::RedisModule_BlockClient - .unwrap()( - context.as_ptr(), - Some(reply_create_statement), - Some(timeout), - Some(free_privdata), - 10000, - ) - }, - }; - let cmd = r::Command::CompileStatement { - identifier: argvector[2], - statement: argvector[3], - client: blocked_client, - }; - match ch.send(cmd) { - Ok(()) => { - unsafe { - Replicate(&context, "REDISQL.CREATE_STATEMENT.NOW", argv, argc); - } - r::rm::ffi::REDISMODULE_OK - } - Err(_) => r::rm::ffi::REDISMODULE_OK, - } - } + let blocked_client = r::rm::BlockedClient { + client: unsafe { + r::rm::ffi::RedisModule_BlockClient.unwrap()( + context.as_ptr(), + Some(reply), + Some(timeout), + Some(free_privdata), + 10000, + ) }, - ) + }; + + let cmd = r::Command::CompileStatement { + identifier: argvector[2], + statement: argvector[3], + client: blocked_client, + }; + + match ch.send(cmd) { + Ok(()) => { + unsafe { + Replicate( + &context, + "REDISQL.CREATE_STATEMENT.NOW", + argv, + argc, + ); + } + r::rm::ffi::REDISMODULE_OK + } + Err(_) => r::rm::ffi::REDISMODULE_OK, + } } _ => { @@ -652,7 +639,7 @@ pub extern "C" fn UpdateStatement( let context = r::rm::Context::new(ctx); let argvector = match r::create_argument(argv, argc) { Ok(argvector) => argvector, - Err(error) => { + Err(mut error) => { STATISTICS.update_statement_err(); return error.reply(&context); } @@ -660,50 +647,53 @@ pub extern "C" fn UpdateStatement( match argvector.len() { 4 => { - with_ch_and_loopdata( + let db = match get_dbkeyptr_from_name( context.as_ptr(), argvector[1], - |ch_loopdata| match ch_loopdata { - Err(key_type) => { - STATISTICS.update_statement_err(); + ) { + Ok(db) => db, + Err(e) => { + STATISTICS.exec_err(); + return reply_with_error_from_key_type( + context.as_ptr(), + e, + ); + } + }; + let ch = unsafe { get_ch_from_dbkeyptr(db) }; - reply_with_error_from_key_type( - context.as_ptr(), - key_type, - ) - } - Ok((ch, _loopdata)) => { - let blocked_client = r::rm::BlockedClient { - client: unsafe { - r::rm::ffi::RedisModule_BlockClient - .unwrap()( - context.as_ptr(), - Some(reply_create_statement), - Some(timeout), - Some(free_privdata), - 10000, - ) - }, - }; + let blocked_client = r::rm::BlockedClient { + client: unsafe { + r::rm::ffi::RedisModule_BlockClient.unwrap()( + context.as_ptr(), + Some(reply), + Some(timeout), + Some(free_privdata), + 10000, + ) + }, + }; - let cmd = r::Command::UpdateStatement { - identifier: argvector[2], - statement: argvector[3], - client: blocked_client, - }; + let cmd = r::Command::UpdateStatement { + identifier: argvector[2], + statement: argvector[3], + client: blocked_client, + }; - match ch.send(cmd) { - Ok(()) => { - unsafe { - Replicate(&context, "REDISQL.UPDATE_STATEMENT.NOW", argv, argc); - } - r::rm::ffi::REDISMODULE_OK - } - Err(_) => r::rm::ffi::REDISMODULE_OK, - } + match ch.send(cmd) { + Ok(()) => { + unsafe { + Replicate( + &context, + "REDISQL.UPDATE_STATEMENT.NOW", + argv, + argc, + ); } - }, - ) + r::rm::ffi::REDISMODULE_OK + } + Err(_) => r::rm::ffi::REDISMODULE_OK, + } } _ => { @@ -734,59 +724,60 @@ pub extern "C" fn DeleteStatement( let context = r::rm::Context::new(ctx); let argvector = match r::create_argument(argv, argc) { Ok(argvector) => argvector, - Err(error) => { + Err(mut error) => { STATISTICS.delete_statement_err(); return error.reply(&context); } }; match argvector.len() { - 3 => with_ch_and_loopdata( - context.as_ptr(), - argvector[1], - |ch_loopdata| { - match ch_loopdata { - Err(key_type) => { - STATISTICS.delete_statement_err(); + 3 => { + let db = match get_dbkeyptr_from_name( + context.as_ptr(), + argvector[1], + ) { + Ok(db) => db, + Err(e) => { + STATISTICS.exec_err(); + return reply_with_error_from_key_type( + context.as_ptr(), + e, + ); + } + }; + let ch = unsafe { get_ch_from_dbkeyptr(db) }; - reply_with_error_from_key_type( - context.as_ptr(), - key_type, - ) - } - Ok((ch, _loopdata)) => { - //let ch = &db.tx; - //let _loopdata = &db.loop_data; - let blocked_client = r::rm::BlockedClient { - client: unsafe { - r::rm::ffi::RedisModule_BlockClient - .unwrap()( - context.as_ptr(), - Some(reply_create_statement), - Some(timeout), - Some(free_privdata), - 10000, - ) - }, - }; + let blocked_client = r::rm::BlockedClient { + client: unsafe { + r::rm::ffi::RedisModule_BlockClient.unwrap()( + context.as_ptr(), + Some(reply), + Some(timeout), + Some(free_privdata), + 10000, + ) + }, + }; - let cmd = r::Command::DeleteStatement { - identifier: argvector[2], - client: blocked_client, - }; - match ch.send(cmd) { - Ok(()) => { - unsafe { - Replicate(&context, "REDISQL.DELETE_STATEMENT.NOW", argv, argc); - } - r::rm::ffi::REDISMODULE_OK - } - Err(_) => r::rm::ffi::REDISMODULE_OK, - } + let cmd = r::Command::DeleteStatement { + identifier: argvector[2], + client: blocked_client, + }; + match ch.send(cmd) { + Ok(()) => { + unsafe { + Replicate( + &context, + "REDISQL.DELETE_STATEMENT.NOW", + argv, + argc, + ); } + r::rm::ffi::REDISMODULE_OK } - }, - ), + Err(_) => r::rm::ffi::REDISMODULE_OK, + } + } _ => { let error = CString::new( "Wrong number of arguments, it \ @@ -814,7 +805,7 @@ pub extern "C" fn CreateDB( let context = r::rm::Context::new(ctx); let argvector = match r::create_argument(argv, argc) { Ok(argvector) => argvector, - Err(error) => { + Err(mut error) => { STATISTICS.create_db_err(); return error.reply(&context); } @@ -856,7 +847,7 @@ pub extern "C" fn CreateDB( .and_then(|_| { vtab::register_modules(&rc) }) { - Err(e) => e.reply(&context), + Err(mut e) => e.reply(&context), Ok(vtab_context) => { let (tx, rx) = channel(); let db = r::DBKey::new_from_arc( @@ -885,8 +876,8 @@ pub extern "C" fn CreateDB( match type_set { r::rm::ffi::REDISMODULE_OK => { - let ok = - r::QueryResult::OK {}; + let mut ok = + QueryResult::OK {}; STATISTICS.create_db_ok(); ReplicateVerbatim(&context); ok.reply(&context) @@ -979,81 +970,78 @@ pub extern "C" fn MakeCopy( let context = r::rm::Context::new(ctx); let argvector = match r::create_argument(argv, argc) { Ok(argvector) => argvector, - Err(error) => { + Err(mut error) => { STATISTICS.copy_err(); return error.reply(&context); } }; match argvector.len() { - 3 => with_ch_and_loopdata( - context.as_ptr(), - argvector[1], - |ch_loopdata| match ch_loopdata { - Err(key_type) => { - STATISTICS.copy_err(); - - reply_with_error_from_key_type( - context.as_ptr(), - key_type, - ) - } - Ok((ch, _loopdata)) => { - let dest_db = get_dbkey_from_name( + 3 => { + let db = match get_dbkeyptr_from_name( + context.as_ptr(), + argvector[1], + ) { + Ok(db) => db, + Err(e) => { + STATISTICS.exec_err(); + return reply_with_error_from_key_type( context.as_ptr(), - argvector[2], + e, ); - if dest_db.is_err() { - let error = - CString::new("Error in opening the DESTINATION database") + } + }; + + let ch = unsafe { get_ch_from_dbkeyptr(db) }; + let dest_db = + get_dbkey_from_name(context.as_ptr(), argvector[2]); + if dest_db.is_err() { + let error = CString::new( + "Error in opening the DESTINATION database", + ) .unwrap(); - STATISTICS.copy_err(); - return unsafe { - r::rm::ffi::RedisModule_ReplyWithError - .unwrap()( - context.as_ptr(), - error.as_ptr(), - ) - }; - } - let dest_db = dest_db.unwrap(); - - let blocked_client = r::rm::BlockedClient { - client: unsafe { - r::rm::ffi::RedisModule_BlockClient - .unwrap()( - context.as_ptr(), - Some(reply_create_statement), - Some(timeout), - Some(free_privdata), - 10000, - ) - }, - }; + STATISTICS.copy_err(); + return unsafe { + r::rm::ffi::RedisModule_ReplyWithError.unwrap()( + context.as_ptr(), + error.as_ptr(), + ) + }; + } + let dest_db = dest_db.unwrap(); - let cmd = r::Command::MakeCopy { - destination: dest_db, - client: blocked_client, - }; + let blocked_client = r::rm::BlockedClient { + client: unsafe { + r::rm::ffi::RedisModule_BlockClient.unwrap()( + context.as_ptr(), + Some(reply), + Some(timeout), + Some(free_privdata), + 10000, + ) + }, + }; + let cmd = r::Command::MakeCopy { + destination: dest_db, + client: blocked_client, + }; - match ch.send(cmd) { - Ok(()) => { - debug!("MakeCopy | Successfully send command"); - unsafe { - Replicate( - &context, - "REDISQL.COPY.NOW", - argv, - argc, - ); - } - r::rm::ffi::REDISMODULE_OK - } - Err(_) => r::rm::ffi::REDISMODULE_OK, + match ch.send(cmd) { + Ok(()) => { + debug!("MakeCopy | Successfully send command"); + unsafe { + Replicate( + &context, + "REDISQL.COPY.NOW", + argv, + argc, + ); } + r::rm::ffi::REDISMODULE_OK } - }, - ), + Err(_) => r::rm::ffi::REDISMODULE_OK, + } + } _ => { let error = CString::new( "Wrong number of arguments, it accepts exactly 3", diff --git a/sync_engine/Cargo.toml b/sync_engine/Cargo.toml index a074a45..1edfabb 100644 --- a/sync_engine/Cargo.toml +++ b/sync_engine/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "sync_engine" -version = "0.5.1-rc01" +version = "0.6.0" authors = ["Simone Mosciatti "] edition = "2018" diff --git a/sync_engine/src/lib.rs b/sync_engine/src/lib.rs index b681826..21e4e29 100644 --- a/sync_engine/src/lib.rs +++ b/sync_engine/src/lib.rs @@ -2,7 +2,8 @@ extern crate redisql_lib; use std::collections::vec_deque::VecDeque; use std::ffi::CString; -use std::mem; +use std::mem::zeroed; +use std::mem::ManuallyDrop; use std::os::raw; use std::sync::{Arc, Mutex}; @@ -14,8 +15,8 @@ use redisql_lib::redis as r; use redisql_lib::redis::{ do_execute, do_query, get_dbkey_from_name, register_function, register_function_with_keys, register_write_function, - reply_with_error_from_key_type, stream_query_result_array, - LoopData, RedisReply, StatementCache, + reply_with_error_from_key_type, LoopData, RedisReply, + ReturnMethod, Returner, StatementCache, }; use redisql_lib::redis_type::ffi::{ RedisModuleIO, RedisModuleString, @@ -32,7 +33,7 @@ struct DumpIterator { impl<'b> DumpIterator { fn new(conn: &Arc>) -> DumpIterator { let db = conn.lock().unwrap(); - let buffer: [u8; 4096] = unsafe { mem::zeroed() }; + let buffer: [u8; 4096] = unsafe { zeroed() }; let fd = unsafe { ffi::start((*db).get_db()) }; let iterator = VecDeque::new(); let first_chunk = String::from(""); @@ -139,13 +140,6 @@ fn check_args( } } -fn unwrap_return_code(r: Result) -> i32 { - match r { - Ok(ok) => ok, - Err(e) => e, - } -} - #[allow(non_snake_case)] pub extern "C" fn ExecNow( ctx: *mut r::rm::ffi::RedisModuleCtx, @@ -155,47 +149,53 @@ pub extern "C" fn ExecNow( let context = r::rm::Context::new(ctx); let argvector = match r::create_argument(argv, argc) { Ok(argvector) => argvector, - Err(error) => { + Err(mut error) => { return error.reply(&context); } }; - let return_code = check_args(argvector, 3) - .or_else(|e| { - Err(unsafe { - r::rm::ffi::RedisModule_ReplyWithError.unwrap()( - context.as_ptr(), - e.as_ptr(), - ) - }) - }) - .and_then(|args| { + match check_args(argvector, 3) { + Err(e) => unsafe { + r::rm::ffi::RedisModule_ReplyWithError.unwrap()( + context.as_ptr(), + e.as_ptr(), + ) + }, + Ok(args) => { match get_dbkey_from_name(context.as_ptr(), args[1]) { - Err(key_type) => Err(reply_with_error_from_key_type( + Err(key_type) => reply_with_error_from_key_type( context.as_ptr(), key_type, - )), + ), Ok(dbkey) => { - let (result, context) = { - let db = dbkey.loop_data.get_db(); - let redis_context = - dbkey.loop_data.set_rc(context); - let result = do_execute(&db, args[2]); - let context = redis_context.release(); - (result, context) - }; - mem::forget(dbkey); - match result { - Ok(result) => { + let dbkey = ManuallyDrop::new(dbkey); + let _rc = + dbkey.loop_data.set_rc(Context::no_client()); + let db = dbkey.loop_data.get_db(); + let result = do_execute(&db, args[2]); + let t = std::time::Instant::now() + + std::time::Duration::from_secs(10); + let mut result = match result { + Ok(r) => { ReplicateVerbatim(&context); - Ok(result.reply(&context)) + r.create_data_to_return( + &context, + &ReturnMethod::Reply, + t, + ) } - Err(e) => Err(e.reply(&context)), - } + Err(r) => r.create_data_to_return( + &context, + &ReturnMethod::Reply, + t, + ), + }; + + result.reply(&context) } } - }); - unwrap_return_code(return_code) + } + } } #[allow(non_snake_case)] @@ -207,44 +207,49 @@ pub extern "C" fn QueryNow( let context = r::rm::Context::new(ctx); let argvector = match r::create_argument(argv, argc) { Ok(argvector) => argvector, - Err(error) => { + Err(mut error) => { return error.reply(&context); } }; - let return_code = check_args(argvector, 3) - .or_else(|e| { - Err(unsafe { - r::rm::ffi::RedisModule_ReplyWithError.unwrap()( - context.as_ptr(), - e.as_ptr(), - ) - }) - }) - .and_then(|args| { + match check_args(argvector, 3) { + Err(e) => unsafe { + r::rm::ffi::RedisModule_ReplyWithError.unwrap()( + context.as_ptr(), + e.as_ptr(), + ) + }, + Ok(args) => { match get_dbkey_from_name(context.as_ptr(), args[1]) { - Err(key_type) => Err(reply_with_error_from_key_type( + Err(key_type) => reply_with_error_from_key_type( context.as_ptr(), key_type, - )), + ), Ok(dbkey) => { + let dbkey = ManuallyDrop::new(dbkey); let db = dbkey.loop_data.get_db(); - let (result, context) = { - let redis_context = - dbkey.loop_data.set_rc(context); - let result = do_query(&db, args[2]); - let context = redis_context.release(); - (result, context) + let _rc = + dbkey.loop_data.set_rc(Context::no_client()); + let result = do_query(&db, args[2]); + let t = std::time::Instant::now() + + std::time::Duration::from_secs(10); + let mut result = match result { + Ok(r) => r.create_data_to_return( + &context, + &ReturnMethod::Reply, + t, + ), + Err(r) => r.create_data_to_return( + &context, + &ReturnMethod::Reply, + t, + ), }; - mem::forget(dbkey); - match result { - Ok(res) => Ok(res.reply(&context)), - Err(e) => Err(e.reply(&context)), - } + result.reply(&context) } } - }); - unwrap_return_code(return_code) + } + } } #[allow(non_snake_case)] @@ -256,64 +261,51 @@ pub extern "C" fn QueryNowInto( let context = r::rm::Context::new(ctx); let argvector = match r::create_argument(argv, argc) { Ok(argvector) => argvector, - Err(error) => { + Err(mut error) => { return error.reply(&context); } }; - let return_code = check_args(argvector, 4) - .or_else(|e| { - Err(unsafe { - r::rm::ffi::RedisModule_ReplyWithError.unwrap()( - context.as_ptr(), - e.as_ptr(), - ) - }) - }) - .and_then(|args| { + match check_args(argvector, 4) { + Err(e) => unsafe { + r::rm::ffi::RedisModule_ReplyWithError.unwrap()( + context.as_ptr(), + e.as_ptr(), + ) + }, + Ok(args) => { match get_dbkey_from_name(context.as_ptr(), args[2]) { - Err(key_type) => Err(reply_with_error_from_key_type( + Err(key_type) => reply_with_error_from_key_type( context.as_ptr(), key_type, - )), + ), Ok(dbkey) => { + let dbkey = ManuallyDrop::new(dbkey); let db = dbkey.loop_data.get_db(); - let (result, context) = { - let redis_context = - dbkey.loop_data.set_rc(context); - let result = do_query(&db, args[3]); - let context = redis_context.release(); - (result, context) + let _rc = + dbkey.loop_data.set_rc(Context::no_client()); + let result = do_query(&db, args[3]); + let return_method = + ReturnMethod::Stream { name: args[1] }; + let t = std::time::Instant::now() + + std::time::Duration::from_secs(10); + let mut result = match result { + Ok(r) => r.create_data_to_return( + &context, + &return_method, + t, + ), + Err(r) => r.create_data_to_return( + &context, + &return_method, + t, + ), }; - mem::forget(dbkey); - match result { - Ok(res @ r::QueryResult::OK {}) => { - Ok(res.reply(&context)) - } - Ok(res @ r::QueryResult::DONE { .. }) => { - Ok(res.reply(&context)) - } - - Ok(r::QueryResult::Array { - array: rows, - names, - }) => { - let result = stream_query_result_array( - &context, args[1], &names, rows, - ); - match result { - Ok(result) => { - Ok(result.reply(&context)) - } - Err(e) => Err(e.reply(&context)), - } - } - Err(e) => Err(e.reply(&context)), - } + result.reply(&context) } } - }); - unwrap_return_code(return_code) + } + } } #[allow(non_snake_case)] @@ -325,7 +317,7 @@ pub extern "C" fn ExecStatementNow( let context = r::rm::Context::new(ctx); let argvector = match r::create_argument(argv, argc) { Ok(argvector) => argvector, - Err(error) => { + Err(mut error) => { return error.reply(&context); } }; @@ -344,26 +336,25 @@ pub extern "C" fn ExecStatementNow( key_type, ), Ok(dbkey) => { - let (result, context) = { - let redis_context = - dbkey.loop_data.set_rc(context); - let result = dbkey - .loop_data - .get_replication_book() - .exec_statement( - argvector[2], - &argvector[3..], - ); - let context = redis_context.release(); - (result, context) - }; - mem::forget(dbkey); + let dbkey = ManuallyDrop::new(dbkey); + // _rc must be + // 1. Define befor the call to exec_statement() and .reply(&context) + // 2. Dropped before we forget the `dbkey` + let _rc = + dbkey.loop_data.set_rc(Context::no_client()); + let result = dbkey + .loop_data + .get_replication_book() + .exec_statement( + argvector[2], + &argvector[3..], + ); match result { - Ok(res) => { + Ok(mut res) => { ReplicateVerbatim(&context); res.reply(&context) } - Err(err) => err.reply(&context), + Err(mut err) => err.reply(&context), } } } @@ -380,43 +371,43 @@ pub extern "C" fn CreateStatementNow( let context = r::rm::Context::new(ctx); let argvector = match r::create_argument(argv, argc) { Ok(argvector) => argvector, - Err(error) => { + Err(mut error) => { return error.reply(&context); } }; - let return_code = check_args(argvector, 4) - .or_else(|e| { - Err(unsafe { - r::rm::ffi::RedisModule_ReplyWithError.unwrap()( - context.as_ptr(), - e.as_ptr(), - ) - }) - }) - .and_then(|args| { + match check_args(argvector, 4) { + Err(e) => unsafe { + r::rm::ffi::RedisModule_ReplyWithError.unwrap()( + context.as_ptr(), + e.as_ptr(), + ) + }, + Ok(args) => { match get_dbkey_from_name(context.as_ptr(), args[1]) { - Err(key_type) => Err(reply_with_error_from_key_type( + Err(key_type) => reply_with_error_from_key_type( context.as_ptr(), key_type, - )), + ), Ok(dbkey) => { + let dbkey = ManuallyDrop::new(dbkey); + let _rc = + dbkey.loop_data.set_rc(Context::no_client()); let result = dbkey .loop_data .get_replication_book() .insert_new_statement(args[2], args[3]); - mem::forget(dbkey); match result { - Ok(res) => { + Ok(mut res) => { ReplicateVerbatim(&context); - Ok(res.reply(&context)) + res.reply(&context) } - Err(e) => Err(e.reply(&context)), + Err(mut e) => e.reply(&context), } } } - }); - unwrap_return_code(return_code) + } + } } #[allow(non_snake_case)] @@ -428,43 +419,44 @@ pub extern "C" fn UpdateStatementNow( let context = r::rm::Context::new(ctx); let argvector = match r::create_argument(argv, argc) { Ok(argvector) => argvector, - Err(error) => { + Err(mut error) => { return error.reply(&context); } }; - let return_code = check_args(argvector, 4) - .or_else(|e| { - Err(unsafe { - r::rm::ffi::RedisModule_ReplyWithError.unwrap()( - context.as_ptr(), - e.as_ptr(), - ) - }) - }) - .and_then(|args| { + match check_args(argvector, 4) { + Err(e) => unsafe { + r::rm::ffi::RedisModule_ReplyWithError.unwrap()( + context.as_ptr(), + e.as_ptr(), + ) + }, + + Ok(args) => { match get_dbkey_from_name(context.as_ptr(), args[1]) { - Err(key_type) => Err(reply_with_error_from_key_type( + Err(key_type) => reply_with_error_from_key_type( context.as_ptr(), key_type, - )), + ), Ok(dbkey) => { + let dbkey = ManuallyDrop::new(dbkey); + let _rc = + dbkey.loop_data.set_rc(Context::no_client()); let result = dbkey .loop_data .get_replication_book() .update_statement(args[2], args[3]); - mem::forget(dbkey); match result { - Ok(res) => { + Ok(mut res) => { ReplicateVerbatim(&context); - Ok(res.reply(&context)) + res.reply(&context) } - Err(e) => Err(e.reply(&context)), + Err(mut e) => e.reply(&context), } } } - }); - unwrap_return_code(return_code) + } + } } #[allow(non_snake_case)] @@ -476,43 +468,44 @@ pub extern "C" fn DeleteStatementNow( let context = r::rm::Context::new(ctx); let argvector = match r::create_argument(argv, argc) { Ok(argvector) => argvector, - Err(error) => { + Err(mut error) => { return error.reply(&context); } }; - let return_code = check_args(argvector, 3) - .or_else(|e| { - Err(unsafe { - r::rm::ffi::RedisModule_ReplyWithError.unwrap()( - context.as_ptr(), - e.as_ptr(), - ) - }) - }) - .and_then(|args| { + match check_args(argvector, 3) { + Err(e) => unsafe { + r::rm::ffi::RedisModule_ReplyWithError.unwrap()( + context.as_ptr(), + e.as_ptr(), + ) + }, + + Ok(args) => { match get_dbkey_from_name(context.as_ptr(), args[1]) { - Err(key_type) => Err(reply_with_error_from_key_type( + Err(key_type) => reply_with_error_from_key_type( context.as_ptr(), key_type, - )), + ), Ok(dbkey) => { + let dbkey = ManuallyDrop::new(dbkey); + let _rc = + dbkey.loop_data.set_rc(Context::no_client()); let result = dbkey .loop_data .get_replication_book() .delete_statement(args[2]); - mem::forget(dbkey); match result { - Ok(res) => { + Ok(mut res) => { ReplicateVerbatim(&context); - Ok(res.reply(&context)) + res.reply(&context) } - Err(e) => Err(e.reply(&context)), + Err(mut e) => e.reply(&context), } } } - }); - unwrap_return_code(return_code) + } + } } #[allow(non_snake_case)] @@ -524,7 +517,7 @@ pub extern "C" fn QueryStatementNow( let context = r::rm::Context::new(ctx); let argvector = match r::create_argument(argv, argc) { Ok(argvector) => argvector, - Err(error) => { + Err(mut error) => { return error.reply(&context); } }; @@ -543,26 +536,22 @@ pub extern "C" fn QueryStatementNow( key_type, ), Ok(dbkey) => { - let (result, context) = { - let redis_context = - &dbkey.loop_data.set_rc(context); - let result = dbkey - .loop_data - .get_replication_book() - .query_statement( - argvector[2], - &argvector[3..], - ); - let context = redis_context.release(); - (result, context) - }; - mem::forget(dbkey); + let dbkey = ManuallyDrop::new(dbkey); + let _rc = + dbkey.loop_data.set_rc(Context::no_client()); + let result = dbkey + .loop_data + .get_replication_book() + .query_statement( + argvector[2], + &argvector[3..], + ); match result { - Ok(res) => { + Ok(mut res) => { ReplicateVerbatim(&context); res.reply(&context) } - Err(err) => err.reply(&context), + Err(mut err) => err.reply(&context), } } } @@ -579,66 +568,52 @@ pub extern "C" fn QueryStatementNowInto( let context = r::rm::Context::new(ctx); let argvector = match r::create_argument(argv, argc) { Ok(argvector) => argvector, - Err(error) => { + Err(mut error) => { return error.reply(&context); } }; - let return_code = check_args(argvector, 4) - .or_else(|e| { - Err(unsafe { - r::rm::ffi::RedisModule_ReplyWithError.unwrap()( - context.as_ptr(), - e.as_ptr(), - ) - }) - }) - .and_then(|args| { + match check_args(argvector, 4) { + Err(e) => unsafe { + r::rm::ffi::RedisModule_ReplyWithError.unwrap()( + context.as_ptr(), + e.as_ptr(), + ) + }, + Ok(args) => { match get_dbkey_from_name(context.as_ptr(), args[2]) { - Err(key_type) => Err(reply_with_error_from_key_type( + Err(key_type) => reply_with_error_from_key_type( context.as_ptr(), key_type, - )), + ), Ok(dbkey) => { - let (result, context) = { - let redis_context = - dbkey.loop_data.set_rc(context); - let result = dbkey - .loop_data - .get_replication_book() - .query_statement(args[3], &args[4..]); - let context = redis_context.release(); - (result, context) - }; - mem::forget(dbkey); + let dbkey = ManuallyDrop::new(dbkey); + let _rc = + dbkey.loop_data.set_rc(Context::no_client()); + let result = dbkey + .loop_data + .get_replication_book() + .query_statement(args[3], &args[4..]); + let t = std::time::Instant::now() + + std::time::Duration::from_secs(10); match result { - Ok(res @ r::QueryResult::OK {}) => { - Ok(res.reply(&context)) - } - Ok(res @ r::QueryResult::DONE { .. }) => { - Ok(res.reply(&context)) - } - - Ok(r::QueryResult::Array { - array: rows, - names, - }) => { - let result = stream_query_result_array( - &context, args[1], &names, rows, - ); - match result { - Ok(result) => { - Ok(result.reply(&context)) - } - Err(e) => Err(e.reply(&context)), - } + Ok(result) => { + let mut to_return = result + .create_data_to_return( + &context, + &ReturnMethod::Stream { + name: args[1], + }, + t, + ); + to_return.reply(&context) } - Err(e) => Err(e.reply(&context)), + Err(mut err) => err.reply(&context), } } } - }); - unwrap_return_code(return_code) + } + } } #[allow(non_snake_case)] @@ -650,21 +625,19 @@ pub extern "C" fn MakeCopyNow( let context = r::rm::Context::new(ctx); let argvector = match r::create_argument(argv, argc) { Ok(argvector) => argvector, - Err(error) => { + Err(mut error) => { return error.reply(&context); } }; - let return_code = check_args(argvector, 3) - .or_else(|e| { - Err(unsafe { - r::rm::ffi::RedisModule_ReplyWithError.unwrap()( - context.as_ptr(), - e.as_ptr(), - ) - }) - }) - .and_then(|argvector| { + match check_args(argvector, 3) { + Err(e) => unsafe { + r::rm::ffi::RedisModule_ReplyWithError.unwrap()( + context.as_ptr(), + e.as_ptr(), + ) + }, + Ok(argvector) => { let source_db = get_dbkey_from_name(context.as_ptr(), argvector[1]); if source_db.is_err() { @@ -672,14 +645,15 @@ pub extern "C" fn MakeCopyNow( "Error in opening the SOURCE database", ) .unwrap(); - return Err(unsafe { + return unsafe { r::rm::ffi::RedisModule_ReplyWithError.unwrap()( context.as_ptr(), error.as_ptr(), ) - }); + }; } let source_db = source_db.unwrap(); + let source_db = ManuallyDrop::new(source_db); let dest_db = get_dbkey_from_name(context.as_ptr(), argvector[2]); @@ -688,33 +662,43 @@ pub extern "C" fn MakeCopyNow( "Error in opening the DESTINATION database", ) .unwrap(); - return Err(unsafe { + return unsafe { r::rm::ffi::RedisModule_ReplyWithError.unwrap()( context.as_ptr(), error.as_ptr(), ) - }); + }; } let dest_db = dest_db.unwrap(); - - let result = { - let dest_loopdata = &dest_db.loop_data; - let source_loopdata = &source_db.loop_data; - r::do_copy(&source_loopdata.get_db(), dest_loopdata) - }; - mem::forget(source_db); - mem::forget(dest_db); - match result { - Ok(res) => { + let dest_db = ManuallyDrop::new(dest_db); + + let dest_loopdata = &dest_db.loop_data; + let source_loopdata = &source_db.loop_data; + let t = std::time::Instant::now() + + std::time::Duration::from_secs(10); + let mut result = match r::do_copy( + &source_loopdata.get_db(), + dest_loopdata, + ) { + Ok(r) => { ReplicateVerbatim(&context); - Ok(res.reply(&context)) + r.create_data_to_return( + &context, + &ReturnMethod::Reply, + t, + ) } - Err(e) => Err(e.reply(&context)), - } - }); + Err(r) => r.create_data_to_return( + &context, + &ReturnMethod::Reply, + t, + ), + }; - unwrap_return_code(return_code) + result.reply(&context) + } + } } #[allow(non_snake_case)] diff --git a/test/correctness/test.py b/test/correctness/test.py index 8c0299e..d2103dc 100755 --- a/test/correctness/test.py +++ b/test/correctness/test.py @@ -402,6 +402,7 @@ def test_not_insert(self): result = self.exec_naked("REDISQL.QUERY", "B", "SELECT * FROM test ORDER BY a ASC") self.assertEquals(result, [[1, 'ciao'], [2, 'foo'], [100, 'baz']]) +###@unittest.skip("Testing without virtual tables") class TestBruteHash(TestRediSQLWithExec): def testSimple(self): with DB(self, "B"): @@ -508,6 +509,7 @@ def test_statement_after_rdb_load(self): self.assertTrue([4L, "cat:4", "4"] in result) +###@unittest.skip("Testing without virtual tables") class TestBruteHashSyncronous(TestRediSQLWithExec): def testSimpleNow(self): with DB(self, "B"):