Skip to content

Commit

Permalink
Add new sync (atuinsh#1093)
Browse files Browse the repository at this point in the history
* Add record migration

* Add database functions for inserting history

No real tests yet :( I would like to avoid running postgres lol

* Add index handler, use UUIDs not strings

* Fix a bunch of tests, remove Option<Uuid>

* Add tests, all passing

* Working upload sync

* Record downloading works

* Sync download works

* Don't waste requests

* Use a page size for uploads, make it variable later

* Aaaaaand they're encrypted now too

* Add cek

* Allow reading tail across hosts

* Revert "Allow reading tail across hosts"

Not like that

This reverts commit 7b0c72e.

* Handle multiple shards properly

* format

* Format and make clippy happy

* use some fancy types (atuinsh#1098)

* use some fancy types

* fmt

* Goodbye horrible tuple

* Update atuin-server-postgres/migrations/20230623070418_records.sql

Co-authored-by: Conrad Ludgate <conradludgate@gmail.com>

* fmt

* Sort tests too because time sucks

* fix features

---------

Co-authored-by: Conrad Ludgate <conradludgate@gmail.com>
  • Loading branch information
2 people authored and ealap committed Jul 18, 2023
1 parent 722f994 commit 25c6e6b
Show file tree
Hide file tree
Showing 29 changed files with 1,094 additions and 143 deletions.
30 changes: 30 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

17 changes: 9 additions & 8 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
[workspace]
members = [
"atuin",
"atuin-client",
"atuin-server",
"atuin-server-postgres",
"atuin-server-database",
"atuin-common",
"atuin",
"atuin-client",
"atuin-server",
"atuin-server-postgres",
"atuin-server-database",
"atuin-common",
]

[workspace.package]
Expand Down Expand Up @@ -35,9 +35,10 @@ semver = "1.0.14"
serde = { version = "1.0.145", features = ["derive"] }
serde_json = "1.0.86"
tokio = { version = "1", features = ["full"] }
uuid = { version = "1.3", features = ["v4"] }
uuid = { version = "1.3", features = ["v4", "serde"] }
whoami = "1.1.2"
typed-builder = "0.14.0"
pretty_assertions = "1.3.0"

[workspace.dependencies.reqwest]
version = "0.11"
Expand All @@ -46,4 +47,4 @@ default-features = false

[workspace.dependencies.sqlx]
version = "0.6"
features = ["runtime-tokio-rustls", "chrono", "postgres"]
features = ["runtime-tokio-rustls", "chrono", "postgres", "uuid"]
7 changes: 6 additions & 1 deletion atuin-client/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -54,10 +54,14 @@ rmp = { version = "0.8.11" }
typed-builder = "0.14.0"
tokio = { workspace = true }
semver = { workspace = true }
futures = "0.3"

# encryption
rusty_paseto = { version = "0.5.0", default-features = false }
rusty_paserk = { version = "0.2.0", default-features = false, features = ["v4", "serde"] }
rusty_paserk = { version = "0.2.0", default-features = false, features = [
"v4",
"serde",
] }

# sync
urlencoding = { version = "2.1.0", optional = true }
Expand All @@ -69,3 +73,4 @@ generic-array = { version = "0.14", optional = true, features = ["serde"] }

[dev-dependencies]
tokio = { version = "1", features = ["full"] }
pretty_assertions = { workspace = true }
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,8 @@ create table if not exists records (
timestamp integer not null,
tag text not null,
version text not null,
data blob not null
data blob not null,
cek blob not null
);

create index host_idx on records (host);
Expand Down

This file was deleted.

59 changes: 56 additions & 3 deletions atuin-client/src/api_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,13 @@ use reqwest::{
StatusCode, Url,
};

use atuin_common::api::{
AddHistoryRequest, CountResponse, DeleteHistoryRequest, ErrorResponse, IndexResponse,
LoginRequest, LoginResponse, RegisterResponse, StatusResponse, SyncHistoryResponse,
use atuin_common::record::{EncryptedData, HostId, Record, RecordId};
use atuin_common::{
api::{
AddHistoryRequest, CountResponse, DeleteHistoryRequest, ErrorResponse, IndexResponse,
LoginRequest, LoginResponse, RegisterResponse, StatusResponse, SyncHistoryResponse,
},
record::RecordIndex,
};
use semver::Version;

Expand Down Expand Up @@ -195,6 +199,55 @@ impl<'a> Client<'a> {
Ok(())
}

pub async fn post_records(&self, records: &[Record<EncryptedData>]) -> Result<()> {
let url = format!("{}/record", self.sync_addr);
let url = Url::parse(url.as_str())?;

self.client.post(url).json(records).send().await?;

Ok(())
}

pub async fn next_records(
&self,
host: HostId,
tag: String,
start: Option<RecordId>,
count: u64,
) -> Result<Vec<Record<EncryptedData>>> {
let url = format!(
"{}/record/next?host={}&tag={}&count={}",
self.sync_addr, host.0, tag, count
);
let mut url = Url::parse(url.as_str())?;

if let Some(start) = start {
url.set_query(Some(
format!(
"host={}&tag={}&count={}&start={}",
host.0, tag, count, start.0
)
.as_str(),
));
}

let resp = self.client.get(url).send().await?;

let records = resp.json::<Vec<Record<EncryptedData>>>().await?;

Ok(records)
}

pub async fn record_index(&self) -> Result<RecordIndex> {
let url = format!("{}/record", self.sync_addr);
let url = Url::parse(url.as_str())?;

let resp = self.client.get(url).send().await?;
let index = resp.json().await?;

Ok(index)
}

pub async fn delete(&self) -> Result<()> {
let url = format!("{}/account", self.sync_addr);
let url = Url::parse(url.as_str())?;
Expand Down
2 changes: 1 addition & 1 deletion atuin-client/src/database.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ pub fn current_context() -> Context {
session,
hostname,
cwd,
host_id,
host_id: host_id.0.as_simple().to_string(),
}
}

Expand Down
22 changes: 12 additions & 10 deletions atuin-client/src/kv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -101,10 +101,7 @@ impl KvStore {

let bytes = record.serialize()?;

let parent = store
.last(host_id.as_str(), KV_TAG)
.await?
.map(|entry| entry.id);
let parent = store.tail(host_id, KV_TAG).await?.map(|entry| entry.id);

let record = atuin_common::record::Record::builder()
.host(host_id)
Expand All @@ -130,17 +127,22 @@ impl KvStore {
namespace: &str,
key: &str,
) -> Result<Option<KvRecord>> {
// TODO: don't load this from disk so much
let host_id = Settings::host_id().expect("failed to get host_id");

// Currently, this is O(n). When we have an actual KV store, it can be better
// Just a poc for now!

// iterate records to find the value we want
// start at the end, so we get the most recent version
let Some(mut record) = store.last(host_id.as_str(), KV_TAG).await? else {
let tails = store.tag_tails(KV_TAG).await?;

if tails.is_empty() {
return Ok(None);
};
}

// first, decide on a record.
// try getting the newest first
// we always need a way of deciding the "winner" of a write
// TODO(ellie): something better than last-write-wins, what if two write at the same time?
let mut record = tails.iter().max_by_key(|r| r.timestamp).unwrap().clone();

loop {
let decrypted = match record.version.as_str() {
Expand All @@ -154,7 +156,7 @@ impl KvStore {
}

if let Some(parent) = decrypted.parent {
record = store.get(parent.as_str()).await?;
record = store.get(parent).await?;
} else {
break;
}
Expand Down

0 comments on commit 25c6e6b

Please sign in to comment.