Skip to content

Commit

Permalink
Add the cPanel TaskQueue Messaging Parser to the library
Browse files Browse the repository at this point in the history
  • Loading branch information
Matt Dees committed Nov 11, 2011
1 parent 5d55fcc commit fddc0b5
Show file tree
Hide file tree
Showing 4 changed files with 120 additions and 15 deletions.
5 changes: 4 additions & 1 deletion lib/cPanel/TaskQueue.pm
Expand Up @@ -369,9 +369,11 @@ my $taskqueue_uuid = 'TaskQueue';
my $task = $self->find_task($uuid);
my $proc = _get_task_processor( $task );
if ( my $cr = $proc->can('pre_unqueue') ) {
eval {$cr->('Task Failed to Execute', $task) };
eval {$cr->('Task was Cancelled', $task) };
$self->info($@) if $@;
}


# Lock the queue before we begin accessing it.
my $guard = $self->{disk_state}->synch();
my $old_count = @{$self->{queue_waiting}};
Expand Down Expand Up @@ -661,6 +663,7 @@ my $taskqueue_uuid = 'TaskQueue';
# Implement queue "hooks"
if ( my $cr = $proc->can('pre_queue') ) {
eval {$cr->($task)};
$self->info($@) if $@;
}

push @{$self->{queue_waiting}}, $task;
Expand Down
29 changes: 16 additions & 13 deletions lib/cPanel/TaskQueue/Messaging.pm
Expand Up @@ -14,25 +14,27 @@ sub initiate_msg_file {

my $uuid = $task->{_uuid};
my $msg_file = "$dir/${uuid}_msgs";

my $initial_data = time() . ':' . 'info' . ':' . $task->{'_command'};
if ( -e $msg_file ) {
Carp::croak("The TaskQueue message file $msg_file already exists, cannot initialize message queue");
}

# TODO: beef up validation that this worked.
open ( my $msg_fh, '>', $msg_file );
open( my $msg_fh, '>', $msg_file );
print ${msg_fh} $initial_data . "\n";
close $msg_fh;
}

#SERIALIZATION
sub finalize_msg_queue {
my ($dir, $msg, $task ) = @_;
my ( $dir, $msg, $task ) = @_;

my $uuid = $task->{_uuid};

my $uuid = $task->{_uuid};
my $msg = time() . ':info:' . $msg;
$msg = time() . ':info:' . $msg;
my $msg_file = "$dir/" . $task->{'_uuid'};
open (my $msg_fh, '>>',$msg_file);
open( my $msg_fh, '>>', $msg_file );
print ${msg_fh} $msg;
close $msg_fh;
}
Expand All @@ -56,6 +58,7 @@ sub new {
}

if ( !validate_message_format($msg_file) ) {

#FAIL
}

Expand All @@ -73,28 +76,28 @@ sub add_msg {
#TODO: THROW WARNING HERE
return;
}

#SERIALIZATION
my $msg = time() . ':' . $msg_ref->{'type'} . ':'. $msg_ref->{'msg'};
my $msg = time() . ':' . $msg_ref->{'type'} . ':' . $msg_ref->{'msg'};
$self->update_msg_file();
}

sub finish_msging {
my ($self ) = @_;
my ($self) = @_;
my $msg_ref = {
'type' => 'info',
'msg' => 'Task Compelted'.
'msg' => 'Task Completed'
};
$self->add_msg($msg_ref);
}


sub update_msg_file {
my ($self, $msg, $close_fh ) = @_;
my ( $self, $msg, $close_fh ) = @_;
if ( !defined $self->{'msg_fh'} ) {
open $self->{'msg_fh'}, '>>', $self->{'msg_file'};
}
print $self->{'msg_fh'} $msg . "\n";
if ( $close_fh ) {
print {$self->{'msg_fh'}} $msg . "\n";
if ($close_fh) {
close $self->{'msg_fh'};
}
}
Expand Down
92 changes: 92 additions & 0 deletions lib/cPanel/TaskQueue/Messaging/Parser.pm
@@ -0,0 +1,92 @@
package cPanel::TaskQueue::Messaging::Parser;

use File::ReadBackwards ();

#instantiate a new parsing object
sub new {
my ($class, $dir, $id) = @_;
if ( !-d $dir ) {
return undef, 'Msg directory does not exist';
}
my $id_file = "${dir}/${id}_msg";
if ( !-e $id_file ) {
return undef, 'Id file does not exist';
}

open $msg_fh, '<', $id_file || return undef, "unable to open id file ${id_file}, ";
my $call_line = readline $msg_fh;
chomp $call_line if $call_line =~ m/\n$/;

my $end_of_call_tell = tell $msg_fh;
my $self = bless {
'id' => $id,
'_dir' => $dir,
'_file' => $id_file,
'_msg_fh' => $msg_fh,
'call_line' => $call_line,
'_end_of_call_tell' => $end_of_call_tell,
}, $class;

return $self;
}


# Stub, This should be overwritten by something more intelligent
sub get_call_info {
my ($self) = @_;
my ($time, undef, $command ) = split(':', $self->{'call_line'}, 3);
return {
'start_time' => $time,
'parameters' => $command,
};
}

# stub, should be overwritten by something more intelligent
sub get_last_msgs_of_type {
my ($self, @types) = @_;
return $self->_get_last_msgs_of_type(@types);
}

sub _get_last_msgs_of_type {
my ( $self, @types ) = @_;
my $rb_fh = File::ReadBackwards->new( $self->{'_msg_fh'} );
my %res;
while ( my $line = $rb_fh->readline() ) {
my ( undef, $type, undef ) = split(':', $line, 3);

# Add the type to the result hash
if ( ( grep /\Q$type\E/, @types ) && !exists $res{$type} ) {
$res{$type} = $line;
}

# If we have all our types detected, stop processing.
if ( keys %res == length @types ) {
last;
}

}
return wantarray ? %res : \%res;
}

# stub, should be overwritten by something more intelligent
sub get_last_number_of_msgs {
my ( $self, $num_of_lines ) = @_;
return $self->_get_last_number_of_msgs($num_of_lines);
}

sub _get_last_number_of_msgs {
my ($self, $num_of_lines) = @_;
my $rb_fh = File::ReadBackwards->new( $self->{'_msg_fh'} );
my @res;
for ( my $cnt = 0; $cnt < $num_of_lines; $cnt++) {
unshift @res, $rb_fh->readline();
}
return wantarray ? @res : \@res;
}

sub _read_msg_file {
my ($self) = @_;
sysseek( $self->{'_msg_fh'}, 0, $self->{'_end_of_call_tell'} )
}

1;
9 changes: 8 additions & 1 deletion lib/cPanel/TaskQueue/MessagingProcessor.pm
Expand Up @@ -11,13 +11,20 @@ use cPanel::TaskQueue::Messaging ();
my $dir = '/var/cpanel/taskqueue/msgs';
sub pre_queue {
my ($self, $task) = @_;

return if !_initialize_msg_queue();
cPanel::TaskQueue::Messaging::initialize_msg_queue($dir, $task);
}
sub pre_unqueue {
my ($self, $msg, $task) = @_;
return if !_finalize_msg_queue();
cPanel::TaskQueue::Messaging::finalize_msg_queue($dir, $msg, $task);
}
sub _initialize_msg_queue {
# Stub
}
sub _finalize_msg_queue {
# Stub
}
}

1;

0 comments on commit fddc0b5

Please sign in to comment.