DEV: First pass at process_uploads script #26662

Closed
wants to merge 2 commits into from
2 changes: 2 additions & 0 deletions migrations/.gitignore
@@ -2,3 +2,5 @@

tmp/*
Gemfile.lock

/config/process_uploads.yml
7 changes: 7 additions & 0 deletions migrations/bin/process_uploads
@@ -0,0 +1,7 @@
#!/usr/bin/env ruby
# frozen_string_literal: true

require_relative "../lib/uploads/cli"

# ./migrations/bin/process_uploads [--settings=migrations/config/process_uploads.yml]
Migrations::Uploads::CLI.start(ARGV)
47 changes: 47 additions & 0 deletions migrations/config/process_uploads.yml.sample
@@ -0,0 +1,47 @@
source_db_path: "/path/to/your/db.sqlite3"
output_db_path: "/path/to/your/uploads.sqlite3"

root_paths:
- "/path/to/your/files"
- "/path/to/more/files"

# The number of threads to use for processing uploads is calculated as:
# thread_count = [number of cores] * [thread_count_factor]
# The thread count will be doubled if uploads are stored on S3 because there's a higher latency.
thread_count_factor: 1.5

# Delete uploads from the output database that are not found in the source database.
delete_surplus_uploads: false

# Delete uploads from the output database that do not have a Discourse upload record.
delete_missing_uploads: false

# Check if files are missing in the upload store and update the database accordingly.
# Set to false and re-run the script afterwards if you want to create new uploads for missing files.
fix_missing: false

# Create optimized images for post uploads and avatars.
create_optimized_images: false

site_settings:
  authorized_extensions: "*"
  max_attachment_size_kb: 102_400
  max_image_size_kb: 102_400

  enable_s3_uploads: true
  s3_upload_bucket: "your-bucket-name"
  s3_region: "your-region"
  s3_access_key_id: "your-access-key-id"
  s3_secret_access_key: "your-secret-access-key"
  s3_cdn_url: "https://your-cdn-url.com"

# Set this to true if the site is a multisite and configure the `multisite_db_name` accordingly
multisite: false
multisite_db_name: "default"

# Sometimes a file can be found at one of many locations. Here's a list of transformations that can
# be applied to the path to try and find the file. The first transformation that results in a file
# being found will be used.
path_replacements:
# - ["/foo/", "/bar"]
# - ["/foo/", "/bar/baz/"]
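The lookup order described in the `path_replacements` comment can be illustrated with a minimal sketch. `find_upload_file` is a hypothetical helper that is not part of this PR, and the assumption that each root path is tried with the unmodified path first, then with each replacement in order, is mine rather than something the diff confirms.

```ruby
# Hypothetical helper, not part of this PR: returns the first existing file
# for `relative_path`, or nil if none is found.
# Assumption: for each root path, the unmodified path is tried first and the
# replacements are then tried in the order they are listed.
def find_upload_file(relative_path, root_paths:, path_replacements:)
  root_paths.each do |root|
    path = File.join(root, relative_path)
    return path if File.exist?(path)

    # An empty `path_replacements:` key in YAML loads as nil, hence the guard.
    (path_replacements || []).each do |from, to|
      path = File.join(root, relative_path.sub(from, to))
      return path if File.exist?(path)
    end
  end

  nil
end
```

With the two commented-out sample entries enabled, a lookup for `/foo/a.png` would try `/foo/a.png`, then the result of the first replacement, then `/bar/baz/a.png` under each root, and stop at the first path that exists.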
26 changes: 26 additions & 0 deletions migrations/lib/uploads/base.rb
@@ -0,0 +1,26 @@
# frozen_string_literal: true

require "etc"
require "sqlite3"

module Migrations
  module Uploads
    class Base
      TRANSACTION_SIZE = 1000
      QUEUE_SIZE = 1000

      # TODO: Use IntermediateDatabase instead
      def create_connection(path)
        sqlite = SQLite3::Database.new(path, results_as_hash: true)
        sqlite.busy_timeout = 60_000 # 60 seconds
        sqlite.journal_mode = "WAL"
        sqlite.synchronous = "off"
        sqlite
      end

      def query(sql, db)
        db.prepare(sql).execute
      end
    end
  end
end
67 changes: 67 additions & 0 deletions migrations/lib/uploads/cli.rb
@@ -0,0 +1,67 @@
# frozen_string_literal: true

require_relative "../migrations"
require_relative "./settings"
require_relative "./fixer"
require_relative "./uploader"
require_relative "./optimizer"

module Migrations
  load_rails_environment

  load_gemfiles("common")
  configure_zeitwerk("lib/common")

  module Uploads
    class CLI < Thor
Member

I think it makes sense to keep the CLI code in bin/process_uploads. It's not really reusable code that you'd expect to find in lib. It doesn't help with testability, either. But maybe I'm missing something. What are your arguments for putting it into lib?

Contributor Author

@s3lase, Apr 29, 2024

This will be useful if we ever want to compose the CLIs into a single "migration" CLI: making each one a subcommand will be relatively straightforward.

> It doesn't help with testability, either.

Could you explain this further?

      default_task :execute

      class_option :settings,
                   type: :string,
                   aliases: "-s",
                   default: "./migrations/config/process_uploads.yml",
                   banner: "SETTINGS_FILE",
                   desc: "Upload settings file"

      def initialize(*args)
        super

        EXIFR.logger = Logger.new(nil)
        @settings = Settings.from_file(options[:settings])
      end

      def self.exit_on_failure?
        true
      end

      desc "execute [--settings=SETTINGS_FILE]", "Process uploads"
      def execute
        return run_fixer! if @settings[:fix_missing]

        Uploader.run!(@settings)

        run_optimizer! if @settings[:create_optimized_images]
      end

      desc "fix-missing [--settings=SETTINGS_FILE]", "Fix missing uploads"
      def fix_missing
        run_fixer!
      end

      desc "optimize [--settings=SETTINGS_FILE]", "Optimize uploads"
      def optimize
        run_optimizer!
      end

      private

      def run_fixer!
        Fixer.run!(@settings)
      end

      def run_optimizer!
        Optimizer.run!(@settings)
      end
    end
  end
end
110 changes: 110 additions & 0 deletions migrations/lib/uploads/fixer.rb
@@ -0,0 +1,110 @@
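# frozen_string_literal: true

require "json"
require "ostruct"

require_relative "./base"

module Migrations
  module Uploads
    class Fixer < Base
      def initialize(settings)
        @settings = settings

        # The fixer reads from and deletes from the output database. A separate
        # connection is used for the deletes because `run!` references @output_db.
        @source_db = create_connection(settings[:output_db_path])
        @output_db = create_connection(settings[:output_db_path])
      end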

      def self.run!(settings)
        puts "Fixing missing uploads..."

        new(settings).run!
      end

      def run!
        queue = SizedQueue.new(QUEUE_SIZE)
        consumer_threads = []

        max_count =
          @source_db.get_first_value("SELECT COUNT(*) FROM uploads WHERE upload IS NOT NULL")

        producer_thread =
          Thread.new do
            query(
              "SELECT id, upload FROM uploads WHERE upload IS NOT NULL ORDER BY rowid DESC",
              @source_db,
            ).tap do |result_set|
              result_set.each { |row| queue << row }
              result_set.close
            end
          end

        status_queue = SizedQueue.new(QUEUE_SIZE)
        status_thread =
          Thread.new do
            error_count = 0
            current_count = 0
            missing_count = 0

            while !(result = status_queue.pop).nil?
              current_count += 1

              case result[:status]
              when :ok
                # ignore
              when :error
                error_count += 1
                puts "Error in #{result[:id]}"
              when :missing
                missing_count += 1
                puts "Missing #{result[:id]}"

                @output_db.execute("DELETE FROM uploads WHERE id = ?", result[:id])
                Upload.delete_by(id: result[:upload_id])
              end

              error_count_text = error_count > 0 ? "#{error_count} errors".red : "0 errors"

              print "\r%7d / %7d (%s, %s missing)" %
                      [current_count, max_count, error_count_text, missing_count]
            end
          end

        store = Discourse.store

        (Etc.nprocessors * @settings[:thread_count_factor] * 2).to_i.times do |index|
          consumer_threads << Thread.new do
            Thread.current.name = "worker-#{index}"
            fake_upload = OpenStruct.new(url: "")
            while (row = queue.pop)
              # Reset so the rescue below never reports a previous row's upload.
              upload = nil

              begin
                upload = JSON.parse(row["upload"])
                fake_upload.url = upload["url"]
                path = add_multisite_prefix(store.get_path_for_upload(fake_upload))

                file_exists =
                  if store.external?
                    store.object_from_path(path).exists?
                  else
                    File.exist?(File.join(store.public_dir, path))
                  end

                if file_exists
                  status_queue << { id: row["id"], upload_id: upload["id"], status: :ok }
                else
                  status_queue << { id: row["id"], upload_id: upload["id"], status: :missing }
                end
              rescue StandardError => e
                puts e.message
                # `upload` is nil when JSON.parse itself failed.
                status_queue << { id: row["id"], upload_id: upload&.dig("id"), status: :error }
              end
            end
          end
        end

        producer_thread.join
        queue.close
        consumer_threads.each(&:join)
        status_queue.close
        status_thread.join
      end
    end
  end
end
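The shutdown sequence in `Fixer#run!` (join the producer, close the queue, then join the consumers) relies on `pop` returning `nil` once a closed queue is empty. Below is a minimal, self-contained sketch of that pattern together with the thread count formula from the sample config. `worker_count` and `process_in_parallel` are hypothetical helpers, not code from this PR, and the `external_store` flag is an assumption standing in for the "doubled on S3" rule.

```ruby
require "etc"

# Hypothetical helper, not part of this PR:
# thread_count = cores * thread_count_factor, doubled for an external store.
def worker_count(thread_count_factor, external_store: false, cores: Etc.nprocessors)
  multiplier = external_store ? 2 : 1
  [(cores * thread_count_factor * multiplier).to_i, 1].max
end

# Hypothetical helper, not part of this PR: runs `block` for every item on a
# pool of worker threads and returns the results in completion order.
# Items must not be nil or false, because those would end a worker's loop early.
def process_in_parallel(items, workers:, queue_size: 1000, &block)
  queue = SizedQueue.new(queue_size)
  results = Queue.new

  consumers =
    Array.new(workers) do
      Thread.new do
        # `pop` blocks while the queue is empty and returns nil once the
        # queue has been closed and drained, which ends the loop.
        while (item = queue.pop)
          results << block.call(item)
        end
      end
    end

  # `<<` blocks when the sized queue is full, so the producer cannot run
  # arbitrarily far ahead of the consumers.
  producer = Thread.new { items.each { |item| queue << item } }

  producer.join
  queue.close
  consumers.each(&:join)

  Array.new(results.size) { results.pop }
end
```

A call such as `process_in_parallel(rows, workers: worker_count(1.5, external_store: true)) { |row| check(row) }` would mirror the structure above, with `rows` and `check` standing in for the database rows and the file existence check.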