diff --git a/src/api_hdfs_base.jl b/src/api_hdfs_base.jl index 2a9e842..5569224 100644 --- a/src/api_hdfs_base.jl +++ b/src/api_hdfs_base.jl @@ -135,9 +135,7 @@ function _walkdir(client::HDFSClient, path::AbstractString, process_entry::Funct cont = true start_after = UInt8[] while cont - inp = protobuild(GetListingRequestProto, Dict(:src => path, - :startAfter => start_after, - :needLocation => false)) + inp = GetListingRequestProto(src=path, startAfter=start_after, needLocation=false) resp = getListing(client.nn_conn, inp) if isfilled(resp, :dirList) @@ -164,7 +162,7 @@ end function _get_file_info(client::HDFSClient, path::AbstractString) path = abspath(client, path) - inp = protobuild(GetFileInfoRequestProto, Dict(:src => path)) + inp = GetFileInfoRequestProto(src=path) resp = getFileInfo(client.nn_conn, inp) isfilled(resp, :fs) ? resp.fs : nothing end @@ -174,9 +172,7 @@ function _get_block_locations(client::HDFSClient, path::AbstractString, offset:: if length == 0 length = UInt64(1024) end - inp = protobuild(GetBlockLocationsRequestProto, Dict(:src => path, - :offset => offset, - :length => length)) + inp = GetBlockLocationsRequestProto(src=path, offset=offset, length=length) resp = getBlockLocations(client.nn_conn, inp) isfilled(resp, :locations) ? resp.locations : nothing end @@ -269,7 +265,7 @@ end hdfs_set_replication(file::HDFSFile, replication::Integer) = hdfs_set_replication(file.client, file.path, replication) function hdfs_set_replication(client::HDFSClient, path::AbstractString, replication::Integer) path = abspath(client, path) - inp = protobuild(SetReplicationRequestProto, Dict(:src => path, :replication => replication)) + inp = SetReplicationRequestProto(src=path, replication=replication) resp = setReplication(client.nn_conn, inp) resp.result @@ -279,7 +275,7 @@ end # Disk Usage function _get_content_summary(client::HDFSClient, path::AbstractString) path = abspath(client, path) - inp = protobuild(GetContentSummaryRequestProto, Dict(:path => path)) + inp = GetContentSummaryRequestProto(path=path) resp = getContentSummary(client.nn_conn, inp) resp.summary @@ -310,10 +306,8 @@ mkdir(file::HDFSFile, mode::UInt32) = mkdir(file, false, mode) mkdir(file::HDFSFile, create_parents::Bool=false, mode::UInt32=DEFAULT_FOLDER_MODE) = mkdir(file.client, file.path, create_parents, mode) function mkdir(client::HDFSClient, path::AbstractString, create_parents::Bool=false, mode::UInt32=DEFAULT_FOLDER_MODE) path = abspath(client, path) - perms = protobuild(FsPermissionProto, Dict(:perm => mode)) - inp = protobuild(MkdirsRequestProto, Dict(:src => path, - :createParent => create_parents, - :masked => perms)) + perms = FsPermissionProto(perm=mode) + inp = MkdirsRequestProto(src=path, createParent=create_parents, masked=perms) resp = mkdirs(client.nn_conn, inp) resp.result @@ -326,9 +320,7 @@ mv(src::HDFSFile, dst::AbstractString, overwrite::Bool) = mv(src.client, src.pat function mv(client::HDFSClient, src::AbstractString, dst::AbstractString, overwrite::Bool) src = abspath(client, src) dst = abspath(client, dst) - inp = protobuild(Rename2RequestProto, Dict(:src => src, - :dst => dst, - :overwriteDest => overwrite)) + inp = Rename2RequestProto(src=src, dst=dst, overwriteDest=overwrite) rename2(client.nn_conn, inp) true @@ -338,8 +330,7 @@ mv(src::HDFSFile, dst::AbstractString) = mv(src.client, src.path, dst) function mv(client::HDFSClient, src::AbstractString, dst::AbstractString) src = abspath(client, src) dst = abspath(client, dst) - inp = protobuild(RenameRequestProto, Dict(:src => src, - :dst => dst)) + inp = RenameRequestProto(src=src, dst=dst) resp = rename(client.nn_conn, inp) resp.result @@ -348,8 +339,7 @@ end rm(file::HDFSFile, recursive::Bool=false) = rm(file.client, file.path, recursive) function rm(client::HDFSClient, path::AbstractString, recursive::Bool=false) path = abspath(client, path) - inp = protobuild(DeleteRequestProto, Dict(:src => path, - :recursive => recursive)) + inp = DeleteRequestProto(src=path, recursive=recursive) resp = delete(client.nn_conn, inp) resp.result @@ -366,16 +356,10 @@ function _create_file(client::HDFSClient, path::AbstractString, overwrite::Bool= (replication == 0) && (replication = defaults.replication) end - perms = protobuild(FsPermissionProto, Dict(:perm => mode)) + perms = FsPermissionProto(perm=mode) createFlag = UInt32(overwrite ? CreateFlagProto.OVERWRITE : CreateFlagProto.CREATE) - inp = protobuild(CreateRequestProto, Dict(:src => path, - :masked => perms, - :clientName => ELLY_CLIENTNAME, - :createFlag => createFlag, - :createParent => false, - :replication => replication, - :blockSize => blocksz)) + inp = CreateRequestProto(src=path, masked=perms, clientName=ELLY_CLIENTNAME, createFlag=createFlag, createParent=false, replication=replication, blockSize=blocksz) resp = create(client.nn_conn, inp) isfilled(resp, :fs) || (return nothing) @@ -390,8 +374,7 @@ end function _complete_file(client::HDFSClient, path::AbstractString, last::Union{Nothing,ExtendedBlockProto}=nothing) path = abspath(client, path) - endinp = protobuild(CompleteRequestProto, Dict(:src => path, - :clientName => ELLY_CLIENTNAME)) + endinp = CompleteRequestProto(src=path, clientName=ELLY_CLIENTNAME) if last !== nothing setproperty!(endinp, :last, last) @debug("setting last block as", last) @@ -409,8 +392,7 @@ end function _add_block(::Type{T}, client::HDFSClient, path::AbstractString, previous::Union{Nothing,T}=nothing) where T<:ExtendedBlockProto path = abspath(client, path) - inp = protobuild(AddBlockRequestProto, Dict(:src => path, - :clientName => ELLY_CLIENTNAME)) + inp = AddBlockRequestProto(src=path, clientName=ELLY_CLIENTNAME) (previous === nothing) || setproperty!(inp, :previous, previous) resp = addBlock(client.nn_conn, inp) @@ -423,7 +405,7 @@ the namenode from assuming the client from having abandoned the file or some oth from recovering the lease. """ function hdfs_renewlease(client::HDFSClient) - inp = protobuild(RenewLeaseRequestProto, Dict(:clientName => ELLY_CLIENTNAME)) + inp = RenewLeaseRequestProto(clientName=ELLY_CLIENTNAME) renewLease(client.nn_conn, inp) nothing end @@ -433,9 +415,7 @@ function touch(client::HDFSClient, path::AbstractString, replication::UInt32=zer if exists(client, path) path = abspath(client, path) t = UInt64(datetime2unix(now(UTC))*1000) - inp = protobuild(SetTimesRequestProto, Dict(:src => path, - :mtime => t, - :atime => t)) + inp = SetTimesRequestProto(src=path, mtime=t, atime=t) setTimes(client.nn_conn, inp) else diff --git a/src/api_yarn_appmaster.jl b/src/api_yarn_appmaster.jl index 5ccd247..5a3ed8c 100644 --- a/src/api_yarn_appmaster.jl +++ b/src/api_yarn_appmaster.jl @@ -166,7 +166,7 @@ function can_schedule(yam::YarnAppMaster, restype::Int32) end function _unregister(yam::YarnAppMaster, finalstatus::Int32, diagnostics::AbstractString) - inp = protobuild(FinishApplicationMasterRequestProto, Dict(:final_application_status => finalstatus)) + inp = FinishApplicationMasterRequestProto(final_application_status=finalstatus) !isempty(yam.tracking_url) && setproperty!(inp, :tracking_url, yam.tracking_url) !isempty(diagnostics) && setproperty!(inp, :diagnostics, diagnostics) @@ -187,8 +187,8 @@ container_release(yam::YarnAppMaster, cids::ContainerIdProto...) = request_relea container_start(yam::YarnAppMaster, cid::ContainerIdProto, container_spec::ContainerLaunchContextProto) = container_start(yam, yam.containers.containers[cid], container_spec) function container_start(yam::YarnAppMaster, container::ContainerProto, container_spec::ContainerLaunchContextProto) @debug("starting", container) - req = protobuild(StartContainerRequestProto, Dict(:container_launch_context => container_spec, :container_token => container.container_token)) - inp = protobuild(StartContainersRequestProto, Dict(:start_container_request => [req])) + req = StartContainerRequestProto(container_launch_context=container_spec, container_token=container.container_token) + inp = StartContainersRequestProto(start_container_request=[req]) nodeid = container.nodeId nm_conn = get_connection(yam.nodes, nodeid) @@ -214,7 +214,7 @@ container_stop(yam::YarnAppMaster, cid::ContainerIdProto) = container_stop(yam, function container_stop(yam::YarnAppMaster, container::ContainerProto) @debug("stopping", container) - inp = protobuild(StopContainersRequestProto, Dict(:container_id => [container.id])) + inp = StopContainersRequestProto(container_id=[container.id]) nodeid = container.nodeId nm_conn = get_connection(yam.nodes, nodeid) success = false diff --git a/src/api_yarn_base.jl b/src/api_yarn_base.jl index fd2f9e6..960ffc8 100644 --- a/src/api_yarn_base.jl +++ b/src/api_yarn_base.jl @@ -340,12 +340,9 @@ end function request_alloc(containers::YarnContainers, numcontainers::Int; mem::Integer=YARN_CONTAINER_MEM_DEFAULT, cpu::Integer=YARN_CONTAINER_CPU_DEFAULT, loc::AbstractString=YARN_CONTAINER_LOCATION_DEFAULT, priority::Integer=YARN_CONTAINER_PRIORITY_DEFAULT) - prio = protobuild(PriorityProto, Dict(:priority => priority)) - capability = protobuild(ResourceProto, Dict(:memory => mem, :virtual_cores => cpu)) - req = protobuild(ResourceRequestProto, Dict(:priority => prio, - :resource_name => loc, - :num_containers => numcontainers, - :capability => capability)) + prio = PriorityProto(priority=priority) + capability = ResourceProto(memory=mem, virtual_cores=cpu) + req = ResourceRequestProto(priority=prio, resource_name=loc, num_containers=numcontainers, capability=capability) pending(containers.alloc_pipeline, req) containers.ndesired += numcontainers nothing diff --git a/src/api_yarn_client.jl b/src/api_yarn_client.jl index df6942c..640a1a3 100644 --- a/src/api_yarn_client.jl +++ b/src/api_yarn_client.jl @@ -194,20 +194,20 @@ function submit(client::YarnClient, container_spec::ContainerLaunchContextProto, appid, maxmem, maxcores = _new_app(client) - prio = protobuild(PriorityProto, Dict(:priority => priority)) - res = protobuild(ResourceProto, Dict(:memory => mem, :virtual_cores => cores)) - asc = protobuild(ApplicationSubmissionContextProto, Dict(:application_id => appid, - :application_name => appname, - :queue => queue, - :priority => prio, - :unmanaged_am => unmanaged, - :am_container_spec => container_spec, - :resource => res, - :maxAppAttempts => 2, - :applicationType => apptype, - :keep_containers_across_application_attempts => reuse)) + prio = PriorityProto(priority=priority) + res = ResourceProto(memory=mem, virtual_cores=cores) + asc = ApplicationSubmissionContextProto(application_id = appid, + application_name = appname, + queue = queue, + priority = prio, + unmanaged_am = unmanaged, + am_container_spec = container_spec, + resource = res, + maxAppAttempts = 2, + applicationType = apptype, + keep_containers_across_application_attempts = reuse) - inp = protobuild(SubmitApplicationRequestProto, Dict(:application_submission_context => asc)) + inp = SubmitApplicationRequestProto(application_submission_context=asc) submitApplication(client.rm_conn, inp) YarnApp(client, appid) @@ -215,7 +215,7 @@ end function kill(app::YarnApp) client = app.client - inp = protobuild(KillApplicationRequestProto, Dict(:application_id => app.appid)) + inp = KillApplicationRequestProto(application_id=app.appid) resp = forceKillApplication(client.rm_conn, inp) resp.is_kill_completed @@ -224,7 +224,7 @@ end function status(app::YarnApp, refresh::Bool=true) if refresh || (app.status === nothing) client = app.client - inp = protobuild(GetApplicationReportRequestProto, Dict(:application_id => app.appid)) + inp = GetApplicationReportRequestProto(application_id=app.appid) resp = getApplicationReport(client.rm_conn, inp) app.status = isfilled(resp.application_report) ? YarnAppStatus(resp.application_report) : nothing @@ -258,7 +258,7 @@ end function attempts(app::YarnApp, refresh::Bool=true) if refresh || isempty(app.attempts) client = app.client - inp = protobuild(GetApplicationAttemptsRequestProto, Dict(:application_id => app.appid)) + inp = GetApplicationAttemptsRequestProto(application_id=app.appid) resp = getApplicationAttempts(client.rm_conn, inp) atmptlist = app.attempts diff --git a/src/rpc.jl b/src/rpc.jl index b2a6e00..6016cfa 100644 --- a/src/rpc.jl +++ b/src/rpc.jl @@ -152,17 +152,17 @@ end function buffer_connctx(channel::HadoopRpcChannel) protocol = channel.protocol_attribs[:id] - connctx = protobuild(IpcConnectionContextProto, Dict(:userInfo => channel.ugi.userinfo, :protocol => protocol)) + connctx = IpcConnectionContextProto(userInfo=channel.ugi.userinfo, protocol=protocol) buffer_size_delimited(channel.iob, connctx) end function buffer_reqhdr(channel::HadoopRpcChannel, call_id::Int32) - hdr = protobuild(RpcRequestHeaderProto, Dict(:rpcKind => HRPC_PROTOBUFF_TYPE, - :rpcOp => HRPC_FINAL_PACKET, - :callId => call_id, - #:retryCount => -1, - :clientId => convert(Vector{UInt8}, codeunits(channel.clnt_id)))) + hdr = RpcRequestHeaderProto(rpcKind = HRPC_PROTOBUFF_TYPE, + rpcOp = HRPC_FINAL_PACKET, + callId = call_id, + #retryCount = -1, + clientId = convert(Vector{UInt8}, codeunits(channel.clnt_id))) buffer_size_delimited(channel.iob, hdr) end @@ -173,9 +173,9 @@ buffer_conctx_reqhdr(channel::HadoopRpcChannel) = (channel.sent_call_id = channe function buffer_method_reqhdr(channel::HadoopRpcChannel, method::MethodDescriptor) protocol = channel.protocol_attribs[:id] protocol_ver = channel.protocol_attribs[:ver] - hdr = protobuild(RequestHeaderProto, Dict(:methodName => method.name, - :declaringClassProtocolName => protocol, - :clientProtocolVersion => protocol_ver)) + hdr = RequestHeaderProto(methodName = method.name, + declaringClassProtocolName = protocol, + clientProtocolVersion = protocol_ver) buffer_size_delimited(channel.iob, hdr) end @@ -480,9 +480,9 @@ function buffer_readblock(reader::HDFSBlockReader) setproperty!(exblock, fld, get_field(block.b, fld)) end - basehdr = protobuild(BaseHeaderProto, Dict(:block => exblock, :token => token)) - hdr = protobuild(ClientOperationHeaderProto, Dict(:baseHeader => basehdr, :clientName => ELLY_CLIENTNAME)) - readblock = protobuild(OpReadBlockProto, Dict(:offset => offset, :len => len, :header => hdr)) + basehdr = BaseHeaderProto(block=exblock, token=token) + hdr = ClientOperationHeaderProto(baseHeader=basehdr, clientName=ELLY_CLIENTNAME) + readblock = OpReadBlockProto(offset=offset, len=len, header=hdr) @debug("sending block read message", offset, len) buffer_size_delimited(channel.iob, readblock) @@ -490,7 +490,7 @@ end function buffer_client_read_status(reader::HDFSBlockReader, status::Int32) channel = reader.channel - read_status = protobuild(ClientReadStatusProto, Dict(:status => status)) + read_status = ClientReadStatusProto(status=status) buffer_size_delimited(channel.iob, read_status) end @@ -972,24 +972,24 @@ function buffer_writeblock(writer::HDFSBlockWriter) end setproperty!(exblock, :numBytes, zero(UInt64)) - basehdr = protobuild(BaseHeaderProto, Dict(:block => exblock, :token => token)) - hdr = protobuild(ClientOperationHeaderProto, Dict(:baseHeader => basehdr, :clientName => ELLY_CLIENTNAME)) - chksum = protobuild(ChecksumProto, Dict(:_type => defaults.checksumType, :bytesPerChecksum => defaults.bytesPerChecksum)) + basehdr = BaseHeaderProto(block=exblock, token=token) + hdr = ClientOperationHeaderProto(baseHeader=basehdr, clientName=ELLY_CLIENTNAME) + chksum = ChecksumProto(_type=defaults.checksumType, bytesPerChecksum=defaults.bytesPerChecksum) targets = DatanodeInfoProto[] for loc in block.locs (loc.id != writer.source_node.id) && push!(targets, loc) end - writeblock = protobuild(OpWriteBlockProto, Dict(:header => hdr, - :targets => targets, - :source => writer.source_node, - :stage => OpWriteBlockProto_BlockConstructionStage.PIPELINE_SETUP_CREATE, - :pipelineSize => length(block.locs), - :minBytesRcvd => 0, - :maxBytesRcvd => 0, - :latestGenerationStamp => exblock.generationStamp, - :requestedChecksum => chksum)) + writeblock = OpWriteBlockProto(header = hdr, + targets = targets, + source = writer.source_node, + stage = OpWriteBlockProto_BlockConstructionStage.PIPELINE_SETUP_CREATE, + pipelineSize = length(block.locs), + minBytesRcvd = 0, + maxBytesRcvd = 0, + latestGenerationStamp = exblock.generationStamp, + requestedChecksum = chksum) @debug("sending block write message", offset=block.offset) buffer_size_delimited(channel.iob, writeblock) @@ -1051,10 +1051,10 @@ function prepare_packet(writer::HDFSBlockWriter) seq_no = Int64(writer.pkt_seq += 1) #@debug("packet seqno $seq_no with $(bytes_in_packet)/$(defaults.writePacketSize) bytes is last packet: $last_pkt") - pkt_hdr = protobuild(PacketHeaderProto, Dict(:offsetInBlock => writer.total_written, - :seqno => seq_no, - :lastPacketInBlock => last_pkt, - :dataLen => bytes_in_packet)) + pkt_hdr = PacketHeaderProto(offsetInBlock = writer.total_written, + seqno = seq_no, + lastPacketInBlock = last_pkt, + dataLen = bytes_in_packet) writer.total_written += bytes_in_packet diff --git a/src/sasl.jl b/src/sasl.jl index 158d394..6e5da44 100644 --- a/src/sasl.jl +++ b/src/sasl.jl @@ -80,7 +80,7 @@ buffer_sasl_reqhdr(channel::HadoopRpcChannel) = (channel.sent_call_id = channel. function buffer_sasl_message(channel::HadoopRpcChannel, state::Int32, auths::Vector{RpcSaslProto_SaslAuth}=RpcSaslProto_SaslAuth[], token::Vector{UInt8}=UInt8[]) @debug("buffer SASL message", state, nauths=length(auths), token=!isempty(token)) - saslmsg = protobuild(RpcSaslProto, Dict(:version => 0, :state => state)) + saslmsg = RpcSaslProto(version=0, state=state) isempty(auths) || setproperty!(saslmsg, :auths, auths) isempty(token) || setproperty!(saslmsg, :token, token) buffer_size_delimited(channel.iob, saslmsg) @@ -150,10 +150,10 @@ function sasl_auth(channel::HadoopRpcChannel, token::TokenProto) @debug("response", response) # send response as a sasl initiate request - respauth = protobuild(RpcSaslProto_SaslAuth, Dict(:method => auth.method, - :mechanism => auth.mechanism, - :protocol => auth.protocol, - :serverId => auth.serverId)) + respauth = RpcSaslProto_SaslAuth(method = auth.method, + mechanism = auth.mechanism, + protocol = auth.protocol, + serverId = auth.serverId) begin_send(channel) buffer_sasl_reqhdr(channel) diff --git a/src/ugi.jl b/src/ugi.jl index 19cce59..19581fb 100644 --- a/src/ugi.jl +++ b/src/ugi.jl @@ -7,7 +7,7 @@ mutable struct UserGroupInformation credentials::Credentials function UserGroupInformation(username::AbstractString=default_username(); proxy::Bool=false, proxyuser::AbstractString=username) - userinfo = proxy ? protobuild(UserInformationProto, Dict(:realUser => username, :effectiveUser => proxyuser)) : protobuild(UserInformationProto, Dict(:realUser => username)) + userinfo = proxy ? UserInformationProto(realUser=username, effectiveUser=proxyuser) : UserInformationProto(realUser=username) ugi = new(userinfo, Credentials()) haskey(ENV, "HADOOP_TOKEN_FILE_LOCATION") && read_credentials!(ENV["HADOOP_TOKEN_FILE_LOCATION"]; credentials=ugi.credentials) ugi