# Beginner's guide to data engineering - Part I

## The Hierarchy of Analytics

*Think of Artificial Intelligence as the top of a pyramid of needs. Yes, self-actualization (AI) is great, but you first need food, water, and shelter (data literacy, collection, and infrastructure).*

**Use the bottom-up approach.**

![data_engineering_pyramid.png](attachment:data_engineering_pyramid.png)

## What is data engineering?

*The data engineering field could be thought of as a superset of business intelligence and data warehousing that brings more elements from software engineering. This discipline also integrates specialization around the operation of so-called “big data” distributed systems, along with concepts around the extended Hadoop ecosystem, stream processing, and computation at scale.*

![data_engineering_concept.png](attachment:data_engineering_concept.png)

## ETL: Extract, Transform and Load (Batch Processing Data)

ETL stands for Extract, Transform, and Load. These three conceptual steps are how most data pipelines are designed and structured. They serve as a blueprint for how raw data is transformed into analysis-ready data.

![data_engineering_etl.png](attachment:data_engineering_etl.png)

**Extract**: This is the step where sensors wait for upstream data sources to land (e.g. an upstream source could be machine or user-generated logs, a relational database copy, an external dataset, etc.). Once the data is available, we transport it from its source location for further transformation.

**Transform**: This is the heart of any ETL job, where we apply business logic and perform actions such as filtering, grouping, and aggregation to translate raw data into analysis-ready datasets. This step requires a great deal of business understanding and domain knowledge.

**Load**: Finally, we load the processed data and transport it to a final destination. Often, this dataset can either be consumed directly by end users or be treated as yet another upstream dependency of another ETL job, forming the so-called data lineage.

**The author recommends using functional programming to declare DAGs, since functional programming is stateless.**
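One way to read this recommendation is to write each ETL step as a pure function that takes data in and returns new data out, without touching shared state. The sketch below is only an illustration: the function names, the `ds,market,bookings` log format, and the in-memory "warehouse" dictionary are assumptions, not part of the original article.

```python
from collections import defaultdict


def extract(raw_lines):
    """Extract: parse raw 'ds,market,bookings' log lines into records."""
    records = []
    for line in raw_lines:
        ds, market, bookings = line.strip().split(",")
        records.append({"ds": ds, "market": market, "bookings": int(bookings)})
    return records


def transform(records):
    """Transform: aggregate bookings per (ds, market)."""
    totals = defaultdict(int)
    for record in records:
        totals[(record["ds"], record["market"])] += record["bookings"]
    return dict(totals)


def load(totals, warehouse):
    """Load: return a new warehouse with the results merged in (no mutation)."""
    return {**warehouse, **totals}


raw = ["2018-02-09,paris,1", "2018-02-09,paris,1", "2018-02-09,tokyo,1"]
warehouse = load(transform(extract(raw)), {})
```

Because none of the functions keeps state between calls, running the same chain on the same input always yields the same warehouse.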



## Choosing an ETL Framework

**Configuration**: ETLs are naturally complex, and we need to be able to succinctly describe the data flow of a data pipeline. As a result, it is important to evaluate how ETLs are authored. Are they configured through a UI, a domain-specific language, or code? Nowadays, the concept of configuration as code is gaining popularity, because it allows users to programmatically build expressive, customizable pipelines.

**UI, Monitoring, Alerts**: Long running batch processes inevitably can run into errors (e.g. cluster failures) even when the job itself does not have bugs. As a result, monitoring and alerting are crucial in tracking the progress of long running processes. How well does a framework provide visual information for job progress? Does it surface alerts or warnings in a timely and accurate manner?

**Backfilling**: Once a data pipeline is built, we often need to go back in time and re-process historical data. Ideally, we do not want to build two separate jobs, one for backfilling historical data and another for computing current or future metrics. How easily does a framework support backfilling? Can it do so in a way that is standardized, efficient, and scalable? All of these are important questions to consider.

## Two Paradigms: SQL-Centric vs JVM-Centric ETL

**JVM-centric ETL**: Typically built in a JVM-based language (like Java or Scala). Engineering data pipelines in these JVM languages often involves thinking about data transformation in a more imperative manner, e.g. in terms of key-value pairs. Writing User Defined Functions (UDFs) is less painful because one does not need to write them in a different language, and testing jobs can be easier for the same reason. This paradigm is quite popular among engineers.

**SQL-centric ETL**: Typically built in languages like SQL, Presto, or Hive. ETL jobs are often defined in a declarative way, and almost everything centers around SQL and tables. Writing UDFs is sometimes troublesome because one has to write them in a different language (e.g. Java or Python), and testing can be a lot more challenging because of this. This paradigm is popular among data scientists.

**The SQL-centric approach tends to make data engineering easier to understand.**
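To make the contrast between the two paradigms concrete, here is a small sketch that computes the same aggregate twice: once imperatively over key-value pairs, and once declaratively in SQL. Python stands in for a JVM language and the standard-library SQLite module stands in for Hive or Presto; both substitutions, and the `events` data, are assumptions made purely for illustration.

```python
import sqlite3
from collections import defaultdict

events = [("paris", 1), ("tokyo", 1), ("paris", 1)]

# Imperative, key-value style: we spell out *how* to accumulate the totals.
totals = defaultdict(int)
for market, bookings in events:
    totals[market] += bookings
imperative_result = sorted(totals.items())

# Declarative, SQL style: we state *what* we want and let the engine plan it.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE events (market TEXT, bookings INTEGER)")
conn.executemany("INSERT INTO events VALUES (?, ?)", events)
declarative_result = conn.execute(
    "SELECT market, SUM(bookings) FROM events GROUP BY market ORDER BY market"
).fetchall()
conn.close()
```

Both variables end up holding the same per-market totals; the difference lies in who decides how the computation is carried out.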

# Beginner's guide to data engineering - Part II

## Data Modeling

*A design process where one carefully defines table schemas and data relations to capture business metrics and dimensions.*

#### Online transaction processing (OLTP)

A good example of OLTP is a website where users have their profiles stored and want to access them.



*In order to serve this data to users accurately and on time, it is critical to optimize the production databases for **online transaction processing (OLTP)**.*

#### Online analytical processing system (OLAP)

The designer needs to focus on insight generation, meaning analytical reasoning can be translated into queries easily and statistics can be computed efficiently. This analytics-first approach often involves a design process called data modeling.

### Data Modeling, Normalization, and Star Schema

One key design decision is the extent to which tables in the database should be normalized.

Normalized tables have simpler schemas, more standardized data, and less redundancy. However, having many smaller tables means that querying patterns become more complex (more joins).

A denormalized (pre-joined, wide) table is much easier to query, but data processing for large tables is slower and involves more upstream dependencies. This makes the ETL pipeline more difficult to maintain.

**The star schema is a data design pattern in which tables are arranged in a star-like pattern.**


### The star schema

The star schema focuses on building normalized tables, specifically fact and dimension tables. When needed, denormalized tables can be built from these smaller normalized tables. The design strives for a balance between ETL maintainability and ease of analytics.

![data_engineering_star_schema.png](attachment:data_engineering_star_schema.png)

### Fact & Dimension Tables

**Fact tables** typically contain point-in-time transactional data. **Each row in the table can be extremely simple and is often represented as a unit of transaction**. Because of their simplicity, **they are often the source of truth tables from which business metrics are derived**. For example, at Airbnb, we have various fact tables that track transaction-like events such as bookings, reservations, alterations, cancellations, and more.

**Dimension tables** typically contain slowly changing attributes of specific entities, and attributes sometimes can be organized in a hierarchical structure. These attributes are often called “dimensions”, and can be joined with the fact tables, as long as there is a foreign key available in the fact table. At Airbnb, we built various dimension tables such as users, listings, and markets that help us to slice and dice our data.
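The relationship between the two kinds of table can be sketched with a tiny in-memory database. This is a minimal illustration, assuming SQLite in place of Hive; the table layouts and sample rows are hypothetical and only loosely modeled on the bookings example used in these notes.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
-- Dimension table: slowly changing attributes of each listing
CREATE TABLE dim_listings (id_listing INTEGER PRIMARY KEY, dim_market TEXT);
-- Fact table: one row per booking transaction, with a foreign key to the listing
CREATE TABLE fct_bookings (id_listing INTEGER, m_bookings INTEGER, ds TEXT);

INSERT INTO dim_listings VALUES (1, 'paris'), (2, 'tokyo');
INSERT INTO fct_bookings VALUES
    (1, 1, '2018-02-09'), (1, 1, '2018-02-10'), (2, 1, '2018-02-10');
""")

# Slice the fact table by a dimension through the foreign key
bookings_by_market = conn.execute("""
    SELECT d.dim_market, SUM(f.m_bookings) AS m_bookings
    FROM fct_bookings f
    JOIN dim_listings d ON f.id_listing = d.id_listing
    GROUP BY d.dim_market
    ORDER BY d.dim_market
""").fetchall()
conn.close()
```

The fact rows stay small and simple, and the join is only paid for when a particular dimensional cut is needed.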

## Data Partitioning

*A practice that enables more efficient querying and data backfilling*

### Data partitioning by datestamp

 In addition to following SQL best practices such as “filter early and often”, “project only the fields that are needed”, one of the most effective techniques to improve query performance is to partition data.

**Main idea: Instead of storing all the data in one chunk, we break it up into independent, self-contained chunks. Data from the same chunk is assigned the same partition key, which means that any subset of the data can be looked up extremely quickly. This technique can greatly improve query performance.**

One common partition key is the datestamp (ds for short) because:
* ETL jobs are typically based on datestamps
* Many analytical questions involve counting events that occurred in a specific time range
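The idea can be sketched in plain Python by treating each datestamp partition as its own chunk. The data and helper names below are hypothetical; a real warehouse stores partitions as separate files or directories rather than dictionary entries.

```python
from collections import defaultdict

rows = [
    {"ds": "2018-02-08", "market": "paris"},
    {"ds": "2018-02-09", "market": "paris"},
    {"ds": "2018-02-09", "market": "tokyo"},
    {"ds": "2018-02-10", "market": "tokyo"},
]


def partition_by_ds(rows):
    """Group rows into independent chunks keyed by their datestamp."""
    partitions = defaultdict(list)
    for row in rows:
        partitions[row["ds"]].append(row)
    return dict(partitions)


def count_events(partitions, start_ds, end_ds):
    """Count events in [start_ds, end_ds], reading only the matching partitions."""
    # ISO dates (YYYY-MM-DD) compare correctly as plain strings.
    return sum(
        len(chunk) for ds, chunk in partitions.items() if start_ds <= ds <= end_ds
    )


partitions = partition_by_ds(rows)
events_in_range = count_events(partitions, "2018-02-09", "2018-02-10")
```

Only the partition keys are inspected for dates outside the range; the rows inside those partitions are never read.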

### Backfilling Historical Data

Using the datestamp as the partition key makes backfilling easier, which is useful when revisiting historical trends and movements.

**Backfilling is so common that Hive has built-in support for dynamic partitions, a construct that performs the same SQL operations over many partitions and performs multiple insertions at once.**

 The beauty of dynamic partitions is that we wrap all the same work that is needed with a GROUP BY ds and insert the results into the relevant ds partitions all at once. This query pattern is very powerful and is used by many of Airbnb’s data pipelines. In a later section, I will demonstrate how one can write an Airflow job that incorporates backfilling logic using Jinja control flow.
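As a rough analogy for what a dynamic-partition insert does, the sketch below aggregates a range of dates in one pass, grouped by ds, and then overwrites each ds partition of the target. It is plain Python over in-memory data, not Hive; the `backfill` helper and the sample rows are assumptions for illustration.

```python
from collections import defaultdict

fct_bookings = [
    {"ds": "2018-02-08", "dim_market": "paris", "m_bookings": 1},
    {"ds": "2018-02-09", "dim_market": "paris", "m_bookings": 1},
    {"ds": "2018-02-09", "dim_market": "paris", "m_bookings": 1},
    {"ds": "2018-02-10", "dim_market": "tokyo", "m_bookings": 1},
]


def backfill(fact_rows, start_ds, end_ds, target):
    """Aggregate per (ds, market) in one pass, then overwrite each ds partition."""
    totals = defaultdict(lambda: defaultdict(int))
    for row in fact_rows:
        if start_ds <= row["ds"] <= end_ds:
            totals[row["ds"]][row["dim_market"]] += row["m_bookings"]
    for ds, by_market in totals.items():
        target[ds] = dict(by_market)  # overwrite, in the spirit of INSERT OVERWRITE
    return target


bookings_summary = backfill(fct_bookings, "2018-02-08", "2018-02-09", {})
```

Calling `backfill` again with the same range leaves `bookings_summary` unchanged, because each partition is replaced rather than appended to.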

## The Anatomy of an Airflow Pipeline

### Defining the Directed Acyclic Graph (DAG)

* In real life, ETL jobs are often complex, consisting of many combinations of E, T, and L tasks
* **Visualizing the data flow as a graph is often useful**

![data_engineering_etl_graph.png](attachment:data_engineering_etl_graph.png)

**DAGs describe how to run data pipeline.**

* **Node**: Represents a task
* **Arrow**: Represents a dependency on another task
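Nodes and arrows map naturally onto a dependency dictionary. The sketch below uses Python's standard-library `graphlib` (Python 3.9+) rather than Airflow itself, so it only illustrates the graph idea; the task names are borrowed from the Airflow example further down in these notes.

```python
from graphlib import TopologicalSorter  # standard library, Python 3.9+

# Each key is a task (node); its set lists the tasks it depends on (arrows).
dependencies = {
    "task_1": {"wf_upstream_table_1", "wf_upstream_table_2"},
    "task_2": {"wf_upstream_table_1", "wf_upstream_table_2", "wf_upstream_table_3"},
}

# A valid execution order runs every upstream task before its downstream task.
execution_order = list(TopologicalSorter(dependencies).static_order())
```

If the dependencies contained a cycle, `TopologicalSorter` would raise `graphlib.CycleError`, which is the "acyclic" part of a DAG.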

A nice feature of Airflow is **Code as Configuration** (This graph is a representation of the code)


![data_engineering_etl_graph_airflow.png](attachment:data_engineering_etl_graph_airflow.png)

### Operators: Sensors, Operators and Transfers

***Operators describe what to do in a data pipeline.***

**Sensors**: waits for a certain time, external file, or upstream data source

* Unblock the data flow after a certain time, or after a data source becomes available, e.g. **NamedHivePartitionSensor**


**Operators**: triggers a certain action (e.g. run a bash command, execute a python function, or execute a Hive query, etc)

* Trigger data transformations, e.g. **HiveOperator**, **BashOperator**, **PythonOperator**
* Once instantiated, they become **Tasks** and should be **deterministic** and **idempotent**

**Transfers**: moves data from one location to another

* ex: **MySqlToHiveTransfer**, **S3ToHiveTransfer**

### A simple example

In [None]:
"""
A DAG docstring might be a good way to explain at a high level
what problem space the DAG is looking at.
Links to design documents, upstream dependencies etc
are highly recommended.
"""
from datetime import datetime, timedelta
from airflow.models import DAG  # Import the DAG class
from airflow.operators.sensors import NamedHivePartitionSensor
from airflow.operators.hive_operator import HiveOperator

### You can import more operators as you see fit!
# from airflow.operators.bash_operator import BashOperator
# from airflow.operators.python_operator import PythonOperator

# setting some default arguments for the DAG
default_args = {
    'owner': 'you',
    'depends_on_past': False,
    'start_date': datetime(2018, 2, 9),
}

# Instantiate the Airflow DAG
dag = DAG(
    dag_id='anatomy_of_a_dag',
    description="This describes my DAG",
    default_args=default_args,
    schedule_interval=timedelta(days=1))   # This is a daily DAG.

# Put upstream dependencies in a dictionary
wf_dependencies = {
    'wf_upstream_table_1': 'upstream_table_1/ds={{ ds }}',
    'wf_upstream_table_2': 'upstream_table_2/ds={{ ds }}',
    'wf_upstream_table_3': 'upstream_table_3/ds={{ ds }}',
}

# Define the sensors for upstream dependencies
for wf_task_id, partition_name in wf_dependencies.items():
    NamedHivePartitionSensor(
        task_id=wf_task_id,
        partition_names=[partition_name],
        dag=dag
    )

# Put the tasks in a list
tasks = [
    ('hql', 'task_1'),
    ('hql', 'task_2'),
]

# Define the operators in the list above
for directory, task_name in tasks:
    HiveOperator(
        task_id=task_name,
        hql='{0}/{1}.hql'.format(directory, task_name),
        dag=dag,
    )

# Put the dependencies in a map
deps = {
    'task_1': [
        'wf_upstream_table_1',
        'wf_upstream_table_2',
    ],
    'task_2': [
        'wf_upstream_table_1',
        'wf_upstream_table_2',
        'wf_upstream_table_3',
    ],
}

# Explicitly define the dependencies in the DAG
for downstream, upstream_list in deps.items():
    for upstream in upstream_list:
        dag.set_dependency(upstream, downstream)

## ETL Best Practices

 **Partition Data Tables** : Data partitioning can be especially useful when dealing with large-size tables with a long history. When data is partitioned using datestamps, we can leverage dynamic partitions to parallelize backfilling.

In [None]:
CREATE TABLE IF NOT EXISTS fct_bookings (
    id_listing BIGINT COMMENT 'Unique ID of the listing'
  , id_host    BIGINT COMMENT 'Unique ID of the host who owns the listing'
  , m_bookings BIGINT COMMENT 'Denoted 1 if a booking transaction occurred'
)
PARTITIONED BY ( -- this is how we define partition keys
  ds STRING
);

**Load Data Incrementally** : This principle makes your ETL more modular and manageable, especially when building dimension tables from the fact tables. In each run, we only need to append the new transactions to the dimension table from the previous date partition instead of scanning the entire fact history.

In [None]:
-- Not Recommended Approach: Scan the entire table and rebuild every day
INSERT OVERWRITE TABLE dim_total_bookings PARTITION (ds = '{{ ds }}')
SELECT
     dim_market
  ,  SUM(m_bookings) AS m_bookings
FROM
  fct_bookings
WHERE
  ds <= '{{ ds }}' -- this is expensive, and can quickly run into scale issue
GROUP BY
  dim_market
;

-- Recommended Approach: Incremental Load
INSERT OVERWRITE TABLE dim_total_bookings PARTITION (ds = '{{ ds }}')
SELECT
    dim_market
  , SUM(m_bookings) AS m_bookings
FROM (
  SELECT
      dim_market
    , m_bookings
  FROM
    dim_total_bookings            -- a dim table
  WHERE
    ds = DATE_SUB('{{ ds }}', 1)  -- from the previous ds

  UNION ALL  -- UNION alone would drop identical rows and undercount
  
  SELECT
      dim_market
    , SUM(m_bookings) AS m_bookings
  FROM
    fct_bookings                  -- a fct table
  WHERE
    ds = '{{ ds }}'               -- from the current ds
  GROUP BY
     dim_market
) a
GROUP BY
  dim_market
;

**Enforce Idempotency** : Many data scientists rely on point-in-time snapshots to perform historical analysis. This means the underlying source table should not be mutable as time progresses, otherwise we would get a different answer. Pipelines should be built so that the same query, when run against the same business logic and time range, returns the same result. This property has a fancy name: idempotency.



**Immutable data along with versioned logic are key to reproducibility.**
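The difference between an idempotent and a non-idempotent load can be sketched in a few lines. The two helper functions and the dictionary-as-table are hypothetical, chosen only to show what happens when the same job runs twice for the same ds.

```python
def append_load(table, ds, rows):
    """Not idempotent: every re-run adds the same rows again."""
    table.setdefault(ds, []).extend(rows)
    return table


def overwrite_load(table, ds, rows):
    """Idempotent: a re-run replaces the partition, so the result is unchanged."""
    table[ds] = list(rows)
    return table


rows = [("paris", 1), ("tokyo", 1)]

appended = {}
overwritten = {}
for _ in range(2):  # simulate running the same job twice for the same ds
    append_load(appended, "2018-02-09", rows)
    overwrite_load(overwritten, "2018-02-09", rows)
```

After the second run, the appended table holds every row twice, while the overwritten table still matches the result of a single run. This is the behavior that the INSERT OVERWRITE pattern in the Hive queries above provides.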

**Parameterize Workflow**: Just as templates greatly simplify the organization of HTML pages, Jinja can be used in conjunction with SQL. As mentioned earlier, one common use of Jinja templates is to incorporate backfilling logic into a typical Hive query. Stitch Fix has a very nice post that summarizes how they use this technique for their ETL: https://multithreaded.stitchfix.com/blog/2017/07/06/one-weird-trick/

In [None]:
{%- if backfill %}
INSERT OVERWRITE TABLE bookings_summary PARTITION (ds)
{%- else %}
INSERT OVERWRITE TABLE bookings_summary PARTITION (ds = '{{ ds }}')
{%- endif %}
SELECT
    dim_market
  , SUM(m_bookings) AS m_bookings
  {%- if backfill %}
  , ds
  {%- endif %}
FROM
  fct_bookings
WHERE
{%- if backfill %}
  ds BETWEEN '{{ start_date }}' AND '{{ end_date }}'
{%- else %}
  ds = '{{ ds }}'
{%- endif %}
GROUP BY
    dim_market
{%- if backfill %}
  , ds
{%- endif %}
;

**Add Data Checks Early and Often**: When processing data, it is useful to write data into a staging table, check the data quality, and only then exchange the staging table with the final production table. At Airbnb, we call this the **stage-check-exchange paradigm**. Checks in this 3-step paradigm are important defensive mechanisms — they can be simple checks such as counting if the total number of records is greater than 0 or something as complex as an anomaly detection system that checks for unseen categories or outliers.

In [None]:
# Define the CREATE TABLE statement here
{%- macro create_table() %}
...
{%- endmacro %}

# Main ETL logic, insert the results into a STAGING table
{%- macro main() %}
...
{%- endmacro %}

# A series of simple presto CHECKS on the staging table
{%- macro health_checks() %}
...
{%- endmacro %}

# Finally, EXCHANGE the staging table with the prod table
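
The three steps can also be sketched end to end in Python. This is a minimal illustration that assumes SQLite (from the standard library) in place of Hive or Presto; the table names, the `stage_check_exchange` helper, and the single row-count check are hypothetical.

```python
import sqlite3


def stage_check_exchange(conn, new_rows):
    """Load into a staging table, validate it, then swap it into production."""
    # STAGE: write the new results somewhere users cannot see yet
    conn.execute("DROP TABLE IF EXISTS bookings_staging")
    conn.execute(
        "CREATE TABLE bookings_staging (dim_market TEXT, m_bookings INTEGER)"
    )
    conn.executemany("INSERT INTO bookings_staging VALUES (?, ?)", new_rows)

    # CHECK: refuse to publish an empty result
    row_count = conn.execute("SELECT COUNT(*) FROM bookings_staging").fetchone()[0]
    if row_count == 0:
        conn.execute("DROP TABLE bookings_staging")
        raise ValueError("health check failed: staging table is empty")

    # EXCHANGE: replace the production table with the validated staging table
    conn.execute("DROP TABLE IF EXISTS bookings_prod")
    conn.execute("ALTER TABLE bookings_staging RENAME TO bookings_prod")
    conn.commit()


conn = sqlite3.connect(":memory:")
stage_check_exchange(conn, [("paris", 2), ("tokyo", 1)])
prod_rows = conn.execute(
    "SELECT dim_market, m_bookings FROM bookings_prod ORDER BY dim_market"
).fetchall()
```

If a later run produces no rows, the function raises before the exchange step, so the production table keeps its last good contents.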

**Build Useful Alerts & Monitoring System** : Since ETL jobs can often take a long time to run, it’s useful to **add alerts and monitoring** to them so we do not have to keep an eye on the progress of the DAG constantly. Different companies monitor DAGs in many creative ways — at Airbnb, we regularly use EmailOperators to send alert emails for jobs missing SLAs. Other teams have used alerts to flag experiment imbalances. Yet another interesting example is from Zymergen where they report model performance metrics such as R-squared with a SlackOperator.

**Best ETL Practices by Maxime Beauchemin**: https://www.youtube.com/watch?v=dgaoqOZlvEA&feature=youtu.be

# Beginner's guide to data engineering - Part III

The **antidote** to complexity is the power of **abstraction**. This principle, of course, is no exception when it comes to data engineering.

In data engineering, abstraction often means identifying and automating **ETL patterns that are common in people's workflows**.

We will define the concept of a data engineering framework and present some examples frequently used at Airbnb.

## A Common Scenario

* You work at a company where different teams work on different parts of a product, and each team has its own OKRs (Objectives and Key Results)

* The data scientist's job on each team is to create an analytics dashboard
* After talking with other data scientists, it becomes clear that building these dashboards is repetitive

## From Pipelines to Frameworks

So far in this series we have limited ourselves to the design of a single standalone pipeline, but we can apply the **same principles to pipeline generation**.

* **Pipeline generation**: A way to programmatically and dynamically generate DAGs on the fly. This is essentially what a **data engineering framework does**: it generates different instantiations of Airflow DAGs that automate workflows.

Here is a definition by Maxime Beauchemin:

To build workflows dynamically from code ... A very simple example would be an Airflow script that reads a YAML config file with a list of table names, and creates a little workflow for each table, that may do things like loading the table into a target database, perhaps apply rules from the config file around sampling, data retention, anonymization … Now you have this abstraction … you can create new chunks of workflows without doing much work. It turns out there are tons of use cases for this type of approach.
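The example in this quote can be sketched without Airflow at all. Below, a Python dictionary stands in for the parsed YAML config file, and each "workflow" is just an ordered list of task names; the config keys, table names, and `build_workflow` helper are all hypothetical.

```python
# Stand-in for a parsed YAML config file (hypothetical keys and table names).
config = {
    "tables": [
        {"name": "users", "retention_days": 30},
        {"name": "listings", "sample_rate": 0.1},
    ]
}


def build_workflow(table_conf):
    """Return the ordered task names for one table, driven by its config."""
    name = table_conf["name"]
    tasks = [f"load_{name}"]
    if "sample_rate" in table_conf:
        tasks.append(f"sample_{name}")
    if "retention_days" in table_conf:
        tasks.append(f"apply_retention_{name}")
    return tasks


# One small workflow per table, generated from configuration alone.
workflows = {conf["name"]: build_workflow(conf) for conf in config["tables"]}
```

Adding a new table to the config produces a new workflow without writing any new pipeline code, which is the abstraction the quote describes.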

This framework approach helps data scientists move up the value chain more quickly:

* An experiment reporting framework auto-generates user-level metrics and thereby saves data scientists time, leaving more time for analyzing and interpreting key metrics.
* With OLAP tables generated on the fly, data scientists can spend more time understanding trends, identifying gaps, and relating product changes to business changes.
* A framework can abstract away the engineering work required to productionize an offline batch ML model, so data scientists no longer have to handle package dependencies, virtual environments, and deployment jobs.

End game: **Using these frameworks drastically improves how data scientists work.**

## Design Patterns For Data Engineering Frameworks

 “There really is no magic, when you have done certain task enough times, you started to see patterns that can be automated.”

### Where to start

There are generally three layers of a well-designed data engineering framework: 
* Input layer.
* Data processing layer.
* Output layer.

![common_pattern.png](attachment:common_pattern.png)

* **Input**: Where the end user specifies how their DAGs should be configured. User experience really matters here. Typically, the input could be a static configuration file (e.g. YAML or HOCON), or it could be something as elaborate as a web UI. The goal here is to capture user needs.

* **Data Processing**: This is the core of any data engineering framework, where ETL pipelines are instantiated dynamically (a DAG factory).

* **Output**: The DAGs generated in the previous step create derived data, which is often written to a Hive table, presented in a well-designed web UI, or consumed by a downstream pipeline.

## Incremental Computation Framework

It is quite common for data scientists to calculate computationally intensive metrics like a cumulative sum or the time since the first or last event.

The naive approach would be to query a fact table and take the sum, max, or min over all date partitions in order to calculate these desired metrics. However, this querying pattern is rather inefficient.

Why? This solution violates the ETL principle of loading data incrementally, since the required computation scans the entire fact table. Ideally, we would build a summary table to pre-compute these metrics so an end user only needs to reference the metric in a single or latest date partition of the summary table. This pattern is so common that our data engineer built a framework called the Incremental Computation Framework.

* Input: A HOCON config file where a user specifies which metrics or events to pre-compute, which subject key to group by, and which fact table to query from in order to build the summary table.

* Data Processing: An Airflow script that builds the summary table incrementally: namely, it unions the summary table from the previous date partition with today's fact table to update the expensive metric.
* Output: An optimized summary table where cumulative sums, days since first / last event, or other expensive metrics can be queried from one and only one date partition of the summary table.
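
The data processing step can be sketched as a function that combines yesterday's summary with today's facts only. This is a simplified illustration, not Airbnb's framework: the `update_summary` helper, the tuple layout of the fact rows, and the metric names are assumptions.

```python
def update_summary(previous_summary, todays_facts):
    """Combine yesterday's summary partition with today's fact rows only."""
    # Copy so the previous date partition stays immutable.
    summary = {subject: dict(values) for subject, values in previous_summary.items()}
    for subject, ds, amount in todays_facts:
        entry = summary.setdefault(
            subject, {"cumulative_sum": 0, "first_ds": ds, "last_ds": ds}
        )
        entry["cumulative_sum"] += amount
        entry["first_ds"] = min(entry["first_ds"], ds)
        entry["last_ds"] = max(entry["last_ds"], ds)
    return summary


day_1 = update_summary({}, [("host_a", "2018-02-09", 1)])
day_2 = update_summary(
    day_1, [("host_a", "2018-02-10", 2), ("host_b", "2018-02-10", 1)]
)
```

Each run reads one summary partition and one fact partition, so the cost stays flat no matter how long the history grows.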

![workflow.png](attachment:workflow.png)

**This helps users avoid inefficient querying patterns and automates away the tedious aggregation that we would otherwise need to do, one date partition at a time.**

## Backfill Framework

Backfilling is an important but time-consuming step in any data engineering work.

The quip “I do not ever want to hear the word backfill again” gives you some idea of how tedious backfilling can be :)

For example, if we need to backfill a few years worth of data, it would be much more efficient to break and parallelize such a task into mini-backfills. However, managing these long-running parallel processes can be rather cumbersome. In addition, we often need to perform sanity checks before inserting backfilled data into a production table. Given that backfilling is such a common but far too often unpleasant experience, we built a Backfill Framework to automate this workflow:

* Input: A simple UI where users can specify the job name, the start_date and end_date of the backfill job, how many processes to parallelize the backfill across, and how many days each process should backfill
* Data Processing: Once a user specifies how the backfill job should be run, the framework creates an Airflow pipeline that automatically parallelizes the backfill tasks, performs sanity checks, and swaps staging tables with production tables
* Output: A fully backfilled table ready for consumption
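
Splitting a long backfill into mini-backfills is mostly date arithmetic. The sketch below shows one way to compute the chunks that parallel processes could pick up; the `split_backfill` helper and its parameter names are hypothetical, not the framework's actual interface.

```python
from datetime import date, timedelta


def split_backfill(start_date, end_date, days_per_chunk):
    """Split [start_date, end_date] (inclusive) into consecutive mini-backfills."""
    if days_per_chunk < 1:
        raise ValueError("days_per_chunk must be at least 1")
    chunks = []
    chunk_start = start_date
    while chunk_start <= end_date:
        # The last chunk may be shorter so that it never runs past end_date.
        chunk_end = min(chunk_start + timedelta(days=days_per_chunk - 1), end_date)
        chunks.append((chunk_start, chunk_end))
        chunk_start = chunk_end + timedelta(days=1)
    return chunks


chunks = split_backfill(date(2018, 1, 1), date(2018, 1, 10), days_per_chunk=4)
```

Each `(start, end)` pair can then be handed to a separate process, with the sanity checks and the staging-to-production swap run once all chunks have finished.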

![backfill.png](attachment:backfill.png)

**This helps us automate away many of the ad-hoc backfilling scripts people have to run on their own machines. It automates quality assurance by setting up automatic comparisons, and swaps the staging table into production after the QA tests pass.**

## Global Metrics Framework

Up until recently, data scientists at Airbnb spent quite a lot of time when it came to building ETLs for analytics and dashboards. As we discussed earlier, a lot of work here is to identify the correct data sources, to define metrics and dimensions, and to create the final denormalized tables. While different teams might have different key performance metrics and, as a result, different fact tables, important dimensions for the business are usually quite consistent and slowly changing.

For example, data scientists who work on the host-side of the marketplace typically care about dimensional cuts such as the listing’s market, type, or capacity. Similarly, data scientists on the guest-side care about dimensions such as guest stage, origin market, or destination market. With this insight, it became clear that a lot of ETL pipelines actually involved joins of many fact tables with a much smaller set of dimension tables. This is what motivated the creation of Global Metrics Framework.

* Input: A much more involved HOCON config file that specifies one or more metrics in an atomic fact table, the dimension sets that one wishes to include in the final table, the primary and foreign keys to use for joins, and a slew of other useful information to track table creation.
* Data Processing: Identifies the metrics and the dimensional cuts that it needs to aggregate and cut by, then joins the dimension tables with the atomic fact tables to create the denormalized tables automatically.
* Output: One or more Hive tables with the same set of metrics but possibly different sets of dimensions are created. This means that one or more denormalized tables can be created on the fly, and all these data sources are further made available in Druid for visualization in Superset.
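
A heavily simplified sketch of the idea: a config lists one metric and several dimension sets, and a small function generates one denormalizing query per set. The config keys, table and column names, and the `build_query` helper are hypothetical, and a Python dictionary stands in for the HOCON file.

```python
# Stand-in for a parsed HOCON config (all names here are hypothetical).
config = {
    "fact_table": "fct_bookings",
    "metric": "m_bookings",
    "join_key": "id_listing",
    "dimension_table": "dim_listings",
    "dimension_sets": [["dim_market"], ["dim_market", "dim_room_type"]],
}


def build_query(conf, dimensions):
    """Generate the SQL that joins the fact table to one set of dimensions."""
    dims = ", ".join(f"d.{dim}" for dim in dimensions)
    return (
        f"SELECT {dims}, SUM(f.{conf['metric']}) AS {conf['metric']} "
        f"FROM {conf['fact_table']} f "
        f"JOIN {conf['dimension_table']} d "
        f"ON f.{conf['join_key']} = d.{conf['join_key']} "
        f"GROUP BY {dims}"
    )


# Same metric, different dimensional cuts: one query per dimension set.
queries = [build_query(config, dims) for dims in config["dimension_sets"]]
```

Each generated query would populate one denormalized table, so adding a new dimensional cut becomes a one-line config change.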

https://www.youtube.com/watch?v=rpgBge-qJnM

**This framework automates the common data engineering work required to create denormalized tables that can later be used to generate dashboards, analytics, and more: "the denormalization machine".**

## Experimentation Reporting Framework

 * Many data-driven technology companies have built their own internal experimentation platforms, and Airbnb is no exception.
 
 * Taking the time to build such a framework is worthwhile in the long run, since fewer data scientists will be required.

* Input: Instead of a simple configuration file or a simple UI, a fully fledged UI is built here so users can specify the type of experiment to run, which target or secondary metrics to track, what are the experiment buckets and their relative sizes, etc. Anything that is relevant for launching and computing the experiment data is captured in this step.

* Data Processing: The metrics pipeline that computes, for each experiment, the subject-level metrics and their corresponding dimensions. The sheer combinations of these metrics and dimensions are what makes the computation super complex. In fact, it is often the case that experimentation pipelines are the most complex ETL job at a company.

* Output: Instead of a simple output table, there is a lot of downstream processing involved in this step. For example, statistics such as p-value, confidence interval, significance, and minimum detectable effect are calculated here. Depending on the maturity of the reporting framework, users might be able to do metrics capping or variance reduction. Each step requires a separate calculation before being served in the final UI.
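
To give a feel for the kind of statistic computed in the output step, here is a textbook two-proportion z-test written with only the standard library. The notes do not say which tests Airbnb's framework actually uses, so this is an assumed, illustrative choice; the function name and the sample counts are hypothetical.

```python
import math


def two_proportion_z_test(conv_a, n_a, conv_b, n_b):
    """Return (z, two-sided p-value) for the difference in two conversion rates."""
    p_a, p_b = conv_a / n_a, conv_b / n_b
    # Pooled rate under the null hypothesis that both buckets convert equally.
    pooled = (conv_a + conv_b) / (n_a + n_b)
    std_err = math.sqrt(pooled * (1 - pooled) * (1 / n_a + 1 / n_b))
    z = (p_b - p_a) / std_err
    # Standard normal CDF expressed through the error function.
    cdf = 0.5 * (1 + math.erf(abs(z) / math.sqrt(2)))
    p_value = 2 * (1 - cdf)
    return z, p_value


# Control bucket: 100 of 1000 users convert; treatment bucket: 130 of 1000.
z, p_value = two_proportion_z_test(conv_a=100, n_a=1000, conv_b=130, n_b=1000)
```

With these sample counts the p-value falls below 0.05. A reporting framework repeats this kind of calculation for every metric and dimension combination, which is where the computational cost comes from.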

![data_analysis.png](attachment:data_analysis.png)

**This automates the hundreds or thousands of experiment deep dives that data scientists would otherwise need to carry out.**

# Sources

* https://medium.com/@rchang/a-beginners-guide-to-data-engineering-part-i-4227c5c457d7
* https://medium.com/@rchang/a-beginners-guide-to-data-engineering-part-ii-47c4e7cbda71
* https://medium.com/@rchang/a-beginners-guide-to-data-engineering-the-series-finale-2cc92ff14b0

## Other Sources


 * https://gtoonstra.github.io/etl-with-airflow/principles.html