diff --git a/app/grpc/flow_handler.rb b/app/grpc/flow_handler.rb index 537d2711..1ab0deb7 100644 --- a/app/grpc/flow_handler.rb +++ b/app/grpc/flow_handler.rb @@ -7,13 +7,7 @@ class FlowHandler < Tucana::Sagittarius::FlowService::Service grpc_stream :update - def self.update_started(runtime_id) - runtime = Runtime.find(runtime_id) - runtime.connected! - runtime.save - - logger.info(message: 'Runtime connected', runtime_id: runtime.id) - + def self.update_runtime(runtime) flows = [] runtime.projects.each do |project| project.flows.each do |flow| @@ -33,6 +27,16 @@ def self.update_started(runtime_id) ) end + def self.update_started(runtime_id) + runtime = Runtime.find(runtime_id) + runtime.connected! + runtime.save + + logger.info(message: 'Runtime connected', runtime_id: runtime.id) + + update_runtime(runtime) + end + def self.update_died(runtime_id) runtime = Runtime.find(runtime_id) runtime.disconnected! diff --git a/app/jobs/application_job.rb b/app/jobs/application_job.rb index ca4e11d3..7535bb46 100644 --- a/app/jobs/application_job.rb +++ b/app/jobs/application_job.rb @@ -8,26 +8,29 @@ class ApplicationJob < ActiveJob::Base # Most jobs are safe to ignore if the underlying records are no longer available # discard_on ActiveJob::DeserializationError - retry_on StandardError, wait: :polynomially_longer, attempts: 10 + ActiveJob::Base.instance_eval do + retry_on StandardError, wait: :polynomially_longer, attempts: 10 - before_enqueue do |job| - next if job.arguments.first&.key?(:sagittarius_context) + before_enqueue do |job| + possible_context = job.arguments.last + next if possible_context.is_a?(Hash) && possible_context&.try(:key?, :sagittarius_context) - job.arguments.unshift Code0::ZeroTrack::Context.current.to_h.merge(sagittarius_context: true) - end + job.arguments << Code0::ZeroTrack::Context.current.to_h.merge(sagittarius_context: true) + end - around_perform do |job, block| - context = job.arguments.shift - context.delete(:sagittarius_context) - source_application = context.fetch(Code0::ZeroTrack::Context.log_key(:application), nil) - Code0::ZeroTrack::Context.with_context( - **context, - application: 'good_job', - source_application: source_application, - job_id: job.job_id, - job_class: self.class.name - ) do - block.call + around_perform do |job, block| + context = job.arguments.pop + context.delete(:sagittarius_context) + source_application = context.fetch(Code0::ZeroTrack::Context.log_key(:application), nil) + Code0::ZeroTrack::Context.with_context( + **context, + application: 'good_job', + source_application: source_application, + job_id: job.job_id, + job_class: self.class.name + ) do + block.call + end end end end diff --git a/app/jobs/update_runtimes_for_project_job.rb b/app/jobs/update_runtimes_for_project_job.rb new file mode 100644 index 00000000..ba5d2a48 --- /dev/null +++ b/app/jobs/update_runtimes_for_project_job.rb @@ -0,0 +1,12 @@ +# frozen_string_literal: true + +class UpdateRuntimesForProjectJob < ApplicationJob + def perform(project_id) + project = NamespaceProject.find_by(id: project_id) + return if project.nil? + + project.runtimes.each do |runtime| + FlowHandler.update_runtime(runtime) + end + end +end diff --git a/app/services/namespaces/projects/flows/create_service.rb b/app/services/namespaces/projects/flows/create_service.rb index f51639b6..ace15863 100644 --- a/app/services/namespaces/projects/flows/create_service.rb +++ b/app/services/namespaces/projects/flows/create_service.rb @@ -59,6 +59,8 @@ def execute ) end + UpdateRuntimesForProjectJob.perform_later(namespace_project.id) + AuditService.audit( :flow_created, author_id: current_authentication.user.id, diff --git a/app/services/namespaces/projects/flows/delete_service.rb b/app/services/namespaces/projects/flows/delete_service.rb index 8cf9e0d9..a8503105 100644 --- a/app/services/namespaces/projects/flows/delete_service.rb +++ b/app/services/namespaces/projects/flows/delete_service.rb @@ -28,6 +28,8 @@ def execute ) end + UpdateRuntimesForProjectJob.perform_later(flow.project.id) + AuditService.audit( :flow_deleted, author_id: current_authentication.user.id, diff --git a/spec/jobs/update_runtimes_for_project_job_spec.rb b/spec/jobs/update_runtimes_for_project_job_spec.rb new file mode 100644 index 00000000..08310746 --- /dev/null +++ b/spec/jobs/update_runtimes_for_project_job_spec.rb @@ -0,0 +1,31 @@ +# frozen_string_literal: true + +require 'rails_helper' + +RSpec.describe UpdateRuntimesForProjectJob do + include ActiveJob::TestHelper + + let(:flow) { create(:flow) } + let(:runtimes) { create_list(:runtime, 2, namespace: flow.project.namespace) } + let!(:other_runtime) { create(:runtime, namespace: flow.project.namespace) } + + before do + runtimes.each do |runtime| + create(:namespace_project_runtime_assignment, namespace_project: flow.project, runtime: runtime) + end + end + + it 'sends update to all relevant runtimes' do + allow(FlowHandler).to receive(:update_runtime) + + perform_enqueued_jobs do + described_class.perform_later(flow.project.id) + end + + runtimes.each do |runtime| + expect(FlowHandler).to have_received(:update_runtime).with(runtime) + end + + expect(FlowHandler).not_to have_received(:update_runtime).with(other_runtime) + end +end diff --git a/spec/rails_helper.rb b/spec/rails_helper.rb index bb12e91f..f58f8153 100644 --- a/spec/rails_helper.rb +++ b/spec/rails_helper.rb @@ -88,6 +88,9 @@ config.include AuthenticationHelpers, type: :policy config.include AuthenticationHelpers, type: :service + # Load job helpers + config.include JobHelpers, type: :job + config.before eager_load: true do Rails.application.eager_load! end diff --git a/spec/services/namespaces/projects/flows/create_service_spec.rb b/spec/services/namespaces/projects/flows/create_service_spec.rb index a4cd747b..06a6b550 100644 --- a/spec/services/namespaces/projects/flows/create_service_spec.rb +++ b/spec/services/namespaces/projects/flows/create_service_spec.rb @@ -66,5 +66,13 @@ target_type: 'NamespaceProject' ) end + + it 'queues job to update runtimes' do + allow(UpdateRuntimesForProjectJob).to receive(:perform_later) + + service_response + + expect(UpdateRuntimesForProjectJob).to have_received(:perform_later).with(namespace_project.id) + end end end diff --git a/spec/services/namespaces/projects/flows/delete_service_spec.rb b/spec/services/namespaces/projects/flows/delete_service_spec.rb index a8ba4d16..d2df3390 100644 --- a/spec/services/namespaces/projects/flows/delete_service_spec.rb +++ b/spec/services/namespaces/projects/flows/delete_service_spec.rb @@ -58,5 +58,13 @@ target_type: 'NamespaceProject' ) end + + it 'queues job to update runtimes' do + allow(UpdateRuntimesForProjectJob).to receive(:perform_later) + + service_response + + expect(UpdateRuntimesForProjectJob).to have_received(:perform_later).with(namespace_project.id) + end end end diff --git a/spec/support/helpers/job_helpers.rb b/spec/support/helpers/job_helpers.rb new file mode 100644 index 00000000..c739203d --- /dev/null +++ b/spec/support/helpers/job_helpers.rb @@ -0,0 +1,7 @@ +# frozen_string_literal: true + +module JobHelpers + def queue_adapter_for_test + GoodJob::Adapter.new(execution_mode: :inline) + end +end