
FIX: remove the tmp inventory files after the s3 uploads check.

vinothkannans committed Aug 13, 2019
1 parent 1358339 commit 9919ee1900db7ad582cb3e5a4714fec78891f2c3
Showing with 62 additions and 56 deletions.
  1. +57 −47 lib/s3_inventory.rb
  2. +5 −9 spec/components/s3_inventory_spec.rb
@@ -31,50 +31,58 @@ def backfill_etags_and_list_missing
     end
 
     DistributedMutex.synchronize("s3_inventory_list_missing_#{type}") do
-      download_inventory_files_to_tmp_directory
-      decompress_inventory_files
+      begin
+        files.each do |file|
+          next if File.exists?(file[:filename][0...-3])
 
-      multisite_prefix = "uploads/#{RailsMultisite::ConnectionManagement.current_db}/"
-      ActiveRecord::Base.transaction do
-        begin
-          connection.exec("CREATE TEMP TABLE #{table_name}(url text UNIQUE, etag text, PRIMARY KEY(etag, url))")
-          connection.copy_data("COPY #{table_name} FROM STDIN CSV") do
-            files.each do |file|
-              CSV.foreach(file[:filename][0...-3], headers: false) do |row|
-                key = row[CSV_KEY_INDEX]
-                next if Rails.configuration.multisite && key.exclude?(multisite_prefix)
-                url = File.join(Discourse.store.absolute_base_url, key)
-                connection.put_copy_data("#{url},#{row[CSV_ETAG_INDEX]}\n")
-              end
-            end
-          end
+          download_inventory_file_to_tmp_directory(file)
+          decompress_inventory_file(file)
+        end
+
+        multisite_prefix = "uploads/#{RailsMultisite::ConnectionManagement.current_db}/"
+        ActiveRecord::Base.transaction do
+          begin
+            connection.exec("CREATE TEMP TABLE #{table_name}(url text UNIQUE, etag text, PRIMARY KEY(etag, url))")
+            connection.copy_data("COPY #{table_name} FROM STDIN CSV") do
+              files.each do |file|
+                CSV.foreach(file[:filename][0...-3], headers: false) do |row|
+                  key = row[CSV_KEY_INDEX]
+                  next if Rails.configuration.multisite && key.exclude?(multisite_prefix)
+                  url = File.join(Discourse.store.absolute_base_url, key)
+                  connection.put_copy_data("#{url},#{row[CSV_ETAG_INDEX]}\n")
+                end
+              end
+            end
 
-          # backfilling etags
-          connection.async_exec("UPDATE #{model.table_name}
-            SET etag = #{table_name}.etag
-            FROM #{table_name}
-            WHERE #{model.table_name}.etag IS NULL
-              AND #{model.table_name}.url = #{table_name}.url")
+            # backfilling etags
+            connection.async_exec("UPDATE #{model.table_name}
+              SET etag = #{table_name}.etag
+              FROM #{table_name}
+              WHERE #{model.table_name}.etag IS NULL
+                AND #{model.table_name}.url = #{table_name}.url")
 
-          list_missing_post_uploads if type == "original"
+            list_missing_post_uploads if type == "original"
 
-          uploads = (model == Upload) ? model.by_users.where("created_at < ?", inventory_date) : model
-          missing_uploads = uploads
-            .joins("LEFT JOIN #{table_name} ON #{table_name}.etag = #{model.table_name}.etag")
-            .where("#{table_name}.etag IS NULL AND #{model.table_name}.etag IS NOT NULL")
+            uploads = (model == Upload) ? model.by_users.where("created_at < ?", inventory_date) : model
+            missing_uploads = uploads
+              .joins("LEFT JOIN #{table_name} ON #{table_name}.etag = #{model.table_name}.etag")
+              .where("#{table_name}.etag IS NULL AND #{model.table_name}.etag IS NOT NULL")
 
-          if (missing_count = missing_uploads.count) > 0
-            missing_uploads.select(:id, :url).find_each do |upload|
-              log upload.url
-            end
-
-            log "#{missing_count} of #{uploads.count} #{model.name.underscore.pluralize} are missing"
-          end
-
-          Discourse.stats.set("missing_s3_#{model.table_name}", missing_count)
-        ensure
-          connection.exec("DROP TABLE #{table_name}") unless connection.nil?
-        end
-      end
+            if (missing_count = missing_uploads.count) > 0
+              missing_uploads.select(:id, :url).find_each do |upload|
+                log upload.url
+              end
+
+              log "#{missing_count} of #{uploads.count} #{model.name.underscore.pluralize} are missing"
+            end
+
+            Discourse.stats.set("missing_s3_#{model.table_name}", missing_count)
+          ensure
+            connection.exec("DROP TABLE #{table_name}") unless connection.nil?
+          end
+        end
+      ensure
+        cleanup!
+      end
     end
   end
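
For readers less familiar with the PostgreSQL COPY trick this hunk relies on, here is a minimal standalone sketch of the same pattern: stream (url, etag) rows into a temporary table with COPY, then backfill missing etags with a single UPDATE ... FROM join. The connection settings, table names, and sample rows below are illustrative assumptions, not Discourse code; the real method goes through ActiveRecord's raw connection and the model's table.

require "pg"

# Assumption: a local database with an uploads(url, etag) table.
conn = PG.connect(dbname: "example_db")

conn.exec("CREATE TEMP TABLE etags_tmp(url text UNIQUE, etag text, PRIMARY KEY(etag, url))")

# copy_data keeps the COPY open for the block; each put_copy_data call streams one CSV row.
conn.copy_data("COPY etags_tmp FROM STDIN CSV") do
  conn.put_copy_data("https://cdn.example.com/original/1X/abc.png,ETag1\n")
  conn.put_copy_data("https://cdn.example.com/original/1X/def.png,ETag2\n")
end

# Backfill only rows that have no etag yet, joining on url.
conn.async_exec(<<~SQL)
  UPDATE uploads
  SET etag = etags_tmp.etag
  FROM etags_tmp
  WHERE uploads.etag IS NULL
    AND uploads.url = etags_tmp.url
SQL

conn.exec("DROP TABLE etags_tmp")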
@@ -118,22 +126,18 @@ def list_missing_post_uploads
log "#{missing[:count]} post uploads are missing."
end

def download_inventory_files_to_tmp_directory
files.each do |file|
next if File.exists?(file[:filename])
def download_inventory_file_to_tmp_directory(file)
return if File.exists?(file[:filename])

log "Downloading inventory file '#{file[:key]}' to tmp directory..."
failure_message = "Failed to inventory file '#{file[:key]}' to tmp directory."
log "Downloading inventory file '#{file[:key]}' to tmp directory..."
failure_message = "Failed to inventory file '#{file[:key]}' to tmp directory."

@s3_helper.download_file(file[:key], file[:filename], failure_message)
end
@s3_helper.download_file(file[:key], file[:filename], failure_message)
end

def decompress_inventory_files
files.each do |file|
log "Decompressing inventory file '#{file[:filename]}', this may take a while..."
Discourse::Utils.execute_command('gzip', '--decompress', file[:filename], failure_message: "Failed to decompress inventory file '#{file[:filename]}'.", chdir: tmp_directory)
end
def decompress_inventory_file(file)
log "Decompressing inventory file '#{file[:filename]}', this may take a while..."
Discourse::Utils.execute_command('gzip', '--decompress', file[:filename], failure_message: "Failed to decompress inventory file '#{file[:filename]}'.", chdir: tmp_directory)
end

def update_bucket_policy
@@ -173,6 +177,13 @@ def update_bucket_inventory_configuration

   private
 
+  def cleanup!
+    files.each do |file|
+      File.delete(file[:filename]) if File.exists?(file[:filename])
+      File.delete(file[:filename][0...-3]) if File.exists?(file[:filename][0...-3])
+    end
+  end
+
   def connection
     @connection ||= ActiveRecord::Base.connection.raw_connection
   end
@@ -202,8 +213,7 @@ def files
   def tmp_directory
     @tmp_directory ||= begin
       current_db = RailsMultisite::ConnectionManagement.current_db
-      timestamp = Time.now.strftime("%Y-%m-%d-%H%M%S")
-      directory = File.join(Rails.root, "tmp", INVENTORY_PREFIX, current_db, timestamp)
+      directory = File.join(Rails.root, "tmp", INVENTORY_PREFIX, current_db)
       FileUtils.mkdir_p(directory)
       directory
     end
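
Taken together, the lib/s3_inventory.rb changes above amount to the flow sketched below: download and decompress each inventory file only if its decompressed CSV is not already present, and always delete both the .gz and the .csv afterwards. The wrapper method name is made up for illustration; the helpers it calls are the ones defined in the diff, and dropping the timestamp from tmp_directory is what makes the File.exists? skip and the cleanup meaningful across runs.

# Illustrative sketch only, not the literal method body.
def run_with_cleanup
  begin
    files.each do |file|
      csv_path = file[:filename][0...-3]            # "foo.csv.gz" -> "foo.csv"
      next if File.exists?(csv_path)                # already downloaded and decompressed

      download_inventory_file_to_tmp_directory(file)
      decompress_inventory_file(file)               # gzip --decompress leaves foo.csv behind
    end

    # ... load the temp table, backfill etags, report missing uploads ...
  ensure
    cleanup!                                        # removes file[:filename] and the decompressed CSV
  end
end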
@@ -48,6 +48,8 @@
         next_marker: "eyJNYXJrZXIiOiBudWxsLCAiYm90b190cnVuY2F0ZV9hbW91bnQiOiAyfQ=="
       }
     })
+
+    inventory.stubs(:cleanup!)
   end
 
   it "should raise error if an inventory file is not found" do
@@ -67,9 +69,7 @@
     Fabricate(:upload, etag: "ETag2", created_at: Time.now)
     Fabricate(:upload, created_at: 2.days.ago)
 
-    inventory.expects(:download_inventory_files_to_tmp_directory)
-    inventory.expects(:decompress_inventory_files)
-    inventory.expects(:files).returns([{ key: "Key", filename: "#{csv_filename}.gz" }]).times(2)
+    inventory.expects(:files).returns([{ key: "Key", filename: "#{csv_filename}.gz" }]).times(3)
     inventory.expects(:inventory_date).returns(Time.now)
 
     output = capture_stdout do
@@ -87,9 +87,7 @@
     ]
     files.each { |file| Fabricate(:upload, url: file[0]) }
 
-    inventory.expects(:download_inventory_files_to_tmp_directory)
-    inventory.expects(:decompress_inventory_files)
-    inventory.expects(:files).returns([{ key: "Key", filename: "#{csv_filename}.gz" }]).times(2)
+    inventory.expects(:files).returns([{ key: "Key", filename: "#{csv_filename}.gz" }]).times(3)
 
     output = capture_stdout do
       expect { inventory.backfill_etags_and_list_missing }.to change { Upload.where(etag: nil).count }.by(-2)
@@ -111,9 +109,7 @@
     post.link_post_uploads
     upload.delete
 
-    inventory.expects(:download_inventory_files_to_tmp_directory)
-    inventory.expects(:decompress_inventory_files)
-    inventory.expects(:files).returns([{ key: "Key", filename: "#{csv_filename}.gz" }]).times(2)
+    inventory.expects(:files).returns([{ key: "Key", filename: "#{csv_filename}.gz" }]).times(3)
 
     output = capture_stdout do
       inventory.backfill_etags_and_list_missing

1 comment on commit 9919ee1

@discoursereviewbot commented on 9919ee1 Aug 13, 2019

SamSaffron posted:

I recommend leaning less on stubbing and mocking here for the testing; the only thing you really need to mock in an integration test is the HTTP request to get the inventory. Then the integration test can write it to the file system and nuke it.
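
Following up on that suggestion, one way to reduce the mocking is to hand the spec a real gzipped inventory CSV on disk and let download/decompress/cleanup run for real. The sketch below only shows the fixture side; the helper name, directory layout, and row contents are assumptions for illustration, not existing Discourse spec code.

require "csv"
require "fileutils"
require "zlib"

# Write a gzipped inventory CSV the way S3 would deliver it, so the spec can
# decompress it with the production code path and assert the files are removed.
def write_inventory_fixture(dir, rows)
  FileUtils.mkdir_p(dir)
  gz_path = File.join(dir, "inventory.csv.gz")

  Zlib::GzipWriter.open(gz_path) do |gz|
    # Each row mirrors the columns the parser reads, e.g. ["bucket", "original/1X/abc.png", "ETag1"].
    rows.each { |row| gz.write(row.to_csv) }
  end

  gz_path
end

# A spec could then point the inventory's files list at gz_path, stub only the
# S3 download, run backfill_etags_and_list_missing, and assert that both the
# .gz and the decompressed .csv are gone afterwards.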
