feat: rework record sync for improved reliability
So, to tell a story:

1. We introduced record sync, intended to be the new algorithm for
   syncing history.
2. On top of this, I added the KV store. This was intended as a simple
   test of record sync, and a way to see whether people wanted that
   sort of functionality.
3. History kept syncing via the old mechanism, which worked more or
   less OK despite its known flaws.
4. If KV syncing worked well, history would be moved across.

KV syncing ran OK for six months or so, so I started to move history
across. For several weeks, I ran a local fork of Atuin + the server
that synced history via records instead.

The record store maintained ordering via a linked list, which was a
mistake. It performed well in testing, but was really difficult to
debug and reason about. So when a few small sync issues occurred, they
took an extremely long time to track down.

This PR is huge, which I regret. It replaces the "parent" relationship
that records previously had (each record pointing to the one before it)
with a simple incrementing index (generally referred to as idx). This
also meant changing the RecordIndex, which referenced "tails" - the
last item in each chain.

Now that we use an "array" rather than a linked list, that logic has
also been replaced. And it is much simpler :D
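
To make the change in shape concrete, here's a minimal sketch. The
field names follow the new migration schema and the types imported
elsewhere in this diff; the old parent field is reconstructed from the
description above, and both structs are simplified:

```rust
use atuin_common::record::{HostId, RecordId, RecordIdx};

// Before: records formed a linked list per (host, tag). Establishing
// the order meant walking parent pointers back from the "tail".
struct LinkedRecord<Data> {
    id: RecordId,
    parent: Option<RecordId>, // previous record; None for the first one
    host: HostId,
    tag: String,
    data: Data,
}

// After: records carry a plain incrementing index, unique per
// (host, tag). Ordering is just "sort by idx", and the next record to
// fetch is simply the highest idx we have, plus one.
struct IndexedRecord<Data> {
    id: RecordId,
    idx: RecordIdx, // replaces the parent pointer
    host: HostId,
    tag: String,
    data: Data,
}
```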

Same for the queries that act on this data.

----

This isn't final - we still need to add:

1. Proper server/client error handling, which has been lacking for a
   while (see the sketch after this list)
2. The actual history implementation on top. This exists in a branch,
   just without deletions. It won't be much work to add that; I just
   don't want to make this PR any larger than it already is.
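
The new thiserror workspace dependency in this diff hints at the
direction for (1). Purely as a hypothetical sketch - none of these
variants exist in this commit - typed errors could look something like:

```rust
use thiserror::Error;

// Hypothetical only: a typed error for sync operations, so the client
// can react to specific failures instead of bubbling opaque reports.
#[derive(Debug, Error)]
pub enum SyncError {
    #[error("server rejected the request with status {status}")]
    BadRequest { status: u16 },

    #[error("could not decrypt record; key mismatch?")]
    Decrypt,

    #[error(transparent)]
    Network(#[from] reqwest::Error),
}
```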

The _only_ caveat here is that we basically lose data synced via the old
record store. This is the KV data from before.

It hasn't been deleted or anything; it's just no longer hooked up. So
it's totally possible to write a migration script - I just need to do
that.
ellie committed Jan 3, 2024
1 parent d303d68 commit 5dbaeab
Showing 29 changed files with 1,274 additions and 620 deletions.
1 change: 1 addition & 0 deletions .gitignore
@@ -1,3 +1,4 @@
.DS_Store
/target
*/target
.env
1 change: 1 addition & 0 deletions Cargo.lock

Generated lockfile; diff not rendered by default.

1 change: 1 addition & 0 deletions Cargo.toml
@@ -43,6 +43,7 @@ uuid = { version = "1.3", features = ["v4", "v7", "serde"] }
whoami = "1.1.2"
typed-builder = "0.18.0"
pretty_assertions = "1.3.0"
thiserror = "1.0"

[workspace.dependencies.reqwest]
version = "0.11"
1 change: 1 addition & 0 deletions atuin-client/Cargo.toml
@@ -48,6 +48,7 @@ rmp = { version = "0.8.11" }
typed-builder = { workspace = true }
tokio = { workspace = true }
semver = { workspace = true }
thiserror = { workspace = true }
futures = "0.3"
crypto_secretbox = "0.1.1"
generic-array = { version = "0.14", features = ["serde"] }
15 changes: 15 additions & 0 deletions atuin-client/record-migrations/20231127090831_create-store.sql
@@ -0,0 +1,15 @@
-- Add migration script here
create table if not exists store (
    id text primary key, -- globally unique ID

    idx integer, -- incrementing integer ID unique per (host, tag)
    host text not null, -- references the host row
    tag text not null,

    timestamp integer not null,
    version text not null,
    data blob not null,
    cek blob not null
);

create unique index record_uniq ON store(host, tag, idx);
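
With idx in place, "fetch the next N records" becomes a single ordered
range scan instead of a walk along parent pointers. A minimal sketch of
such a query, assuming a sqlx SQLite pool - the function name and exact
SQL are illustrative, not lifted from this diff:

```rust
use sqlx::{Row, SqlitePool};

// Illustrative: fetch up to `count` record IDs for one (host, tag)
// stream, starting at index `start`. The unique index on
// (host, tag, idx) makes this an efficient ordered range scan.
async fn next_record_ids(
    pool: &SqlitePool,
    host: &str,
    tag: &str,
    start: u64,
    count: u64,
) -> sqlx::Result<Vec<String>> {
    let rows = sqlx::query(
        "select id from store
         where host = ?1 and tag = ?2 and idx >= ?3
         order by idx asc
         limit ?4",
    )
    .bind(host)
    .bind(tag)
    .bind(start as i64) // SQLite integers are i64
    .bind(count as i64)
    .fetch_all(pool)
    .await?;

    Ok(rows.iter().map(|row| row.get("id")).collect())
}
```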
34 changes: 17 additions & 17 deletions atuin-client/src/api_client.rs
@@ -13,11 +13,11 @@ use atuin_common::{
AddHistoryRequest, CountResponse, DeleteHistoryRequest, ErrorResponse, IndexResponse,
LoginRequest, LoginResponse, RegisterResponse, StatusResponse, SyncHistoryResponse,
},
-    record::RecordIndex,
+    record::RecordStatus,
};
use atuin_common::{
api::{ATUIN_CARGO_VERSION, ATUIN_HEADER_VERSION, ATUIN_VERSION},
-    record::{EncryptedData, HostId, Record, RecordId},
+    record::{EncryptedData, HostId, Record, RecordIdx},
};
use semver::Version;
use time::format_description::well_known::Rfc3339;
@@ -279,24 +279,22 @@ impl<'a> Client<'a> {
&self,
host: HostId,
tag: String,
-        start: Option<RecordId>,
+        start: RecordIdx,
count: u64,
) -> Result<Vec<Record<EncryptedData>>> {
debug!(
"fetching record/s from host {}/{}/{}",
host.0.to_string(),
tag,
start
);

        let url = format!(
-           "{}/record/next?host={}&tag={}&count={}",
-           self.sync_addr, host.0, tag, count
+           "{}/record/next?host={}&tag={}&count={}&start={}",
+           self.sync_addr, host.0, tag, count, start
        );
-       let mut url = Url::parse(url.as_str())?;
-
-       if let Some(start) = start {
-           url.set_query(Some(
-               format!(
-                   "host={}&tag={}&count={}&start={}",
-                   host.0, tag, count, start.0
-               )
-               .as_str(),
-           ));
-       }
+       let url = Url::parse(url.as_str())?;

let resp = self.client.get(url).send().await?;

@@ -305,7 +303,7 @@ impl<'a> Client<'a> {
Ok(records)
}

-    pub async fn record_status(&self) -> Result<RecordIndex> {
+    pub async fn record_status(&self) -> Result<RecordStatus> {
let url = format!("{}/record", self.sync_addr);
let url = Url::parse(url.as_str())?;

@@ -317,6 +315,8 @@

let index = resp.json().await?;

debug!("got remote index {:?}", index);

Ok(index)
}

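
For context, paging against the reworked endpoint is now a plain loop
over idx. A sketch under a few assumptions: the method is named
record_next (matching the /record/next URL above), RecordIdx is an
integer index, and HostId is Copy, as a uuid newtype typically is. The
loop itself is not part of this diff:

```rust
use atuin_common::record::{EncryptedData, HostId, Record, RecordIdx};
use eyre::Result;

// Illustrative: pull every record for one (host, tag) stream in
// fixed-size pages. With the old linked list this required resolving
// tails; here we only track the next index to ask for.
async fn fetch_all(
    client: &Client<'_>,
    host: HostId,
    tag: String,
) -> Result<Vec<Record<EncryptedData>>> {
    const PAGE: u64 = 100; // arbitrary batch size
    let mut start: RecordIdx = 0;
    let mut out = Vec::new();

    loop {
        let page = client.record_next(host, tag.clone(), start, PAGE).await?;
        if page.is_empty() {
            break;
        }
        start += page.len() as u64; // idx is dense per (host, tag)
        out.extend(page);
    }

    Ok(out)
}
```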
210 changes: 209 additions & 1 deletion atuin-client/src/history.rs
@@ -1,12 +1,21 @@
use rmp::decode::ValueReadError;
use rmp::{decode::Bytes, Marker};
use std::env;

use atuin_common::record::DecryptedData;
use atuin_common::utils::uuid_v7;

use eyre::{bail, eyre, Result};
use regex::RegexSet;

use crate::{secrets::SECRET_PATTERNS, settings::Settings};
use time::OffsetDateTime;

mod builder;
pub mod store;

const HISTORY_VERSION: &str = "v0";
const HISTORY_TAG: &str = "history";

/// Client-side history entry.
///
@@ -81,6 +90,108 @@ impl History {
}
}

pub fn serialize(&self) -> Result<DecryptedData> {
// This is pretty much the same as what we used for the old history, with one difference -
// it uses integers for timestamps rather than a string format.

use rmp::encode;

let mut output = vec![];

// write the version
encode::write_u16(&mut output, 0)?;
// INFO: ensure this is updated when adding new fields
encode::write_array_len(&mut output, 9)?;

encode::write_str(&mut output, &self.id)?;
encode::write_u64(&mut output, self.timestamp.unix_timestamp_nanos() as u64)?;
encode::write_sint(&mut output, self.duration)?;
encode::write_sint(&mut output, self.exit)?;
encode::write_str(&mut output, &self.command)?;
encode::write_str(&mut output, &self.cwd)?;
encode::write_str(&mut output, &self.session)?;
encode::write_str(&mut output, &self.hostname)?;

match self.deleted_at {
Some(d) => encode::write_u64(&mut output, d.unix_timestamp_nanos() as u64)?,
None => encode::write_nil(&mut output)?,
}

Ok(DecryptedData(output))
}

fn deserialize_v0(bytes: &[u8]) -> Result<History> {
use rmp::decode;

fn error_report<E: std::fmt::Debug>(err: E) -> eyre::Report {
eyre!("{err:?}")
}

let mut bytes = Bytes::new(bytes);

let version = decode::read_u16(&mut bytes).map_err(error_report)?;

if version != 0 {
bail!("expected decoding v0 record, found v{version}");
}

let nfields = decode::read_array_len(&mut bytes).map_err(error_report)?;

if nfields != 9 {
bail!("cannot decrypt history from a different version of Atuin");
}

let bytes = bytes.remaining_slice();
let (id, bytes) = decode::read_str_from_slice(bytes).map_err(error_report)?;

let mut bytes = Bytes::new(bytes);
let timestamp = decode::read_u64(&mut bytes).map_err(error_report)?;
let duration = decode::read_int(&mut bytes).map_err(error_report)?;
let exit = decode::read_int(&mut bytes).map_err(error_report)?;

let bytes = bytes.remaining_slice();
let (command, bytes) = decode::read_str_from_slice(bytes).map_err(error_report)?;
let (cwd, bytes) = decode::read_str_from_slice(bytes).map_err(error_report)?;
let (session, bytes) = decode::read_str_from_slice(bytes).map_err(error_report)?;
let (hostname, bytes) = decode::read_str_from_slice(bytes).map_err(error_report)?;

// if we have more fields, try and get the deleted_at
let mut bytes = Bytes::new(bytes);

let (deleted_at, bytes) = match decode::read_u64(&mut bytes) {
Ok(unix) => (Some(unix), bytes.remaining_slice()),
// we accept null here
Err(ValueReadError::TypeMismatch(Marker::Null)) => (None, bytes.remaining_slice()),
Err(err) => return Err(error_report(err)),
};

if !bytes.is_empty() {
bail!("trailing bytes in encoded history. malformed")
}

Ok(History {
id: id.to_owned(),
timestamp: OffsetDateTime::from_unix_timestamp_nanos(timestamp as i128)?,
duration,
exit,
command: command.to_owned(),
cwd: cwd.to_owned(),
session: session.to_owned(),
hostname: hostname.to_owned(),
deleted_at: deleted_at
.map(|t| OffsetDateTime::from_unix_timestamp_nanos(t as i128))
.transpose()?,
})
}

pub fn deserialize(bytes: &[u8], version: &str) -> Result<History> {
match version {
HISTORY_VERSION => Self::deserialize_v0(bytes),

_ => bail!("unknown version {version:?}"),
}
}

/// Builder for a history entry that is imported from shell history.
///
/// The only two required fields are `timestamp` and `command`.
@@ -202,8 +313,9 @@ impl History {
#[cfg(test)]
mod tests {
use regex::RegexSet;
use time::macros::datetime;

-    use crate::settings::Settings;
+    use crate::{history::HISTORY_VERSION, settings::Settings};

use super::History;

@@ -274,4 +386,100 @@ mod tests {

assert!(stripe_key.should_save(&settings));
}

#[test]
fn test_serialize_deserialize() {
let bytes = [
205, 0, 0, 153, 217, 32, 54, 54, 100, 49, 54, 99, 98, 101, 101, 55, 99, 100, 52, 55,
53, 51, 56, 101, 53, 99, 53, 98, 56, 98, 52, 52, 101, 57, 48, 48, 54, 101, 207, 23, 99,
98, 117, 24, 210, 246, 128, 206, 2, 238, 210, 240, 0, 170, 103, 105, 116, 32, 115, 116,
97, 116, 117, 115, 217, 42, 47, 85, 115, 101, 114, 115, 47, 99, 111, 110, 114, 97, 100,
46, 108, 117, 100, 103, 97, 116, 101, 47, 68, 111, 99, 117, 109, 101, 110, 116, 115,
47, 99, 111, 100, 101, 47, 97, 116, 117, 105, 110, 217, 32, 98, 57, 55, 100, 57, 97,
51, 48, 54, 102, 50, 55, 52, 52, 55, 51, 97, 50, 48, 51, 100, 50, 101, 98, 97, 52, 49,
102, 57, 52, 53, 55, 187, 102, 118, 102, 103, 57, 51, 54, 99, 48, 107, 112, 102, 58,
99, 111, 110, 114, 97, 100, 46, 108, 117, 100, 103, 97, 116, 101, 192,
];

let history = History {
id: "66d16cbee7cd47538e5c5b8b44e9006e".to_owned(),
timestamp: datetime!(2023-05-28 18:35:40.633872 +00:00),
duration: 49206000,
exit: 0,
command: "git status".to_owned(),
cwd: "/Users/conrad.ludgate/Documents/code/atuin".to_owned(),
session: "b97d9a306f274473a203d2eba41f9457".to_owned(),
hostname: "fvfg936c0kpf:conrad.ludgate".to_owned(),
deleted_at: None,
};

let serialized = history.serialize().expect("failed to serialize history");
assert_eq!(serialized.0, bytes);

let deserialized = History::deserialize(&serialized.0, HISTORY_VERSION)
.expect("failed to deserialize history");
assert_eq!(history, deserialized);

// test the snapshot too
let deserialized =
History::deserialize(&bytes, HISTORY_VERSION).expect("failed to deserialize history");
assert_eq!(history, deserialized);
}

#[test]
fn test_serialize_deserialize_deleted() {
let history = History {
id: "66d16cbee7cd47538e5c5b8b44e9006e".to_owned(),
timestamp: datetime!(2023-05-28 18:35:40.633872 +00:00),
duration: 49206000,
exit: 0,
command: "git status".to_owned(),
cwd: "/Users/conrad.ludgate/Documents/code/atuin".to_owned(),
session: "b97d9a306f274473a203d2eba41f9457".to_owned(),
hostname: "fvfg936c0kpf:conrad.ludgate".to_owned(),
deleted_at: Some(datetime!(2023-11-19 20:18 +00:00)),
};

let serialized = history.serialize().expect("failed to serialize history");

let deserialized = History::deserialize(&serialized.0, HISTORY_VERSION)
.expect("failed to deserialize history");

assert_eq!(history, deserialized);
}

#[test]
fn test_serialize_deserialize_version() {
// v0
let bytes_v0 = [
205, 0, 0, 153, 217, 32, 54, 54, 100, 49, 54, 99, 98, 101, 101, 55, 99, 100, 52, 55,
53, 51, 56, 101, 53, 99, 53, 98, 56, 98, 52, 52, 101, 57, 48, 48, 54, 101, 207, 23, 99,
98, 117, 24, 210, 246, 128, 206, 2, 238, 210, 240, 0, 170, 103, 105, 116, 32, 115, 116,
97, 116, 117, 115, 217, 42, 47, 85, 115, 101, 114, 115, 47, 99, 111, 110, 114, 97, 100,
46, 108, 117, 100, 103, 97, 116, 101, 47, 68, 111, 99, 117, 109, 101, 110, 116, 115,
47, 99, 111, 100, 101, 47, 97, 116, 117, 105, 110, 217, 32, 98, 57, 55, 100, 57, 97,
51, 48, 54, 102, 50, 55, 52, 52, 55, 51, 97, 50, 48, 51, 100, 50, 101, 98, 97, 52, 49,
102, 57, 52, 53, 55, 187, 102, 118, 102, 103, 57, 51, 54, 99, 48, 107, 112, 102, 58,
99, 111, 110, 114, 97, 100, 46, 108, 117, 100, 103, 97, 116, 101, 192,
];

// some other version
let bytes_v1 = [
205, 1, 0, 153, 217, 32, 54, 54, 100, 49, 54, 99, 98, 101, 101, 55, 99, 100, 52, 55,
53, 51, 56, 101, 53, 99, 53, 98, 56, 98, 52, 52, 101, 57, 48, 48, 54, 101, 207, 23, 99,
98, 117, 24, 210, 246, 128, 206, 2, 238, 210, 240, 0, 170, 103, 105, 116, 32, 115, 116,
97, 116, 117, 115, 217, 42, 47, 85, 115, 101, 114, 115, 47, 99, 111, 110, 114, 97, 100,
46, 108, 117, 100, 103, 97, 116, 101, 47, 68, 111, 99, 117, 109, 101, 110, 116, 115,
47, 99, 111, 100, 101, 47, 97, 116, 117, 105, 110, 217, 32, 98, 57, 55, 100, 57, 97,
51, 48, 54, 102, 50, 55, 52, 52, 55, 51, 97, 50, 48, 51, 100, 50, 101, 98, 97, 52, 49,
102, 57, 52, 53, 55, 187, 102, 118, 102, 103, 57, 51, 54, 99, 48, 107, 112, 102, 58,
99, 111, 110, 114, 97, 100, 46, 108, 117, 100, 103, 97, 116, 101, 192,
];

let deserialized = History::deserialize(&bytes_v0, HISTORY_VERSION);
assert!(deserialized.is_ok());

let deserialized = History::deserialize(&bytes_v1, HISTORY_VERSION);
assert!(deserialized.is_err());
}
}
