Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 9 additions & 1 deletion .github/workflows/CI.yml
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,13 @@ jobs:
--health-retries 5
ports:
- 5432:5432
localstack:
  # LocalStack provides a local S3 endpoint for the tests on port 4566.
  image: localstack/localstack:latest
  # GitHub Actions service containers take their variables under `env`
  # (`environment` is the docker-compose spelling and is rejected here).
  env:
    SERVICES: s3
    EAGER_SERVICE_LOADING: 1
  ports:
    - 4566:4566
steps:
- name: Install Dependencies
run: |
Expand Down Expand Up @@ -229,4 +236,5 @@ jobs:
RELEASE_VERSION: ${{ github.event.inputs.release_version }}
run: |
chmod +x ./.github/scripts/make_debian.sh
./.github/scripts/make_debian.sh
./.github/scripts/make_debian.sh

9 changes: 8 additions & 1 deletion .github/workflows/DockerCI.yml
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,13 @@ jobs:
--health-retries 5
ports:
- 5432:5432
# LocalStack service container: exposes a local S3 endpoint to the tests on port 4566.
localstack:
image: localstack/localstack:latest
env:
# Only the S3 service is requested.
SERVICES: s3
EAGER_SERVICE_LOADING: 1
ports:
- 4566:4566
steps:
- name: Checkout Repository
uses: actions/checkout@v4
Expand Down Expand Up @@ -218,4 +225,4 @@ jobs:
cargo test -p ${{ matrix.package }} -- --include-ignored

- name: Fix Permissions for Caching
run: sudo chown -R $(whoami):$(whoami) target
run: sudo chown -R $(whoami):$(whoami) target
43 changes: 43 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 7 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,8 @@ members = [
"solana",
"task_manager",
"hex_assignments",
]
"aws_local"
, "dataset_downloader"]
resolver = "2"

[workspace.package]
Expand Down Expand Up @@ -113,6 +114,11 @@ tokio-util = "0"
uuid = { version = "1", features = ["v4", "serde"] }
tower-http = { version = "0", features = ["trace"] }
derive_builder = "0"
aws-config = "0.51"
aws-sdk-s3 = "0.21"
aws-types = { version = "0.51", features = ["hardcoded-credentials"]}
regex = "1"
async-compression = { version = "0", features = ["tokio", "gzip"] }

[patch.crates-io]
anchor-lang = { git = "https://github.com/madninja/anchor.git", branch = "madninja/const_pubkey" }
Expand Down
20 changes: 20 additions & 0 deletions aws_local/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
# Helper crate used to run tests against LocalStack (S3 emulation).
[package]
name = "aws-local"
version = "0.1.0"
authors.workspace = true
license.workspace = true
edition.workspace = true

[dependencies]
aws-config = {workspace = true}
aws-sdk-s3 = {workspace = true}
# Static access keys are passed to the S3 client, hence "hardcoded-credentials".
aws-types = {workspace = true, features = ["hardcoded-credentials"]}
tokio = {workspace = true}
triggered = {workspace = true}
tonic = {workspace = true}
chrono = {workspace = true}
prost = {workspace = true}
anyhow = {workspace = true}
uuid = {workspace = true}
file-store = { path = "../file_store", features = ["local"] }

1 change: 1 addition & 0 deletions aws_local/src/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
This crate helps run tests against [LocalStack](https://www.localstack.cloud/), a local emulator of AWS services such as S3.
159 changes: 159 additions & 0 deletions aws_local/src/lib.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,159 @@
use anyhow::{anyhow, Result};
use aws_config::meta::region::RegionProviderChain;
use aws_sdk_s3::{Client, Endpoint, Region};
use chrono::Utc;
use file_store::traits::MsgBytes;
use file_store::{file_sink, file_upload, FileStore, FileType, Settings};
use std::path::Path;
use std::{str::FromStr, sync::Arc};
use tokio::sync::Mutex;
use tonic::transport::Uri;
use uuid::Uuid;

/// Default endpoint of a LocalStack instance running on the local machine (port 4566).
pub const AWSLOCAL_DEFAULT_ENDPOINT: &str = "http://127.0.0.1:4566";

/// Generates a bucket name of the form `mvr-<uuid v4>-<unix time in milliseconds>`.
///
/// The random UUID combined with the current timestamp keeps names from
/// colliding between calls.
pub fn gen_bucket_name() -> String {
    let unique_id = Uuid::new_v4();
    let created_at_ms = Utc::now().timestamp_millis();
    format!("mvr-{unique_id}-{created_at_ms}")
}

/// Interacts with LocalStack.
///
/// Used to create mocked AWS buckets and files.
pub struct AwsLocal {
/// Settings (bucket, endpoint, region, credentials) used for the file store and for uploads.
pub fs_settings: Settings,
/// File store created from `fs_settings`.
pub file_store: FileStore,
/// S3 client configured with the endpoint and credentials from `fs_settings`.
pub aws_client: aws_sdk_s3::Client,
}

impl AwsLocal {
async fn create_aws_client(settings: &Settings) -> aws_sdk_s3::Client {
let endpoint: Option<Endpoint> = match &settings.endpoint {
Some(endpoint) => Uri::from_str(endpoint)
.map(Endpoint::immutable)
.map(Some)
.unwrap(),
_ => None,
};
let region = Region::new(settings.region.clone());
let region_provider = RegionProviderChain::first_try(region).or_default_provider();

let mut config = aws_config::from_env().region(region_provider);
config = config.endpoint_resolver(endpoint.unwrap());

let creds = aws_types::credentials::Credentials::from_keys(
settings.access_key_id.as_ref().unwrap(),
settings.secret_access_key.as_ref().unwrap(),
None,
);
config = config.credentials_provider(creds);

let config = config.load().await;

Client::new(&config)
}

pub async fn new(endpoint: &str, bucket: &str) -> AwsLocal {
let settings = Settings {
bucket: bucket.into(),
endpoint: Some(endpoint.into()),
region: "us-east-1".into(),
access_key_id: Some("random".into()),
secret_access_key: Some("random2".into()),
};
let client = Self::create_aws_client(&settings).await;
client.create_bucket().bucket(bucket).send().await.unwrap();
AwsLocal {
aws_client: client,
fs_settings: settings.clone(),
file_store: file_store::FileStore::from_settings(&settings)
.await
.unwrap(),
}
}
pub async fn put_proto_to_aws<T: prost::Message + MsgBytes>(
&self,
items: Vec<T>,
file_type: FileType,
metric_name: &'static str,
) -> Result<String> {
// Uuid uses as random to avoid colisions
let uuid: Uuid = Uuid::new_v4();
let dir_path = format!("/tmp/{}/{}", uuid, self.fs_settings.bucket);
let store_base_path = std::path::Path::new(&dir_path);
let (shutdown_trigger, shutdown_listener) = triggered::trigger();

let (file_upload, file_upload_server) =
file_upload::FileUpload::from_settings_tm(&self.fs_settings)
.await
.unwrap();

let (item_sink, item_server) =
file_sink::FileSinkBuilder::new(file_type, store_base_path, file_upload, metric_name)
.auto_commit(false)
.roll_time(std::time::Duration::new(15, 0))
.create::<T>()
.await
.unwrap();

for item in items {
item_sink.write(item, &[]).await.unwrap();
}
let item_recv = item_sink.commit().await.unwrap();

let dir_path_clone = dir_path.clone();
let uploaded_file = Arc::new(Mutex::new(String::default()));
let up_2 = uploaded_file.clone();
let mut timeout = std::time::Duration::new(5, 0);

tokio::spawn(async move {
let uploaded_files = item_recv.await.unwrap().unwrap();
assert!(uploaded_files.len() == 1);
let mut val = up_2.lock().await;
*val = uploaded_files.first().unwrap().to_string();

// After files uploaded to aws the must be removed.
// So we wait when dir will be empty.
// It means all files are uploaded to aws
loop {
if is_dir_has_files(&dir_path_clone) {
let dur = std::time::Duration::from_millis(10);
tokio::time::sleep(dur).await;
timeout -= dur;
continue;
}
break;
}

shutdown_trigger.trigger();
});

tokio::try_join!(
file_upload_server.run(shutdown_listener.clone()),
item_server.run(shutdown_listener.clone())
)
.unwrap();
std::fs::remove_dir_all(dir_path).unwrap();
let res = uploaded_file.lock().await;
Ok(res.clone())
}
pub async fn put_file_to_aws(&self, file_path: &Path) -> Result<()> {
let path_str = file_path.display();
if !file_path.exists() {
return Err(anyhow!("File {path_str} is absent"));
}
if !file_path.is_file() {
return Err(anyhow!("File {path_str} is not a file"));
}
self.file_store.put(file_path).await?;

Ok(())
}
}

/// Reports whether `dir_path` directly contains at least one entry that is not
/// a directory.
///
/// # Panics
/// Panics if the directory cannot be read or one of its entries cannot be
/// inspected.
fn is_dir_has_files(dir_path: &str) -> bool {
    let mut found_file = false;
    for entry in std::fs::read_dir(dir_path).unwrap() {
        // Deliberately no early exit: every entry is visited so that a read
        // error on any of them still surfaces as a panic.
        if !entry.unwrap().path().is_dir() {
            found_file = true;
        }
    }
    found_file
}
28 changes: 28 additions & 0 deletions dataset_downloader/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
[package]
name = "dataset-downloader"
version = "0.1.0"
authors.workspace = true
license.workspace = true
edition.workspace = true

[dependencies]
sqlx = { workspace = true }
anyhow = { workspace = true }
uuid = { workspace = true }
async-trait = { workspace = true }
regex = { workspace = true }
hextree = { workspace = true }
tokio = { workspace = true }
tokio-util = { workspace = true }
lazy_static = { workspace = true }
tracing = { workspace = true }
chrono = { workspace = true }
futures-util = { workspace = true }
async-compression = { workspace = true }
file-store = { path = "../file_store" }
hex-assignments = { path = "../hex_assignments" }

# Test-only dependencies; aws-local is the LocalStack helper crate.
[dev-dependencies]
aws-local = { path = "../aws_local" }
tempfile = "3"

Loading