Skip to content

Commit

Permalink
Added timeout support
Browse files Browse the repository at this point in the history
  • Loading branch information
Leont committed Apr 30, 2012
1 parent acd96fd commit b298c1f
Show file tree
Hide file tree
Showing 3 changed files with 144 additions and 5 deletions.
1 change: 1 addition & 0 deletions Changes
Expand Up @@ -2,6 +2,7 @@ Revision history for AnyEvent-Handle-UDP

{{$NEXT}}
Make push_send accept an address array too
Added support for timeouts
Make sure buffers is predefined

0.033 2012-02-19 17:35:10 Europe/Amsterdam
Expand Down
119 changes: 114 additions & 5 deletions lib/AnyEvent/Handle/UDP.pm
Expand Up @@ -10,9 +10,10 @@ use AnyEvent::Socket qw/parse_address/;

use Carp qw/croak/;
use Const::Fast qw/const/;
use Errno qw/EAGAIN EWOULDBLOCK EINTR/;
use Scalar::Util qw/reftype looks_like_number/;
use Errno qw/EAGAIN EWOULDBLOCK EINTR ETIMEDOUT/;
use Scalar::Util qw/reftype looks_like_number weaken/;
use Socket qw/SOL_SOCKET SO_REUSEADDR SOCK_DGRAM INADDR_ANY/;
use Sub::Name;
use Symbol qw/gensym/;

use namespace::clean;
Expand Down Expand Up @@ -96,6 +97,78 @@ has autoflush => (
default => sub { 0 },
);

for my $dir ('', 'r', 'w') {
my $timeout = "${dir}timeout";
my $clear_timeout = "clear_$timeout";
my $has_timeout = "has_$timeout";
my $activity = "_${dir}activity";
my $on_timeout = "on_$timeout";
my $timer = "_${dir}timer";
my $clear_timer = "_clear$timer";
my $timeout_reset = "${timeout}_reset";

has $timer => (
is => 'rw',
clearer => $clear_timer,
);

my $callback;
$callback = sub {
my $self = shift;
if (not $self->$has_timeout or not $self->fh) {
$self->$clear_timer;
return;
}
my $now = AE::now;
my $after = $self->$activity + $self->$timeout - $now;
if ($after <= 0) {
$self->$activity($now);
my $time = $self->$on_timeout;
$time ? $time->($self) : $self->_error->(Errno::ETIMEDOUT, 0);
return if not $self->$has_timeout;
}
weaken $self;
return if not $self;
$self->$timer(AE::timer($after, 0, sub {
$self->$clear_timer;
$callback->($self);
}));
};

has $timeout => (
is => 'rw',
isa => sub {
return $_[0] >= 0;
},
predicate => $has_timeout,
clearer => $clear_timeout,
trigger => sub {
my ($self, $value) = @_;
if ($value == 0) {
$self->$clear_timer;
$self->$clear_timeout;
return;
}
else {
$callback->($self);
}
},
);
has $activity => (
is => 'rw',
default => sub { AE::now },
);

has $on_timeout => (
is => 'rw',
isa => sub { ref($_[0]) eq 'CODE' },
);
no strict 'refs';
*{$timeout_reset} = subname $timeout_reset, sub {
$activity = AE::now;
};
}

sub bind_to {
my ($self, $addr) = @_;
if (ref $addr) {
Expand Down Expand Up @@ -147,7 +220,9 @@ sub _on_addr {

$self->{reader} = AE::io $self->fh, 0, sub {
while (defined (my $addr = recv $self->fh, my ($buffer), $self->{receive_size}, 0)) {
$self->{on_recv}($buffer, $self, $addr);
$self->timeout_reset;
$self->rtimeout_reset;
$self->on_recv->($buffer, $self, $addr);
}
$self->_error(1, "Couldn't recv: $!") if $! != EAGAIN and $! != EWOULDBLOCK;
return;
Expand Down Expand Up @@ -193,7 +268,11 @@ sub _send {
my ($self, $message, $to, $cv) = @_;
my $ret = defined $to ? send $self->{fh}, $message, 0, $to : send $self->{fh}, $message, 0;
$self->_error(1, "$!") if not defined $ret and !$non_fatal{$! + 0};
$cv->($ret) if defined $cv and defined $ret;
if (defined $cv and defined $ret) {
$self->timeout_reset;
$self->wtimeout_reset;
$cv->($ret);
}
return $ret;
}

Expand All @@ -207,7 +286,7 @@ sub _push_writer {
my $ret = $self->_send($msg, $to, $cv);
if (not defined $ret) {
unshift @{$self->{buffers}}, $entry;
$self->on_error->($self->{fh}, 1, "$!") if !$non_fatal{$! + 0};
$self->_error->(1, "$!") if !$non_fatal{$! + 0};
last;
}
}
Expand Down Expand Up @@ -282,6 +361,28 @@ Sets the socket family. The default is C<0>, which means either IPv4 or IPv6. Th
The underlying filehandle.
=attr timeout
=attr rtimeout
=attr wtimeout
If non-zero, then these enables an "inactivity" timeout: whenever this many seconds pass without a successful read or write on the underlying file handle (or a call to C<timeout_reset>), the on_timeout callback will be invoked (and if that one is missing, a non-fatal ETIMEDOUT error will be raised).
There are three variants of the timeouts that work independently of each other, for both read and write (triggered when nothing was read OR written), just read (triggered when nothing was read), and just write: timeout, rtimeout and wtimeout, with corresponding callbacks on_timeout, on_rtimeout and on_wtimeout, and reset functions timeout_reset, rtimeout_reset, and wtimeout_reset.
Note that timeout processing is active even when you do not have any outstanding read or write requests: If you plan to keep the connection idle then you should disable the timeout temporarily or ignore the timeout in the corresponding on_timeout callback, in which case AnyEvent::Handle will simply restart the timeout.
Calling C<clear_timeout> (or setting it to zero, which does the same) disables the corresponding timeout.
=attr on_timeout
=attr on_rtimeout
=attr on_wtimeout
The callback that's called whenever the inactivity timeout passes. If you return from this callback, then the timeout will be reset as if some activity had happened, so this condition is not fatal in any way.
=method bind_to($address)
Bind to the specified addres. Note that a bound socket may be rebound to another address. C<$address> must be in the same form as the bind argument to new.
Expand All @@ -294,6 +395,14 @@ Connect to the specified address. Note that a connected socket may be reconnecte
Try to send a message. If a socket is not connected a receptient address must also be given. If it is connected giving a receptient may not work as expected, depending on your platform. It returns C<$cv>, which will become true when C<$message> is sent.
=method timeout_reset
=method rtimeout_reset
=method wtimeout_reset
Reset the activity timeout, as if data was received or sent.
=method destroy
Destroy the handle.
Expand Down
29 changes: 29 additions & 0 deletions t/20-timeout.t
@@ -0,0 +1,29 @@
#! perl

use strict;
use warnings FATAL => 'all';
use Test::More 0.89;
use Test::Exception;

use AnyEvent::Handle::UDP;
use IO::Socket::INET;

alarm 12;

{
my $cb = AE::cv;
my $cb2 = AE::cv;
my $server = AnyEvent::Handle::UDP->new(
on_recv => $cb,
timeout => 3, on_timeout => sub { $cb->croak("Timeout") },
rtimeout => 4.5, on_rtimeout => sub { $cb2->croak("Read Timeout") }
);
my $client = IO::Socket::INET->new(PeerHost => 'localhost', PeerPort => 1382, Proto => 'udp');
my $start_time = AE::now;
throws_ok { $cb->recv } qr/Timeout/, 'Receive throws a timeout';
cmp_ok AE::now, '>=', $start_time + 3, 'Three seconds have passed';
throws_ok { $cb2->recv } qr/Read Timeout/, 'Receive throws a timeout again';
cmp_ok AE::now, '>=', $start_time + 4.5, '1.5 more seconds have passed';
}

done_testing;

0 comments on commit b298c1f

Please sign in to comment.