Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update diego sync to not fetch every bit of every object, only completely fetch objects that need syncing #3503

Merged
merged 7 commits into from
Nov 6, 2023
Merged
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
93 changes: 66 additions & 27 deletions lib/cloud_controller/diego/processes_sync.rb
Original file line number Diff line number Diff line change
Expand Up @@ -21,35 +21,24 @@ def sync
@bump_freshness = true
diego_lrps = bbs_apps_client.fetch_scheduling_infos.index_by { |d| d.desired_lrp_key.process_guid }
logger.info('fetched-scheduling-infos')

batched_processes do |processes|
to_desire = []
to_update = {}
batched_processes_for_sync do |processes|
processes.each do |process|
process_guid = ProcessGuid.from_process(process)
diego_lrp = diego_lrps.delete(process_guid)

if diego_lrp.nil?
workpool.submit(process) do |p|
logger.info('desiring-lrp', process_guid: p.guid, app_guid: p.app_guid)
bbs_apps_client.desire_app(p)
logger.info('desire-lrp', process_guid: p.guid)
end
to_desire.append(process.id)
elsif process.updated_at.to_f.to_s != diego_lrp.annotation
workpool.submit(process, diego_lrp) do |p, l|
logger.info('updating-lrp', process_guid: p.guid, app_guid: p.app_guid)
bbs_apps_client.update_app(p, l)
logger.info('update-lrp', process_guid: p.guid)
end
to_update[process.id] = diego_lrp
end
end
end

diego_lrps.each_key do |process_guid_to_delete|
workpool.submit(process_guid_to_delete) do |guid|
logger.info('deleting-lrp', process_guid: guid)
bbs_apps_client.stop_app(guid)
logger.info('delete-lrp', process_guid: guid)
end
end
update_processes(to_update)
philippthun marked this conversation as resolved.
Show resolved Hide resolved
desire_processes(to_desire)
philippthun marked this conversation as resolved.
Show resolved Hide resolved
delete_lrps(diego_lrps)

workpool.drain

Expand Down Expand Up @@ -95,31 +84,69 @@ def process_workpool_exceptions(exceptions)
@statsd_updater.update_synced_invalid_lrps(invalid_lrps)
end

def update_processes(to_update)
philippthun marked this conversation as resolved.
Show resolved Hide resolved
batched_processes(to_update.keys) do |processes|
processes.each do |process|
workpool.submit(process, to_update[process.id]) do |p, l|
logger.info('updating-lrp', process_guid: p.guid, app_guid: p.app_guid)
bbs_apps_client.update_app(p, l)
logger.info('update-lrp', process_guid: p.guid)
end
end
end
end

def desire_processes(to_create)
philippthun marked this conversation as resolved.
Show resolved Hide resolved
batched_processes(to_create) do |processes|
philippthun marked this conversation as resolved.
Show resolved Hide resolved
processes.each do |process|
workpool.submit(process) do |p|
logger.info('desiring-lrp', process_guid: p.guid, app_guid: p.app_guid)
bbs_apps_client.desire_app(p)
logger.info('desire-lrp', process_guid: p.guid)
end
end
end
end

def delete_lrps(to_delete)
to_delete.each_key do |process_guid_to_delete|
workpool.submit(process_guid_to_delete) do |guid|
logger.info('deleting-lrp', process_guid: guid)
bbs_apps_client.stop_app(guid)
logger.info('delete-lrp', process_guid: guid)
end
end
end

def formatted_backtrace_from_error(error)
error.backtrace.present? ? error.backtrace.join("\n") + "\n..." : ''
end

def batched_processes
def batched_processes_for_sync
last_id = 0

loop do
processes = processes(last_id).all
processes = processes_for_sync(last_id).all
yield processes
return if processes.count < BATCH_SIZE

last_id = processes.last.id
end
end

def processes(last_id)
def batched_processes(ids)
ids.each_slice(BATCH_SIZE) do |id_chunk|
processes = processes(id_chunk).all
yield processes
end
end

def processes(ids)
processes = ProcessModel.
diego.
runnable.
where(Sequel.lit("#{ProcessModel.table_name}.id > ?", last_id)).
order("#{ProcessModel.table_name}__id".to_sym).
eager(:desired_droplet, :space, :service_bindings, { routes: :domain }, { app: :buildpack_lifecycle_data }).
limit(BATCH_SIZE)

where(Sequel.lit("#{ProcessModel.table_name}.id IN ?", ids)).
eager(:desired_droplet, :space, :service_bindings, { routes: :domain }, { app: :buildpack_lifecycle_data })
if FeatureFlag.enabled?(:diego_docker)
processes.select_all(ProcessModel.table_name)
else
Expand All @@ -128,6 +155,18 @@ def processes(last_id)
end
end

def processes_for_sync(last_id)
processes = ProcessModel.
diego.
runnable.
where(Sequel.lit("#{ProcessModel.table_name}.id > ?", last_id)).
order("#{ProcessModel.table_name}__id".to_sym).
limit(BATCH_SIZE)

processes.select("#{ProcessModel.table_name}__id".to_sym, "#{ProcessModel.table_name}__guid".to_sym, "#{ProcessModel.table_name}__version".to_sym,
"#{ProcessModel.table_name}__updated_at".to_sym)
end

def bbs_apps_client
CloudController::DependencyLocator.instance.bbs_apps_client
end
Expand Down