Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improvements for cluster creation and management #54

Merged
merged 15 commits into from
Oct 19, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion Banyan/src/Banyan.jl
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,14 @@ export Cluster,
create_cluster,
update_cluster,
destroy_cluster,
delete_cluster,
get_clusters,
get_running_clusters,
get_cluster,
get_cluster_status,
get_cluster_s3_bucket_name,
assert_cluster_is_ready
assert_cluster_is_ready,
wait_for_cluster

# Job management
export Job,
Expand Down
1 change: 1 addition & 0 deletions Banyan/src/cluster.jl
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
struct Cluster
name::String
status::Symbol
status_explanation::String
num_jobs_running::Int32
s3_bucket_arn::String
end
100 changes: 80 additions & 20 deletions Banyan/src/clusters.jl
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,23 @@
function create_cluster(;
name::Union{String,Nothing} = nothing,
instance_type::Union{String,Nothing} = "m4.4xlarge",
max_num_nodes::Union{Int,Nothing} = 8,
max_num_workers::Union{Int,Nothing} = 2048,
initial_num_workers::Union{Int,Nothing} = 16,
min_num_workers::Union{Int,Nothing} = 0,
iam_policy_arn::Union{String,Nothing} = nothing,
s3_bucket_arn::Union{String,Nothing} = nothing,
s3_bucket_name::Union{String,Nothing} = nothing,
scaledown_time = 25,
cailinw marked this conversation as resolved.
Show resolved Hide resolved
ec2_key_pair_name = nothing,
vpc_id = nothing,
subnet_id = nothing,
nowait=false,
kwargs...,
)

# Configure using parameters
c = configure(; kwargs...)

clusters = get_clusters(; kwargs...)
if isnothing(name)
name = "Cluster " * string(length(clusters) + 1)
Expand All @@ -18,47 +27,50 @@ function create_cluster(;
# Check if the configuration for this cluster name already exists
# If it does, then recreate cluster
if haskey(clusters, name)
if clusters[name][status] == "terminated"
if clusters[name].status == :terminated
@warn "Cluster configuration with name $name already exists. Ignoring new configuration and re-creating cluster."
send_request_get_response(
:create_cluster,
Dict("cluster_name" => name, "recreate" => true),
)
return
if !nowait
wait_for_cluster(name)
end
return get_cluster(name)
else
error("Cluster with name $name already exists")
error("Cluster with name $name already exists and has status $(string(clusters[name].status))")
end
end

# Construct arguments

# Configure using parameters
c = configure(; require_ec2_key_pair_name = true, kwargs...)

if isnothing(s3_bucket_arn) && isnothing(s3_bucket_name)
s3_bucket_arn =
"arn:aws:s3:::banyan-cluster-data-" * name * "-" * bytes2hex(rand(UInt8, 4))
s3_bucket_name = last(split(s3_bucket_arn, ":"))
s3_create_bucket(get_aws_config(), s3_bucket_name)
elseif isnothing(s3_bucket_arn)
if !isnothing(s3_bucket_name)
s3_bucket_arn = "arn:aws:s3:::$s3_bucket_name*"
elseif isnothing(s3_bucket_name)
elseif !isnothing(s3_bucket_arn)
s3_bucket_name = last(split(s3_bucket_arn, ":"))
end
if !(s3_bucket_name in s3_list_buckets(get_aws_config()))
if isnothing(s3_bucket_arn)
s3_bucket_arn = ""
elseif !(s3_bucket_name in s3_list_buckets(get_aws_config()))
error("Bucket $s3_bucket_name does not exist in connected AWS account")
end

# Construct cluster creation
cluster_config = Dict(
"cluster_name" => name,
"instance_type" => instance_type,
"num_nodes" => max_num_nodes,
"ec2_key_pair" => c["aws"]["ec2_key_pair_name"],
"max_num_workers" => max_num_workers,
"initial_num_workers" => initial_num_workers,
"min_num_workers" => min_num_workers,
"aws_region" => get_aws_config_region(),
"s3_read_write_resource" => s3_bucket_arn,
"scaledown_time" => scaledown_time,
"recreate" => false,
)
if !isnothing(ec2_key_pair_name)
cluster_config["ec2_key_pair"] = ec2_key_pair_name
elseif haskey(c["aws"], "ec2_key_pair_name")
cluster_config["ec2_key_pair"] = c["aws"]["ec2_key_pair_name"]
end
if !isnothing(iam_policy_arn)
cluster_config["additional_policy"] = iam_policy_arn
end
Expand All @@ -74,7 +86,11 @@ function create_cluster(;
# Send request to create cluster
send_request_get_response(:create_cluster, cluster_config)

return Cluster(name, :creating, 0, s3_bucket_arn)
if !nowait
wait_for_cluster(name)
end

return Cluster(name, get_cluster_status(name), "", 0, s3_bucket_arn)
end

function destroy_cluster(name::String; kwargs...)
Expand All @@ -92,6 +108,15 @@ function delete_cluster(name::String; kwargs...)
)
end

function update_cluster(name::String; kwargs...)
configure(; kwargs...)
@debug "Updating cluster"
send_request_get_response(
:update_cluster,
Dict{String, Any}("cluster_name" => name)
)
end

function assert_cluster_is_ready(name::String; kwargs...)
@info "Setting cluster status to running"

Expand All @@ -104,6 +129,7 @@ end
struct Cluster
name::String
status::Symbol
status_explanation::String
num_jobs_running::Int32
s3_bucket_arn::String
end
Expand Down Expand Up @@ -140,6 +166,7 @@ function get_clusters(; kwargs...)
name => Cluster(
name,
parsestatus(c["status"]),
haskey(c, "status_explanation") ? c["status_explanation"] : "",
c["num_jobs"],
c["s3_read_write_resource"],
) for (name, c) in response["clusters"]
Expand All @@ -154,4 +181,37 @@ end

get_cluster(name::String=get_cluster_name(), kwargs...) = get_clusters(; kwargs...)[name]

get_running_clusters(args...; kwargs...) = filter(entry -> entry[2].status == :running, get_clusters(args...; kwargs...))
get_running_clusters(args...; kwargs...) = filter(entry -> entry[2].status == :running, get_clusters(args...; kwargs...))

function get_cluster_status(name::String=get_cluster_name(), kwargs...)
c = get_clusters(; kwargs...)[name]
if c.status == :failed
@info c.status_explanation
end
c.status
end

function wait_for_cluster(name::String=get_cluster_name(), kwargs...)
t = 5
cluster_status = get_cluster_status(name; kwargs...)
while (cluster_status == :creating || cluster_status == :updating)
if cluster_status == :creating
@info "Cluster $(name) is getting set up"
else
@info "Cluster $(name) is updating"
end
sleep(t)
if t < 80
t *= 2
end
cluster_status = get_cluster_status(name; kwargs...)
end
if cluster_status == :running
@info "Cluster $(name) is running and ready for jobs"
elseif cluster_status == :terminated
@info "Cluster $(name) no longer exists"
elseif cluster_status != :creating && cluster_status != :updating
@info "Cluster $(name) setup has failed"
# delete_cluster(name)
end
end
4 changes: 3 additions & 1 deletion Banyan/src/jobs.jl
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ get_cluster_name() = get_job().cluster_name

function create_job(;
cluster_name::Union{String,Nothing} = nothing,
nworkers::Union{Integer,Nothing} = 2,
nworkers::Union{Integer,Nothing} = 16,
print_logs::Union{Bool,Nothing} = false,
store_logs_in_s3::Union{Bool,Nothing} = true,
store_logs_on_cluster::Union{Bool,Nothing} = false,
Expand Down Expand Up @@ -174,6 +174,8 @@ function create_job(;
jobs[current_job_id] = Job(cluster_name, current_job_id, nworkers, sample_rate)
jobs[current_job_id].current_status = "running"

wait_for_cluster(cluster_name)

@debug "Finished creating job $job_id"
return job_id
end
Expand Down
10 changes: 3 additions & 7 deletions Banyan/src/utils.jl
Original file line number Diff line number Diff line change
Expand Up @@ -150,8 +150,6 @@ function configure(; kwargs...)
user_id = if_in_or(:user_id, kwargs)
api_key = if_in_or(:api_key, kwargs)
ec2_key_pair_name = if_in_or(:ec2_key_pair_name, kwargs)
require_ec2_key_pair_name =
if_in_or(:require_ec2_key_pair_name, kwargs, false)
banyanconfig_path = if_in_or(:banyanconfig_path, kwargs)

# Load config
Expand Down Expand Up @@ -214,10 +212,6 @@ function configure(; kwargs...)
banyan_config["aws"]["ec2_key_pair_name"] = ec2_key_pair_name
is_modified = true
end
if require_ec2_key_pair_name &&
!(haskey(banyan_config["aws"], "ec2_key_pair_name"))
error("Name of an EC2 key pair required but not provided; visit here to create a key pair: https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/ec2-key-pairs.html#having-ec2-create-your-key-pair")
end

# # aws.region
# if !isnothing(region) && (
Expand Down Expand Up @@ -358,11 +352,13 @@ function send_request_get_response(method, content::Dict)
)
if resp.status == 403
throw(ErrorException("Please use a valid user ID and API key. Sign into the dashboard to retrieve these credentials."))
elseif resp.status == 500 || resp.status == 504
elseif resp.status == 504
# HTTP request timed out, for example
if isa(data, Dict) && haskey(data, "message")
data = data["message"]
end
@info data
elseif resp.status == 500 || resp.status == 504
throw(ErrorException(data))
elseif resp.status == 502
throw(ErrorException("Sorry there has been an error. Please contact support"))
Expand Down
Loading