Skip to content

Commit

Permalink
Added appropriate caching code to FillRowProcessor. Cleanup up some
Browse files Browse the repository at this point in the history
bugs in the implementation
  • Loading branch information
cdimartino committed Mar 4, 2010
1 parent 3247f57 commit fd35e38
Showing 1 changed file with 70 additions and 18 deletions.
88 changes: 70 additions & 18 deletions lib/etl/processor/fill_row_processor.rb
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
require 'digest/sha1'

module ETL
module Processor

Expand Down Expand Up @@ -33,6 +35,8 @@ class FillRowProcessor < ETL::Processor::RowProcessor
# <tt>:use_first</tt>: Whether or not to use the first returned value if
# multiple rows are returned. Defaults to FALSE. Will raise error if
# FALSE and multiple rows are found
# <tt>:use_cache</tt>: Whether or not to cache the results as they are found. Defaults to true.
# <tt>:preload_cache</tt>: Whether or not to preload the cache. Defaults to true.
#
def initialize(control, configuration)
super
Expand All @@ -41,10 +45,28 @@ def initialize(control, configuration)
@match = configuration[:match] || raise(ETL::ControlError, ":match must be specified")
@target = configuration[:target] || raise(ETL::ControlError, ":target must be specified")
@table = configuration[:table] || raise(ETL::ControlError, ":table must be specified")
@use_first = ( configuration[:use_first] || false )
@overwrite = ( configuration[:overwrite] || true )
@use_first = configuration[:use_first] || false
@overwrite = configuration[:overwrite] === false ? false : true
@use_cache = configuration[:use_cache] === false ? false : true

@connection = ETL::Engine.connection(target)

preload_cache unless configuration[:preload_cache] === false
end

# The cache store
def cache
@cache ||= Hash.new
end

def cache_key(values, side = :left)
Digest::SHA1.hexdigest(
match.send(side == :left ? :keys : :values).collect { |i|
# Db keys will come back as strings, whereas etl rows are keyed on symbols
side == :left || i = i.to_s
values[i]
}.to_s
)
end

# Whether or not to overwrite existing values in the row (uses blank? rules if false)
Expand All @@ -57,6 +79,25 @@ def overwrite?(existing, update)
end
end

def preload_cache
query = select_stmt
connection.select_all(query).each do |v|
key = cache_key(v, :right)
if cache[key] and not use_first?
raise TooManyResultsError, "Too many results found (and use_first not set) using the following value: #{v}"
end

next if cache[key] and use_first?
cache[key] = v
end
ETL::Engine.logger.debug("Cache looks like: #{cache.inspect}")
end

# Whether or not to cache the results as they are found
def use_cache?
@use_cache
end

# Whether or not to use the first returned row if there are multiple values returned.
# If not true, then any multiple row return will result in an error
def use_first?
Expand All @@ -65,31 +106,42 @@ def use_first?

# Process the row and modify it as appropriate
def process(row)
conditions = []
match.each_pair do |key, column|
conditions << "#{column.to_s} = #{connection.quote(row[key])}"
end
q = "SELECT #{values.to_a.collect { |a| a.each(&:to_s).join(' ') }.join(', ')} FROM #{table.to_s} WHERE "
q << conditions.join(' AND ')
value = cache[cache_key(row)]
if not @preload_cache and not value
query = select_stmt + conditions(row)
ETL::Engine.logger.debug("Looking up row using query: #{query}")

#ETL::Engine.logger.debug("Looking up row using query: #{q}")

value = connection.select_all(q)
if value.length > 1 and not use_first?
raise TooManyResultsError, "Too many results found (and use_first not set) using the following query: #{q}"
value = connection.select_all(query)
if value.length > 1 and not use_first?
raise TooManyResultsError, "Too many results found (and use_first not set) using the following query: #{q}"
end
value = value.first
cache[cache_key(row)] = value
end

if value.empty?
ETL::Engine.logger.info("Unable to find FillRow match for row: #{row.inspect}")
if value.nil? or value.empty?
ETL::Engine.logger.debug("Unable to find FillRow match for row: #{row.inspect}")
else
value.first.each_pair do |key, col_value|
#ETL::Engine.logger.debug("Before update: #{row.inspect}")
value.each_pair do |key, col_value|
row[key.to_sym] = col_value if overwrite?(row[key.to_sym], col_value)
#ETL::Engine.logger.debug("After update: #{row.inspect}")
end
end
row
end

private

def select_stmt
@select_stmt ||= "SELECT #{values.to_a.collect { |a| a.each(&:to_s).join(' ') }.join(', ')}, #{match.values.collect { |a| a.to_s}.join(', ')} FROM #{table.to_s}"
#@select_stmt ||= "SELECT #{values.to_a.collect { |a| a.each(&:to_s).join(' ') }.join(', ')} FROM #{table.to_s}"
end

def conditions(r)
" WHERE " +
match.each_pair.collect { |key, column|
"#{column.to_s} = #{connection.quote(r[key])}"
}.join(" AND ")
end
end
end
end
Expand Down

0 comments on commit fd35e38

Please sign in to comment.