diff --git a/eg/eg-02-job-updates.pl b/eg/eg-02-job-updates.pl index 9c741f4..1264ab4 100644 --- a/eg/eg-02-job-updates.pl +++ b/eg/eg-02-job-updates.pl @@ -13,16 +13,20 @@ with 'Reflex::Role::Collectible'; with 'Reflexive::WorkerPool::Role::Job'; + # Changing this attribute inside work will also update the MyJob object in + # this process provided: + # 1) The attribute is rw + # 2) The attribute uses the Reflex::Trait::EmitsOnChange trait emits attr => ( is => 'rw', isa => 'Int', ); + # This method will be executed within it's own process sub work { my $self = shift; - $self->attr(123456789); - $self->attr(2); + $self->attr(42); sleep 5; } @@ -34,9 +38,6 @@ extends 'Reflex::Base'; use Reflexive::WorkerPool; use Reflex::Trait::Observed; - use Reflex::Callbacks qw(cb_method); - use Scalar::Util qw(blessed); - use Data::Dumper; observes pool => ( is => 'rw', @@ -51,6 +52,8 @@ } ); + # The ready_to_work event files when the pool_interval is reached and the + # workerpool isn't full. sub on_pool_ready_to_work { my $self = shift; @@ -69,23 +72,16 @@ my ( $self, $job ) = @_; printf "Job: %s, stopped!\n", $job->get_id; - } - - sub on_pool_job_errored { - my ( $self, $job ) = @_; - printf "Job: %s, errored!\n", $job->get_id; + print $job->attr; # prints 42 } sub on_pool_job_updated { - my ( $self, $state ) = @_; + my ( $self, $job ) = @_; - # This sessions job object - my $job = delete($state->{_sender})->get_first_emitter; - printf "Job: %s, updated with values:\n", $job->get_id; + printf "Job: %s, stopped!\n", $job->get_id; - # Other sessions emitted attribute change - print Dumper $state; + print $job->attr; # prints 42 } } diff --git a/lib/Reflexive/WorkerPool.pm b/lib/Reflexive/WorkerPool.pm index c57b5d7..760b127 100644 --- a/lib/Reflexive/WorkerPool.pm +++ b/lib/Reflexive/WorkerPool.pm @@ -159,4 +159,213 @@ sub _build_workers { return \@workers; } -1; \ No newline at end of file +1; + +__END__ + +=head1 NAME + +Reflexive::WorkerPool - Sandbox for brain dumping workerpool ideas/concepts + +=head1 SYNOPSIS + +{ + package MyJob; + use Moose; + extends 'Reflex::Base'; + use Reflex::Trait::EmitsOnChange; + with 'Reflex::Role::Collectible'; + with 'Reflexive::WorkerPool::Role::Job'; + + # Changing this attribute inside work will also update the MyJob object in + # this process provided: + # 1) The attribute is rw + # 2) The attribute uses the Reflex::Trait::EmitsOnChange trait + emits attr => ( + is => 'rw', + isa => 'Int', + ); + + # This method will be executed within it's own process + sub work { + my $self = shift; + + $self->attr(42); + + sleep 5; + } +} + +{ + package HasWorkerPool; + use Moose; + extends 'Reflex::Base'; + use Reflexive::WorkerPool; + use Reflex::Trait::Observed; + + observes pool => ( + is => 'rw', + isa => 'Reflexive::WorkerPool', + setup => { + max_workers => 5, + max_jobs_per_worker => 1, + poll_interval => 1, + }, + handles => { + enqueue => 'enqueue_job', + } + ); + + # The ready_to_work event files when the pool_interval is reached and the + # workerpool isn't full. + sub on_pool_ready_to_work { + my $self = shift; + + for (1..$self->pool->available_job_slots) { + $self->enqueue(MyJob->new); + } + } + + sub on_pool_job_started { + my ( $self, $job ) = @_; + + printf "Job: %s, started!\n", $job->get_id; + } + + sub on_pool_job_stopped { + my ( $self, $job ) = @_; + + printf "Job: %s, stopped!\n", $job->get_id; + + print $job->attr; # prints 42 + } + + sub on_pool_job_updated { + my ( $self, $job ) = @_; + + printf "Job: %s, stopped!\n", $job->get_id; + + print $job->attr; # prints 42 + } +} + +HasWorkerPool->new->run_all(); + +=head1 DESCRIPTION + +A worker pool for Reflex! The pool contains 0 or more workers. Calling the +enqueue method adds a job to a worker and starts that job running. + +=head2 Workers + +Each worker has 0 or more jobs. The pool delegates jobs to the first free worker +it finds. The worker adds a job to it's L and calls the +job's work() method. + +=head2 Jobs + +Jobs implement a work() method that is run it's own process using +L. + +If a job has an attribute with the L trait then +that attribute will be updated across processes so that when the job_updated +or job_stopped events are fired the job's attributes reflect the changes. + +=head1 METHODS + +=head1 available_job_slots + +Gets the number of jobs that can be run by the worker pool at any given moment. + +=head2 enqueue + + (Object $job) + +Adds an object that consumes Reflexive::WorkerPool::Role::Job, +Reflex::Role::Collectible and implements a work() method. + +=head2 shut_down + +Stops worker pool execution by destroying the internal Reflex::Interval object. + +=head1 ATTRIBUTES + +=head2 max_workers + + is: ro, isa: Int, default: 5 + +The maximum number of workers that can be running at any given time. + +TODO: Should this just be called num_workers? The workers are created during +WorkerPool construction. + +=head2 max_jobs_per_worker + + is: ro, isa: Int, default: 5 + +The maximum number of workers that can be run for a given worker. + +=head2 poll_interval + + is: ro, isa: Int, default: 60 + +How often the workerpool will fire the ready_to_work event so that new jobs can +be added to the pool. Keep in mind that if the pool is full the ready_to_work +event will not fire. + +=head1 CALLBACKS + +=head1 job_started + + (Object $job) + +Fires right before a job's work() method is run. + +=head1 job_stopped + + (Object $job) + +Fires right after a job's work() method is run. + +=head1 job_updated + + (Object $job) + +Fires any time an attribute of a job (with the L +trait) is changed. + +=head1 TODO + +Get Rocco's opinion on the way Job classes update. e.g. POE::Filter::Reference +and STDOUT. + +Add the ability to $workerpool->enqueue( sub { ... }, [ $arg1, $arg2 ] ). This +could be done by creating a BasicJob class that implements a work method to +call the bassed in sub. + +enqueue is probably a bad method name for adding a job to the job. It's not +enqueued... + +Add proper job erroring. This can be done by adding a _error attribute to +Reflexive::WorkerPool::Role::Job and updating it across the process boundary via +STDOUT and POE::Filter::Reference + +The on_pool_... callbacks clober _sender. It makes for a nicer interface but +worse functionality. + +Should be have any other kind of worker balancing besides just grabbing the +first available worker and giving it a job? Does it really matter? + +What other callbacks might people care about? Should workers have callbacks? + +Is there a better way to pass callbacks up than having the workerpool watch jobs +and manually reemit their events? + +=head1 AUTHOR + +Andy Gorman, agorman@cpan.org + +=head1 COPYRIGHT AND LICENSE + +This is free software; you can redistribute it and/or modify it under +the same terms as the Perl 5 programming language system itself. \ No newline at end of file diff --git a/lib/Reflexive/WorkerPool/Role/Job.pm b/lib/Reflexive/WorkerPool/Role/Job.pm index 6627e81..b5bc413 100644 --- a/lib/Reflexive/WorkerPool/Role/Job.pm +++ b/lib/Reflexive/WorkerPool/Role/Job.pm @@ -27,6 +27,9 @@ sub run { Program => sub { my $self = shift; +# $self->emit(event => 'job_updated', args => { +# in => 'process', +# }); $self->_bind_update_handlers(); $self->work(); $self->ignore($self); @@ -50,7 +53,13 @@ sub on_child_signal { sub on_child_stdout { my ( $self, $args ) = @_; - $self->emit(event => 'job_updated', args => $args->{output}); + my $update = $args->{output}; + + foreach my $key ( keys %$update ) { + $self->$key($args->{output}->{$key}) if $self->can($key); + } + + $self->emit(event => 'job_updated', args => $self); } ################################################################################ @@ -79,4 +88,23 @@ sub _bind_update_handlers { } } -1; \ No newline at end of file +1; + +__END__ + +=head1 NAME + +Reflexive::WorkerPool::Role::Job - Handles job execution + +=head1 DESCRIPTION + +See L for details. + +=head1 AUTHOR + +Andy Gorman, agorman@cpan.org + +=head1 COPYRIGHT AND LICENSE + +This is free software; you can redistribute it and/or modify it under +the same terms as the Perl 5 programming language system itself. \ No newline at end of file diff --git a/lib/Reflexive/WorkerPool/Worker.pm b/lib/Reflexive/WorkerPool/Worker.pm index 2f895fd..ae2dcfd 100644 --- a/lib/Reflexive/WorkerPool/Worker.pm +++ b/lib/Reflexive/WorkerPool/Worker.pm @@ -45,4 +45,23 @@ sub add_job { $job->run(); } -1; \ No newline at end of file +1; + +__END__ + +=head1 NAME + +Reflexive::Worker - Manages a collection of jobs + +=head1 DESCRIPTION + +See L for details. + +=head1 AUTHOR + +Andy Gorman, agorman@cpan.org + +=head1 COPYRIGHT AND LICENSE + +This is free software; you can redistribute it and/or modify it under +the same terms as the Perl 5 programming language system itself. \ No newline at end of file