-
Notifications
You must be signed in to change notification settings - Fork 1
/
eg-02-job-updates.pl
88 lines (69 loc) · 1.61 KB
/
eg-02-job-updates.pl
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
#!/usr/bin/env perl
use strict;
use warnings;
use lib qw(../lib);
# Job updates across the process boundary.
{
package MyJob;
use Moose;
extends 'Reflex::Base';
use Reflex::Trait::EmitsOnChange;
with 'Reflex::Role::Collectible';
with 'Reflexive::WorkerPool::Role::Job';
# Changing this attribute inside work will also update the MyJob object in
# this process provided:
# 1) The attribute is rw
# 2) The attribute uses the Reflex::Trait::EmitsOnChange trait
emits attr => (
is => 'rw',
isa => 'Int',
);
# This method will be executed within it's own process
sub work {
my $self = shift;
$self->attr(42);
sleep 5;
}
}
{
package HasWorkerPool;
use Moose;
extends 'Reflex::Base';
use Reflexive::WorkerPool;
use Reflex::Trait::Observed;
observes pool => (
is => 'rw',
isa => 'Reflexive::WorkerPool',
setup => {
max_workers => 5,
max_jobs_per_worker => 1,
poll_interval => 1,
},
handles => {
enqueue => 'enqueue_job',
}
);
# The ready_to_work event files when the pool_interval is reached and the
# workerpool isn't full.
sub on_pool_ready_to_work {
my $self = shift;
for (1..$self->pool->available_job_slots) {
$self->enqueue(MyJob->new);
}
}
sub on_pool_job_started {
my ( $self, $job ) = @_;
printf "Job: %s, started!\n", $job->get_id;
}
sub on_pool_job_stopped {
my ( $self, $job ) = @_;
printf "Job: %s, stopped!\n", $job->get_id;
print $job->attr; # prints 42
}
sub on_pool_job_updated {
my ( $self, $job ) = @_;
printf "Job: %s, stopped!\n", $job->get_id;
print $job->attr; # prints 42
}
}
HasWorkerPool->new->run_all();