Skip to content

Commit

Permalink
fix leaks
Browse files Browse the repository at this point in the history
  • Loading branch information
FGasper committed Nov 13, 2020
1 parent acc734d commit db60456
Show file tree
Hide file tree
Showing 5 changed files with 36 additions and 25 deletions.
1 change: 1 addition & 0 deletions Changes
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ Revision history for Perl module Protocol::DBus
- BUG FIX: Prevent inadvertent event loop stoppage after initialize()
in Mojo.pm.
- BUG FIX: Make write buffering no longer auto-resume().
- BUG FIX: Fix memory leaks (and detection thereof) in event loop interfaces.
- Make event loops defer polling until at least one initialize() has run.
- Increase test coverage.
- Work around an apparent Devel::Cover bug. (https://github.com/pjcj/Devel--Cover/issues/276)
Expand Down
2 changes: 1 addition & 1 deletion lib/Protocol/DBus/Client/AnyEvent.pm
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,7 @@ sub _set_watches_and_create_messenger {

my $read_watch_sr = \$self->{'_read_watch'};

$self->{'_stop_reading_cr'} = sub {
$self->{'_give_up_cr'} = sub {
$$read_watch_sr = undef;
};
}
Expand Down
33 changes: 22 additions & 11 deletions lib/Protocol/DBus/Client/EventBase.pm
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ It would be great to rectify that.
use Protocol::DBus::Client ();
use Protocol::DBus::Client::EventMessenger ();

use Scalar::Util ();

#----------------------------------------------------------------------

=head1 INSTANCE METHODS
Expand All @@ -53,21 +55,28 @@ this one, is what you’ll use to send and receive messages.
sub initialize {
my ($self) = @_;

my $paused_sr = \$self->{'_paused'};

my $dbus = $self->{'db'};

my $weak_self = $self;
Scalar::Util::weaken( $weak_self );

return $self->{'_initialize_promise'} ||= $self->{'db'}->_get_promise_class()->new( sub {
$self->_initialize(@_);
} )->then( sub {
my $post_send_cr = $self->_set_watches_and_create_messenger();
my $post_send_cr = $weak_self->_set_watches_and_create_messenger();

return Protocol::DBus::Client::EventMessenger->new(
$self->{'db'},
$dbus,
$post_send_cr,
sub {
$self->_pause();
$self->{'_paused'} = 1;
$weak_self->_pause();
$$paused_sr = 1;
},
sub {
$self->_resume();
$self->{'_paused'} = 0;
$weak_self->_resume();
$$paused_sr = 0;
},
);
} );
Expand Down Expand Up @@ -167,7 +176,9 @@ sub _create_get_message_callback {
my $on_signal_cr_r = $self->{'_on_signal_r'} ||= \do { my $v = undef };

my $on_failure_cr_r = \$self->{'_on_failure'};
my $stop_reading_cr_r = \$self->{'_stop_reading_cr'};
my $_give_up_cr_r = \$self->{'_give_up_cr'};

my $paused_r = \$self->{'_paused'};

return sub {
my $ok = eval {
Expand All @@ -184,7 +195,7 @@ sub _create_get_message_callback {
# a self-directed signal while this loop was happening,
# which caused receipt of messages even after pause()
# had been called.
last if $self->{'_paused'};
last if $$paused_r;
}

1;
Expand All @@ -200,14 +211,14 @@ sub _create_get_message_callback {
warn $err;
}

$$stop_reading_cr_r->();
$$_give_up_cr_r->();
}
};
}

sub DESTROY {
if (defined ${^GLOBAL_PHASE} && 'DESTROY' eq ${^GLOBAL_PHASE}) {
warn "$_[0] lasted until ${^GLOBAL_PHASE} phase!";
if (defined(${^GLOBAL_PHASE}) && 'DESTRUCT' eq ${^GLOBAL_PHASE}) {
warn "$_[0] lasted until global destruction!";
}
}

Expand Down
2 changes: 1 addition & 1 deletion lib/Protocol/DBus/Client/IOAsync.pm
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ sub _set_watches_and_create_messenger {

my $loop = $self->{'loop'};

$self->{'_stop_reading_cr'} = sub {
$self->{'_give_up_cr'} = sub {
$loop->remove($$watch_sr);
$$watch_sr = undef;
};
Expand Down
23 changes: 11 additions & 12 deletions lib/Protocol/DBus/Client/Mojo.pm
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ sub initialize {
sub _to_mojo {
my ($p_es6) = @_;

return Mojo::Promise->new( sub { $p_es6->then(@_) } )->then( sub {
return _PROMISE_CLASS()->new( sub { $p_es6->then(@_) } )->then( sub {
return bless $_[0], 'Protocol::DBus::Client::Mojo::Messenger';
} );
}
Expand Down Expand Up @@ -125,6 +125,7 @@ sub _initialize {
};
}
else {
$is_write_listening = 0;
$reactor->watch($socket, 1, 0);
}
};
Expand All @@ -141,11 +142,11 @@ sub _initialize {
}

sub _flush_send_queue {
my ($dbus, $reactor, $socket) = @_;
my ($dbus, $reactor, $socket, $read_yn) = @_;

my $is_empty = $dbus->flush_write_queue();

$reactor->watch($socket, !$_[0]->{'_paused'}, !$is_empty);
$reactor->watch($socket, $read_yn, !$is_empty);

return;
}
Expand All @@ -162,13 +163,15 @@ sub _set_watches_and_create_messenger {
my $reactor = Mojo::IOLoop->singleton->reactor();
my $socket = $self->{'socket'};

my $paused_r = $self->{'_paused'};

$reactor->io(
$self->{'socket'},
sub {
(undef, my $writable) = @_;

if ($writable) {
_flush_send_queue($dbus, $reactor, $socket);
_flush_send_queue($dbus, $reactor, $socket, !$$paused_r);
}
else {
$read_cb->();
Expand All @@ -178,20 +181,18 @@ sub _set_watches_and_create_messenger {

$self->_resume();

$self->{'_stop_reading_cr'} = sub {
$self->{'_give_up_cr'} = sub {
Mojo::IOLoop->singleton->reactor()->remove($socket);
};

return sub {
if ($dbus->pending_send()) {
_flush_send_queue( $dbus, $reactor, $socket );
_flush_send_queue( $dbus, $reactor, $socket, !$$paused_r );
}
};
}

sub _pause {
$_[0]->{'_paused'} = 1;

Mojo::IOLoop->singleton->reactor()->watch(
$_[0]{'socket'},
0,
Expand All @@ -200,8 +201,6 @@ sub _pause {
}

sub _resume {
delete $_[0]->{'_paused'};

Mojo::IOLoop->singleton->reactor()->watch(
$_[0]{'socket'},
1,
Expand Down Expand Up @@ -235,9 +234,9 @@ use parent 'Protocol::DBus::Client::EventMessenger';
sub send_call {
my $p = $_[0]->SUPER::send_call( @_[ 1 .. $#_ ] );

return Mojo::Promise->new( sub { $p->then(@_) } );
return Protocol::DBus::Client::Mojo::_PROMISE_CLASS()->new( sub { $p->then(@_) } );
}

*send_call_p = *send_call;
*send_call_p = __PACKAGE__->can('send_call');

1;

0 comments on commit db60456

Please sign in to comment.