diff --git a/Changes b/Changes index 23bea1a..2952bda 100644 --- a/Changes +++ b/Changes @@ -2,6 +2,7 @@ Revision history for AnyEvent-Handle-UDP {{$NEXT}} Make push_send accept an address array too + Added support for timeouts Make sure buffers is predefined 0.033 2012-02-19 17:35:10 Europe/Amsterdam diff --git a/lib/AnyEvent/Handle/UDP.pm b/lib/AnyEvent/Handle/UDP.pm index 56fa812..90c4b60 100644 --- a/lib/AnyEvent/Handle/UDP.pm +++ b/lib/AnyEvent/Handle/UDP.pm @@ -10,9 +10,10 @@ use AnyEvent::Socket qw/parse_address/; use Carp qw/croak/; use Const::Fast qw/const/; -use Errno qw/EAGAIN EWOULDBLOCK EINTR/; -use Scalar::Util qw/reftype looks_like_number/; +use Errno qw/EAGAIN EWOULDBLOCK EINTR ETIMEDOUT/; +use Scalar::Util qw/reftype looks_like_number weaken/; use Socket qw/SOL_SOCKET SO_REUSEADDR SOCK_DGRAM INADDR_ANY/; +use Sub::Name; use Symbol qw/gensym/; use namespace::clean; @@ -96,6 +97,78 @@ has autoflush => ( default => sub { 0 }, ); +for my $dir ('', 'r', 'w') { + my $timeout = "${dir}timeout"; + my $clear_timeout = "clear_$timeout"; + my $has_timeout = "has_$timeout"; + my $activity = "_${dir}activity"; + my $on_timeout = "on_$timeout"; + my $timer = "_${dir}timer"; + my $clear_timer = "_clear$timer"; + my $timeout_reset = "${timeout}_reset"; + + has $timer => ( + is => 'rw', + clearer => $clear_timer, + ); + + my $callback; + $callback = sub { + my $self = shift; + if (not $self->$has_timeout or not $self->fh) { + $self->$clear_timer; + return; + } + my $now = AE::now; + my $after = $self->$activity + $self->$timeout - $now; + if ($after <= 0) { + $self->$activity($now); + my $time = $self->$on_timeout; + $time ? $time->($self) : $self->_error->(Errno::ETIMEDOUT, 0); + return if not $self->$has_timeout; + } + weaken $self; + return if not $self; + $self->$timer(AE::timer($after, 0, sub { + $self->$clear_timer; + $callback->($self); + })); + }; + + has $timeout => ( + is => 'rw', + isa => sub { + return $_[0] >= 0; + }, + predicate => $has_timeout, + clearer => $clear_timeout, + trigger => sub { + my ($self, $value) = @_; + if ($value == 0) { + $self->$clear_timer; + $self->$clear_timeout; + return; + } + else { + $callback->($self); + } + }, + ); + has $activity => ( + is => 'rw', + default => sub { AE::now }, + ); + + has $on_timeout => ( + is => 'rw', + isa => sub { ref($_[0]) eq 'CODE' }, + ); + no strict 'refs'; + *{$timeout_reset} = subname $timeout_reset, sub { + $activity = AE::now; + }; +} + sub bind_to { my ($self, $addr) = @_; if (ref $addr) { @@ -147,7 +220,9 @@ sub _on_addr { $self->{reader} = AE::io $self->fh, 0, sub { while (defined (my $addr = recv $self->fh, my ($buffer), $self->{receive_size}, 0)) { - $self->{on_recv}($buffer, $self, $addr); + $self->timeout_reset; + $self->rtimeout_reset; + $self->on_recv->($buffer, $self, $addr); } $self->_error(1, "Couldn't recv: $!") if $! != EAGAIN and $! != EWOULDBLOCK; return; @@ -193,7 +268,11 @@ sub _send { my ($self, $message, $to, $cv) = @_; my $ret = defined $to ? send $self->{fh}, $message, 0, $to : send $self->{fh}, $message, 0; $self->_error(1, "$!") if not defined $ret and !$non_fatal{$! + 0}; - $cv->($ret) if defined $cv and defined $ret; + if (defined $cv and defined $ret) { + $self->timeout_reset; + $self->wtimeout_reset; + $cv->($ret); + } return $ret; } @@ -207,7 +286,7 @@ sub _push_writer { my $ret = $self->_send($msg, $to, $cv); if (not defined $ret) { unshift @{$self->{buffers}}, $entry; - $self->on_error->($self->{fh}, 1, "$!") if !$non_fatal{$! + 0}; + $self->_error->(1, "$!") if !$non_fatal{$! + 0}; last; } } @@ -282,6 +361,28 @@ Sets the socket family. The default is C<0>, which means either IPv4 or IPv6. Th The underlying filehandle. +=attr timeout + +=attr rtimeout + +=attr wtimeout + +If non-zero, then these enables an "inactivity" timeout: whenever this many seconds pass without a successful read or write on the underlying file handle (or a call to C), the on_timeout callback will be invoked (and if that one is missing, a non-fatal ETIMEDOUT error will be raised). + +There are three variants of the timeouts that work independently of each other, for both read and write (triggered when nothing was read OR written), just read (triggered when nothing was read), and just write: timeout, rtimeout and wtimeout, with corresponding callbacks on_timeout, on_rtimeout and on_wtimeout, and reset functions timeout_reset, rtimeout_reset, and wtimeout_reset. + +Note that timeout processing is active even when you do not have any outstanding read or write requests: If you plan to keep the connection idle then you should disable the timeout temporarily or ignore the timeout in the corresponding on_timeout callback, in which case AnyEvent::Handle will simply restart the timeout. + +Calling C (or setting it to zero, which does the same) disables the corresponding timeout. + +=attr on_timeout + +=attr on_rtimeout + +=attr on_wtimeout + +The callback that's called whenever the inactivity timeout passes. If you return from this callback, then the timeout will be reset as if some activity had happened, so this condition is not fatal in any way. + =method bind_to($address) Bind to the specified addres. Note that a bound socket may be rebound to another address. C<$address> must be in the same form as the bind argument to new. @@ -294,6 +395,14 @@ Connect to the specified address. Note that a connected socket may be reconnecte Try to send a message. If a socket is not connected a receptient address must also be given. If it is connected giving a receptient may not work as expected, depending on your platform. It returns C<$cv>, which will become true when C<$message> is sent. +=method timeout_reset + +=method rtimeout_reset + +=method wtimeout_reset + +Reset the activity timeout, as if data was received or sent. + =method destroy Destroy the handle. diff --git a/t/20-timeout.t b/t/20-timeout.t new file mode 100644 index 0000000..76ce092 --- /dev/null +++ b/t/20-timeout.t @@ -0,0 +1,29 @@ +#! perl + +use strict; +use warnings FATAL => 'all'; +use Test::More 0.89; +use Test::Exception; + +use AnyEvent::Handle::UDP; +use IO::Socket::INET; + +alarm 12; + +{ + my $cb = AE::cv; + my $cb2 = AE::cv; + my $server = AnyEvent::Handle::UDP->new( + on_recv => $cb, + timeout => 3, on_timeout => sub { $cb->croak("Timeout") }, + rtimeout => 4.5, on_rtimeout => sub { $cb2->croak("Read Timeout") } + ); + my $client = IO::Socket::INET->new(PeerHost => 'localhost', PeerPort => 1382, Proto => 'udp'); + my $start_time = AE::now; + throws_ok { $cb->recv } qr/Timeout/, 'Receive throws a timeout'; + cmp_ok AE::now, '>=', $start_time + 3, 'Three seconds have passed'; + throws_ok { $cb2->recv } qr/Read Timeout/, 'Receive throws a timeout again'; + cmp_ok AE::now, '>=', $start_time + 4.5, '1.5 more seconds have passed'; +} + +done_testing;