Skip to content

Commit

Permalink
Basic sync operation backfill (spacedriveapp#2101)
Browse files Browse the repository at this point in the history
* basic sync operation backfill

* no changes
  • Loading branch information
Brendonovich authored Feb 20, 2024
1 parent 2a28347 commit 9bc1a47
Show file tree
Hide file tree
Showing 5 changed files with 311 additions and 3 deletions.
264 changes: 264 additions & 0 deletions core/crates/sync/src/backfill.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,264 @@
use sd_prisma::{
prisma::{
file_path, label, label_on_object, location, object, tag, tag_on_object, PrismaClient,
},
prisma_sync,
};
use sd_sync::OperationFactory;
use sd_utils::chain_optional_iter;
use serde_json::json;

use crate::crdt_op_unchecked_db;

pub async fn backfill_operations(db: &PrismaClient, sync: &crate::Manager, instance_id: i32) {
println!("backfill started");
db.crdt_operation()
.delete_many(vec![])
.exec()
.await
.unwrap();
let locations = db.location().find_many(vec![]).exec().await.unwrap();
db.crdt_operation()
.create_many(
locations
.into_iter()
.flat_map(|l| {
sync.shared_create(
prisma_sync::location::SyncId { pub_id: l.pub_id },
chain_optional_iter(
[],
[
l.name.map(|v| (location::name::NAME, json!(v))),
l.path.map(|v| (location::path::NAME, json!(v))),
l.total_capacity
.map(|v| (location::total_capacity::NAME, json!(v))),
l.available_capacity
.map(|v| (location::available_capacity::NAME, json!(v))),
l.size_in_bytes
.map(|v| (location::size_in_bytes::NAME, json!(v))),
l.is_archived
.map(|v| (location::is_archived::NAME, json!(v))),
l.generate_preview_media
.map(|v| (location::generate_preview_media::NAME, json!(v))),
l.sync_preview_media
.map(|v| (location::sync_preview_media::NAME, json!(v))),
l.hidden.map(|v| (location::hidden::NAME, json!(v))),
l.date_created
.map(|v| (location::date_created::NAME, json!(v))),
],
),
)
})
.map(|o| crdt_op_unchecked_db(&o, instance_id))
.collect(),
)
.exec()
.await
.unwrap();

let objects = db.object().find_many(vec![]).exec().await.unwrap();
db.crdt_operation()
.create_many(
objects
.into_iter()
.flat_map(|o| {
sync.shared_create(
prisma_sync::object::SyncId { pub_id: o.pub_id },
chain_optional_iter(
[],
[
o.kind.map(|v| (object::kind::NAME, json!(v))),
o.hidden.map(|v| (object::hidden::NAME, json!(v))),
o.favorite.map(|v| (object::favorite::NAME, json!(v))),
o.important.map(|v| (object::important::NAME, json!(v))),
o.note.map(|v| (object::note::NAME, json!(v))),
o.date_created
.map(|v| (object::date_created::NAME, json!(v))),
o.date_accessed
.map(|v| (object::date_accessed::NAME, json!(v))),
],
),
)
})
.map(|o| crdt_op_unchecked_db(&o, instance_id))
.collect(),
)
.exec()
.await
.unwrap();

let file_paths = db
.file_path()
.find_many(vec![])
.include(file_path::include!({
location: select { pub_id }
object: select { pub_id }
}))
.exec()
.await
.unwrap();

db.crdt_operation()
.create_many(
file_paths
.into_iter()
.flat_map(|fp| {
sync.shared_create(
prisma_sync::file_path::SyncId { pub_id: fp.pub_id },
chain_optional_iter(
[],
[
fp.is_dir.map(|v| (file_path::is_dir::NAME, json!(v))),
fp.cas_id.map(|v| (file_path::cas_id::NAME, json!(v))),
fp.integrity_checksum
.map(|v| (file_path::integrity_checksum::NAME, json!(v))),
fp.location.map(|l| {
(
file_path::location::NAME,
json!(prisma_sync::location::SyncId { pub_id: l.pub_id }),
)
}),
fp.materialized_path
.map(|v| (file_path::materialized_path::NAME, json!(v))),
fp.name.map(|v| (file_path::name::NAME, json!(v))),
fp.extension.map(|v| (file_path::extension::NAME, json!(v))),
fp.hidden.map(|v| (file_path::hidden::NAME, json!(v))),
fp.size_in_bytes_bytes
.map(|v| (file_path::size_in_bytes_bytes::NAME, json!(v))),
fp.inode.map(|v| (file_path::inode::NAME, json!(v))),
fp.date_created
.map(|v| (file_path::date_created::NAME, json!(v))),
fp.date_modified
.map(|v| (file_path::date_modified::NAME, json!(v))),
fp.date_indexed
.map(|v| (file_path::date_indexed::NAME, json!(v))),
],
),
)
})
.map(|o| crdt_op_unchecked_db(&o, instance_id))
.collect(),
)
.exec()
.await
.unwrap();
let tags = db.tag().find_many(vec![]).exec().await.unwrap();
db.crdt_operation()
.create_many(
tags.into_iter()
.flat_map(|t| {
sync.shared_create(
prisma_sync::tag::SyncId { pub_id: t.pub_id },
chain_optional_iter(
[],
[
t.name.map(|v| (tag::name::NAME, json!(v))),
t.color.map(|v| (tag::color::NAME, json!(v))),
t.date_created.map(|v| (tag::date_created::NAME, json!(v))),
t.date_modified
.map(|v| (tag::date_modified::NAME, json!(v))),
],
),
)
})
.map(|o| crdt_op_unchecked_db(&o, instance_id))
.collect(),
)
.exec()
.await
.unwrap();

let tag_on_objects = db
.tag_on_object()
.find_many(vec![])
.include(tag_on_object::include!({
tag: select { pub_id }
object: select { pub_id }
}))
.exec()
.await
.unwrap();
db.crdt_operation()
.create_many(
tag_on_objects
.into_iter()
.flat_map(|t_o| {
sync.relation_create(
prisma_sync::tag_on_object::SyncId {
tag: prisma_sync::tag::SyncId {
pub_id: t_o.tag.pub_id,
},
object: prisma_sync::object::SyncId {
pub_id: t_o.object.pub_id,
},
},
chain_optional_iter(
[],
[t_o.date_created
.map(|v| (tag_on_object::date_created::NAME, json!(v)))],
),
)
})
.map(|o| crdt_op_unchecked_db(&o, instance_id))
.collect(),
)
.exec()
.await
.unwrap();

let labels = db.label().find_many(vec![]).exec().await.unwrap();
db.crdt_operation()
.create_many(
labels
.into_iter()
.flat_map(|l| {
sync.shared_create(
prisma_sync::label::SyncId { name: l.name },
[
(label::date_created::NAME, json!(l.date_created)),
(label::date_modified::NAME, json!(l.date_modified)),
],
)
})
.map(|o| crdt_op_unchecked_db(&o, instance_id))
.collect(),
)
.exec()
.await
.unwrap();

let label_on_objects = db
.label_on_object()
.find_many(vec![])
.select(label_on_object::select!({
object: select { pub_id }
label: select { name }
}))
.exec()
.await
.unwrap();
db.crdt_operation()
.create_many(
label_on_objects
.into_iter()
.flat_map(|l_o| {
sync.relation_create(
prisma_sync::label_on_object::SyncId {
label: prisma_sync::label::SyncId {
name: l_o.label.name,
},
object: prisma_sync::object::SyncId {
pub_id: l_o.object.pub_id,
},
},
[],
)
})
.map(|o| crdt_op_unchecked_db(&o, instance_id))
.collect(),
)
.exec()
.await
.unwrap();
println!("backfill ended")
}
18 changes: 18 additions & 0 deletions core/crates/sync/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#![allow(clippy::unwrap_used, clippy::panic)] // TODO: Brendan remove this once you've got error handling here

mod actor;
pub mod backfill;
mod db_operation;
pub mod ingest;
mod manager;
Expand Down Expand Up @@ -46,3 +47,20 @@ pub fn crdt_op_db(op: &CRDTOperation) -> crdt_operation::Create {
_params: vec![],
}
}

#[must_use]
pub fn crdt_op_unchecked_db(
op: &CRDTOperation,
instance_id: i32,
) -> crdt_operation::CreateUnchecked {
crdt_operation::CreateUnchecked {
id: op.id.as_bytes().to_vec(),
timestamp: op.timestamp.0 as i64,
instance_id,
kind: op.kind().to_string(),
data: serde_json::to_vec(&op.data).unwrap(),
model: op.model.to_string(),
record_id: serde_json::to_vec(&op.record_id).unwrap(),
_params: vec![],
}
}
13 changes: 13 additions & 0 deletions core/src/api/sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,4 +32,17 @@ pub(crate) fn mount() -> AlphaRouter<Ctx> {
.await?)
})
})
.procedure("backfill", {
R.with2(library())
.mutation(|(_, library), _: ()| async move {
sd_core_sync::backfill::backfill_operations(
&library.db,
&library.sync,
library.config().await.instance_id,
)
.await;

Ok(())
})
})
}
14 changes: 13 additions & 1 deletion interface/app/$libraryId/debug/sync.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,11 @@ import { stringify } from 'uuid';
import {
CRDTOperation,
CRDTOperationData,
useLibraryMutation,
useLibraryQuery,
useLibrarySubscription
} from '@sd/client';
import { Button } from '@sd/ui';
import { useRouteTitle } from '~/hooks/useRouteTitle';

type MessageGroup = {
Expand All @@ -18,6 +20,9 @@ export const Component = () => {
useRouteTitle('Sync');

const messages = useLibraryQuery(['sync.messages']);
const backfillSyncMessages = useLibraryMutation(['sync.backfill'], {
onSuccess: () => messages.refetch()
});

useLibrarySubscription(['sync.newMessage'], {
onData: () => messages.refetch()
Expand All @@ -30,6 +35,13 @@ export const Component = () => {

return (
<ul className="space-y-4 p-4">
<Button
variant="accent"
onClick={() => backfillSyncMessages.mutate(null)}
disabled={backfillSyncMessages.isLoading}
>
Backfill Sync Messages
</Button>
{groups?.map((group, index) => <OperationGroup key={index} group={group} />)}
</ul>
);
Expand Down Expand Up @@ -72,7 +84,7 @@ function calculateGroups(messages: CRDTOperation[]) {
return messages.reduce<MessageGroup[]>((acc, op) => {
const { data } = op;

const id = stringify((op.record_id as any).pub_id);
const id = JSON.stringify(op.record_id);

const latest = (() => {
const latest = acc[acc.length - 1];
Expand Down
5 changes: 3 additions & 2 deletions packages/client/src/core.ts
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,7 @@ export type Procedures = {
{ key: "search.saved.create", input: LibraryArgs<{ name: string; search?: string | null; filters?: string | null; description?: string | null; icon?: string | null }>, result: null } |
{ key: "search.saved.delete", input: LibraryArgs<number>, result: null } |
{ key: "search.saved.update", input: LibraryArgs<[number, Args]>, result: null } |
{ key: "sync.backfill", input: LibraryArgs<null>, result: null } |
{ key: "tags.assign", input: LibraryArgs<{ targets: Target[]; tag_id: number; unassign: boolean }>, result: null } |
{ key: "tags.create", input: LibraryArgs<TagCreateArgs>, result: Tag } |
{ key: "tags.delete", input: LibraryArgs<number>, result: null } |
Expand Down Expand Up @@ -246,7 +247,7 @@ export type FileDeleterJobInit = { location_id: number; file_path_ids: number[]

export type FileEraserJobInit = { location_id: number; file_path_ids: number[]; passes: string }

export type FilePath = { id: number; pub_id: number[]; is_dir: boolean | null; cas_id: string | null; integrity_checksum: string | null; location_id: number | null; materialized_path: string | null; name: string | null; extension: string | null; hidden: boolean | null; size_in_bytes: string | null; size_in_bytes_bytes: number[] | null; inode: number[] | null; object_id: number | null; key_id: number | null; date_created: string | null; date_modified: string | null; date_indexed: string | null }
export type FilePath = { id: number; pub_id: number[]; is_dir: boolean | null; cas_id: string | null; integrity_checksum: string | null; location_id: number | null; materialized_path: string | null; name: string | null; extension: string | null; hidden: boolean | null; size_in_bytes: string | null; size_in_bytes_bytes: number[] | null; inode: number[] | null; object_id: number | null; date_created: string | null; date_modified: string | null; date_indexed: string | null }

export type FilePathCursor = { isDir: boolean; variant: FilePathCursorVariant }

Expand All @@ -260,7 +261,7 @@ export type FilePathOrder = { field: "name"; value: SortOrder } | { field: "size

export type FilePathSearchArgs = { take?: number | null; orderAndPagination?: OrderAndPagination<number, FilePathOrder, FilePathCursor> | null; filters?: SearchFilterArgs[]; groupDirectories?: boolean }

export type FilePathWithObject = { id: number; pub_id: number[]; is_dir: boolean | null; cas_id: string | null; integrity_checksum: string | null; location_id: number | null; materialized_path: string | null; name: string | null; extension: string | null; hidden: boolean | null; size_in_bytes: string | null; size_in_bytes_bytes: number[] | null; inode: number[] | null; object_id: number | null; key_id: number | null; date_created: string | null; date_modified: string | null; date_indexed: string | null; object: { id: number; pub_id: number[]; kind: number | null; key_id: number | null; hidden: boolean | null; favorite: boolean | null; important: boolean | null; note: string | null; date_created: string | null; date_accessed: string | null } | null }
export type FilePathWithObject = { id: number; pub_id: number[]; is_dir: boolean | null; cas_id: string | null; integrity_checksum: string | null; location_id: number | null; materialized_path: string | null; name: string | null; extension: string | null; hidden: boolean | null; size_in_bytes: string | null; size_in_bytes_bytes: number[] | null; inode: number[] | null; object_id: number | null; date_created: string | null; date_modified: string | null; date_indexed: string | null; object: { id: number; pub_id: number[]; kind: number | null; key_id: number | null; hidden: boolean | null; favorite: boolean | null; important: boolean | null; note: string | null; date_created: string | null; date_accessed: string | null } | null }

export type Flash = {
/**
Expand Down

0 comments on commit 9bc1a47

Please sign in to comment.