Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
IPC - Use SHM when available to notify of pending events
When SHM is available use it to notify other processes and threads that
there are pending events.

How this works:

The shm holds a string, with an initial value. Each thread/proc tracks
what it thinks the last value of the shm is. Whenever an event is sent
via IPC it will change the shm value. Any thread/proc that then reads
shm will see that it is out of sync and knows it should cull. It does
not matter if other procs/threads also change the shm value as that
still results in a difference, so long as all values are unique.

For this to be effective shm should always be set to a unique value. In
this case the Files driver filenames must always be unique, so we set
the shm to the event filename.

If something screws up and a proc or thread fails to see that there are
pending events (because the shm value is repeated?) then the
consequences are pretty minimal. Anything that depends on an event being
read at a specific time should call cull() directly, which reads files
without consulting the shm value. At the end of the test all hubs call
cull() directly as well so all events will still be seen, just later
than ideal.

If an shm cannot be obtained, or is not avaialable then cull simply
always polls alays assuming there are pending events.
  • Loading branch information
exodist committed Jan 6, 2016
1 parent 009899b commit 86aa836
Show file tree
Hide file tree
Showing 5 changed files with 149 additions and 1 deletion.
21 changes: 21 additions & 0 deletions lib/Test2/API.pm
Expand Up @@ -38,6 +38,8 @@ our @EXPORT_OK = qw{
test2_ipc_polling
test2_ipc_disable_polling
test2_ipc_enable_polling
test2_ipc_get_pending
test2_ipc_set_pending
test2_formatter
test2_formatters
Expand Down Expand Up @@ -76,6 +78,8 @@ sub test2_ipc_drivers { @{$INST->ipc_drivers} }
sub test2_ipc_polling { $INST->ipc_polling }
sub test2_ipc_enable_polling { $INST->enable_ipc_polling }
sub test2_ipc_disable_polling { $INST->disable_ipc_polling }
sub test2_ipc_get_pending { $INST->get_ipc_pending }
sub test2_ipc_set_pending { $INST->set_ipc_pending(@_) }

sub test2_formatter { $INST->formatter }
sub test2_formatters { @{$INST->formatters} }
Expand Down Expand Up @@ -815,6 +819,23 @@ time a context is created.
Turn off IPC polling.
=item test2_ipc_set_pending($uniq_val)
Tell other processes and events that an event is pending. C<$uniq_val> should
be a unique value no other thread/process will generate.
B<Note:> After calling this C<test2_ipc_get_pending()> will return 1. This is
intentional, and not avoidable.
=item $pending = test2_ipc_get_pending()
This returns -1 if there is no way to check (assume yes)
This returns 0 if there are (most likely) no pending events.
This returns 1 if there are (likely) pending events. Upon return it will reset,
nothing else will be able to see that there were pending events.
=back
=head2 MANAGING FORMATTERS
Expand Down
91 changes: 90 additions & 1 deletion lib/Test2/API/Instance.pm
Expand Up @@ -18,6 +18,9 @@ use Test2::Util::HashBase qw{
ipc stack formatter
contexts
ipc_shm_size
ipc_shm_last
ipc_shm_id
ipc_polling
ipc_drivers
formatters
Expand Down Expand Up @@ -132,6 +135,24 @@ sub _finalize {
for my $driver (@{$self->{+IPC_DRIVERS}}) {
next unless $driver->can('is_viable') && $driver->is_viable;
$self->{+IPC} = $driver->new or next;

return unless $self->{+IPC}->use_shm;

try {
require IPC::SysV;

my $ipc_key = IPC::SysV::IPC_PRIVATE();
my $shm_size = $self->{+IPC}->can('shm_size') ? $self->{+IPC}->shm_size : 64;
my $shm_id = shmget($ipc_key, $shm_size, 0666) or die;

my $initial = 'a' x $shm_size;
shmwrite($shm_id, $initial, 0, $shm_size) or die;

$self->{+IPC_SHM_SIZE} = $shm_size;
$self->{+IPC_SHM_ID} = $shm_id;
$self->{+IPC_SHM_LAST} = $initial;
};

return;
}

Expand Down Expand Up @@ -231,12 +252,47 @@ sub enable_ipc_polling {
$self->add_context_init_callback(
# This is called every time a context is created, it needs to be fast.
# $_[0] is a context object
sub { $_[0]->{hub}->cull if $self->{+IPC_POLLING} }
sub {
return unless $self->{+IPC_POLLING};
return $_[0]->{hub}->cull unless defined $self->{+IPC_SHM_ID};

my $val;
shmread($self->{+IPC_SHM_ID}, $val, 0, $self->{+IPC_SHM_SIZE}) or return -1;

This comment has been minimized.

Copy link
@exodist

exodist Jan 8, 2016

Author Member

oops, the return -1 should just be return, doesn't matter a lot as the return from this sub is ignored.

return if $val eq $self->{+IPC_SHM_LAST};
shmread($self->{+IPC_SHM_ID}, $val, 0, $self->{+IPC_SHM_SIZE});

This comment has been minimized.

Copy link
@exodist

exodist Jan 8, 2016

Author Member

oops, should NOT be reading ti a second time, will fix.

$self->{+IPC_SHM_LAST} = $val;

$_[0]->{hub}->cull;
}
) unless defined $self->ipc_polling;

$self->set_ipc_polling(1);
}

sub get_ipc_pending {
my $self = shift;
return -1 unless defined $self->{+IPC_SHM_ID};
my $val;
shmread($self->{+IPC_SHM_ID}, $val, 0, $self->{+IPC_SHM_SIZE}) or return -1;
return 0 if $val eq $self->{+IPC_SHM_LAST};
$self->{+IPC_SHM_LAST} = $val;
return 1;
}

sub set_ipc_pending {
my $self = shift;

return undef unless defined $self->{+IPC_SHM_ID};

my ($val) = @_;

confess "value is required for set_ipc_pending"
unless $val;

shmwrite($self->{+IPC_SHM_ID}, $val, 0, $self->{+IPC_SHM_SIZE});
shmread($self->{+IPC_SHM_ID}, $val, 0, $self->{+IPC_SHM_SIZE});

This comment has been minimized.

Copy link
@exodist

exodist Jan 8, 2016

Author Member

this read is not necessary, will remove

}

sub disable_ipc_polling {
my $self = shift;
return unless defined $self->{+IPC_POLLING};
Expand Down Expand Up @@ -446,6 +502,39 @@ This is intended to be called in an C<END { ... }> block. This will look at
test state and set $?. This will also call any end callbacks, and wait on child
processes/threads.
=item $shm_id = $obj->ipc_shm_id()
If SHM is enabled for IPC this will be the shm_id for it.
=item $shm_size = $obj->ipc_shm_size()
If SHM is enabled for IPC this will be the size of it.
=item $shm_last_val = $obj->ipc_shm_last()
If SHM is enabled for IPC this will return the last SHM value seen.
=item $obj->set_ipc_pending($val)
use the IPC SHM to tell other processes and threads there is a pending event.
C<$val> should be a unique value no other thread/process will generate.
B<Note:> This will also make the current process see a pending event. It does
not set C<ipc_shm_last()>, this is important because doing so could hide a
previous change.
=item $pending = $obj->get_ipc_pending()
This returns -1 if SHM is not enabled for IPC.
This returns 0 if the SHM value matches the last known value, which means there
are no pending events.
This returns 1 if the SHM value has changed, which means there are probably
pending events.
When 1 is returned this will set C<< $obj->ipc_shm_last() >>.
=item $drivers = $obj->ipc_drivers
Get the list of IPC drivers.
Expand Down
30 changes: 30 additions & 0 deletions lib/Test2/IPC/Driver.pm
Expand Up @@ -15,6 +15,8 @@ sub import {
test2_ipc_add_driver($class);
}

sub use_shm { 0 }

for my $meth (qw/send cull add_hub drop_hub waiting is_viable/) {
no strict 'refs';
*$meth = sub {
Expand Down Expand Up @@ -84,6 +86,11 @@ error.
This is the same as C<< $ipc->abort($msg) >> except that it uses
C<Carp::longmess> to add a stack trace to the message.
=item $false = $self->use_shm
The base class always returns false for this method. You may override it if you
wish to use the SHM made avilable in L<Test2::API>/L<Test2::API::Instance>.
=back
=head1 LOADING DRIVERS
Expand Down Expand Up @@ -130,6 +137,10 @@ load it too late for it to be effective.
my ($hid, $e) = @_;
... # Send the event to the proper hub.
# If you are using the SHM you should notify other procs/threads that
# there is a pending event.
Test2::API::test2_ipc_set_pending($uniq_val);
}
sub cull {
Expand Down Expand Up @@ -196,6 +207,10 @@ process+thread.
my ($hid, $e) = @_;
... # Send the event to the proper hub.
# If you are using the SHM you should notify other procs/threads that
# there is a pending event.
Test2::API::test2_ipc_set_pending($uniq_val);
}
=item @events = $ipc->cull($hid)
Expand Down Expand Up @@ -225,6 +240,21 @@ child processes and threads to complete.
=back
=head2 METHODS SUBCLASSES MAY IMPLEMENT OR OVERRIDE
=over 4
=item $bool = $ipc->use_shm()
True if you want to make use of the L<Test2::API>/L<Test2::API::Instance> SHM.
=item $bites = $ipc->shm_size()
Use this to customize the size of the shm space. There are no guarentees about
what the size will be if you do not implement this.
=back
=head1 SOURCE
The source code repository for Test2 can be found at
Expand Down
5 changes: 5 additions & 0 deletions lib/Test2/IPC/Driver/Files.pm
Expand Up @@ -12,6 +12,10 @@ use Storable();
use File::Spec();

use Test2::Util qw/try get_tid pkg_to_file/;
use Test2::API qw/test2_ipc_set_pending/;

sub use_shm { 1 }
sub shm_size { 64 }

sub is_viable { 1 }

Expand Down Expand Up @@ -133,6 +137,7 @@ sub send {
my ($ok, $err) = try {
Storable::store($e, $file);
rename($file, $ready) or $self->abort("Could not rename file '$file' -> '$ready'");
test2_ipc_set_pending($file);
};
if (!$ok) {
my $src_file = __FILE__;
Expand Down
3 changes: 3 additions & 0 deletions t/modules/API/Instance.t
Expand Up @@ -386,20 +386,23 @@ if (CAN_REALLY_FORK) {
$one->enable_ipc_polling;
is(@{$one->context_init_callbacks}, 1, "added the callback");
is($one->ipc_polling, 1, "polling on");
$one->set_ipc_shm_last('abc1');
$one->context_init_callbacks->[0]->({'hub' => 'Fake::Hub'});
is($cull, 1, "called cull once");
$cull = 0;

$one->disable_ipc_polling;
is(@{$one->context_init_callbacks}, 1, "kept the callback");
is($one->ipc_polling, 0, "no polling, set to 0");
$one->set_ipc_shm_last('abc3');
$one->context_init_callbacks->[0]->({'hub' => 'Fake::Hub'});
is($cull, 0, "did not call cull");
$cull = 0;

$one->enable_ipc_polling;
is(@{$one->context_init_callbacks}, 1, "did not add the callback");
is($one->ipc_polling, 1, "polling on");
$one->set_ipc_shm_last('abc3');
$one->context_init_callbacks->[0]->({'hub' => 'Fake::Hub'});
is($cull, 1, "called cull once");
}
Expand Down

0 comments on commit 86aa836

Please sign in to comment.