Skip to content

Commit

Permalink
Support restart task for statefulsets and daemonsets (#836)
Browse files Browse the repository at this point in the history
  • Loading branch information
timothysmith0609 committed Sep 15, 2021
1 parent 340e508 commit c9562f7
Show file tree
Hide file tree
Showing 16 changed files with 551 additions and 128 deletions.
3 changes: 3 additions & 0 deletions .rubocop.yml
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
inherit_gem:
rubocop-shopify: rubocop.yml

Style/DateTime:
Enabled: false

AllCops:
TargetRubyVersion: 2.4
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -451,7 +451,7 @@ resource to deploy.

# krane restart

`krane restart` is a tool for restarting all of the pods in one or more deployments. It triggers the restart by touching the `RESTARTED_AT` environment variable in the deployment's podSpec. The rollout strategy defined for each deployment will be respected by the restart.
`krane restart` is a tool for restarting all of the pods in one or more deployments, statefuls sets, and/or daemon sets. It triggers the restart by patching template metadata with the `kubectl.kubernetes.io/restartedAt` annotation (with the value being an RFC 3339 representation of the current time). Note this is the manner in which `kubectl rollout restart` itself triggers restarts.

## Usage

Expand Down
8 changes: 7 additions & 1 deletion lib/krane/cli/restart_command.rb
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,11 @@ class RestartCommand
DEFAULT_RESTART_TIMEOUT = '300s'
OPTIONS = {
"deployments" => { type: :array, banner: "list of deployments",
desc: "List of workload names to restart" },
desc: "List of deployment names to restart", default: [] },
"statefulsets" => { type: :array, banner: "list of statefulsets",
desc: "List of statefulset names to restart", default: [] },
"daemonsets" => { type: :array, banner: "list of daemonsets",
desc: "List of daemonset names to restart", default: [] },
"global-timeout" => { type: :string, banner: "duration", default: DEFAULT_RESTART_TIMEOUT,
desc: "Max duration to monitor workloads correctly restarted" },
"selector" => { type: :string, banner: "'label=value'",
Expand All @@ -25,6 +29,8 @@ def self.from_options(namespace, context, options)
)
restart.run!(
deployments: options[:deployments],
statefulsets: options[:statefulsets],
daemonsets: options[:daemonsets],
selector: selector,
verify_result: options["verify-result"]
)
Expand Down
200 changes: 152 additions & 48 deletions lib/krane/restart_task.rb
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ def initialize(deployment_name, response)
HTTP_OK_RANGE = 200..299
ANNOTATION = "shipit.shopify.io/restart"

RESTART_TRIGGER_ANNOTATION = "kubectl.kubernetes.io/restartedAt"

attr_reader :task_config

delegate :kubeclient_builder, to: :task_config
Expand Down Expand Up @@ -58,33 +60,41 @@ def run(**args)
# @param verify_result [Boolean] Wait for completion and verify success
#
# @return [nil]
def run!(deployments: nil, selector: nil, verify_result: true)
def run!(deployments: [], statefulsets: [], daemonsets: [], selector: nil, verify_result: true)
start = Time.now.utc
@logger.reset

@logger.phase_heading("Initializing restart")
verify_config!
deployments = identify_target_deployments(deployments, selector: selector)
deployments, statefulsets, daemonsets = identify_target_workloads(deployments, statefulsets,
daemonsets, selector: selector)

@logger.phase_heading("Triggering restart by touching ENV[RESTARTED_AT]")
@logger.phase_heading("Triggering restart by annotating pod template #{RESTART_TRIGGER_ANNOTATION} annotation")
patch_kubeclient_deployments(deployments)
patch_kubeclient_statefulsets(statefulsets)
patch_kubeclient_daemonsets(daemonsets)

if verify_result
@logger.phase_heading("Waiting for rollout")
resources = build_watchables(deployments, start)
resources = build_watchables(deployments, start, Deployment)
resources += build_watchables(statefulsets, start, StatefulSet)
resources += build_watchables(daemonsets, start, DaemonSet)
verify_restart(resources)
else
warning = "Result verification is disabled for this task"
@logger.summary.add_paragraph(ColorizedString.new(warning).yellow)
end
StatsD.client.distribution('restart.duration', StatsD.duration(start), tags: tags('success', deployments))
StatsD.client.distribution('restart.duration', StatsD.duration(start),
tags: tags('success', deployments, statefulsets, daemonsets))
@logger.print_summary(:success)
rescue DeploymentTimeoutError
StatsD.client.distribution('restart.duration', StatsD.duration(start), tags: tags('timeout', deployments))
StatsD.client.distribution('restart.duration', StatsD.duration(start),
tags: tags('timeout', deployments, statefulsets, daemonsets))
@logger.print_summary(:timed_out)
raise
rescue FatalDeploymentError => error
StatsD.client.distribution('restart.duration', StatsD.duration(start), tags: tags('failure', deployments))
StatsD.client.distribution('restart.duration', StatsD.duration(start),
tags: tags('failure', deployments, statefulsets, daemonsets))
@logger.summary.add_action(error.message) if error.message != error.class.to_s
@logger.print_summary(:failure)
raise
Expand All @@ -93,66 +103,140 @@ def run!(deployments: nil, selector: nil, verify_result: true)

private

def tags(status, deployments)
%W(namespace:#{@namespace} context:#{@context} status:#{status} deployments:#{deployments.to_a.length}})
def tags(status, deployments, statefulsets, daemonsets)
%W(namespace:#{@namespace} context:#{@context} status:#{status} deployments:#{deployments.to_a.length}
statefulsets:#{statefulsets.to_a.length} daemonsets:#{daemonsets.to_a.length}})
end

def identify_target_deployments(deployment_names, selector: nil)
if deployment_names.nil?
deployments = if selector.nil?
@logger.info("Configured to restart all deployments with the `#{ANNOTATION}` annotation")
apps_v1_kubeclient.get_deployments(namespace: @namespace)
def identify_target_workloads(deployment_names, statefulset_names, daemonset_names, selector: nil)
if deployment_names.blank? && statefulset_names.blank? && daemonset_names.blank?
if selector.nil?
@logger.info("Configured to restart all workloads with the `#{ANNOTATION}` annotation")
else
selector_string = selector.to_s
@logger.info(
"Configured to restart all deployments with the `#{ANNOTATION}` annotation and #{selector_string} selector"
"Configured to restart all workloads with the `#{ANNOTATION}` annotation and #{selector} selector"
)
apps_v1_kubeclient.get_deployments(namespace: @namespace, label_selector: selector_string)
end
deployments.select! { |d| d.metadata.annotations[ANNOTATION] }
deployments = identify_target_deployments(selector: selector)
statefulsets = identify_target_statefulsets(selector: selector)
daemonsets = identify_target_daemonsets(selector: selector)

if deployments.none?
raise FatalRestartError, "no deployments with the `#{ANNOTATION}` annotation found in namespace #{@namespace}"
if deployments.none? && statefulsets.none? && daemonsets.none?
raise FatalRestartError, "no deployments, statefulsets, or daemonsets, with the `#{ANNOTATION}` " \
"annotation found in namespace #{@namespace}"
end
elsif deployment_names.empty?
raise FatalRestartError, "Configured to restart deployments by name, but list of names was blank"
elsif !selector.nil?
raise FatalRestartError, "Can't specify deployment names and selector at the same time"
raise FatalRestartError, "Can't specify workload names and selector at the same time"
else
deployment_names = deployment_names.uniq
list = deployment_names.join(', ')
@logger.info("Configured to restart deployments by name: #{list}")

deployments = fetch_deployments(deployment_names)
if deployments.none?
raise FatalRestartError, "no deployments with names #{list} found in namespace #{@namespace}"
deployments, statefulsets, daemonsets = identify_target_workloads_by_name(deployment_names,
statefulset_names, daemonset_names)
if deployments.none? && statefulsets.none? && daemonsets.none?
error_msgs = []
error_msgs << "no deployments with names #{list} found in namespace #{@namespace}" if deployment_names
error_msgs << "no statefulsets with names #{list} found in namespace #{@namespace}" if statefulset_names
error_msgs << "no daemonsets with names #{list} found in namespace #{@namespace}" if daemonset_names
raise FatalRestartError, error_msgs.join(', ')
end
end
deployments
[deployments, statefulsets, daemonsets]
end

def build_watchables(kubeclient_resources, started)
def identify_target_workloads_by_name(deployment_names, statefulset_names, daemonset_names)
deployment_names = deployment_names.uniq
statefulset_names = statefulset_names.uniq
daemonset_names = daemonset_names.uniq

if deployment_names.present?
@logger.info("Configured to restart deployments by name: #{deployment_names.join(', ')}")
end
if statefulset_names.present?
@logger.info("Configured to restart statefulsets by name: #{statefulset_names.join(', ')}")
end
if daemonset_names.present?
@logger.info("Configured to restart daemonsets by name: #{daemonset_names.join(', ')}")
end

[fetch_deployments(deployment_names), fetch_statefulsets(statefulset_names), fetch_daemonsets(daemonset_names)]
end

def identify_target_deployments(selector: nil)
deployments = if selector.nil?
apps_v1_kubeclient.get_deployments(namespace: @namespace)
else
selector_string = selector.to_s
apps_v1_kubeclient.get_deployments(namespace: @namespace, label_selector: selector_string)
end
deployments.select { |d| d.metadata.annotations[ANNOTATION] }
end

def identify_target_statefulsets(selector: nil)
statefulsets = if selector.nil?
apps_v1_kubeclient.get_stateful_sets(namespace: @namespace)
else
selector_string = selector.to_s
apps_v1_kubeclient.get_stateful_sets(namespace: @namespace, label_selector: selector_string)
end
statefulsets.select { |d| d.metadata.annotations[ANNOTATION] }
end

def identify_target_daemonsets(selector: nil)
daemonsets = if selector.nil?
apps_v1_kubeclient.get_daemon_sets(namespace: @namespace)
else
selector_string = selector.to_s
apps_v1_kubeclient.get_daemon_sets(namespace: @namespace, label_selector: selector_string)
end
daemonsets.select { |d| d.metadata.annotations[ANNOTATION] }
end

def build_watchables(kubeclient_resources, started, klass)
kubeclient_resources.map do |d|
definition = d.to_h.deep_stringify_keys
r = Deployment.new(namespace: @namespace, context: @context, definition: definition, logger: @logger)
r = klass.new(namespace: @namespace, context: @context, definition: definition, logger: @logger)
r.deploy_started_at = started # we don't care what happened to the resource before the restart cmd ran
r
end
end

def patch_deployment_with_restart(record)
apps_v1_kubeclient.patch_deployment(
record.metadata.name,
build_patch_payload(record),
@namespace
)
apps_v1_kubeclient.patch_deployment(record.metadata.name, build_patch_payload(record), @namespace)
end

def patch_statefulset_with_restart(record)
apps_v1_kubeclient.patch_stateful_set(record.metadata.name, build_patch_payload(record), @namespace)
end

def patch_daemonset_with_restart(record)
apps_v1_kubeclient.patch_daemon_set(record.metadata.name, build_patch_payload(record), @namespace)
end

def patch_kubeclient_deployments(deployments)
deployments.each do |record|
begin
patch_deployment_with_restart(record)
@logger.info("Triggered `#{record.metadata.name}` restart")
@logger.info("Triggered `Deployment/#{record.metadata.name}` restart")
rescue Kubeclient::HttpError => e
raise RestartAPIError.new(record.metadata.name, e.message)
end
end
end

def patch_kubeclient_statefulsets(statefulsets)
statefulsets.each do |record|
begin
patch_statefulset_with_restart(record)
@logger.info("Triggered `StatefulSet/#{record.metadata.name}` restart")
rescue Kubeclient::HttpError => e
raise RestartAPIError.new(record.metadata.name, e.message)
end
end
end

def patch_kubeclient_daemonsets(daemonsets)
daemonsets.each do |record|
begin
patch_daemonset_with_restart(record)
@logger.info("Triggered `DaemonSet/#{record.metadata.name}` restart")
rescue Kubeclient::HttpError => e
raise RestartAPIError.new(record.metadata.name, e.message)
end
Expand All @@ -171,18 +255,38 @@ def fetch_deployments(list)
end
end

def build_patch_payload(deployment)
containers = deployment.spec.template.spec.containers
def fetch_statefulsets(list)
list.map do |name|
record = nil
begin
record = apps_v1_kubeclient.get_stateful_set(name, @namespace)
rescue Kubeclient::ResourceNotFoundError
raise FatalRestartError, "StatefulSet `#{name}` not found in namespace `#{@namespace}`"
end
record
end
end

def fetch_daemonsets(list)
list.map do |name|
record = nil
begin
record = apps_v1_kubeclient.get_daemon_set(name, @namespace)
rescue Kubeclient::ResourceNotFoundError
raise FatalRestartError, "DaemonSet `#{name}` not found in namespace `#{@namespace}`"
end
record
end
end

def build_patch_payload(_deployment)
{
spec: {
template: {
spec: {
containers: containers.map do |container|
{
name: container.name,
env: [{ name: "RESTARTED_AT", value: Time.now.to_i.to_s }],
}
end,
metadata: {
annotations: {
RESTART_TRIGGER_ANNOTATION => Time.now.utc.to_datetime.rfc3339,
},
},
},
},
Expand Down
25 changes: 24 additions & 1 deletion test/exe/restart_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,27 @@ def test_restart_passes_deployments_transparently
krane_restart!(flags: '--deployments web jobs')
end

def test_restart_passes_statefulsets_transparently
set_krane_restart_expectations(run_args: { statefulsets: ['ss'] })
krane_restart!(flags: '--statefulsets ss')
set_krane_restart_expectations(run_args: { statefulsets: ['ss', 'ss-2'] })
krane_restart!(flags: '--statefulsets ss ss-2')
end

def test_restart_passes_daemonsets_transparently
set_krane_restart_expectations(run_args: { daemonsets: ['ds'] })
krane_restart!(flags: '--daemonsets ds')
set_krane_restart_expectations(run_args: { daemonsets: ['ds', 'ds-2'] })
krane_restart!(flags: '--daemonsets ds ds-2')
end

def test_restart_passes_multiple_workload_types_transparently
set_krane_restart_expectations(
run_args: { deployments: ['web', 'jobs'], statefulsets: ['ss', 'ss-2'], daemonsets: ['ds', 'ds-2'] }
)
krane_restart!(flags: '--deployments web jobs --statefulsets ss ss-2 --daemonsets ds ds-2')
end

def test_restart_parses_selector
options = default_options
response = mock('RestartTask')
Expand Down Expand Up @@ -74,7 +95,9 @@ def default_options(new_args = {}, run_args = {})
global_timeout: 300,
}.merge(new_args),
run_args: {
deployments: nil,
deployments: [],
statefulsets: [],
daemonsets: [],
selector: nil,
verify_result: true,
}.merge(run_args),
Expand Down
30 changes: 30 additions & 0 deletions test/fixtures/branched/daemon_set.yml.erb
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
apiVersion: apps/v1
kind: DaemonSet
metadata:
name: <%= branch %>-ds-app
labels:
app: branched
branch: <%= branch %>
name: ds-app
annotations:
shipit.shopify.io/restart: "true"
spec:
selector:
matchLabels:
app: branched
branch: <%= branch %>
name: ds-app
template:
metadata:
labels:
app: branched
branch: <%= branch %>
name: ds-app
spec:
containers:
- name: app
image: busybox
imagePullPolicy: IfNotPresent
command: ["tail", "-f", "/dev/null"]
ports:
- containerPort: 80
Loading

0 comments on commit c9562f7

Please sign in to comment.