Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Batched fetching #251

Merged
merged 8 commits into from
Apr 6, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -427,10 +427,9 @@ This gem uses subclasses of `KubernetesResource` to implement custom success/fai

1. Create a the file for your type in `lib/kubernetes-deploy/kubernetes_resource/`
2. Create a new class that inherits from `KubernetesResource`. Minimally, it should implement the following methods:
* `sync` -- gather/update the data you'll need to determine `deploy_succeeded?` and `deploy_failed?`
* `sync` -- Gather the data you'll need to determine `deploy_succeeded?` and `deploy_failed?`. The superclass's implementation fetches the corresponding resource, parses it and stores it in `@instance_data`. You can define your own implementation if you need something else.
* `deploy_succeeded?`
* `deploy_failed?`
* `exists?`
3. Adjust the `TIMEOUT` constant to an appropriate value for this type.
4. Add the a basic example of the type to the hello-cloud [fixture set](https://github.com/Shopify/kubernetes-deploy/tree/master/test/fixtures/hello-cloud) and appropriate assertions to `#assert_all_up` in [`hello_cloud.rb`](https://github.com/Shopify/kubernetes-deploy/blob/master/test/helpers/fixture_sets/hello_cloud.rb). This will get you coverage in several existing tests, such as `test_full_hello_cloud_set_deploy_succeeds`.
5. Add tests for any edge cases you foresee.
Expand Down
1 change: 1 addition & 0 deletions lib/kubernetes-deploy.rb
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
require 'kubernetes-deploy/concurrency'
require 'kubernetes-deploy/bindings_parser'
require 'kubernetes-deploy/duration_parser'
require 'kubernetes-deploy/sync_mediator'

module KubernetesDeploy
MIN_KUBE_VERSION = '1.7.0'
Expand Down
13 changes: 8 additions & 5 deletions lib/kubernetes-deploy/deploy_task.rb
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
memcached
service
pod_template
bugsnag
pod_disruption_budget
replica_set
service_account
Expand Down Expand Up @@ -46,7 +45,6 @@ class DeployTask
Cloudsql
Redis
Memcached
Bugsnag
ConfigMap
PersistentVolumeClaim
ServiceAccount
Expand Down Expand Up @@ -103,6 +101,7 @@ def initialize(namespace:, context:, current_sha:, template_dir:, logger:, kubec
logger: @logger,
bindings: bindings,
)
@sync_mediator = SyncMediator.new(namespace: @namespace, context: @context, logger: @logger)
end

def run(*args)
Expand All @@ -124,7 +123,7 @@ def run!(verify_result: true, allow_protected_ns: false, prune: true)
validate_definitions(resources)

@logger.phase_heading("Checking initial resource statuses")
KubernetesDeploy::Concurrency.split_across_threads(resources, &:sync)
@sync_mediator.sync(resources)
resources.each { |r| @logger.info(r.pretty_status) }

ejson = EjsonSecretProvisioner.new(
Expand Down Expand Up @@ -207,6 +206,9 @@ def predeploy_priority_resources(resource_list)
failed_resources = matching_resources.reject(&:deploy_succeeded?)
fail_count = failed_resources.length
if fail_count > 0
KubernetesDeploy::Concurrency.split_across_threads(failed_resources) do |r|
r.sync_debug_info(@sync_mediator.kubectl)
end
failed_resources.each { |r| @logger.summary.add_paragraph(r.debug_message) }
raise FatalDeploymentError, "Failed to deploy #{fail_count} priority #{'resource'.pluralize(fail_count)}"
end
Expand All @@ -215,7 +217,7 @@ def predeploy_priority_resources(resource_list)
end

def validate_definitions(resources)
KubernetesDeploy::Concurrency.split_across_threads(resources, &:validate_definition)
KubernetesDeploy::Concurrency.split_across_threads(resources) { |r| r.validate_definition(kubectl) }
failed_resources = resources.select(&:validation_failed?)
return unless failed_resources.present?

Expand Down Expand Up @@ -373,7 +375,8 @@ def deploy_resources(resources, prune: false, verify:, record_summary: true)
apply_all(applyables, prune)

if verify
watcher = ResourceWatcher.new(resources, logger: @logger, deploy_started_at: deploy_started_at)
watcher = ResourceWatcher.new(resources: resources, sync_mediator: @sync_mediator,
logger: @logger, deploy_started_at: deploy_started_at)
watcher.run(record_summary: record_summary)
end
end
Expand Down
66 changes: 34 additions & 32 deletions lib/kubernetes-deploy/kubernetes_resource.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
require 'json'
require 'open3'
require 'shellwords'
require 'kubernetes-deploy/kubectl'

module KubernetesDeploy
class KubernetesResource
Expand All @@ -28,20 +27,26 @@ class KubernetesResource

TIMEOUT_OVERRIDE_ANNOTATION = "kubernetes-deploy.shopify.io/timeout-override"

def self.build(namespace:, context:, definition:, logger:)
opts = { namespace: namespace, context: context, definition: definition, logger: logger }
if KubernetesDeploy.const_defined?(definition["kind"])
klass = KubernetesDeploy.const_get(definition["kind"])
klass.new(**opts)
else
inst = new(**opts)
inst.type = definition["kind"]
inst
class << self
def build(namespace:, context:, definition:, logger:)
opts = { namespace: namespace, context: context, definition: definition, logger: logger }
if KubernetesDeploy.const_defined?(definition["kind"])
klass = KubernetesDeploy.const_get(definition["kind"])
klass.new(**opts)
else
inst = new(**opts)
inst.type = definition["kind"]
inst
end
end

def timeout
self::TIMEOUT
end
end

def self.timeout
self::TIMEOUT
def kind
name.demodulize
end
end

def timeout
Expand Down Expand Up @@ -74,9 +79,10 @@ def initialize(namespace:, context:, definition:, logger:)
@definition = definition
@statsd_report_done = false
@validation_errors = []
@instance_data = {}
end

def validate_definition
def validate_definition(kubectl)
@validation_errors = []
validate_timeout_annotation

Expand All @@ -103,7 +109,8 @@ def file_path
file.path
end

def sync
def sync(mediator)
@instance_data = mediator.get_instance(type, name)
end

def deploy_failed?
Expand All @@ -115,23 +122,24 @@ def deploy_started?
end

def deploy_succeeded?
if deploy_started? && !@success_assumption_warning_shown
return false unless deploy_started?
unless @success_assumption_warning_shown
@logger.warn("Don't know how to monitor resources of type #{type}. Assuming #{id} deployed successfully.")
@success_assumption_warning_shown = true
end
true
end

def exists?
nil
@instance_data.present?
end

def status
@status ||= "Unknown"
exists? ? "Exists" : "Unknown"
end

def type
@type || self.class.name.demodulize
@type || self.class.kind
end

def deploy_timed_out?
Expand All @@ -144,15 +152,13 @@ def deploy_method
:apply
end

def sync_debug_info
@events = fetch_events unless ENV[DISABLE_FETCHING_EVENT_INFO]
@logs = fetch_logs if supports_logs? && !ENV[DISABLE_FETCHING_EVENT_INFO]
def sync_debug_info(kubectl)
@events = fetch_events(kubectl) unless ENV[DISABLE_FETCHING_EVENT_INFO]
@logs = fetch_logs(kubectl) if supports_logs? && !ENV[DISABLE_FETCHING_EVENT_INFO]
@debug_info_synced = true
end

def debug_message
sync_debug_info unless @debug_info_synced

helpful_info = []
if deploy_failed?
helpful_info << ColorizedString.new("#{id}: FAILED").red
Expand Down Expand Up @@ -210,9 +216,10 @@ def debug_message
# "Pulled: Successfully pulled image "hello-world:latest" (1 events)"
# ]
# }
def fetch_events
def fetch_events(kubectl)
return {} unless exists?
out, _err, st = kubectl.run("get", "events", "--output=go-template=#{Event.go_template_for(type, name)}")
out, _err, st = kubectl.run("get", "events", "--output=go-template=#{Event.go_template_for(type, name)}",
log_failure: false)
return {} unless st.success?

event_collector = Hash.new { |hash, key| hash[key] = [] }
Expand All @@ -230,12 +237,7 @@ def failure_message

def pretty_status
padding = " " * [50 - id.length, 1].max
msg = exists? ? status : "not found"
"#{id}#{padding}#{msg}"
end

def kubectl
@kubectl ||= Kubectl.new(namespace: @namespace, context: @context, logger: @logger, log_failure_by_default: false)
"#{id}#{padding}#{status}"
end

def report_status_to_statsd(watch_time)
Expand Down
9 changes: 2 additions & 7 deletions lib/kubernetes-deploy/kubernetes_resource/bucket.rb
Original file line number Diff line number Diff line change
@@ -1,11 +1,6 @@
# frozen_string_literal: true
module KubernetesDeploy
class Bucket < KubernetesResource
def sync
_, _err, st = kubectl.run("get", type, @name)
@found = st.success?
end

def deploy_succeeded?
return false unless deploy_started?

Expand All @@ -16,8 +11,8 @@ def deploy_succeeded?
true
end

def exists?
@found
def status
exists? ? "Available" : "Unknown"
end

def deploy_failed?
Expand Down
33 changes: 0 additions & 33 deletions lib/kubernetes-deploy/kubernetes_resource/bugsnag.rb

This file was deleted.

69 changes: 20 additions & 49 deletions lib/kubernetes-deploy/kubernetes_resource/cloudsql.rb
Original file line number Diff line number Diff line change
Expand Up @@ -3,75 +3,46 @@ module KubernetesDeploy
class Cloudsql < KubernetesResource
TIMEOUT = 10.minutes

def sync
_, _err, st = kubectl.run("get", type, @name)
@found = st.success?
@deployment_exists = cloudsql_proxy_deployment_exists?
@service_exists = mysql_service_exists?
@status = if @deployment_exists && @service_exists
"Provisioned"
else
"Unknown"
end
SYNC_DEPENDENCIES = %w(Deployment Service)
def sync(mediator)
super
@proxy_deployment = mediator.get_instance(Deployment.kind, "cloudsql-#{cloudsql_resource_uuid}")
@proxy_service = mediator.get_instance(Service.kind, "cloudsql-#{@name}")
end

def status
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is the same as deploy_succeeded? could therefore be refactored into trinary using that function.

deploy_succeeded? ? "Provisioned" : "Unknown"
end

def deploy_succeeded?
@service_exists && @deployment_exists
proxy_deployment_ready? && proxy_service_ready?
end

def deploy_failed?
false
end

def exists?
@found
end

def deploy_method
:replace
end

private

def cloudsql_proxy_deployment_exists?
deployment, _err, st = kubectl.run("get", "deployments", "cloudsql-#{cloudsql_resource_uuid}", "-o=json")

if st.success?
parsed = JSON.parse(deployment)

if parsed.fetch("status", {}).fetch("availableReplicas", -1) == parsed.fetch("status", {}).fetch("replicas", 0)
# all cloudsql-proxy pods are running
return true
end
end

false
def proxy_deployment_ready?
return false unless status = @proxy_deployment["status"]
# all cloudsql-proxy pods are running
status.fetch("availableReplicas", -1) == status.fetch("replicas", 0)
end

def mysql_service_exists?
service, _err, st = kubectl.run("get", "services", "cloudsql-#{@name}", "-o=json")

if st.success?
parsed = JSON.parse(service)

if parsed.dig("spec", "clusterIP").present?
# the service has an assigned cluster IP and is therefore functioning
return true
end
end

false
def proxy_service_ready?
return false unless @proxy_service.present?
# the service has an assigned cluster IP and is therefore functioning
@proxy_service.dig("spec", "clusterIP").present?
end

def cloudsql_resource_uuid
return @cloudsql_resource_uuid if defined?(@cloudsql_resource_uuid) && @cloudsql_resource_uuid

cloudsql, _err, st = kubectl.run("get", "cloudsqls", @name, "-o=json")
if st.success?
parsed = JSON.parse(cloudsql)

@cloudsql_resource_uuid = parsed.dig("metadata", "uid")
end
return unless @instance_data
@instance_data.dig("metadata", "uid")
end
end
end
Loading