Skip to content

Commit

Permalink
fix: Fix multi-threading support on windows, essentially for netdisco…
Browse files Browse the repository at this point in the history
…very & netinventory

Add few optimizations & update related unittests
  • Loading branch information
g-bougard committed Jun 28, 2024
1 parent 61ac477 commit 4008e9f
Show file tree
Hide file tree
Showing 6 changed files with 72 additions and 23 deletions.
6 changes: 6 additions & 0 deletions Changes
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,19 @@ core:
* Add support for OAuth2 authentication included in next main GLPI version.
* Reduce drift on run date keeping not randomized time reference
* fix: Don't reset run date in hourly run for unmanaged mode
* Control concurrent calls to not thread safe apis on windows
* Little optimization on GLPI::Agent::XML objects check

inventory:
* fix #680: Enhanced disk storage serialnumber support on Windows (one more case)
* fix #565: Add support for Cortex XDR Antivirus on windows.
This is also an attempt to start antivirus support on Windows Server based on
service detection.

netdiscovery/netinventory:
* Always send netinventory jobs end messages from runners
* Fixed tasks blocking on windows with core concurrent calls control

esx:
* fix #691: Fix perl error while checking esx configuration template

Expand Down
28 changes: 16 additions & 12 deletions lib/GLPI/Agent/Task/NetInventory.pm
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,8 @@ sub run {
my $skip_start_stop = 0;
foreach my $job (@{$self->{jobs}}) {
$devices_count += $job->count();
# Support glpi-netdiscovery --control option
$self->{_control} = $job->control;
# newer server won't need START message if PID is provided on <DEVICE/>
next if $skip_start_stop;
$skip_start_stop = $job->skip_start_stop || any { defined($_->{PID}) } $job->devices();
Expand Down Expand Up @@ -187,15 +189,7 @@ sub run {
return unless $jobid;
my $job = $jobs{$jobid};
$queued_count--;
if ($job->done) {
# send final message to the server before cleaning jobs
$self->_sendStopMessage($jobid) unless $skip_start_stop;

delete $jobs{$jobid};

# send final message to the server
$self->_sendStopMessage($jobid) unless $skip_start_stop;
}
delete $jobs{$jobid} if $job->done;
$devices_count--;
# Only reduce expiration when few devices are still to be scanned
if ($devices_count > 4 && $expiration > time + $devices_count*$target_expiration) {
Expand Down Expand Up @@ -272,11 +266,20 @@ sub run {
}

# Get result PID from result
my $thispid = delete $result->{PID};
my $thispid = delete $result->{PID} // $pid;

# Directly send the result message from the worker, but use job pid if
# it was not set in result
$self->_sendResultMessage($result, $thispid || $pid, $device->{IP});
$self->_sendResultMessage($result, $thispid, $device->{IP});

# Send control messages unless not required
if (!$skip_start_stop || $self->{_control}) {
# send end message to the server for this job
$self->_sendStopMessage($thispid);

# send final end message to the server
$self->_sendStopMessage($thispid);
}

delete $self->{logger}->{prefix} if $worker_count > 1;

Expand Down Expand Up @@ -350,11 +353,12 @@ sub _sendMessage {

if ($self->{target}->isType('local')) {
my ($handle, $file);
return unless $content->{DEVICE};
my $path = $self->{target}->getPath();
if ($path eq '-') {
return unless $content->{DEVICE} || $self->{_control};
$handle = \*STDOUT;
} else {
return unless $content->{DEVICE};
$path = $self->{target}->getFullPath("netinventory");
mkpath($path) unless -d $path;
$file = $path . "/$ip.xml";
Expand Down
10 changes: 9 additions & 1 deletion lib/GLPI/Agent/Tools/Win32.pm
Original file line number Diff line number Diff line change
Expand Up @@ -902,6 +902,7 @@ sub FreeAgentMem {

my $worker ;
my $worker_semaphore;
my $workers_semaphore;
my $worker_lasterror = [];

my @win32_ole_calls : shared;
Expand All @@ -918,6 +919,7 @@ sub start_Win32_OLE_Worker {
# Request a semaphore on which worker blocks immediatly
Thread::Semaphore->require();
$worker_semaphore = Thread::Semaphore->new(0);
$workers_semaphore = Thread::Semaphore->new(1);

# Start a worker thread
$worker = threads->create( \&_win32_ole_worker );
Expand Down Expand Up @@ -1051,12 +1053,15 @@ sub call_not_thread_safe_api_on_win32 {
$call->{expiration} = $expiration;

if (defined($worker)) {
# Limit concurrent calls from running threads
$workers_semaphore->down();

# Share the expect call
my $call = shared_clone($call);
my $result;

if (defined($call)) {
# Be sure the worker block
# Be sure the worker blocks
$worker_semaphore->down_nb();

# Lock list calls before releasing semaphore so worker waits
Expand All @@ -1077,6 +1082,9 @@ sub call_not_thread_safe_api_on_win32 {
# Be sure to always block worker on semaphore from now
$worker_semaphore->down_nb();

# Free any concurrent thread call
$workers_semaphore->up();

if (exists($call->{'result'})) {
$result = $call->{'result'};
} elsif (time < $expiration) {
Expand Down
26 changes: 19 additions & 7 deletions lib/GLPI/Agent/XML.pm
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ sub new {
}

$self->string($params{string});
$self->file($params{file}) unless $self->has_xml();
$self->file($params{file}) unless $self->_is_xml();

# Support library options set as private object attributes
map { $self->{"_$_"} = $params{$_} } grep { defined($params{$_}) } qw(
Expand Down Expand Up @@ -87,6 +87,15 @@ sub _empty {
return $self;
}

# On MSWin32, it is only intended to be called in dedicated thread
sub _is_xml {
my ($self) = @_;

my $xml = $self->_xml;

return ref($xml) eq 'XML::LibXML::Document' && $xml->documentElement() ? 1 : 0;
}

sub has_xml {
my ($self) = @_;

Expand All @@ -98,9 +107,7 @@ sub has_xml {
);
}

my $xml = $self->_xml;

return ref($xml) eq 'XML::LibXML::Document' && $xml->documentElement() ? 1 : 0;
return defined($self->_xml) ? 1 : 0;
}

sub string {
Expand All @@ -120,6 +127,7 @@ sub string {
$self->_init_libxml() unless $self->{_parser};

$self->_empty->_xml($self->{_parser}->parse_string(decode("UTF-8", $string)));
$self->_empty unless $self->_is_xml;

return if $self->{_threaded};

Expand All @@ -143,6 +151,7 @@ sub file {
$self->_init_libxml() unless $self->{_parser};

$self->_empty->_xml($self->{_parser}->parse_file($file));
$self->_empty unless $self->_is_xml;

return if $self->{_threaded};

Expand Down Expand Up @@ -242,11 +251,14 @@ sub write {
}

if ($hash) {
$self->_empty->_build_xml($hash)
or return;
$self->_empty->_build_xml($hash);
unless ($self->_is_xml()) {
$self->_empty();
return;
}
}

return '' unless $self->has_xml();
return '' unless $self->_is_xml();

# Support XML::LibXML setTagCompression option
$XML::LibXML::setTagCompression = $self->{_tag_compression} ? 1 : 0 ;
Expand Down
12 changes: 12 additions & 0 deletions lib/setup.pm
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ use strict;
use warnings;
use parent qw(Exporter);

use English qw(-no_match_vars);
use UNIVERSAL::require;
use File::Spec;
use File::Basename qw(dirname);

Expand Down Expand Up @@ -33,4 +35,14 @@ eval {
}
};

# Set DLL directory from perl exe path on MSWin32
if ($OSNAME eq 'MSWin32') {
Win32::API->require();
my $apiSetDllDirectory = Win32::API->new(
'kernel32',
'BOOL SetDllDirectoryA(LPCSTR lpPathName)'
);
$apiSetDllDirectory->Call(dirname($^X));
}

1;
13 changes: 10 additions & 3 deletions t/tasks/netinventory.t
Original file line number Diff line number Diff line change
Expand Up @@ -484,7 +484,7 @@ foreach my $test (keys(%responses)) {
} @{$responses{$test}->{SNMPQUERY}};
}

my $plan_tests_count = 6 * keys(%responses);
my $plan_tests_count = 7 * keys(%responses);
foreach my $case (values(%responses)) {
next unless $case->{SNMPQUERY};
$plan_tests_count += scalar(@{$case->{SNMPQUERY}});
Expand Down Expand Up @@ -514,9 +514,12 @@ $client_module->mock('send', sub {

# In workers, store %params to be sent later in testing process
if ($test_pid != $$) {
store \%params, "$storable_tempdir/sent-$$";
my $index = 0;
# More than one send can be done by workers
while (-e "$storable_tempdir/sent-$$-$index") { $index++; }
store \%params, "$storable_tempdir/sent-$$-$index";
} else {
ok ($responses{$case}->{index}->{$sent}--, "Sent $query message");
ok ($responses{$case}->{index}->{$sent}--, "Sent $query message for $case case: $responses{$case}->{index}->{$sent}\n$sent");
}
}

Expand Down Expand Up @@ -562,6 +565,10 @@ foreach my $case (keys(%responses)) {
unlink $file;
}

# Check if a message has not been sent
my $not_sent = grep { $_ } values(%{$responses{$case}->{index}});
ok($not_sent == 0, "All messages sent for $case case ($not_sent not sent)");

ok(
@{ $task->{jobs} || [] } == $responses{$case}->{cmp}->{jobs},
"$case: total jobs"
Expand Down

0 comments on commit 4008e9f

Please sign in to comment.