Merged
1 change: 1 addition & 0 deletions Gemfile
@@ -6,6 +6,7 @@ gem 'rails', '~> 5.2'
 gem 'activerecord-import', '~> 0.24'
 gem 'with_advisory_lock', '~> 3.2'
 
+gem 'aws-sdk-s3', '~> 1'
 gem 'faraday', '0.12.2'
 gem 'foreman', '~> 0.85'
 gem 'gds-api-adapters', '~> 52.6'
19 changes: 18 additions & 1 deletion Gemfile.lock
@@ -49,6 +49,21 @@ GEM
       public_suffix (>= 2.0.2, < 4.0)
     arel (9.0.0)
     ast (2.4.0)
+    aws-eventstream (1.0.1)
+    aws-partitions (1.94.0)
+    aws-sdk-core (3.22.0)
+      aws-eventstream (~> 1.0)
+      aws-partitions (~> 1.0)
+      aws-sigv4 (~> 1.0)
+      jmespath (~> 1.0)
+    aws-sdk-kms (1.6.0)
+      aws-sdk-core (~> 3)
+      aws-sigv4 (~> 1.0)
+    aws-sdk-s3 (1.15.0)
+      aws-sdk-core (~> 3, >= 3.21.2)
+      aws-sdk-kms (~> 1)
+      aws-sigv4 (~> 1.0)
+    aws-sigv4 (1.0.2)
     builder (3.2.3)
     byebug (10.0.2)
     climate_control (0.2.0)
@@ -119,6 +134,7 @@ GEM
       domain_name (~> 0.5)
     i18n (1.0.1)
       concurrent-ruby (~> 1.0)
+    jmespath (1.4.0)
     jwt (1.5.6)
     kgio (2.11.2)
     link_header (0.0.8)
@@ -335,6 +351,7 @@ PLATFORMS

 DEPENDENCIES
   activerecord-import (~> 0.24)
+  aws-sdk-s3 (~> 1)
   climate_control
   equivalent-xml
   factory_bot_rails
@@ -363,4 +380,4 @@ DEPENDENCIES
   with_advisory_lock (~> 3.2)
 
 BUNDLED WITH
-   1.16.1
+   1.16.2
60 changes: 60 additions & 0 deletions app/presenters/email_archive_presenter.rb
@@ -0,0 +1,60 @@
class EmailArchivePresenter
  S3_DATETIME_FORMAT = "%Y-%m-%d %H:%M:%S.%L".freeze

  # This is expected to be called with a JSON representation of a record
  # returned from EmailArchiveQuery
  def self.for_s3(*args)
    new.for_s3(*args)
  end

  def for_s3(record, archived_at)
    {
      archived_at_utc: archived_at.utc.strftime(S3_DATETIME_FORMAT),
      content_change: build_content_change(record),
      created_at_utc: record.fetch("created_at").utc.strftime(S3_DATETIME_FORMAT),
      finished_sending_at_utc: record.fetch("finished_sending_at").utc.strftime(S3_DATETIME_FORMAT),
      id: record.fetch("id"),
      sent: record.fetch("sent"),
      subject: record.fetch("subject"),
      subscriber_id: record.fetch("subscriber_id"),
    }
  end

  def self.for_db(*args)
    new.for_db(*args)
  end

  def for_db(record, archived_at, exported_at = nil)
    {
      archived_at: archived_at,
      content_change: build_content_change(record),
      created_at: record.fetch("created_at"),
      exported_at: exported_at,
      finished_sending_at: record.fetch("finished_sending_at"),
      id: record.fetch("id"),
      sent: record.fetch("sent"),
      subject: record.fetch("subject"),
      subscriber_id: record.fetch("subscriber_id"),
    }
  end

  private_class_method :new

  private

  def build_content_change(record)
    return if record.fetch("content_change_ids").empty?

    if record.fetch("digest_run_ids").count > 1
      error = "Email with id: #{record['id']} is associated with "\
        "multiple digest runs: #{record['digest_run_ids'].join(', ')}"
      GovukError.notify(error)
    end

    {
      content_change_ids: record.fetch("content_change_ids"),
      digest_run_id: record.fetch("digest_run_ids").first,
      subscription_ids: record.fetch("subscription_ids"),
    }
  end
end
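
To make the output shape concrete, here is a minimal sketch of calling the presenter with a hypothetical record (all values invented for illustration; the keys match what the presenter fetches):

record = {
  "id" => "email-uuid",
  "subject" => "GOV.UK update",
  "sent" => true,
  "subscriber_id" => 42,
  "created_at" => Time.utc(2018, 6, 28, 9, 0),
  "finished_sending_at" => Time.utc(2018, 6, 28, 9, 5),
  "content_change_ids" => ["content-change-uuid"],
  "digest_run_ids" => [1],
  "subscription_ids" => ["subscription-uuid"],
}

EmailArchivePresenter.for_s3(record, Time.utc(2018, 6, 28, 10))
# => {
#   archived_at_utc: "2018-06-28 10:00:00.000",
#   content_change: {
#     content_change_ids: ["content-change-uuid"],
#     digest_run_id: 1,
#     subscription_ids: ["subscription-uuid"],
#   },
#   created_at_utc: "2018-06-28 09:00:00.000",
#   finished_sending_at_utc: "2018-06-28 09:05:00.000",
#   id: "email-uuid",
#   sent: true,
#   subject: "GOV.UK update",
#   subscriber_id: 42,
# }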
53 changes: 53 additions & 0 deletions app/services/s3_email_archive_service.rb
@@ -0,0 +1,53 @@
class S3EmailArchiveService
  def self.call(*args)
    new.call(*args)
  end

  # For batch we expect an array of hashes containing email data in the format
  # from EmailArchivePresenter
  def call(batch)
    group_by_date(batch).map { |prefix, records| send_to_s3(prefix, records) }
  end

  private_class_method :new

  private

  def group_by_date(batch)
    batch.group_by do |item|
      # we group by date in this way to create partitions for s3/athena
      # these are grouped in case dates span more than one day
      Date.parse(
        item.fetch(:finished_sending_at_utc)
      ).strftime("year=%Y/month=%m/date=%d")
    end
  end

  def send_to_s3(prefix, records)
    records = records.sort_by { |r| r.fetch(:finished_sending_at_utc) }
    last_time = records.last[:finished_sending_at_utc]
    obj = bucket.object(object_name(prefix, last_time))
    obj.put(
      body: object_body(records),
      content_encoding: "gzip"
    )
  end

  def bucket
    @bucket ||= begin
      s3 = Aws::S3::Resource.new
      s3.bucket(ENV.fetch("EMAIL_ARCHIVE_S3_BUCKET"))
    end
  end

  def object_name(prefix, last_time)
    uuid = SecureRandom.uuid

Contributor (inline review comment):

If this fails halfway through, can we end up in a state where half the archives are uploaded, and then the transaction rolls back, and when it runs again we get everything uploaded, so we end up with double the archives on half of the dates?

I haven't thought about it too much, but could we build this UUID from some sort of hash of the batch to avoid this situation?

I realise that in most cases only one day's worth of emails will be archived, so this won't be a problem.

Member Author:

Yeah, this is a problem we have with every upload to S3 - if we think it's failed but it actually succeeded, we may end up duplicating the data. The only way I can think around this is having very predictable batching so that we can generate a reliable hash, but that seems difficult to do relative to the risk involved.

    time = ActiveSupport::TimeZone["UTC"].parse(last_time)
    "email-archive/#{prefix}/#{time.to_s(:iso8601)}-#{uuid}.json.gz"
  end

  def object_body(records)
    data = records.map(&:to_json).join("\n") + "\n"
    ActiveSupport::Gzip.compress(data, Zlib::BEST_COMPRESSION)
  end
end
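
To illustrate the partition layout (timestamp invented): for a record whose finished_sending_at_utc is "2018-06-28 09:05:00.000", the grouping and the resulting object key work out as:

Date.parse("2018-06-28 09:05:00.000").strftime("year=%Y/month=%m/date=%d")
# => "year=2018/month=06/date=28"

# object_name then yields something like (UUID random):
# "email-archive/year=2018/month=06/date=28/2018-06-28T09:05:00Z-<uuid>.json.gz"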
55 changes: 21 additions & 34 deletions app/workers/email_archive_worker.rb
@@ -12,7 +12,7 @@ def perform

     EmailArchiveQuery.call.in_batches do |batch|
       Email.transaction do
-        archived_count += archive_batch(batch)
+        archived_count += archive_batch(batch.as_json)
       end
     end
 
@@ -24,47 +24,34 @@
 
   def archive_batch(batch)
     archived_at = Time.zone.now
+    exported_at = nil
 
-    import_batch(batch, archived_at)
-    Email.where(id: batch.pluck(:id)).update_all(archived_at: archived_at)
-  end
+    if ENV.include?("EMAIL_ARCHIVE_S3_ENABLED")
+      send_to_s3(batch, archived_at)
+      exported_at = archived_at
+    end
 
-  def import_batch(batch, archived_at)
-    to_import = batch.as_json.map { |e| build_email_archive(e, archived_at) }
-    columns = to_import.first.keys.map(&:to_s)
-    values = to_import.map(&:values)
-    EmailArchive.import(columns, values, validate: false)
+    import_batch(batch, archived_at, exported_at)
+    mark_emails_as_archived(batch.pluck("id"), archived_at)
   end
 
-  def build_email_archive(email_data, archived_at)
-    content_change = build_content_change(email_data)
+  def import_batch(batch, archived_at, exported_at)
+    archive = batch.map do |b|
+      EmailArchivePresenter.for_db(b, archived_at, exported_at)
+    end
 
-    {
-      archived_at: archived_at,
-      content_change: content_change,
-      created_at: email_data.fetch("created_at"),
-      finished_sending_at: email_data.fetch("finished_sending_at"),
-      id: email_data.fetch("id"),
-      sent: email_data.fetch("sent"),
-      subject: email_data.fetch("subject"),
-      subscriber_id: email_data.fetch("subscriber_id"),
-    }
+    columns = archive.first.keys.map(&:to_s)
+    values = archive.map(&:values)
+    EmailArchive.import(columns, values, validate: false)
   end
 
-  def build_content_change(email_data)
-    return if email_data.fetch("content_change_ids").empty?
-
-    if email_data.fetch("digest_run_ids").count > 1
-      error = "Email with id: #{email_data['id']} is associated with "\
-        "multiple digest runs: #{email_data['digest_run_ids'].join(', ')}"
-      GovukError.notify(error)
-    end
+  def send_to_s3(batch, archived_at)
+    archive = batch.map { |b| EmailArchivePresenter.for_s3(b, archived_at) }
+    S3EmailArchiveService.call(archive)
+  end
 
-    {
-      content_change_ids: email_data.fetch("content_change_ids"),
-      digest_run_id: email_data.fetch("digest_run_ids").first,
-      subscription_ids: email_data.fetch("subscription_ids"),
-    }
+  def mark_emails_as_archived(ids, archived_at)
+    Email.where(id: ids).update_all(archived_at: archived_at)
   end
 
   def log_complete(archived, start_time, end_time)
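
For completeness, a hedged sketch of reading one of these archive objects back, assuming the bucket env var from S3EmailArchiveService and an illustrative key; the body is gzipped newline-delimited JSON, one email per line:

require "json"

obj = Aws::S3::Resource.new
                       .bucket(ENV.fetch("EMAIL_ARCHIVE_S3_BUCKET"))
                       .object("email-archive/year=2018/month=06/date=28/example.json.gz")

# decompress, then parse each line as a separate JSON document
ActiveSupport::Gzip.decompress(obj.get.body.read).each_line do |line|
  puts JSON.parse(line).fetch("subject")
end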
5 changes: 5 additions & 0 deletions db/migrate/…_add_exported_at_to_email_archives.rb
@@ -0,0 +1,5 @@
class AddExportedAtToEmailArchives < ActiveRecord::Migration[5.2]
  def change
    add_column :email_archives, :exported_at, :datetime
  end
end
7 changes: 7 additions & 0 deletions db/migrate/20180628135936_add_indexes_to_email_archive.rb
@@ -0,0 +1,7 @@
class AddIndexesToEmailArchive < ActiveRecord::Migration[5.2]
  disable_ddl_transaction!
  def change
    add_index :email_archives, :finished_sending_at, algorithm: :concurrently
    add_index :email_archives, :exported_at, algorithm: :concurrently
  end
end
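
For context: algorithm: :concurrently makes Rails issue CREATE INDEX CONCURRENTLY, which Postgres will not run inside a transaction block, so the migration has to opt out of the usual wrapping transaction via disable_ddl_transaction!.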
7 changes: 5 additions & 2 deletions db/schema.rb
@@ -10,7 +10,7 @@
 #
 # It's strongly recommended that you check this file into your version control system.
 
-ActiveRecord::Schema.define(version: 2018_06_28_072318) do
+ActiveRecord::Schema.define(version: 2018_06_28_135936) do
 
   # These are extensions that must be enabled in order to support this database
   enable_extension "plpgsql"
@@ -85,6 +85,9 @@
t.datetime "created_at", null: false
t.datetime "archived_at", null: false
t.datetime "finished_sending_at", null: false
t.datetime "exported_at"
t.index ["exported_at"], name: "index_email_archives_on_exported_at"
t.index ["finished_sending_at"], name: "index_email_archives_on_finished_sending_at"
end

create_table "emails", id: :uuid, default: -> { "uuid_generate_v4()" }, force: :cascade do |t|
@@ -164,8 +167,8 @@
t.bigint "subscriber_list_id", null: false
t.datetime "created_at", null: false
t.datetime "updated_at", null: false
t.string "signon_user_uid"
t.integer "frequency", default: 0, null: false
t.string "signon_user_uid"
t.integer "source", default: 0, null: false
t.datetime "ended_at"
t.integer "ended_reason"
91 changes: 91 additions & 0 deletions lib/email_archive_exporter.rb
@@ -0,0 +1,91 @@
class EmailArchiveExporter
  def self.call(*args)
    new.call(*args)
  end

  def call(from_date, until_date)
    from_date = Date.parse(from_date)
    until_date = Date.parse(until_date)

    puts "Exporting records that finished sending on or after #{from_date} and before #{until_date}"

    total = (from_date...until_date).inject(0) { |sum, date| sum + export(date) }

    puts "Exported #{total} records"
  end

  private_class_method :new

  private

  def export(date)
    puts "Exporting #{date}"
    start = Time.now

    count = 0

    loop do
      records = email_archive_records(date)

      break unless records.any?

      ExportToS3.call(records)

      count += records.count
      puts "Processed #{count} emails"
    end

    seconds = Time.now.to_i - start.to_i
    puts "Completed #{date} in #{seconds} seconds"

    count
  end

  def email_archive_records(date)
    EmailArchive
      .where(
        "finished_sending_at >= ? AND finished_sending_at < ?",
        date,
        date + 1.day
      )
      .where(exported_at: nil)
      .order(finished_sending_at: :asc, id: :asc)
      .limit(50_000)
      .as_json
  end

  class ExportToS3
    def self.call(*args)
      new.call(*args)
    end

    def call(records)
      send_to_s3(records)
      mark_as_exported(records)
    end

    private

    def send_to_s3(records)
      batch = records.map do |r|
        {
          archived_at_utc: r["archived_at"].utc.strftime(EmailArchivePresenter::S3_DATETIME_FORMAT),
          content_change: r["content_change"],
          created_at_utc: r["created_at"].utc.strftime(EmailArchivePresenter::S3_DATETIME_FORMAT),
          finished_sending_at_utc: r["finished_sending_at"].utc.strftime(EmailArchivePresenter::S3_DATETIME_FORMAT),
          id: r["id"],
          sent: r["sent"],
          subject: r["subject"],
          subscriber_id: r["subscriber_id"]
        }
      end

      S3EmailArchiveService.call(batch)
    end

    def mark_as_exported(records)
      ids = records.map { |r| r["id"] }
      EmailArchive.where(id: ids).update_all(exported_at: Time.now)
    end
  end
end
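
The loop in export terminates because each ExportToS3 pass stamps exported_at on the rows it uploads, so the exported_at: nil scope returns the next 50,000 rows on each iteration and eventually comes back empty. A hypothetical console invocation (dates illustrative; until_date is exclusive):

EmailArchiveExporter.call("2018-06-01", "2018-06-28")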
5 changes: 5 additions & 0 deletions lib/tasks/export.rake
@@ -13,4 +13,9 @@ namespace :export do
   task csv_from_living_in_europe: :environment do
     DataExporter.new.export_csv_from_living_in_europe
   end
+
+  desc "Export data from Email Archives to S3 - accepts arguments for the date range of emails to export"
+  task :email_archives_to_s3, %i[from_date until_date] => :environment do |_, args|
+    EmailArchiveExporter.call(args.from_date, args.until_date)
+  end
 end
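
A hypothetical invocation of the new task (dates illustrative; the range is half-open, so until_date itself is not exported):

bundle exec rake export:email_archives_to_s3[2018-06-01,2018-06-28]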