Skip to content

Commit

Permalink
Change output behavior to be in addition to loading the database.
Browse files Browse the repository at this point in the history
The output file will be optionally written but the database will always
be loaded upon execution. The problem with trying to only write a SQL
script without loading the database is that the related data on a
belongs_to depends on parent tables having been already populated. So
the user would need to separate their scripts and make sure they are run
in the correct order.

Instead, we can load the data to begin with and also write the SQL
script. So from that point forward the user can use the SQL scripts to
reload the same data later on and save time that way.
  • Loading branch information
abeiderman committed Nov 15, 2021
1 parent 01aedc7 commit 5b1c7c6
Show file tree
Hide file tree
Showing 10 changed files with 109 additions and 96 deletions.
2 changes: 1 addition & 1 deletion lib/active_record_data_loader.rb
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@
require "active_record_data_loader/dsl/polymorphic_association"
require "active_record_data_loader/dsl/model"
require "active_record_data_loader/dsl/definition"
require "active_record_data_loader/connection_output_adapter"
require "active_record_data_loader/file_output_adapter"
require "active_record_data_loader/null_output_adapter"
require "active_record_data_loader/copy_strategy"
require "active_record_data_loader/bulk_insert_strategy"
require "active_record_data_loader/table_loader"
Expand Down
14 changes: 10 additions & 4 deletions lib/active_record_data_loader/bulk_insert_strategy.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,18 @@

module ActiveRecordDataLoader
class BulkInsertStrategy
def initialize(data_generator, output_adapter)
def initialize(data_generator, file_adapter)
@data_generator = data_generator
@output_adapter = output_adapter
@file_adapter = file_adapter
end

def load_batch(row_numbers, connection)
output_adapter.insert(connection: connection, command: <<~SQL)
command = <<~SQL
INSERT INTO #{quoted_table_name(connection)} (#{column_list(connection)})
VALUES #{values(row_numbers, connection)}
SQL
insert(connection: connection, command: command)
file_adapter.insert(command)
end

def table_name
Expand All @@ -24,7 +26,11 @@ def name

private

attr_reader :data_generator, :output_adapter
attr_reader :data_generator, :file_adapter

def insert(connection:, command:)
connection.insert(command)
end

def quoted_table_name(connection)
@quoted_table_name ||= connection.quote_table_name(data_generator.table)
Expand Down
21 changes: 8 additions & 13 deletions lib/active_record_data_loader/configuration.rb
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ def initialize(
logger: nil,
statement_timeout: "2min",
connection_factory: -> { ::ActiveRecord::Base.connection },
output: :connection
output: nil
)
@default_batch_size = default_batch_size
@default_row_count = default_row_count
Expand All @@ -23,24 +23,19 @@ def initialize(
end

def output=(output)
@output = validate_output(output || { type: :connection })
@output = validate_output(output)
end

private

OUTPUT_OPTIONS_BY_TYPE = { connection: %i[type], file: %i[type filename] }.freeze

def validate_output(output)
if %i[file connection].include?(output)
{ type: output }
elsif output.is_a?(Hash)
raise "The output hash must contain a :type key with either :connection or :file" \
unless %i[file connection].include?(output[:type])

output.slice(*OUTPUT_OPTIONS_BY_TYPE[output[:type]])
if output.to_s.blank?
nil
elsif output.is_a?(String)
output
else
raise "The output configuration parameter must be either a symbol for :connection or :file, "\
"or a hash with more detailed output options."
raise "The output configuration parameter must be a filename meant to be the "\
"target for the SQL script"
end
end

Expand Down
20 changes: 0 additions & 20 deletions lib/active_record_data_loader/connection_output_adapter.rb

This file was deleted.

24 changes: 19 additions & 5 deletions lib/active_record_data_loader/copy_strategy.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2,17 +2,24 @@

module ActiveRecordDataLoader
class CopyStrategy
def initialize(data_generator, output_adapter)
def initialize(data_generator, file_adapter)
@data_generator = data_generator
@output_adapter = output_adapter
@file_adapter = file_adapter
end

def load_batch(row_numbers, connection)
output_adapter.copy(
data = csv_rows(row_numbers, connection)
copy(
connection: connection,
table: table_name_for_copy(connection),
columns: columns_for_copy(connection),
data: csv_rows(row_numbers, connection),
data: data,
row_numbers: row_numbers
)
file_adapter.copy(
table: table_name_for_copy(connection),
columns: columns_for_copy(connection),
data: data,
row_numbers: row_numbers
)
end
Expand All @@ -27,7 +34,14 @@ def name

private

attr_reader :data_generator, :output_adapter
attr_reader :data_generator, :file_adapter

def copy(connection:, table:, columns:, data:, row_numbers:)
raw_connection = connection.raw_connection
raw_connection.copy_data("COPY #{table} (#{columns}) FROM STDIN WITH (FORMAT CSV)") do
raw_connection.put_copy_data(data.join("\n"))
end
end

def csv_rows(row_numbers, connection)
row_numbers.map do |i|
Expand Down
4 changes: 2 additions & 2 deletions lib/active_record_data_loader/file_output_adapter.rb
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,15 @@ def initialize(options)
File.open(@filename, File::TRUNC) if File.exist?(@filename)
end

def copy(connection:, table:, columns:, data:, row_numbers:)
def copy(table:, columns:, data:, row_numbers:)
data_filename = data_filename(table, row_numbers)
File.open(data_filename, "w") { |f| f.puts(data) }
File.open(filename, "a") do |file|
file.puts("\\COPY #{table} (#{columns}) FROM '#{data_filename}' WITH (FORMAT CSV);")
end
end

def insert(connection:, command:)
def insert(command)
write_command(command)
end

Expand Down
18 changes: 9 additions & 9 deletions lib/active_record_data_loader/loader.rb
Original file line number Diff line number Diff line change
Expand Up @@ -10,16 +10,16 @@ def initialize(configuration, definition)
def load_data
ActiveRecordDataLoader::ActiveRecord::PerRowValueCache.clear

output_adapter_class.with_output_options(output_adapter_options) do |output_adapter|
definition.models.map { |m| load_model(m, output_adapter) }
file_adapter_class.with_output_options(file_adapter_options) do |file_adapter|
definition.models.map { |m| load_model(m, file_adapter) }
end
end

private

attr_reader :definition, :configuration

def load_model(model, output_adapter)
def load_model(model, file_adapter)
generator = ActiveRecordDataLoader::ActiveRecord::ModelDataGenerator.new(
model: model.klass,
column_settings: model.columns,
Expand All @@ -32,20 +32,20 @@ def load_model(model, output_adapter)
batch_size: model.batch_size,
total_rows: model.row_count,
connection_handler: connection_handler,
strategy: strategy_class.new(generator, output_adapter),
strategy: strategy_class.new(generator, file_adapter),
logger: configuration.logger
)
end

def output_adapter_class
if configuration.output.fetch(:type) == :file
def file_adapter_class
if configuration.output.present?
ActiveRecordDataLoader::FileOutputAdapter
else
ActiveRecordDataLoader::ConnectionOutputAdapter
ActiveRecordDataLoader::NullOutputAdapter
end
end

def output_adapter_options
def file_adapter_options
timeout_commands =
if connection_handler.supports_timeout?
{
Expand All @@ -56,7 +56,7 @@ def output_adapter_options
{}
end

configuration.output.merge(timeout_commands)
timeout_commands.merge(filename: configuration.output)
end

def strategy_class
Expand Down
15 changes: 15 additions & 0 deletions lib/active_record_data_loader/null_output_adapter.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
# frozen_string_literal: true

module ActiveRecordDataLoader
class NullOutputAdapter
def self.with_output_options(_options)
yield new
end

def copy(table:, columns:, data:, row_numbers:); end

def insert(command); end

def write_command(command); end
end
end
32 changes: 22 additions & 10 deletions spec/active_record_data_loader_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@
end
end

context "when the output is the connection" do
context "when there is no file output" do
shared_examples_for "loading data" do |adapter|
it "loads data into #{adapter}", adapter do
loader.load_data
Expand Down Expand Up @@ -87,7 +87,7 @@
end
end

context "when the output is a file" do
context "when there is a file output" do
def clean_files
FileUtils.rm(Dir.glob("./#{file_prefix}*"), force: true)
end
Expand All @@ -96,22 +96,34 @@ def clean_files
let(:filename) { "./#{file_prefix}.sql" }
after { clean_files }

shared_examples_for "writing SQL commands to a stream" do |adapter, block|
it "wirtes SQL commands for #{adapter} into a stream", adapter do
shared_examples_for "loading data and writing SQL commands to a file" do |adapter, block|
it "wirtes SQL commands for #{adapter} into a file", adapter do
ActiveRecordDataLoader.configure do |c|
c.logger = ::ActiveRecord::Base.logger
c.output = { type: :file, filename: filename }
c.output = filename
end

loader.load_data

expect(Company.all).to have(10).items
expect(Company.all.pluck(:created_at)).to all(be_within(10.minutes).of(Time.now))
expect(Company.all.pluck(:updated_at)).to all(be_within(10.minutes).of(Time.now))
expect(Customer.all).to have(100).items
expect(Employee.all).to have(100).items
expect(Order.all).to have(1_000).items
expect(Payment.all).to have(1_000).items
expect(Order.where(person_type: "Customer").count).to be_between(985, 995)
expect(Order.where(person_type: "Employee").count).to be_between(5, 15)

ActiveRecordHelper.define_schema
expect(Company.all).to be_empty
expect(Customer.all).to be_empty
expect(Employee.all).to be_empty
expect(Order.all).to be_empty
expect(Payment.all).to be_empty

block.call(filename)

expect(Company.all).to have(10).items
expect(Company.all.pluck(:created_at)).to all(be_within(10.minutes).of(Time.now))
expect(Company.all.pluck(:updated_at)).to all(be_within(10.minutes).of(Time.now))
Expand All @@ -124,20 +136,20 @@ def clean_files
end
end

it_behaves_like "writing SQL commands to a stream", :postgres, lambda { |f|
it_behaves_like "loading data and writing SQL commands to a file", :postgres, lambda { |f|
`PGPASSWORD=test psql -h localhost -p 2345 -U test -f #{f}`
}
it_behaves_like "writing SQL commands to a stream", :mysql, lambda { |f|
it_behaves_like "loading data and writing SQL commands to a file", :mysql, lambda { |f|
File.read(f).split("\n").each { |line| ::ActiveRecord::Base.connection.execute(line) }
}
it_behaves_like "writing SQL commands to a stream", :sqlite3, lambda { |f|
it_behaves_like "loading data and writing SQL commands to a file", :sqlite3, lambda { |f|
File.read(f).split("\n").each { |line| ::ActiveRecord::Base.connection.execute(line) }
}

it "sets the statement timeout for postgres scripts", :postgres do
ActiveRecordDataLoader.configure do |c|
c.logger = ::ActiveRecord::Base.logger
c.output = { type: :file, filename: filename }
c.output = filename
c.statement_timeout = "10min"
end

Expand All @@ -154,7 +166,7 @@ def clean_files

ActiveRecordDataLoader.configure do |c|
c.logger = ::ActiveRecord::Base.logger
c.output = { type: :file, filename: filename }
c.output = filename
end

loader.load_data
Expand Down

0 comments on commit 5b1c7c6

Please sign in to comment.