Skip to content

Commit

Permalink
Merge pull request #2123 from alphagov/parallelise-rummager-dump
Browse files Browse the repository at this point in the history
Add a parallel option to the rummager export.
  • Loading branch information
elliotcm committed Apr 30, 2015
2 parents a6145fc + 03be994 commit bef2d9a
Show file tree
Hide file tree
Showing 3 changed files with 54 additions and 30 deletions.
1 change: 1 addition & 0 deletions Gemfile
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ gem 'rails_translation_manager', '0.0.1'
gem 'rails-observers'
gem 'sprockets', '3.0.0.beta.8'
gem 'rinku', require: 'rails_rinku'
gem 'parallel', '1.4.1'

if ENV['GLOBALIZE_DEV']
gem 'globalize', path: '../globalize'
Expand Down
3 changes: 2 additions & 1 deletion Gemfile.lock
Original file line number Diff line number Diff line change
Expand Up @@ -229,7 +229,7 @@ GEM
multi_json (~> 1.3)
oauth2 (~> 1.0)
omniauth (~> 1.2)
parallel (1.3.3)
parallel (1.4.1)
parallel_tests (1.0.7)
parallel
parser (2.2.0.1)
Expand Down Expand Up @@ -441,6 +441,7 @@ DEPENDENCIES
mysql2
newrelic_rpm
nokogiri
parallel (= 1.4.1)
parallel_tests
pdf-reader (= 1.3.3)
plek (= 1.10.0)
Expand Down
80 changes: 51 additions & 29 deletions script/rummager_export.rb
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,13 @@
# By default, exports the data for the "government" search index. If the
# --detailed flag is supplied on the command line, exports the data for the
# "detailed" search index.

#
# Providing an EXPORT_DIRECTORY environment variable will
# output multiple files to the directory and perform the dump
# in a parallel manner, one sub-process per CPU/core.
$LOAD_PATH << File.expand_path("../", File.dirname(__FILE__))

require 'pathname'
require 'logger'
logger = Logger.new(STDERR)
logger.info "Booting rails..."
Expand All @@ -18,26 +22,47 @@
Whitehall.searchable_classes_for_government_index
end

logger.info "Counting docs to index..."
counts_by_class = classes_to_index.each_with_object({}) do |klass, hash|
count = klass.searchable_instances.count
logger.info("%20s: %d" % [klass.name, count])
hash[klass] = count
id_groups = []
classes_to_index.each do |klass|
id_groups += klass.searchable_instances.pluck(:id).each_slice(1_000).map do |id_group|
[klass, id_group]
end
end

total_count = counts_by_class.values.inject(&:+)
def export_classes(classes_to_index, id_groups, &block)
if export_directory = ENV["EXPORT_DIRECTORY"]
export_directory = Pathname.new(export_directory).expand_path

start = Time.zone.now
done = 0
classes_to_index.each do |klass|
batch_start = Time.zone.now
rate = [done / (batch_start - start), 0.1].max
count = counts_by_class[klass]
total_remaining = total_count - done
total_time_remaining = (total_count - done) / rate
time_remaining_this_batch = count / rate
eta = batch_start + total_time_remaining
logger.info "Exporting #{klass.name} (this batch of #{count} will take #{time_remaining_this_batch}s. #{total_remaining} to go will eta #{eta})"
if export_directory.exist? && export_directory.children.any?
puts "#{ENV["EXPORT_DIRECTORY"]} exists and is not empty, aborting"
exit
else
puts "Starting export of #{id_groups.count} files to #{ENV["EXPORT_DIRECTORY"]}"
end

export_directory.mkpath

Parallel.each_with_index(id_groups) do |(klass, id_group), index|
file_path = export_directory+"#{klass.name.downcase}-#{index}.esdump"
logger.info "Exporting #{klass.name.downcase}-#{index}.esdump"
File.open(file_path.to_s, "w") do |output|
block.call(klass, output, id_group)
end
end
else
classes_to_index.each do |klass|
block.call(klass, STDOUT)
end
end
end

def output_es_line(obj, output)
s = obj.search_index
output.puts %Q[{"index": {"_type": "edition", "_id": "#{s['link']}"}}]
output.puts s.to_json
end

export_classes(classes_to_index, id_groups) do |klass, output, id_group|
association = klass.searchable_instances

eager_loads = [:document, :organisations, :attachments, :world_locations]
Expand All @@ -46,17 +71,14 @@
association = association.includes(sym)
end
end
i = 0
association.find_each do |obj|
s = obj.search_index
puts %Q[{"index": {"_type": "edition", "_id": "#{s['link']}"}}]
puts s.to_json
if i > 0 and i % 1000 == 0
logger.info " .. #{i}"

if id_group
association.find(id_group).each do |obj|
output_es_line(obj, output)
end
else
association.find_each do |obj|
output_es_line(obj, output)
end
done += 1
i += 1
end
batch_took = Time.zone.now - batch_start
logger.info("Export of %s complete in %.1fs rate %.2f/s (estimated %.1fs)" % [klass.name, batch_took, count / batch_took, time_remaining_this_batch])
end

0 comments on commit bef2d9a

Please sign in to comment.