Skip to content

Commit 2440a13

Browse files
committed
Add list metadata repository concurrency helpers
Add helper methods for atomic state transitions during sync operations: - `begin_refresh()`: Atomically increment version, set state to FetchingFirstPage, and return info needed for the fetch - `begin_fetch_next_page()`: Check pagination state, set state to FetchingNextPage, and return page number and version for stale check - `complete_sync()`: Set state to Idle on success - `complete_sync_with_error()`: Set state to Error with message These helpers ensure correct state transitions and enable version-based concurrency control to detect when a refresh invalidates an in-flight load-more operation. Also adds `RefreshInfo` and `FetchNextPageInfo` structs to encapsulate the data returned from begin operations.
1 parent e47cec8 commit 2440a13

File tree

1 file changed

+285
-0
lines changed

1 file changed

+285
-0
lines changed

wp_mobile_cache/src/repository/list_metadata.rs

Lines changed: 285 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -375,6 +375,136 @@ impl ListMetadataRepository {
375375

376376
Ok(())
377377
}
378+
379+
// ============================================================
380+
// Concurrency Helpers
381+
// ============================================================
382+
383+
/// Begin a refresh operation (fetch first page).
384+
///
385+
/// Atomically:
386+
/// 1. Creates header if needed
387+
/// 2. Increments version (invalidates any in-flight load-more)
388+
/// 3. Updates state to FetchingFirstPage
389+
/// 4. Returns info needed for the fetch
390+
///
391+
/// Call this before starting an API fetch for page 1.
392+
pub fn begin_refresh(
393+
&self,
394+
executor: &impl QueryExecutor,
395+
site: &DbSite,
396+
key: &str,
397+
) -> Result<RefreshInfo, SqliteDbError> {
398+
// Ensure header exists and get its ID
399+
let list_metadata_id = self.get_or_create(executor, site, key)?;
400+
401+
// Increment version (invalidates any in-flight load-more)
402+
let version = self.increment_version(executor, site, key)?;
403+
404+
// Update state to fetching
405+
self.update_state(executor, list_metadata_id, ListState::FetchingFirstPage, None)?;
406+
407+
// Get header for pagination info
408+
let header = self.get_header(executor, site, key)?.unwrap();
409+
410+
Ok(RefreshInfo {
411+
list_metadata_id,
412+
version,
413+
per_page: header.per_page,
414+
})
415+
}
416+
417+
/// Begin a load-next-page operation.
418+
///
419+
/// Atomically:
420+
/// 1. Gets current pagination state
421+
/// 2. Checks if there are more pages to load
422+
/// 3. Updates state to FetchingNextPage
423+
/// 4. Returns info needed for the fetch (including version for later check)
424+
///
425+
/// Returns None if already at the last page or no pages loaded yet.
426+
/// Call this before starting an API fetch for page N+1.
427+
pub fn begin_fetch_next_page(
428+
&self,
429+
executor: &impl QueryExecutor,
430+
site: &DbSite,
431+
key: &str,
432+
) -> Result<Option<FetchNextPageInfo>, SqliteDbError> {
433+
let header = match self.get_header(executor, site, key)? {
434+
Some(h) => h,
435+
None => return Ok(None), // List doesn't exist
436+
};
437+
438+
// Check if we have pages loaded and more to fetch
439+
if header.current_page == 0 {
440+
return Ok(None); // No pages loaded yet, need refresh first
441+
}
442+
443+
if let Some(total_pages) = header.total_pages
444+
&& header.current_page >= total_pages
445+
{
446+
return Ok(None); // Already at last page
447+
}
448+
449+
let next_page = header.current_page + 1;
450+
451+
// Update state to fetching
452+
self.update_state(executor, header.row_id, ListState::FetchingNextPage, None)?;
453+
454+
Ok(Some(FetchNextPageInfo {
455+
list_metadata_id: header.row_id,
456+
page: next_page,
457+
version: header.version,
458+
per_page: header.per_page,
459+
}))
460+
}
461+
462+
/// Complete a sync operation successfully.
463+
///
464+
/// Updates state to Idle and clears any error message.
465+
pub fn complete_sync(
466+
&self,
467+
executor: &impl QueryExecutor,
468+
list_metadata_id: RowId,
469+
) -> Result<(), SqliteDbError> {
470+
self.update_state(executor, list_metadata_id, ListState::Idle, None)
471+
}
472+
473+
/// Complete a sync operation with an error.
474+
///
475+
/// Updates state to Error with the provided message.
476+
pub fn complete_sync_with_error(
477+
&self,
478+
executor: &impl QueryExecutor,
479+
list_metadata_id: RowId,
480+
error_message: &str,
481+
) -> Result<(), SqliteDbError> {
482+
self.update_state(executor, list_metadata_id, ListState::Error, Some(error_message))
483+
}
484+
}
485+
486+
/// Information returned when starting a refresh operation.
487+
#[derive(Debug, Clone)]
488+
pub struct RefreshInfo {
489+
/// Row ID of the list_metadata record
490+
pub list_metadata_id: RowId,
491+
/// New version number (for concurrency checking)
492+
pub version: i64,
493+
/// Items per page setting
494+
pub per_page: i64,
495+
}
496+
497+
/// Information returned when starting a load-next-page operation.
498+
#[derive(Debug, Clone)]
499+
pub struct FetchNextPageInfo {
500+
/// Row ID of the list_metadata record
501+
pub list_metadata_id: RowId,
502+
/// Page number to fetch
503+
pub page: i64,
504+
/// Version at start (check before storing results)
505+
pub version: i64,
506+
/// Items per page setting
507+
pub per_page: i64,
378508
}
379509

380510
/// Input for creating a list metadata item.
@@ -773,4 +903,159 @@ mod tests {
773903
assert_eq!(item.entity_id, ((i + 1) * 100) as i64);
774904
}
775905
}
906+
907+
// ============================================================
908+
// Concurrency Helper Tests
909+
// ============================================================
910+
911+
#[rstest]
912+
fn test_begin_refresh_creates_header_and_sets_state(test_ctx: TestContext) {
913+
let repo = list_metadata_repo();
914+
let key = "edit:posts:publish";
915+
916+
let info = repo.begin_refresh(&test_ctx.conn, &test_ctx.site, key).unwrap();
917+
918+
// Verify version was incremented (from 0 to 1)
919+
assert_eq!(info.version, 1);
920+
assert_eq!(info.per_page, 20); // default
921+
922+
// Verify state is FetchingFirstPage
923+
let state = repo.get_state_by_key(&test_ctx.conn, &test_ctx.site, key).unwrap();
924+
assert_eq!(state, ListState::FetchingFirstPage);
925+
}
926+
927+
#[rstest]
928+
fn test_begin_refresh_increments_version_each_time(test_ctx: TestContext) {
929+
let repo = list_metadata_repo();
930+
let key = "edit:posts:draft";
931+
932+
let info1 = repo.begin_refresh(&test_ctx.conn, &test_ctx.site, key).unwrap();
933+
assert_eq!(info1.version, 1);
934+
935+
// Complete the first refresh
936+
repo.complete_sync(&test_ctx.conn, info1.list_metadata_id).unwrap();
937+
938+
let info2 = repo.begin_refresh(&test_ctx.conn, &test_ctx.site, key).unwrap();
939+
assert_eq!(info2.version, 2);
940+
}
941+
942+
#[rstest]
943+
fn test_begin_fetch_next_page_returns_none_for_non_existent_list(test_ctx: TestContext) {
944+
let repo = list_metadata_repo();
945+
946+
let result = repo.begin_fetch_next_page(&test_ctx.conn, &test_ctx.site, "nonexistent").unwrap();
947+
assert!(result.is_none());
948+
}
949+
950+
#[rstest]
951+
fn test_begin_fetch_next_page_returns_none_when_no_pages_loaded(test_ctx: TestContext) {
952+
let repo = list_metadata_repo();
953+
let key = "edit:posts:publish";
954+
955+
// Create header but don't set current_page
956+
repo.get_or_create(&test_ctx.conn, &test_ctx.site, key).unwrap();
957+
958+
let result = repo.begin_fetch_next_page(&test_ctx.conn, &test_ctx.site, key).unwrap();
959+
assert!(result.is_none());
960+
}
961+
962+
#[rstest]
963+
fn test_begin_fetch_next_page_returns_none_at_last_page(test_ctx: TestContext) {
964+
let repo = list_metadata_repo();
965+
let key = "edit:posts:publish";
966+
967+
// Set up header with current_page = total_pages
968+
let update = ListMetadataHeaderUpdate {
969+
total_pages: Some(3),
970+
total_items: Some(60),
971+
current_page: 3,
972+
per_page: 20,
973+
};
974+
repo.update_header(&test_ctx.conn, &test_ctx.site, key, &update).unwrap();
975+
976+
let result = repo.begin_fetch_next_page(&test_ctx.conn, &test_ctx.site, key).unwrap();
977+
assert!(result.is_none());
978+
}
979+
980+
#[rstest]
981+
fn test_begin_fetch_next_page_returns_info_when_more_pages(test_ctx: TestContext) {
982+
let repo = list_metadata_repo();
983+
let key = "edit:posts:publish";
984+
985+
// Set up header with more pages available
986+
let update = ListMetadataHeaderUpdate {
987+
total_pages: Some(5),
988+
total_items: Some(100),
989+
current_page: 2,
990+
per_page: 20,
991+
};
992+
repo.update_header(&test_ctx.conn, &test_ctx.site, key, &update).unwrap();
993+
994+
let result = repo.begin_fetch_next_page(&test_ctx.conn, &test_ctx.site, key).unwrap();
995+
assert!(result.is_some());
996+
997+
let info = result.unwrap();
998+
assert_eq!(info.page, 3); // next page
999+
assert_eq!(info.per_page, 20);
1000+
1001+
// Verify state changed to FetchingNextPage
1002+
let state = repo.get_state_by_key(&test_ctx.conn, &test_ctx.site, key).unwrap();
1003+
assert_eq!(state, ListState::FetchingNextPage);
1004+
}
1005+
1006+
#[rstest]
1007+
fn test_complete_sync_sets_state_to_idle(test_ctx: TestContext) {
1008+
let repo = list_metadata_repo();
1009+
let key = "edit:posts:publish";
1010+
1011+
let info = repo.begin_refresh(&test_ctx.conn, &test_ctx.site, key).unwrap();
1012+
repo.complete_sync(&test_ctx.conn, info.list_metadata_id).unwrap();
1013+
1014+
let state = repo.get_state_by_key(&test_ctx.conn, &test_ctx.site, key).unwrap();
1015+
assert_eq!(state, ListState::Idle);
1016+
}
1017+
1018+
#[rstest]
1019+
fn test_complete_sync_with_error_sets_state_and_message(test_ctx: TestContext) {
1020+
let repo = list_metadata_repo();
1021+
let key = "edit:posts:publish";
1022+
1023+
let info = repo.begin_refresh(&test_ctx.conn, &test_ctx.site, key).unwrap();
1024+
repo.complete_sync_with_error(&test_ctx.conn, info.list_metadata_id, "Network timeout").unwrap();
1025+
1026+
let state_record = repo.get_state(&test_ctx.conn, info.list_metadata_id).unwrap().unwrap();
1027+
assert_eq!(state_record.state, ListState::Error);
1028+
assert_eq!(state_record.error_message.as_deref(), Some("Network timeout"));
1029+
}
1030+
1031+
#[rstest]
1032+
fn test_version_check_detects_stale_operation(test_ctx: TestContext) {
1033+
let repo = list_metadata_repo();
1034+
let key = "edit:posts:publish";
1035+
1036+
// Start a refresh (version becomes 1)
1037+
let refresh_info = repo.begin_refresh(&test_ctx.conn, &test_ctx.site, key).unwrap();
1038+
assert_eq!(refresh_info.version, 1);
1039+
1040+
// Update header to simulate page 1 loaded
1041+
let update = ListMetadataHeaderUpdate {
1042+
total_pages: Some(5),
1043+
total_items: Some(100),
1044+
current_page: 1,
1045+
per_page: 20,
1046+
};
1047+
repo.update_header(&test_ctx.conn, &test_ctx.site, key, &update).unwrap();
1048+
repo.complete_sync(&test_ctx.conn, refresh_info.list_metadata_id).unwrap();
1049+
1050+
// Start load-next-page (captures version = 1)
1051+
let next_page_info = repo.begin_fetch_next_page(&test_ctx.conn, &test_ctx.site, key).unwrap().unwrap();
1052+
let captured_version = next_page_info.version;
1053+
1054+
// Another refresh happens (version becomes 2)
1055+
repo.begin_refresh(&test_ctx.conn, &test_ctx.site, key).unwrap();
1056+
1057+
// Version check should fail (stale)
1058+
let is_valid = repo.check_version(&test_ctx.conn, &test_ctx.site, key, captured_version).unwrap();
1059+
assert!(!is_valid, "Version should not match after refresh");
1060+
}
7761061
}

0 commit comments

Comments
 (0)