Out of band email #67
Conversation
src/queue.rs (outdated)

    [JobStatus::Pending.into(), OffsetDateTime::now_utc().into()],
),

backend @ DatabaseBackend::Sqlite => Statement::from_sql_and_values(
Testing strategy: as you noted in Slack, Postgres' locking behavior differs from SQLite's. Given how important locking is to a working queue implementation, we may want to switch to spinning up a Postgres container for relevant tests, as is done by Janus.
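If we do, a rough sketch with the testcontainers crate (the exact API varies by version, and the image defaults and port mapping here are assumptions, not Janus's actual test harness):

```rust
use testcontainers::{clients::Cli, images::postgres::Postgres};

#[test]
fn queue_locking_against_real_postgres() {
    // Spin up a throwaway Postgres container for the duration of the test.
    let docker = Cli::default();
    let container = docker.run(Postgres::default());
    // Credentials and database name depend on the image's defaults.
    let url = format!(
        "postgres://postgres@localhost:{}/postgres",
        container.get_host_port_ipv4(5432)
    );
    // ...connect to `url` and exercise the claim/locking queries under test...
    let _ = url;
}
```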
src/queue.rs (outdated)

backend @ DatabaseBackend::Postgres => Statement::from_sql_and_values(
    backend,
    r#"SELECT * FROM queue
       WHERE status = $1 AND (scheduled_at IS NULL OR scheduled_at < $2)
This queue strategy works by holding a DB lock on the relevant row for the length of processing of the queue item, so e.g. for sending email it would hold a DB lock across the API call to Postmark.

This probably works for the scale we envision. But we might want to set an appropriately-long scheduled_at value, process the record outside of a transaction (with a timeout based on scheduled_at), then update status in a second transaction. (This is effectively a "lease" over the queue item.)
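To make the idea concrete, here is a sketch of that lease flow in SQL. The table and column names come from the query above; the lease length, the SKIP LOCKED claim shape, and the placeholder numbering are assumptions, not this PR's code:

```sql
-- Transaction 1: claim a job by pushing scheduled_at into the future.
-- If the worker crashes mid-job, the lease simply expires and another
-- worker picks the row up later.
BEGIN;
UPDATE queue
   SET scheduled_at = now() + interval '5 minutes'  -- assumed lease length
 WHERE id = (
         SELECT id FROM queue
          WHERE status = $1 AND (scheduled_at IS NULL OR scheduled_at < now())
          LIMIT 1
          FOR UPDATE SKIP LOCKED
       )
RETURNING *;
COMMIT;

-- Process the job with no transaction open, under a timeout shorter
-- than the lease.

-- Transaction 2: record the outcome.
UPDATE queue SET status = $2 WHERE id = $3;
```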
Is the concern here that we will exhaust the connection pool?
Indeed, or that too-long transactions may lead to locking contention. (I think the touches to the queue table itself are unlikely to contend, as not contending is the entire purpose of the way the queries are written; but queries made while running the job itself might.)

Actually, now that I think about it, what isolation level are we running under for Postgres? I would hope for at least Repeatable Read; but if we're running at that level, a serialization failure at the very end of the transaction might lead to retrying processing of the job, which in practice would send a duplicate email. I think in that case, we need to find some way to avoid transaction retries necessitating retries of job-processing logic; one way to do so would be the suggested "lease" system, though there are other solutions too (we had previously discussed the queue states as being START, IN_PROCESS, and DONE, with the IN_PROCESS state meaning "currently processing, don't pick me up; if I crash, we get stuck here").
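For comparison, a minimal sketch of that state machine, using the state names from the discussion (the claim query shape is again an assumption):

```sql
-- Claim in one short transaction; a crash after this commit parks the
-- row in IN_PROCESS rather than causing a duplicate send.
UPDATE queue
   SET status = 'IN_PROCESS'
 WHERE id = (
         SELECT id FROM queue
          WHERE status = 'START'
          LIMIT 1
          FOR UPDATE SKIP LOCKED
       )
RETURNING id;

-- ...process the job, then mark it finished in a second transaction...
UPDATE queue SET status = 'DONE' WHERE id = $1;
```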
fn generate_password() -> String {
    Alphanumeric.sample_string(&mut rand::thread_rng(), 60)
}
Our stylistic convention in Janus would be to do this as an impl Distribution<Password> for Standard, as in AuthenticationToken. You'd also then need a type like struct Password(String) so you'd have something to hang the implementation on, which might not be a bad idea anyway.
What's the advantage of this over the existing Distribution of Alphanumeric? Why reimplement code that exists in rand?
It's just a stylistic convention we landed on. TBH I imagine there are several places where this project's style isn't in line with janus policies, so I think we could move forward with this as it is, and consider aligning style with other Divvi Up projects in the future.
The advantage of impl Distribution<Password> for Standard, if Password is an existing type, is that this is how the rand crate "expects" to be used & it allows this type to fit in better with other uses of the API -- various other places in rand have bounds on Distribution<T>.

For example, one could write let password: Password = random() (or even let password = random() if the context is clear enough for rustc & human readers to infer the type). Or one could generate a vector of passwords via sample_iter, etc.

The implementation would be almost exactly like the body of generate_password currently, except that the Distribution<Password> version gives you an Rng to use instead of using thread_rng (in practice, I believe this RNG will be a thread_rng in most cases).

(I'm not sure a Password type is justified, but I wouldn't object to it, and if it's there we should certainly impl Distribution<Password> rather than rolling our own convention.)
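Concretely, the suggested shape would look something like this (the Password newtype is hypothetical; the Alphanumeric sampling and 60-character length mirror the existing generate_password):

```rust
use rand::{
    distributions::{Alphanumeric, DistString, Distribution, Standard},
    Rng,
};

/// Hypothetical newtype wrapping a generated password.
pub struct Password(String);

impl Distribution<Password> for Standard {
    fn sample<R: Rng + ?Sized>(&self, rng: &mut R) -> Password {
        // Same body as generate_password, but sampling from the
        // caller-provided RNG instead of reaching for thread_rng.
        Password(Alphanumeric.sample_string(rng, 60))
    }
}
```

With that in place, let password: Password = rand::random() works, as does generating many passwords via rng.sample_iter(Standard).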
pub buckets: Option<Vec<i32>>,
}

fn sorted(buckets: &Vec<i32>) -> Result<(), ValidationError> {
nit: this method makes a clone (allocating space) and sorts (taking O(n lg n) time).

what we're trying to do is determine whether the list is already sorted; instead, consider just iterating through the elements and checking that each consecutive pair is ordered, which doesn't require an allocation and runs in linear time.
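Something like the following, as a sketch (the error code passed to ValidationError::new is illustrative, assuming the validator crate):

```rust
use validator::ValidationError;

fn sorted(buckets: &Vec<i32>) -> Result<(), ValidationError> {
    // Pairwise comparison over adjacent elements: no clone, no sort, O(n).
    if buckets.windows(2).all(|pair| pair[0] <= pair[1]) {
        Ok(())
    } else {
        Err(ValidationError::new("sorted"))
    }
}
```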
Are there going to be enough buckets that we care about the computation in comparison to the network and DB I/O?
We expect to have at least one use case with large histograms having tens of thousands of buckets. Longer-term, it would be good to just store the number of buckets, since that's all the aggregator cares about, but that'll probably take some changes across the VDAF spec and libprio.
Opened #102, as this is not really related to this PR aside from having moved these lines between files.
}
}

fn unique(buckets: &Vec<i32>) -> Result<(), ValidationError> {
nit: if you take the above suggestion, we could also fuse these two validators into something like sorted_and_unique: for each consecutive pair of elements a, b, validate that a < b. in addition to the benefits to sorted above, this would also drop the HashSet allocation from unique.
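A sketch of the fused validator (again assuming the validator crate; the function name follows the suggestion above):

```rust
use validator::ValidationError;

fn sorted_and_unique(buckets: &Vec<i32>) -> Result<(), ValidationError> {
    // Strict inequality over each adjacent pair implies both sorted and
    // duplicate-free, with no clone, no sort, and no HashSet.
    if buckets.windows(2).all(|pair| pair[0] < pair[1]) {
        Ok(())
    } else {
        Err(ValidationError::new("sorted_and_unique"))
    }
}
```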
I think we wouldn't get distinct validation messages for the two failure modes then.
Regardless, this code isn't new in this PR; it was just moved into its own vdaf.rs because task.rs was getting too big.
Trillium needs a change in order to expose CloneCounter for even more graceful shutdown
Co-authored-by: David Cook <divergentdave@gmail.com>