forked from NixOS/hydra
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
hydra-backfill-ids: create to add jobset_id values to Builds and Jobs
Vacuum every 10 iterations, update 10k at a time.
- Loading branch information
Showing
2 changed files
with
165 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,164 @@ | ||
#! /usr/bin/env perl | ||
|
||
use strict; | ||
use utf8; | ||
use Hydra::Model::DB; | ||
|
||
STDOUT->autoflush(); | ||
STDERR->autoflush(1); | ||
binmode STDERR, ":encoding(utf8)"; | ||
|
||
my $db = Hydra::Model::DB->new(); | ||
my $vacuum = $db->storage->dbh->prepare("VACUUM;"); | ||
|
||
my $dryRun = defined $ENV{'HYDRA_DRY_RUN'}; | ||
|
||
my $batchSize = 10000; | ||
my $iterationsPerVacuum = 500; | ||
|
||
sub backfillJobsJobsetId { | ||
my ($skipLocked) = @_; | ||
my $logPrefix; | ||
|
||
if ($skipLocked) { | ||
$logPrefix = "(pass 1/2)"; | ||
} else { | ||
$logPrefix = "(pass 2/2)"; | ||
} | ||
|
||
print STDERR "$logPrefix Backfilling Jobs records where jobset_id is NULL...\n"; | ||
|
||
my $totalToGoSth = $db->storage->dbh->prepare(<<QUERY); | ||
SELECT COUNT(*) FROM jobs WHERE jobset_id IS NULL | ||
QUERY | ||
|
||
$totalToGoSth->execute(); | ||
my ($totalToGo) = $totalToGoSth->fetchrow_array; | ||
|
||
my $skipLockedStmt = $skipLocked ? "FOR UPDATE SKIP LOCKED" : ""; | ||
my $update10kJobs = $db->storage->dbh->prepare(<<QUERY); | ||
UPDATE jobs | ||
SET jobset_id = ( | ||
SELECT jobsets.id | ||
FROM jobsets | ||
WHERE jobsets.name = jobs.jobset | ||
AND jobsets.project = jobs.project | ||
) | ||
WHERE (jobs.project, jobs.jobset, jobs.name) in ( | ||
SELECT jobsprime.project, jobsprime.jobset, jobsprime.name | ||
FROM jobs jobsprime | ||
WHERE jobsprime.jobset_id IS NULL | ||
$skipLockedStmt | ||
LIMIT ? | ||
); | ||
QUERY | ||
|
||
print STDERR "$logPrefix Total Jobs records without a jobset_id: $totalToGo\n"; | ||
|
||
my $iteration = 0; | ||
my $affected; | ||
do { | ||
$iteration++; | ||
$affected = $update10kJobs->execute($batchSize); | ||
print STDERR "$logPrefix (batch #$iteration; $totalToGo remaining) Jobs.jobset_id: affected $affected rows...\n"; | ||
$totalToGo -= $affected; | ||
|
||
if ($iteration % $iterationsPerVacuum == 0) { | ||
print STDERR "$logPrefix (batch #$iteration) Vacuuming...\n"; | ||
$vacuum->execute(); | ||
} | ||
} while ($affected > 0); | ||
|
||
|
||
if ($skipLocked) { | ||
backfillJobsJobsetId(0); | ||
} | ||
} | ||
|
||
|
||
sub backfillBuildsJobsetId { | ||
my ($skipLocked) = @_; | ||
my $logPrefix; | ||
|
||
if ($skipLocked) { | ||
$logPrefix = "(pass 1/2)"; | ||
print STDERR "$logPrefix Backfilling unlocked Builds records where jobset_id is NULL...\n"; | ||
} else { | ||
$logPrefix = "(pass 2/2)"; | ||
print STDERR "$logPrefix Backfilling all Builds records where jobset_id is NULL...\n"; | ||
} | ||
|
||
my $skipLockedStmt = $skipLocked ? "FOR UPDATE SKIP LOCKED" : ""; | ||
my $update10kBuilds = $db->storage->dbh->prepare(<<"QUERY"); | ||
WITH updateprogress AS ( | ||
UPDATE builds | ||
SET jobset_id = ( | ||
SELECT jobsets.id | ||
FROM jobsets | ||
WHERE jobsets.name = builds.jobset | ||
AND jobsets.project = builds.project | ||
) | ||
WHERE builds.id in ( | ||
SELECT buildprime.id | ||
FROM builds buildprime | ||
WHERE buildprime.jobset_id IS NULL | ||
AND buildprime.id >= ? | ||
ORDER BY buildprime.id | ||
$skipLockedStmt | ||
LIMIT ? | ||
) | ||
RETURNING id | ||
) | ||
SELECT | ||
count(*) AS affected, | ||
max(updateprogress.id) AS highest_id | ||
FROM updateprogress; | ||
QUERY | ||
|
||
my $lowestNullIdSth = $db->storage->dbh->prepare(<<QUERY); | ||
SELECT id FROM builds WHERE jobset_id IS NULL ORDER BY id LIMIT 1 | ||
QUERY | ||
$lowestNullIdSth->execute(); | ||
my ($highestId) = $lowestNullIdSth->fetchrow_array; | ||
|
||
my $totalToGoSth = $db->storage->dbh->prepare(<<QUERY); | ||
SELECT COUNT(*) FROM builds WHERE jobset_id IS NULL AND id >= ? | ||
QUERY | ||
$totalToGoSth->execute($highestId); | ||
my ($totalToGo) = $totalToGoSth->fetchrow_array; | ||
|
||
print STDERR "$logPrefix Total Builds records without a jobset_id: $totalToGo, starting at $highestId\n"; | ||
|
||
my $iteration = 0; | ||
my $affected; | ||
do { | ||
my $previousHighId = $highestId; | ||
$iteration++; | ||
$update10kBuilds->execute($highestId, $batchSize); | ||
($affected, $highestId) = $update10kBuilds->fetchrow_array; | ||
|
||
print STDERR "$logPrefix (batch #$iteration; $totalToGo remaining) Builds.jobset_id: affected $affected rows; max ID: $previousHighId -> $highestId\n"; | ||
$totalToGo -= $affected; | ||
|
||
if ($iteration % $iterationsPerVacuum == 0) { | ||
print STDERR "$logPrefix (batch #$iteration) Vacuuming...\n"; | ||
$vacuum->execute(); | ||
} | ||
} while ($affected > 0); | ||
|
||
if ($skipLocked) { | ||
backfillBuildsJobsetId(0); | ||
} | ||
} | ||
|
||
die "syntax: $0\n" unless @ARGV == 0; | ||
|
||
print STDERR "Beginning with a VACUUM\n"; | ||
$vacuum->execute(); | ||
|
||
backfillJobsJobsetId(1); | ||
backfillBuildsJobsetId(1); | ||
|
||
print STDERR "Ending with a VACUUM\n"; | ||
$vacuum->execute(); |