Skip to content

Commit

Permalink
Implement register_for_sync_manager() for places (mozilla#4617)
Browse files Browse the repository at this point in the history
- Changed the bookmarks/history sync engines to use an
  Arc<Mutex<PlacesDb>>, which matches the other engines better
- Made these sync engines create their own `SqlInterruptScope`.  The
  interrupt code is still not working, but I think this is the direction
  we want to go for mozilla#1684.  If not, it should be easy to replace.
- Updated the `PlacesApi` syncing code to use the new sync engines.
  One thing to note here is that the SyncManager has its own system of
  managing the global state, so the fact that those methods update it
  outside of the Mutex lock that the `SyncEngine` takes is not a problem.
- Updated the importer code to use the new `get_sync_connection()`
  method.  The only difference here is that the importer code needs to
  lock the mutex; this replaces the old system where
  `open_sync_connection()` would return an error if a previously
  returned connection was still alive.
- Removed the old `PlacesApi.open_sync_connection()` method and replaced it
  with the new `get_sync_connection()` method
- Lots of unit test updates.  I didn't take the time to understand these
  fully.  I just made changes until it compiled, didn't deadlock, and passed.
  • Loading branch information
bendk committed Nov 8, 2021
1 parent a7bcd2c commit 881b235
Show file tree
Hide file tree
Showing 11 changed files with 273 additions and 211 deletions.
201 changes: 120 additions & 81 deletions components/places/src/api/places_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,10 @@
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at http://mozilla.org/MPL/2.0/. */

use crate::bookmark_sync::engine::BookmarksEngine;
use crate::bookmark_sync::BookmarksSyncEngine;
use crate::db::db::PlacesDb;
use crate::error::*;
use crate::history_sync::engine::HistoryEngine;
use crate::history_sync::HistorySyncEngine;
use crate::storage::{
self, bookmarks::bookmark_sync, delete_meta, get_meta, history::history_sync, put_meta,
};
Expand All @@ -18,10 +18,10 @@ use std::collections::HashMap;
use std::mem;
use std::path::{Path, PathBuf};
use std::sync::{
atomic::{AtomicBool, AtomicUsize, Ordering},
atomic::{AtomicUsize, Ordering},
Arc, Mutex, Weak,
};
use sync15::{sync_multiple, telemetry, MemoryCachedState, SyncResult};
use sync15::{sync_multiple, telemetry, MemoryCachedState, SyncEngine, SyncResult};

// Not clear if this should be here, but this is the "global sync state"
// which is persisted to disk and reused for all engines.
Expand All @@ -30,6 +30,37 @@ use sync15::{sync_multiple, telemetry, MemoryCachedState, SyncResult};
// per collection.
pub const GLOBAL_STATE_META_KEY: &str = "global_sync_state_v2";

// Our "sync manager" will use whatever is stashed here.
// Holding only a `Weak` means registration does not keep the `PlacesApi`
// alive: if the embedding app drops its last `Arc`, `upgrade()` in
// `get_registered_sync_engine()` returns `None`.
lazy_static::lazy_static! {
// Mutex: just taken long enough to update the contents - needed to wrap
// the Weak as it isn't `Sync`
// [Arc/Weak]: Stores the places api used to create the connection for
// BookmarksSyncEngine/HistorySyncEngine
static ref PLACES_API_FOR_SYNC_MANAGER: Mutex<Weak<PlacesApi>> = Mutex::new(Weak::new());
}

// Called by the sync manager to get a sync engine via the PlacesApi previously
// registered with the sync manager.
//
// Logs an error and returns `None` when no `PlacesApi` is registered or the
// sync connection cannot be opened.
pub fn get_registered_sync_engine(name: &str) -> Option<Box<dyn SyncEngine>> {
    // Keep the guard alive for the whole function, matching the lock
    // lifetime of the original nested-match formulation.
    let registered = PLACES_API_FOR_SYNC_MANAGER.lock().unwrap();
    let places_api = match registered.upgrade() {
        Some(api) => api,
        None => {
            log::error!("get_registered_sync_engine: no PlacesApi registered");
            return None;
        }
    };
    let db = match places_api.get_sync_connection() {
        Ok(db) => db,
        Err(e) => {
            log::error!("get_registered_sync_engine: {}", e);
            return None;
        }
    };
    // Any name other than these two is a caller bug, hence the panic.
    match name {
        "bookmarks" => Some(Box::new(BookmarksSyncEngine::new(db))),
        "history" => Some(Box::new(HistorySyncEngine::new(db))),
        _ => unreachable!("can't provide unknown engine: {}", name),
    }
}

#[repr(u8)]
#[derive(Debug, Copy, Clone, PartialEq)]
pub enum ConnectionType {
Expand Down Expand Up @@ -82,10 +113,18 @@ pub struct PlacesApi {
write_connection: Mutex<Option<PlacesDb>>,
sync_state: Mutex<Option<SyncState>>,
coop_tx_lock: Arc<Mutex<()>>,
sync_conn_active: AtomicBool,
sync_interrupt_counter: Arc<AtomicUsize>,
// Used for get_sync_connection()
// - The inner mutex synchronizes sync operations (for example one of the [SyncEngine] methods).
// This avoids issues like #867
// - The weak facilitates connection sharing. When `get_sync_connection()` returns an Arc, we
// keep a weak reference to it. If the Arc is still alive when `get_sync_connection()` is
// called again, we reuse it.
// - The outer mutex synchronizes the `get_sync_connection()` operation. If multiple threads
// ran that at the same time there would be issues.
sync_connection: Mutex<Weak<Mutex<PlacesDb>>>,
id: usize,
}

impl PlacesApi {
/// Create a new, or fetch an already open, PlacesApi backed by a file on disk.
pub fn new(db_name: impl AsRef<Path>) -> Result<Arc<Self>> {
Expand Down Expand Up @@ -121,8 +160,7 @@ impl PlacesApi {
db_name: db_name.clone(),
write_connection: Mutex::new(Some(connection)),
sync_state: Mutex::new(None),
sync_conn_active: AtomicBool::new(false),
sync_interrupt_counter: Arc::new(AtomicUsize::new(0)),
sync_connection: Mutex::new(Weak::new()),
id,
coop_tx_lock,
};
Expand Down Expand Up @@ -159,25 +197,37 @@ impl PlacesApi {
}
}
ConnectionType::Sync => {
panic!("Use `open_sync_connection` to open a sync connection");
panic!("Use `get_sync_connection` to open a sync connection");
}
}
}

// NOTE(review): removed by this commit - replaced by `get_sync_connection()`,
// which shares one `Arc<Mutex<PlacesDb>>` instead of erroring while a
// previously returned connection is still alive.
pub fn open_sync_connection(&self) -> Result<SyncConn<'_>> {
// Atomically flip the "a sync connection is active" flag; if it was
// already set, a previous connection is still alive, so refuse to open
// another one.
self.sync_conn_active
.compare_exchange(false, true, Ordering::SeqCst, Ordering::SeqCst)
.map_err(|_| ErrorKind::ConnectionAlreadyOpen)?;
let db = PlacesDb::open(
self.db_name.clone(),
ConnectionType::Sync,
self.id,
self.coop_tx_lock.clone(),
)?;
// `SyncConn` clears `sync_conn_active` again when dropped (see its
// `Drop` impl below).
Ok(SyncConn {
db,
flag: &self.sync_conn_active,
})
// Get a database connection to sync with
//
// A couple of features here make it easy for multiple sync engines to share
// one connection:
// - The connection is wrapped in a `Mutex<>` so access to it is synchronized.
// - That mutex lives inside an `Arc<>`; as long as the most recently
//   returned `Arc<>` is still alive, the same connection is handed out again.
pub fn get_sync_connection(&self) -> Result<Arc<Mutex<PlacesDb>>> {
    // The outer mutex serializes concurrent callers of this method.
    let mut weak_conn = self.sync_connection.lock().unwrap();
    // Re-use the existing connection if our weak reference is still live.
    if let Some(existing) = weak_conn.upgrade() {
        return Ok(existing);
    }
    // Otherwise open a fresh sync connection...
    let fresh = Arc::new(Mutex::new(PlacesDb::open(
        self.db_name.clone(),
        ConnectionType::Sync,
        self.id,
        self.coop_tx_lock.clone(),
    )?));
    // ...and stash a weak reference to it for the next caller.
    *weak_conn = Arc::downgrade(&fresh);
    Ok(fresh)
}

/// Close a connection to the database. If the connection is the write
Expand Down Expand Up @@ -206,6 +256,15 @@ impl PlacesApi {
}
}

// This allows the embedding app to say "make this instance available to
// the sync manager". The implementation is more like "offer to sync mgr"
// (thereby avoiding us needing to link with the sync manager) but
// `register_with_sync_manager()` is logically what's happening so that's
// the name it gets.
pub fn register_with_sync_manager(self: Arc<Self>) {
    let mut registered = PLACES_API_FOR_SYNC_MANAGER.lock().unwrap();
    *registered = Arc::downgrade(&self);
}

// NOTE: These should be deprecated as soon as possible - that will be once
// all consumers have been updated to use the .sync() method below, and/or
// we have implemented the sync manager and migrated consumers to that.
Expand All @@ -217,15 +276,14 @@ impl PlacesApi {
self.do_sync_one(
"history",
move |conn, mem_cached_state, disk_cached_state| {
let interruptee = self.begin_sync_interrupt_scope();
let engine = HistoryEngine::new(conn, &interruptee);
let engine = HistorySyncEngine::new(conn);
sync_multiple(
&[&engine],
disk_cached_state,
mem_cached_state,
client_init,
key_bundle,
&interruptee,
&engine.scope,
None,
)
},
Expand All @@ -240,15 +298,14 @@ impl PlacesApi {
self.do_sync_one(
"bookmarks",
move |conn, mem_cached_state, disk_cached_state| {
let interruptee = self.begin_sync_interrupt_scope();
let engine = BookmarksEngine::new(conn, &interruptee);
let engine = BookmarksSyncEngine::new(conn);
sync_multiple(
&[&engine],
disk_cached_state,
mem_cached_state,
client_init,
key_bundle,
&interruptee,
&engine.scope,
None,
)
},
Expand All @@ -261,25 +318,25 @@ impl PlacesApi {
syncer: F,
) -> Result<telemetry::SyncTelemetryPing>
where
F: FnOnce(&SyncConn<'_>, &mut MemoryCachedState, &mut Option<String>) -> SyncResult,
F: FnOnce(Arc<Mutex<PlacesDb>>, &mut MemoryCachedState, &mut Option<String>) -> SyncResult,
{
let mut guard = self.sync_state.lock().unwrap();
let conn = self.open_sync_connection()?;
let conn = self.get_sync_connection()?;
if guard.is_none() {
*guard = Some(SyncState {
mem_cached_state: Cell::default(),
disk_cached_state: Cell::new(self.get_disk_persisted_state(&conn)?),
disk_cached_state: Cell::new(self.get_disk_persisted_state(&conn.lock().unwrap())?),
});
}

let sync_state = guard.as_ref().unwrap();

let mut mem_cached_state = sync_state.mem_cached_state.take();
let mut disk_cached_state = sync_state.disk_cached_state.take();
let mut result = syncer(&conn, &mut mem_cached_state, &mut disk_cached_state);
let mut result = syncer(conn.clone(), &mut mem_cached_state, &mut disk_cached_state);
// even on failure we set the persisted state - sync itself takes care
// to ensure this has been None'd out if necessary.
self.set_disk_persisted_state(&conn, &disk_cached_state)?;
self.set_disk_persisted_state(&conn.lock().unwrap(), &disk_cached_state)?;
sync_state.mem_cached_state.replace(mem_cached_state);
sync_state.disk_cached_state.replace(disk_cached_state);

Expand Down Expand Up @@ -307,19 +364,18 @@ impl PlacesApi {
key_bundle: &sync15::KeyBundle,
) -> Result<SyncResult> {
let mut guard = self.sync_state.lock().unwrap();
let conn = self.open_sync_connection()?;
let conn = self.get_sync_connection()?;
if guard.is_none() {
*guard = Some(SyncState {
mem_cached_state: Cell::default(),
disk_cached_state: Cell::new(self.get_disk_persisted_state(&conn)?),
disk_cached_state: Cell::new(self.get_disk_persisted_state(&conn.lock().unwrap())?),
});
}

let sync_state = guard.as_ref().unwrap();

let interruptee = self.begin_sync_interrupt_scope();
let bm_engine = BookmarksEngine::new(&conn, &interruptee);
let history_engine = HistoryEngine::new(&conn, &interruptee);
let bm_engine = BookmarksSyncEngine::new(conn.clone());
let history_engine = HistorySyncEngine::new(conn.clone());
let mut mem_cached_state = sync_state.mem_cached_state.take();
let mut disk_cached_state = sync_state.disk_cached_state.take();

Expand All @@ -330,12 +386,12 @@ impl PlacesApi {
&mut mem_cached_state,
client_init,
key_bundle,
&interruptee,
&bm_engine.scope,
None,
);
// even on failure we set the persisted state - sync itself takes care
// to ensure this has been None'd out if necessary.
if let Err(e) = self.set_disk_persisted_state(&conn, &disk_cached_state) {
if let Err(e) = self.set_disk_persisted_state(&conn.lock().unwrap(), &disk_cached_state) {
log::error!("Failed to persist the sync state: {:?}", e);
}
sync_state.mem_cached_state.replace(mem_cached_state);
Expand All @@ -347,50 +403,52 @@ impl PlacesApi {
/// Delete every bookmark (via `storage::bookmarks::delete_everything`),
/// holding the sync-state lock so a sync can't run concurrently.
// NOTE(review): diff view - the `open_sync_connection()` and
// `delete_everything(&conn)` lines are the removed versions; the
// `get_sync_connection()` / `delete_everything(&conn.lock().unwrap())`
// lines replace them.
pub fn wipe_bookmarks(&self) -> Result<()> {
// Take the lock to prevent syncing while we're doing this.
let _guard = self.sync_state.lock().unwrap();
let conn = self.open_sync_connection()?;
let conn = self.get_sync_connection()?;

storage::bookmarks::delete_everything(&conn)?;
storage::bookmarks::delete_everything(&conn.lock().unwrap())?;
Ok(())
}

/// Reset bookmark sync state, passing `EngineSyncAssociation::Disconnected`
/// to `bookmark_sync::reset`, while holding the sync-state lock.
// NOTE(review): diff view - the `open_sync_connection()` and single-line
// `bookmark_sync::reset(&conn, ...)` are the removed versions; the
// `get_sync_connection()` / multi-line `bookmark_sync::reset(...)` calls
// replace them.
pub fn reset_bookmarks(&self) -> Result<()> {
// Take the lock to prevent syncing while we're doing this.
let _guard = self.sync_state.lock().unwrap();
let conn = self.open_sync_connection()?;
let conn = self.get_sync_connection()?;

bookmark_sync::reset(&conn, &sync15::EngineSyncAssociation::Disconnected)?;
bookmark_sync::reset(
&conn.lock().unwrap(),
&sync15::EngineSyncAssociation::Disconnected,
)?;
Ok(())
}

/// Delete all history (via `storage::history::delete_everything`), holding
/// the sync-state lock so a sync can't run concurrently.
// NOTE(review): diff view - the `open_sync_connection()` and
// `delete_everything(&conn)` lines are the removed versions; the
// `get_sync_connection()` / `delete_everything(&conn.lock().unwrap())`
// lines replace them.
pub fn wipe_history(&self) -> Result<()> {
// Take the lock to prevent syncing while we're doing this.
let _guard = self.sync_state.lock().unwrap();
let conn = self.open_sync_connection()?;
let conn = self.get_sync_connection()?;

storage::history::delete_everything(&conn)?;
storage::history::delete_everything(&conn.lock().unwrap())?;
Ok(())
}

/// Reset history sync state, passing `EngineSyncAssociation::Disconnected`
/// to `history_sync::reset`, while holding the sync-state lock.
// NOTE(review): diff view - the `open_sync_connection()` and single-line
// `history_sync::reset(&conn, ...)` are the removed versions; the
// `get_sync_connection()` / multi-line `history_sync::reset(...)` calls
// replace them.
pub fn reset_history(&self) -> Result<()> {
// Take the lock to prevent syncing while we're doing this.
let _guard = self.sync_state.lock().unwrap();
let conn = self.open_sync_connection()?;
let conn = self.get_sync_connection()?;

history_sync::reset(&conn, &sync15::EngineSyncAssociation::Disconnected)?;
history_sync::reset(
&conn.lock().unwrap(),
&sync15::EngineSyncAssociation::Disconnected,
)?;
Ok(())
}

// NOTE(review): diff view - the first doc line plus
// `begin_sync_interrupt_scope()` and `interrupt_sync()` are the removed
// versions; the second doc block and `dummy_sync_interrupt_scope()` are
// their replacement.
/// Create a new SqlInterruptScope for syncing
/// Create a new SqlInterruptScope for the syncing code
///
/// Call this at the beginning of a sync operation. Then if interrupt_sync() is called, we will
/// interrupt the current DB query and return `InterruptedError`
pub fn begin_sync_interrupt_scope(&self) -> SqlInterruptScope {
SqlInterruptScope::new(self.sync_interrupt_counter.clone())
}

/// Interrupt the current sync operation if one is in progress
pub fn interrupt_sync(&self) {
// TODO: update the sync connection code so we can implement this method. This will fix #1684.
/// The syncing code expects a SqlInterruptScope, but it's never been actually hooked up to
/// anything. This method returns something to make the compiler happy, but we should replace
/// this with working code as part of #1684.
pub fn dummy_sync_interrupt_scope(&self) -> SqlInterruptScope {
// A fresh zero counter is created per call and stored nowhere else, so
// the returned scope can never actually be interrupted (see #1684).
SqlInterruptScope::new(Arc::new(AtomicUsize::new(0)))
}

// Deprecated/Broken interrupt handler method, let's try to replace it with the above methods
Expand All @@ -405,28 +463,9 @@ impl PlacesApi {
// Probably not necessary to lock here, since this should only get
// called in startup.
let _guard = self.sync_state.lock().unwrap();
let conn = self.open_sync_connection()?;
Ok(conn.new_interrupt_handle())
}
}

// NOTE(review): everything below is removed by this commit - `SyncConn` was
// the guard type that cleared `sync_conn_active` on drop; connection sharing
// is now handled by `get_sync_connection()`'s `Arc<Mutex<PlacesDb>>`.
/// Wrapper around PlacesDb that automatically sets a flag (`sync_conn_active`)
/// to false when finished
pub struct SyncConn<'api> {
db: PlacesDb,
flag: &'api AtomicBool,
}

// Clearing the flag on drop is what let `open_sync_connection()` hand out
// another connection once this one was finished with.
impl<'a> Drop for SyncConn<'a> {
fn drop(&mut self) {
self.flag.store(false, Ordering::SeqCst)
}
}

// Deref let callers treat a `SyncConn` like a `&PlacesDb` directly.
impl<'a> std::ops::Deref for SyncConn<'a> {
type Target = PlacesDb;
fn deref(&self) -> &PlacesDb {
&self.db
// NOTE(review): the three lines below belong to the new interrupt-handle
// method from the hunk above; the diff view interleaves them here.
let conn = self.get_sync_connection()?;
let db = conn.lock().unwrap();
Ok(db.new_interrupt_handle())
}
}

Expand Down

0 comments on commit 881b235

Please sign in to comment.