diff --git a/Cargo.toml b/Cargo.toml
index c4cb9c124..98b8de21b 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -26,7 +26,7 @@ members = [
     "solana",
     "task_manager",
     "hex_assignments",
-    "aws_local"
+    "aws_local",
 ]
 resolver = "2"
 
@@ -64,7 +64,7 @@ sqlx = { version = "0.8", default-features = false, features = [
     "macros",
     "runtime-tokio-rustls",
 ] }
-helium-crypto = { version = "0.9.2", features = ["multisig", "sqlx-postgres"] }
+helium-crypto = { version = "0.9.2", default-features = false }
 hextree = { git = "https://github.com/jaykickliter/HexTree", branch = "main", features = [
     "disktree",
 ] }
diff --git a/file_store/Cargo.toml b/file_store/Cargo.toml
index f1cba02f1..41fb754fc 100644
--- a/file_store/Cargo.toml
+++ b/file_store/Cargo.toml
@@ -7,48 +7,48 @@ authors.workspace = true
 license.workspace = true
 
 [dependencies]
-anyhow = {workspace = true}
-clap = {workspace = true}
-config = {workspace = true}
-serde = {workspace = true}
-serde_json = {workspace = true}
-thiserror = {workspace = true}
+anyhow = { workspace = true }
+clap = { workspace = true }
+config = { workspace = true }
+serde = { workspace = true }
+serde_json = { workspace = true }
+thiserror = { workspace = true }
 tokio = { workspace = true }
 tokio-util = { workspace = true }
-tokio-stream = {workspace = true}
-triggered = {workspace = true}
-async-compression = {version = "0", features = ["tokio", "gzip"]}
-futures = {workspace = true}
-futures-util = {workspace = true}
-prost = {workspace = true}
+tokio-stream = { workspace = true }
+triggered = { workspace = true }
+async-compression = { version = "0", features = ["tokio", "gzip"] }
+futures = { workspace = true }
+futures-util = { workspace = true }
+prost = { workspace = true }
 bytes = "*"
 regex = "1"
-lazy_static = {workspace = true}
+lazy_static = { workspace = true }
 tracing = { workspace = true }
 chrono = { workspace = true }
-helium-proto = {workspace = true}
-helium-crypto = {workspace = true}
+helium-proto = { workspace = true }
+helium-crypto = { workspace = true }
 csv = "*"
-http = {workspace = true}
-aws-config = {workspace = true}
-aws-sdk-s3 = {workspace = true}
-aws-types = {workspace = true, optional = true}
-strum = {version = "0", features = ["derive"]}
+http = { workspace = true }
+aws-config = { workspace = true }
+aws-sdk-s3 = { workspace = true }
+aws-types = { workspace = true, optional = true }
+strum = { version = "0", features = ["derive"] }
 strum_macros = "0"
-sha2 = {workspace = true}
-metrics = {workspace = true }
-blake3 = {workspace = true}
+sha2 = { workspace = true }
+metrics = { workspace = true }
+blake3 = { workspace = true }
 poc-metrics = { path = "../metrics" }
-rust_decimal = {workspace = true}
-rust_decimal_macros = {workspace = true}
-base64 = {workspace = true}
-beacon = {workspace = true}
-sqlx = {workspace = true, optional = true}
-async-trait = {workspace = true}
-derive_builder = {workspace = true}
-retainer = {workspace = true}
-uuid = {workspace = true}
-h3o = {workspace = true}
+rust_decimal = { workspace = true }
+rust_decimal_macros = { workspace = true }
+base64 = { workspace = true }
+beacon = { workspace = true }
+sqlx = { workspace = true, optional = true }
+async-trait = { workspace = true }
+derive_builder = { workspace = true }
+retainer = { workspace = true }
+uuid = { workspace = true }
+h3o = { workspace = true }
 task-manager = { path = "../task_manager" }
 
 [dev-dependencies]
diff --git a/file_store/src/file_info_poller.rs b/file_store/src/file_info_poller.rs
index 2c9e4beb2..23e87045c 100644
--- a/file_store/src/file_info_poller.rs
+++ b/file_store/src/file_info_poller.rs
@@ -5,7 +5,6 @@ use derive_builder::Builder;
 use futures::{future::LocalBoxFuture, stream::BoxStream, StreamExt};
 use futures_util::TryFutureExt;
 use retainer::Cache;
-use sqlx::PgPool;
 use std::{collections::VecDeque, marker::PhantomData, sync::Arc, time::Duration};
 use task_manager::ManagedTask;
 use tokio::sync::mpsc::{Receiver, Sender};
@@ -134,7 +133,7 @@ pub struct FileInfoPollerConfig {
 #[derive(Clone)]
 pub struct FileInfoPollerServer<
     Message,
-    State = PgPool,
+    State,
     Store = FileStore,
     Parser = MsgDecodeFileInfoPollerParser,
 > {
@@ -429,13 +428,15 @@ impl FileInfoPollerStore for FileStore {
 }
 
 #[cfg(feature = "sqlx-postgres")]
-use sqlx::postgres::PgQueryResult;
+pub mod sqlx_postgres {
+    use super::*;
 
-#[cfg(feature = "sqlx-postgres")]
-#[async_trait::async_trait]
-impl FileInfoPollerStateRecorder for sqlx::Transaction<'_, sqlx::Postgres> {
-    async fn record(&mut self, process_name: &str, file_info: &FileInfo) -> Result {
-        sqlx::query(
+    use sqlx::postgres::PgQueryResult;
+
+    #[async_trait::async_trait]
+    impl FileInfoPollerStateRecorder for sqlx::Transaction<'_, sqlx::Postgres> {
+        async fn record(&mut self, process_name: &str, file_info: &FileInfo) -> Result {
+            sqlx::query(
             r#"
             INSERT INTO files_processed(process_name, file_name, file_type, file_timestamp, processed_at) VALUES($1, $2, $3, $4, $5)
             "#)
@@ -448,18 +449,17 @@ impl FileInfoPollerStateRecorder for sqlx::Transaction<'_, sqlx::Postgres> {
         .await
         .map(|_| ())
         .map_err(Error::from)
+        }
     }
-}
 
-#[cfg(feature = "sqlx-postgres")]
-#[async_trait::async_trait]
-impl FileInfoPollerState for sqlx::Pool<sqlx::Postgres> {
-    async fn latest_timestamp(
-        &self,
-        process_name: &str,
-        file_type: &str,
-    ) -> Result<Option<DateTime<Utc>>> {
-        sqlx::query_scalar::<_, Option<DateTime<Utc>>>(
+    #[async_trait::async_trait]
+    impl FileInfoPollerState for sqlx::Pool<sqlx::Postgres> {
+        async fn latest_timestamp(
+            &self,
+            process_name: &str,
+            file_type: &str,
+        ) -> Result<Option<DateTime<Utc>>> {
+            sqlx::query_scalar::<_, Option<DateTime<Utc>>>(
             r#"
             SELECT MAX(file_timestamp) FROM files_processed where process_name = $1 and file_type = $2
             "#,
         )
         .bind(process_name)
         .bind(file_type)
@@ -469,10 +469,10 @@ impl FileInfoPollerState for sqlx::Pool<sqlx::Postgres> {
         .fetch_one(self)
         .await
         .map_err(Error::from)
-    }
+        }
 
-    async fn exists(&self, process_name: &str, file_info: &FileInfo) -> Result<bool> {
-        sqlx::query_scalar::<_, bool>(
+        async fn exists(&self, process_name: &str, file_info: &FileInfo) -> Result<bool> {
+            sqlx::query_scalar::<_, bool>(
             r#"
             SELECT EXISTS(SELECT 1 from files_processed where process_name = $1 and file_name = $2)
             "#,
         )
         .bind(process_name)
         .bind(&file_info.key)
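With the `State = PgPool` default removed and the Postgres implementations feature-gated inside `sqlx_postgres`, any type implementing `FileInfoPollerState` can back the poller. A rough sketch of what that decoupling permits — assuming the three methods shown in this diff are the full trait surface; `MemoryState`, the import paths, and the `Clone` bound are illustrative assumptions, not part of this change:

```rust
// Hypothetical example, not part of this diff: an in-memory poller state.
use std::{
    collections::HashMap,
    sync::{Arc, Mutex},
};

use chrono::{DateTime, Utc};
use file_store::{file_info_poller::FileInfoPollerState, FileInfo, Result};

#[derive(Clone, Default)]
pub struct MemoryState {
    // Processed FileInfos keyed by process name; Arc keeps the state cheaply Clone.
    seen: Arc<Mutex<HashMap<String, Vec<FileInfo>>>>,
}

#[async_trait::async_trait]
impl FileInfoPollerState for MemoryState {
    async fn latest_timestamp(
        &self,
        process_name: &str,
        file_type: &str,
    ) -> Result<Option<DateTime<Utc>>> {
        Ok(self
            .seen
            .lock()
            .unwrap()
            .get(process_name)
            .into_iter()
            .flatten()
            .filter(|info| info.prefix == file_type)
            .map(|info| info.timestamp)
            .max())
    }

    async fn exists(&self, process_name: &str, file_info: &FileInfo) -> Result<bool> {
        Ok(self
            .seen
            .lock()
            .unwrap()
            .get(process_name)
            .is_some_and(|infos| infos.iter().any(|info| info.key == file_info.key)))
    }

    async fn clean(
        &self,
        process_name: &str,
        file_type: &str,
        offset: DateTime<Utc>,
    ) -> Result<u64> {
        let mut seen = self.seen.lock().unwrap();
        let Some(infos) = seen.get_mut(process_name) else {
            return Ok(0);
        };
        let before = infos.len();
        // Loose analogue of the Postgres impl: drop entries for this file type
        // that fall before the offset cutoff.
        infos.retain(|info| info.prefix != file_type || info.timestamp >= offset);
        Ok((before - infos.len()) as u64)
    }
}
```

Because trait impls apply globally regardless of the module they live in, moving the `PgPool` impls into `sqlx_postgres` changes nothing for existing Postgres callers.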
@@ -482,210 +482,211 @@ impl FileInfoPollerState for sqlx::Pool<sqlx::Postgres> {
         .fetch_one(self)
         .await
         .map_err(Error::from)
-    }
+        }
 
-    async fn clean(
-        &self,
-        process_name: &str,
-        file_type: &str,
-        offset: DateTime<Utc>,
-    ) -> Result<u64> {
-        let t100_timestamp: Option<DateTime<Utc>> = sqlx::query_scalar(
-            r#"
-            SELECT file_timestamp
-            FROM files_processed
-            WHERE process_name = $1
-            AND file_type = $2
-            ORDER BY file_timestamp DESC
-            LIMIT 1 OFFSET 100;
-            "#,
-        )
-        .bind(process_name)
-        .bind(file_type)
-        .fetch_optional(self)
-        .await?;
+        async fn clean(
+            &self,
+            process_name: &str,
+            file_type: &str,
+            offset: DateTime<Utc>,
+        ) -> Result<u64> {
+            let t100_timestamp: Option<DateTime<Utc>> = sqlx::query_scalar(
+                r#"
+                SELECT file_timestamp
+                FROM files_processed
+                WHERE process_name = $1
+                AND file_type = $2
+                ORDER BY file_timestamp DESC
+                LIMIT 1 OFFSET 100;
+                "#,
+            )
+            .bind(process_name)
+            .bind(file_type)
+            .fetch_optional(self)
+            .await?;
 
-        let Some(t100) = t100_timestamp else {
-            // The cleaning limit has not been reached, remove nothing.
-            return Ok(0);
-        };
+            let Some(t100) = t100_timestamp else {
+                // The cleaning limit has not been reached, remove nothing.
+                return Ok(0);
+            };
 
-        // To keep from reprocessing files, we need to make sure rows that exist
-        // within the offset window are not removed.
-        let older_than_limit = t100.min(offset);
+            // To keep from reprocessing files, we need to make sure rows that exist
+            // within the offset window are not removed.
+            let older_than_limit = t100.min(offset);
+
+            let query_result: PgQueryResult = sqlx::query(
+                r#"
+                DELETE FROM files_processed
+                WHERE process_name = $1
+                AND file_type = $2
+                AND file_timestamp < $3
+                "#,
+            )
+            .bind(process_name)
+            .bind(file_type)
+            .bind(older_than_limit)
+            .execute(self)
+            .await
+            .map_err(Error::from)?;
 
-        let query_result: PgQueryResult = sqlx::query(
-            r#"
-            DELETE FROM files_processed
-            WHERE process_name = $1
-            AND file_type = $2
-            AND file_timestamp < $3
-            "#,
-        )
-        .bind(process_name)
-        .bind(file_type)
-        .bind(older_than_limit)
-        .execute(self)
-        .await
-        .map_err(Error::from)?;
-
-        Ok(query_result.rows_affected())
+            Ok(query_result.rows_affected())
+        }
     }
-}
 
-#[cfg(test)]
-mod tests {
+    #[cfg(test)]
+    mod tests {
 
-    use sqlx::{Executor, PgPool};
-    use std::time::Duration;
-    use tokio::time::timeout;
+        use sqlx::{Executor, PgPool};
+        use std::time::Duration;
+        use tokio::time::timeout;
 
-    use super::*;
+        use super::*;
 
-    struct TestParser;
-    struct TestStore(Vec<FileInfo>);
+        struct TestParser;
+        struct TestStore(Vec<FileInfo>);
 
-    #[async_trait::async_trait]
-    impl FileInfoPollerParser<String> for TestParser {
-        async fn parse(&self, _byte_stream: ByteStream) -> Result<Vec<String>> {
-            Ok(vec![])
+        #[async_trait::async_trait]
+        impl FileInfoPollerParser<String> for TestParser {
+            async fn parse(&self, _byte_stream: ByteStream) -> Result<Vec<String>> {
+                Ok(vec![])
+            }
         }
-    }
 
-    #[async_trait::async_trait]
-    impl FileInfoPollerStore for TestStore {
-        async fn list_all<A, B>(
-            &self,
-            _file_type: &str,
-            after: A,
-            before: B,
-        ) -> Result<Vec<FileInfo>>
-        where
-            A: Into<Option<DateTime<Utc>>> + Send + Sync + Copy,
-            B: Into<Option<DateTime<Utc>>> + Send + Sync + Copy,
-        {
-            let after = after.into();
-            let before = before.into();
-
-            Ok(self
-                .0
-                .clone()
-                .into_iter()
-                .filter(|file_info| after.is_none_or(|v| file_info.timestamp > v))
-                .filter(|file_info| before.is_none_or(|v| file_info.timestamp <= v))
-                .collect())
-        }
+        #[async_trait::async_trait]
+        impl FileInfoPollerStore for TestStore {
+            async fn list_all<A, B>(
+                &self,
+                _file_type: &str,
+                after: A,
+                before: B,
+            ) -> Result<Vec<FileInfo>>
+            where
+                A: Into<Option<DateTime<Utc>>> + Send + Sync + Copy,
+                B: Into<Option<DateTime<Utc>>> + Send + Sync + Copy,
+            {
+                let after = after.into();
+                let before = before.into();
+
+                Ok(self
+                    .0
+                    .clone()
+                    .into_iter()
+                    .filter(|file_info| after.is_none_or(|v| file_info.timestamp > v))
+                    .filter(|file_info| before.is_none_or(|v| file_info.timestamp <= v))
+                    .collect())
+            }
 
-        async fn get_raw<K>(&self, _key: K) -> Result<ByteStream>
-        where
-            K: Into<String> + Send + Sync,
-        {
-            Ok(ByteStream::default())
+            async fn get_raw<K>(&self, _key: K) -> Result<ByteStream>
+            where
+                K: Into<String> + Send + Sync,
+            {
+                Ok(ByteStream::default())
+            }
         }
-    }
 
-    #[sqlx::test]
-    async fn do_not_reprocess_files_when_offset_exceeds_earliest_file(
-        pool: PgPool,
-    ) -> anyhow::Result<()> {
-        // Cleaning the files_processed table should not cause files within the
-        // `FileInfoPoller.config.offset` window to be reprocessed.
-        // There is no auto-migration for tests in this lib workspace.
-        pool.execute(
-            r#"
-            CREATE TABLE files_processed (
-                process_name TEXT NOT NULL DEFAULT 'default',
-                file_name VARCHAR PRIMARY KEY,
-                file_type VARCHAR NOT NULL,
-                file_timestamp TIMESTAMPTZ NOT NULL,
-                processed_at TIMESTAMPTZ NOT NULL
-            );
-            "#,
-        )
-        .await?;
+        #[sqlx::test]
+        async fn do_not_reprocess_files_when_offset_exceeds_earliest_file(
+            pool: PgPool,
+        ) -> anyhow::Result<()> {
+            // Cleaning the files_processed table should not cause files within the
+            // `FileInfoPoller.config.offset` window to be reprocessed.
+
+            // There is no auto-migration for tests in this lib workspace.
+            pool.execute(
+                r#"
+                CREATE TABLE files_processed (
+                    process_name TEXT NOT NULL DEFAULT 'default',
+                    file_name VARCHAR PRIMARY KEY,
+                    file_type VARCHAR NOT NULL,
+                    file_timestamp TIMESTAMPTZ NOT NULL,
+                    processed_at TIMESTAMPTZ NOT NULL
+                );
+                "#,
+            )
+            .await?;
 
-        // The important aspect of this test is that all the files to be
-        // processed happen _within_ the lookback offset.
-        const EXPECTED_FILE_COUNT: i64 = 150;
-        let mut infos = vec![];
-        for seconds in 0..EXPECTED_FILE_COUNT {
-            let file_info = FileInfo {
-                key: format!("key-{seconds}"),
-                prefix: "file_type".to_string(),
-                timestamp: Utc::now() - chrono::Duration::seconds(seconds),
-                size: 42,
-            };
-            infos.push(file_info);
-        }
+            // The important aspect of this test is that all the files to be
+            // processed happen _within_ the lookback offset.
+            const EXPECTED_FILE_COUNT: i64 = 150;
+            let mut infos = vec![];
+            for seconds in 0..EXPECTED_FILE_COUNT {
+                let file_info = FileInfo {
+                    key: format!("key-{seconds}"),
+                    prefix: "file_type".to_string(),
+                    timestamp: Utc::now() - chrono::Duration::seconds(seconds),
+                    size: 42,
+                };
+                infos.push(file_info);
+            }
 
-        // To simulate a restart, we're going to make a new FileInfoPoller.
-        // This closure is to ensure they have the same settings.
-        let file_info_builder = || {
-            let six_hours = chrono::Duration::hours(6).to_std().unwrap();
-            FileInfoPollerConfigBuilder::<String, PgPool, TestStore, TestParser>::default()
-                .parser(TestParser)
-                .state(pool.clone())
-                .store(TestStore(infos.clone()))
-                .lookback(LookbackBehavior::Max(six_hours))
-                .prefix("file_type".to_string())
-                .offset(six_hours)
-                .create()
-        };
+            // To simulate a restart, we're going to make a new FileInfoPoller.
+            // This closure is to ensure they have the same settings.
+            let file_info_builder = || {
+                let six_hours = chrono::Duration::hours(6).to_std().unwrap();
+                FileInfoPollerConfigBuilder::<String, PgPool, TestStore, TestParser>::default()
+                    .parser(TestParser)
+                    .state(pool.clone())
+                    .store(TestStore(infos.clone()))
+                    .lookback(LookbackBehavior::Max(six_hours))
+                    .prefix("file_type".to_string())
+                    .offset(six_hours)
+                    .create()
+            };
 
-        // The first startup of the file info poller, there is nothing to clean.
-        // And all file_infos will be returned to be processed.
-        let (mut receiver, ingest_server) = file_info_builder().await?;
-        let (trigger, shutdown) = triggered::trigger();
-        tokio::spawn(async move {
-            if let Err(status) = ingest_server.run(shutdown).await {
-                println!("ingest server went down unexpectedly: {status:?}");
-            }
-        });
+            // On the first startup of the file info poller there is nothing to
+            // clean, and all file_infos will be returned to be processed.
+            let (mut receiver, ingest_server) = file_info_builder().await?;
+            let (trigger, shutdown) = triggered::trigger();
+            tokio::spawn(async move {
+                if let Err(status) = ingest_server.run(shutdown).await {
+                    println!("ingest server went down unexpectedly: {status:?}");
+                }
+            });
 
-        // "process" all the files. They are not recorded into the database
-        // until the file is consumed as a stream.
-        let mut processed = 0;
-        while processed < EXPECTED_FILE_COUNT {
-            match timeout(Duration::from_secs(1), receiver.recv()).await? {
-                Some(msg) => {
-                    processed += 1;
-                    let mut txn = pool.begin().await?;
-                    let _x = msg.into_stream(&mut txn).await?;
-                    txn.commit().await?;
-                }
-                err => panic!("something went wrong: {err:?}"),
-            };
-        }
+            // "process" all the files. They are not recorded into the database
+            // until the file is consumed as a stream.
+            let mut processed = 0;
+            while processed < EXPECTED_FILE_COUNT {
+                match timeout(Duration::from_secs(1), receiver.recv()).await? {
+                    Some(msg) => {
+                        processed += 1;
+                        let mut txn = pool.begin().await?;
+                        let _x = msg.into_stream(&mut txn).await?;
+                        txn.commit().await?;
+                    }
+                    err => panic!("something went wrong: {err:?}"),
+                };
+            }
 
-        // Shutdown the ingest server, we're going to create a new one and start it.
-        trigger.trigger();
+            // Shut down the ingest server; we're going to create a new one and start it.
+            trigger.trigger();
 
-        // The second startup of the file info poller, there are 100+ files that
-        // have been processed. The initial clean should not remove processed
-        // files in a way that causes us to re-receive any files within our
-        // offset for processing.
-        let (mut receiver, ingest_server) = file_info_builder().await?;
-        let (trigger, shutdown) = triggered::trigger();
-        let _handle = tokio::spawn(async move {
-            if let Err(status) = ingest_server.run(shutdown).await {
-                println!("ingest server went down unexpectedly: {status:?}");
-            }
-        });
+            // On the second startup of the file info poller, there are 100+ files
+            // that have already been processed. The initial clean should not remove
+            // processed files in a way that causes us to re-receive any files within
+            // our offset for processing.
+            let (mut receiver, ingest_server) = file_info_builder().await?;
+            let (trigger, shutdown) = triggered::trigger();
+            let _handle = tokio::spawn(async move {
+                if let Err(status) = ingest_server.run(shutdown).await {
+                    println!("ingest server went down unexpectedly: {status:?}");
+                }
+            });
 
-        // Attempting to recieve files for processing. The timeout should fire,
-        // because all the files we have setup exist within the offset, and
-        // should still be in the database.
-        match timeout(Duration::from_secs(1), receiver.recv()).await {
-            Err(_err) => (),
-            Ok(msg) => {
-                panic!("we got something when we expected nothing.: {msg:?}");
-            }
-        }
+            // Attempt to receive files for processing. The timeout should fire,
+            // because all the files we set up exist within the offset, and
+            // should still be in the database.
+            match timeout(Duration::from_secs(1), receiver.recv()).await {
+                Err(_err) => (),
+                Ok(msg) => {
+                    panic!("we got something when we expected nothing: {msg:?}");
+                }
+            }
 
-        // Shut down for great good
-        trigger.trigger();
+            // Shut down for great good
+            trigger.trigger();
 
-        Ok(())
-    }
-}
+            Ok(())
+        }
+    }
+}
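The substance of the fix is the `t100.min(offset)` bound in `clean`: the delete cutoff is whichever is older — the 100th-newest processed row or the poller's offset cutoff — so rows inside the offset window survive even when more than 100 of them exist. A toy illustration of the two regimes (the function and variable names are hypothetical, not from this diff):

```rust
// The delete cutoff is the OLDER of the 100th-newest row's timestamp and the
// poller's offset cutoff, so rows inside the offset window always survive.
use chrono::{DateTime, Duration, Utc};

fn delete_cutoff(
    t100: DateTime<Utc>,          // timestamp of the 100th-newest processed file
    offset_cutoff: DateTime<Utc>, // roughly: now - FileInfoPollerConfig.offset
) -> DateTime<Utc> {
    t100.min(offset_cutoff)
}

fn main() {
    let now = Utc::now();
    let offset_cutoff = now - Duration::hours(6);

    // 150 files processed within the last 150 seconds: the 100th-newest file is
    // well inside the 6h offset window, so the offset cutoff wins and nothing
    // in the window is deleted (the scenario the new test pins down).
    let t100 = now - Duration::seconds(100);
    assert_eq!(delete_cutoff(t100, offset_cutoff), offset_cutoff);

    // A long-running process whose 100th-newest row is older than the window:
    // the t100 bound wins and only rows beyond it are deleted.
    let t100 = now - Duration::hours(12);
    assert_eq!(delete_cutoff(t100, offset_cutoff), t100);
}
```

The new test exercises the first regime: with 150 files inside a six-hour offset, a bare `LIMIT 1 OFFSET 100` cutoff would have deleted rows 101-150 and caused those files to be re-fetched on restart.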
diff --git a/file_store/src/mobile_ban.rs b/file_store/src/mobile_ban.rs
index a517fa6fe..1f6533f02 100644
--- a/file_store/src/mobile_ban.rs
+++ b/file_store/src/mobile_ban.rs
@@ -2,11 +2,12 @@ use std::str::FromStr;
 
 use chrono::{DateTime, Utc};
 use helium_crypto::PublicKeyBinary;
-use sqlx::PgPool;
 
 use crate::{
     error::DecodeError,
-    file_info_poller::{FileInfoPollerServer, FileInfoStream, LookbackBehavior},
+    file_info_poller::{
+        FileInfoPollerServer, FileInfoPollerState, FileInfoStream, LookbackBehavior,
+    },
     file_sink::FileSinkClient,
     traits::{FileSinkWriteExt, MsgDecode, TimestampDecode, TimestampEncode},
     Error, FileSink, FileStore,
@@ -298,11 +299,11 @@ pub async fn verified_report_sink(
     .await
 }
 
-pub async fn report_source(
-    pool: PgPool,
+pub async fn report_source<State: FileInfoPollerState>(
+    pool: State,
     file_store: FileStore,
     start_after: DateTime<Utc>,
-) -> crate::Result<(BanReportSource, FileInfoPollerServer<BanReport>)> {
+) -> crate::Result<(BanReportSource, FileInfoPollerServer<BanReport, State>)> {
     crate::file_source::continuous_source()
         .state(pool)
         .store(file_store)
@@ -312,13 +313,13 @@ pub async fn report_source(
     .await
 }
 
-pub async fn verified_report_source(
-    pool: PgPool,
+pub async fn verified_report_source<State: FileInfoPollerState>(
+    pool: State,
     file_store: FileStore,
     start_after: DateTime<Utc>,
 ) -> crate::Result<(
     VerifiedBanReportSource,
-    FileInfoPollerServer<VerifiedBanReport>,
+    FileInfoPollerServer<VerifiedBanReport, State>,
 )> {
     crate::file_source::continuous_source()
         .state(pool)
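For existing Postgres consumers the new type parameter should be invisible — `State` is inferred from the pool argument, provided the `sqlx-postgres` feature is enabled so the `FileInfoPollerState` impl for `sqlx::Pool<sqlx::Postgres>` is compiled in. A minimal caller sketch (function name and error handling are illustrative, not from this diff):

```rust
// Hypothetical caller: wiring up the now-generic ban report source with a
// PgPool, exactly as pre-change callers did.
use chrono::Utc;
use file_store::{mobile_ban, FileStore};
use sqlx::PgPool;

async fn start_ban_source(pool: PgPool, file_store: FileStore) -> anyhow::Result<()> {
    let start_after = Utc::now();

    // `State` is inferred as `PgPool` from the first argument.
    let (receiver, server) = mobile_ban::report_source(pool, file_store, start_after).await?;

    // `receiver` yields FileInfoStream<BanReport> messages; `server` is the
    // FileInfoPollerServer<BanReport, PgPool> task to run under a task manager.
    let _ = (receiver, server);
    Ok(())
}
```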