Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

Basic unit test. Promises example.

  • Loading branch information...
commit 6c3536e6c5e7da11fd0977df01a810cfa455110f 1 parent 200277b
@agorman authored
Showing with 105 additions and 72 deletions.
  1. +105 −72 lib/Reflexive/WorkerPool.pm
View
177 lib/Reflexive/WorkerPool.pm
@@ -169,87 +169,120 @@ Reflexive::WorkerPool - Sandbox for brain dumping workerpool ideas/concepts
=head1 SYNOPSIS
-{
- package MyJob;
- use Moose;
- extends 'Reflex::Base';
- use Reflex::Trait::EmitsOnChange;
- with 'Reflex::Role::Collectible';
- with 'Reflexive::WorkerPool::Role::Job';
-
- # Changing this attribute inside work will also update the MyJob object in
- # this process provided:
- # 1) The attribute is rw
- # 2) The attribute uses the Reflex::Trait::EmitsOnChange trait
- emits attr => (
- is => 'rw',
- isa => 'Int',
- );
-
- # This method will be executed within it's own process
- sub work {
- my $self = shift;
-
- $self->attr(42);
-
- sleep 5;
+ {
+ package MyJob;
+ use Moose;
+ extends 'Reflex::Base';
+ use Reflex::Trait::EmitsOnChange;
+ with 'Reflex::Role::Collectible';
+ with 'Reflexive::WorkerPool::Role::Job';
+
+ # Changing this attribute inside work will also update the MyJob object in
+ # this process provided:
+ # 1) The attribute is rw
+ # 2) The attribute uses the Reflex::Trait::EmitsOnChange trait
+ emits attr => (
+ is => 'rw',
+ isa => 'Int',
+ );
+
+ # This method will be executed within it's own process
+ sub work {
+ my $self = shift;
+
+ $self->attr(42);
+
+ sleep 5;
+ }
}
-}
-
-{
- package HasWorkerPool;
- use Moose;
- extends 'Reflex::Base';
- use Reflexive::WorkerPool;
- use Reflex::Trait::Observed;
-
- observes pool => (
- is => 'rw',
- isa => 'Reflexive::WorkerPool',
- setup => {
- max_workers => 5,
- max_jobs_per_worker => 1,
- poll_interval => 1,
- },
- handles => {
- enqueue => 'enqueue_job',
+
+ {
+ package HasWorkerPool;
+ use Moose;
+ extends 'Reflex::Base';
+ use Reflexive::WorkerPool;
+ use Reflex::Trait::Observed;
+
+ observes pool => (
+ is => 'rw',
+ isa => 'Reflexive::WorkerPool',
+ setup => {
+ max_workers => 5,
+ max_jobs_per_worker => 1,
+ poll_interval => 1,
+ },
+ handles => {
+ enqueue => 'enqueue_job',
+ }
+ );
+
+ # The ready_to_work event files when the pool_interval is reached and the
+ # workerpool isn't full.
+ sub on_pool_ready_to_work {
+ my $self = shift;
+
+ for (1..$self->pool->available_job_slots) {
+ $self->enqueue(MyJob->new);
+ }
}
- );
-
- # The ready_to_work event files when the pool_interval is reached and the
- # workerpool isn't full.
- sub on_pool_ready_to_work {
- my $self = shift;
-
- for (1..$self->pool->available_job_slots) {
- $self->enqueue(MyJob->new);
+
+ sub on_pool_job_started {
+ my ( $self, $job ) = @_;
+
+ printf "Job: %s, started!\n", $job->get_id;
+ }
+
+ sub on_pool_job_stopped {
+ my ( $self, $job ) = @_;
+
+ printf "Job: %s, stopped!\n", $job->get_id;
+
+ print $job->attr; # prints 42
+ }
+
+ sub on_pool_job_updated {
+ my ( $self, $job ) = @_;
+
+ printf "Job: %s, stopped!\n", $job->get_id;
+
+ print $job->attr; # prints 42
}
}
+
+ HasWorkerPool->new->run_all();
- sub on_pool_job_started {
- my ( $self, $job ) = @_;
- printf "Job: %s, started!\n", $job->get_id;
- }
-
- sub on_pool_job_stopped {
- my ( $self, $job ) = @_;
+No way man, I want to use promises!
- printf "Job: %s, stopped!\n", $job->get_id;
- print $job->attr; # prints 42
+ {
+ package MyJob;
+ use Moose;
+ extends 'Reflex::Base';
+ with 'Reflex::Role::Collectible';
+ with 'Reflexive::WorkerPool::Role::Job';
+
+ # This method will be executed within it's own process
+ sub work {}
}
-
- sub on_pool_job_updated {
- my ( $self, $job ) = @_;
-
- printf "Job: %s, stopped!\n", $job->get_id;
-
- print $job->attr; # prints 42
+
+ use Reflexive::WorkerPool;
+
+ my $pool = Reflexive::WorkerPool->new;
+ my @jobs = ( MyJob->new, MyJob->new, MyJob->new );
+
+ $pool->enqueue_job(shift @jobs);
+
+ while(my $event = $pool->next()) {
+ print "$event->{name}\n";
+
+ if ($event->{name} eq 'job_stopped') {
+ my $job = shift @jobs;
+ last unless $job;
+
+ $pool->enqueue_job($job);
+ }
}
-}
-
-HasWorkerPool->new->run_all();
=head1 DESCRIPTION
@@ -277,7 +310,7 @@ or job_stopped events are fired the job's attributes reflect the changes.
Gets the number of jobs that can be run by the worker pool at any given moment.
-=head2 enqueue
+=head2 enqueue_job
(Object $job)
Please sign in to comment.
Something went wrong with that request. Please try again.