Skip to content

Commit

Permalink
use ReentrantLock instead of Channel for locking
Browse files Browse the repository at this point in the history
Use the simpler `ReentrantLock` instead of `Channel` for locking in `YarnAppMaster`.
  • Loading branch information
tanmaykm committed Jan 8, 2020
1 parent bfe61a3 commit f0ca051
Show file tree
Hide file tree
Showing 2 changed files with 5 additions and 21 deletions.
3 changes: 0 additions & 3 deletions src/Elly.jl
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,6 @@ export YarnAppMaster, register, unregister, kill, can_schedule_mem, can_schedule

export YarnManager, launch, manage

const Lock = Channel
makelock() = Channel{Int}(1)

include("hadoop/hadoop.jl")
using Elly.hadoop
using Elly.hadoop.common
Expand Down
23 changes: 5 additions & 18 deletions src/api_yarn_appmaster.jl
Original file line number Diff line number Diff line change
Expand Up @@ -44,19 +44,17 @@ mutable struct YarnAppMaster
response_id::Int32 # initial value must be 0, update with response_id sent from server on every response
registration::Union{Nothing,RegisterApplicationMasterResponseProto}
am_rm_task::Union{Nothing,Task}
lck::Lock
lck::ReentrantLock

function YarnAppMaster(rmhost::AbstractString, rmport::Integer, ugi::UserGroupInformation=UserGroupInformation(),
amhost::AbstractString="", amport::Integer=0, amurl::AbstractString="")
amrm_conn = YarnAMRMProtocol(rmhost, rmport, ugi)
lck = makelock()
put!(lck, 1)

new(amrm_conn, amhost, amport, amurl,
Int32(0), Int32(0), Int32(0), Int32(0),
YarnNodes(ugi), YarnContainers(),
"", 0,
nothing, nothing, lck)
nothing, nothing, ReentrantLock())
end

function YarnAppMaster(ugi::UserGroupInformation=UserGroupInformation())
Expand All @@ -83,17 +81,6 @@ function YarnAppMaster(fn::Function, ugi::UserGroupInformation=UserGroupInformat
end
end

function withlock(fn, yam)
try
take!(yam.lck)
return fn()
catch ex
rethrow(ex)
finally
put!(yam.lck, 1)
end
end

function show(io::IO, yam::YarnAppMaster)
show(io, yam.amrm_conn)
if yam.registration !== nothing
Expand Down Expand Up @@ -137,7 +124,7 @@ function register(yam::YarnAppMaster)
end
!isempty(yam.tracking_url) && setproperty!(inp, :tracking_url, yam.tracking_url)

resp = withlock(yam) do
resp = lock(yam.lck) do
registerApplicationMaster(yam.amrm_conn, inp)
end
yam.registration = resp
Expand Down Expand Up @@ -170,7 +157,7 @@ function _unregister(yam::YarnAppMaster, finalstatus::Int32, diagnostics::Abstra
!isempty(yam.tracking_url) && setproperty!(inp, :tracking_url, yam.tracking_url)
!isempty(diagnostics) && setproperty!(inp, :diagnostics, diagnostics)

resp = withlock(yam) do
resp = lock(yam.lck) do
finishApplicationMaster(yam.amrm_conn, inp)
end
resp.isUnregistered && (yam.registration = nothing)
Expand Down Expand Up @@ -249,7 +236,7 @@ function _update_rm(yam::YarnAppMaster)
setproperty!(inp, :response_id, yam.response_id)

#@debug(inp)
resp = withlock(yam) do
resp = lock(yam.lck) do
allocate_resp = allocate(yam.amrm_conn, inp)
yam.response_id = allocate_resp.response_id # next response id must match this
allocate_resp
Expand Down

0 comments on commit f0ca051

Please sign in to comment.