Skip to content

Commit

Permalink
TaskQueue: to replace TheSchwartz (#2600)
Browse files Browse the repository at this point in the history
Replace TheSchwartz with a TaskQueue shim, with some plumbing for doing SQS based routing of events. This also has workers for doing our ESN pipeline.
  • Loading branch information
zorkian committed Dec 5, 2019
1 parent 65cbb77 commit b00a0f6
Show file tree
Hide file tree
Showing 231 changed files with 4,003 additions and 250 deletions.
1 change: 1 addition & 0 deletions .gitignore
Expand Up @@ -8,6 +8,7 @@
/var
*.sw?
src/proxy/proxy
.vstags

# Ignore SCSS cache
.sass-cache
Expand Down
5 changes: 1 addition & 4 deletions bin/incoming-mail-inject.pl
Expand Up @@ -23,9 +23,6 @@ BEGIN
my $log = Log::Log4perl->get_logger(__PACKAGE__);

use Digest::MD5 qw/ md5_hex /;
use DW::BlobStore;

my $sclient = LJ::theschwartz() or die "No schwartz config.\n";

my $tempfail = sub {
my $msg = shift;
Expand Down Expand Up @@ -73,7 +70,7 @@ ()
$msg = "ie:$md5";
}

my $h = $sclient->insert(
my $h = DW::TaskQueue->dispatch(
TheSchwartz::Job->new(
funcname => "LJ::Worker::IncomingEmail",
arg => $msg,
Expand Down
6 changes: 3 additions & 3 deletions bin/schedule-copier-jobs
Expand Up @@ -17,6 +17,8 @@
#

use strict;
use v5.10;

BEGIN {
require "$ENV{LJHOME}/cgi-bin/ljlib.pl";
}
Expand All @@ -28,11 +30,9 @@ my $dbr = LJ::get_db_reader() or die;
my $sth = $dbr->prepare( q{SELECT userid FROM user WHERE journaltype IN ('P','C')} );
$sth->execute;

my $sclient = LJ::theschwartz() or die;

while ( my ( $userid ) = $sth->fetchrow_array ) {
warn "Scheduling $userid ...\n";
$sclient->insert_jobs( TheSchwartz::Job->new_from_array( 'DW::Worker::Sphinx::Copier',
DW::TaskQueue->dispatch( TheSchwartz::Job->new_from_array( 'DW::Worker::Sphinx::Copier',
{ userid => $userid, source => "schedule" } ) );
select undef, undef, undef, 0.02;
}
24 changes: 24 additions & 0 deletions bin/worker/dw-esn-cluster-subs
@@ -0,0 +1,24 @@
#!/usr/bin/perl
#
# bin/worker/dw-esn-cluster-subs
#
# DW style ESN worker that runs DW::Task::ESN::FindSubsByCluster tasks.
#
# Authors:
# Mark Smith <mark@dreamwidth.org>
#
# Copyright (c) 2019 by Dreamwidth Studios, LLC.
#
# This program is free software; you may redistribute it and/or modify it under
# the same terms as Perl itself. For a copy of the license, please reference
# 'perldoc perlartistic' or 'perldoc perlgpl'.
#

use strict;
BEGIN {
require "$ENV{LJHOME}/cgi-bin/ljlib.pl";
}

use DW::TaskQueue;
# Initialize the task queue from the %LJ::SQS settings, then start work on
# the FindSubsByCluster task class.
my $task_class = 'DW::Task::ESN::FindSubsByCluster';
my $queue      = DW::TaskQueue->init(%LJ::SQS);
$queue->start_work($task_class);
24 changes: 24 additions & 0 deletions bin/worker/dw-esn-filter-subs
@@ -0,0 +1,24 @@
#!/usr/bin/perl
#
# bin/worker/dw-esn-filter-subs
#
# DW style ESN worker that runs DW::Task::ESN::FilterSubs tasks.
#
# Authors:
# Mark Smith <mark@dreamwidth.org>
#
# Copyright (c) 2019 by Dreamwidth Studios, LLC.
#
# This program is free software; you may redistribute it and/or modify it under
# the same terms as Perl itself. For a copy of the license, please reference
# 'perldoc perlartistic' or 'perldoc perlgpl'.
#

use strict;
BEGIN {
require "$ENV{LJHOME}/cgi-bin/ljlib.pl";
}

use DW::TaskQueue;
# Initialize the task queue from the %LJ::SQS settings, then start work on
# the FilterSubs task class.
my $task_class = 'DW::Task::ESN::FilterSubs';
my $queue      = DW::TaskQueue->init(%LJ::SQS);
$queue->start_work($task_class);
24 changes: 24 additions & 0 deletions bin/worker/dw-esn-fired-event
@@ -0,0 +1,24 @@
#!/usr/bin/perl
#
# bin/worker/dw-esn-fired-event
#
# DW style ESN worker that runs DW::Task::ESN::FiredEvent tasks.
#
# Authors:
# Mark Smith <mark@dreamwidth.org>
#
# Copyright (c) 2019 by Dreamwidth Studios, LLC.
#
# This program is free software; you may redistribute it and/or modify it under
# the same terms as Perl itself. For a copy of the license, please reference
# 'perldoc perlartistic' or 'perldoc perlgpl'.
#

use strict;
BEGIN {
require "$ENV{LJHOME}/cgi-bin/ljlib.pl";
}

use DW::TaskQueue;
# Initialize the task queue from the %LJ::SQS settings, then start work on
# the FiredEvent task class.
my $task_class = 'DW::Task::ESN::FiredEvent';
my $queue      = DW::TaskQueue->init(%LJ::SQS);
$queue->start_work($task_class);
24 changes: 24 additions & 0 deletions bin/worker/dw-esn-process-sub
@@ -0,0 +1,24 @@
#!/usr/bin/perl
#
# bin/worker/dw-esn-process-sub
#
# DW style ESN subs processor.
#
# Authors:
# Mark Smith <mark@dreamwidth.org>
#
# Copyright (c) 2019 by Dreamwidth Studios, LLC.
#
# This program is free software; you may redistribute it and/or modify it under
# the same terms as Perl itself. For a copy of the license, please reference
# 'perldoc perlartistic' or 'perldoc perlgpl'.
#

use strict;
BEGIN {
require "$ENV{LJHOME}/cgi-bin/ljlib.pl";
}

use DW::TaskQueue;
# Initialize the task queue from the %LJ::SQS settings, then start work on
# the ProcessSub task class.
my $task_class = 'DW::Task::ESN::ProcessSub';
my $queue      = DW::TaskQueue->init(%LJ::SQS);
$queue->start_work($task_class);
6 changes: 3 additions & 3 deletions bin/worker/schedule-synsuck
Expand Up @@ -16,6 +16,8 @@
#

use strict;
use v5.10;

BEGIN {
require "$ENV{LJHOME}/cgi-bin/ljlib.pl";
}
Expand Down Expand Up @@ -49,8 +51,6 @@ sub work {
LJ::start_request();
my $dbh = LJ::get_db_writer()
or die "unable to get db handle\n";
my $sclient = LJ::theschwartz()
or die "unable to get TheSchwartz client\n";

# find feeds that are ready to be checked
my $rows = $dbh->selectcol_arrayref(
Expand All @@ -63,7 +63,7 @@ sub work {

# iterate and schedule jobs
foreach my $row ( @$rows ) {
my $rv = $sclient->insert( TheSchwartz::Job->new(
my $rv = DW::TaskQueue->dispatch( TheSchwartz::Job->new(
funcname => 'DW::Worker::SynSuck',
arg => { userid => $row },

Expand Down
2 changes: 1 addition & 1 deletion cgi-bin/DW/BlobStore/S3.pm
Expand Up @@ -40,7 +40,7 @@ sub init {

my $credentials = Paws::Credential::InstanceProfile->new;
if ( defined $args{access_key} && defined $args{secret_key} ) {
$log->warning('Using INSECURE AWS configuration!');
$log->warn('Using INSECURE AWS configuration!');
$credentials = Paws::Credential::Local->new(
access_key => $args{access_key},
secret_key => $args{secret_key},
Expand Down
17 changes: 8 additions & 9 deletions cgi-bin/DW/Controller/Importer.pm
Expand Up @@ -58,15 +58,14 @@ sub erase_handler {
}

# Confirmed, let's schedule.
my $sclient = LJ::theschwartz() or die "Unable to get TheSchwartz.\n";
my $job = TheSchwartz::Job->new_from_array(
'DW::Worker::ImportEraser',
{
userid => $rv->{u}->userid
}
);
die "Failed to insert eraser job.\n"
unless $job && $sclient->insert($job);
DW::TaskQueue->dispatch(
TheSchwartz::Job->new_from_array(
'DW::Worker::ImportEraser',
{
userid => $rv->{u}->userid
}
)
) or die "Failed to insert eraser job.\n";

return DW::Template->render_template(
'tools/importer/erase.tt',
Expand Down
4 changes: 2 additions & 2 deletions cgi-bin/DW/Controller/MassPrivacy.pm
Expand Up @@ -163,15 +163,15 @@ sub editprivacy_handler {
# User is sure they want to update posts
}
elsif ( $mode eq 'amsure' ) {
my $handle = LJ::MassPrivacy->enqueue_job(
my $rv = LJ::MassPrivacy->enqueue_job(
'userid' => $u->{userid},
's_security' => $security{ $POST->{s_security} }[0],
'e_security' => $security{ $POST->{e_security} }[0],
's_unixtime' => $POST->{s_unixtime},
'e_unixtime' => $POST->{e_unixtime}
);

if ($handle) {
if ($rv) {
$u->log_event(
'mass_privacy_change',
{
Expand Down
4 changes: 1 addition & 3 deletions cgi-bin/DW/Hooks/SiteSearch.pm
Expand Up @@ -81,10 +81,8 @@ LJ::Hooks::register_hook(
sub {
my ($u) = @_;

my $sclient = LJ::theschwartz() or die;

# queue up a copier job, which will notice that the entries by this user have been deleted...
$sclient->insert_jobs(
DW::TaskQueue->dispatch(
TheSchwartz::Job->new_from_array(
'DW::Worker::Sphinx::Copier', { userid => $u->id, source => "purghook" }
)
Expand Down
10 changes: 6 additions & 4 deletions cgi-bin/DW/LatestFeed.pm
Expand Up @@ -22,7 +22,11 @@
#

package DW::LatestFeed;

use strict;
use v5.10;
use Log::Log4perl;
my $log = Log::Log4perl->get_logger(__PACKAGE__);

# time in seconds to hold events for. until an event is this old, we will not
# show it on any page.
Expand All @@ -40,15 +44,13 @@ sub new_item {
my ( $class, $obj ) = @_;
return unless $obj && ref $obj;

my $sclient = LJ::theschwartz() or return;

# entries are [ journalid, jitemid ] which lets us get the LJ::Entry back
if ( $obj->isa('LJ::Entry') ) {
return
unless $obj->journal->is_community
|| $obj->journal->is_individual;

$sclient->insert_jobs(
DW::TaskQueue->dispatch(
TheSchwartz::Job->new_from_array(
'DW::Worker::LatestFeed',
{
Expand All @@ -63,7 +65,7 @@ sub new_item {
# the object easily
}
elsif ( $obj->isa('LJ::Comment') ) {
$sclient->insert_jobs(
DW::TaskQueue->dispatch(
TheSchwartz::Job->new_from_array(
'DW::Worker::LatestFeed',
{
Expand Down
7 changes: 5 additions & 2 deletions cgi-bin/DW/Pay.pm
Expand Up @@ -17,6 +17,9 @@
package DW::Pay;

use strict;
use v5.10;
use Log::Log4perl;
my $log = Log::Log4perl->get_logger(__PACKAGE__);

use Carp qw/ confess /;
use HTTP::Request;
Expand Down Expand Up @@ -667,8 +670,8 @@ sub update_paid_status {

# and now, at this last step, we kick off a job to check if this user
# needs to have their search index setup/messed with.
if ( @LJ::SPHINX_SEARCHD && ( my $sclient = LJ::theschwartz() ) ) {
$sclient->insert_jobs(
if (@LJ::SPHINX_SEARCHD) {
DW::TaskQueue->dispatch(
TheSchwartz::Job->new_from_array(
'DW::Worker::Sphinx::Copier', { userid => $u->id, source => "paidstat" }
)
Expand Down
40 changes: 40 additions & 0 deletions cgi-bin/DW/Task.pm
@@ -0,0 +1,40 @@
#!/usr/bin/perl
#
# DW::Task
#
# Base class for asynchronously executed tasks.
#
# Authors:
# Mark Smith <mark@dreamwidth.org>
#
# Copyright (c) 2019 by Dreamwidth Studios, LLC.
#
# This program is free software; you may redistribute it and/or modify it under
# the same terms as Perl itself. For a copy of the license, please reference
# 'perldoc perlartistic' or 'perldoc perlgpl'.
#

package DW::Task;

use strict;
use v5.10;
use Log::Log4perl;
my $log = Log::Log4perl->get_logger(__PACKAGE__);

use constant COMPLETED => 100;
use constant FAILED => 101;

# Constructor. Every argument after the class name is copied into a fresh
# array, which is stored by reference under the 'args' key of the new object.
#
# Returns the hash reference blessed into $class.
sub new {
    my $class = shift;

    # [@_] copies the remaining arguments, so the object does not alias the
    # caller's variables.
    my %fields = ( args => [@_] );

    return bless \%fields, $class;
}

# Accessor: returns the array reference stored under the object's 'args' key.
# The reference itself is returned, not a copy.
sub args {
    my ($self) = @_;
    return $self->{args};
}

1;

0 comments on commit b00a0f6

Please sign in to comment.