
This guide needs updating! Work in progress

Introduction

The largest and most complex phase of any data warehouse project is the Extract-Transform-Load process. This process involves the extraction of data from source systems, transformation of that data so that it conforms to standardized values and conventions, and loading of the data into the data warehouse. ActiveWarehouse ETL, a subproject of the ActiveWarehouse open source project, will help with the ETL development process.

Note: this documentation covers ActiveWarehouse ETL version 0.9. There is no guarantee that functionality described here will work the same or at all in previous versions.

Installing

To install ActiveWarehouse ETL you must first install Ruby and Rubygems. Once you have a functioning Ruby and Rubygems installation you can install by simply running gem install:

  gem install activewarehouse-etl

If you are installing this into a Ruby on Rails project then you may instead use activewarehouse-etl as a plugin.

[rails root]$ script/plugin install git://github.com/activewarehouse/activewarehouse-etl.git

Or you may simply clone the repository into some place on your Ruby load path.

git clone http://github.com/activewarehouse/activewarehouse-etl.git

Note that if you do not use the gem install you will still need RubyGems and all the other dependencies listed below. Further, the etl executable will reside in activewarehouse-etl/bin, so you must either link to etl from somewhere in your PATH or run it by specifying the full path /path/to/activewarehouse-etl/bin/etl.

ActiveWarehouse ETL depends on FasterCSV, ActiveRecord, ActiveSupport and SQLite3. If these libraries are not already installed they should be installed along with ActiveWarehouse ETL. Naturally, if you use any other DBMS you need the adapter for that as well.

Finally, if you are running the etl command line tool, ensure that you either include a config file that requires rails, or add require 'rails/all' to the bin/etl script.
You may also need to add ActiveRecord::Base.default_timezone = :utc if your databases use UTC timestamps (Rails default behavior).

Setting Up

First create a database.yml in the directory of your ETL control files that contains information on how to connect to the etl_execution database. This database stores metadata about each ETL execution. Here is an example of the YAML file from the example application rails_warehouse in the ActiveWarehouse subversion repository:

etl_execution:
  database: rails_warehouse_etl_execution
  adapter: mysql
  host:    localhost
  username: root
  password:

Next create the database defined above in your database server. The first time you run the etl script it will execute the necessary migrations to construct the ETL execution environment database.

Defining Data Connections

In addition to the etl_execution connection you will also define all of your connections to operational databases as well as the data warehouse environment in the database.yml file in the directory of your ETL control files. You will then reference these connections by name through your control files using the :target option.
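
For example, a database.yml defining an operational source and a data warehouse target alongside the etl_execution connection might look like this (the connection names operational_database and data_warehouse, and the database names, are illustrative; use whatever names your control files reference):

operational_database:
  database: rails_production
  adapter: mysql
  host: localhost
  username: root
  password:

data_warehouse:
  database: rails_warehouse
  adapter: mysql
  host: localhost
  username: root
  password: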

Control Files

Each ETL process is defined using an ETL control file. The common naming convention for ETL control files in ActiveWarehouse is destination.ctl where destination is the destination table name. For example, if you are loading data into a store dimension called store_dimension then the control file would be called store_dimension.ctl. If you are including the ETL control files as part of your Rails application I recommend putting your etl directory right in the RAILS_ROOT directory.

How It Works

ETL control files define a pipeline for the ETL process. It is possible to both read from multiple sources and write to multiple destinations. Sometimes it is beneficial to write your ETL process to take advantage of this pipeline concept in order to reduce ETL processing time. This is especially true if you are dealing with source data that contains many records since you may be able to implement the ETL process as a single pass.

Data passes through the ETL pipeline as Hashes. Specifically, each row is a Hash where the key is the name of the field and the value is the current value of the field. All records are initially extracted as text and should be transformed using the type transform or date and time conversions if necessary. In many cases though you may just want to leave the rows as Hashes of symbols mapped to strings.
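
For example, a row extracted from a source with first_name, last_name and email fields would pass through the pipeline as a Hash like this:

{:first_name => "Bob", :last_name => "Smith", :email => "bsmith@foo.com"}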

One of the most important things to keep in mind is that control files are Ruby code, therefore you can use normal Ruby expressions. This can even include importing other libraries as well as your own libraries and support code. So for example, you could put the following at the top of your control file:

require File.dirname(__FILE__) + "/support"

This would require the file support.rb in the same directory as the control file.

Batching

As of version 0.9, ActiveWarehouse ETL now includes a batch execution system. Batch files end with .ebf which stands for “ETL Batch File”. The batch file contains directives which are applicable to the entire process. Directives are executed in the order in which they appear in the batch file.

Run Directive

The run directive indicates an ETL control file to run. For example, to execute the person_dimension.ctl control file you would include the following in the batch file:

  run "person_dimension.ctl"

Use Temp Tables

The use_temp_tables directive indicates that temporary tables should be used during ETL processing. If the ETL process fails during ETL execution then the temp tables will survive and the production tables will be untouched.

The temp tables directive will affect the following processors:

  • bulk import
  • check_exist
  • surrogate_key
  • truncate

It also affects the database source and database destination implementations.
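
As a sketch, a batch file (for example dimensions.ebf, a hypothetical name) might combine these directives; the use_temp_tables directive is assumed to be written bare, just like run:

use_temp_tables
run "person_dimension.ctl"
run "store_dimension.ctl"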

Error threshold

By default, processing will stop if more than 100 errors are encountered. You can customize this threshold in each control file or batch file:

set_error_threshold 50

Extract

Extraction takes data out of your operational databases and files and brings it into the ETL pipeline for processing. ActiveWarehouse ETL supports three types of extraction methods: flat file, database and in-memory enumeration. When you define a source to extract data from you may provide a variety of options to control aspects of the extraction. Some options are usable across all sources while some are source dependent. The source options which are applicable for all sources are:

:store_locally
Sets whether the source data should be extracted to a local file before processing. When this option is true the data will first be stored as a CSV (comma-delimited text) file in a directory called source_data under the current working directory. The default value is true.

There are several benefits to storing the data locally before processing. For the database source there is the benefit that the connection to the database does not need to be maintained, which frees up resources. Another benefit is that the memory needed for processing is reduced because the data will be loaded line by line from the CSV file as it is processed. Finally, by storing locally you have an audit trail and can go back and look through the original source data used during any ETL process.

When store locally is used the data will be written to a time-stamped file in a directory that is based on the source location. For example, if the source is a database on localhost, the source data might be written to source_data/localhost/rails_production_database/people/20070308110032.csv. In addition to the source data an empty file ending in .trig will be written once the data download is complete. This is used by the ETL engine so that it knows that the source data was fully written.

The format for the source definition in a control file is:

source :name, configuration, definition

The configuration argument is always a Hash. The definition argument may be a Hash or an Array, depending on the source type.
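
For example, disabling local storage for a delimited file source (described below) might look like this; the file and field names are illustrative:

source :in, {
  :file => "people.csv",
  :parser => :delimited,
  :store_locally => false
},
[
  :first_name,
  :last_name,
  :email
]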

Flat File Extraction

Flat file extraction pulls data from a flat text file. AW-ETL comes with a variety of parsers that can be used to parse different text formats. The following parsers are included with ActiveWarehouse ETL:

  • Delimited
  • Fixed-width
  • SAX
  • XML
  • Apache Combined Log

The definition argument will vary depending on the parser used. For example, the delimited parser expects an Array corresponding to the fields in each row in the order they will appear in the source file, whereas the fixed-width parser expects a hash where each name/value pair has the field name and a Hash of options for that field.

All parsers share the following common configuration elements:

:file
The file to read from. This may be an absolute or relative path. For relative paths it is relative to the control file, not the current working directory. This configuration element is required.
:parser
The parser name, either as a String or Symbol, a Class that extends from ETL::Parser::Parser, or a Hash with two keys: :name and :options. If a String or Symbol is given then it will be converted to camel case and Parser will be appended to it, so :delimited becomes DelimitedParser. If a Hash is provided then :name gives the parser name and the :options Hash will be passed to the parser when it is instantiated. This element is required.

Delimited Parser

Deprecation warning: the delimited parser will be renamed to the csv parser in the next release (the change is already committed to git master).

The delimited parser will be used when the configuration Hash contains a :file element and a :parser element with a value of :delimited.

The delimited parser includes the following configuration options:

:skip_lines
The number of lines that should be skipped. This is useful for skipping header lines. This element is optional and will default to 0.

The definition when using the delimited parser is an Array of field names in the order which they appear in the source file.

For example, consider the following CSV file, called people.csv:

first_name,last_name,email
Bob,Smith,bsmith@foo.com
Jane,Doe,jdoe@bar.com

The source definition might look like this:

source :in, {
  :file => "people.csv",
  :parser => :delimited,
  :skip_lines => 1
}, 
[
  :first_name,
  :last_name,
  :email
]

You could also pass parameters to the parser. Consider if you have a data file with tabs as the separator:

first_name  last_name  email
Bob Smith bsmith@foo.com
Jane  Doe jdoe@bar.com

The configuration would look like this:

source :in, {
  :file => 'people.txt',
  :parser => {
    :name => :delimited,
    :options => {
      :col_sep => "\t"
    }
  },
  :skip_lines => 1
},
[
  :first_name,
  :last_name,
  :email
]

Fixed-width Parser

The fixed-width parser will be used when the configuration Hash contains a :file element and a :parser element with a value of :fixed_width.

The fixed-width parser includes the following configuration options:

:skip_lines
The number of lines that should be skipped. This is useful for skipping header lines. This element is optional and will default to 0.

The definition when using the fixed-width parser is a Hash. Each element in the Hash maps the field name to a Hash of options. The Hash of options may include the following elements:

:start
The start position of the element. The first starting position is always 1.
:length
The length of the field.
:end
The end of the field.

Either :start and :length or :start and :end must be specified.

Consider the following source file:

first    last        email
Bob      Smith       bsmith@foo.com
Jane     Doe         jdoe@bar.com

The source would look like this:

source :in, {
  :file => "people.txt",
  :parser => :fixed_width
}, 
{
  :first_name => {
    :start => 1,
    :length => 9
  },
  :last_name => {
    :start => 10,
    :length => 12
  },
  :email => {
    :start => 22,
    :length => 50
  }
}

Note that the fixed width parser will automatically trim each value.

SAX Parser

The SAX parser will be used when the configuration Hash contains a :file element and a :parser element with a value of :sax. While a DOM-based XML parser is included in the ActiveWarehouse ETL library, you should always favor the SAX implementation as it performs and scales in a way that is not possible with the DOM-based XML parser.

Unlike the other parsers, the SAX parser passes its options directly via the definition. The definition takes two elements:

:write_trigger
A path that defines the element that will cause the current row to be written.
:fields
A Hash of fields where each field name is mapped to the XPath-like path for finding the field in the XML stream.

Consider the following XML document:

<?xml version="1.0"?>
<people>
  <person ssn="123456789">
    <first_name>Bob</first_name>
    <last_name>Smith</last_name>
    <email>bsmith@foo.com</email>
  </person>
  <person ssn="111223333">
    <first_name>Jane</first_name>
    <last_name>Doe</last_name>
    <email>jdoe@bar.com</email>
  </person>
</people>

The source would look like this:

source :in, {
  :file => "people.xml",
  :parser => :sax
}, 
{
  :write_trigger => "people/person",
  :fields => {
    :first_name => "people/person/first_name",
    :last_name => "people/person/last_name",
    :email => "people/person/email",
    :ssn => "people/person[ssn]"
  }
}

The path parser currently only handles simple expressions, specifically the path to the element or attribute. The syntax is XPath-like but it is not XPath.

XML Parser

The XML parser uses REXML and is unsuitable for processing very large documents. However, it does provide full XPath access to the DOM tree and this may be required for documents where simple paths do not suffice to select the desired nodes.

:collection
An XPath expression that defines the DOM nodes to be selected for processing.
:fields
An Array of field Hashes, each comprising a :name attribute and an optional :xpath attribute; each field :name is mapped to the text value of the specified :xpath selector. If the :xpath attribute is not specified then it is inferred from the :name attribute.

Consider the following XML document:

<?xml version="1.0"?>
<people>
  <person ssn="123456789" type="employee">
    <first_name>Bob</first_name>
    <last_name>Smith</last_name>
    <email>bsmith@foo.com</email>
    <colours>
      <eyes>brown</eyes>
      <hair>black</hair>
      <skin>fair</skin>
    </colours>
  </person>
  <person ssn="111223333" type="client">
    <first_name>Jane</first_name>
    <last_name>Doe</last_name>
    <email>jdoe@bar.com</email>
    <colours>
      <eyes>blue</eyes>
      <hair>blond</hair>
      <skin>medium</skin>
    </colours>
  </person>
  <person ssn="133244566" type="client">
    <first_name>Jake</first_name>
    <last_name>Smithsonian</last_name>
    <email>jake@example.com</email>
    <colours>
      <eyes>brown</eyes>
      <hair>black</hair>
      <skin>dark</skin>
    </colours>
  </person>
</people>

To select only client type nodes for processing the following source should suffice:

source :in, {
  :file => "people.xml",
  :parser => :xml
},
{
  :collection => 'people/person[@type="client"]',
  :fields => [
    { :name => :first_name,
      :xpath => 'first_name' },
    { :name => :surname,
      :xpath => 'last_name' },
    { :name => :social_security_number,
      :xpath => '@ssn' },                 # @ssn gets the value of the tag attribute ssn
    { :name => :hair_colour,
      :xpath => 'colours/hair' },
    { :name => :email }                   # :xpath => "email" is inferred from :name
  ]
}

Apache Combined Log Parser

The Apache combined log parser will be used when the configuration Hash contains a :file element and a :parser element with a value of :apache_combined_log.

The Apache combined log parser does not include any configuration options.

The definition when using the Apache combined log parser is not required and will be ignored. The Apache combined log parser will always inject the following fields into the ETL pipeline:

  • :ip_address
  • :identd
  • :user
  • :timestamp
  • :request
  • :response_code
  • :bytes
  • :referrer
  • :user_agent

Values in the above that are a dash will be converted to nil. The timestamp value will be converted to a Ruby Time instance.
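
A source definition might therefore look like the following sketch (access.log is an illustrative file name; an empty Array is passed as the definition since it is ignored):

source :in, {
  :file => "access.log",
  :parser => :apache_combined_log
},
[]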

The following elements will be parsed from the :referrer string, if possible, and injected into the ETL pipeline:

  • :scheme
  • :host
  • :port
  • :uri_path
  • :domain

In addition, the following elements will be parsed from the :user_agent string if possible and injected into the ETL pipeline:

  • :browser
  • :browser_version_major
  • :browser_version_minor
  • :ostype
  • :os_version
  • :os

Custom Parser

You can also define your own parser by extending ETL::Parser::Parser. For example:

class MyParser < ETL::Parser::Parser
  def each
    [{:name => "Bob"},{:name => "Jane"},{:name => "Joe"}].each do |row|
      yield row
    end
  end
end

source :in, {
  :file => "",
  :parser => MyParser
}, 
[ 
  :name
]

Database Extraction

Database extraction uses the ActiveRecord library to extract data from an operational database. As described in the beginning of this section, the data will be extracted to a local file first prior to being processed through the ETL pipeline.

Database connections always require the following configuration elements:

:target
The target database connection.
:table
The name of the table to extract from.

In addition, the following optional elements may be provided:

:join
Specify the join portion of the query that will be used to extract the data. The value must be a SQL fragment if specified.
:select
Specify the select portion of the query that will be used to extract the data. Defaults to an asterisk. The value must be a SQL fragment if specified.
:group
Specify the group by portion of the query that will be used to extract the data. The value must be a SQL fragment if specified.
:order
Specify the order by portion of the query that will be used to extract the data. The value must be a SQL fragment if specified.
:new_records_only
Specify the column to use when comparing timestamps against the last successful ETL job execution for the current control file.
You may need to set ActiveRecord::Base.default_timezone = :utc in a config file if your source timestamps are UTC (Rails default behavior).
Note that this will also affect the ETL execution database.
:conditions
Specifies an SQL fragment that will be added to the WHERE clause to filter the rows that will be extracted
:query
Specifies a raw SQL query. This can be used as an alternative to the configurations above.

Given the following database table:

create table people (
  first_name varchar(50),
  last_name varchar(50),
  email varchar(150)
);

The source might look like this:

source :in, {
  :type => :database,
  :target => :operational_database,
  :table => "people"
},
[ 
  :first_name,
  :last_name,
  :email
]

In versions before 0.9, a common pattern was to grab the configuration from ActiveRecord::Base's configurations Hash and merge it with the other options to be passed to the database source:

operational_db = ActiveRecord::Base.configurations["operational"].symbolize_keys
source :in, operational_db.merge({
  :database => "my_app",
  :table => "people"
}),
[ 
  :first_name,
  :last_name,
  :email
]

As of 0.9 this will no longer work. Rather you should use the :target option to select a named connection that is defined in database.yml as shown in the first example.

The options for :join, :select, :group, :conditions and :order are useful for tailoring the query to get only the results you need to push through the ETL pipeline:

source :in, {
  :type => :database,
  :target => :operational_database,
  :table => "people",
  :join => "addresses on people.address_id = addresses.id",
  :select => "people.email, addresses.city, addresses.state, people.created_at",
  :conditions => "people.unsubscribed = 0 AND people.date_of_death IS NULL",
  :order => "people.created_at"
},
[ 
  :email,
  :city,
  :state
]

Large Resultsets and MySQL

When using MySQL as a database source you can run into problems if you want to pull a large number of records from a resultset (>100,000). This is due to the way the Ruby MySQL driver works: it attempts to build an in-memory representation of all the results before returning them to Ruby. This can be very heavy on memory and also means that you can't start processing any records until all of them have been returned. A workaround exists that allows us to bypass the Ruby MySQL gem and use the mysql command line tool directly to pull the data into ActiveWarehouse ETL. It uses the --quick option on the mysql command line, which stops it from trying to build an in-memory representation of all the data. To use it simply add the option :mysqlstream => true.

source :in, {
  :type => :database,
  :target => :operational_database,
  :table => "people",
  :join => "addresses on people.address_id = addresses.id",
  :select => "people.email, addresses.city, addresses.state, people.created_at",
  :conditions => "people.unsubscribed = 0 AND people.date_of_death IS NULL",
  :order => "people.created_at",
  :mysqlstream => true
},
[ 
  :email,
  :city,
  :state
]

Enumerable Extraction

The enumerable source provides a means of injecting records into the ETL pipeline from any object that mixes in the Ruby Enumerable module. For example:

source :default, { :type => :enumerable, :enumerable => [
  {
    :first_name => "Bob",
    :last_name => "Smith",
    :email => "bsmith@foo.com"
  },
  {
    :first_name => "Jane",
    :last_name =>  "Doe",
    :email => "jdoe@bar.com"
  }
]}

This feature is often used to inject a default record into the head of the ETL pipeline when populating a dimension. For example, if you had records in your operational database tied to people and it is possible for a record to have a null reference in its person_id foreign key, then you might want an “Unknown” person:

source :default, { :type => :enumerable, :enumerable => [
  {
    :first_name => "Unknown",
    :last_name => "Unknown",
    :email => "Unknown"
  }
]}

Read Locally

When the --read-locally option is specified on the command line during the etl execution the source will bypass querying the actual source and will instead use the last successfully downloaded source cache file in source_data.

Transform

The transformation portion of the ETL pipeline converts data in all kinds of ways to help standardize the way the data is recorded in the target system. ActiveWarehouse ETL supports two ways to transform, either by field transforms or by row processors. There are quite a few built in transforms and processors and making your own is a matter of using a block or creating a custom transform class or row processor class.

Field Transforms

Field-level transforms are only applied to a single field in each row. Any transform of this type must implement the method transform(name, value, row) where name is the field name, value is the current value of the field and row is the entire row (in case the transformation depends on other fields in the row). The transform must return the value for the field after the transform is complete.

Field transforms are defined in one of two ways. For built-in transforms they are defined by name:

transform :ssn, :sha1, {}

The first argument is the field name, the second is the name of the transform and the third is an options Hash.

The second form uses a block or class:

transform(:ssn) {|n,v,r| v[0,24] }

The block takes three arguments, the name, the value and the row. The block must return a single value that is the new, transformed value.

Note that you can transform a field as many times as you want. Also note that the order in which transforms appear is the order in which they will be executed.

For example:

transform :ssn, :sha1
transform(:ssn) {|n,v,r| v[0,24] }
transform :ssn, :default, :default_value => "Unknown"

Decode Transform

Uses an external decode file to convert coded values into something that is consistent and understandable without needing to know how the coded values are decoded.

For example:

transform :sex, :decode, :decode_table_path => "sex_decode.txt"

Default Transform

This transform will return the given default value if the value passed in is blank, according to the rules for blank?, otherwise it will just return the original value.

For example:

transform :name, :default, :default_value => "No Name"

If the value is either nil or an empty String then the row value would be changed to “No Name”, otherwise the original value would be returned.

Date-to-String Transform

This transform converts a date into a formatted String. Options:

:format
A format string using the built in Ruby date formatting syntax.

For example:

transform :created_at, :date_to_string, :format => "%m/%d/%Y"

String-to-Date Transform

This transform converts a String into a Ruby Date instance.
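
For example, assuming a :date_of_birth field holding a string such as "1969-03-08":

transform :date_of_birth, :string_to_date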

String-to-Time Transform

This transform converts a String into a Ruby Time instance.
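
For example, assuming a :created_at field holding a string such as "2007-03-08 11:00:32":

transform :created_at, :string_to_time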

Foreign Key Lookup Transform

Transform a natural key into its foreign key equivalent.

For example, given:

  • the field attendee_id initially contains the string value “My Name is Smith” ;
  • the target model ActiveRecord class is Attendee; and
  • the attribute of the target model that defines the unique constraint is display_name

Then to replace the string value with the actual id of the belongs_to association:

transform :attendee_id, :foreign_key_lookup,
  {
    :resolver => ActiveRecordResolver.new(
      Attendee, :find_by_display_name
    ),
    :default => nil
  }

After the transform :foreign_key_lookup is finished the field :attendee_id will either contain the id value of the found Attendee row or nil.

If the :default option is not provided then a warning will be generated on the console if the lookup fails. In that case the field value will be changed to nil.

A :cache option is also available that will preload the FK mappings if the underlying resolver supports it. This is currently supported by SQLResolver.

Built-in resolvers

The foreign key lookup transform includes a number of built-in resolvers (although you can write your own easily):

  • ActiveRecordResolver
  • SQLResolver
  • IncrementalCacheSQLResolver
  • FlatFileResolver

Please read foreign_key_lookup_transform.rb for more information on required parameters.

Hierarchy Lookup Transform

Resolve a field up its hierarchy until a non-null value is found. This transform expects the operational system to have the notion of a parent_id field which can be used to walk up the tree.

Configuration options:

:target
The target connection (REQUIRED)
:table
The table to search (REQUIRED)
:parent_id_field
The parent ID field, defaults to :parent_id

For example:

transform :fearless_leader, :hierarchy_lookup, 
  :target => :operational_database, 
  :table => 'people', 
  :parent_id_field => :p_id

If :parent_id_field is not specified then :parent_id will be used.

Ordinalize Transform

Convert a number into its ordinalized form. For example:

transform :creation_order, :ordinalize

If creation_order is 12 then it would be changed to 12th.

SHA-1 Transform

Pass the value through a one-way SHA-1 Hash.

For example:

transform :ssn, :sha1

Trim Transform

Trim the value using Ruby’s #strip method.

For example:

transform :name, :trim

Note that you are responsible for ensuring that the field :name is not nil before applying :trim. If it is nil then this transform will raise an error, since NilClass does not have the #strip method. An alternative way of safely doing the same thing is:

transform(:name) { |n,v,r| v.to_s.strip }

or you can first cast the field into a string by using the :type transform explained below:

transform :name, :type, :type => :string
transform :name, :trim

Type Transform

Convert the value to a specific type.

For example:

transform :age, :type, :type => :number

The supported types are:

  • :integer or :number converts using to_i
  • :string converts using to_s
  • :float converts using to_f
  • :decimal converts using BigDecimal; the :significant option can be used to specify the number of significant places to keep (see the example below)
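
For example, converting a :price field (an illustrative field name) to a decimal with two significant places might look like this:

transform :price, :type, :type => :decimal, :significant => 2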

Row Processors

Row processors are like transforms except they work on an entire row rather than just a single field. Row processors accept the current row as an argument and may return zero or more rows after the transformation. This means that row processors can be used to filter out rows which should not be kept, modify multiple dependent fields in the row at once, add or remove fields from the row, or even take a single row and return many rows (which is used when exploding hierarchies).

Row processors can be applied either directly after the data is read from the source by specifying the processor using after_read or directly before the data is written to the destination by using before_write.

Check Exist Processor

Row processor that checks whether the row already exists in the destination data store and removes it from the pipeline if it does, so that only new rows are passed through to the destination.

Options:

:table
The name of the table to check for existence of a row (REQUIRED).
:target
The target connection (REQUIRED).
:skip
A symbol or array of symbols that should not be included in the existence check. If this option is not specified then all of the columns will be included in the existence check (unless :columns is specified).
:columns
An array of symbols for columns that should be included in the query conditions. If this option is not specified then all of the columns in the row will be included in the conditions (unless :skip is specified).

If there are no rows in the target table then this processor will be bypassed. If there are rows then each row will be checked against the target database table.

Note that this transform actually goes to the target database each time through so it will be a significant bottleneck in processing.

As an example, given a product dimension with the attributes Brand and Category:

  before_write :check_exist, :target => :data_warehouse, :table => "product_dimension"

This would check each row with a query:

  SELECT * FROM product_dimension WHERE brand = "the_brand" AND category = "the_category"

Where the_brand and the_category are the brand and category, respectively, for each row.

If you decided that only the brand name is a natural key, then you could do:

  before_write :check_exist, :target => :data_warehouse, :table => "product_dimension", :columns => [:brand]

This would result in the query:

  SELECT * FROM product_dimension WHERE brand = "the_brand"

Check Unique Processor

Row processor that checks to see whether or not the row is unique. This processor stores a cache of compound keys and checks to see if the compound key exists in the cache. If it does then it will not return the row, thus removing duplicates.

Options:

:keys
An array of keys to use as the compound key.

For example:

before_write :check_unique, :keys => [:first_name, :last_name, :email]

If the row {:first_name => "John", :last_name => "Smith", :email => "jsmith@foo.com"} appeared twice during the ETL process, only the first instance would be written to the destination.

Copy Field Processor

The copy field processor will copy the value of a field to another field.

For example:

after_read :copy_field, :source => :name, :dest => :full_name

There is also a shortcut for this:

copy :name, :full_name

Note that the copy shortcut executes as an after_read processor. If you need to copy a field after applying other transforms then you will need to use the :copy_field processor explicitly.

Ensure Fields Presence Processor

The ensure fields presence processor will ensure each row includes at least the specified keys.

For example:

after_read :ensure_fields_presence, { :fields => [:first_name, :last_name]}

If one row doesn’t have a :first_name key, the processing will raise an error on this row.

Note that the value provided for the key can be nil without raising an error.

Rename Field Processor

The rename field processor renames the specified field.

For example:

after_read :rename_field, :source => "name", :dest => "customer_name"

There is also a shortcut for this:

rename :name, :customer_name

Note that the rename shortcut executes as an after_read processor. If you need to rename a field after applying other transforms then you will need to use the :rename_field processor explicitly.

Hierarchy Exploder

This processor will convert a Rails-style acts_as_tree hierarchy into a dimensional bridge which is common to data warehouse systems.

Options:

:target
The target connection
:table
The database table to use
:id_field
The name of the id field, defaults to :id
:parent_id_field
The name of the parent id field, defaults to :parent_id
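
A hedged sketch of usage, assuming the processor is registered as :hierarchy_exploder and applied as an after_read processor (the connection and table names are illustrative):

after_read :hierarchy_exploder,
  :target => :operational_database,
  :table => 'categories'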

Print Row Processor

A debugging processor that will print the row.
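
For example, to dump each row to the console while debugging (assuming the processor is registered as :print_row):

after_read :print_row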

Sequence Processor

Generate the next number in a context-sensitive sequence. The context can be specified as a configuration option, thus allowing independently incrementing sequences.

:dest
The field that will receive the incrementing sequence.
:context
A context name. The default is the current ETL run.

Example:

before_write :sequence, :dest => :product_number

Surrogate Key Processor

The surrogate key processor is used to generate surrogate keys for dimension records.

:target
The target connection
:table
The table in which to find the initial surrogate key.
:column
The column name used to find the initial surrogate key.
:destination
The field that will hold the surrogate key (defaults to :id).
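
For example, to continue surrogate keys from the highest id already present in the dimension table (the connection and table names are illustrative):

before_write :surrogate_key,
  :target => :data_warehouse,
  :table => 'person_dimension',
  :column => 'id',
  :destination => :id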

Require Non Blank Processor

This processor requires that the specified fields are not blank, otherwise it will throw away the current row.

  before_write :require_non_blank, :fields => [:first_name, :last_name]

Load

The final part of the ETL process is the loading of the data into the destination system. ActiveWarehouse ETL supports two types of destinations: file and database. In almost every case you should use the file destination to write out the processed data and then use a bulk loader via a post processor to load the data.

All destinations have some common options:

:buffer_size
The number of rows to buffer before flushing. The default is 100.
:condition
A block that must return true in order for the row to be written. The block argument is the row itself.
:append_rows
Array of rows to append.

File Destination

A file destination will be used if the destination configuration includes a :file element.

Options:

:append
Set to true to append to the output file rather than overwrite it.

For example:

destination :out, {
  :file => 'people_dimension.txt',
  :append => true
},
{
  :order => [:id, :first_name, :last_name, :email]
}

Note: the :order option also specifies which fields will be included in the output file.

Database Destination

A database destination will be used if the destination configuration includes a :database element. The configuration options are:

:database
The database to connect to.
:target
The target connection.
:table
The table to write to.
:truncate
Set to true to truncate prior to storing the data.
:unique
Set to true to only store unique records.
:append_rows
Array of rows to append
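
For example, a database destination that writes straight into a dimension table might look like this sketch (connection, database and table names are illustrative):

destination :out, {
  :type => :database,
  :target => :data_warehouse,
  :database => 'rails_warehouse',
  :table => 'person_dimension',
  :truncate => true
},
{
  :order => [:id, :first_name, :last_name, :email]
}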

Virtual Fields

Virtual fields can be added to the final output by specifying the :virtual Hash in the destination configuration. For example:

destination :out, {
  :file => 'people_dimension.txt'
},
{
  :order => [:id, :first_name, :last_name, :email, :loaded_at],
  :virtual => {
    :id => :surrogate_key,
    :loaded_at => Time.now
  },
}

Another example, this time specifying the query for the initial surrogate key:

:virtual => {
  :id => ETL::Generator::SurrogateKeyGenerator.new(
    :query => 'SELECT MAX(id) FROM product_dimension'
  )
}

Slowly Changing Dimensions

Slowly changing dimensions require special treatment in a data warehouse. There are three types of slowly changing dimensions. Type 1 replaces old values when an update to a record occurs. Type 2 creates a new row and changes the expiration date of the original record to the date the change occurred. Type 3 has one column for the original value and one column for the current value.

Currently ActiveWarehouse ETL supports type 1 and type 2 slowly changing dimensions. Note that type 2 slowly changing dimensions will be relatively slow because they must delete records during execution to avoid duplicates.

Population of slowly changing dimensions can be implemented by specifying the :scd hash as well as the :natural_key and :scd_fields arrays in the destination configuration. For example, the following implements a type 2 SCD:

destination :dimension_out, {
  :type => :database,
  :target => :database_target,
  :database => "database_name",
  :table => "dimension_table_name",
  :truncate => false,
  :natural_key => [ :customer_code ],
  :scd_fields => [ :customer_address,
                   :customer_city,
                   :customer_zone,
                   :customer_postal_code,
                   :customer_country ],
  :scd => {
    :dimension_target => :database_target,
    :dimension_table => "dimension_table_name",
    :type => 2,
    :effective_date_field => :effective_start,
    :end_date_field => :effective_end,
    :latest_version_field => :latest
  }
},
{ :order => [
    :id,
    :customer_code,
    :customer_address,
    :customer_city,
    :customer_zone,
    :customer_postal_code,
    :customer_country,
    :effective_start,
    :effective_end,
    :latest ],
  :virtual => {
    :id => ETL::Generator::SurrogateKeyGenerator.new( :target => :database_target,
                                                      :table => "dimension_table_name",
                                                      :column => "id" )
  }
}

The relevant destination options are:

:natural_key
An array of symbols corresponding to the columns that uniquely identify a member.
:scd_fields
An array of symbols corresponding to the columns that can change with respect to that entity (in the above example it consisted of customer address data)
:scd
A hash containing SCD workflow configuration data.

Within the :scd hash:

:dimension_target
database connection (defined in database.yml) where the dimension table lives.
:dimension_table
String containing the name of the dimension table. This is necessary (especially when using a file destination) because activewarehouse-etl must query the table during execution.
:type
Integer corresponding to the type of change to be implemented on the dimension as a whole. Currently only 1 or 2 is supported.
:effective_date_field
The name of the column that represents the “start date” of the record.
:end_date_field
The name of the column that represents the “end date” of the record. This will be set to the date 9999-12-31 for currently active members.
:latest_version_field
The name of the column that holds a boolean indicating whether the record is the latest version (each member, defined by the natural key, will have only one record with this column set to true)
:non_evolving_fields
An array of columns that should not have any changes applied (source changes will be lost). The primary key is added to this array during the ETL process. This is currently only valid for type 1 SCD changes.

Pre Processors

Pre-processors are executed before the ETL pipeline begins.

Truncate Pre-Processor

Truncate the target table before the ETL process starts.
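
A sketch of usage, assuming the processor is registered as :truncate and invoked with pre_process (the connection and table names are illustrative):

pre_process :truncate,
  :target => :data_warehouse,
  :table => 'person_dimension'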

Post Processes

Post-processes execute after all of the data has been processed through the ETL pipeline and has been written to the destination(s).

Bulk Import

The bulk import post processor uses adapter extensions to load data using native bulk load tools. Currently MySQL, PostgreSQL and MSSQL Server are supported.

The following is an example of executing a bulk import:

post_process :bulk_import, {
  :file => 'person_dimension.txt',
  :columns => [:first_name, :last_name, :email],
  :field_separator => ',',
  :target => :data_warehouse,
  :table => 'person_dimension'
}

Encode

The encode processor uses the Ruby Iconv library to convert a file from one encoding to another, line by line.

Here is an example:

post_process :encode, { 
  :source_file => 'date_dimension.txt',
  :source_encoding => 'utf-8',
  :target_file => 'date_dimension_latin1.txt',
  :target_encoding => 'latin1'
}

Screens

Screens provide a means for checking data validity after data has passed through the ETL pipeline. Screens can be configured to either report a warning message, raise an error that causes just the current job to terminate, or exit the system completely if the error is fatal.

Testing inside screen blocks is accomplished using the assertion methods available in Test::Unit::Assertions from the Ruby core library.

Screens come in two flavours.

screen

“Regular” screens are executed before post-processes, to provide an opportunity to detect an error before the data is loaded into the production system.

  screen(:warning) {
    op_conn = ETL::Engine.connection(:operational_database)
    dw_conn = ETL::Engine.connection(:data_warehouse)
    assert_equal op_conn.select_value("SELECT count(*) FROM people").to_i + 1, dw_conn.select_value("SELECT count(*) FROM person_dimension").to_i
  }

In the example above, two connections are retrieved, one to the operational database and one to the data warehouse. Then a SQL query is executed against each connection and the results are tested for equality. Note that the result from the operational database is incremented by 1, which would be common if you include an “Unknown” person in the person_dimension to handle cases of people missing from the dimension but referenced in a fact record.

after_post_process_screen

You can also test things after the post-processes have been executed, depending on your needs.

  after_post_process_screen(:fatal) {
    ...
    assert_equal ...
  }

Execution Engine

The ETL process is controlled by the execution engine. The engine coordinates the execution of the control file and captures metadata about the execution. Some of the metadata is only known during the execution, while other metadata is persisted beyond the current ETL execution.

Statistics

The ETL engine collects and reports some interesting statistics each time it runs. These statistics include:

  • The number of lines read from sources
  • The number of lines written to destinations
  • Total execution time
  • The average number of rows processed per second
  • Average number of rows processed in after_reads
  • Average number of rows processed in before_writes
  • Average number of rows processed in writes
  • Average time writing execution records

These statistics are output each time the ETL process is executed and can provide valuable information on the location of bottlenecks in the ETL process.

Here is an example of output from an ETL process:

Using AdapterExtensions
Starting ETL process
Processing upload_fact.ctl
Source: localhost/reports/products
.....................................
Executing post processes
Post-processing complete

Read 37705 lines from sources
Wrote 37705 lines to destinations
Completed upload_fact.ctl in 1 minutes, 35 seconds with 0 errors.
Processing average: 424.78251320913 rows/sec)
Avg after_reads: 69589.5558703258 rows/sec
Avg before_writes: 65219.6208387736 rows/sec
Avg transforms: 534.787733802788 rows/sec
Avg writes: 10479.4134304288 rows/sec
Avg time writing execution records: 0
ETL process complete

Persistent Job History

Batch and job data is persisted automatically to the ETL execution database.

Command-Line Tool

When you install the ActiveWarehouse ETL Gem the etl script will automatically be installed in your bin path. This script is used to execute ETL control files and has several options for customizing how the control file is executed.

Command-Line Options

--help or -h
Display a help message.
--config or -c
Specify the database.yml config file to use for the ETL execution engine's persistent data. This will default to database.yml if it is not specified.
--limit N or -l N
Specify the limit. This is used to process only N records before exiting and is useful for debugging issues with a particular data source. Note that for database sources this will actually modify the SQL query so the limit is part of the query.
--offset N or -o N
Specify the offset. This is used to skip N records before beginning the processing and is useful when debugging issues with a particular data source. Note that for database sources this will actually modify the SQL query so it starts at the specified offset.
--newlog or -n
Force a new etl.log file to be created. If this option is not specified then log entries will be appended to etl.log.
--skip-bulk-import or -s
Do not execute any of the bulk import processes. This is useful if you are debugging and do not want to actually load the data into the target database (assuming you are loading using a bulk loader).
--read-locally
Read from the last available locally stored source file rather than from the actual source.
--rails-root
Absolute path to the Rails application root folder. The etl script will try to load the config/database.yml file from that application.

Execution Example

An example of executing an ETL control file:

etl person_dimension.ctl

Patches

In order to get proper logging in ActiveSupport you may need to apply the active_support_logger.patch included in the release. This patch has been applied to the ActiveSupport library and will be included in its next release; until then the patch is important for backwards compatibility.

License

Copyright (c) 2006-2009 Anthony Eden

Permission is hereby granted, free of charge, to any person obtaining a copy of this
software and associated documentation files (the "Software"), to deal in the Software 
without restriction, including without limitation the rights to use, copy, modify, merge, 
publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons 
to whom the Software is furnished to do so, subject to the following conditions:

The above copyright notice and this permission notice shall be included in all copies or 
substantial portions of the Software.

THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, 
INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR 
PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE 
FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR 
OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER 
DEALINGS IN THE SOFTWARE.