@@ -77,34 +77,9 @@ def queue(messageable: nil, messageable_type: nil, messageable_id: nil,
7777 updated_at : current_utc_time
7878 )
7979
80- begin
81- ActiveRecord ::Base . transaction ( requires_new : true ) do
82- Models ::Thread . create! (
83- hostname : hostname ,
84- process_id : process_id ,
85- thread_id : thread_id ,
86- queued_count : 0 ,
87- queued_count_last_updated_at : current_utc_time ,
88- publishing_count : 0 ,
89- publishing_count_last_updated_at : current_utc_time ,
90- published_count : 0 ,
91- published_count_last_updated_at : current_utc_time ,
92- failed_count : 0 ,
93- failed_count_last_updated_at : current_utc_time ,
94- created_at : current_utc_time ,
95- updated_at : current_utc_time )
96- end
97- rescue ActiveRecord ::RecordNotUnique
98- # no op
99- end
100-
101- thread = Models ::Thread . lock . find_by! (
102- hostname : hostname , process_id : process_id , thread_id : thread_id )
103-
104- thread . update! (
105- queued_count : thread . queued_count + 1 ,
106- queued_count_last_updated_at : current_utc_time ,
107- updated_at : current_utc_time )
80+ Models ::Thread . update_message_counts_by! (
81+ hostname : hostname , process_id : process_id , thread_id : thread_id ,
82+ queued_count : 1 , current_utc_time : current_utc_time )
10883
10984 { id : message . id , lock_version : message . lock_version }
11085 end
@@ -262,15 +237,9 @@ def publishing(hostname: Socket.gethostname,
262237 Status ::PUBLISHING , current_utc_time , current_utc_time
263238 ] )
264239
265- Models ::Thread
266- . where ( hostname : hostname , process_id : process_id , thread_id : thread_id )
267- . update_all ( [
268- "queued_count = queued_count - 1, " \
269- "publishing_count = publishing_count + 1, " \
270- "publishing_count_last_updated_at = ?, " \
271- "updated_at = ?" ,
272- current_utc_time , current_utc_time
273- ] )
240+ Models ::Thread . update_message_counts_by! (
241+ hostname : hostname , process_id : process_id , thread_id : thread_id ,
242+ queued_count : -1 , publishing_count : 1 , current_utc_time : current_utc_time )
274243
275244 {
276245 id : message_row [ 0 ] ,
@@ -320,16 +289,9 @@ def published(id:, lock_version:,
320289 Models ::Exception . where ( message_id : id ) . delete_all
321290 Models ::Message . where ( id : id ) . delete_all
322291
323- Models ::Thread
324- . where ( hostname : hostname , process_id : process_id , thread_id : thread_id )
325- . update_all ( [
326- "publishing_count = publishing_count - 1, " \
327- "published_count = published_count + 1, " \
328- "published_count_last_updated_at = ?, " \
329- "updated_at = ?" ,
330- current_utc_time ,
331- current_utc_time
332- ] )
292+ Models ::Thread . update_message_counts_by! (
293+ hostname : hostname , process_id : process_id , thread_id : thread_id ,
294+ publishing_count : -1 , published_count : 1 , current_utc_time : current_utc_time )
333295
334296 { id : id }
335297 end
@@ -380,16 +342,9 @@ def publishing_failed(id:, lock_version:, error: nil,
380342 end
381343 end
382344
383- Models ::Thread
384- . where ( hostname : hostname , process_id : process_id , thread_id : thread_id )
385- . update_all ( [
386- "publishing_count = publishing_count - 1, " \
387- "failed_count = failed_count + 1, " \
388- "failed_count_last_updated_at = ?, " \
389- "updated_at = ?" ,
390- current_utc_time ,
391- current_utc_time
392- ] )
345+ Models ::Thread . update_message_counts_by! (
346+ hostname : hostname , process_id : process_id , thread_id : thread_id ,
347+ publishing_count : -1 , failed_count : 1 , current_utc_time : current_utc_time )
393348
394349 {
395350 id : message . id ,
@@ -505,43 +460,18 @@ def delete(id:, lock_version:,
505460 message . exceptions . delete_all
506461 message . delete
507462
508- thread = Models ::Thread . lock . find_by (
509- hostname : hostname , process_id : process_id , thread_id : thread_id )
510-
511- if thread
512- thread . update! (
513- queued_count :
514- thread . queued_count - ( message . status == Status ::QUEUED ? 1 : 0 ) ,
515- publishing_count :
516- thread . publishing_count - ( message . status == Status ::PUBLISHING ? 1 : 0 ) ,
517- published_count :
518- thread . published_count - ( message . status == Status ::PUBLISHED ? 1 : 0 ) ,
519- failed_count :
520- thread . failed_count - ( message . status == Status ::FAILED ? 1 : 0 ) ,
521- updated_at : current_utc_time )
522- else
523- Models ::Thread . create! (
524- hostname : hostname ,
525- process_id : process_id ,
526- thread_id : thread_id ,
527- queued_count : 0 ,
528- publishing_count : 0 ,
529- published_count : 0 ,
530- failed_count : 0 ,
531- created_at : current_utc_time ,
532- updated_at : current_utc_time )
533- end
463+ Models ::Thread . update_message_counts_by! (
464+ hostname : hostname ,
465+ process_id : process_id ,
466+ thread_id : thread_id ,
467+ queued_count : ( message . status == Status ::QUEUED ? -1 : 0 ) ,
468+ publishing_count : ( message . status == Status ::PUBLISHING ? -1 : 0 ) ,
469+ published_count : ( message . status == Status ::PUBLISHED ? -1 : 0 ) ,
470+ failed_count : ( message . status == Status ::FAILED ? -1 : 0 ) ,
471+ current_utc_time : current_utc_time )
534472
535473 { id : id }
536474 end
537- rescue ActiveRecord ::RecordNotUnique
538- delete (
539- id : id ,
540- lock_version : lock_version ,
541- hostname : hostname ,
542- process_id : process_id ,
543- thread_id : thread_id ,
544- time : time )
545475 end
546476 end
547477
@@ -584,52 +514,21 @@ def requeue(id:, lock_version:,
584514 publisher_id : publisher_id ,
585515 publisher_name : publisher_name )
586516
587- thread = Models ::Thread . lock . find_by (
588- hostname : hostname , process_id : process_id , thread_id : thread_id )
589-
590- if thread
591- thread . update! (
592- queued_count :
593- thread . queued_count + 1 ,
594- publishing_count :
595- thread . publishing_count - ( original_status == Status ::PUBLISHING ? 1 : 0 ) ,
596- published_count :
597- thread . published_count - ( original_status == Status ::PUBLISHED ? 1 : 0 ) ,
598- failed_count :
599- thread . failed_count - ( original_status == Status ::FAILED ? 1 : 0 ) ,
600- queued_count_last_updated_at :
601- current_utc_time ,
602- updated_at :
603- current_utc_time )
604- else
605- Models ::Thread . create! (
606- hostname : hostname ,
607- process_id : process_id ,
608- thread_id : thread_id ,
609- queued_count : 1 ,
610- publishing_count : 0 ,
611- published_count : 0 ,
612- failed_count : 0 ,
613- queued_count_last_updated_at : current_utc_time ,
614- created_at : current_utc_time ,
615- updated_at : current_utc_time )
616- end
517+ Models ::Thread . update_message_counts_by! (
518+ hostname : hostname ,
519+ process_id : process_id ,
520+ thread_id : thread_id ,
521+ queued_count : 1 ,
522+ publishing_count : ( original_status == Status ::PUBLISHING ? -1 : 0 ) ,
523+ published_count : ( original_status == Status ::PUBLISHED ? -1 : 0 ) ,
524+ failed_count : ( original_status == Status ::FAILED ? -1 : 0 ) ,
525+ current_utc_time : current_utc_time )
617526
618527 {
619528 id : id ,
620529 lock_version : lock_version
621530 }
622531 end
623- rescue ActiveRecord ::RecordNotUnique
624- requeue (
625- id : id ,
626- lock_version : lock_version ,
627- publisher_id : publisher_id ,
628- publisher_name : publisher_name ,
629- hostname : hostname ,
630- process_id : process_id ,
631- thread_id : thread_id ,
632- time : time )
633532 end
634533 end
635534
0 commit comments