From f0ca05175ba5b764e3fc37c4b12fe063046efc11 Mon Sep 17 00:00:00 2001 From: tan Date: Wed, 8 Jan 2020 08:26:33 +0530 Subject: [PATCH] use ReentrantLock instead of Channel for locking Use the simpler `ReentrantLock` instead of `Channel` for locking in `YarnAppMaster`. --- src/Elly.jl | 3 --- src/api_yarn_appmaster.jl | 23 +++++------------------ 2 files changed, 5 insertions(+), 21 deletions(-) diff --git a/src/Elly.jl b/src/Elly.jl index 39f54e1..4e2164b 100644 --- a/src/Elly.jl +++ b/src/Elly.jl @@ -42,9 +42,6 @@ export YarnAppMaster, register, unregister, kill, can_schedule_mem, can_schedule export YarnManager, launch, manage -const Lock = Channel -makelock() = Channel{Int}(1) - include("hadoop/hadoop.jl") using Elly.hadoop using Elly.hadoop.common diff --git a/src/api_yarn_appmaster.jl b/src/api_yarn_appmaster.jl index e1e30c5..6855ac7 100644 --- a/src/api_yarn_appmaster.jl +++ b/src/api_yarn_appmaster.jl @@ -44,19 +44,17 @@ mutable struct YarnAppMaster response_id::Int32 # initial value must be 0, update with response_id sent from server on every response registration::Union{Nothing,RegisterApplicationMasterResponseProto} am_rm_task::Union{Nothing,Task} - lck::Lock + lck::ReentrantLock function YarnAppMaster(rmhost::AbstractString, rmport::Integer, ugi::UserGroupInformation=UserGroupInformation(), amhost::AbstractString="", amport::Integer=0, amurl::AbstractString="") amrm_conn = YarnAMRMProtocol(rmhost, rmport, ugi) - lck = makelock() - put!(lck, 1) new(amrm_conn, amhost, amport, amurl, Int32(0), Int32(0), Int32(0), Int32(0), YarnNodes(ugi), YarnContainers(), "", 0, - nothing, nothing, lck) + nothing, nothing, ReentrantLock()) end function YarnAppMaster(ugi::UserGroupInformation=UserGroupInformation()) @@ -83,17 +81,6 @@ function YarnAppMaster(fn::Function, ugi::UserGroupInformation=UserGroupInformat end end -function withlock(fn, yam) - try - take!(yam.lck) - return fn() - catch ex - rethrow(ex) - finally - put!(yam.lck, 1) - end -end - function show(io::IO, yam::YarnAppMaster) show(io, yam.amrm_conn) if yam.registration !== nothing @@ -137,7 +124,7 @@ function register(yam::YarnAppMaster) end !isempty(yam.tracking_url) && setproperty!(inp, :tracking_url, yam.tracking_url) - resp = withlock(yam) do + resp = lock(yam.lck) do registerApplicationMaster(yam.amrm_conn, inp) end yam.registration = resp @@ -170,7 +157,7 @@ function _unregister(yam::YarnAppMaster, finalstatus::Int32, diagnostics::Abstra !isempty(yam.tracking_url) && setproperty!(inp, :tracking_url, yam.tracking_url) !isempty(diagnostics) && setproperty!(inp, :diagnostics, diagnostics) - resp = withlock(yam) do + resp = lock(yam.lck) do finishApplicationMaster(yam.amrm_conn, inp) end resp.isUnregistered && (yam.registration = nothing) @@ -249,7 +236,7 @@ function _update_rm(yam::YarnAppMaster) setproperty!(inp, :response_id, yam.response_id) #@debug(inp) - resp = withlock(yam) do + resp = lock(yam.lck) do allocate_resp = allocate(yam.amrm_conn, inp) yam.response_id = allocate_resp.response_id # next response id must match this allocate_resp