Skip to content

Commit 5ebf675

Browse files
committed
fix: heartbeat ingestion responses
(i think) (hopefully)
1 parent 6856a89 commit 5ebf675

1 file changed

Lines changed: 13 additions & 45 deletions

File tree

rustytime/src/handlers/api/user.rs

Lines changed: 13 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ use axum::http::{HeaderMap, StatusCode, Uri};
66
use axum::response::{IntoResponse, Response};
77
use chrono::Utc;
88
use diesel::prelude::*;
9+
use diesel::upsert::excluded;
910
use ipnetwork::IpNetwork;
1011
use schemars::JsonSchema;
1112
use serde::Serialize;
@@ -27,7 +28,7 @@ use crate::utils::time::{
2728
TimeFormat, get_day_end_utc, get_day_start_utc, get_today_in_timezone, human_readable_duration,
2829
parse_timezone,
2930
};
30-
use std::collections::{HashMap, hash_map};
31+
use std::collections::HashMap;
3132

3233
const MAX_HEARTBEATS_PER_REQUEST: usize = 100;
3334
const HEARTBEAT_INSERT_BATCH_SIZE: usize = 1_000; // avoids hitting Postgres' 65k parameter limit
@@ -260,7 +261,7 @@ pub async fn store_heartbeats_in_db_count_only(
260261

261262
async fn store_heartbeats_in_db_internal(
262263
pool: &DbPool,
263-
new_heartbeats: Vec<NewHeartbeat>,
264+
mut new_heartbeats: Vec<NewHeartbeat>,
264265
include_responses: bool,
265266
) -> Result<(Vec<HeartbeatResponse>, usize), diesel::result::Error> {
266267
let pool = pool.clone();
@@ -269,13 +270,10 @@ async fn store_heartbeats_in_db_internal(
269270
let mut conn = pool.get().expect("Failed to get DB connection from pool");
270271

271272
db_transaction_result!(conn, |conn| {
272-
let mut seen: HashMap<(i32, chrono::DateTime<Utc>), ()> =
273-
HashMap::with_capacity(new_heartbeats.len());
274-
let mut ordered_keys: Vec<(i32, chrono::DateTime<Utc>)> =
273+
let mut keys: Vec<(i32, chrono::DateTime<Utc>)> =
275274
Vec::with_capacity(new_heartbeats.len());
276-
let mut deduplicated: Vec<NewHeartbeat> = Vec::with_capacity(new_heartbeats.len());
277275

278-
for mut heartbeat in new_heartbeats {
276+
for heartbeat in &mut new_heartbeats {
279277
if heartbeat.project_id.is_none()
280278
&& let Some(project_name) = heartbeat.project.as_ref()
281279
{
@@ -284,65 +282,35 @@ async fn store_heartbeats_in_db_internal(
284282
heartbeat.project_id = Some(project_id);
285283
}
286284

287-
let key = (heartbeat.user_id, heartbeat.time);
288-
ordered_keys.push(key);
289-
290-
if let hash_map::Entry::Vacant(e) = seen.entry(key) {
291-
e.insert(());
292-
deduplicated.push(heartbeat);
293-
}
285+
keys.push((heartbeat.user_id, heartbeat.time));
294286
}
295287

296288
let mut inserted_map: HashMap<(i32, chrono::DateTime<Utc>), Heartbeat> = HashMap::new();
297289
let mut inserted_total = 0usize;
298290

299-
for chunk in deduplicated.chunks(HEARTBEAT_INSERT_BATCH_SIZE) {
291+
for chunk in new_heartbeats.chunks(HEARTBEAT_INSERT_BATCH_SIZE) {
292+
let chunk_len = chunk.len();
300293
let returned: Vec<Heartbeat> =
301294
instrumented::load("Heartbeat::batch_insert", || {
302295
diesel::insert_into(heartbeats::table)
303296
.values(chunk)
304297
.on_conflict((heartbeats::user_id, heartbeats::time))
305-
.do_nothing()
298+
.do_update()
299+
.set(heartbeats::time.eq(excluded(heartbeats::time)))
306300
.returning(heartbeats::all_columns)
307301
.load::<Heartbeat>(conn)
308302
})?;
309303

310-
inserted_total += returned.len();
304+
inserted_total += returned.len().min(chunk_len);
311305
for hb in returned {
312306
inserted_map.insert((hb.user_id, hb.time), hb);
313307
}
314308
}
315309

316310
let responses = if include_responses {
317-
let missing_keys: Vec<(i32, chrono::DateTime<Utc>)> = seen
318-
.keys()
319-
.filter(|k| !inserted_map.contains_key(*k))
320-
.copied()
321-
.collect();
322-
323-
if !missing_keys.is_empty() {
324-
let user_ids: Vec<i32> = missing_keys.iter().map(|(uid, _)| *uid).collect();
325-
let times: Vec<chrono::DateTime<Utc>> =
326-
missing_keys.iter().map(|(_, t)| *t).collect();
327-
328-
let fetched: Vec<Heartbeat> =
329-
instrumented::load("Heartbeat::fetch_existing", || {
330-
heartbeats::table
331-
.filter(heartbeats::user_id.eq_any(&user_ids))
332-
.filter(heartbeats::time.eq_any(&times))
333-
.load::<Heartbeat>(conn)
334-
})?;
335-
336-
for hb in fetched {
337-
inserted_map.insert((hb.user_id, hb.time), hb);
338-
}
339-
}
340-
341-
ordered_keys
342-
.into_iter()
311+
keys.iter()
343312
.map(|key| {
344-
let hb = inserted_map.get(&key).unwrap().clone();
345-
HeartbeatResponse::from(hb)
313+
HeartbeatResponse::from(inserted_map[key].clone())
346314
})
347315
.collect()
348316
} else {

0 commit comments

Comments
 (0)