From 3eacc88ee0c6979cad91bb2f7a72214b7f91a6c3 Mon Sep 17 00:00:00 2001 From: William Franklin Date: Wed, 19 Jun 2019 13:48:25 +0000 Subject: [PATCH] Load page traffic into all active clusters This loads page traffic data into all indexes, not just the primary one. There's possibly better ways to do this (e.g. copying from one index to another?). But this is consistent with the way data enters into all clusters. One thing that might be nice is to create a worker for each cluster. --- lib/govuk_index/page_traffic_loader.rb | 40 ++++++++++--------- lib/govuk_index/page_traffic_worker.rb | 10 ++--- lib/index/client.rb | 5 ++- .../govuk_index/page_traffic_loader_spec.rb | 8 ++-- .../govuk_index/page_traffic_loader_spec.rb | 11 +++-- 5 files changed, 43 insertions(+), 31 deletions(-) diff --git a/lib/govuk_index/page_traffic_loader.rb b/lib/govuk_index/page_traffic_loader.rb index b38682a13..b295f0808 100644 --- a/lib/govuk_index/page_traffic_loader.rb +++ b/lib/govuk_index/page_traffic_loader.rb @@ -7,28 +7,30 @@ def initialize(iostream_batch_size: 250) end def load_from(iostream) - new_index = index_group.create_index - @logger.info "Created index #{new_index.real_name}" + Clusters.active.each do |cluster| + new_index = index_group(cluster).create_index + @logger.info "Created index #{new_index.real_name}" - old_index = index_group.current_real - @logger.info "Old index #{old_index.real_name}" + old_index = index_group(cluster).current_real + @logger.info "Old index #{old_index.real_name}" - old_index.with_lock do - @logger.info "Indexing to #{new_index.real_name}" + old_index.with_lock do + @logger.info "Indexing to #{new_index.real_name}" - in_even_sized_batches(iostream) do |lines| - GovukIndex::PageTrafficWorker.perform_async(lines, new_index.real_name) + in_even_sized_batches(iostream) do |lines| + GovukIndex::PageTrafficWorker.perform_async(lines, new_index.real_name, cluster.key) + end + + GovukIndex::PageTrafficWorker.wait_until_processed + new_index.commit end 
- GovukIndex::PageTrafficWorker.wait_until_processed - new_index.commit + # We need to switch the aliases without a lock, since + # read_only_allow_delete prevents aliases being changed + # The page traffic loader is a daily process, so there + # won't be a race condition + index_group(cluster).switch_to(new_index) end - - # We need to switch the aliases without a lock, since - # read_only_allow_delete prevents aliases being changed - # The page traffic loader is is a daily process, so there - # won't be a race condition - index_group.switch_to(new_index) end private @@ -40,10 +42,12 @@ def in_even_sized_batches(iostream, batch_size = @iostream_batch_size, &_block) iostream.each_line.each_slice(batch_size * 2) do |batch| yield(batch.map { |b| JSON.parse(b) }) end + iostream.pos = 0 # ensures we start at the beginning on next read. end - def index_group - @index_group ||= SearchConfig.instance.search_server.index_group( + def index_group(cluster) + @index_group ||= {} + @index_group[cluster.key] ||= SearchConfig.instance.search_server(cluster: cluster).index_group( SearchConfig.instance.page_traffic_index_name ) end diff --git a/lib/govuk_index/page_traffic_worker.rb b/lib/govuk_index/page_traffic_worker.rb index a7bfac626..0b729f6c4 100644 --- a/lib/govuk_index/page_traffic_worker.rb +++ b/lib/govuk_index/page_traffic_worker.rb @@ -4,15 +4,15 @@ class PageTrafficWorker < Indexer::BaseWorker QUEUE_NAME = 'bulk'.freeze sidekiq_options queue: QUEUE_NAME - def self.perform_async(records, destination_index) + def self.perform_async(records, destination_index, cluster_key) data = Base64.encode64(Zlib::Deflate.deflate(Sidekiq.dump_json(records))) - super(data, destination_index) + super(data, destination_index, cluster_key) end - def perform(data, destination_index) + def perform(data, destination_index, cluster_key) records = Sidekiq.load_json(Zlib::Inflate.inflate(Base64.decode64(data))) - - actions = Index::ElasticsearchProcessor.new(client:
GovukIndex::Client.new(timeout: BULK_INDEX_TIMEOUT, index_name: destination_index)) + cluster = Clusters.get_cluster(cluster_key) + actions = Index::ElasticsearchProcessor.new(client: GovukIndex::Client.new(timeout: BULK_INDEX_TIMEOUT, index_name: destination_index, clusters: [cluster])) records.each_slice(2) do |identifier, document| identifier['index'] = identifier['index'].merge('_type' => 'generic-document') diff --git a/lib/index/client.rb b/lib/index/client.rb index 36bd8f2d2..918204b64 100644 --- a/lib/index/client.rb +++ b/lib/index/client.rb @@ -12,6 +12,7 @@ def instance def initialize(options = {}) @_index = options.delete(:index_name) + @clusters = options.delete(:clusters) || Clusters.active @_options = options end @@ -22,7 +23,7 @@ def get(params) end def bulk(params) - Clusters.active.map do |cluster| + clusters.map do |cluster| client(cluster: cluster).bulk( params.merge(index: index_name) ) @@ -31,6 +32,8 @@ def bulk(params) private + attr_reader :clusters + def client(cluster: Clusters.default_cluster) @_client ||= {} @_client[cluster.key] ||= Services.elasticsearch( diff --git a/spec/integration/govuk_index/page_traffic_loader_spec.rb b/spec/integration/govuk_index/page_traffic_loader_spec.rb index aa9dcc8ca..4472dc104 100644 --- a/spec/integration/govuk_index/page_traffic_loader_spec.rb +++ b/spec/integration/govuk_index/page_traffic_loader_spec.rb @@ -11,9 +11,11 @@ GovukIndex::PageTrafficLoader.new .load_from(StringIO.new(data)) - document = fetch_document_from_rummager(id: id, index: 'page-traffic_test') + Clusters.active.each do |cluster| + document = fetch_document_from_rummager(id: id, index: 'page-traffic_test', cluster: cluster) - expect(document["_source"]["document_type"]).to eq('page_traffic') - expect(document['_source']['rank_14']).to eq(100) + expect(document["_source"]["document_type"]).to eq('page_traffic') + expect(document['_source']['rank_14']).to eq(100) + end end end diff --git 
a/spec/unit/govuk_index/page_traffic_loader_spec.rb b/spec/unit/govuk_index/page_traffic_loader_spec.rb index b5d3ee36d..f8d833b71 100644 --- a/spec/unit/govuk_index/page_traffic_loader_spec.rb +++ b/spec/unit/govuk_index/page_traffic_loader_spec.rb @@ -20,10 +20,13 @@ line2 = [{ "val" => "c" }, { "data" => 1 }, { "val" => "d" }, { "data" => 1 }] line3 = [{ "val" => "e" }, { "data" => 1 }] - expect(GovukIndex::PageTrafficWorker).to receive(:perform_async).with(line1, 'new_index_name') - expect(GovukIndex::PageTrafficWorker).to receive(:perform_async).with(line2, 'new_index_name') - expect(GovukIndex::PageTrafficWorker).to receive(:perform_async).with(line3, 'new_index_name') - + Clusters.active.each do |cluster| + # rubocop:disable RSpec/MessageSpies + expect(GovukIndex::PageTrafficWorker).to receive(:perform_async).with(line1, 'new_index_name', cluster.key) + expect(GovukIndex::PageTrafficWorker).to receive(:perform_async).with(line2, 'new_index_name', cluster.key) + expect(GovukIndex::PageTrafficWorker).to receive(:perform_async).with(line3, 'new_index_name', cluster.key) + # rubocop:enable RSpec/MessageSpies + end loader = GovukIndex::PageTrafficLoader.new(iostream_batch_size: 2) loader.load_from(input)