Skip to content

Commit

Permalink
Merge pull request #47 from JuliaParallel/tan/yarn
Browse files Browse the repository at this point in the history
use simpler protobuff model construction
  • Loading branch information
tanmaykm committed Jan 7, 2020
2 parents 9aa868b + 880a14f commit 0b9e19d
Show file tree
Hide file tree
Showing 7 changed files with 74 additions and 97 deletions.
52 changes: 16 additions & 36 deletions src/api_hdfs_base.jl
Original file line number Diff line number Diff line change
Expand Up @@ -135,9 +135,7 @@ function _walkdir(client::HDFSClient, path::AbstractString, process_entry::Funct
cont = true
start_after = UInt8[]
while cont
inp = protobuild(GetListingRequestProto, Dict(:src => path,
:startAfter => start_after,
:needLocation => false))
inp = GetListingRequestProto(src=path, startAfter=start_after, needLocation=false)
resp = getListing(client.nn_conn, inp)

if isfilled(resp, :dirList)
Expand All @@ -164,7 +162,7 @@ end

function _get_file_info(client::HDFSClient, path::AbstractString)
path = abspath(client, path)
inp = protobuild(GetFileInfoRequestProto, Dict(:src => path))
inp = GetFileInfoRequestProto(src=path)
resp = getFileInfo(client.nn_conn, inp)
isfilled(resp, :fs) ? resp.fs : nothing
end
Expand All @@ -174,9 +172,7 @@ function _get_block_locations(client::HDFSClient, path::AbstractString, offset::
if length == 0
length = UInt64(1024)
end
inp = protobuild(GetBlockLocationsRequestProto, Dict(:src => path,
:offset => offset,
:length => length))
inp = GetBlockLocationsRequestProto(src=path, offset=offset, length=length)
resp = getBlockLocations(client.nn_conn, inp)
isfilled(resp, :locations) ? resp.locations : nothing
end
Expand Down Expand Up @@ -269,7 +265,7 @@ end
hdfs_set_replication(file::HDFSFile, replication::Integer) = hdfs_set_replication(file.client, file.path, replication)
function hdfs_set_replication(client::HDFSClient, path::AbstractString, replication::Integer)
path = abspath(client, path)
inp = protobuild(SetReplicationRequestProto, Dict(:src => path, :replication => replication))
inp = SetReplicationRequestProto(src=path, replication=replication)

resp = setReplication(client.nn_conn, inp)
resp.result
Expand All @@ -279,7 +275,7 @@ end
# Disk Usage
function _get_content_summary(client::HDFSClient, path::AbstractString)
path = abspath(client, path)
inp = protobuild(GetContentSummaryRequestProto, Dict(:path => path))
inp = GetContentSummaryRequestProto(path=path)

resp = getContentSummary(client.nn_conn, inp)
resp.summary
Expand Down Expand Up @@ -310,10 +306,8 @@ mkdir(file::HDFSFile, mode::UInt32) = mkdir(file, false, mode)
mkdir(file::HDFSFile, create_parents::Bool=false, mode::UInt32=DEFAULT_FOLDER_MODE) = mkdir(file.client, file.path, create_parents, mode)
function mkdir(client::HDFSClient, path::AbstractString, create_parents::Bool=false, mode::UInt32=DEFAULT_FOLDER_MODE)
path = abspath(client, path)
perms = protobuild(FsPermissionProto, Dict(:perm => mode))
inp = protobuild(MkdirsRequestProto, Dict(:src => path,
:createParent => create_parents,
:masked => perms))
perms = FsPermissionProto(perm=mode)
inp = MkdirsRequestProto(src=path, createParent=create_parents, masked=perms)

resp = mkdirs(client.nn_conn, inp)
resp.result
Expand All @@ -326,9 +320,7 @@ mv(src::HDFSFile, dst::AbstractString, overwrite::Bool) = mv(src.client, src.pat
function mv(client::HDFSClient, src::AbstractString, dst::AbstractString, overwrite::Bool)
src = abspath(client, src)
dst = abspath(client, dst)
inp = protobuild(Rename2RequestProto, Dict(:src => src,
:dst => dst,
:overwriteDest => overwrite))
inp = Rename2RequestProto(src=src, dst=dst, overwriteDest=overwrite)

rename2(client.nn_conn, inp)
true
Expand All @@ -338,8 +330,7 @@ mv(src::HDFSFile, dst::AbstractString) = mv(src.client, src.path, dst)
function mv(client::HDFSClient, src::AbstractString, dst::AbstractString)
src = abspath(client, src)
dst = abspath(client, dst)
inp = protobuild(RenameRequestProto, Dict(:src => src,
:dst => dst))
inp = RenameRequestProto(src=src, dst=dst)

resp = rename(client.nn_conn, inp)
resp.result
Expand All @@ -348,8 +339,7 @@ end
rm(file::HDFSFile, recursive::Bool=false) = rm(file.client, file.path, recursive)
function rm(client::HDFSClient, path::AbstractString, recursive::Bool=false)
path = abspath(client, path)
inp = protobuild(DeleteRequestProto, Dict(:src => path,
:recursive => recursive))
inp = DeleteRequestProto(src=path, recursive=recursive)

resp = delete(client.nn_conn, inp)
resp.result
Expand All @@ -366,16 +356,10 @@ function _create_file(client::HDFSClient, path::AbstractString, overwrite::Bool=
(replication == 0) && (replication = defaults.replication)
end

perms = protobuild(FsPermissionProto, Dict(:perm => mode))
perms = FsPermissionProto(perm=mode)
createFlag = UInt32(overwrite ? CreateFlagProto.OVERWRITE : CreateFlagProto.CREATE)

inp = protobuild(CreateRequestProto, Dict(:src => path,
:masked => perms,
:clientName => ELLY_CLIENTNAME,
:createFlag => createFlag,
:createParent => false,
:replication => replication,
:blockSize => blocksz))
inp = CreateRequestProto(src=path, masked=perms, clientName=ELLY_CLIENTNAME, createFlag=createFlag, createParent=false, replication=replication, blockSize=blocksz)

resp = create(client.nn_conn, inp)
isfilled(resp, :fs) || (return nothing)
Expand All @@ -390,8 +374,7 @@ end
function _complete_file(client::HDFSClient, path::AbstractString, last::Union{Nothing,ExtendedBlockProto}=nothing)
path = abspath(client, path)

endinp = protobuild(CompleteRequestProto, Dict(:src => path,
:clientName => ELLY_CLIENTNAME))
endinp = CompleteRequestProto(src=path, clientName=ELLY_CLIENTNAME)
if last !== nothing
setproperty!(endinp, :last, last)
@debug("setting last block as", last)
Expand All @@ -409,8 +392,7 @@ end
function _add_block(::Type{T}, client::HDFSClient, path::AbstractString, previous::Union{Nothing,T}=nothing) where T<:ExtendedBlockProto
path = abspath(client, path)

inp = protobuild(AddBlockRequestProto, Dict(:src => path,
:clientName => ELLY_CLIENTNAME))
inp = AddBlockRequestProto(src=path, clientName=ELLY_CLIENTNAME)
(previous === nothing) || setproperty!(inp, :previous, previous)

resp = addBlock(client.nn_conn, inp)
Expand All @@ -423,7 +405,7 @@ the namenode from assuming the client from having abandoned the file or some oth
from recovering the lease.
"""
function hdfs_renewlease(client::HDFSClient)
inp = protobuild(RenewLeaseRequestProto, Dict(:clientName => ELLY_CLIENTNAME))
inp = RenewLeaseRequestProto(clientName=ELLY_CLIENTNAME)
renewLease(client.nn_conn, inp)
nothing
end
Expand All @@ -433,9 +415,7 @@ function touch(client::HDFSClient, path::AbstractString, replication::UInt32=zer
if exists(client, path)
path = abspath(client, path)
t = UInt64(datetime2unix(now(UTC))*1000)
inp = protobuild(SetTimesRequestProto, Dict(:src => path,
:mtime => t,
:atime => t))
inp = SetTimesRequestProto(src=path, mtime=t, atime=t)

setTimes(client.nn_conn, inp)
else
Expand Down
8 changes: 4 additions & 4 deletions src/api_yarn_appmaster.jl
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ function can_schedule(yam::YarnAppMaster, restype::Int32)
end

function _unregister(yam::YarnAppMaster, finalstatus::Int32, diagnostics::AbstractString)
inp = protobuild(FinishApplicationMasterRequestProto, Dict(:final_application_status => finalstatus))
inp = FinishApplicationMasterRequestProto(final_application_status=finalstatus)
!isempty(yam.tracking_url) && setproperty!(inp, :tracking_url, yam.tracking_url)
!isempty(diagnostics) && setproperty!(inp, :diagnostics, diagnostics)

Expand All @@ -187,8 +187,8 @@ container_release(yam::YarnAppMaster, cids::ContainerIdProto...) = request_relea
container_start(yam::YarnAppMaster, cid::ContainerIdProto, container_spec::ContainerLaunchContextProto) = container_start(yam, yam.containers.containers[cid], container_spec)
function container_start(yam::YarnAppMaster, container::ContainerProto, container_spec::ContainerLaunchContextProto)
@debug("starting", container)
req = protobuild(StartContainerRequestProto, Dict(:container_launch_context => container_spec, :container_token => container.container_token))
inp = protobuild(StartContainersRequestProto, Dict(:start_container_request => [req]))
req = StartContainerRequestProto(container_launch_context=container_spec, container_token=container.container_token)
inp = StartContainersRequestProto(start_container_request=[req])

nodeid = container.nodeId
nm_conn = get_connection(yam.nodes, nodeid)
Expand All @@ -214,7 +214,7 @@ container_stop(yam::YarnAppMaster, cid::ContainerIdProto) = container_stop(yam,
function container_stop(yam::YarnAppMaster, container::ContainerProto)
@debug("stopping", container)

inp = protobuild(StopContainersRequestProto, Dict(:container_id => [container.id]))
inp = StopContainersRequestProto(container_id=[container.id])
nodeid = container.nodeId
nm_conn = get_connection(yam.nodes, nodeid)
success = false
Expand Down
9 changes: 3 additions & 6 deletions src/api_yarn_base.jl
Original file line number Diff line number Diff line change
Expand Up @@ -340,12 +340,9 @@ end
function request_alloc(containers::YarnContainers, numcontainers::Int;
mem::Integer=YARN_CONTAINER_MEM_DEFAULT, cpu::Integer=YARN_CONTAINER_CPU_DEFAULT,
loc::AbstractString=YARN_CONTAINER_LOCATION_DEFAULT, priority::Integer=YARN_CONTAINER_PRIORITY_DEFAULT)
prio = protobuild(PriorityProto, Dict(:priority => priority))
capability = protobuild(ResourceProto, Dict(:memory => mem, :virtual_cores => cpu))
req = protobuild(ResourceRequestProto, Dict(:priority => prio,
:resource_name => loc,
:num_containers => numcontainers,
:capability => capability))
prio = PriorityProto(priority=priority)
capability = ResourceProto(memory=mem, virtual_cores=cpu)
req = ResourceRequestProto(priority=prio, resource_name=loc, num_containers=numcontainers, capability=capability)
pending(containers.alloc_pipeline, req)
containers.ndesired += numcontainers
nothing
Expand Down
32 changes: 16 additions & 16 deletions src/api_yarn_client.jl
Original file line number Diff line number Diff line change
Expand Up @@ -194,28 +194,28 @@ function submit(client::YarnClient, container_spec::ContainerLaunchContextProto,

appid, maxmem, maxcores = _new_app(client)

prio = protobuild(PriorityProto, Dict(:priority => priority))
res = protobuild(ResourceProto, Dict(:memory => mem, :virtual_cores => cores))
asc = protobuild(ApplicationSubmissionContextProto, Dict(:application_id => appid,
:application_name => appname,
:queue => queue,
:priority => prio,
:unmanaged_am => unmanaged,
:am_container_spec => container_spec,
:resource => res,
:maxAppAttempts => 2,
:applicationType => apptype,
:keep_containers_across_application_attempts => reuse))
prio = PriorityProto(priority=priority)
res = ResourceProto(memory=mem, virtual_cores=cores)
asc = ApplicationSubmissionContextProto(application_id = appid,
application_name = appname,
queue = queue,
priority = prio,
unmanaged_am = unmanaged,
am_container_spec = container_spec,
resource = res,
maxAppAttempts = 2,
applicationType = apptype,
keep_containers_across_application_attempts = reuse)

inp = protobuild(SubmitApplicationRequestProto, Dict(:application_submission_context => asc))
inp = SubmitApplicationRequestProto(application_submission_context=asc)

submitApplication(client.rm_conn, inp)
YarnApp(client, appid)
end

function kill(app::YarnApp)
client = app.client
inp = protobuild(KillApplicationRequestProto, Dict(:application_id => app.appid))
inp = KillApplicationRequestProto(application_id=app.appid)

resp = forceKillApplication(client.rm_conn, inp)
resp.is_kill_completed
Expand All @@ -224,7 +224,7 @@ end
function status(app::YarnApp, refresh::Bool=true)
if refresh || (app.status === nothing)
client = app.client
inp = protobuild(GetApplicationReportRequestProto, Dict(:application_id => app.appid))
inp = GetApplicationReportRequestProto(application_id=app.appid)

resp = getApplicationReport(client.rm_conn, inp)
app.status = isfilled(resp.application_report) ? YarnAppStatus(resp.application_report) : nothing
Expand Down Expand Up @@ -258,7 +258,7 @@ end
function attempts(app::YarnApp, refresh::Bool=true)
if refresh || isempty(app.attempts)
client = app.client
inp = protobuild(GetApplicationAttemptsRequestProto, Dict(:application_id => app.appid))
inp = GetApplicationAttemptsRequestProto(application_id=app.appid)

resp = getApplicationAttempts(client.rm_conn, inp)
atmptlist = app.attempts
Expand Down
58 changes: 29 additions & 29 deletions src/rpc.jl
Original file line number Diff line number Diff line change
Expand Up @@ -152,17 +152,17 @@ end

function buffer_connctx(channel::HadoopRpcChannel)
protocol = channel.protocol_attribs[:id]
connctx = protobuild(IpcConnectionContextProto, Dict(:userInfo => channel.ugi.userinfo, :protocol => protocol))
connctx = IpcConnectionContextProto(userInfo=channel.ugi.userinfo, protocol=protocol)

buffer_size_delimited(channel.iob, connctx)
end

function buffer_reqhdr(channel::HadoopRpcChannel, call_id::Int32)
hdr = protobuild(RpcRequestHeaderProto, Dict(:rpcKind => HRPC_PROTOBUFF_TYPE,
:rpcOp => HRPC_FINAL_PACKET,
:callId => call_id,
#:retryCount => -1,
:clientId => convert(Vector{UInt8}, codeunits(channel.clnt_id))))
hdr = RpcRequestHeaderProto(rpcKind = HRPC_PROTOBUFF_TYPE,
rpcOp = HRPC_FINAL_PACKET,
callId = call_id,
#retryCount = -1,
clientId = convert(Vector{UInt8}, codeunits(channel.clnt_id)))

buffer_size_delimited(channel.iob, hdr)
end
Expand All @@ -173,9 +173,9 @@ buffer_conctx_reqhdr(channel::HadoopRpcChannel) = (channel.sent_call_id = channe
function buffer_method_reqhdr(channel::HadoopRpcChannel, method::MethodDescriptor)
protocol = channel.protocol_attribs[:id]
protocol_ver = channel.protocol_attribs[:ver]
hdr = protobuild(RequestHeaderProto, Dict(:methodName => method.name,
:declaringClassProtocolName => protocol,
:clientProtocolVersion => protocol_ver))
hdr = RequestHeaderProto(methodName = method.name,
declaringClassProtocolName = protocol,
clientProtocolVersion = protocol_ver)

buffer_size_delimited(channel.iob, hdr)
end
Expand Down Expand Up @@ -480,17 +480,17 @@ function buffer_readblock(reader::HDFSBlockReader)
setproperty!(exblock, fld, get_field(block.b, fld))
end

basehdr = protobuild(BaseHeaderProto, Dict(:block => exblock, :token => token))
hdr = protobuild(ClientOperationHeaderProto, Dict(:baseHeader => basehdr, :clientName => ELLY_CLIENTNAME))
readblock = protobuild(OpReadBlockProto, Dict(:offset => offset, :len => len, :header => hdr))
basehdr = BaseHeaderProto(block=exblock, token=token)
hdr = ClientOperationHeaderProto(baseHeader=basehdr, clientName=ELLY_CLIENTNAME)
readblock = OpReadBlockProto(offset=offset, len=len, header=hdr)
@debug("sending block read message", offset, len)

buffer_size_delimited(channel.iob, readblock)
end

function buffer_client_read_status(reader::HDFSBlockReader, status::Int32)
channel = reader.channel
read_status = protobuild(ClientReadStatusProto, Dict(:status => status))
read_status = ClientReadStatusProto(status=status)
buffer_size_delimited(channel.iob, read_status)
end

Expand Down Expand Up @@ -972,24 +972,24 @@ function buffer_writeblock(writer::HDFSBlockWriter)
end
setproperty!(exblock, :numBytes, zero(UInt64))

basehdr = protobuild(BaseHeaderProto, Dict(:block => exblock, :token => token))
hdr = protobuild(ClientOperationHeaderProto, Dict(:baseHeader => basehdr, :clientName => ELLY_CLIENTNAME))
chksum = protobuild(ChecksumProto, Dict(:_type => defaults.checksumType, :bytesPerChecksum => defaults.bytesPerChecksum))
basehdr = BaseHeaderProto(block=exblock, token=token)
hdr = ClientOperationHeaderProto(baseHeader=basehdr, clientName=ELLY_CLIENTNAME)
chksum = ChecksumProto(_type=defaults.checksumType, bytesPerChecksum=defaults.bytesPerChecksum)

targets = DatanodeInfoProto[]
for loc in block.locs
(loc.id != writer.source_node.id) && push!(targets, loc)
end

writeblock = protobuild(OpWriteBlockProto, Dict(:header => hdr,
:targets => targets,
:source => writer.source_node,
:stage => OpWriteBlockProto_BlockConstructionStage.PIPELINE_SETUP_CREATE,
:pipelineSize => length(block.locs),
:minBytesRcvd => 0,
:maxBytesRcvd => 0,
:latestGenerationStamp => exblock.generationStamp,
:requestedChecksum => chksum))
writeblock = OpWriteBlockProto(header = hdr,
targets = targets,
source = writer.source_node,
stage = OpWriteBlockProto_BlockConstructionStage.PIPELINE_SETUP_CREATE,
pipelineSize = length(block.locs),
minBytesRcvd = 0,
maxBytesRcvd = 0,
latestGenerationStamp = exblock.generationStamp,
requestedChecksum = chksum)
@debug("sending block write message", offset=block.offset)

buffer_size_delimited(channel.iob, writeblock)
Expand Down Expand Up @@ -1051,10 +1051,10 @@ function prepare_packet(writer::HDFSBlockWriter)
seq_no = Int64(writer.pkt_seq += 1)
#@debug("packet seqno $seq_no with $(bytes_in_packet)/$(defaults.writePacketSize) bytes is last packet: $last_pkt")

pkt_hdr = protobuild(PacketHeaderProto, Dict(:offsetInBlock => writer.total_written,
:seqno => seq_no,
:lastPacketInBlock => last_pkt,
:dataLen => bytes_in_packet))
pkt_hdr = PacketHeaderProto(offsetInBlock = writer.total_written,
seqno = seq_no,
lastPacketInBlock = last_pkt,
dataLen = bytes_in_packet)

writer.total_written += bytes_in_packet

Expand Down
10 changes: 5 additions & 5 deletions src/sasl.jl
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ buffer_sasl_reqhdr(channel::HadoopRpcChannel) = (channel.sent_call_id = channel.

function buffer_sasl_message(channel::HadoopRpcChannel, state::Int32, auths::Vector{RpcSaslProto_SaslAuth}=RpcSaslProto_SaslAuth[], token::Vector{UInt8}=UInt8[])
@debug("buffer SASL message", state, nauths=length(auths), token=!isempty(token))
saslmsg = protobuild(RpcSaslProto, Dict(:version => 0, :state => state))
saslmsg = RpcSaslProto(version=0, state=state)
isempty(auths) || setproperty!(saslmsg, :auths, auths)
isempty(token) || setproperty!(saslmsg, :token, token)
buffer_size_delimited(channel.iob, saslmsg)
Expand Down Expand Up @@ -150,10 +150,10 @@ function sasl_auth(channel::HadoopRpcChannel, token::TokenProto)
@debug("response", response)

# send response as a sasl initiate request
respauth = protobuild(RpcSaslProto_SaslAuth, Dict(:method => auth.method,
:mechanism => auth.mechanism,
:protocol => auth.protocol,
:serverId => auth.serverId))
respauth = RpcSaslProto_SaslAuth(method = auth.method,
mechanism = auth.mechanism,
protocol = auth.protocol,
serverId = auth.serverId)

begin_send(channel)
buffer_sasl_reqhdr(channel)
Expand Down
Loading

0 comments on commit 0b9e19d

Please sign in to comment.