Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .env.erb
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
<%# Render a .env populated with R2 credentials pulled from Bitwarden.
    `bw unlock --raw` prints a session token on stdout; each secret is then
    fetched as the "notes" field of a Bitwarden item by its item id.
    NOTE(review): backtick output from `bw get notes` may carry a trailing
    newline into the generated value — confirm, and `.strip` if so. %>
<% if (session_token=`bw unlock --raw`.strip) != "" %>
BUCKET_NAME=test
ACCOUNT_ID=<%= `bw get notes 11bc718f-87d4-4c45-a50a-0d5ad6b2b7e9 --session #{session_token}` %>
ACCESS_KEY_ID=<%= `bw get notes 740192af-45cd-499d-aa5a-ae11d810ed1c --session #{session_token}` %>
SECRET_ACCESS_KEY=<%= `bw get notes f242eea7-4273-41ae-9ca0-81ae3bda717c --session #{session_token}` %>
<% else raise ArgumentError, "session token missing" end %>
4 changes: 4 additions & 0 deletions flake.nix
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,11 @@
{
devShells = forEachSupportedSystem ({ pkgs }: {
default = pkgs.mkShell {
buildInputs = with pkgs; [] ++ (with pkgs.darwin.apple_sdk.frameworks; pkgs.lib.optionals pkgs.stdenv.isDarwin [
Security
]);
packages = with pkgs; [
sqlite
rustToolchain
openssl
pkg-config
Expand Down
4 changes: 4 additions & 0 deletions src/argument.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,4 +11,8 @@ pub struct Argument {
/// The path to the database to backup
#[arg(long)]
pub db: String,

/// Data retention: keeps only the most recent `n` records; the max is 255
#[arg(long, default_value = "30")]
pub data_retention: u8,
}
7 changes: 6 additions & 1 deletion src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,12 @@ async fn run(arg: &argument::Argument, cfg: &Config) -> Result<()> {

// upload
let uploader = R2Uploader::new(arg, cfg).await;
uploader.upload_object(dest, src_file.filename).await?;
let (upload_res, retain_res) = tokio::join!(
uploader.upload_object(dest, src_file.filename),
uploader.retain(arg.data_retention, src_file.filename)
);
upload_res?;
retain_res?;

// close temp dir
tmp_dir.close()?;
Expand Down
67 changes: 61 additions & 6 deletions src/uploader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use async_trait::async_trait;
use aws_sdk_s3::{
config::{Credentials, Region},
primitives::ByteStream,
types::{Delete, ObjectIdentifier},
Client,
};
use time::format_description;
Expand All @@ -16,7 +17,8 @@ use crate::{

#[async_trait]
pub trait Uploader {
async fn upload_object(&self, src_path: PathBuf, dest_name: &str) -> Result<()>;
async fn upload_object(&self, src_path: PathBuf, src_name: &str) -> Result<()>;
async fn retain(&self, data_retention: u8, src_name: &str) -> Result<()>;
}

pub struct R2Uploader {
Expand Down Expand Up @@ -59,17 +61,15 @@ impl R2Uploader {

#[async_trait]
impl Uploader for R2Uploader {
async fn upload_object(&self, src_path: PathBuf, dest_name: &str) -> Result<()> {
async fn upload_object(&self, src_path: PathBuf, src_name: &str) -> Result<()> {
let body = ByteStream::from_path(src_path)
.await
.context("create file stream")?;
let key = uuid::Uuid::new_v4();
let format = format_description::parse("[year]-[month]-[day]")?;
let today = time::OffsetDateTime::now_utc().format(&format)?;
let object_key = format!(
"{}/{}/{}/{today}__{key}",
self.app_env, self.project_name, dest_name,
);
let key_prefix = self.object_key_prefix(src_name);
let object_key = format!("{key_prefix}/{today}__{key}");

self.client
.put_object()
Expand All @@ -82,4 +82,59 @@ impl Uploader for R2Uploader {

Ok(())
}

/// Enforce the retention policy for `src_name`: keep only the most recent
/// `count` backup objects under this uploader's key prefix and delete the
/// rest, oldest first by `last_modified`.
///
/// NOTE(review): the caller runs this concurrently with the upload
/// (`tokio::join!`), so the listing may or may not include the object
/// currently being uploaded — confirm the intended "keep n" semantics.
async fn retain(&self, count: u8, src_name: &str) -> Result<()> {
    let key_prefix = self.object_key_prefix(src_name);
    let result = self
        .client
        .list_objects_v2()
        .bucket(self.bucket.clone())
        .prefix(key_prefix)
        .send()
        .await
        .context("list objects from r2")?;

    // Nothing to prune while we hold `count` objects or fewer. Using `<=`
    // (the original used `<`) also prevents sending a DeleteObjects request
    // with an empty key list when key_count == count, which S3/R2 rejects.
    if result.key_count() <= count as i32 {
        return Ok(());
    }

    if let Some(objects) = result.contents() {
        // saturating_sub guards against a key_count/contents mismatch
        // underflowing the subtraction; bail out if there is no surplus.
        let surplus = objects.len().saturating_sub(count as usize);
        if surplus == 0 {
            return Ok(());
        }

        // Sort oldest first. Comparing the Options directly avoids the
        // previous `unwrap()` panic path: objects missing `last_modified`
        // (None) sort first and are pruned before any dated object.
        let mut objects = objects.to_vec();
        objects.sort_by(|a, b| a.last_modified().cmp(&b.last_modified()));

        let doomed = objects[..surplus]
            .iter()
            .map(|obj| {
                let key = obj.key().unwrap_or_default().to_string();
                ObjectIdentifier::builder().set_key(Some(key)).build()
            })
            .collect::<Vec<_>>();
        self.delete_objects(doomed).await?;
    }

    Ok(())
}
}

impl R2Uploader {
    /// Key namespace a backup source's objects live under:
    /// `<app_env>/<project_name>/<src_name>`.
    fn object_key_prefix(&self, src_name: &str) -> String {
        format!(
            "{env}/{project}/{src_name}",
            env = self.app_env,
            project = self.project_name
        )
    }

    /// Remove `targets` from the bucket with a single bulk DeleteObjects call.
    async fn delete_objects(&self, targets: Vec<ObjectIdentifier>) -> Result<()> {
        let payload = Delete::builder().set_objects(Some(targets)).build();
        let request = self
            .client
            .delete_objects()
            .bucket(self.bucket.clone())
            .delete(payload);
        request.send().await.context("delete objects")?;
        Ok(())
    }
}