Skip to content

Commit

Permalink
Merge pull request #1593 from alphagov/cluster-page-traffic
Browse files — browse the repository at this point in the history
Load page traffic into all active clusters
  • Loading branch information
bilbof committed Jun 21, 2019
2 parents cc088d0 + 3eacc88 commit 607fcd1
Show file tree
Hide file tree
Showing 5 changed files with 43 additions and 31 deletions.
40 changes: 22 additions & 18 deletions lib/govuk_index/page_traffic_loader.rb
Expand Up @@ -7,28 +7,30 @@ def initialize(iostream_batch_size: 250)
end

# Loads page-traffic data from the given IO stream into every active
# Elasticsearch cluster. For each cluster we create a fresh index,
# bulk-index the stream's contents into it via Sidekiq workers, wait for
# the workers to drain, then switch the alias over to the new index.
#
# @param iostream [IO] newline-delimited JSON bulk-index pairs
#   (in_even_sized_batches rewinds the stream after reading, so each
#   cluster sees the full file)
def load_from(iostream)
  Clusters.active.each do |cluster|
    new_index = index_group(cluster).create_index
    @logger.info "Created index #{new_index.real_name}"

    old_index = index_group(cluster).current_real
    @logger.info "Old index #{old_index.real_name}"

    old_index.with_lock do
      @logger.info "Indexing to #{new_index.real_name}"

      in_even_sized_batches(iostream) do |lines|
        GovukIndex::PageTrafficWorker.perform_async(lines, new_index.real_name, cluster.key)
      end

      GovukIndex::PageTrafficWorker.wait_until_processed
      new_index.commit
    end

    # We need to switch the aliases without a lock, since
    # read_only_allow_delete prevents aliases being changed.
    # The page traffic loader is a daily process, so there
    # won't be a race condition.
    index_group(cluster).switch_to(new_index)
  end
end

private
Expand All @@ -40,10 +42,12 @@ def in_even_sized_batches(iostream, batch_size = @iostream_batch_size, &_block)
iostream.each_line.each_slice(batch_size * 2) do |batch|
yield(batch.map { |b| JSON.parse(b) })
end
iostream.pos = 0 # ensures we start at the beginning on next read.
end

# Memoised, per-cluster index group for the page-traffic index.
#
# @param cluster [Object] a cluster entry with a unique #key — presumably
#   an element of Clusters.active; confirm against the Clusters module
# @return [Object] the index group for the page-traffic index on that cluster
def index_group(cluster)
  @index_group ||= {}
  @index_group[cluster.key] ||= SearchConfig.instance.search_server(cluster: cluster).index_group(
    SearchConfig.instance.page_traffic_index_name
  )
end
Expand Down
10 changes: 5 additions & 5 deletions lib/govuk_index/page_traffic_worker.rb
Expand Up @@ -4,15 +4,15 @@ class PageTrafficWorker < Indexer::BaseWorker
QUEUE_NAME = 'bulk'.freeze
sidekiq_options queue: QUEUE_NAME

# Enqueues a bulk-index job with a compressed payload: the records are
# serialised to JSON, deflated, and Base64-encoded before being handed
# to Sidekiq, keeping large batches small on the queue.
#
# @param records [Array] bulk-index action/document pairs
# @param destination_index [String] real name of the index to write to
# @param cluster_key [Object] identifies the target cluster — presumably
#   a Clusters key; the worker resolves it back to a cluster on perform
def self.perform_async(records, destination_index, cluster_key)
  data = Base64.encode64(Zlib::Deflate.deflate(Sidekiq.dump_json(records)))
  super(data, destination_index, cluster_key)
end

def perform(data, destination_index)
def perform(data, destination_index, cluster_key)
records = Sidekiq.load_json(Zlib::Inflate.inflate(Base64.decode64(data)))

actions = Index::ElasticsearchProcessor.new(client: GovukIndex::Client.new(timeout: BULK_INDEX_TIMEOUT, index_name: destination_index))
cluster = Clusters.get_cluster(cluster_key)
actions = Index::ElasticsearchProcessor.new(client: GovukIndex::Client.new(timeout: BULK_INDEX_TIMEOUT, index_name: destination_index, clusters: [cluster]))

records.each_slice(2) do |identifier, document|
identifier['index'] = identifier['index'].merge('_type' => 'generic-document')
Expand Down
5 changes: 4 additions & 1 deletion lib/index/client.rb
Expand Up @@ -12,6 +12,7 @@ def instance

# Pulls client-specific settings out of the options hash (mutating it in
# place); whatever remains is kept to be forwarded to the search client.
# Falls back to all active clusters when none are given explicitly.
def initialize(options = {})
  requested_clusters = options.delete(:clusters)
  @clusters = requested_clusters || Clusters.active
  @_index = options.delete(:index_name)
  @_options = options
end

Expand All @@ -22,7 +23,7 @@ def get(params)
end

def bulk(params)
Clusters.active.map do |cluster|
clusters.map do |cluster|
client(cluster: cluster).bulk(
params.merge(index: index_name)
)
Expand All @@ -31,6 +32,8 @@ def bulk(params)

private

attr_reader :clusters

def client(cluster: Clusters.default_cluster)
@_client ||= {}
@_client[cluster.key] ||= Services.elasticsearch(
Expand Down
8 changes: 5 additions & 3 deletions spec/integration/govuk_index/page_traffic_loader_spec.rb
Expand Up @@ -11,9 +11,11 @@
GovukIndex::PageTrafficLoader.new
.load_from(StringIO.new(data))

document = fetch_document_from_rummager(id: id, index: 'page-traffic_test')
Clusters.active.each do |cluster|
document = fetch_document_from_rummager(id: id, index: 'page-traffic_test', cluster: cluster)

expect(document["_source"]["document_type"]).to eq('page_traffic')
expect(document['_source']['rank_14']).to eq(100)
expect(document["_source"]["document_type"]).to eq('page_traffic')
expect(document['_source']['rank_14']).to eq(100)
end
end
end
11 changes: 7 additions & 4 deletions spec/unit/govuk_index/page_traffic_loader_spec.rb
Expand Up @@ -20,10 +20,13 @@
line2 = [{ "val" => "c" }, { "data" => 1 }, { "val" => "d" }, { "data" => 1 }]
line3 = [{ "val" => "e" }, { "data" => 1 }]

expect(GovukIndex::PageTrafficWorker).to receive(:perform_async).with(line1, 'new_index_name')
expect(GovukIndex::PageTrafficWorker).to receive(:perform_async).with(line2, 'new_index_name')
expect(GovukIndex::PageTrafficWorker).to receive(:perform_async).with(line3, 'new_index_name')

Clusters.active.each do |cluster|
# rubocop:disable RSpec/MessageSpies
expect(GovukIndex::PageTrafficWorker).to receive(:perform_async).with(line1, 'new_index_name', cluster.key)
expect(GovukIndex::PageTrafficWorker).to receive(:perform_async).with(line2, 'new_index_name', cluster.key)
expect(GovukIndex::PageTrafficWorker).to receive(:perform_async).with(line3, 'new_index_name', cluster.key)
# rubocop:enable RSpec/MessageSpies
end
loader = GovukIndex::PageTrafficLoader.new(iostream_batch_size: 2)

loader.load_from(input)
Expand Down

0 comments on commit 607fcd1

Please sign in to comment.