Skip to content

Commit

Permalink
Moj 574 speedup hr uploads processing (localytics#23)
Browse files Browse the repository at this point in the history
* MOJ-574 Updates to the ODBC adapter for a new merge_all functionality. Similar functionality to active record's insert/upsert all, but uses the SQL merge syntax instead of insert with update capabilities.

* MOJ-574 Fixes to the rails 7 changes and added the ability to prune duplicate records in merge_all

* MOJ-574 Removed extra debug statement

* MOJ-574 Optimization to reduce memory and/or cpu usage. Large merges are killing sidekiq.

* MOJ-574  Cleaning up the delete code, but ultimately leaving it intentionally disabled and inaccessible. It'll get worked on/tested if/when it's needed.

* MOJ-574 I forgot to remove the delete_key code from the persistence portion of the adapter...

* MOJ-574 And forgot to correct delete_keys in updatable and insertable columns methods
  • Loading branch information
ACerka-Springbuk committed May 11, 2023
1 parent dbe56e2 commit 91d9c13
Show file tree
Hide file tree
Showing 6 changed files with 251 additions and 15 deletions.
43 changes: 30 additions & 13 deletions lib/active_record/connection_adapters/odbc_adapter.rb
Original file line number Diff line number Diff line change
Expand Up @@ -155,24 +155,41 @@ def next_sequence_value(table_name = nil)
exec_query("SELECT #{table_name}.NEXTVAL as new_id").first["new_id"]
end

# Assembles the full MERGE statement from the fragments produced by an
# ActiveRecord::MergeAll::Builder: the INTO target, the VALUES-backed
# SOURCE subquery, the ON match condition, and the optional
# delete/update/insert WHEN-clauses (empty strings when disabled).
def build_merge_sql(merge) # :nodoc:
  clauses = [
    "MERGE #{merge.into} AS TARGET USING (#{merge.values_list}) AS SOURCE ON #{merge.match}",
    merge.merge_delete,
    merge.merge_update,
    merge.merge_insert
  ]
  "#{clauses.join("\n")}\n"
end

# Executes a MERGE statement previously assembled by #build_merge_sql.
# +sql+ is the full statement text; +name+ is the log label shown in the
# query log (e.g. "Model Bulk Merge"). Delegates straight to exec_query
# and returns its result.
def exec_merge_all(sql, name) # :nodoc:
  exec_query(sql, name)
end

protected

# Snowflake ODBC Adapter specific
# Registers the mapping from Snowflake SQL type strings to ActiveRecord
# cast types. Keyed by regex over the column's sql_type string (not by
# type symbol) because lookup_cast_type_from_column now looks up by
# sql_type so that precision/scale information is available.
#
# NOTE(review): the scraped diff had the old symbol-keyed registrations
# interleaved with these regex ones, which duplicated entries and left an
# unterminated block; this is the post-change implementation only.
def initialize_type_map(map)
  map.register_type %r(boolean)i, Type::Boolean.new
  map.register_type %r(date)i, Type::Date.new
  map.register_type %r(varchar)i, Type::String.new
  map.register_type %r(time)i, Type::Time.new
  # Registered after %r(time)i so "timestamp" resolves to DateTime.
  map.register_type %r(timestamp)i, Type::DateTime.new
  map.register_type %r(binary)i, Type::Binary.new
  map.register_type %r(double)i, Type::Float.new
  map.register_type(%r(decimal)i) do |sql_type|
    scale = extract_scale(sql_type)
    if scale == 0
      # Scale-0 decimals are Snowflake's integer representation.
      ::ODBCAdapter::Type::SnowflakeInteger.new
    else
      Type::Decimal.new(precision: extract_precision(sql_type), scale: scale)
    end
  end
  map.register_type %r(struct)i, ::ODBCAdapter::Type::SnowflakeObject.new
  map.register_type %r(array)i, ::ODBCAdapter::Type::ArrayOfValues.new
  map.register_type %r(variant)i, ::ODBCAdapter::Type::Variant.new
end

# Translate an exception from the native DBMS to something usable by
Expand Down
192 changes: 192 additions & 0 deletions lib/active_record/merge_all.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,192 @@
# frozen_string_literal: true

require "active_support/core_ext/enumerable"

module ActiveRecord
  # Builds and executes a single SQL MERGE statement for a batch of records.
  #
  # Mirrors the shape of ActiveRecord's InsertAll, but produces MERGE syntax
  # (match on primary keys; UPDATE when matched, INSERT when not) instead of
  # INSERT with upsert options. Instantiated by
  # MergeAllPersistence::ClassMethods#merge_all!.
  class MergeAll # :nodoc:
    attr_reader :model, :connection, :merges, :keys
    attr_reader :perform_inserts, :perform_updates, :delete_key

    # model             - the ActiveRecord model class being merged into.
    # merges            - array of attribute hashes; every hash must have the
    #                     same keys (verified per-row in #verify_attributes).
    # perform_inserts:  - emit the WHEN NOT MATCHED ... INSERT clauses.
    # perform_updates:  - emit the WHEN MATCHED ... UPDATE clause.
    # prune_duplicates: - keep only the last row per primary key before merging.
    def initialize(model, merges, perform_inserts: true, perform_updates: true, prune_duplicates: false)
      raise ArgumentError, "Empty list of attributes passed" if merges.blank?

      # TODO: Implement perform_deletes. Most of the code is here, but all completely untested.
      @model, @connection, @merges, @keys = model, model.connection, merges, merges.first.keys.map(&:to_s)
      # delete_key is hard-wired to nil until deletes are implemented/tested.
      @perform_inserts, @perform_updates, @delete_key = perform_inserts, perform_updates, nil

      # Default-scope attributes (e.g. tenant ids) become part of every row.
      if model.scope_attributes?
        @scope_attributes = model.scope_attributes
        @keys |= @scope_attributes.keys
      end
      @keys = @keys.to_set

      ensure_valid_options_for_connection!

      if prune_duplicates
        do_prune_duplicates
      end
    end

    # Builds the MERGE SQL and runs it through the connection adapter.
    def execute
      message = +"#{model} "
      message << "Bulk " if merges.many?
      message << "Merge"
      connection.exec_merge_all to_sql, message
    end

    # Columns eligible for the UPDATE clause: never primary keys, readonly
    # attributes, or the (currently unused) delete marker column.
    def updatable_columns
      keys - readonly_columns - [delete_key]
    end

    # Columns eligible for the INSERT clauses (delete marker excluded).
    def insertable_columns
      keys - [delete_key]
    end

    # Insert list used for source rows carrying no primary key values, so the
    # database can assign them (see Builder#merge_insert's second arm).
    def insertable_non_primary_columns
      insertable_columns - primary_keys
    end

    def primary_keys
      Array(connection.schema_cache.primary_keys(model.table_name))
    end

    # Yields (key, value) for every key of every row in a stable key order;
    # the Builder uses this to produce the VALUES list.
    def map_key_with_value
      merges.map do |attributes|
        attributes = attributes.stringify_keys
        attributes.merge!(scope_attributes) if scope_attributes

        verify_attributes(attributes)

        keys.map do |key|
          yield key, attributes[key]
        end
      end
    end

    # Always false for now: @delete_key is hard-wired to nil (see TODO above).
    def perform_deletes
      !delete_key.nil?
    end

    private
      attr_reader :scope_attributes

      # Hook for adapter-specific validation; intentionally a no-op for now.
      def ensure_valid_options_for_connection!

      end

      # Drops all but the last occurrence of each primary-key tuple so the
      # MERGE source contains at most one row per target row.
      def do_prune_duplicates
        unless primary_keys.to_set.subset?(keys)
          raise ArgumentError, "Pruning duplicates requires presence of all primary keys in the merges"
        end
        # reverse + uniq! keeps the *last* duplicate; reverse! restores order.
        @merges = merges.reverse
        merges.uniq! do |merge|
          primary_keys.map { |key| merge[key] }
        end
        merges.reverse!
      end

      def to_sql
        connection.build_merge_sql(ActiveRecord::MergeAll::Builder.new(self))
      end

      def readonly_columns
        primary_keys + model.readonly_attributes.to_a
      end

      # Every row must have exactly the same key set as the first row.
      def verify_attributes(attributes)
        if keys != attributes.keys.to_set
          raise ArgumentError, "All objects being merged must have the same keys"
        end
      end

      # Translates a MergeAll into the SQL fragments consumed by the
      # adapter's build_merge_sql (into/values_list/match/merge_* clauses).
      class Builder # :nodoc:
        attr_reader :model

        delegate :keys, to: :merge_all

        def initialize(merge_all)
          @merge_all, @model, @connection = merge_all, merge_all.model, merge_all.connection
        end

        def into
          "INTO #{model.quoted_table_name}"
        end

        # SELECT over an Arel VALUES list, aliased so SOURCE columns can be
        # referenced by name in the ON / UPDATE / INSERT clauses.
        def values_list
          types = extract_types_from_columns_on(model.table_name, keys: keys)

          values_list = merge_all.map_key_with_value do |key, value|
            connection.with_yaml_fallback(types[key].serialize(value))
          end

          values = connection.visitor.compile(Arel::Nodes::ValuesList.new(values_list))

          "SELECT * FROM (#{values}) AS v1 (#{columns_list})"
        end

        # ON condition: rows match when all primary key columns are equal.
        def match
          quote_columns(merge_all.primary_keys).map { |column| "SOURCE.#{column}=TARGET.#{column}" }.join(" AND ")
        end

        # Empty string unless deletes are enabled (currently never — see
        # MergeAll#perform_deletes).
        def merge_delete
          merge_all.perform_deletes ? "WHEN MATCHED AND SOURCE.#{quote_column(merge_all.delete_key)} = TRUE THEN DELETE" : ""
        end

        def merge_update
          merge_all.perform_updates ? "WHEN MATCHED THEN UPDATE SET #{updatable_columns.map { |column| "TARGET.#{column}=SOURCE.#{column}" }.join(",")}" : ""
        end

        # Two INSERT arms: rows with all primary keys present insert them;
        # rows missing a primary key insert only the non-primary columns.
        def merge_insert
          if merge_all.perform_inserts
            <<~SQL
              WHEN NOT MATCHED AND #{quote_columns(merge_all.primary_keys).map { |column| "SOURCE.#{column} IS NOT NULL" }.join(" AND ")} THEN INSERT (#{insertable_columns_list}) VALUES (#{quote_columns(merge_all.insertable_columns).map { |column| "SOURCE.#{column}"}.join(",")})
              WHEN NOT MATCHED AND #{quote_columns(merge_all.primary_keys).map { |column| "SOURCE.#{column} IS NULL" }.join(" OR ")} THEN INSERT (#{insertable_non_primary_columns_list}) VALUES (#{quote_columns(merge_all.insertable_non_primary_columns).map { |column| "SOURCE.#{column}"}.join(",")})
            SQL
          else
            ""
          end
        end

        private
          attr_reader :connection, :merge_all

          def columns_list
            format_columns(merge_all.keys)
          end

          def insertable_columns_list
            format_columns(merge_all.insertable_columns)
          end

          def insertable_non_primary_columns_list
            format_columns(merge_all.insertable_non_primary_columns)
          end

          def updatable_columns
            quote_columns(merge_all.updatable_columns)
          end

          # Resolves each key to its attribute type, raising on keys that do
          # not correspond to a real column of the table.
          def extract_types_from_columns_on(table_name, keys:)
            columns = connection.schema_cache.columns_hash(table_name)

            unknown_column = (keys - columns.keys).first
            raise UnknownAttributeError.new(model.new, unknown_column) if unknown_column

            keys.index_with { |key| model.type_for_attribute(key) }
          end

          def format_columns(columns)
            columns.respond_to?(:map) ? quote_columns(columns).join(",") : columns
          end

          def quote_columns(columns)
            columns.map(&method(:quote_column))
          end

          def quote_column(column)
            connection.quote_column_name(column)
          end
      end
  end
end
14 changes: 14 additions & 0 deletions lib/active_record/merge_all_persistence.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
require 'active_record/merge_all'

module ActiveRecord
  # = Active Record \Persistence
  #
  # Adds a bulk-merge entry point to model classes, backed by
  # ActiveRecord::MergeAll.
  module MergeAllPersistence
    extend ActiveSupport::Concern

    module ClassMethods
      # Merges +attributes+ (an array of attribute hashes) into this model's
      # table with a single SQL MERGE statement. See MergeAll#initialize for
      # the meaning of the keyword options.
      def merge_all!(attributes, perform_inserts: true, perform_updates: true, prune_duplicates: false)
        merge = MergeAll.new(
          self,
          attributes,
          perform_inserts: perform_inserts,
          perform_updates: perform_updates,
          prune_duplicates: prune_duplicates
        )
        merge.execute
      end
    end
  end
end
1 change: 1 addition & 0 deletions lib/odbc_adapter.rb
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
# Requiring with this pattern to mirror ActiveRecord
require 'active_record/connection_adapters/odbc_adapter'
require 'active_record/merge_all_persistence'
2 changes: 1 addition & 1 deletion lib/odbc_adapter/quoting.rb
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ def quoted_date(value)
end

def lookup_cast_type_from_column(column) # :nodoc:
type_map.lookup(column.type)
type_map.lookup(column.sql_type)
end

def quote_hash(hash:)
Expand Down
14 changes: 13 additions & 1 deletion lib/odbc_adapter/schema_statements.rb
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ def columns(table_name, _name = nil)
col_nullable = nullability(col_name, col[17], col[10])

# This section has been customized for Snowflake and will not work in general.
args = { sql_type: col_native_type, type: col_native_type, limit: col_limit }
args = { sql_type: construct_sql_type(col_native_type, col_limit, col_scale), type: col_native_type, limit: col_limit }
args[:type] = case col_native_type
when "BOOLEAN" then :boolean
when "VARIANT" then :variant
Expand Down Expand Up @@ -177,5 +177,17 @@ def name_regex(name)
/^#{name}$/i
end
end

# Changes in rails 7 mean that we need all of the type information in the
# sql_type column. This reconstructs a full SQL type declaration (e.g.
# "NUMBER(38,2)") from the native type name plus limit (which is precision)
# and scale.
#
# native_type - String base type name, e.g. "NUMBER" or "VARCHAR".
# limit       - Integer precision/length, or nil/0 when not applicable.
# scale       - Integer numeric scale, or nil/0 when not applicable.
#
# Returns the reconstructed sql_type String.
def construct_sql_type(native_type, limit, scale)
  # ODBC metadata may report nil for non-numeric columns; treat nil as 0
  # so the comparisons below cannot raise NoMethodError.
  limit = limit.to_i
  scale = scale.to_i
  if scale > 0
    "#{native_type}(#{limit},#{scale})"
  elsif limit > 0
    "#{native_type}(#{limit})"
  else
    native_type
  end
end
end
end

0 comments on commit 91d9c13

Please sign in to comment.