Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

Merge pull request #85 from activewarehouse/83-error-callback

First pass at allowing to declare error handlers from the etl scripts
  • Loading branch information...
commit 3b9b26b005b2223197461eaaae5bb5d40978b475 2 parents 13c7c01 + cb43b06
@thbar thbar authored
View
1  CHANGELOG
@@ -1,4 +1,5 @@
1.0.0 - release candidate
+* Support for error handlers in etl: can be declared with on_error { |error| } (thbar)
* Support for mysql streaming for better performance on large data sets - use :mysqlstream => true on database source (main code by pdodds, tests and fixes by sgrgic and thbar)
* EnsureFieldsPresenceProcessor treat symbols and strings equally (thbar)
* BREAKING CHANGE: you must require 'iconv' if you use the :encode_processor from now on (thbar)
View
12 lib/etl/control/control.rb
@@ -41,7 +41,12 @@ def depends_on(*args)
def dependencies
control.dependencies
end
-
+
+ # Register an error handler
+ def on_error(&block)
+ control.error_handlers << block
+ end
+
# Define a source.
def source(name, configuration={}, definition={})
if configuration[:type]
@@ -376,6 +381,11 @@ def after_post_process_screens
:warn => []
}
end
+
+ # A array of Procs to be invoked for errors notifications
+ def error_handlers
+ @error_handlers ||= []
+ end
# Get the error threshold. Defaults to 100.
def error_threshold
View
19 lib/etl/engine.rb
@@ -235,6 +235,14 @@ def say_on_own_line(message)
def errors
@errors ||= []
end
+
+ # First attempt at centralizing error notifications
+ def track_error(control, msg)
+ errors << msg
+ control.error_handlers.each do |handler|
+ handler.call(msg)
+ end
+ end
# Get a Hash of benchmark values where each value represents the total
# amount of time in seconds spent processing in that portion of the ETL
@@ -352,7 +360,8 @@ def process_control(control)
end
rescue => e
msg = "Error processing rows after read from #{Engine.current_source} on line #{Engine.current_source_row}: #{e}"
- errors << msg
+ # TODO - track more information: row if possible, full exception...
+ track_error(control, msg)
Engine.logger.error(msg)
e.backtrace.each { |line| Engine.logger.error(line) }
exceeded_error_threshold?(control) ? break : next
@@ -374,10 +383,10 @@ def process_control(control)
end
rescue ResolverError => e
Engine.logger.error(e.message)
- errors << e.message
+ track_error(control, e.message)
rescue => e
msg = "Error transforming from #{Engine.current_source} on line #{Engine.current_source_row}: #{e}"
- errors << msg
+ track_error(control, msg)
Engine.logger.error(msg)
e.backtrace.each { |line| Engine.logger.error(line) }
ensure
@@ -403,7 +412,7 @@ def process_control(control)
end
rescue => e
msg = "Error processing rows before write from #{Engine.current_source} on line #{Engine.current_source_row}: #{e}"
- errors << msg
+ track_error(control, msg)
Engine.logger.error(msg)
e.backtrace.each { |line| Engine.logger.error(line) }
exceeded_error_threshold?(control) ? break : next
@@ -423,7 +432,7 @@ def process_control(control)
end
rescue => e
msg = "Error writing to #{Engine.current_destination}: #{e}"
- errors << msg
+ track_error(control, msg)
Engine.logger.error msg
e.backtrace.each { |line| Engine.logger.error(line) }
exceeded_error_threshold?(control) ? break : next
View
13 test/engine_test.rb
@@ -33,6 +33,19 @@ class EngineTest < Test::Unit::TestCase
assert_equal 1, engine.errors.size
end
+
+ should 'call error callbacks' do
+ engine = ETL::Engine.new
+
+ $our_errors = []
+ engine.process ETL::Control::Control.parse_text <<CTL
+ source :in, { :type => :enumerable, :enumerable => (1..100) }
+ on_error { |error| $our_errors << error }
+ after_read { |row| raise "Failure" }
+CTL
+ assert_equal 100, $our_errors.size
+ assert_match /on line 100: Failure$/, $our_errors.last
+ end
end
Please sign in to comment.
Something went wrong with that request. Please try again.