Skip to content

Commit

Permalink
Configurable Type-2 SCD timestamp
Browse files Browse the repository at this point in the history
  • Loading branch information
Joel committed May 4, 2011
1 parent 3fc33e3 commit 0d559dd
Show file tree
Hide file tree
Showing 9 changed files with 139 additions and 9 deletions.
7 changes: 6 additions & 1 deletion lib/etl/control/destination.rb
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,12 @@ def process_change(row)
return
end

@timestamp = Time.now
@timestamp = case configuration[:scd][:timestamp]
when Time, Date then configuration[:scd][:timestamp]
when Symbol then row[configuration[:scd][:timestamp]]
when nil then Time.now
else raise "Unknown timestamp: #{configuration[:scd][:timestamp].inspect}. Use Time or Date for a specific time, a symbol for a value from each row, or nil for the current time"
end

# See if the scd_fields of the current record have changed
# from the last time this record was loaded into the data
Expand Down
2 changes: 1 addition & 1 deletion test/scd/1.txt
Original file line number Diff line number Diff line change
@@ -1 +1 @@
Bob,Smith,200 South Drive,Boston,MA,32123
Bob,Smith,200 South Drive,Boston,MA,32123,2010-05-01 12:45:00
2 changes: 1 addition & 1 deletion test/scd/2.txt
Original file line number Diff line number Diff line change
@@ -1 +1 @@
Bob,Smith,1010 SW 23rd St,Los Angeles,CA,90392
Bob,Smith,1010 SW 23rd St,Los Angeles,CA,90392,2010-05-02 12:45:00
2 changes: 1 addition & 1 deletion test/scd/3.txt
Original file line number Diff line number Diff line change
@@ -1 +1 @@
Bob,Smith,280 Pine Street,Los Angeles,CA,90392
Bob,Smith,280 Pine Street,Los Angeles,CA,90392,2010-05-03 12:45:00
39 changes: 37 additions & 2 deletions test/scd_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,29 @@ class ScdTest < Test::Unit::TestCase
assert lines.empty?, "scheduled load expected to be empty, was #{lines.size} records"
end
end
# Checks where the type 2 SCD effective timestamp comes from: a fixed value
# configured in the control file, or a field carried by each row.
context "scd timestamp" do
  should 'use the supplied timestamp' do
    # Load the three fixture files in order, one ETL run per file.
    1.upto(3) { |run| do_type_2_run_with_fixed_timestamp(run) }

    versions = find_bobs
    assert_equal 3, versions.length
    # The control file configures "now minus one day" as the timestamp.
    versions.each do |version|
      assert_equal Date.yesterday, version.effective_date.to_date
    end
  end

  should 'use the timestamp in the row' do
    1.upto(3) { |run| do_type_2_run_with_row_timestamp(run) }

    versions = find_bobs
    assert_equal 3, versions.length
    # Fixture rows are dated 2010-05-01, -02 and -03, so each version's
    # day of month is expected to equal its id.
    versions.each do |version|
      stamp = version.effective_date
      assert_equal 2010, stamp.year
      assert_equal 5, stamp.month
      assert_equal version.id, stamp.day
    end
  end
end
context "merge_nils" do
should 'treat nil values like a change without merge_nils' do
do_type_2_run_without_merge_nils(1)
Expand Down Expand Up @@ -232,6 +255,18 @@ def do_type_2_run_without_merge_nils(run_num)
run_ctl_file("scd_test_type_2_without_merge_nils.ctl")
end
end
# Runs the row-timestamp type 2 SCD control file for fixture number +run_num+.
# The number is handed to the control file through ENV['run_number'], and the
# run itself must not raise.
def do_type_2_run_with_row_timestamp(run_num)
  ENV['run_number'] = run_num.to_s
  assert_nothing_raised { run_ctl_file("scd_test_type_2_row_timestamp.ctl") }
end
# Runs the fixed-timestamp type 2 SCD control file for fixture number
# +run_num+. The number is handed to the control file through
# ENV['run_number'], and the run itself must not raise.
def do_type_2_run_with_fixed_timestamp(run_num)
  ENV['run_number'] = run_num.to_s
  assert_nothing_raised { run_ctl_file("scd_test_type_2_specific_timestamp.ctl") }
end

def do_type_2_run_with_only_city_state_zip_scd(run_num)
ENV['type_2_scd_fields'] = Marshal.dump([:city, :state, :zip_code])
Expand Down Expand Up @@ -266,10 +301,10 @@ def bob.id
self["id"].to_i
end
def bob.effective_date
DateTime.parse(self["effective_date"])
DateTime.parse(self["effective_date"].to_s)
end
def bob.end_date
DateTime.parse(self["end_date"])
DateTime.parse(self["end_date"].to_s)
end
def bob.latest_version?
ActiveRecord::ConnectionAdapters::Column.value_to_boolean(self["latest_version"])
Expand Down
9 changes: 7 additions & 2 deletions test/scd_test_type_1.ctl
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,14 @@ source :in, {
:address,
:city,
:state,
:zip_code
:zip_code,
:date
]

# The fixture files carry a trailing :date column that this control file does
# not use; return the row without it.
# NOTE(review): presumably the block's return value replaces the row - confirm.
after_read { |row| row.except(:date) }

# NOTE: These are not usually required for a type 1 SCD dimension, but since
# we're sharing this table with the type 2 tests, they're necessary.
transform :effective_date, :default, :default_value => Time.now.to_s(:db)
Expand Down Expand Up @@ -40,4 +45,4 @@ post_process :bulk_import, {
:file => 'output/scd_test_type_1.txt',
:target => :data_warehouse,
:table => 'person_dimension'
}
}
7 changes: 6 additions & 1 deletion test/scd_test_type_2.ctl
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,14 @@ source :in, {
:address,
:city,
:state,
:zip_code
:zip_code,
:date
]

# The fixture files carry a trailing :date column that this control file does
# not use; return the row without it.
# NOTE(review): presumably the block's return value replaces the row - confirm.
after_read { |row| row.except(:date) }

destination :out, {
:file => 'output/scd_test_type_2.txt',
:natural_key => [:first_name, :last_name],
Expand Down
40 changes: 40 additions & 0 deletions test/scd_test_type_2_row_timestamp.ctl
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
# Control file for the type 2 SCD test in which the SCD timestamp is taken
# from a field of each row rather than from the clock.
#
# Input is scd/<run_number>.txt, where run_number is read from the
# environment (the test helper sets ENV['run_number'] before each run).
source :in, {
:file => "scd/#{ENV['run_number']}.txt",
:parser => :delimited
},
[
:first_name,
:last_name,
:address,
:city,
:state,
:zip_code,
# Trailing timestamp column of the fixture files; unlike the other SCD test
# control files, it is NOT stripped here because the destination reads it.
:date
]

destination :out, {
:file => 'output/scd_test_type_2_row_timestamp.txt',
:natural_key => [:first_name, :last_name],
:scd => {
:type => 2,
:merge_nils => true,
# A Symbol here names the row field whose value is used as the timestamp.
:timestamp => :date,
:dimension_target => :data_warehouse,
:dimension_table => 'person_dimension'
},
# NOTE(review): Marshal.load on an environment variable is tolerable only
# because the test suite itself sets it via Marshal.dump; never point this at
# untrusted data.
:scd_fields => ENV['type_2_scd_fields'] ? Marshal.load(ENV['type_2_scd_fields']) : [:address, :city, :state, :zip_code]
},
{
# :date is deliberately absent from :order.
:order => [
:id, :first_name, :last_name, :address, :city, :state, :zip_code, :effective_date, :end_date, :latest_version
],
:virtual => {
:id => ETL::Generator::SurrogateKeyGenerator.new(:target => :data_warehouse, :table => 'person_dimension')
}
}

# Bulk-import step: same file as the destination above, into the
# person_dimension table on the :data_warehouse target.
post_process :bulk_import, {
:file => 'output/scd_test_type_2_row_timestamp.txt',
:target => :data_warehouse,
:table => 'person_dimension'
}
40 changes: 40 additions & 0 deletions test/scd_test_type_2_specific_timestamp.ctl
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
# Control file for the type 2 SCD test in which the SCD timestamp is a fixed
# Time supplied in the configuration rather than the clock or a row field.
#
# Input is scd/<run_number>.txt, where run_number is read from the
# environment (the test helper sets ENV['run_number'] before each run).
source :in, {
:file => "scd/#{ENV['run_number']}.txt",
:parser => :delimited
},
[
:first_name,
:last_name,
:address,
:city,
:state,
:zip_code,
# Trailing timestamp column of the fixture files; the :scd settings below do
# not refer to it.
:date
]

destination :out, {
:file => 'output/scd_test_type_2_specific_timestamp.txt',
:natural_key => [:first_name, :last_name],
:scd => {
:type => 2,
:merge_nils => true,
# Fixed timestamp: one day before the moment this file is evaluated. The
# accompanying test compares the resulting effective dates to Date.yesterday.
# NOTE(review): `1.day` is not core Ruby - presumably ActiveSupport; confirm.
:timestamp => Time.now - 1.day,
:dimension_target => :data_warehouse,
:dimension_table => 'person_dimension'
},
# NOTE(review): Marshal.load on an environment variable is tolerable only
# because the test suite itself sets it via Marshal.dump; never point this at
# untrusted data.
:scd_fields => ENV['type_2_scd_fields'] ? Marshal.load(ENV['type_2_scd_fields']) : [:address, :city, :state, :zip_code]
},
{
# :date is deliberately absent from :order.
:order => [
:id, :first_name, :last_name, :address, :city, :state, :zip_code, :effective_date, :end_date, :latest_version
],
:virtual => {
:id => ETL::Generator::SurrogateKeyGenerator.new(:target => :data_warehouse, :table => 'person_dimension')
}
}

# Bulk-import step: same file as the destination above, into the
# person_dimension table on the :data_warehouse target.
post_process :bulk_import, {
:file => 'output/scd_test_type_2_specific_timestamp.txt',
:target => :data_warehouse,
:table => 'person_dimension'
}

0 comments on commit 0d559dd

Please sign in to comment.