Implement UDP #5697

Merged
merged 2 commits into from Feb 12, 2014

Projects

None yet

8 participants

@Keno
Member
Keno commented Feb 6, 2014

@StefanKarpinski I figured it'd be faster if I did it myself. This needs some more tests and documentation (if you wouldn't mind taking care of that), but other than that this is good to go.

@vtjnash vtjnash commented on the diff Feb 6, 2014
base/exports.jl
@@ -1102,6 +1105,7 @@ export
write,
writecsv,
writedlm,
+ UdpSocket,
@vtjnash
vtjnash Feb 6, 2014 Member

This looks strange. UDP is an acronym so shouldn't it be all caps UDPSocket. (also, why is it TcpServer, not TCPServer, etc?)

@johnmyleswhite
johnmyleswhite Feb 6, 2014 Member

+1 for more consistent use of caps

@Keno
Keno Feb 7, 2014 Member

Not sure, I did it for consistency with TCP, I'd vote for leaving this as is for this PR and then if we decide to rename, we can do that in one commit for both.

@vtjnash vtjnash commented on the diff Feb 6, 2014
base/socket.jl
@@ -300,19 +300,6 @@ function TcpServer()
this
end
-#type UdpSocket <: Socket
@vtjnash
vtjnash Feb 6, 2014 Member

this presumably should be deleted, since you moved it below

@Keno
Keno Feb 7, 2014 Member

Isn't that exactly what's happening? I'm confused.

@vtjnash
vtjnash Feb 7, 2014 Member

oh, right. i was looking on my phone and got confused. carry on

@vtjnash vtjnash and 1 other commented on an outdated diff Feb 6, 2014
base/socket.jl
+ if enable_broadcast !== nothing
+ uv_error("enable_broadcast",ccall(:uv_udp_set_broadcast,Cint,(Ptr{Void},Cint),sock.handle,enable_broadcast))
+ end
+ if ttl !== nothing
+ uv_error("ttl",ccall(:uv_udp_set_ttl,Cint,(Ptr{Void},Cint),sock.handle,ttl))
+ end
+end
+
+_uv_hook_alloc_buf(sock::UdpSocket,size::Int32) = (c_malloc(size),size)
+
+_recv_start(sock::UdpSocket) = uv_error("recv_start",ccall(:uv_udp_recv_start,Cint,(Ptr{Void},Ptr{Void},Ptr{Void}),
+ sock.handle,cglobal(:jl_uv_alloc_buf),cglobal(:jl_uv_recvcb)))
+_recv_stop(sock::UdpSocket) = uv_error("recv_stop",ccall(:uv_udp_recv_stop,Cint,(Ptr{Void},),sock.handle))
+
+function recv(sock::UdpSocket)
+ # If the socket has not been bound, it will be bound implicityly to ::0 and a random port
@vtjnash
vtjnash Feb 6, 2014 Member

implicitly

@Keno
Keno Feb 7, 2014 Member

It was a long day ;).

@vtjnash vtjnash commented on the diff Feb 6, 2014
base/socket.jl
+ sock.handle,cglobal(:jl_uv_alloc_buf),cglobal(:jl_uv_recvcb)))
+_recv_stop(sock::UdpSocket) = uv_error("recv_stop",ccall(:uv_udp_recv_stop,Cint,(Ptr{Void},),sock.handle))
+
+function recv(sock::UdpSocket)
+ # If the socket has not been bound, it will be bound implicityly to ::0 and a random port
+ if sock.status != StatusInit && sock.status != StatusOpen
+ error("Invalid socket state")
+ end
+ _recv_start(sock)
+ wait(sock.recvnotify)::Vector{Uint8}
+end
+
+function _uv_hook_recv(sock::UdpSocket, nread::Ptr{Void}, buf_addr::Ptr{Void}, buf_size::Int32, addr::Ptr{Void}, flags::Int32)
+ nread = convert(Cssize_t,nread)
+ if flags & UV_UDP_PARTIAL > 0
+ # TODO: Decide what to do in this case. For now throw an error
@vtjnash
vtjnash Feb 6, 2014 Member

i wonder if we should first notify the partial message?

@Keno
Keno Feb 7, 2014 Member

I'm not sure what the correct behavior is. On the one hand it's desirable to give partial messages to the user, on the other hand, the message boundary is actually quite meaningful in UDP, so if we don't make it clear that the message is incomplete, we could get bad behavior later.

@vtjnash
vtjnash Feb 7, 2014 Member

oh, i thought udp messages could get split/joined/reordered at random. what's nodejs do?

@Keno
Keno Feb 7, 2014 Member

As far as I am aware, UDP, though unreliable (i.e. it might drop or reorder them) will never split a single message. I'll have a look at what node does.

@Keno
Keno Feb 7, 2014 Member

As far as I can tell, node silently truncates (I really don't like that behavior). So does, rust, but there's an issue to change that: rust-lang/rust#9238

@vtjnash
vtjnash Feb 7, 2014 Member

i almost want a flag allow_partial, but it somehow still indicated as an exceptional condition. Perhaps we should make an error type IncompleteReadError(msg, data)?

@Keno
Keno Feb 7, 2014 Member

Hmm, that might work. I'm not sure though. @JeffBezanson @StefanKarpinski any ideas?

@StefanKarpinski
StefanKarpinski Feb 7, 2014 Member

I'm inclined to not expose the ability to read partial UPD packets. There's really not much point. UDP is typically used for fairly small chunks of data and even if it isn't, what are you really going to do with part of a UDP packet? If it ever comes up as something someone actually wants, we can provide a way to do it, but I'm skeptical that will ever happen.

@Keno
Keno Feb 7, 2014 Member

So you're saying silently drop the package? I'm fine with that, but it might lead to subtle bugs. How about it throws an error by default, but with a flag to ignore partial packages. That way people have to explicitly opt in and won't wonder why they don't get any packages if they accidentally set the UDP package size to large or something.

@StefanKarpinski
StefanKarpinski Feb 7, 2014 Member

That's not what I meant – perhaps I was misunderstanding the situation. If this happens doesn't it mean that more of the packet will eventually arrive? Or does this mean that the UDP layer got this much data but somehow determined that more isn't coming? How, with a timer?

@Keno
Keno Feb 7, 2014 Member

No, if a package is too large for the kernel to handle, it will just drop anything past the end of the buffer and indicate it with the flag.

@StefanKarpinski
StefanKarpinski Feb 7, 2014 Member

Oh, interesting. Is the buffer size controllable?

@Keno
Keno Feb 7, 2014 Member

I'm really not an expert on this, but I suspect the network card will read it directly into the userspace buffer, so it'll be whatever size we return from _uv_hook_alloc_buf(sock::UdpSocket,size::Int32) above.

@StefanKarpinski
StefanKarpinski Feb 7, 2014 Member

Ok, I guess for now throwing an error is probably the right thing to do. That way at least if this happens, someone will know about it. Silent I/O failures are impossible to track down, so that's no good.

@vtjnash vtjnash commented on an outdated diff Feb 6, 2014
base/socket.jl
+ notify_error(sock.recvnotify,"Partial message received")
+ end
+ buf = pointer_to_array(convert(Ptr{Uint8},buf_addr),int(buf_size),true)
+ notify(sock.recvnotify,buf[1:nread])
+end
+
+function _send(sock::UdpSocket,ipaddr::IPv4,port::Uint16,buf)
+ ccall(:jl_udp_send,Cint,(Ptr{Void},Uint16,Uint32,Ptr{Uint8},Csize_t),sock.handle,hton(port),hton(ipaddr.host),buf,sizeof(buf))
+end
+
+function _send(sock::UdpSocket,ipaddr::IPv6,port::Uint16,buf)
+ ccall(:jl_udp_send6,Cint,(Ptr{Void},Uint16,Ptr{Uint128},Ptr{Uint8},Csize_t),sock.handle,hton(port),&hton(ipaddr.host),buf,sizeof(buf))
+end
+
+function send(sock::UdpSocket,ipaddr,port,msg)
+ # If the socket has not been bound, it will be bound implicityly to ::0 and a random port
@vtjnash
vtjnash Feb 6, 2014 Member

implicitly

@staticfloat
Member

+1 for TCPxxx and UDPxxx, although that will be a breaking change.

@vtjnash vtjnash commented on the diff Feb 7, 2014
base/socket.jl
+ # If the socket has not been bound, it will be bound implicitly to ::0 and a random port
+ if sock.status != StatusInit && sock.status != StatusOpen
+ error("Invalid socket state")
+ end
+ _recv_start(sock)
+ wait(sock.recvnotify)::Vector{Uint8}
+end
+
+function _uv_hook_recv(sock::UdpSocket, nread::Ptr{Void}, buf_addr::Ptr{Void}, buf_size::Int32, addr::Ptr{Void}, flags::Int32)
+ nread = convert(Cssize_t,nread)
+ if flags & UV_UDP_PARTIAL > 0
+ # TODO: Decide what to do in this case. For now throw an error
+ c_free(buf_addr)
+ notify_error(sock.recvnotify,"Partial message received")
+ end
+ buf = pointer_to_array(convert(Ptr{Uint8},buf_addr),int(buf_size),true)
@vtjnash
vtjnash Feb 7, 2014 Member

I think you can use int(nread) here and avoid the extra copy

@Keno
Keno Feb 7, 2014 Member

Yes, I suppose that's a good idea.

@G3UKB
G3UKB commented Feb 7, 2014

As I started this I guess I should give some input. I'm no expert, just a user, but having used this protocol for the same application in a few different languages (last one was go) here is what I would need. I've added the relevant code in go syntax.

  1. Listen on all adapters.
    udpAddr, err := net.ResolveUDPAddr("up4", ":0")
    conn, err = net.ListenUDP("udp", udpAddr)
  2. Set buffers. This is essential to stop overflow (real-time data) and be able to buffer a number of messages. Messages are 1032 bytes and I want to block on writing.
    conn.SetReadBuffer(103200)
    conn.SetWriteBuffer(1032)
  3. I then need to send a broadcast message.
    dst, err := net.ResolveUDPAddr("udp", "255.255.255.255:1024")
    conn.WriteTo(msg, dst)
  4. Finally listen for broadcast responses with a timeout.
    set_timeout(conn, 5e9)
    n, peer, err := conn.ReadFrom(resp)
  5. I'm then into straight forward data exchanges reading, processing and outputting 1032 byte messages.

I would just add that I believe its normal to do ones own management so I would not for example expect an error to be thrown if a packet is dropped nor get partial packets. I have sequence numbers in all packets so I detect dropped and out of sequence packets. I don't know if this is standard but it seems logical to me on an unreliable connection.

--bob

@StefanKarpinski
Member

That is informative. I would still like to try raising an error by default and seeing how that goes. I suspect this doesn't happen much in practice.

@Keno
Member
Keno commented Feb 8, 2014

Alright, let's leave it like this then.

@bass3m
bass3m commented Feb 11, 2014

Any updates on when this will get merged ? For what it's worth, i've been playing around with @loladiro's patch and it works great.

@Keno
Member
Keno commented Feb 11, 2014

As soon as somebody writes documentation and a few more tests. I'll get to it eventually, but am quite busy at the moment. Feel free to tackle that though.

@bass3m
bass3m commented Feb 11, 2014

@loladiro sounds good, i'll see what i can do for some tests and documentation.

@StefanKarpinski
Member

Shall we merge it and open an issue for documentation and testing? Otherwise someone would have to make a pull request against your pull request...

@StefanKarpinski StefanKarpinski merged commit 6585e3d into master Feb 12, 2014

1 check passed

default The Travis CI build passed
Details
@StefanKarpinski StefanKarpinski deleted the kf/udp branch Feb 12, 2014
@Keno
Member
Keno commented Feb 12, 2014

Shall we merge it and open an issue for documentation and testing?

Why do I have a feeling that questionmark was rhetorical? ;)

@StefanKarpinski
Member

Yeah, I realized right after posting that I should just do it instead of suggesting it and waiting for someone else to do it.

@Keno
Member
Keno commented Feb 12, 2014

That's how things get done.

@vtjnash
Member
vtjnash commented Feb 12, 2014

I would do that with #3796, but somehow I feel that's how things get un-done. Unless someone else was to merge it, like Stefan did here...

@StefanKarpinski
Member

That's a little different – #3796 might break everything. This is quite safe as long as you don't use the new functionality and there's at least one report that the new functionality works nicely.

@StefanKarpinski
Member

But I am somewhat inclined at this point to just merge it and see what happens.

@bass3m
bass3m commented Feb 12, 2014

Thanks for merging it, this will make things easier.

@danluu
Contributor
danluu commented Sep 15, 2014

Looks like I'm late to the party here, but I wanted to echo @bob-c's comment above. I'm not a networking expert, but I've spent some time with UDP and implementing protocols on top of UDP, and I find it pretty surprising that the UDP API throws an exception on a partial packet instead of just dropping it.

Unless you have what is, by non-HPC standards, a fairly exotic setup, packets are expected to get dropped all the time, and throwing an exception on some drops doesn't seem particularly helpful. And if if you have a lossless fabric with a setup that isn't expected to drop anything, it would be pretty unorthodox to use libuv or something that wraps libuv.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment