Skip to content

Commit

Permalink
Implement register_for_sync_manager() for places (mozilla#4617)
Browse files Browse the repository at this point in the history
- Changed the bookmarks/history sync engines to use an
  Arc<Mutex<PlacesDb>>, which matches the other engines better
- Made these sync engines create their own `SqlInterruptScope`.  The
  interrupt code is still not working, but I think this is the direction
  we want to go for mozilla#1684.  If not, it should be easy to replace.
- Removed the old `PlacesApi.open_sync_connection()` method and replaced it
  with the new `get_sync_connection()` method
- Lots of unit test updates.  I didn't take the time to understand these
  fully.  I just made changes until it compiled, didn't deadlock, and passed.
  • Loading branch information
bendk committed Nov 8, 2021
1 parent a7bcd2c commit f01a0f6
Show file tree
Hide file tree
Showing 11 changed files with 273 additions and 211 deletions.
201 changes: 120 additions & 81 deletions components/places/src/api/places_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,10 @@
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at http://mozilla.org/MPL/2.0/. */

use crate::bookmark_sync::engine::BookmarksEngine;
use crate::bookmark_sync::BookmarksSyncEngine;
use crate::db::db::PlacesDb;
use crate::error::*;
use crate::history_sync::engine::HistoryEngine;
use crate::history_sync::HistorySyncEngine;
use crate::storage::{
self, bookmarks::bookmark_sync, delete_meta, get_meta, history::history_sync, put_meta,
};
Expand All @@ -18,10 +18,10 @@ use std::collections::HashMap;
use std::mem;
use std::path::{Path, PathBuf};
use std::sync::{
atomic::{AtomicBool, AtomicUsize, Ordering},
atomic::{AtomicUsize, Ordering},
Arc, Mutex, Weak,
};
use sync15::{sync_multiple, telemetry, MemoryCachedState, SyncResult};
use sync15::{sync_multiple, telemetry, MemoryCachedState, SyncEngine, SyncResult};

// Not clear if this should be here, but this is the "global sync state"
// which is persisted to disk and reused for all engines.
Expand All @@ -30,6 +30,37 @@ use sync15::{sync_multiple, telemetry, MemoryCachedState, SyncResult};
// per collection.
pub const GLOBAL_STATE_META_KEY: &str = "global_sync_state_v2";

// Our "sync manager" will use whatever is stashed here.
lazy_static::lazy_static! {
// Mutex: just taken long enough to update the contents - needed to wrap
// the Weak as it isn't `Sync`
// [Arc/Weak]: Stores the places api used to create the connection for
// BookmarksSyncEngine/HistorySyncEngine
static ref PLACES_API_FOR_SYNC_MANAGER: Mutex<Weak<PlacesApi>> = Mutex::new(Weak::new());
}

// Called by the sync manager to get a sync engine via the PlacesApi previously
// registered with the sync manager.
pub fn get_registered_sync_engine(name: &str) -> Option<Box<dyn SyncEngine>> {
match PLACES_API_FOR_SYNC_MANAGER.lock().unwrap().upgrade() {
None => {
log::error!("get_registered_sync_engine: no PlacesApi registered");
None
}
Some(places_api) => match places_api.get_sync_connection() {
Ok(db) => match name {
"bookmarks" => Some(Box::new(BookmarksSyncEngine::new(db))),
"history" => Some(Box::new(HistorySyncEngine::new(db))),
_ => unreachable!("can't provide unknown engine: {}", name),
},
Err(e) => {
log::error!("get_registered_sync_engine: {}", e);
None
}
},
}
}

#[repr(u8)]
#[derive(Debug, Copy, Clone, PartialEq)]
pub enum ConnectionType {
Expand Down Expand Up @@ -82,10 +113,18 @@ pub struct PlacesApi {
write_connection: Mutex<Option<PlacesDb>>,
sync_state: Mutex<Option<SyncState>>,
coop_tx_lock: Arc<Mutex<()>>,
sync_conn_active: AtomicBool,
sync_interrupt_counter: Arc<AtomicUsize>,
// Used for get_sync_connection()
// - The inner mutux synchronizes sync operation (for example one of the [SyncEngine] methods).
// This avoids issues like #867
// - The weak facilitates connection sharing. When `get_sync_connection()` returns an Arc, we
// keep a weak reference to it. If the Arc is still alive when `get_sync_connection()` is
// called again, we reuse it.
// - The outer mutex synchronizes the `get_sync_connection()` operation. If multiple threads
// ran that at the same time there would be issues.
sync_connection: Mutex<Weak<Mutex<PlacesDb>>>,
id: usize,
}

impl PlacesApi {
/// Create a new, or fetch an already open, PlacesApi backed by a file on disk.
pub fn new(db_name: impl AsRef<Path>) -> Result<Arc<Self>> {
Expand Down Expand Up @@ -121,8 +160,7 @@ impl PlacesApi {
db_name: db_name.clone(),
write_connection: Mutex::new(Some(connection)),
sync_state: Mutex::new(None),
sync_conn_active: AtomicBool::new(false),
sync_interrupt_counter: Arc::new(AtomicUsize::new(0)),
sync_connection: Mutex::new(Weak::new()),
id,
coop_tx_lock,
};
Expand Down Expand Up @@ -159,25 +197,37 @@ impl PlacesApi {
}
}
ConnectionType::Sync => {
panic!("Use `open_sync_connection` to open a sync connection");
panic!("Use `get_sync_connection` to open a sync connection");
}
}
}

pub fn open_sync_connection(&self) -> Result<SyncConn<'_>> {
self.sync_conn_active
.compare_exchange(false, true, Ordering::SeqCst, Ordering::SeqCst)
.map_err(|_| ErrorKind::ConnectionAlreadyOpen)?;
let db = PlacesDb::open(
self.db_name.clone(),
ConnectionType::Sync,
self.id,
self.coop_tx_lock.clone(),
)?;
Ok(SyncConn {
db,
flag: &self.sync_conn_active,
})
// Get a database connection to sync with
//
// This function provides a couple features to facilitate sharing the connection between
// different sync engines:
// - Each connection is wrapped in a `Mutex<>` to synchronize access.
// - The mutex is then wrapped in an Arc<>. If the last Arc<> returned is still alive, then
// get_sync_connection() will reuse it.
pub fn get_sync_connection(&self) -> Result<Arc<Mutex<PlacesDb>>> {
// First step: lock the outer mutex
let mut conn = self.sync_connection.lock().unwrap();
match conn.upgrade() {
// If our Weak is still alive, then re-use that
Some(db) => Ok(db),
// If not, create a new connection
None => {
let db = Arc::new(Mutex::new(PlacesDb::open(
self.db_name.clone(),
ConnectionType::Sync,
self.id,
self.coop_tx_lock.clone(),
)?));
// Store a weakref for next time
*conn = Arc::downgrade(&db);
Ok(db)
}
}
}

/// Close a connection to the database. If the connection is the write
Expand Down Expand Up @@ -206,6 +256,15 @@ impl PlacesApi {
}
}

// This allows the embedding app to say "make this instance available to
// the sync manager". The implementation is more like "offer to sync mgr"
// (thereby avoiding us needing to link with the sync manager) but
// `register_with_sync_manager()` is logically what's happening so that's
// the name it gets.
pub fn register_with_sync_manager(self: Arc<Self>) {
*PLACES_API_FOR_SYNC_MANAGER.lock().unwrap() = Arc::downgrade(&self);
}

// NOTE: These should be deprecated as soon as possible - that will be once
// all consumers have been updated to use the .sync() method below, and/or
// we have implemented the sync manager and migrated consumers to that.
Expand All @@ -217,15 +276,14 @@ impl PlacesApi {
self.do_sync_one(
"history",
move |conn, mem_cached_state, disk_cached_state| {
let interruptee = self.begin_sync_interrupt_scope();
let engine = HistoryEngine::new(conn, &interruptee);
let engine = HistorySyncEngine::new(conn);
sync_multiple(
&[&engine],
disk_cached_state,
mem_cached_state,
client_init,
key_bundle,
&interruptee,
&engine.scope,
None,
)
},
Expand All @@ -240,15 +298,14 @@ impl PlacesApi {
self.do_sync_one(
"bookmarks",
move |conn, mem_cached_state, disk_cached_state| {
let interruptee = self.begin_sync_interrupt_scope();
let engine = BookmarksEngine::new(conn, &interruptee);
let engine = BookmarksSyncEngine::new(conn);
sync_multiple(
&[&engine],
disk_cached_state,
mem_cached_state,
client_init,
key_bundle,
&interruptee,
&engine.scope,
None,
)
},
Expand All @@ -261,25 +318,25 @@ impl PlacesApi {
syncer: F,
) -> Result<telemetry::SyncTelemetryPing>
where
F: FnOnce(&SyncConn<'_>, &mut MemoryCachedState, &mut Option<String>) -> SyncResult,
F: FnOnce(Arc<Mutex<PlacesDb>>, &mut MemoryCachedState, &mut Option<String>) -> SyncResult,
{
let mut guard = self.sync_state.lock().unwrap();
let conn = self.open_sync_connection()?;
let conn = self.get_sync_connection()?;
if guard.is_none() {
*guard = Some(SyncState {
mem_cached_state: Cell::default(),
disk_cached_state: Cell::new(self.get_disk_persisted_state(&conn)?),
disk_cached_state: Cell::new(self.get_disk_persisted_state(&conn.lock().unwrap())?),
});
}

let sync_state = guard.as_ref().unwrap();

let mut mem_cached_state = sync_state.mem_cached_state.take();
let mut disk_cached_state = sync_state.disk_cached_state.take();
let mut result = syncer(&conn, &mut mem_cached_state, &mut disk_cached_state);
let mut result = syncer(conn.clone(), &mut mem_cached_state, &mut disk_cached_state);
// even on failure we set the persisted state - sync itself takes care
// to ensure this has been None'd out if necessary.
self.set_disk_persisted_state(&conn, &disk_cached_state)?;
self.set_disk_persisted_state(&conn.lock().unwrap(), &disk_cached_state)?;
sync_state.mem_cached_state.replace(mem_cached_state);
sync_state.disk_cached_state.replace(disk_cached_state);

Expand Down Expand Up @@ -307,19 +364,18 @@ impl PlacesApi {
key_bundle: &sync15::KeyBundle,
) -> Result<SyncResult> {
let mut guard = self.sync_state.lock().unwrap();
let conn = self.open_sync_connection()?;
let conn = self.get_sync_connection()?;
if guard.is_none() {
*guard = Some(SyncState {
mem_cached_state: Cell::default(),
disk_cached_state: Cell::new(self.get_disk_persisted_state(&conn)?),
disk_cached_state: Cell::new(self.get_disk_persisted_state(&conn.lock().unwrap())?),
});
}

let sync_state = guard.as_ref().unwrap();

let interruptee = self.begin_sync_interrupt_scope();
let bm_engine = BookmarksEngine::new(&conn, &interruptee);
let history_engine = HistoryEngine::new(&conn, &interruptee);
let bm_engine = BookmarksSyncEngine::new(conn.clone());
let history_engine = HistorySyncEngine::new(conn.clone());
let mut mem_cached_state = sync_state.mem_cached_state.take();
let mut disk_cached_state = sync_state.disk_cached_state.take();

Expand All @@ -330,12 +386,12 @@ impl PlacesApi {
&mut mem_cached_state,
client_init,
key_bundle,
&interruptee,
&bm_engine.scope,
None,
);
// even on failure we set the persisted state - sync itself takes care
// to ensure this has been None'd out if necessary.
if let Err(e) = self.set_disk_persisted_state(&conn, &disk_cached_state) {
if let Err(e) = self.set_disk_persisted_state(&conn.lock().unwrap(), &disk_cached_state) {
log::error!("Failed to persist the sync state: {:?}", e);
}
sync_state.mem_cached_state.replace(mem_cached_state);
Expand All @@ -347,50 +403,52 @@ impl PlacesApi {
pub fn wipe_bookmarks(&self) -> Result<()> {
// Take the lock to prevent syncing while we're doing this.
let _guard = self.sync_state.lock().unwrap();
let conn = self.open_sync_connection()?;
let conn = self.get_sync_connection()?;

storage::bookmarks::delete_everything(&conn)?;
storage::bookmarks::delete_everything(&conn.lock().unwrap())?;
Ok(())
}

pub fn reset_bookmarks(&self) -> Result<()> {
// Take the lock to prevent syncing while we're doing this.
let _guard = self.sync_state.lock().unwrap();
let conn = self.open_sync_connection()?;
let conn = self.get_sync_connection()?;

bookmark_sync::reset(&conn, &sync15::EngineSyncAssociation::Disconnected)?;
bookmark_sync::reset(
&conn.lock().unwrap(),
&sync15::EngineSyncAssociation::Disconnected,
)?;
Ok(())
}

pub fn wipe_history(&self) -> Result<()> {
// Take the lock to prevent syncing while we're doing this.
let _guard = self.sync_state.lock().unwrap();
let conn = self.open_sync_connection()?;
let conn = self.get_sync_connection()?;

storage::history::delete_everything(&conn)?;
storage::history::delete_everything(&conn.lock().unwrap())?;
Ok(())
}

pub fn reset_history(&self) -> Result<()> {
// Take the lock to prevent syncing while we're doing this.
let _guard = self.sync_state.lock().unwrap();
let conn = self.open_sync_connection()?;
let conn = self.get_sync_connection()?;

history_sync::reset(&conn, &sync15::EngineSyncAssociation::Disconnected)?;
history_sync::reset(
&conn.lock().unwrap(),
&sync15::EngineSyncAssociation::Disconnected,
)?;
Ok(())
}

/// Create a new SqlInterruptScope for syncing
/// Create a new SqlInterruptScope for the syncing code
///
/// Call this at the begining of a sync operation. Then if interrupt_sync() is called, we will
/// interrupt the current DB query and return `InterruptedError`
pub fn begin_sync_interrupt_scope(&self) -> SqlInterruptScope {
SqlInterruptScope::new(self.sync_interrupt_counter.clone())
}

/// Interrupt the current sync operation if one is in progress
pub fn interrupt_sync(&self) {
// TODO: update the sync connection code so we can implement this method. This will fix #1684.
/// The syncing code expects a SqlInterruptScope, but it's never been actually hooked up to
/// anything. This method returns something to make the compiler happy, but we should replace
/// this with working code as part of #1684.
pub fn dummy_sync_interrupt_scope(&self) -> SqlInterruptScope {
SqlInterruptScope::new(Arc::new(AtomicUsize::new(0)))
}

// Deprecated/Broken interrupt handler method, let's try to replace it with the above methods
Expand All @@ -405,28 +463,9 @@ impl PlacesApi {
// Probably not necessary to lock here, since this should only get
// called in startup.
let _guard = self.sync_state.lock().unwrap();
let conn = self.open_sync_connection()?;
Ok(conn.new_interrupt_handle())
}
}

/// Wrapper around PlacesDb that automatically sets a flag (`sync_conn_active`)
/// to false when finished
pub struct SyncConn<'api> {
db: PlacesDb,
flag: &'api AtomicBool,
}

impl<'a> Drop for SyncConn<'a> {
fn drop(&mut self) {
self.flag.store(false, Ordering::SeqCst)
}
}

impl<'a> std::ops::Deref for SyncConn<'a> {
type Target = PlacesDb;
fn deref(&self) -> &PlacesDb {
&self.db
let conn = self.get_sync_connection()?;
let db = conn.lock().unwrap();
Ok(db.new_interrupt_handle())
}
}

Expand Down

0 comments on commit f01a0f6

Please sign in to comment.