Skip to content

Commit

Permalink
Merge pull request #353 from Shopify/unmanaged_pod_fail_faster
Browse files Browse the repository at this point in the history
Unmanaged pods should fail fast when evicted/preempted/deleted
  • Loading branch information
KnVerey committed Oct 24, 2018
2 parents d1034ec + 02b7a93 commit 401fa6b
Show file tree
Hide file tree
Showing 17 changed files with 337 additions and 86 deletions.
6 changes: 2 additions & 4 deletions lib/kubernetes-deploy/deploy_task.rb
Original file line number Diff line number Diff line change
Expand Up @@ -93,8 +93,6 @@ def server_version
kubectl.server_version
end

NOT_FOUND_ERROR = 'NotFound'

def initialize(namespace:, context:, current_sha:, template_dir:, logger:, kubectl_instance: nil, bindings: {},
max_watch_seconds: nil)
@namespace = namespace
Expand Down Expand Up @@ -399,7 +397,7 @@ def deploy_resources(resources, prune: false, verify:, record_summary: true)

def apply_all(resources, prune)
return unless resources.present?
command = ["apply"]
command = %w(apply)

Dir.mktmpdir do |tmp_dir|
resources.each do |r|
Expand Down Expand Up @@ -500,7 +498,7 @@ def confirm_namespace_exists
st, err = nil
with_retries(2) do
_, err, st = kubectl.run("get", "namespace", @namespace, use_namespace: false, log_failure: true)
st.success? || err.include?(NOT_FOUND_ERROR)
st.success? || err.include?(KubernetesDeploy::Kubectl::NOT_FOUND_ERROR_TEXT)
end
raise FatalDeploymentError, "Failed to find namespace. #{err}" unless st.success?
@logger.info("Namespace #{@namespace} found")
Expand Down
18 changes: 14 additions & 4 deletions lib/kubernetes-deploy/kubectl.rb
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,9 @@
module KubernetesDeploy
class Kubectl
DEFAULT_TIMEOUT = 30
NOT_FOUND_ERROR_TEXT = 'NotFound'

class ResourceNotFoundError < StandardError; end

def initialize(namespace:, context:, logger:, log_failure_by_default:, default_timeout: DEFAULT_TIMEOUT,
output_is_sensitive: false)
Expand All @@ -17,7 +20,7 @@ def initialize(namespace:, context:, logger:, log_failure_by_default:, default_t
raise ArgumentError, "context is required" if context.blank?
end

def run(*args, log_failure: nil, use_context: true, use_namespace: true)
def run(*args, log_failure: nil, use_context: true, use_namespace: true, raise_if_not_found: false)
log_failure = @log_failure_by_default if log_failure.nil?

args = args.unshift("kubectl")
Expand All @@ -29,10 +32,17 @@ def run(*args, log_failure: nil, use_context: true, use_namespace: true)
out, err, st = Open3.capture3(*args)
@logger.debug(out.shellescape) unless output_is_sensitive?

if !st.success? && log_failure
@logger.warn("The following command failed: #{Shellwords.join(args)}")
@logger.warn(err) unless output_is_sensitive?
unless st.success?
if log_failure
@logger.warn("The following command failed: #{Shellwords.join(args)}")
@logger.warn(err) unless output_is_sensitive?
end

if raise_if_not_found && err.match(NOT_FOUND_ERROR_TEXT)
raise ResourceNotFoundError, err
end
end

[out.chomp, err.chomp, st]
end

Expand Down
28 changes: 22 additions & 6 deletions lib/kubernetes-deploy/kubernetes_resource.rb
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ def initialize(namespace:, context:, definition:, logger:, statsd_tags: [])
@logger = logger
@definition = definition
@statsd_report_done = false
@disappeared = false
@validation_errors = []
@instance_data = {}
end
Expand Down Expand Up @@ -121,12 +122,23 @@ def file_path
end

def sync(mediator)
@instance_data = mediator.get_instance(kubectl_resource_type, name)
@instance_data = mediator.get_instance(kubectl_resource_type, name, raise_if_not_found: true)
rescue KubernetesDeploy::Kubectl::ResourceNotFoundError
@disappeared = true if deploy_started?
@instance_data = {}
end

def after_sync
end

def terminating?
@instance_data.dig('metadata', 'deletionTimestamp').present?
end

def disappeared?
@disappeared
end

def deploy_failed?
false
end
Expand Down Expand Up @@ -189,22 +201,26 @@ def sync_debug_info(kubectl)
def debug_message(cause = nil, info_hash = {})
helpful_info = []
if cause == :gave_up
helpful_info << ColorizedString.new("#{id}: GLOBAL WATCH TIMEOUT (#{info_hash[:timeout]} seconds)").yellow
debug_heading = ColorizedString.new("#{id}: GLOBAL WATCH TIMEOUT (#{info_hash[:timeout]} seconds)").yellow
helpful_info << "If you expected it to take longer than #{info_hash[:timeout]} seconds for your deploy"\
" to roll out, increase --max-watch-seconds."
elsif deploy_failed?
helpful_info << ColorizedString.new("#{id}: FAILED").red
debug_heading = ColorizedString.new("#{id}: FAILED").red
helpful_info << failure_message if failure_message.present?
elsif deploy_timed_out?
helpful_info << ColorizedString.new("#{id}: TIMED OUT (#{pretty_timeout_type})").yellow
debug_heading = ColorizedString.new("#{id}: TIMED OUT (#{pretty_timeout_type})").yellow
helpful_info << timeout_message if timeout_message.present?
else
# Arriving in debug_message when we neither failed nor timed out is very unexpected. Dump all available info.
helpful_info << ColorizedString.new("#{id}: MONITORING ERROR").red
debug_heading = ColorizedString.new("#{id}: MONITORING ERROR").red
helpful_info << failure_message if failure_message.present?
helpful_info << timeout_message if timeout_message.present? && timeout_message != STANDARD_TIMEOUT_MESSAGE
end
helpful_info << " - Final status: #{status}"

final_status = " - Final status: #{status}"
final_status = "\n#{final_status}" if helpful_info.present? && !helpful_info.last.end_with?("\n")
helpful_info.prepend(debug_heading)
helpful_info << final_status

if @debug_events.present?
helpful_info << " - Events (common success events excluded):"
Expand Down
31 changes: 25 additions & 6 deletions lib/kubernetes-deploy/kubernetes_resource/pod.rb
Original file line number Diff line number Diff line change
Expand Up @@ -66,14 +66,10 @@ def timeout_message
return STANDARD_TIMEOUT_MESSAGE unless readiness_probe_failure?
probe_failure_msgs = @containers.map(&:readiness_fail_reason).compact
header = "The following containers have not passed their readiness probes on at least one pod:\n"
header + probe_failure_msgs.join("\n") + "\n"
header + probe_failure_msgs.join("\n")
end

def failure_message
if phase == FAILED_PHASE_NAME && !TRANSIENT_FAILURE_REASONS.include?(reason)
phase_problem = "Pod status: #{status}. "
end

doomed_containers = @containers.select(&:doomed?)
if doomed_containers.present?
container_problems = if unmanaged?
Expand All @@ -86,7 +82,7 @@ def failure_message
container_problems += "> #{red_name}: #{c.doom_reason}\n"
end
end
"#{phase_problem}#{container_problems}".presence
"#{phase_failure_message} #{container_problems}".strip.presence
end

def fetch_debug_logs(kubectl)
Expand All @@ -100,6 +96,29 @@ def print_debug_logs?

private

def failed_phase?
phase == FAILED_PHASE_NAME
end

def transient_failure_reason?
return false if unmanaged?
TRANSIENT_FAILURE_REASONS.include?(reason)
end

def phase_failure_message
if failed_phase? && !transient_failure_reason?
return "Pod status: #{status}."
end

return unless unmanaged?

if terminating?
"Pod status: Terminating."
elsif disappeared?
"Pod status: Disappeared."
end
end

def logs
@logs ||= KubernetesDeploy::RemoteLogs.new(
logger: @logger,
Expand Down
27 changes: 18 additions & 9 deletions lib/kubernetes-deploy/sync_mediator.rb
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,16 @@ def initialize(namespace:, context:, logger:)
clear_cache
end

def get_instance(kind, resource_name)
if @cache.key?(kind)
@cache.dig(kind, resource_name) || {}
else
request_instance(kind, resource_name)
def get_instance(kind, resource_name, raise_if_not_found: false)
unless @cache.key?(kind)
return request_instance(kind, resource_name, raise_if_not_found: raise_if_not_found)
end

cached_instance = @cache[kind].fetch(resource_name, {})
if cached_instance.blank? && raise_if_not_found
raise KubernetesDeploy::Kubectl::ResourceNotFoundError, "Resource does not exist (used cache for kind #{kind})"
end
cached_instance
end

def get_all(kind, selector = nil)
Expand Down Expand Up @@ -55,17 +59,22 @@ def clear_cache
@cache = {}
end

def request_instance(kind, iname)
raw_json, _, st = kubectl.run("get", kind, iname, "-a", "--output=json")
def request_instance(kind, iname, raise_if_not_found:)
raw_json, _err, st = kubectl.run("get", kind, iname, "-a", "--output=json",
raise_if_not_found: raise_if_not_found)
st.success? ? JSON.parse(raw_json) : {}
end

def fetch_by_kind(kind)
raw_json, _, st = kubectl.run("get", kind, "-a", "--output=json")
return unless st.success?
@cache[kind] = JSON.parse(raw_json)["items"].each_with_object({}) do |r, instances|
instances[r.dig("metadata", "name")] = r

instances = {}
JSON.parse(raw_json)["items"].each do |resource|
resource_name = resource.dig("metadata", "name")
instances[resource_name] = resource
end
@cache[kind] = instances
end
end
end
36 changes: 34 additions & 2 deletions test/integration/runner_task_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ def test_run_without_verify_result_succeeds_as_soon_as_pod_is_successfully_creat
"Result: SUCCESS",
"Result verification is disabled for this task",
"The following status was observed immediately after pod creation:",
%r{Pod/task-runner-\w+\s+Pending},
%r{Pod/task-runner-\w+\s+(Pending|Running)},
], in_order: true)

pods = kubeclient.get_pods(namespace: @namespace)
Expand All @@ -37,7 +37,7 @@ def test_run_global_timeout_with_max_watch_seconds
"Result: TIMED OUT",
"Timed out waiting for 1 resource to run",
%r{Pod/task-runner-\w+: GLOBAL WATCH TIMEOUT \(5 seconds\)},
"Final status: Running"
/Final status\: (Pending|Running)/
], in_order: true)
end

Expand Down Expand Up @@ -88,6 +88,38 @@ def test_run_with_verify_result_success
assert_equal task_runner.pod_name, pods.first.metadata.name, "Pod name should be available after run"
end

def test_run_with_verify_result_fails_quickly_if_the_pod_is_deleted_out_of_band
deploy_task_template

task_runner = build_task_runner
deleter_thread = Thread.new do
loop do
if task_runner.pod_name.present?
begin
kubeclient.delete_pod(task_runner.pod_name, @namespace)
break
rescue Kubeclient::ResourceNotFoundError
sleep 0.1
retry
end
end
sleep 0.1
end
end
deleter_thread.abort_on_exception = true

result = task_runner.run(run_params(log_lines: 20, log_interval: 1))
assert_task_run_failure(result)

assert_logs_match_all([
"Pod creation succeeded",
"Result: FAILURE",
/Pod status\: (Terminating|Disappeared)/,
])
ensure
deleter_thread&.kill
end

def test_run_with_verify_result_neither_misses_nor_duplicates_logs_across_pollings
deploy_task_template
task_runner = build_task_runner
Expand Down
14 changes: 9 additions & 5 deletions test/test_helper.rb
Original file line number Diff line number Diff line change
Expand Up @@ -194,13 +194,17 @@ def fixture_path(set_name)
source_dir
end

def stub_kubectl_response(*args, resp:, err: "", success: true, json: true, times: 1)
def stub_kubectl_response(*args, resp:, err: "", raise_if_not_found: nil, success: true, json: true, times: 1)
resp = resp.to_json if json
response = [resp, err, stub(success?: success)]
KubernetesDeploy::Kubectl.any_instance.expects(:run)
.with(*args)
.returns(response)
.times(times)

expectation = if raise_if_not_found.nil?
KubernetesDeploy::Kubectl.any_instance.expects(:run).with(*args)
else
KubernetesDeploy::Kubectl.any_instance.expects(:run).with(*args, raise_if_not_found: raise_if_not_found)
end

expectation.returns(response).times(times)
end

def build_runless_kubectl
Expand Down
16 changes: 16 additions & 0 deletions test/unit/kubernetes-deploy/kubectl_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,22 @@ def test_version_info_raises_if_command_fails
end
end

def test_run_with_raise_if_not_found_raises_the_correct_thing
err = 'Error from server (NotFound): pods "foobar" not found'
stub_open3(%w(kubectl get pod foobar --namespace=testn --context=testc --request-timeout=30),
resp: "", err: err, success: false)
assert_raises_message(KubernetesDeploy::Kubectl::ResourceNotFoundError, err) do
build_kubectl.run("get", "pod", "foobar", raise_if_not_found: true)
end
end

def test_run_with_raise_if_not_found_does_not_raise_on_other_errors
err = 'Error from server (TooManyRequests): Please try again later'
stub_open3(%w(kubectl get pod foobar --namespace=testn --context=testc --request-timeout=30),
resp: "", err: err, success: false)
build_kubectl.run("get", "pod", "foobar", raise_if_not_found: true)
end

private

def stub_version_request(client:, server:)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,9 +51,9 @@ def build_ds_template(status: {})
def build_synced_ds(template:)
ds = KubernetesDeploy::DaemonSet.new(namespace: "test", context: "nope", logger: logger, definition: template)
sync_mediator = build_sync_mediator
sync_mediator.kubectl.expects(:run).with("get", "DaemonSet", "ds-app", "-a", "--output=json").returns(
[template.to_json, "", SystemExit.new(0)]
)
sync_mediator.kubectl.expects(:run)
.with("get", "DaemonSet", "ds-app", "-a", "--output=json", raise_if_not_found: true)
.returns([template.to_json, "", SystemExit.new(0)])

sync_mediator.kubectl.expects(:run).with("get", "Pod", "-a", "--output=json", anything).returns(
['{ "items": [] }', "", SystemExit.new(0)]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -367,9 +367,9 @@ def build_rs_template(status: { 'replicas' => 3 })
def build_synced_deployment(template:, replica_sets:, server_version: Gem::Version.new("1.8"))
deploy = KubernetesDeploy::Deployment.new(namespace: "test", context: "nope", logger: logger, definition: template)
sync_mediator = build_sync_mediator
sync_mediator.kubectl.expects(:run).with("get", "Deployment", "web", "-a", "--output=json").returns(
[template.to_json, "", SystemExit.new(0)]
)
sync_mediator.kubectl.expects(:run)
.with("get", "Deployment", "web", "-a", "--output=json", raise_if_not_found: true)
.returns([template.to_json, "", SystemExit.new(0)])
sync_mediator.kubectl.expects(:server_version).returns(server_version)

if replica_sets.present?
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,9 @@ def build_synced_pdb(template:)
pdb = KubernetesDeploy::PodDisruptionBudget.new(namespace: "test", context: "nope",
logger: logger, definition: template)
sync_mediator = KubernetesDeploy::SyncMediator.new(namespace: 'test', context: 'minikube', logger: logger)
sync_mediator.kubectl.expects(:run).with("get", "PodDisruptionBudget", "test", "-a", "--output=json").returns(
[template.to_json, "", SystemExit.new(0)]
)
sync_mediator.kubectl.expects(:run)
.with("get", "PodDisruptionBudget", "test", "-a", "--output=json", raise_if_not_found: true)
.returns([template.to_json, "", SystemExit.new(0)])
pdb.sync(sync_mediator)
pdb
end
Expand Down
Loading

0 comments on commit 401fa6b

Please sign in to comment.