Skip to content

Commit

Permalink
Revert "Merge pull request #19164 from fdupont-redhat/v2v_manage_hand…
Browse files Browse the repository at this point in the history
…over_between_core_and_automate_part2"

This reverts commit 287da1f.

https://bugzilla.redhat.com/show_bug.cgi?id=1767637
  • Loading branch information
simaishi committed Dec 3, 2019
1 parent 569331f commit 08bc7b0
Show file tree
Hide file tree
Showing 3 changed files with 203 additions and 75 deletions.
134 changes: 87 additions & 47 deletions app/models/infra_conversion_job.rb
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
class InfraConversionJob < Job
def self.create_job(options = {})
# TODO: from settings/user plan settings
options[:conversion_polling_interval] ||= Settings.transformation.limits.conversion_polling_interval # in seconds
options[:poll_conversion_max] ||= Settings.transformation.limits.poll_conversion_max
options[:poll_post_stage_max] ||= Settings.transformation.limits.poll_post_stage_max
super(name, options)
end

Expand Down Expand Up @@ -29,30 +33,21 @@ def load_transitions
self.state ||= 'initialize'

{
:initializing => {'initialize' => 'waiting_to_start'},
:start => {'waiting_to_start' => 'started'},
:poll_automate_state_machine => {
'started' => 'running_in_automate',
'running_in_automate' => 'running_in_automate'
},
:finish => {'*' => 'finished'},
:abort_job => {'*' => 'aborting'},
:cancel => {'*' => 'canceling'},
:error => {'*' => '*'}
}
end

def state_settings
@state_settings ||= {
:running_in_automate => {
:max_retries => 8640 # 36 hours with a retry interval of 15 seconds
}
:initializing => {'initialize' => 'waiting_to_start'},
:start => {'waiting_to_start' => 'running'},
:poll_conversion => {'running' => 'running'},
:start_post_stage => {'running' => 'post_conversion'},
:poll_post_stage => {'post_conversion' => 'post_conversion'},
:finish => {'*' => 'finished'},
:abort_job => {'*' => 'aborting'},
:cancel => {'*' => 'canceling'},
:error => {'*' => '*'}
}
end

def migration_task
@migration_task ||= target_entity
# valid states: %w(migrate pending finished active queued)
# valid states: %w(migrated pending finished active queued)
end

# Temporary method to allow switching from InfraConversionJob to Automate.
Expand All @@ -61,18 +56,85 @@ def handover_to_automate
migration_task.update_options(:workflow_runner => 'automate')
end

def start
migration_task.update!(:state => 'migrate')
handover_to_automate
queue_signal(:poll_conversion)
end

def abort_conversion(message, status)
migration_task.cancel
queue_signal(:abort_job, message, status)
end

def polling_timeout
options[:retry_interval] ||= Settings.transformation.job.retry_interval # in seconds
return false if state_settings[state.to_sym][:max_retries].nil?
def polling_timeout(poll_type)
count = "#{poll_type}_count".to_sym
max = "#{poll_type}_max".to_sym
context[count] = (context[count] || 0) + 1
context[count] > options[max]
end

def poll_conversion
return abort_conversion("Polling times out", 'error') if polling_timeout(:poll_conversion)

message = "Getting conversion state"
_log.info(prep_message(message))

unless migration_task.options.fetch_path(:virtv2v_wrapper, 'state_file')
message = "Virt v2v state file not available, continuing poll_conversion"
_log.info(prep_message(message))
update_attributes(:message => message)
return queue_signal(:poll_conversion, :deliver_on => Time.now.utc + options[:conversion_polling_interval])
end

begin
migration_task.get_conversion_state # migration_task.options will be updated
rescue => exception
_log.log_backtrace(exception)
return abort_conversion("Conversion error: #{exception}", 'error')
end

v2v_status = migration_task.options[:virtv2v_status]
message = "virtv2v_status=#{v2v_status}"
_log.info(prep_message(message))
update_attributes(:message => message)

case v2v_status
when 'active'
queue_signal(:poll_conversion, :deliver_on => Time.now.utc + options[:conversion_polling_interval])
when 'failed'
message = "disk conversion failed"
abort_conversion(prep_message(message), 'error')
when 'succeeded'
message = "disk conversion succeeded"
_log.info(prep_message(message))
queue_signal(:start_post_stage)
else
message = prep_message("Unknown converstion status: #{v2v_status}")
abort_conversion(message, 'error')
end
end

def start_post_stage
# once we refactor Automate's PostTransformation into a job, we kick start it here
message = "To wait for Post-Transformation progress"
_log.info(prep_message(message))
update_attributes(:message => message)
queue_signal(:poll_post_stage, :deliver_on => Time.now.utc + options[:conversion_polling_interval])
end

def poll_post_stage
return abort_conversion("Polling times out", 'error') if polling_timeout(:poll_post_stage)

retries = "retries_#{state}".to_sym
context[retries] = (context[retries] || 0) + 1
context[retries] > state_settings[state.to_sym][:max_retries]
message = "PostTransformation state=#{migration_task.state}, status=#{migration_task.status}"
_log.info(prep_message(message))
update_attributes(:message => message)
if migration_task.state == 'finished'
self.status = migration_task.status
queue_signal(:finish)
else
queue_signal(:poll_post_stage, :deliver_on => Time.now.utc + options[:conversion_polling_interval])
end
end

def queue_signal(*args, deliver_on: nil)
Expand All @@ -91,26 +153,4 @@ def queue_signal(*args, deliver_on: nil)
def prep_message(contents)
"MiqRequestTask id=#{migration_task.id}, InfraConversionJob id=#{id}. #{contents}"
end

# --- Methods that implement the state machine transitions --- #

def start
migration_task.update!(:state => 'migrate')
handover_to_automate
queue_signal(:poll_automate_state_machine)
end

def poll_automate_state_machine
return abort_conversion('Polling timed out', 'error') if polling_timeout

message = "Migration Task vm=#{migration_task.source.name}, state=#{migration_task.state}, status=#{migration_task.status}"
_log.info(prep_message(message))
update(:message => message)
if migration_task.state == 'finished'
self.status = migration_task.status
queue_signal(:finish)
else
queue_signal(:poll_automate_state_machine, :deliver_on => Time.now.utc + options[:retry_interval])
end
end
end
5 changes: 3 additions & 2 deletions config/settings.yml
Original file line number Diff line number Diff line change
Expand Up @@ -1090,13 +1090,14 @@
:keep_tasks: 1.week
:purge_window_size: 1000
:transformation:
:job:
:retry_interval: 15 # in seconds
:limits:
:max_concurrent_tasks_per_ems: 10
:max_concurrent_tasks_per_host: 10
:cpu_limit_per_host: unlimited
:network_limit_per_host: unlimited
:conversion_polling_interval: 15 # in seconds
:poll_conversion_max: 86400 # i.e. default 24 hours (15s per-interval)
:poll_post_stage_max: 25200 # i.e. default 7 hours (15s per-interval)
:ui:
:mark_translated_strings: false
:display_ops_database: false
Expand Down
139 changes: 113 additions & 26 deletions spec/models/infra_conversion_job_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
end

context 'state transitions' do
%w[start poll_automate_state_machine finish abort_job cancel error].each do |signal|
%w(start poll_conversion start_post_stage poll_post_stage finish abort_job cancel error).each do |signal|
shared_examples_for "allows #{signal} signal" do
it signal.to_s do
expect(job).to receive(signal.to_sym)
Expand All @@ -22,7 +22,7 @@
end
end

%w[start poll_automate_state_machine].each do |signal|
%w(start poll_conversion start_post_stage poll_post_stage).each do |signal|
shared_examples_for "doesn't allow #{signal} signal" do
it signal.to_s do
expect { job.signal(signal.to_sym) }.to raise_error(RuntimeError, /#{signal} is not permitted at state #{job.state}/)
Expand All @@ -34,83 +34,170 @@
before do
job.state = 'waiting_to_start'
end

it_behaves_like 'allows start signal'
it_behaves_like 'allows finish signal'
it_behaves_like 'allows abort_job signal'
it_behaves_like 'allows cancel signal'
it_behaves_like 'allows error signal'

it_behaves_like 'doesn\'t allow poll_automate_state_machine signal'
it_behaves_like 'doesn\'t allow poll_conversion signal'
it_behaves_like 'doesn\'t allow start_post_stage signal'
it_behaves_like 'doesn\'t allow poll_post_stage signal'
end

context 'started' do
context 'running' do
before do
job.state = 'started'
job.state = 'running'
end

it_behaves_like 'allows poll_automate_state_machine signal'
it_behaves_like 'allows poll_conversion signal'
it_behaves_like 'allows start_post_stage signal'
it_behaves_like 'allows finish signal'
it_behaves_like 'allows abort_job signal'
it_behaves_like 'allows cancel signal'
it_behaves_like 'allows error signal'

it_behaves_like 'doesn\'t allow start signal'
it_behaves_like 'doesn\'t allow poll_post_stage signal'
end

context 'running_in_automate' do
context 'post_conversion' do
before do
job.state = 'running_in_automate'
job.state = 'post_conversion'
end

it_behaves_like 'allows poll_automate_state_machine signal'
it_behaves_like 'allows poll_post_stage signal'
it_behaves_like 'allows finish signal'
it_behaves_like 'allows abort_job signal'
it_behaves_like 'allows cancel signal'
it_behaves_like 'allows error signal'

it_behaves_like 'doesn\'t allow start signal'
it_behaves_like 'doesn\'t allow poll_conversion signal'
it_behaves_like 'doesn\'t allow start_post_stage signal'
end
end

context 'operations' do
let(:poll_interval) { Settings.transformation.job.retry_interval }
let(:poll_interval) { Settings.transformation.limits.conversion_polling_interval }

before do
allow(job).to receive(:migration_task).and_return(task)
end

context '#start' do
it 'to poll_automate_state_machine when preflight_check passes' do
expect(job).to receive(:queue_signal).with(:poll_automate_state_machine)
it 'to poll_conversion when preflight_check passes' do
expect(job).to receive(:queue_signal).with(:poll_conversion)
job.signal(:start)
expect(task.reload.state).to eq('migrate')
expect(task.state).to eq('migrate')
expect(task.options[:workflow_runner]).to eq('automate')
end
end

context '#poll_automate_state_machine' do
context '#poll_conversion' do
before do
job.state = 'running'
task.options[:virtv2v_wrapper] = {'state_file' => 'something'}
end

it 'to poll_conversion when migration_task.options[:virtv2v_wrapper] is nil' do
task.options[:virtv2v_wrapper] = nil
Timecop.freeze(2019, 2, 6) do
expect(job).to receive(:queue_signal).with(:poll_conversion, :deliver_on => Time.now.utc + poll_interval)
job.signal(:poll_conversion)
end
end

it 'to poll_conversion when migration_task.options[:virtv2v_wrapper][:state_file] is nil' do
task.options[:virtv2v_wrapper] = {'state_file' => nil}
Timecop.freeze(2019, 2, 6) do
expect(job).to receive(:queue_signal).with(:poll_conversion, :deliver_on => Time.now.utc + poll_interval)
job.signal(:poll_conversion)
end
end

it 'abort_conversion when get_conversion_state fails' do
expect(task).to receive(:get_conversion_state).and_raise
expect(job).to receive(:abort_conversion)
job.signal(:poll_conversion)
end

it 'to poll_conversion when migration_task.options[:virtv2v_status] is active' do
task.options[:virtv2v_status] = 'active'
Timecop.freeze(2019, 2, 6) do
expect(task).to receive(:get_conversion_state)
expect(job).to receive(:queue_signal).with(:poll_conversion, :deliver_on => Time.now.utc + poll_interval)
job.signal(:poll_conversion)
end
end

it 'abort_conversion when migration_task.options[:virtv2v_status] is failed' do
task.options[:virtv2v_status] = 'failed'
expect(task).to receive(:get_conversion_state)
expect(job).to receive(:abort_conversion)
job.signal(:poll_conversion)
end

it 'to start_post_stage when migration_task.options[:virtv2v_status] is succeeded' do
task.options[:virtv2v_status] = 'succeeded'
expect(task).to receive(:get_conversion_state)
expect(job).to receive(:queue_signal).with(:start_post_stage)
job.signal(:poll_conversion)
end

it 'abort_conversion when migration_task.options[:virtv2v_status] is unknown' do
task.options[:virtv2v_status] = '_'
expect(task).to receive(:get_conversion_state)
expect(job).to receive(:abort_conversion)
job.signal(:poll_conversion)
end

it 'abort_conversion when poll_conversion times out' do
job.options[:poll_conversion_max] = 24 * 60
job.context[:poll_conversion_count] = 24 * 60
expect(job).to receive(:abort_conversion)
job.signal(:poll_conversion)
end
end

context '#start_post_stage' do
it 'to poll_post_stage when signaled :start_post_stage' do
job.state = 'running'
Timecop.freeze(2019, 2, 6) do
expect(job).to receive(:queue_signal).with(:poll_post_stage, :deliver_on => Time.now.utc + poll_interval)
job.signal(:start_post_stage)
end
end
end

context '#poll_post_stage' do
before do
job.state = 'running_in_automate'
job.state = 'post_conversion'
end

it 'to poll_automate_state_machine when migration_task.state is not finished' do
task.update!(:state => 'migrate')
it 'to poll_post_stage when migration_task.state is not finished' do
task.state = 'not-finished'
Timecop.freeze(2019, 2, 6) do
expect(job).to receive(:queue_signal).with(:poll_automate_state_machine, :deliver_on => Time.now.utc + poll_interval)
job.signal(:poll_automate_state_machine)
expect(job).to receive(:queue_signal).with(:poll_post_stage, :deliver_on => Time.now.utc + poll_interval)
job.signal(:poll_post_stage)
end
end

it 'to finish when migration_task.state is finished' do
task.update!(:state => 'finished', :status => 'Ok')
task.state = 'finished'
task.status = 'whatever'
Timecop.freeze(2019, 2, 6) do
expect(job).to receive(:queue_signal).with(:finish)
job.signal(:poll_automate_state_machine)
job.signal(:poll_post_stage)
expect(job.status).to eq(task.status)
end
end

it 'abort_conversion when poll_automate_state_machine times out' do
job.context[:retries_running_in_automate] = 8640
expect(job).to receive(:abort_conversion).with('Polling timed out', 'error')
job.signal(:poll_automate_state_machine)
it 'abort_conversion when poll_post_stage times out' do
job.options[:poll_post_stage_max] = 30
job.context[:poll_post_stage_count] = 30
expect(job).to receive(:abort_conversion)
job.signal(:poll_post_stage)
end
end
end
Expand Down

0 comments on commit 08bc7b0

Please sign in to comment.