Skip to content

Commit

Permalink
Merge pull request #113 from hathitrust/DEV-1075
Browse files Browse the repository at this point in the history
DEV-1075: add HTFeed::JobMetrics
  • Loading branch information
mwarin committed May 10, 2024
2 parents 87dc6a5 + d502e1e commit 55eda8f
Show file tree
Hide file tree
Showing 17 changed files with 990 additions and 270 deletions.
2 changes: 0 additions & 2 deletions Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -60,8 +60,6 @@ RUN echo "deb [signed-by=/usr/share/keyrings/hathitrust-archive-keyring.gpg] htt

RUN apt-get update && apt-get install -y grokj2k-tools

RUN cpan -f -i Net::AMQP::RabbitMQ

COPY etc/imagemagick-policy.xml /etc/ImageMagick-6/policy.xml

COPY etc/jhove-auto-install.xml /tmp/jhove-auto-install.xml
Expand Down
1 change: 1 addition & 0 deletions Makefile.PL
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ WriteMakefile(
'Mouse' => 0,
'Net::AMQP::RabbitMQ' => 0,
'Net::Prometheus' => 0,
'Prometheus::Tiny::Shared' => 0,
'ProgressTracker' => 0,
'Readonly' => 0,
'Roman' => 0,
Expand Down
56 changes: 27 additions & 29 deletions bin/validate_volume.pl
Original file line number Diff line number Diff line change
Expand Up @@ -55,35 +55,33 @@
}

unless ($realmeta) {

*HTFeed::Volume::get_sources = sub {
return ( 'ht_test','ht_test','ht_test' );
};

# use faked-up marc in case it's missing

*HTFeed::SourceMETS::_get_marc_from_zephir = sub {
my $self = shift;
my $marc_path = shift;

my $identifier = $self->{volume}->get_identifier();

if (not HTFeed::Stage::Download::download($self,
url => "http://zephir.cdlib.org/api/item/" . $self->{volume}->get_identifier(),
path => dirname($marc_path),
filename => basename($marc_path),
not_found_ok => 1)) {


HTFeed::Stage::Download::download($self,
url => "http://zephir.cdlib.org/api/item/mdp.39015039746220",
path => dirname($marc_path),
filename => basename($marc_path),
not_found_ok => 1);

}

};
no warnings 'redefine';

*HTFeed::Volume::get_sources = sub {
return ('ht_test', 'ht_test', 'ht_test');
};

# use faked-up marc in case it's missing
*HTFeed::SourceMETS::_get_marc_from_zephir = sub {
my $self = shift;
my $marc_path = shift;
my $downloaded = HTFeed::Stage::Download::download(
$self,
url => "http://zephir.cdlib.org/api/item/" . $self->{volume}->get_identifier(),
path => dirname($marc_path),
filename => basename($marc_path),
not_found_ok => 1
);
if (not $downloaded) {
HTFeed::Stage::Download::download(
$self,
url => "http://zephir.cdlib.org/api/item/mdp.39015039746220",
path => dirname($marc_path),
filename => basename($marc_path),
not_found_ok => 1
);
}
};
}

pod2usage(-msg => 'must specify package type with -p or -d') if not defined $default_packagetype and not defined $dot_packagetype;
Expand Down
161 changes: 110 additions & 51 deletions lib/HTFeed/Job.pm
Original file line number Diff line number Diff line change
@@ -1,12 +1,10 @@
package HTFeed::Job;

#use Moose;
use Mouse;
use HTFeed::Volume;
use HTFeed::Config;
use Carp;

use HTFeed::Config;
use HTFeed::Volume;
use Log::Log4perl qw(get_logger);
use Mouse;

has [qw(pkg_type namespace id)] => (is => 'ro', isa => 'Str', lazy_build => 1);
has 'status' => (is => 'ro', isa => 'Str', required => 1, default => 'ready');
Expand All @@ -18,6 +16,7 @@ has '_release' => (is => 'ro', isa => 'Bool', init_a
has 'volume' => (is => 'ro', isa => 'Object', lazy_build => 1);
has 'stage' => (is => 'ro', isa => 'Object', init_arg => undef, lazy_build => 1);


=head1 NAME
HTFeed::Job
Expand Down Expand Up @@ -54,22 +53,40 @@ HTFeed::Job->new( volume => $volume,
sub update{
my $self = shift;

my $stage = $self->stage;
my $fail = $stage->failed;
my $new_status = $self->new_status;
my $stage = $self->stage;
my $fail = $stage->failed;
my $new_status = $self->new_status;

## TODO: make this a class global or see if it can be better accessed with YAML::Config, etc.
## i.e. put it somewhere else, but preferably something tidy
my %release_states = map {$_ => 1} @{get_config('release_states')};

my $release = $self->_release;

get_logger()->info( 'StageSucceeded', objid => $self->id, namespace => $self->namespace, stage => $self->stage_class, detail => $stage->success_info() )
if (!$fail);
get_logger()->info( 'StageFailed', objid => $self->id, namespace => $self->namespace, stage => $self->stage_class, detail => 'fatal=' . ($new_status eq 'punted' ? '1' : '0') )
if ($fail);
if ($fail) {
get_logger()->info(
'StageFailed',
objid => $self->id,
namespace => $self->namespace,
stage => $self->stage_class,
detail => 'fatal=' . ($new_status eq 'punted' ? '1' : '0')
);
} else {
get_logger()->info(
'StageSucceeded',
objid => $self->id,
namespace => $self->namespace,
stage => $self->stage_class,
detail => $stage->success_info()
);
}

&{$self->{callback}}($self->namespace, $self->id, $new_status, $release, $fail);
&{$self->{callback}}(
$self->namespace,
$self->id,
$new_status,
$release,
$fail
);

return;
}
Expand Down Expand Up @@ -101,27 +118,28 @@ sub clean{
my $stage = $self->stage;

$stage->clean();

$stage->clean_punt() if ($self->new_status eq 'punted');

return;
}

# this wraps the default constructor to allow non-hash-style args
around BUILDARGS => sub {
my $orig = shift;
my $class = shift;

# exactly 6 args to construct w/o hash style args
# 10-12 with hash style args
if ( @_ == 6 ) {
my ($pkg_type, $namespace, $id, $status, $failure_count, $callback) = @_;
return $class->$orig(pkg_type => $pkg_type,
namespace => $namespace,
id => $id,
status => $status,
failure_count => $failure_count,
callback => $callback);
return $class->$orig(
pkg_type => $pkg_type,
namespace => $namespace,
id => $id,
status => $status,
failure_count => $failure_count,
callback => $callback
);
}
else {
return $class->$orig(@_);
Expand All @@ -131,12 +149,31 @@ around BUILDARGS => sub {
sub BUILD{
my $self = shift;
my $args = shift;

return
if( (!$self->has_pkg_type && !$self->has_namespace && !$self->has_id and $self->has_volume) or
($self->has_pkg_type && $self->has_namespace && $self->has_id and !$self->has_volume) );

croak 'Error instantiating Job: Must specify namespace, id, and packagetype, OR specify volume object. Must not specify both.';
my $cond1 = (
!$self->has_pkg_type &&
!$self->has_namespace &&
!$self->has_id and
$self->has_volume
);

my $cond2 = (
$self->has_pkg_type &&
$self->has_namespace &&
$self->has_id and
!$self->has_volume
);

# Desired outcome.
return if ($cond1 or $cond2);

# Undesired outcome.
croak join(
" ",
"Error instantiating Job:",
"Must specify namespace, id, and packagetype, OR specify volume object.",
"Must not specify both."
);
}

sub _build_volume{
Expand All @@ -150,15 +187,15 @@ sub _build_volume{
}

sub _build_namespace {
my $self = shift;
my $self = shift;
return $self->volume->get_namespace;
}
sub _build_pkg_type {
my $self = shift;
my $self = shift;
return $self->volume->get_packagetype;
}
sub _build_id {
my $self = shift;
my $self = shift;
return $self->volume->get_objid;
}

Expand All @@ -172,23 +209,24 @@ sub _build_stage_class{

# Lazy builder for the 'stage' attribute.
#
# Instantiates the class named by $self->stage_class, passing the job's
# volume as the 'volume' constructor argument, and returns the new object.
#
# The constructor is invoked as a plain dynamic class-method call rather than
# through a string eval: a string eval would swallow any exception thrown by
# the constructor (returning undef and leaving the real cause in an unchecked
# $@). Calling it directly lets the original error propagate to the caller.
sub _build_stage{
    my $self = shift;

    my $class  = $self->stage_class;
    my $volume = $self->volume;

    return $class->new(volume => $volume);
}

sub _build_new_status{
my $self = shift;

my $stage = $self->stage;

my $success = $stage->succeeded;
my $new_status = $success ?
my $new_status = $success ?
$stage->get_stage_info('success_state') : $stage->get_stage_info('failure_state');
$new_status = 'punted'
if((! $success) and ($self->failure_count >= get_config('failure_limit')));

if ( (! $success) and ($self->failure_count >= get_config('failure_limit')) ) {
$new_status = 'punted';
}

# punt if next status is undefined
$new_status = 'punted' unless $new_status;
Expand Down Expand Up @@ -225,19 +263,33 @@ sub run_job {
my $force_failed_status = shift;

my $stage;

eval {
$stage = $job->stage;

get_logger()->info( 'RunStage', objid => $job->id, namespace => $job->namespace, stage => $job->stage_class );

get_logger()->info(
'RunStage',
objid => $job->id,
namespace => $job->namespace,
stage => $job->stage_class
);

$job->volume->reset();


$stage->run();

};

my $err = $@;
if ( $err and $err !~ /STAGE_ERROR/ and $err !~ /VOLUME_ERROR/) {
get_logger()->error( 'UnexpectedError', objid => $job->id, namespace => $job->namespace, stage => $job->stage_class, detail => $err );
get_logger()->error(
'UnexpectedError',
objid => $job->id,
namespace => $job->namespace,
stage => $job->stage_class,
detail => $err
);
}

# handle fake status set in unit tests
Expand All @@ -253,13 +305,18 @@ sub run_job {
if ($stage and $clean) {
eval { $job->clean(); };
if ($@) {
get_logger()->error( 'UnexpectedError', objid => $job->id, namespace => $job->namespace, stage => $job->stage_class, detail => $@ );
get_logger()->error(
'UnexpectedError',
objid => $job->id,
namespace => $job->namespace,
stage => $job->stage_class,
detail => $@
);
}
}

# update queue table with new status and failure_count
$job->update();

}

=item successor()
Expand All @@ -272,17 +329,19 @@ returns false if we have reached a release state
sub successor {
my $self = shift;

return if($self->_release());
return if($self->_release());

my $failure_count = $self->failure_count + $self->stage->failed;
my $status = $self->new_status;

my $successor = HTFeed::Job->new(volume => $self->volume,
status => $self->new_status,
failure_count => ($failure_count),
callback => $self->callback);

return $successor;
my $successor = HTFeed::Job->new(
volume => $self->volume,
status => $self->new_status,
failure_count => ($failure_count),
callback => $self->callback
);

return $successor;
}

1;
Loading

0 comments on commit 55eda8f

Please sign in to comment.