Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 11 additions & 7 deletions app/grpc/flow_handler.rb
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,7 @@ class FlowHandler < Tucana::Sagittarius::FlowService::Service

grpc_stream :update

def self.update_started(runtime_id)
runtime = Runtime.find(runtime_id)
runtime.connected!
runtime.save

logger.info(message: 'Runtime connected', runtime_id: runtime.id)

def self.update_runtime(runtime)
flows = []
runtime.projects.each do |project|
project.flows.each do |flow|
Expand All @@ -33,6 +27,16 @@ def self.update_started(runtime_id)
)
end

def self.update_started(runtime_id)
runtime = Runtime.find(runtime_id)
runtime.connected!
runtime.save

logger.info(message: 'Runtime connected', runtime_id: runtime.id)

update_runtime(runtime)
end

def self.update_died(runtime_id)
runtime = Runtime.find(runtime_id)
runtime.disconnected!
Expand Down
37 changes: 20 additions & 17 deletions app/jobs/application_job.rb
Original file line number Diff line number Diff line change
Expand Up @@ -8,26 +8,29 @@ class ApplicationJob < ActiveJob::Base
# Most jobs are safe to ignore if the underlying records are no longer available
# discard_on ActiveJob::DeserializationError

retry_on StandardError, wait: :polynomially_longer, attempts: 10
ActiveJob::Base.instance_eval do
retry_on StandardError, wait: :polynomially_longer, attempts: 10

before_enqueue do |job|
next if job.arguments.first&.key?(:sagittarius_context)
before_enqueue do |job|
possible_context = job.arguments.last
next if possible_context.is_a?(Hash) && possible_context&.try(:key?, :sagittarius_context)

job.arguments.unshift Code0::ZeroTrack::Context.current.to_h.merge(sagittarius_context: true)
end
job.arguments << Code0::ZeroTrack::Context.current.to_h.merge(sagittarius_context: true)
end

around_perform do |job, block|
context = job.arguments.shift
context.delete(:sagittarius_context)
source_application = context.fetch(Code0::ZeroTrack::Context.log_key(:application), nil)
Code0::ZeroTrack::Context.with_context(
**context,
application: 'good_job',
source_application: source_application,
job_id: job.job_id,
job_class: self.class.name
) do
block.call
around_perform do |job, block|
context = job.arguments.pop
context.delete(:sagittarius_context)
source_application = context.fetch(Code0::ZeroTrack::Context.log_key(:application), nil)
Code0::ZeroTrack::Context.with_context(
**context,
application: 'good_job',
source_application: source_application,
job_id: job.job_id,
job_class: self.class.name
) do
block.call
end
end
end
end
12 changes: 12 additions & 0 deletions app/jobs/update_runtimes_for_project_job.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
# frozen_string_literal: true

class UpdateRuntimesForProjectJob < ApplicationJob
def perform(project_id)
project = NamespaceProject.find_by(id: project_id)
return if project.nil?

project.runtimes.each do |runtime|
FlowHandler.update_runtime(runtime)
end
end
end
2 changes: 2 additions & 0 deletions app/services/namespaces/projects/flows/create_service.rb
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,8 @@ def execute
)
end

UpdateRuntimesForProjectJob.perform_later(namespace_project.id)

AuditService.audit(
:flow_created,
author_id: current_authentication.user.id,
Expand Down
2 changes: 2 additions & 0 deletions app/services/namespaces/projects/flows/delete_service.rb
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ def execute
)
end

UpdateRuntimesForProjectJob.perform_later(flow.project.id)

AuditService.audit(
:flow_deleted,
author_id: current_authentication.user.id,
Expand Down
31 changes: 31 additions & 0 deletions spec/jobs/update_runtimes_for_project_job_spec.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
# frozen_string_literal: true

require 'rails_helper'

RSpec.describe UpdateRuntimesForProjectJob do
include ActiveJob::TestHelper

let(:flow) { create(:flow) }
let(:runtimes) { create_list(:runtime, 2, namespace: flow.project.namespace) }
let!(:other_runtime) { create(:runtime, namespace: flow.project.namespace) }

before do
runtimes.each do |runtime|
create(:namespace_project_runtime_assignment, namespace_project: flow.project, runtime: runtime)
end
end

it 'sends update to all relevant runtimes' do
allow(FlowHandler).to receive(:update_runtime)

perform_enqueued_jobs do
described_class.perform_later(flow.project.id)
end

runtimes.each do |runtime|
expect(FlowHandler).to have_received(:update_runtime).with(runtime)
end

expect(FlowHandler).not_to have_received(:update_runtime).with(other_runtime)
end
end
3 changes: 3 additions & 0 deletions spec/rails_helper.rb
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,9 @@
config.include AuthenticationHelpers, type: :policy
config.include AuthenticationHelpers, type: :service

# Load job helpers
config.include JobHelpers, type: :job

config.before eager_load: true do
Rails.application.eager_load!
end
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,5 +66,13 @@
target_type: 'NamespaceProject'
)
end

it 'queues job to update runtimes' do
allow(UpdateRuntimesForProjectJob).to receive(:perform_later)

service_response

expect(UpdateRuntimesForProjectJob).to have_received(:perform_later).with(namespace_project.id)
end
end
end
Original file line number Diff line number Diff line change
Expand Up @@ -58,5 +58,13 @@
target_type: 'NamespaceProject'
)
end

it 'queues job to update runtimes' do
allow(UpdateRuntimesForProjectJob).to receive(:perform_later)

service_response

expect(UpdateRuntimesForProjectJob).to have_received(:perform_later).with(namespace_project.id)
end
end
end
7 changes: 7 additions & 0 deletions spec/support/helpers/job_helpers.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
# frozen_string_literal: true

module JobHelpers
def queue_adapter_for_test
GoodJob::Adapter.new(execution_mode: :inline)
end
end