Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 6 additions & 6 deletions mobile_config/src/gateway/db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -271,7 +271,7 @@ impl Gateway {
pub fn stream_by_addresses<'a>(
db: impl PgExecutor<'a> + 'a,
addresses: Vec<PublicKeyBinary>,
min_updated_at: DateTime<Utc>,
min_last_changed_at: DateTime<Utc>,
) -> impl Stream<Item = Self> + 'a {
let addr_array: Vec<Vec<u8>> = addresses.iter().map(|a| a.as_ref().to_vec()).collect();

Expand All @@ -293,11 +293,11 @@ impl Gateway {
location_asserts
FROM gateways
WHERE address = ANY($1)
AND (updated_at >= $2 OR refreshed_at >= $2 OR created_at >= $2)
AND last_changed_at >= $2
"#,
)
.bind(addr_array)
.bind(min_updated_at)
.bind(min_last_changed_at)
.fetch(db)
.map_err(anyhow::Error::from)
.filter_map(|res| async move { res.ok() })
Expand All @@ -306,7 +306,7 @@ impl Gateway {
pub fn stream_by_types<'a>(
db: impl PgExecutor<'a> + 'a,
types: Vec<GatewayType>,
min_date: DateTime<Utc>,
min_last_changed_at: DateTime<Utc>,
min_location_changed_at: Option<DateTime<Utc>>,
) -> impl Stream<Item = Self> + 'a {
sqlx::query_as::<_, Self>(
Expand All @@ -327,15 +327,15 @@ impl Gateway {
location_asserts
FROM gateways
WHERE gateway_type = ANY($1)
AND (updated_at >= $2 OR refreshed_at >= $2 OR created_at >= $2)
AND last_changed_at >= $2
AND (
$3::timestamptz IS NULL
OR (location IS NOT NULL AND location_changed_at >= $3)
)
"#,
)
.bind(types)
.bind(min_date)
.bind(min_last_changed_at)
.bind(min_location_changed_at)
.fetch(db)
.map_err(anyhow::Error::from)
Expand Down
10 changes: 5 additions & 5 deletions mobile_config/tests/integrations/gateway_db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,7 @@ async fn gateway_bulk_upsert_updates_and_change(pool: PgPool) -> anyhow::Result<
}

#[sqlx::test]
async fn stream_by_addresses_filters_by_min_updated(pool: PgPool) -> anyhow::Result<()> {
async fn stream_by_addresses_filters_by_min_last_changed_at(pool: PgPool) -> anyhow::Result<()> {
let a1 = pk_binary();
let a2 = pk_binary();
let t0 = Utc.with_ymd_and_hms(2025, 1, 1, 0, 0, 0).unwrap();
Expand All @@ -206,15 +206,15 @@ async fn stream_by_addresses_filters_by_min_updated(pool: PgPool) -> anyhow::Res
g2.hash = "y".into();
g2.insert(&pool).await?;

// min_updated_at = t2 => expect empty
// min_last_changed_at = t2 => expect empty
let s = Gateway::stream_by_addresses(&pool, vec![a1.clone(), a2.clone()], t2);
pin_mut!(s);
assert!(s.next().await.is_none());

// bump g1.updated_at to t2
// bump g1.last_changed_at to t2
let mut g1b = g1.clone();
g1b.updated_at = t2;
g1b.refreshed_at = t0;
g1b.hash = "h1".to_string();
g1b.refreshed_at = t2;
g1b.insert(&pool).await?;

// now we should see g1 only
Expand Down