Skip to content

Commit

Permalink
Add debug statements
Browse files Browse the repository at this point in the history
  • Loading branch information
cleahwin authored and calebwin committed Sep 16, 2022
1 parent 869e95b commit 5b09749
Show file tree
Hide file tree
Showing 4 changed files with 36 additions and 14 deletions.
16 changes: 12 additions & 4 deletions Banyan/src/processes.jl
Original file line number Diff line number Diff line change
Expand Up @@ -229,7 +229,8 @@ function create_process(process_name, script; cron_schedule = "rate(24 hours)",
if is_debug_on()
res_code = res_code * "set_banyan_api_endpoint(\"$(BANYAN_API_ENDPOINT)\")\n"
end
res_code = res_code * "ENV[\"JULIA_DEBUG\"] = $(is_debug_on())\n"
julia_debug = get(ENV, "JULIA_DEBUG", "")
res_code = res_code * "ENV[\"JULIA_DEBUG\"] = $julia_debug\n"
res_code = res_code * "self_session_id = get_session_id()\n"

# Generate code to configure with Banyan credentials
Expand Down Expand Up @@ -275,7 +276,7 @@ function create_process(process_name, script; cron_schedule = "rate(24 hours)",
"aws_region" => aws_region
),
)

nothing
end


Expand Down Expand Up @@ -331,8 +332,15 @@ function run_process(process_name, args)
),
)
session_id = response["session_id"]
wait_for_session(session_id)
get_session_results(session_id)
try
wait_for_session(session_id)
get_session_results(session_id)
catch
# If an error happens while trying to wait for the session, we need to end the session
end_session(session_id)
@warn "Please check the dashboard in case this process started a session that has not ended."
rethrow()
end
end

function start_process(process_name, cron_schedule="daily")
Expand Down
4 changes: 2 additions & 2 deletions Banyan/src/queues.jl
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ function sqs_send_message(queue_url, message)
)
end

function send_to_client(value_id::ValueId, value, worker_memory_used = 0)
function send_to_client(value_id::ValueId, value, worker_memory_used = 0; gather_queue_url="")
MAX_MESSAGE_LENGTH = 220_000
message = to_jl_string(value)::String
generated_message_id = generate_message_id()
Expand Down Expand Up @@ -161,7 +161,7 @@ function send_to_client(value_id::ValueId, value, worker_memory_used = 0)
end

# Launch asynchronous threads to send SQS messages
gather_q_url = gather_queue_url()
gather_q_url = isempty(gather_queue_url) ? gather_queue_url() : gather_queue_url
num_chunks = length(message_ranges)
if num_chunks > 1
Threads.@threads for i = 1:num_chunks
Expand Down
24 changes: 20 additions & 4 deletions Banyan/src/sessions.jl
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,12 @@ function _get_session_id_no_error()::SessionId
global sessions
!haskey(sessions, current_session_id) ? "" : current_session_id
end

has_session_id() = !isempty(_get_session_id_no_error())
session_id
"""
    has_session_id(session_id=_get_session_id_no_error())

Return `true` when `session_id` is non-empty and is a key of either the
global `sessions` collection or the global `start_session_tasks` collection;
return `false` otherwise. When no argument is given, the ID produced by
`_get_session_id_no_error()` is checked.
"""
function has_session_id(session_id=_get_session_id_no_error())
    global sessions
    global start_session_tasks
    # An empty ID never matches; bail out before consulting any global state.
    if isempty(session_id)
        return false
    end
    # The ID counts if it is registered as a session or as a start-session task.
    return haskey(sessions, session_id) || haskey(start_session_tasks, session_id)
end

function get_session_id(session_id="")::SessionId
global current_session_id
Expand Down Expand Up @@ -499,6 +503,8 @@ function start_session(;
configure(; kwargs...)
nworkers = nworkers == -1 ? (is_debug_on() ? 2 : 150) : nworkers
configure_sampling(; nworkers=nworkers, kwargs...)

@show stacktrace()

# Create task for starting session
new_start_session_task_id = "start-session-$(length(start_session_tasks) + 1)"
Expand Down Expand Up @@ -612,23 +618,29 @@ function end_session(session_id::SessionId = ""; print_logs=nothing, failed = fa
end

# Print logs if needed
@show print_logs
if print_logs
print_session_logs(session_id, cluster_name, wait=true)
end

# Remove from global state
set_session("")
@show session_id
delete!(sessions, session_id)
@show destroy_cluster

# Destroy cluster if desired
if destroy_cluster
@show resp
if isnothing(resp) || !haskey(resp, "cluster_name")
@warn "Unable to destroy cluster for session with ID $session_id"
else
destroy_cluster(resp["cluster_name"])
end
end

@show session_id

session_id
end

Expand Down Expand Up @@ -739,8 +751,11 @@ end

function print_session_logs(session_id, cluster_name; delete_from_s3=false, wait=false, kwargs...)
configure(; kwargs...)
@show session_id
session_id = get_session_id(session_id)
@show cluster_name
s3_bucket_name = get_cluster_s3_bucket_name(cluster_name)
@show s3_bucket_name
log_file_name = "banyan-log-for-session-$(session_id)"
logs::String = ""
p::ProgressUnknown = ProgressUnknown("Waiting for logs for session with ID $session_id")
Expand Down Expand Up @@ -848,15 +863,16 @@ function _wait_for_session(session_id::SessionId, show_progress; kwargs...)
session_status = get_session_status(session_id; kwargs...)
end
if p.enabled
finish!(p, spinner = session_status == "running" ? '' : '')
finish!(p, spinner = (session_status == "running" || session_status == "completed") ? '' : '')
end
if session_status == "running"
@debug "Session with ID $session_id is ready"
if haskey(sessions_dict, session_id)
sessions_dict[session_id].is_session_ready = true
end
elseif session_status == "completed"
error("Session with ID $session_id has already completed")
# error("Session with ID $session_id has already completed")
session_status_tuple[2]
elseif session_status == "failed"
error("Session with ID $session_id has failed.")
else
Expand Down
6 changes: 2 additions & 4 deletions Banyan/src/utils.jl
Original file line number Diff line number Diff line change
Expand Up @@ -264,11 +264,9 @@ method_to_string(method::Symbol)::String = begin
elseif method == :describe_processes
"describe-processes"
elseif method == :run_process
"run_process"
"run-process"
elseif method == :stop_process
"stop_process"


"stop-process"
end
end

Expand Down

0 comments on commit 5b09749

Please sign in to comment.