@@ -264,17 +264,14 @@ async fn store_heartbeats_in_db_internal(
264264 let pool = pool. clone ( ) ;
265265
266266 tokio:: task:: spawn_blocking ( move || {
267- let mut connection = pool. get ( ) . expect ( "Failed to get DB connection from pool" ) ;
267+ let mut conn = pool. get ( ) . expect ( "Failed to get DB connection from pool" ) ;
268268
269- db_transaction_result ! ( connection, |conn| {
270- let mut heartbeat_keys = if include_responses {
271- Some ( Vec :: with_capacity( new_heartbeats. len( ) ) )
272- } else {
273- None
274- } ;
269+ db_transaction_result ! ( conn, |conn| {
275270 let mut seen: HashMap <( i32 , chrono:: DateTime <Utc >) , ( ) > =
276271 HashMap :: with_capacity( new_heartbeats. len( ) ) ;
277- let mut deduplicated = Vec :: with_capacity( new_heartbeats. len( ) ) ;
272+ let mut ordered_keys: Vec <( i32 , chrono:: DateTime <Utc >) > =
273+ Vec :: with_capacity( new_heartbeats. len( ) ) ;
274+ let mut deduplicated: Vec <NewHeartbeat > = Vec :: with_capacity( new_heartbeats. len( ) ) ;
278275
279276 for mut heartbeat in new_heartbeats {
280277 if heartbeat. project_id. is_none( )
@@ -286,47 +283,63 @@ async fn store_heartbeats_in_db_internal(
286283 }
287284
288285 let key = ( heartbeat. user_id, heartbeat. time) ;
289- if let Some ( keys) = heartbeat_keys. as_mut( ) {
290- keys. push( key) ;
291- }
286+ ordered_keys. push( key) ;
292287
293288 if let hash_map:: Entry :: Vacant ( e) = seen. entry( key) {
294289 e. insert( ( ) ) ;
295290 deduplicated. push( heartbeat) ;
296291 }
297292 }
298293
294+ let mut inserted_map: HashMap <( i32 , chrono:: DateTime <Utc >) , Heartbeat > = HashMap :: new( ) ;
299295 let mut inserted_total = 0usize ;
296+
300297 for chunk in deduplicated. chunks( HEARTBEAT_INSERT_BATCH_SIZE ) {
301- inserted_total += instrumented:: execute( "Heartbeat::batch_insert" , || {
302- diesel:: insert_into( heartbeats:: table)
303- . values( chunk)
304- . on_conflict( ( heartbeats:: user_id, heartbeats:: time) )
305- . do_nothing( )
306- . execute( conn)
307- } ) ?;
298+ let returned: Vec <Heartbeat > =
299+ instrumented:: load( "Heartbeat::batch_insert" , || {
300+ diesel:: insert_into( heartbeats:: table)
301+ . values( chunk)
302+ . on_conflict( ( heartbeats:: user_id, heartbeats:: time) )
303+ . do_nothing( )
304+ . returning( heartbeats:: all_columns)
305+ . load:: <Heartbeat >( conn)
306+ } ) ?;
307+
308+ inserted_total += returned. len( ) ;
309+ for hb in returned {
310+ inserted_map. insert( ( hb. user_id, hb. time) , hb) ;
311+ }
308312 }
309313
310314 let responses = if include_responses {
311- let unique_keys: Vec <_> = seen. keys( ) . copied( ) . collect( ) ;
312-
313- let mut heartbeat_cache: HashMap <( i32 , chrono:: DateTime <Utc >) , Heartbeat > =
314- HashMap :: new( ) ;
315- for ( uid, t) in unique_keys {
316- let hb = instrumented:: first( "Heartbeat::fetch_after_insert" , || {
317- heartbeats:: table
318- . filter( heartbeats:: user_id. eq( uid) )
319- . filter( heartbeats:: time. eq( t) )
320- . first:: <Heartbeat >( conn)
321- } ) ?;
322- heartbeat_cache. insert( ( uid, t) , hb) ;
315+ let missing_keys: Vec <( i32 , chrono:: DateTime <Utc >) > = seen
316+ . keys( )
317+ . filter( |k| !inserted_map. contains_key( * k) )
318+ . copied( )
319+ . collect( ) ;
320+
321+ if !missing_keys. is_empty( ) {
322+ let user_ids: Vec <i32 > = missing_keys. iter( ) . map( |( uid, _) | * uid) . collect( ) ;
323+ let times: Vec <chrono:: DateTime <Utc >> =
324+ missing_keys. iter( ) . map( |( _, t) | * t) . collect( ) ;
325+
326+ let fetched: Vec <Heartbeat > =
327+ instrumented:: load( "Heartbeat::fetch_existing" , || {
328+ heartbeats:: table
329+ . filter( heartbeats:: user_id. eq_any( & user_ids) )
330+ . filter( heartbeats:: time. eq_any( & times) )
331+ . load:: <Heartbeat >( conn)
332+ } ) ?;
333+
334+ for hb in fetched {
335+ inserted_map. insert( ( hb. user_id, hb. time) , hb) ;
336+ }
323337 }
324338
325- heartbeat_keys
326- . unwrap( )
339+ ordered_keys
327340 . into_iter( )
328341 . map( |key| {
329- let hb = heartbeat_cache . get( & key) . unwrap( ) . clone( ) ;
342+ let hb = inserted_map . get( & key) . unwrap( ) . clone( ) ;
330343 HeartbeatResponse :: from( hb)
331344 } )
332345 . collect( )
0 commit comments