Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: rework record sync for improved reliability #1478

Merged
merged 13 commits into from
Jan 5, 2024
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
.DS_Store
/target
*/target
.env
.idea/
.vscode/
result
publish.sh
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ uuid = { version = "1.3", features = ["v4", "v7", "serde"] }
whoami = "1.1.2"
typed-builder = "0.18.0"
pretty_assertions = "1.3.0"
thiserror = "1.0"

[workspace.dependencies.reqwest]
version = "0.11"
Expand Down
1 change: 1 addition & 0 deletions atuin-client/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ rmp = { version = "0.8.11" }
typed-builder = { workspace = true }
tokio = { workspace = true }
semver = { workspace = true }
thiserror = { workspace = true }
futures = "0.3"
crypto_secretbox = "0.1.1"
generic-array = { version = "0.14", features = ["serde"] }
Expand Down
15 changes: 15 additions & 0 deletions atuin-client/record-migrations/20231127090831_create-store.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
-- Add migration script here
create table if not exists store (
id text primary key, -- globally unique ID

idx integer, -- incrementing integer ID unique per (host, tag)
host text not null, -- references the host row
tag text not null,

timestamp integer not null,
version text not null,
data blob not null,
cek blob not null
);

create unique index record_uniq ON store(host, tag, idx);
48 changes: 28 additions & 20 deletions atuin-client/src/api_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,11 @@ use atuin_common::{
AddHistoryRequest, CountResponse, DeleteHistoryRequest, ErrorResponse, IndexResponse,
LoginRequest, LoginResponse, RegisterResponse, StatusResponse, SyncHistoryResponse,
},
record::RecordIndex,
record::RecordStatus,
};
use atuin_common::{
api::{ATUIN_CARGO_VERSION, ATUIN_HEADER_VERSION, ATUIN_VERSION},
record::{EncryptedData, HostId, Record, RecordId},
record::{EncryptedData, HostId, Record, RecordIdx},
};
use semver::Version;
use time::format_description::well_known::Rfc3339;
Expand Down Expand Up @@ -267,10 +267,18 @@ impl<'a> Client<'a> {
}

pub async fn post_records(&self, records: &[Record<EncryptedData>]) -> Result<()> {
let url = format!("{}/record", self.sync_addr);
let url = format!("{}/api/v0/record", self.sync_addr);
let url = Url::parse(url.as_str())?;

self.client.post(url).json(records).send().await?;
let resp = self.client.post(url).json(records).send().await?;
info!("posted records, got {}", resp.status());

if !resp.status().is_success() {
error!(
"failed to post records to server; got: {:?}",
resp.text().await
);
}

Ok(())
}
Expand All @@ -279,24 +287,22 @@ impl<'a> Client<'a> {
&self,
host: HostId,
tag: String,
start: Option<RecordId>,
start: RecordIdx,
count: u64,
) -> Result<Vec<Record<EncryptedData>>> {
debug!(
"fetching record/s from host {}/{}/{}",
host.0.to_string(),
tag,
start
);

let url = format!(
"{}/record/next?host={}&tag={}&count={}",
self.sync_addr, host.0, tag, count
"{}/api/v0/record/next?host={}&tag={}&count={}&start={}",
self.sync_addr, host.0, tag, count, start
);
let mut url = Url::parse(url.as_str())?;

if let Some(start) = start {
url.set_query(Some(
format!(
"host={}&tag={}&count={}&start={}",
host.0, tag, count, start.0
)
.as_str(),
));
}

let url = Url::parse(url.as_str())?;

let resp = self.client.get(url).send().await?;

Expand All @@ -305,8 +311,8 @@ impl<'a> Client<'a> {
Ok(records)
}

pub async fn record_index(&self) -> Result<RecordIndex> {
let url = format!("{}/record", self.sync_addr);
pub async fn record_status(&self) -> Result<RecordStatus> {
let url = format!("{}/api/v0/record", self.sync_addr);
let url = Url::parse(url.as_str())?;

let resp = self.client.get(url).send().await?;
Expand All @@ -317,6 +323,8 @@ impl<'a> Client<'a> {

let index = resp.json().await?;

debug!("got remote index {:?}", index);

Ok(index)
}

Expand Down
210 changes: 209 additions & 1 deletion atuin-client/src/history.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,21 @@
use rmp::decode::ValueReadError;
use rmp::{decode::Bytes, Marker};
use std::env;

use atuin_common::record::DecryptedData;
use atuin_common::utils::uuid_v7;

use eyre::{bail, eyre, Result};
use regex::RegexSet;

use crate::{secrets::SECRET_PATTERNS, settings::Settings};
use time::OffsetDateTime;

mod builder;
pub mod store;

const HISTORY_VERSION: &str = "v0";
const HISTORY_TAG: &str = "history";

/// Client-side history entry.
///
Expand Down Expand Up @@ -81,6 +90,108 @@ impl History {
}
}

pub fn serialize(&self) -> Result<DecryptedData> {
// This is pretty much the same as what we used for the old history, with one difference -
// it uses integers for timestamps rather than a string format.

use rmp::encode;

let mut output = vec![];

// write the version
encode::write_u16(&mut output, 0)?;
// INFO: ensure this is updated when adding new fields
encode::write_array_len(&mut output, 9)?;

encode::write_str(&mut output, &self.id)?;
encode::write_u64(&mut output, self.timestamp.unix_timestamp_nanos() as u64)?;
encode::write_sint(&mut output, self.duration)?;
encode::write_sint(&mut output, self.exit)?;
encode::write_str(&mut output, &self.command)?;
encode::write_str(&mut output, &self.cwd)?;
encode::write_str(&mut output, &self.session)?;
encode::write_str(&mut output, &self.hostname)?;

match self.deleted_at {
Some(d) => encode::write_u64(&mut output, d.unix_timestamp_nanos() as u64)?,
None => encode::write_nil(&mut output)?,
}

Ok(DecryptedData(output))
}

/// Decode a version-0 history payload.
///
/// The payload must start with a `u16` version equal to `0` and an array
/// header announcing exactly 9 fields, followed by the fields themselves:
/// id, timestamp, duration, exit, command, cwd, session, hostname and
/// deleted-at (an unsigned integer, or nil when the entry is not deleted).
///
/// # Errors
///
/// Fails when the embedded version or field count is unexpected, when any
/// field cannot be decoded, when bytes remain after the last field, or when
/// a timestamp cannot be converted into an `OffsetDateTime`.
fn deserialize_v0(bytes: &[u8]) -> Result<History> {
    use rmp::decode;

    // Decode errors are turned into reports via their `Debug` representation.
    fn error_report<E: std::fmt::Debug>(err: E) -> eyre::Report {
        eyre!("{err:?}")
    }

    // Stage 1: the header (format version, then field count).
    let mut header = Bytes::new(bytes);

    let version = decode::read_u16(&mut header).map_err(error_report)?;
    if version != 0 {
        bail!("expected decoding v0 record, found v{version}");
    }

    let field_count = decode::read_array_len(&mut header).map_err(error_report)?;
    if field_count != 9 {
        bail!("cannot decrypt history from a different version of Atuin");
    }

    // Stage 2: the id string, read straight from the remaining slice.
    let (id, after_id) =
        decode::read_str_from_slice(header.remaining_slice()).map_err(error_report)?;

    // Stage 3: the three integer fields, read through a cursor.
    let mut numbers = Bytes::new(after_id);
    let timestamp = decode::read_u64(&mut numbers).map_err(error_report)?;
    let duration = decode::read_int(&mut numbers).map_err(error_report)?;
    let exit = decode::read_int(&mut numbers).map_err(error_report)?;

    // Stage 4: the four remaining string fields.
    let (command, rest) =
        decode::read_str_from_slice(numbers.remaining_slice()).map_err(error_report)?;
    let (cwd, rest) = decode::read_str_from_slice(rest).map_err(error_report)?;
    let (session, rest) = decode::read_str_from_slice(rest).map_err(error_report)?;
    let (hostname, rest) = decode::read_str_from_slice(rest).map_err(error_report)?;

    // Stage 5: the deletion time. A nil marker is accepted and means "not deleted".
    let mut tail = Bytes::new(rest);
    let deleted_at = match decode::read_u64(&mut tail) {
        Ok(nanos) => Some(nanos),
        Err(ValueReadError::TypeMismatch(Marker::Null)) => None,
        Err(other) => return Err(error_report(other)),
    };

    // Nothing may follow the last field.
    if !tail.remaining_slice().is_empty() {
        bail!("trailing bytes in encoded history. malformed")
    }

    Ok(History {
        id: id.to_owned(),
        timestamp: OffsetDateTime::from_unix_timestamp_nanos(i128::from(timestamp))?,
        duration,
        exit,
        command: command.to_owned(),
        cwd: cwd.to_owned(),
        session: session.to_owned(),
        hostname: hostname.to_owned(),
        deleted_at: match deleted_at {
            Some(nanos) => Some(OffsetDateTime::from_unix_timestamp_nanos(i128::from(nanos))?),
            None => None,
        },
    })
}

pub fn deserialize(bytes: &[u8], version: &str) -> Result<History> {
match version {
HISTORY_VERSION => Self::deserialize_v0(bytes),

_ => bail!("unknown version {version:?}"),
}
}

/// Builder for a history entry that is imported from shell history.
///
/// The only two required fields are `timestamp` and `command`.
Expand Down Expand Up @@ -202,8 +313,9 @@ impl History {
#[cfg(test)]
mod tests {
use regex::RegexSet;
use time::macros::datetime;

use crate::settings::Settings;
use crate::{history::HISTORY_VERSION, settings::Settings};

use super::History;

Expand Down Expand Up @@ -274,4 +386,100 @@ mod tests {

assert!(stripe_key.should_save(&settings));
}

// Snapshot test for the encoding of a history entry that has not been deleted.
//
// Checks three things: `serialize` produces exactly the bytes below, the
// serialized output decodes back to the same `History`, and the stored
// snapshot itself decodes to that `History`.
#[test]
fn test_serialize_deserialize() {
// Expected encoding. In MessagePack terms: 205 (0xcd) introduces the u16
// version (0), 153 (0x99) is an array header of length 9, and the final
// 192 (0xc0) is nil, standing in for the absent `deleted_at`.
let bytes = [
205, 0, 0, 153, 217, 32, 54, 54, 100, 49, 54, 99, 98, 101, 101, 55, 99, 100, 52, 55,
53, 51, 56, 101, 53, 99, 53, 98, 56, 98, 52, 52, 101, 57, 48, 48, 54, 101, 207, 23, 99,
98, 117, 24, 210, 246, 128, 206, 2, 238, 210, 240, 0, 170, 103, 105, 116, 32, 115, 116,
97, 116, 117, 115, 217, 42, 47, 85, 115, 101, 114, 115, 47, 99, 111, 110, 114, 97, 100,
46, 108, 117, 100, 103, 97, 116, 101, 47, 68, 111, 99, 117, 109, 101, 110, 116, 115,
47, 99, 111, 100, 101, 47, 97, 116, 117, 105, 110, 217, 32, 98, 57, 55, 100, 57, 97,
51, 48, 54, 102, 50, 55, 52, 52, 55, 51, 97, 50, 48, 51, 100, 50, 101, 98, 97, 52, 49,
102, 57, 52, 53, 55, 187, 102, 118, 102, 103, 57, 51, 54, 99, 48, 107, 112, 102, 58,
99, 111, 110, 114, 97, 100, 46, 108, 117, 100, 103, 97, 116, 101, 192,
];

// The entry the snapshot above is expected to represent.
let history = History {
id: "66d16cbee7cd47538e5c5b8b44e9006e".to_owned(),
timestamp: datetime!(2023-05-28 18:35:40.633872 +00:00),
duration: 49206000,
exit: 0,
command: "git status".to_owned(),
cwd: "/Users/conrad.ludgate/Documents/code/atuin".to_owned(),
session: "b97d9a306f274473a203d2eba41f9457".to_owned(),
hostname: "fvfg936c0kpf:conrad.ludgate".to_owned(),
deleted_at: None,
};

// Encoding must match the snapshot byte for byte.
let serialized = history.serialize().expect("failed to serialize history");
assert_eq!(serialized.0, bytes);

// Round trip: decoding what we just encoded yields the original entry.
let deserialized = History::deserialize(&serialized.0, HISTORY_VERSION)
.expect("failed to deserialize history");
assert_eq!(history, deserialized);

// test the snapshot too
let deserialized =
History::deserialize(&bytes, HISTORY_VERSION).expect("failed to deserialize history");
assert_eq!(history, deserialized);
}

/// Round-trips an entry whose `deleted_at` is set, so the integer form of the
/// final field is exercised rather than the nil form.
#[test]
fn test_serialize_deserialize_deleted() {
    let original = History {
        id: "66d16cbee7cd47538e5c5b8b44e9006e".to_owned(),
        timestamp: datetime!(2023-05-28 18:35:40.633872 +00:00),
        duration: 49206000,
        exit: 0,
        command: "git status".to_owned(),
        cwd: "/Users/conrad.ludgate/Documents/code/atuin".to_owned(),
        session: "b97d9a306f274473a203d2eba41f9457".to_owned(),
        hostname: "fvfg936c0kpf:conrad.ludgate".to_owned(),
        deleted_at: Some(datetime!(2023-11-19 20:18 +00:00)),
    };

    // Encode, then immediately decode the resulting bytes.
    let encoded = original
        .serialize()
        .expect("failed to serialize history");
    let round_tripped = History::deserialize(&encoded.0, HISTORY_VERSION)
        .expect("failed to deserialize history");

    assert_eq!(original, round_tripped);
}

// Checks that the version embedded in the payload is enforced: a payload
// whose leading u16 is 0 decodes, while one with a non-zero value is rejected.
//
// Both calls pass `HISTORY_VERSION`, so the rejection comes from the version
// check on the payload bytes, not from the version-string dispatch in
// `History::deserialize`.
#[test]
fn test_serialize_deserialize_version() {
// v0
let bytes_v0 = [
205, 0, 0, 153, 217, 32, 54, 54, 100, 49, 54, 99, 98, 101, 101, 55, 99, 100, 52, 55,
53, 51, 56, 101, 53, 99, 53, 98, 56, 98, 52, 52, 101, 57, 48, 48, 54, 101, 207, 23, 99,
98, 117, 24, 210, 246, 128, 206, 2, 238, 210, 240, 0, 170, 103, 105, 116, 32, 115, 116,
97, 116, 117, 115, 217, 42, 47, 85, 115, 101, 114, 115, 47, 99, 111, 110, 114, 97, 100,
46, 108, 117, 100, 103, 97, 116, 101, 47, 68, 111, 99, 117, 109, 101, 110, 116, 115,
47, 99, 111, 100, 101, 47, 97, 116, 117, 105, 110, 217, 32, 98, 57, 55, 100, 57, 97,
51, 48, 54, 102, 50, 55, 52, 52, 55, 51, 97, 50, 48, 51, 100, 50, 101, 98, 97, 52, 49,
102, 57, 52, 53, 55, 187, 102, 118, 102, 103, 57, 51, 54, 99, 48, 107, 112, 102, 58,
99, 111, 110, 114, 97, 100, 46, 108, 117, 100, 103, 97, 116, 101, 192,
];

// some other version
// NOTE: MessagePack integers are big-endian, so the leading `205, 1, 0`
// decodes to 256 rather than 1. Any non-zero value takes the rejection path,
// so the test still exercises what it intends to.
let bytes_v1 = [
205, 1, 0, 153, 217, 32, 54, 54, 100, 49, 54, 99, 98, 101, 101, 55, 99, 100, 52, 55,
53, 51, 56, 101, 53, 99, 53, 98, 56, 98, 52, 52, 101, 57, 48, 48, 54, 101, 207, 23, 99,
98, 117, 24, 210, 246, 128, 206, 2, 238, 210, 240, 0, 170, 103, 105, 116, 32, 115, 116,
97, 116, 117, 115, 217, 42, 47, 85, 115, 101, 114, 115, 47, 99, 111, 110, 114, 97, 100,
46, 108, 117, 100, 103, 97, 116, 101, 47, 68, 111, 99, 117, 109, 101, 110, 116, 115,
47, 99, 111, 100, 101, 47, 97, 116, 117, 105, 110, 217, 32, 98, 57, 55, 100, 57, 97,
51, 48, 54, 102, 50, 55, 52, 52, 55, 51, 97, 50, 48, 51, 100, 50, 101, 98, 97, 52, 49,
102, 57, 52, 53, 55, 187, 102, 118, 102, 103, 57, 51, 54, 99, 48, 107, 112, 102, 58,
99, 111, 110, 114, 97, 100, 46, 108, 117, 100, 103, 97, 116, 101, 192,
];

// The v0 payload is accepted...
let deserialized = History::deserialize(&bytes_v0, HISTORY_VERSION);
assert!(deserialized.is_ok());

// ...and the payload with a different embedded version is refused.
let deserialized = History::deserialize(&bytes_v1, HISTORY_VERSION);
assert!(deserialized.is_err());
}
}