Skip to content

Commit

Permalink
Modified Engine:
Browse files Browse the repository at this point in the history
  - Allow for multiple environments for database config:
    database.<env>.yaml
  - Allow setting of log level by ENV['LOG_LEVEL'] = x.
    0 = Debug
    1 = Info
    etc
  - Added millisecond precision to timestamp method
  - Use UTC for created_at/completed_at for Batch/Job
  - Better handling of paths for control files
  - Beginning (not functional - Syntax broken - Code does not run!!)
    work on multiprocessing transforms
  • Loading branch information
cdimartino committed Jan 28, 2010
1 parent 1093ccb commit bcbcc02
Showing 1 changed file with 34 additions and 11 deletions.
45 changes: 34 additions & 11 deletions lib/etl/engine.rb
Expand Up @@ -30,7 +30,20 @@ def init(options={})
require File.join(@rails_root, 'config/environment') if @rails_root
options[:config] ||= 'database.yml'
options[:config] = 'config/database.yml' unless File.exist?(options[:config])
database_configuration = YAML::load(ERB.new(IO.read(options[:config])).result + "\n")

const_set(:RAILS_ENV, ENV['RAILS_ENV']) unless const_defined?(:RAILS_ENV)

if RAILS_ENV
env_db_file = File.join(@rails_root, 'config', "database.#{RAILS_ENV}.yml")
if File.exist?(env_db_file)
options[:env_config] = env_db_file
end
end

db_yml = IO.read(options[:config]) + "\n" + ( IO.read(options[:env_config]) rescue '' )

database_configuration = YAML::load(ERB.new(db_yml).result + "\n")

ActiveRecord::Base.configurations.merge!(database_configuration)
ETL::Base.configurations = database_configuration
#puts "configurations in init: #{ActiveRecord::Base.configurations.inspect}"
Expand Down Expand Up @@ -73,15 +86,15 @@ def logger #:nodoc:
else
@logger = Logger.new(File.open('etl.log', log_write_mode))
end
@logger.level = Logger::WARN
@logger.level = ( ( ! ENV[:LOG_LEVEL].nil? && ENV[:LOG_LEVEL].to_i ) || Logger::WARN )
@logger.formatter = Logger::Formatter.new
end
@logger
end

# Get a timestamp value as a string
def timestamp
Time.now.strftime("%Y%m%d%H%M%S")
Time.now.strftime("%Y%m%d%H%M%S%6N")
end

# The current source
Expand Down Expand Up @@ -276,12 +289,13 @@ def process_batch(batch)

ETL::Engine.batch = ETL::Execution::Batch.create!(
:batch_file => batch.file,
:status => 'executing'
:status => 'executing',
:created_at => Time.now.utc
)

batch.execute

ETL::Engine.batch.completed_at = Time.now
ETL::Engine.batch.completed_at = Time.now.utc
ETL::Engine.batch.status = (errors.length > 0 ? 'completed with errors' : 'completed')
ETL::Engine.batch.save!
end
Expand All @@ -292,9 +306,10 @@ def process_control(control)
say_on_own_line "Processing control #{control.file}"

ETL::Engine.job = ETL::Execution::Job.create!(
:control_file => control.file,
:control_file => File.basename(control.file),
:status => 'executing',
:batch_id => ETL::Engine.batch ? ETL::Engine.batch.id : nil
:batch_id => ETL::Engine.batch ? ETL::Engine.batch.id : nil,
:created_at => Time.now.utc
)

execute_dependencies(control)
Expand All @@ -312,7 +327,13 @@ def process_control(control)
say "Source: #{source}"
say "Limiting enabled: #{Engine.limit}" if Engine.limit != nil
say "Offset enabled: #{Engine.offset}" if Engine.offset != nil
source.each_with_index do |row, index|

# Going to try to multiprocess the Transforms here. This is the major bottleneck in the process:
# To do so, will need to return a dataset from inside the block and set the benchmarks and rows_read from the return.
# Only forkify if the number of rows to read is > 500 (to offset startup time for forkify/rinda::tuplespace

method = source.length > 500 ? :forkify : :each_with_index
source.send(method) do |row, index|
# Break out of the row loop if the +Engine.limit+ is specified and
# the number of rows read exceeds that value.
if Engine.limit != nil && Engine.rows_read >= Engine.limit
Expand All @@ -334,6 +355,7 @@ def process_control(control)
begin
Engine.logger.debug "Processing after read"
control.after_read_processors.each do |processor|
Engine.logger.debug "After read processor: #{processor}"
processed_rows = []
rows.each do |row|
processed_rows << processor.process(row)
Expand All @@ -354,6 +376,7 @@ def process_control(control)
Engine.logger.debug "Executing transforms"
rows.each do |row|
control.transforms.each do |transform|
Engine.logger.debug "Executing transform: #{transform}"
name = transform.name.to_sym
row[name] = transform.transform(name, row[name], row)
end
Expand Down Expand Up @@ -470,13 +493,13 @@ def process_control(control)
say "Avg transforms: #{Engine.rows_read/benchmarks[:transforms]} rows/sec" if benchmarks[:transforms] > 0
say "Avg writes: #{Engine.rows_read/benchmarks[:writes]} rows/sec" if benchmarks[:writes] > 0

# say "Avg time writing execution records: #{ETL::Execution::Record.average_time_spent}"
#
say "Avg time writing execution records: #{ETL::Execution::Record.average_time_spent}"

# ETL::Transform::Transform.benchmarks.each do |klass, t|
# say "Avg #{klass}: #{Engine.rows_read/t} rows/sec"
# end

ETL::Engine.job.completed_at = Time.now
ETL::Engine.job.completed_at = Time.now.utc
ETL::Engine.job.status = (errors.length > 0 ? 'completed with errors' : 'completed')
ETL::Engine.job.save!
end
Expand Down

0 comments on commit bcbcc02

Please sign in to comment.