-
Notifications
You must be signed in to change notification settings - Fork 35
[Mobile config] Add v2 endpoints to GatewayInfoV2. Change min_refreshed_at to min_updated_at
#910
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
2898d4b to
7393d81
Compare
7393d81 to
4f4cc9b
Compare
mobile_config/src/gateway_info.rs
Outdated
| let rows: Vec<Vec<u8>> = sqlx::query_scalar(GET_UPDATED_RADIOS) | ||
| .bind(min_updated_at) | ||
| .fetch_all(db) | ||
| .await?; | ||
| let mut radios = HashSet::new(); | ||
|
|
||
| for row in rows { | ||
| let entity_key_b: &[u8] = &row; | ||
| let entity_key = bs58::encode(entity_key_b).into_string(); | ||
| let pk = PublicKeyBinary::from_str(&entity_key)?; | ||
| radios.insert(pk); | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
| let rows: Vec<Vec<u8>> = sqlx::query_scalar(GET_UPDATED_RADIOS) | |
| .bind(min_updated_at) | |
| .fetch_all(db) | |
| .await?; | |
| let mut radios = HashSet::new(); | |
| for row in rows { | |
| let entity_key_b: &[u8] = &row; | |
| let entity_key = bs58::encode(entity_key_b).into_string(); | |
| let pk = PublicKeyBinary::from_str(&entity_key)?; | |
| radios.insert(pk); | |
| } | |
| let radios: HashSet<PublicKeyBinary> = sqlx::query_scalar(GET_UPDATED_RADIOS) | |
| .bind(min_updated_at) | |
| .fetch(db) | |
| .await | |
| .map(|row| { | |
| let entity_key_b: &[u8] = &row; | |
| let entity_key = bs58::encode(entity_key_b).into_string(); | |
| PublicKeyBinary::from_str(&entity_key) | |
| }) | |
| .collect::<Result<HashSet<PublicKeyBinary>, helium_crypto::Error>>()?; |
this probably necessitates importing StreamExt or similar but avoids the extra allocations
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
helium-crypto-rs provides a sqlx mapping for PublicKeyBinary https://github.com/helium/helium-crypto-rs/blob/main/src/public_key_binary.rs#L115
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hi @michaeldjeffrey thanks for information. The helium-crypto-rs provides a sqlx mapping for PublicKeyBinary which stored in database as string. In our case pubkey is stored as a bytea type. So when I try to use helium-crypto-rs
mapping like below
Ok(sqlx::query_as::<_, (PublicKeyBinary,)>(GET_UPDATED_RADIOS)
.bind(min_updated_at)
.fetch(db)
.map_ok(|(entity_key,)| entity_key)
.try_collect::<HashSet<_>>()
.await?)I've got an error: mismatched types; Rust type `helium_crypto::public_key_binary::PublicKeyBinary` (as SQL type `text`) is not compatible with SQL type BYTEA
To make it work we need to extend decode function in helium-crypto-rs https://github.com/helium/helium-crypto-rs/blob/main/src/public_key_binary.rs#L150 to make it support bytea. I think this is not in the scope of this PR and I'm not sure that it is worth since I have no information how often we store PublicKeyBinary in bytea type.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No doubt. That's a good @madninja question. Anyways, carry on 👯
mobile_config/src/gateway_service.rs
Outdated
| self.verify_request_signature(&signer, &request)?; | ||
|
|
||
| let pool = self.metadata_pool.clone(); | ||
| let mc_pool = self.mobile_config_db_pool.clone(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
the original pool variable name made sense here as-is when it was the only database connection this function needed but in general i'd lean more towards the app's "owned" database connection more generically and the one "owned" by an outside service (the metadata db in this case) with more specificity the way we do in the config service's settings or else just give them both more specific names to avoid any confusion
mobile_config/src/gateway_service.rs
Outdated
| let stream = gateway_info::db::all_info_stream(&pool, &device_types); | ||
| if request.min_updated_at > 0 { | ||
| let min_updated_at = Utc | ||
| .timestamp_opt(request.min_updated_at as i64, 0) | ||
| .single() | ||
| .ok_or(Status::invalid_argument( | ||
| "Invalid min_refreshed_at argument", | ||
| ))?; | ||
|
|
||
| let updated_redios = get_updated_radios(&mc_pool, min_updated_at).await?; | ||
| let stream = stream | ||
| .filter(|v| future::ready(updated_redios.contains(&v.address))) | ||
| .boxed(); | ||
| stream_multi_gateways_info(stream, tx.clone(), signing_key.clone(), batch_size) | ||
| .await | ||
| } else { | ||
| stream_multi_gateways_info(stream, tx.clone(), signing_key.clone(), batch_size) | ||
| .await | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
| let stream = gateway_info::db::all_info_stream(&pool, &device_types); | |
| if request.min_updated_at > 0 { | |
| let min_updated_at = Utc | |
| .timestamp_opt(request.min_updated_at as i64, 0) | |
| .single() | |
| .ok_or(Status::invalid_argument( | |
| "Invalid min_refreshed_at argument", | |
| ))?; | |
| let updated_redios = get_updated_radios(&mc_pool, min_updated_at).await?; | |
| let stream = stream | |
| .filter(|v| future::ready(updated_redios.contains(&v.address))) | |
| .boxed(); | |
| stream_multi_gateways_info(stream, tx.clone(), signing_key.clone(), batch_size) | |
| .await | |
| } else { | |
| stream_multi_gateways_info(stream, tx.clone(), signing_key.clone(), batch_size) | |
| .await | |
| } | |
| let stream = | |
| if request.min_updated_at > 0 { | |
| let min_updated_at = Utc | |
| .timestamp_opt(request.min_updated_at as i64, 0) | |
| .single() | |
| .ok_or(Status::invalid_argument( | |
| "Invalid min_refreshed_at argument", | |
| ))?; | |
| let updated_redios = get_updated_radios(&mc_pool, min_updated_at).await?; | |
| gateway_info::db::all_info_stream(&pool, &device_types) | |
| .filter(|v| future::ready(updated_redios.contains(&v.address))) | |
| .boxed(); | |
| } else { | |
| gateway_info::db::all_info_stream(&pool, &device_types) | |
| }; | |
| stream_multi_gateways_info(stream, tx.clone(), signing_key.clone(), batch_size).await |
i don't know that this really makes a difference; i wanted to get it down to a single invocation of the stream but i don't think we can fully clean that up unless we added a no-op filter on the branch that doesn't check updated at so feel free to ignore here
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I've tried to do it in a similar way, but I'm facing incompatible types or borrow checker errors.
min_refreshed_at to min_updated_at in gateway_info_v2
4d30558 to
c2343a8
Compare
|
PR is converted back to draft |
4f187ce to
119ad63
Compare
0144fd1 to
df16f90
Compare
min_refreshed_at to min_updated_at in gateway_info_v2min_refreshed_at to min_updated_at
After #902 was implemented, there is now a
last_changed_atfield, which better fits the purpose compared torefreshed_at, aslast_changed_atsignals radio changes.This breaks
info_stream_v2backward compatibility, but it is expected that no clients are using v2 yet.Related PR: helium/proto#434
UPD: Added appropriate v2 endpoints that returns GatewayInfoV2 which uses v1 request version since nothing (from request side) has changed