diff --git a/Banyan/src/processes.jl b/Banyan/src/processes.jl index 15c26cca..1d0c1f6b 100644 --- a/Banyan/src/processes.jl +++ b/Banyan/src/processes.jl @@ -229,7 +229,8 @@ function create_process(process_name, script; cron_schedule = "rate(24 hours)", if is_debug_on() res_code = res_code * "set_banyan_api_endpoint(\"$(BANYAN_API_ENDPOINT)\")\n" end - res_code = res_code * "ENV[\"JULIA_DEBUG\"] = $(is_debug_on())\n" + julia_debug = get(ENV, "JULIA_DEBUG", "") + res_code = res_code * "ENV[\"JULIA_DEBUG\"] = $julia_debug\n" res_code = res_code * "self_session_id = get_session_id()\n" # Generate code to configure with Banyan credentials @@ -275,7 +276,7 @@ function create_process(process_name, script; cron_schedule = "rate(24 hours)", "aws_region" => aws_region ), ) - + nothing end @@ -331,8 +332,15 @@ function run_process(process_name, args) ), ) session_id = response["session_id"] - wait_for_session(session_id) - get_session_results(session_id) + try + wait_for_session(session_id) + get_session_results(session_id) + catch + # If an error happens while trying to wait for the session, we need to end the session + end_session(session_id) + @warn "Please check the dashboard in case this process started a session that has not ended." + rethrow() + end end function start_process(process_name, cron_schedule="daily") diff --git a/Banyan/src/queues.jl b/Banyan/src/queues.jl index 417ff789..e8ed44be 100644 --- a/Banyan/src/queues.jl +++ b/Banyan/src/queues.jl @@ -133,7 +133,7 @@ function sqs_send_message(queue_url, message) ) end -function send_to_client(value_id::ValueId, value, worker_memory_used = 0) +function send_to_client(value_id::ValueId, value, worker_memory_used = 0; gather_queue_url="") MAX_MESSAGE_LENGTH = 220_000 message = to_jl_string(value)::String generated_message_id = generate_message_id() @@ -161,7 +161,7 @@ function send_to_client(value_id::ValueId, value, worker_memory_used = 0) end # Launch asynchronous threads to send SQS messages - gather_q_url = gather_queue_url() + gather_q_url = isempty(gather_queue_url) ? gather_queue_url() : gather_queue_url num_chunks = length(message_ranges) if num_chunks > 1 Threads.@threads for i = 1:num_chunks diff --git a/Banyan/src/sessions.jl b/Banyan/src/sessions.jl index ff184378..da7da6ca 100644 --- a/Banyan/src/sessions.jl +++ b/Banyan/src/sessions.jl @@ -29,8 +29,12 @@ function _get_session_id_no_error()::SessionId global sessions !haskey(sessions, current_session_id) ? "" : current_session_id end - -has_session_id() = !isempty(_get_session_id_no_error()) +session_id +function has_session_id(session_id=_get_session_id_no_error()) + global sessions + global start_session_tasks + !isempty(session_id) && (haskey(sessions, session_id) || haskey(start_session_tasks, session_id)) +end function get_session_id(session_id="")::SessionId global current_session_id @@ -499,6 +503,8 @@ function start_session(; configure(; kwargs...) nworkers = nworkers == -1 ? (is_debug_on() ? 2 : 150) : nworkers configure_sampling(; nworkers=nworkers, kwargs...) + + @show stacktrace() # Create task for starting session new_start_session_task_id = "start-session-$(length(start_session_tasks) + 1)" @@ -612,16 +618,20 @@ function end_session(session_id::SessionId = ""; print_logs=nothing, failed = fa end # Print logs if needed + @show print_logs if print_logs print_session_logs(session_id, cluster_name, wait=true) end # Remove from global state set_session("") + @show session_id delete!(sessions, session_id) + @show destroy_cluster # Destroy cluster if desired if destroy_cluster + @show resp if isnothing(resp) || !haskey(resp, "cluster_name") @warn "Unable to destroy cluster for session with ID $session_id" else @@ -629,6 +639,8 @@ function end_session(session_id::SessionId = ""; print_logs=nothing, failed = fa end end + @show session_id + session_id end @@ -739,8 +751,11 @@ end function print_session_logs(session_id, cluster_name; delete_from_s3=false, wait=false, kwargs...) configure(; kwargs...) + @show session_id session_id = get_session_id(session_id) + @show cluster_name s3_bucket_name = get_cluster_s3_bucket_name(cluster_name) + @show s3_bucket_name log_file_name = "banyan-log-for-session-$(session_id)" logs::String = "" p::ProgressUnknown = ProgressUnknown("Waiting for logs for session with ID $session_id") @@ -848,7 +863,7 @@ function _wait_for_session(session_id::SessionId, show_progress; kwargs...) session_status = get_session_status(session_id; kwargs...) end if p.enabled - finish!(p, spinner = session_status == "running" ? '✓' : '✗') + finish!(p, spinner = (session_status == "running" || session_status == "completed") ? '✓' : '✗') end if session_status == "running" @debug "Session with ID $session_id is ready" @@ -856,7 +871,8 @@ function _wait_for_session(session_id::SessionId, show_progress; kwargs...) sessions_dict[session_id].is_session_ready = true end elseif session_status == "completed" - error("Session with ID $session_id has already completed") + # error("Session with ID $session_id has already completed") + session_status_tuple[2] elseif session_status == "failed" error("Session with ID $session_id has failed.") else diff --git a/Banyan/src/utils.jl b/Banyan/src/utils.jl index 067f518a..dab4c9a1 100644 --- a/Banyan/src/utils.jl +++ b/Banyan/src/utils.jl @@ -264,11 +264,9 @@ method_to_string(method::Symbol)::String = begin elseif method == :describe_processes "describe-processes" elseif method == :run_process - "run_process" + "run-process" elseif method == :stop_process - "stop_process" - - + "stop-process" end end