diff --git a/sqlx-core/src/sqlite/connection/describe.rs b/sqlx-core/src/sqlite/connection/describe.rs index 8bc9f9ceed..cb86e7e024 100644 --- a/sqlx-core/src/sqlite/connection/describe.rs +++ b/sqlx-core/src/sqlite/connection/describe.rs @@ -64,7 +64,7 @@ pub(super) fn describe<'c: 'e, 'q: 'e, 'e>( // fallback to [column_decltype] if !stepped && stmt.read_only() { stepped = true; - let _ = conn.worker.step(*stmt).await; + let _ = conn.worker.step(stmt).await; } let mut ty = stmt.column_type_info(col); diff --git a/sqlx-core/src/sqlite/connection/executor.rs b/sqlx-core/src/sqlite/connection/executor.rs index 6e09fdc27a..45c4768b55 100644 --- a/sqlx-core/src/sqlite/connection/executor.rs +++ b/sqlx-core/src/sqlite/connection/executor.rs @@ -62,12 +62,14 @@ fn bind( /// A structure holding sqlite statement handle and resetting the /// statement when it is dropped. struct StatementResetter { - handle: StatementHandle, + handle: Arc, } impl StatementResetter { - fn new(handle: StatementHandle) -> Self { - Self { handle } + fn new(handle: &Arc) -> Self { + Self { + handle: Arc::clone(handle), + } } } @@ -113,7 +115,7 @@ impl<'c> Executor<'c> for &'c mut SqliteConnection { // is dropped. `StatementResetter` will reliably reset the // statement even if the stream returned from `fetch_many` // is dropped early. - let _resetter = StatementResetter::new(*stmt); + let _resetter = StatementResetter::new(stmt); // bind values to the statement num_arguments += bind(stmt, &arguments, num_arguments)?; @@ -125,7 +127,7 @@ impl<'c> Executor<'c> for &'c mut SqliteConnection { // invoke [sqlite3_step] on the dedicated worker thread // this will move us forward one row or finish the statement - let s = worker.step(*stmt).await?; + let s = worker.step(stmt).await?; match s { Either::Left(changes) => { @@ -145,7 +147,7 @@ impl<'c> Executor<'c> for &'c mut SqliteConnection { Either::Right(()) => { let (row, weak_values_ref) = SqliteRow::current( - *stmt, + &stmt, columns, column_names ); @@ -205,12 +207,12 @@ impl<'c> Executor<'c> for &'c mut SqliteConnection { // invoke [sqlite3_step] on the dedicated worker thread // this will move us forward one row or finish the statement - match worker.step(*stmt).await? { + match worker.step(stmt).await? { Either::Left(_) => (), Either::Right(()) => { let (row, weak_values_ref) = - SqliteRow::current(*stmt, columns, column_names); + SqliteRow::current(stmt, columns, column_names); *last_row_values = Some(weak_values_ref); diff --git a/sqlx-core/src/sqlite/row.rs b/sqlx-core/src/sqlite/row.rs index 9f14ca58f0..f1e15f599d 100644 --- a/sqlx-core/src/sqlite/row.rs +++ b/sqlx-core/src/sqlite/row.rs @@ -23,7 +23,7 @@ pub struct SqliteRow { // IF the user drops the Row before iterating the stream (so // nearly all of our internal stream iterators), the executor moves on; otherwise, // it actually inflates this row with a list of owned sqlite3 values. - pub(crate) statement: StatementHandle, + pub(crate) statement: Arc, pub(crate) values: Arc>, pub(crate) num_values: usize, @@ -48,7 +48,7 @@ impl SqliteRow { // returns a weak reference to an atomic list where the executor should inflate if its going // to increment the statement with [step] pub(crate) fn current( - statement: StatementHandle, + statement: &Arc, columns: &Arc>, column_names: &Arc>, ) -> (Self, Weak>) { @@ -57,7 +57,7 @@ impl SqliteRow { let size = statement.column_count(); let row = Self { - statement, + statement: Arc::clone(statement), values, num_values: size, columns: Arc::clone(columns), diff --git a/sqlx-core/src/sqlite/statement/handle.rs b/sqlx-core/src/sqlite/statement/handle.rs index d1af117a7d..cbdeba030e 100644 --- a/sqlx-core/src/sqlite/statement/handle.rs +++ b/sqlx-core/src/sqlite/statement/handle.rs @@ -13,16 +13,16 @@ use libsqlite3_sys::{ sqlite3_column_count, sqlite3_column_database_name, sqlite3_column_decltype, sqlite3_column_double, sqlite3_column_int, sqlite3_column_int64, sqlite3_column_name, sqlite3_column_origin_name, sqlite3_column_table_name, sqlite3_column_type, - sqlite3_column_value, sqlite3_db_handle, sqlite3_reset, sqlite3_sql, sqlite3_stmt, - sqlite3_stmt_readonly, sqlite3_table_column_metadata, sqlite3_value, SQLITE_OK, - SQLITE_TRANSIENT, SQLITE_UTF8, + sqlite3_column_value, sqlite3_db_handle, sqlite3_finalize, sqlite3_reset, sqlite3_sql, + sqlite3_stmt, sqlite3_stmt_readonly, sqlite3_table_column_metadata, sqlite3_value, + SQLITE_MISUSE, SQLITE_OK, SQLITE_TRANSIENT, SQLITE_UTF8, }; use crate::error::{BoxDynError, Error}; use crate::sqlite::type_info::DataType; use crate::sqlite::{SqliteError, SqliteTypeInfo}; -#[derive(Debug, Copy, Clone)] +#[derive(Debug)] pub(crate) struct StatementHandle(pub(super) NonNull); // access to SQLite3 statement handles are safe to send and share between threads @@ -284,3 +284,20 @@ impl StatementHandle { unsafe { sqlite3_reset(self.0.as_ptr()) }; } } +impl Drop for StatementHandle { + fn drop(&mut self) { + unsafe { + // https://sqlite.org/c3ref/finalize.html + let status = sqlite3_finalize(self.0.as_ptr()); + if status == SQLITE_MISUSE { + // Panic in case of detected misuse of SQLite API. + // + // sqlite3_finalize returns it at least in the + // case of detected double free, i.e. calling + // sqlite3_finalize on already finalized + // statement. + panic!("Detected sqlite3_finalize misuse."); + } + } + } +} diff --git a/sqlx-core/src/sqlite/statement/virtual.rs b/sqlx-core/src/sqlite/statement/virtual.rs index 0063e06508..89dac81376 100644 --- a/sqlx-core/src/sqlite/statement/virtual.rs +++ b/sqlx-core/src/sqlite/statement/virtual.rs @@ -8,8 +8,8 @@ use crate::sqlite::{SqliteColumn, SqliteError, SqliteRow, SqliteValue}; use crate::HashMap; use bytes::{Buf, Bytes}; use libsqlite3_sys::{ - sqlite3, sqlite3_clear_bindings, sqlite3_finalize, sqlite3_prepare_v3, sqlite3_reset, - sqlite3_stmt, SQLITE_MISUSE, SQLITE_OK, SQLITE_PREPARE_PERSISTENT, + sqlite3, sqlite3_clear_bindings, sqlite3_prepare_v3, sqlite3_reset, sqlite3_stmt, SQLITE_OK, + SQLITE_PREPARE_PERSISTENT, }; use smallvec::SmallVec; use std::i32; @@ -31,7 +31,7 @@ pub(crate) struct VirtualStatement { // underlying sqlite handles for each inner statement // a SQL query string in SQLite is broken up into N statements // we use a [`SmallVec`] to optimize for the most likely case of a single statement - pub(crate) handles: SmallVec<[StatementHandle; 1]>, + pub(crate) handles: SmallVec<[Arc; 1]>, // each set of columns pub(crate) columns: SmallVec<[Arc>; 1]>, @@ -126,7 +126,7 @@ impl VirtualStatement { conn: &mut ConnectionHandle, ) -> Result< Option<( - &StatementHandle, + &Arc, &mut Arc>, &Arc>, &mut Option>>, @@ -159,7 +159,7 @@ impl VirtualStatement { column_names.insert(name, i); } - self.handles.push(statement); + self.handles.push(Arc::new(statement)); self.columns.push(Arc::new(columns)); self.column_names.push(Arc::new(column_names)); self.last_row_values.push(None); @@ -198,20 +198,6 @@ impl Drop for VirtualStatement { fn drop(&mut self) { for (i, handle) in self.handles.drain(..).enumerate() { SqliteRow::inflate_if_needed(&handle, &self.columns[i], self.last_row_values[i].take()); - - unsafe { - // https://sqlite.org/c3ref/finalize.html - let status = sqlite3_finalize(handle.0.as_ptr()); - if status == SQLITE_MISUSE { - // Panic in case of detected misuse of SQLite API. - // - // sqlite3_finalize returns it at least in the - // case of detected double free, i.e. calling - // sqlite3_finalize on already finalized - // statement. - panic!("Detected sqlite3_finalize misuse."); - } - } } } } diff --git a/sqlx-core/src/sqlite/statement/worker.rs b/sqlx-core/src/sqlite/statement/worker.rs index 8b1d229978..e6d05a3200 100644 --- a/sqlx-core/src/sqlite/statement/worker.rs +++ b/sqlx-core/src/sqlite/statement/worker.rs @@ -4,6 +4,7 @@ use crossbeam_channel::{unbounded, Sender}; use either::Either; use futures_channel::oneshot; use libsqlite3_sys::{sqlite3_step, SQLITE_DONE, SQLITE_ROW}; +use std::sync::Arc; use std::thread; // Each SQLite connection has a dedicated thread. @@ -18,7 +19,7 @@ pub(crate) struct StatementWorker { enum StatementWorkerCommand { Step { - statement: StatementHandle, + statement: Arc, tx: oneshot::Sender, Error>>, }, } @@ -50,12 +51,15 @@ impl StatementWorker { pub(crate) async fn step( &mut self, - statement: StatementHandle, + statement: &Arc, ) -> Result, Error> { let (tx, rx) = oneshot::channel(); self.tx - .send(StatementWorkerCommand::Step { statement, tx }) + .send(StatementWorkerCommand::Step { + statement: Arc::clone(statement), + tx, + }) .map_err(|_| Error::WorkerCrashed)?; rx.await.map_err(|_| Error::WorkerCrashed)?