Merge pull request #303 from hathitrust/DEV-1125-duplicate-holdings-cleanup

DEV-1125 - run duplicate holdings cleanup under sidekiq
aelkiss committed May 29, 2024
2 parents 2bf7c3f + 3078966 commit ede5661
Showing 9 changed files with 153 additions and 101 deletions.
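In outline, the change replaces the standalone bin/cleanup_duplicate_holdings.rb script with a Sidekiq job: CleanupDuplicateHoldings.queue_jobs slices all cluster ids into batches (100 clusters per job by default) and enqueues one job per batch, and each job's perform method then dedupes the holdings in its clusters. A minimal sketch of driving the new code from a console (illustrative only, not part of the diff; it assumes the repository's lib directory is on the load path):

    require "cleanup_duplicate_holdings"

    # Enqueue one CleanupDuplicateHoldings job per batch of 100 clusters.
    CleanupDuplicateHoldings.queue_jobs
    # A worker started with `sidekiq -r ./lib/sidekiq_jobs.rb` (see the
    # docker-compose.yml change below) then picks the jobs up and runs them.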
1 change: 1 addition & 0 deletions Gemfile
@@ -20,6 +20,7 @@ gem "sequel"
gem "thor"
gem "zinzout"
gem "puma"
gem "rack-session"
gem "sidekiq"
gem "sidekiq-batch", git: "https://github.com/breamware/sidekiq-batch"

3 changes: 3 additions & 0 deletions Gemfile.lock
@@ -101,6 +101,8 @@ GEM
      nio4r (~> 2.0)
    racc (1.8.0)
    rack (3.0.11)
    rack-session (2.0.0)
      rack (>= 3.0.0)
    rainbow (3.1.1)
    rdoc (6.6.3.1)
      psych (>= 4.0.0)
@@ -212,6 +214,7 @@ DEPENDENCIES
  pry
  puma
  push_metrics!
  rack-session
  rgl
  rspec
  rspec-sidekiq
73 changes: 0 additions & 73 deletions bin/cleanup_duplicate_holdings.rb

This file was deleted.

26 changes: 11 additions & 15 deletions bin/sidekiq_web.ru
@@ -1,18 +1,14 @@
require "sidekiq"
require "sidekiq/web"
require_relative "../config/initializers/sidekiq"

SESSION_KEY = ".session.key"

if !File.exist?(SESSION_KEY)
require "securerandom"
File.open(SESSION_KEY, "w") do |f|
f.write(SecureRandom.hex(32))
end
end
# This is sidekiq_web.ru
# Run with `bundle exec rackup sidekiq_web.ru` or similar.

Services.redis_config[:size] = 1

use Rack::Session::Cookie, secret: File.read(SESSION_KEY), same_site: true, max_age: 86400
require "securerandom"
require "rack/session"
require "sidekiq/web"

# In a multi-process deployment, all Web UI instances should share
# this secret key so they can all decode the encrypted browser cookies
# and provide a working session.
# Rails does this in /config/initializers/secret_token.rb
secret_key = SecureRandom.hex(32)
use Rack::Session::Cookie, secret: secret_key, same_site: true, max_age: 86400
run Sidekiq::Web
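Note that the comment above asks all Web UI processes to share the secret, while the script as committed generates a fresh secret on every boot, so each process gets its own key and sessions reset on restart; that is acceptable for a single-process deployment like this one. One way to actually share a key across processes, sketched for illustration only (the SIDEKIQ_WEB_SESSION_SECRET variable name is hypothetical, not part of this commit):

    secret_key = ENV.fetch("SIDEKIQ_WEB_SESSION_SECRET") { SecureRandom.hex(32) }
    use Rack::Session::Cookie, secret: secret_key, same_site: true, max_age: 86400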
2 changes: 2 additions & 0 deletions docker-compose.yml
@@ -49,6 +49,8 @@ services:
    <<: *holdings-container-defaults
    restart: always
    volumes:
      - .:/usr/src/app
      - gem_cache:/gems
      - ./example/datasets:/tmp/datasets
    command: bundle exec sidekiq -c 1 -r ./lib/sidekiq_jobs.rb
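For reference, the worker command breaks down as follows; both flags are standard Sidekiq CLI options:

    # -c 1                       run a single processing thread
    # -r ./lib/sidekiq_jobs.rb   require this file at boot so job classes are loaded
    bundle exec sidekiq -c 1 -r ./lib/sidekiq_jobs.rb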

78 changes: 78 additions & 0 deletions lib/cleanup_duplicate_holdings.rb
@@ -0,0 +1,78 @@
# frozen_string_literal: true

require "services"
require "cluster"
require "sidekiq"
require "milemarker"

class CleanupDuplicateHoldings
  include Sidekiq::Job
  JOB_CLUSTER_COUNT = 100

  def self.queue_jobs(job_cluster_count: JOB_CLUSTER_COUNT)
    milemarker = Milemarker.new(name: "Queue clusters", batch_size: 50_000)
    milemarker.logger = Services.logger

    # Iterate over batches of clusters of size job_cluster_count, and queue
    # a job for each batch.
    #
    # We need the "each" in there to turn the query into an Enumerator,
    # so that "each_slice" will work performantly (i.e. without trying to
    # fetch all the results at once)
    Cluster.only(:id).each.each_slice(job_cluster_count) do |batch|
      cluster_ids = batch.map(&:_id).map(&:to_s)
      # Queues a job of this class
      perform_async(cluster_ids)

      milemarker.incr(job_cluster_count).on_batch do
        Services.logger.info milemarker.batch_line
      end
    end

    milemarker.log_final_line
  end

  def initialize
    @clusters_processed = 0
    @old_holdings_processed = 0
    @new_holdings_processed = 0
    @last_log_time = Time.now
  end

  def perform(cluster_ids)
    cluster_ids.each do |cluster_id|
      cluster = Cluster.find(_id: cluster_id)
      Services.logger.info("Cleaning cluster #{cluster._id}: #{cluster.ocns}")
      old_count = cluster.holdings.count
      remove_duplicate_holdings(cluster)
      new_count = cluster.holdings.count
      update_progress(old_count, new_count)
      Thread.pass
    end

    Services.logger.info("Processed #{@clusters_processed} clusters, #{@old_holdings_processed} old holdings, kept #{@new_holdings_processed} holdings")
  end

  private

  def update_progress(old_count, new_count)
    @clusters_processed += 1
    @old_holdings_processed += old_count
    @new_holdings_processed += new_count
  end

  # Keeps only the most recent copy from each group of duplicate holdings;
  # saves the cluster only if any holdings were actually removed.
  def remove_duplicate_holdings(cluster)
    rejected_count = 0

    deduped_holdings = cluster.holdings.group_by(&:update_key).map do |update_key, holdings_group|
      latest_date = holdings_group.map(&:date_received).max
      holdings_group.reject { |h| h.date_received != latest_date && rejected_count += 1 }
    end.flatten

    if rejected_count > 0
      cluster.holdings = deduped_holdings
      cluster.save
    end
  end
end
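The dedup step groups a cluster's holdings by update_key and keeps only the holdings with the most recent date_received in each group. The same idea in a self-contained sketch with plain structs (update_key here is just a string stand-in for however the real Holding model computes it):

    require "date"

    Holding = Struct.new(:update_key, :date_received)

    holdings = [
      Holding.new("umich|ocn123", Date.new(2024, 1, 1)),  # older duplicate, dropped
      Holding.new("umich|ocn123", Date.new(2024, 5, 1)),  # newest copy, kept
      Holding.new("upenn|ocn123", Date.new(2024, 3, 1))   # only copy in its group, kept
    ]

    deduped = holdings.group_by(&:update_key).flat_map do |_key, group|
      latest = group.map(&:date_received).max
      group.select { |h| h.date_received == latest }
    end
    # deduped keeps the 2024-05-01 umich holding and the upenn holding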
5 changes: 5 additions & 0 deletions lib/phctl.rb
@@ -57,6 +57,11 @@ class Cleanup < JobCommand
  def holdings(inst, date)
    run_job(Jobs::Cleanup::Holdings, inst, date)
  end

  desc "duplicate_holdings", "Cleans duplicate holdings from all clusters"
  def duplicate_holdings
    CleanupDuplicateHoldings.queue_jobs
  end
end

class Concordance < JobCommand
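With this command registered, the cleanup can be kicked off the same way as the other phctl tasks, presumably something like:

    bundle exec phctl cleanup duplicate_holdings

Unlike the deleted script, this returns as soon as the jobs are queued; the actual cleanup happens asynchronously in the Sidekiq workers.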
1 change: 1 addition & 0 deletions lib/sidekiq_jobs.rb
@@ -1,5 +1,6 @@
require "services"
require "sidekiq"
require "cleanup_duplicate_holdings"
require "concordance_processing"
require "loader/cluster_loader"
require "loader/file_loader"
65 changes: 52 additions & 13 deletions spec/cleanup_duplicate_holdings_spec.rb
@@ -2,9 +2,9 @@

require "spec_helper"

-require_relative "../bin/cleanup_duplicate_holdings"
+require "cleanup_duplicate_holdings"

-RSpec.describe CleanupDuplicateHoldings do
+RSpec.describe CleanupDuplicateHoldings, type: :sidekiq_fake do
  def set_blank_fields(holding, value)
    [:n_enum=, :n_chron=, :condition=, :issn=].each do |setter|
      holding.public_send(setter, value)
@@ -24,14 +24,39 @@ def nil_fields_dupe_holding(h)
    end
  end

+  def cluster_ids
+    Cluster.all.map(&:id).map(&:to_s)
+  end
+
  before(:each) { Cluster.each(&:delete) }

-  describe "run" do
+  describe "self.queue_jobs" do
it "queues a job of the expected size for each batch of holdings" do
10.times { create(:cluster) }

expect { described_class.queue_jobs(job_cluster_count: 5) }
.to change(described_class.jobs, :size).by(2)

expect(described_class.jobs[0]["args"][0].length).to eq(5)
expect(described_class.jobs[1]["args"][0].length).to eq(5)
end

it "queues jobs for all clusters" do
10.times { create(:cluster) }

described_class.queue_jobs(job_cluster_count: 5)

cluster_ids = described_class.jobs.map { |job| job["args"][0] }.flatten
expect(cluster_ids).to eq(Cluster.all.map(&:id).map(&:to_s))
end
end

describe "perform" do
it "cleans up duplicate holdings" do
holding = blank_fields_holding
create(:cluster, holdings: [holding, nil_fields_dupe_holding(holding)])

described_class.new.run
described_class.new.perform(cluster_ids)

expect(Cluster.first.holdings.count).to eq(1)
end
@@ -45,7 +70,7 @@ def nil_fields_dupe_holding(h)
        another_holding
      ])

-      described_class.new.run
+      described_class.new.perform(cluster_ids)

      cluster_holdings = Cluster.first.holdings
      expect(cluster_holdings.length).to eq(2)
@@ -62,7 +87,7 @@ def nil_fields_dupe_holding(h)
        nil_fields_dupe_holding(upenn_holding)
      ])

-      described_class.new.run
+      described_class.new.perform(cluster_ids)

      expect(Cluster.first.holdings.count).to eq(2)
      expect(Cluster.first.holdings.map(&:organization).uniq).to contain_exactly("umich", "upenn")
@@ -76,7 +101,7 @@ def nil_fields_dupe_holding(h)
        nil_fields_dupe_holding(holding)
      ])

-      described_class.new.run
+      described_class.new.perform(cluster_ids)

      expect(Cluster.first.holdings.count).to eq(1)
    end
@@ -94,7 +119,7 @@ def nil_fields_dupe_holding(h)
        nil_fields_dupe_holding(holding2)
      ])

-      described_class.new.run
+      described_class.new.perform(cluster_ids)

      expect(Cluster.count).to eq(2)
      Cluster.each do |c|
@@ -108,24 +133,24 @@ def nil_fields_dupe_holding(h)
      holding = blank_fields_holding
      create(:cluster, holdings: [holding, nil_fields_dupe_holding(holding)])

-      described_class.new.run
+      described_class.new.perform(cluster_ids)

      expect(Cluster.first.holdings[0].date_received).to eq(Date.today)
    end

it "logs what it's working on at DEBUG level" do
it "logs what it's working on" do
Services.register(:logger) { Logger.new($stdout, level: Logger::DEBUG) }

create(:cluster)

expect { described_class.new.run }.to output(/#{Cluster.first.ocns.first}/).to_stdout
expect { described_class.new.perform(cluster_ids) }.to output(/#{Cluster.first.ocns.first}/).to_stdout
end

it "logs how many clusters it's worked on" do
Services.register(:logger) { Logger.new($stdout, level: Logger::INFO) }
create(:cluster)

expect { described_class.new.run }.to output(/Processed 1 cluster/).to_stdout
expect { described_class.new.perform(cluster_ids) }.to output(/Processed.* 1 cluster/).to_stdout
end

it "logs how many holdings it's worked on" do
Expand All @@ -134,7 +159,21 @@ def nil_fields_dupe_holding(h)
      holding = blank_fields_holding
      create(:cluster, holdings: [holding, nil_fields_dupe_holding(holding)])

-      expect { described_class.new.run }.to output(/Processed 2 old holdings.*Kept 1 holding/m).to_stdout
+      expect { described_class.new.perform(cluster_ids) }.to output(/Processed.* 2 old holdings.* kept 1 holding/).to_stdout
    end

it "doesn't save the cluster when there are no duplicate holdings" do
create(:cluster,
holdings: [
build(:holding),
build(:holding)
])

orig_time = Cluster.first.last_modified

described_class.new.perform(cluster_ids)

expect(Cluster.first.last_modified).to eq(orig_time)
end
end
end
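The type: :sidekiq_fake metadata presumably switches these examples into Sidekiq's fake testing mode (the wiring would live in the project's spec helper; note the rspec-sidekiq dependency added in this commit). In fake mode, perform_async pushes jobs onto an in-memory array instead of Redis, which is what makes the described_class.jobs assertions above possible. A minimal standalone equivalent using Sidekiq's own testing harness:

    require "sidekiq/testing"

    Sidekiq::Testing.fake!  # queue jobs in memory; nothing touches Redis

    CleanupDuplicateHoldings.perform_async(["some-cluster-id"])
    CleanupDuplicateHoldings.jobs.size  # => 1
    CleanupDuplicateHoldings.drain      # run (and clear) every queued job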
