
Implement time bucketing for fixed size query type #1626

Merged

divergentdave merged 2 commits into main on Aug 3, 2023

Conversation

divergentdave (Contributor)

This resolves #1574, adding a per-task configuration option that controls how fixed-size batches are constructed: reports are divided into time buckets, and each batch only contains reports from the same bucket. The aggregation job creator logic is refactored so it can work in a streaming manner, and unaggregated reports are fed through it, with a separate instance for each time bucket (if applicable). There is a database change adding a new column to outstanding_batches. Note that once a fixed-size batch has been finalized, the time bucketing setting is no longer relevant, and we already track client timestamp intervals on batches themselves. The change to the query type enum affects the tasks.query_type jsonb column in the database, the task definition YAML format, and the aggregator API, but in a backwards-compatible way.
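As a rough sketch of the schema change described above (the column name time_bucket_start is an assumption for illustration; the actual migration may differ):

-- Sketch only: record which time bucket an outstanding batch belongs to when
-- the task uses a batch time window; NULL when time bucketing is not in use.
ALTER TABLE outstanding_batches
    ADD COLUMN time_bucket_start TIMESTAMP;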

divergentdave requested a review from a team as a code owner on July 20, 2023 22:27
divergentdave force-pushed the david/fixed-size-time-bucketing branch from de66544 to f8f72a2 on July 20, 2023 23:45
branlwyd (Member) left a comment:

Nice work! One big comment, and a couple of nits.

@@ -547,6 +591,7 @@ impl<C: Clock + 'static> AggregationJobCreator<C> {
task: Arc<Task>,
vdaf: Arc<A>,
task_max_batch_size: u64,
task_max_batch_duration: Option<janus_messages::Duration>,
Member:

Can we call this something like task_batch_time_window_size? While this parameter does define the maximum duration of a batch, it also defines the alignment of a batch.
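As an illustration of the alignment point (not code from this PR), with a 3600-second window each report's bucket start is its client timestamp rounded down to a multiple of the window, so the parameter fixes both the span and the boundaries of a batch:

-- Illustrative only: compute the bucket start for each report, assuming a
-- 3600-second batch time window.
SELECT report_id,
       to_timestamp(floor(extract(epoch FROM client_timestamp) / 3600) * 3600)
           AS time_bucket_start
FROM client_reports
WHERE task_id = 0;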

@@ -109,6 +109,7 @@
# parameter must be provided.
query_type: !FixedSize
max_batch_size: 100
max_batch_duration: null
Member:

nit: could we allow this to default to None if unspecified?

divergentdave (Contributor Author):

Yes, the last case in the task::tests::query_type_serde test checks that omitting it is supported. I just added this to the sample file for the sake of illustration.

}));
// Find unaggregated client reports.
let unaggregated_report_ids = tx
.get_unaggregated_client_report_ids_for_task(task.id())
Member:

This will return some client reports, but the reports it returns are chosen independently of their client timestamps.

I think this would permit a failure mode where, even when there are more than enough client reports available to create aggregation jobs in the relevant batch interval(s), we just-so-happen to retrieve too few client reports per batch interval to create an aggregation job. (This might even be cascading: as we fail to create aggregation jobs, more reports will arrive living in more batch intervals, so we are even more likely to retrieve a set of client reports which don't permit creation of aggregation jobs.)

One non-solution would be to introduce an index on (task_id, client_timestamp) and use that to retrieve the oldest aggregatable reports. This doesn't necessarily fix the issue because we might end up in a situation where there are many old "straggler" reports which are returned each time, causing us to not see the newer reports.

One solution that might work would be to introduce the above index, then use that to retrieve the newest aggregatable reports. This will work better as long as incoming reports have roughly increasing client timestamps. But I suppose even here we might end up "stranding" some older reports behind a number of intervals without enough reports to create an aggregation job, which will then never be aggregated.

Maybe there's a better solution possible. WDYT?
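For concreteness, a sketch of the "retrieve the newest aggregatable reports" idea, using column names that appear in the query plans later in this thread (the actual Janus query differs):

-- Sketch only: pull the newest not-yet-aggregated reports for a task first,
-- relying on an index over (task_id, client_timestamp).
SELECT report_id, client_timestamp
FROM client_reports
WHERE task_id = 0
  AND aggregation_started = FALSE
ORDER BY client_timestamp DESC
LIMIT 5000;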

divergentdave (Contributor Author):

Yeah, fetching the newest unaggregated reports sounds promising. One other idea: we could solve this problem by aggregating straggler reports, turning our straggler report problem into a straggler batch problem (which we already have). We could adjust our aggregation job size heuristics, either based on report age, or adaptively based on the progress made by the aggregation job creator, and override the minimum batch size or minimum aggregation size. This would trade overhead on aggregating those stragglers for liveness and efficiency of the aggregation job creator.

On another note, time bucketing means we will have outstanding batches that never qualify for collection (until they expire and get ignored and cleaned up). The database query that resolves current_batch requests will have to filter through these (and potentially run aggregations) to find completed batches. We could similarly add an ORDER BY DESC here to check newer batches first. This should ensure the query finds completed batches quickly in the normal case, but it doesn't change the fact that we'd have to run aggregations over all the stragglers when we're caught up.
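A sketch of that ORDER BY DESC idea (table and column names are illustrative; in particular the time bucket column name is assumed):

-- Illustrative only: when resolving a current_batch request, examine newer
-- outstanding batches before older, likely-straggler time buckets.
SELECT batch_id
FROM outstanding_batches
WHERE task_id = 0
ORDER BY time_bucket_start DESC NULLS LAST;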

Member:

I think we can start with "aggregate newest unaggregated reports" and go from there.

A heuristic might work, but getting the details right would be complicated. (also, nit: we can't override the minimum batch size without protocol text support, since it is also checked by the helper. we can of course override the minimum aggregation job size.)

divergentdave (Contributor Author):

In that situation, we would only create outstanding batches and run the aggregation flow on them, but never run the collection flow (assuming we never get enough late reports), so it shouldn't be a problem at the protocol level.

divergentdave (Contributor Author):

I added the ORDER BY clause to the report query while rebasing; we already have an index covering those columns, client_reports_task_and_timestamp_index.

Comment on lines 45 to +55
FixedSize {
/// The maximum number of reports in a batch to allow it to be collected.
max_batch_size: u64,
/// If present, reports will be separated into different batches by timestamp, such that
/// the client timestamp interval duration will not exceed this value. The minimum and
/// maximum allowed report timestamps for each batch will be multiples of this value as
/// well. This must be a multiple of the task's time precision.
///
/// This is an implementation-specific configuration parameter, and not part of the query
/// type as defined in DAP.
batch_time_window_size: Option<Duration>,
Contributor:

Something to note is that taskprov has no way of encoding this.

struct {
    QueryType query_type;         /* I-D.draft-ietf-ppm-dap-04 */
    [...]
    select (QueryConfig.query_type) {
        case time_interval: Empty;
        case fixed_size:    uint32 max_batch_size;
    }
} QueryConfig;

so there is no space to specify batch_time_window_size. I think it's not a real problem given our use cases for taskprov; just making a note.

Member:

Yep, I think this is OK given how we want to deploy taskprov.

divergentdave force-pushed the david/fixed-size-time-bucketing branch from 3c01d7e to 94056ed on July 26, 2023 20:26
divergentdave force-pushed the david/fixed-size-time-bucketing branch from 94056ed to 84ed393 on July 31, 2023 15:27
branlwyd (Member) left a comment:

Looks good, one last small issue & this is good to go!

}));
// Find unaggregated client reports.
let unaggregated_report_ids = tx
.get_unaggregated_client_report_ids_for_task(task.id())
branlwyd (Member), Aug 1, 2023:

Postgres indexes are (somewhat?) unidirectional. We should consider adding an index in descending timestamp order, or perhaps converting the existing index to be in descending timestamp order (if this does not slow down other queries).

https://dba.stackexchange.com/questions/19797/postgres-primary-key-sorted-in-the-reverse-order-will-it-use-the-index-efficien

edit: ignore this comment, the docs say that Postgres indexes can be scanned backwards efficiently: https://www.postgresql.org/docs/14/indexes-ordering.html

divergentdave (Contributor Author):

Yes, here are a couple of plans for synthetic queries showing that the cost model treats forward and backward scans equivalently.

EXPLAIN ANALYZE SELECT client_reports.id FROM client_reports WHERE client_reports.task_id = 0 ORDER BY client_timestamp DESC LIMIT 5000;

                                                                               QUERY PLAN                                                                               
------------------------------------------------------------------------------------------------------------------------------------------------------------------------
 Limit  (cost=0.28..6.14 rows=1 width=16) (actual time=0.048..0.048 rows=0 loops=1)
   ->  Index Scan Backward using client_reports_task_and_timestamp_index on client_reports  (cost=0.28..6.14 rows=1 width=16) (actual time=0.046..0.046 rows=0 loops=1)
         Index Cond: (task_id = 0)
 Planning Time: 0.124 ms
 Execution Time: 0.077 ms
(5 rows)

EXPLAIN ANALYZE SELECT client_reports.id FROM client_reports WHERE client_reports.task_id = 0 ORDER BY client_timestamp ASC LIMIT 5000;

                                                                          QUERY PLAN                                                                           
---------------------------------------------------------------------------------------------------------------------------------------------------------------
 Limit  (cost=0.28..6.14 rows=1 width=16) (actual time=0.023..0.023 rows=0 loops=1)
   ->  Index Scan using client_reports_task_and_timestamp_index on client_reports  (cost=0.28..6.14 rows=1 width=16) (actual time=0.020..0.021 rows=0 loops=1)
         Index Cond: (task_id = 0)
 Planning Time: 0.206 ms
 Execution Time: 0.057 ms
(5 rows)

Looking at a plan for our actual query, though, Postgres instead uses a different index, because that index's partial-index condition satisfies part of the query (with good selectivity, I imagine), and it then sorts the result (before the limit node, ouch).

 EXPLAIN ANALYZE WITH unaggregated_reports AS (SELECT client_reports.id FROM client_reports JOIN tasks ON tasks.id = client_reports.task_id WHERE tasks.id = 0 AND client_reports.aggregation_started = FALSE AND client_reports.client_timestamp >= COALESCE('2023-08-02 23:32:00'::timestamp - tasks.report_expiry_age * '1 second'::interval, '-infinity'::timestamp) ORDER BY client_timestamp DESC FOR UPDATE OF client_reports SKIP LOCKED LIMIT 5000) UPDATE client_reports SET aggregation_started = true WHERE id IN (SELECT id FROM unaggregated_reports) RETURNING report_id, client_timestamp;

                                                                                                                           QUERY PLAN                                                                                                                            
-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
 Update on client_reports  (cost=5.72..13.76 rows=1 width=39) (actual time=0.021..0.022 rows=0 loops=1)
   CTE unaggregated_reports
     ->  Limit  (cost=5.40..5.42 rows=1 width=28) (actual time=0.018..0.019 rows=0 loops=1)
           ->  LockRows  (cost=5.40..5.42 rows=1 width=28) (actual time=0.018..0.018 rows=0 loops=1)
                 ->  Sort  (cost=5.40..5.41 rows=1 width=28) (actual time=0.017..0.018 rows=0 loops=1)
                       Sort Key: client_reports_1.client_timestamp DESC
                       Sort Method: quicksort  Memory: 25kB
                       ->  Nested Loop  (cost=0.12..5.39 rows=1 width=28) (actual time=0.011..0.011 rows=0 loops=1)
                             Join Filter: (client_reports_1.client_timestamp >= COALESCE(('2023-08-02 23:32:00'::timestamp without time zone - ((tasks.report_expiry_age)::double precision * '00:00:01'::interval)), '-infinity'::timestamp without time zone))
                             ->  Index Scan using client_reports_task_unaggregated on client_reports client_reports_1  (cost=0.12..4.31 rows=1 width=30) (actual time=0.011..0.011 rows=0 loops=1)
                                   Index Cond: (task_id = 0)
                                   Filter: (NOT aggregation_started)
                             ->  Seq Scan on tasks  (cost=0.00..1.06 rows=1 width=22) (never executed)
                                   Filter: (id = 0)
   ->  Nested Loop  (cost=0.30..8.34 rows=1 width=39) (actual time=0.020..0.021 rows=0 loops=1)
         ->  HashAggregate  (cost=0.02..0.03 rows=1 width=40) (actual time=0.020..0.020 rows=0 loops=1)
               Group Key: unaggregated_reports.id
               Batches: 1  Memory Usage: 24kB
               ->  CTE Scan on unaggregated_reports  (cost=0.00..0.02 rows=1 width=40) (actual time=0.019..0.019 rows=0 loops=1)
         ->  Index Scan using client_reports_pkey on client_reports  (cost=0.28..8.30 rows=1 width=14) (never executed)
               Index Cond: (id = unaggregated_reports.id)
 Planning Time: 0.175 ms
 Execution Time: 0.098 ms
(23 rows)

We should swap out this partial index for another one that also covers the timestamp column; then we should be good to go.
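A sketch of what the replacement index might look like; this definition is an assumption based on the index name appearing in the next plan, not necessarily the exact migration:

-- Assumed definition: a partial index over (task_id, client_timestamp),
-- restricted to reports that have not yet started aggregation.
CREATE INDEX client_reports_task_and_timestamp_unaggregated_index
    ON client_reports (task_id, client_timestamp)
    WHERE aggregation_started = FALSE;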

divergentdave (Contributor Author):

Here's the updated plan; the new index lets us get rid of the sort.

 EXPLAIN ANALYZE WITH unaggregated_reports AS (SELECT client_reports.id FROM client_reports JOIN tasks ON tasks.id = client_reports.task_id WHERE tasks.id = 1 AND client_reports.aggregation_started = FALSE AND client_reports.client_timestamp >= COALESCE('2023-08-02 23:32:00'::timestamp - tasks.report_expiry_age * '1 second'::interval, '-infinity'::timestamp) ORDER BY client_timestamp DESC FOR UPDATE OF client_reports SKIP LOCKED LIMIT 5000) UPDATE client_reports SET aggregation_started = true WHERE id IN (SELECT id FROM unaggregated_reports) RETURNING report_id, client_timestamp;

                                                                                                                        QUERY PLAN                                                                                                                         
-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
 Update on client_reports  (cost=7.35..15.38 rows=1 width=39) (actual time=0.033..0.035 rows=0 loops=1)
   CTE unaggregated_reports
     ->  Limit  (cost=0.12..7.05 rows=1 width=28) (actual time=0.027..0.028 rows=0 loops=1)
           ->  LockRows  (cost=0.12..7.05 rows=1 width=28) (actual time=0.026..0.027 rows=0 loops=1)
                 ->  Nested Loop  (cost=0.12..7.04 rows=1 width=28) (actual time=0.025..0.026 rows=0 loops=1)
                       Join Filter: (client_reports_1.client_timestamp >= COALESCE(('2023-08-02 23:32:00'::timestamp without time zone - ((tasks.report_expiry_age)::double precision * '00:00:01'::interval)), '-infinity'::timestamp without time zone))
                       ->  Index Scan Backward using client_reports_task_and_timestamp_unaggregated_index on client_reports client_reports_1  (cost=0.12..5.95 rows=1 width=30) (actual time=0.025..0.025 rows=0 loops=1)
                             Index Cond: (task_id = 1)
                             Filter: (NOT aggregation_started)
                       ->  Seq Scan on tasks  (cost=0.00..1.06 rows=1 width=22) (never executed)
                             Filter: (id = 1)
   ->  Nested Loop  (cost=0.30..8.34 rows=1 width=39) (actual time=0.032..0.032 rows=0 loops=1)
         ->  HashAggregate  (cost=0.02..0.03 rows=1 width=40) (actual time=0.031..0.031 rows=0 loops=1)
               Group Key: unaggregated_reports.id
               Batches: 1  Memory Usage: 24kB
               ->  CTE Scan on unaggregated_reports  (cost=0.00..0.02 rows=1 width=40) (actual time=0.029..0.029 rows=0 loops=1)
         ->  Index Scan using client_reports_pkey on client_reports  (cost=0.28..8.30 rows=1 width=14) (never executed)
               Index Cond: (id = unaggregated_reports.id)
 Planning Time: 0.453 ms
 Execution Time: 0.207 ms
(20 rows)

divergentdave merged commit 14b23ca into main on Aug 3, 2023
7 checks passed
divergentdave deleted the david/fixed-size-time-bucketing branch on August 3, 2023 16:28
Successfully merging this pull request may close these issues: Implement time-bucketing for fixed-size batches.