# Introduction to Data Engineering

## Chapter 1: Introduction to Data Engineering

### What is data engineering?

#### In comes the data engineer
* Data is scattered
* Not optimized for analyses
* Legacy code is causing corrupt data

**Data Engineer** to the rescue!

#### Data engineers: making your life easier
* Gather data from different sources
* Optimize databases for analyses
* Remove corrupt data

**Data Scientist's** life just got easier!

#### Definition of the job
> An engineer that develops, constructs, tests, and maintains architectures such as databases and large-scale processing systems.

* Processing large amounts of data
* Use of clusters of machines

#### Data Engineer vs. Data Scientist
Data Engineer:
* Develop scalable data architecture
* Streamline data acquisition
* Set up processes to bring together data
* Clean corrupt data
* Well versed in cloud technology

Data Scientist:
* Mining data for patterns
* Statistical modeling
* Predictive models using machine learning
* Monitor business processes
* Clean outliers in data

### Tools of the data engineer

#### Databases
* Hold large amounts of data
* Support applications
* Other databases are used for analyses

#### Processing
* Clean data
* Aggregate data
* Join data

#### Scheduling
* Plan jobs with specific intervals
* Resolve dependency requirements of jobs

#### Existing tools

Databases:
* MySQL
* PostgreSQL

Processing:
* Spark
* Hive

Scheduling:
* Apache Airflow
* Oozie
* Cron

### Cloud providers

#### Data processing in the cloud
*Clusters of machines required*

**Problem:** self-hosted data center
* Cover electrical and maintenance costs
* Peak vs. quiet moments: hard to optimize
* **Solution:** use the cloud

#### Data storage in the cloud
*Reliability is required*

**Problem:** self-hosted data center
* Disaster will strike
* Need different geographical locations
* **Solution:** use the cloud

#### The big three: AWS, Azure and Google

#### Storage
*Upload files, e.g. storing product images*

**Services**
* AWS S3
* Azure Blob Storage
* Google Cloud Storage

#### Computation
*Performs calculations, e.g. hosting a web server*

**Services**
* AWS EC2
* Azure Virtual Machines
* Google Compute Engine

#### Databases
*Hold structured information*

**Services**
* AWS RDS
* Azure SQL Database
* Google Cloud SQL

## Chapter 2: Data Engineering Toolbox

### Databases

#### What are databases?
> A usually large collection of data organized especially for rapid search and retrieval.

* Holds data
* Organizes data

#### Databases and file storage
* Databases: very organized, functionality like search, replication, etc.
* File systems: less organized, simple, less functionality

#### Structured vs. Unstructured data
* Structured: database schema
    * Relational database
* Semi-structured
    * JSON
* Unstructured: schemaless, more like files
    * Videos, photos
    
#### SQL and NoSQL
SQL
* Tables
* Database schema
* Relational databases
* Example: MySQL, PostgreSQL

NoSQL
* Non-relational databases
* Structured or unstructured
* Key-value stores (e.g. caching)
* Document DB (e.g. JSON objects)
* Example: Redis, MongoDB

#### SQL: The database schema

```sql
-- Create Customer table
CREATE TABLE "Customer" (
    "id" SERIAL NOT NULL,
    "first_name" varchar,
    "last_name" varchar,
    PRIMARY KEY ("id")
);

-- Create Order table
CREATE TABLE "Order" (
    "id" SERIAL NOT NULL,
    "customer_id" integer REFERENCES "Customer",
    "product_name" varchar,
    "product_price" integer,
    PRIMARY KEY ("id")
);
```

#### SQL: Star schema
> *The star schema consists of one or more fact tables referencing any number of dimension tables.*

* **Facts:** Things that happened (e.g. Product Orders)
* **Dimensions:** information on the world (e.g. Customer Information)
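The split between facts and dimensions is easiest to see in a query. The sketch below uses Python's built-in `sqlite3` module as a stand-in for PostgreSQL; the `customer` and `"order"` tables and their rows are made up for illustration (`"order"` is quoted because ORDER is a reserved word). The fact table records orders, the dimension table describes customers, and the query aggregates facts grouped by a dimension.

```python
import sqlite3

# In-memory SQLite database standing in for PostgreSQL (illustration only)
conn = sqlite3.connect(":memory:")
conn.executescript("""
-- Dimension table: information on the world (customers)
CREATE TABLE customer (
    id INTEGER PRIMARY KEY,
    first_name TEXT,
    last_name TEXT
);
-- Fact table: things that happened (orders), referencing the dimension
CREATE TABLE "order" (
    id INTEGER PRIMARY KEY,
    customer_id INTEGER REFERENCES customer (id),
    product_name TEXT,
    product_price INTEGER
);
INSERT INTO customer VALUES (1, 'Jane', 'Doe'), (2, 'John', 'Smith');
INSERT INTO "order" VALUES
    (1, 1, 'Laptop', 1200),
    (2, 1, 'Mouse', 25),
    (3, 2, 'Monitor', 300);
""")

# Aggregate the facts (order prices), described by dimension attributes (names)
rows = conn.execute("""
SELECT c.first_name, c.last_name, SUM(o.product_price) AS total_spent
FROM "order" AS o
JOIN customer AS c ON o.customer_id = c.id
GROUP BY c.id
ORDER BY total_spent DESC
""").fetchall()

for row in rows:
    print(row)
```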

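```python
import pandas as pd

# Select customer names, sorted by last name, then first name
# (db_engine is a SQLAlchemy engine created elsewhere)
data = pd.read_sql("""
SELECT first_name, last_name FROM "Customer"
ORDER BY last_name, first_name
""", db_engine)

# Show the first 3 rows of the DataFrame
print(data.head(3))

# Show the info of the DataFrame (.info() prints directly and returns None)
data.info()
```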

```python
# Join each order to the customer who placed it
data = pd.read_sql("""
SELECT * FROM "Customer"
INNER JOIN "Order"
ON "Order"."customer_id"="Customer"."id"
""", db_engine)

# Show the id column of data (both tables have an "id" column, so both appear)
print(data.id)
```

### What is parallel computing?

#### Idea behind parallel computing
*Basis of modern data processing tools*

* Memory
* Processing power

Idea:
* Split task into subtasks
* Distribute subtasks over several computers
* Work together to finish task
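The three steps above can be sketched in plain Python. The records below are hypothetical, and the pool comes from `multiprocessing.dummy`, which offers the same interface as `multiprocessing.Pool` but runs its workers as threads, so the sketch runs anywhere; for CPU-bound work you would swap in `multiprocessing.Pool` as in the examples that follow.

```python
from multiprocessing.dummy import Pool  # same API as multiprocessing.Pool, backed by threads
from statistics import mean

# Hypothetical (year, age) records standing in for a real dataset
records = [(2000, 24), (2000, 28), (2004, 21), (2004, 25), (2008, 30)]

def mean_age(year_and_ages):
    """Subtask: compute the mean age for a single year."""
    year, ages = year_and_ages
    return year, mean(ages)

# 1. Split: one subtask per year
groups = {}
for year, age in records:
    groups.setdefault(year, []).append(age)

# 2. Distribute: hand the subtasks to a pool of 2 workers
with Pool(2) as pool:
    partial_results = pool.map(mean_age, groups.items())

# 3. Work together: combine the partial results into the final answer
result = dict(partial_results)
print(result)
```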

#### Benefits of parallel computing
* Processing power
* Memory: partition the dataset

#### Risks of parallel computing
*Overhead due to communication*

* Task needs to be large
* Need several processing units

`multiprocessing.Pool`

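```python
import pandas as pd
from multiprocessing import Pool

def take_mean_age(year_and_group):
    """Return a one-row DataFrame with the mean age for a (year, group) pair."""
    year, group = year_and_group
    return pd.DataFrame({"Age": group["Age"].mean()}, index=[year])

# Distribute the per-year groups over 4 worker processes
# (athlete_events is a pandas DataFrame loaded elsewhere)
with Pool(4) as p:
    results = p.map(take_mean_age, athlete_events.groupby("Year"))

# Combine the partial results into one DataFrame
result_df = pd.concat(results)
```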

`dask`

```python
import dask.dataframe as dd

# Partition dataframe into 4
athlete_events_dask = dd.from_pandas(athlete_events, npartitions=4)

# Run parallel computations on each partition
result_df = athlete_events_dask.groupby('Year').Age.mean().compute()
```

```python
import pandas as pd
from multiprocessing import Pool

# Function to apply a function over multiple cores
# (print_timing is a timing decorator defined elsewhere)
@print_timing
def parallel_apply(apply_func, groups, nb_cores):
    with Pool(nb_cores) as p:
        results = p.map(apply_func, groups)
    return pd.concat(results)

# Parallel apply using 1 core
parallel_apply(take_mean_age, athlete_events.groupby('Year'), 1)

# Parallel apply using 2 cores
parallel_apply(take_mean_age, athlete_events.groupby('Year'), 2)

# Parallel apply using 4 cores
parallel_apply(take_mean_age, athlete_events.groupby('Year'), 4)
```

```python
import dask.dataframe as dd

# Set the number of partitions
athlete_events_dask = dd.from_pandas(athlete_events, npartitions=4)

# Calculate the mean Age per Year
print(athlete_events_dask.groupby('Year').Age.mean().compute())
```

### Parallel Computation Frameworks

#### Apache Hadoop

#### HDFS
* Distributed File System
* Similar to the file system on your computer, except that the files are stored across multiple computers

#### MapReduce
* Works like the parallel computing approach above: split a task into subtasks and distribute them over several machines
* Has flaws; one of them was that MapReduce jobs were hard to write
* Hive was created to address this problem
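The MapReduce programming model is small enough to simulate in a single Python process. This minimal sketch counts words: a map step emits key/value pairs, a shuffle step groups them by key, and a reduce step aggregates each group. It illustrates the model only; real Hadoop jobs run each phase across a cluster and are written against Hadoop's own APIs, and the function names here are hypothetical.

```python
from collections import defaultdict

def map_phase(document):
    """Map: emit a (word, 1) pair for every word in one document."""
    return [(word.lower(), 1) for word in document.split()]

def shuffle_phase(mapped_pairs):
    """Shuffle: group all emitted values by key."""
    grouped = defaultdict(list)
    for key, value in mapped_pairs:
        grouped[key].append(value)
    return grouped

def reduce_phase(grouped):
    """Reduce: combine the values for each key into a single result."""
    return {key: sum(values) for key, values in grouped.items()}

documents = ["Spark avoids disk writes", "Hadoop writes to disk", "Spark and Hadoop"]

# On a cluster, each document could be mapped on a different machine
mapped = [pair for doc in documents for pair in map_phase(doc)]
word_counts = reduce_phase(shuffle_phase(mapped))
print(word_counts)
```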

#### Hive
* Runs on Hadoop
* Query language: Hive SQL, a dialect of SQL
* Initially ran its queries as MapReduce jobs; now supports other execution engines

#### Hive: an example

```sql
-- Indistinguishable from a regular SQL query
SELECT year, AVG(age)
FROM views.athlete_events
GROUP BY year
```

#### Apache Spark
* Avoids disk writes by keeping data in memory where possible
* Maintained by Apache Software Foundation

#### Resilient distributed datasets (RDD)
* Spark relies on them
* Similar to a list of tuples
* Transformations: `.map()` or `.filter()`
* Actions: `.count()` or `.first()`
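Spark transformations are lazy: they only describe a new dataset, and nothing runs until an action asks for a result. Python's built-in `map` and `filter` iterators behave in a loosely similar way, so this plain-Python sketch can serve as an analogy. It is not Spark (Spark adds partitioning, distribution and lineage tracking); in PySpark the corresponding calls would be `rdd.map(...)`, `rdd.filter(...)` and an action such as `.count()`. The data is hypothetical.

```python
# A list of tuples, like the records an RDD holds: (name, year, age)
athletes = [("Alice", 2000, 24), ("Bob", 2000, 31), ("Carla", 2004, 27)]

evaluated = []  # names of the records that have actually been processed

def get_age(record):
    evaluated.append(record[0])
    return record[2]

# "Transformations": map() and filter() only build lazy iterators
ages = map(get_age, athletes)
over_25 = filter(lambda age: age > 25, ages)
print(evaluated)  # []  (nothing has run yet)

# "Action": sum() consumes the iterators, which triggers the work
total = sum(over_25)
print(total)      # 58
print(evaluated)  # ['Alice', 'Bob', 'Carla']
```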

#### PySpark
* Python interface to Spark
* DataFrame abstraction
* Looks similar to Pandas

#### PySpark: an example

```python
# Load the dataset into athlete_events_spark first

(athlete_events_spark
    .groupBy('Year')
    .mean('Age')
    .show())
```

```python
# PySpark groupBy example

# Print the type of athlete_events_spark
print(type(athlete_events_spark))

# Print the schema of athlete_events_spark (.printSchema() prints directly)
athlete_events_spark.printSchema()

# Group by the Year, and find the mean Age
# (printing the DataFrame shows only its description; nothing is computed yet)
print(athlete_events_spark.groupBy('Year').mean('Age'))

# Group by the Year, and find the mean Age
# (.show() triggers the computation and prints the result rows)
athlete_events_spark.groupBy('Year').mean('Age').show()
```

### Workflow scheduling frameworks

#### An example pipeline
* CSV --> Spark --> load into SQL database
* *How to schedule?*
    * Manually
    * `cron` scheduling tool
    * What about dependencies?

#### DAGs
*Directed Acyclic Graph*
* Set of nodes
* Directed edges
* No cycles
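Finding a valid run order for a DAG is a topological sort. Python 3.9+ ships one in the standard library as `graphlib.TopologicalSorter`. In the sketch below the task names are hypothetical, loosely following the CSV --> Spark --> SQL database pipeline above; it prints the order in which the tasks can run and shows that a cycle is rejected. Schedulers such as Airflow work out this order from the DAG for you, so this only illustrates the idea.

```python
from graphlib import TopologicalSorter, CycleError

# Each task maps to the set of tasks it depends on (its upstream tasks)
pipeline = {
    "spark_transform": {"read_csv"},
    "load_into_sql": {"spark_transform"},
}

# A valid run order: every task comes after all of its dependencies
order = list(TopologicalSorter(pipeline).static_order())
print(order)  # ['read_csv', 'spark_transform', 'load_into_sql']

# With a cycle there is no valid order, which is why DAGs forbid cycles
cyclic = {"a": {"b"}, "b": {"a"}}
has_cycle = False
try:
    list(TopologicalSorter(cyclic).static_order())
except CycleError:
    has_cycle = True
print(has_cycle)  # True
```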

#### The tools for the job
* Linux's `cron`
* Spotify's Luigi
* Apache Airflow

#### Apache Airflow
* Created at Airbnb
* DAGs
* Python

#### Airflow: an example in code

```python
# Create the DAG object ("0 * * * *" runs at minute 0 of every hour)
dag = DAG(dag_id="example_dag", ... , schedule_interval="0 * * * *")

# Define operations (StartClusterOperator and SparkJobOperator stand in for
# cluster and Spark operators; PythonOperator ships with Airflow)
start_cluster = StartClusterOperator(task_id='start_cluster', dag=dag)
ingest_customer_data = SparkJobOperator(task_id='ingest_customer_data', dag=dag)
ingest_product_data = SparkJobOperator(task_id='ingest_product_data', dag=dag)
enrich_customer_data = PythonOperator(task_id='enrich_customer_data', ..., dag=dag)

# Set up dependency flow
start_cluster.set_downstream(ingest_customer_data)
ingest_customer_data.set_downstream(enrich_customer_data)
ingest_product_data.set_downstream(enrich_customer_data)
```

## Chapter 3: Extract, Transform and Load (ETL)

### Extract

#### Extracting data: what does it mean?
* Extracting data from persistent storage, such as a file in Amazon S3 or a SQL database

#### Extract from text files

Unstructured
* Plain text
* E.g. chapter from a book

Flat files
* Row = record
* Column = attribute
* E.g. `.tsv` or `.csv`

#### JSON
* JavaScript Object Notation
* Semi-structured
* Atomic
    * `number`
    * `string`
    * `boolean`
    * `null`
* Composite
    * `array`
    * `object`

```python
# The json package parses JSON data into Python objects (here, a dictionary)
import json

result = json.loads('{"key_1": "value_1", "key_2": "value_2"}')
print(result["key_1"])
```

```
value_1
```
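Each JSON type listed above maps onto a Python type when parsed with `json.loads`. The document below is invented so that it contains every type at once:

```python
import json

# A made-up JSON document containing every atomic and composite type
raw = ('{"count": 3, "price": 9.99, "name": "film", '
       '"active": true, "deleted_at": null, "tags": ["a", "b"]}')
parsed = json.loads(raw)

print(type(parsed).__name__)  # dict  (JSON object)
for key, value in parsed.items():
    print(key, type(value).__name__)
# count int              (number without a fraction)
# price float            (number with a fraction)
# name str               (string)
# active bool            (boolean)
# deleted_at NoneType    (null)
# tags list              (array)
```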


#### Data on the Web

Requests
* request page <--> response with data

Example
1. Browse to Google
2. Request to Google Server
3. Google responds

#### Data on the Web through APIs
* Send data in JSON format
* API: application programming interface
* Examples
    * Twitter API
    * Hacker News API
    
#### Data in databases

Application databases
* Transactions
* Inserts or changes
* OLTP (Online transaction processing)
* Row-oriented

Analytical databases
* OLAP (Online analytical processing)
* Column-oriented

#### Extraction from databases

Connection string/URI:

`postgresql://[user[:password]@][host][:port][/dbname]`
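To see how the pieces of a connection URI line up, Python's standard `urllib.parse.urlsplit` can take one apart. This is only for illustration: SQLAlchemy and the database drivers do their own parsing, and special characters in a password need to be percent-encoded before they go into the URI.

```python
from urllib.parse import urlsplit

# The same URI shape used in the examples of this chapter
uri = urlsplit("postgresql://repl:password@localhost:5432/pagila")

print(uri.scheme)            # postgresql
print(uri.username)          # repl
print(uri.password)          # password
print(uri.hostname)          # localhost
print(uri.port)              # 5432
print(uri.path.lstrip("/"))  # pagila  (the database name)
```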

Use in Python

```python
import pandas as pd
import sqlalchemy

# Create an engine from the connection URI
connection_uri = "postgresql://repl:password@localhost:5432/pagila"
db_engine = sqlalchemy.create_engine(connection_uri)

# Query the customer table into a DataFrame
pd.read_sql("SELECT * FROM customer", db_engine)
```

#### Fetch from an API

```python
import requests

# Fetch the Hacker News post
resp = requests.get("https://hacker-news.firebaseio.com/v0/item/16222426.json")

# Print the response parsed as JSON
print(resp.json())

# Assign the score of the post to post_score
post_score = resp.json()["score"]
print(post_score)
```

```
{'by': 'neis', 'descendants': 0, 'id': 16222426, 'score': 17, 'time': 1516800333, 'title': 'Duolingo-Style Learning for Data Science: DataCamp for Mobile', 'type': 'story', 'url': 'https://medium.com/datacamp/duolingo-style-learning-for-data-science-datacamp-for-mobile-3861d1bc02df'}
17
```


#### Read from a database

```python
import pandas as pd
import sqlalchemy

# Function to extract table to a pandas DataFrame
# (tablename is inserted directly into the SQL, so only pass trusted names)
def extract_table_to_pandas(tablename, db_engine):
    query = "SELECT * FROM {}".format(tablename)
    return pd.read_sql(query, db_engine)

# Connect to the database using the connection URI
connection_uri = "postgresql://repl:password@localhost:5432/pagila"
db_engine = sqlalchemy.create_engine(connection_uri)

# Extract the film table into a pandas DataFrame
extract_table_to_pandas("film", db_engine)

# Extract the customer table into a pandas DataFrame
extract_table_to_pandas("customer", db_engine)
```

### Transform

#### Kinds of transformations
* Selection of attributes (e.g. 'email')
* Translation of code values (e.g. 'New York' -> 'NY')
* Data Validation (e.g. date input in 'created_at')
* Splitting columns into multiple columns
* Joining from multiple sources
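Several of these transformations fit in a few lines of plain Python. In the sketch below, the records, the `STATE_CODES` table and the `is_valid_date` helper are hypothetical; it selects the attributes it needs, translates a code value, validates a date, and splits the email column into two.

```python
from datetime import datetime

# Hypothetical raw records as they might arrive from a source system
raw_records = [
    {"email": "jane.doe@theweb.com", "state": "New York", "created_at": "2019-03-01"},
    {"email": "john@example.org", "state": "California", "created_at": "not a date"},
]

# Translation table for code values (illustrative subset)
STATE_CODES = {"New York": "NY", "California": "CA"}

def is_valid_date(text):
    """Validation: accept only ISO dates such as 2019-03-01."""
    try:
        datetime.strptime(text, "%Y-%m-%d")
        return True
    except ValueError:
        return False

transformed = []
for record in raw_records:
    if not is_valid_date(record["created_at"]):
        continue  # drop records that fail validation
    username, domain = record["email"].split("@", 1)  # split one column into two
    transformed.append({                               # select only the needed attributes
        "username": username,
        "domain": domain,
        "state": STATE_CODES[record["state"]],         # translate the code value
        "created_at": record["created_at"],
    })

print(transformed)
```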

#### An example: split (Pandas)

```python
import pandas as pd

# Pandas DataFrame with customer data
customer_df = pd.DataFrame({'customer_id': [1],
                            'email': ['jane.doe@theweb.com']})
customer_df
```

| | customer_id | email |
|---|---|---|
| 0 | 1 | jane.doe@theweb.com |


```python
# Split email column into two columns
split_email = customer_df.email.str.split("@", expand=True)

# At this point, split_email will have 2 columns, a first
# one with everything before @, and a second one with
# everything after @

# Create 2 new columns using the resulting DataFrame.
customer_df = customer_df.assign(
    username=split_email[0],
    domain=split_email[1]
)

customer_df
```

| | customer_id | email | username | domain |
|---|---|---|---|---|
| 0 | 1 | jane.doe@theweb.com | jane.doe | theweb.com |


#### Transforming in PySpark

Extract data into PySpark

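```python
import pyspark.sql

# Create (or reuse) a Spark session
spark = pyspark.sql.SparkSession.builder.getOrCreate()

# Read the customer table over JDBC
# (the PostgreSQL JDBC driver must be available to Spark)
customer_df = spark.read.jdbc("jdbc:postgresql://localhost:5432/pagila",
                              "customer",
                              properties={"user": "repl", "password": "password"})
```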

#### An example: join (PySpark)

```python
# customer_df: PySpark DataFrame with customer data
# ratings_df: PySpark DataFrame with ratings data

# Mean rating per customer
ratings_per_customer = ratings_df.groupBy("customer_id").mean("rating")

# Join on customer ID
customer_df.join(
    ratings_per_customer,
    customer_df.customer_id == ratings_per_customer.customer_id
)
```

### Loading

#### Analytics or application databases
Analytics
* Aggregate queries
* Online analytical processing (OLAP)

Applications
* Lots of transactions
* Online transaction processing (OLTP)

#### Column- and row-oriented
Analytics
* Column-oriented
* Queries about subset of columns
* Parallelization

Applications
* Row-oriented
* Stored per record
* Added per transaction
* E.g. adding a customer is fast

#### MPP Databases
*Massively Parallel Processing Databases*
* Amazon Redshift
* Azure SQL Data Warehouse
* Google BigQuery

#### An example: Redshift
*Load from file to columnar storage format*

```python
# pandas .to_parquet() method (writing to S3 requires pyarrow and s3fs)
df.to_parquet("s3://path/to/bucket/customer.parquet")

# PySpark .write.parquet() method (here df is a PySpark DataFrame)
df.write.parquet("s3://path/to/bucket/customer.parquet")
```

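```sql
-- Load the Parquet file from S3 into the customer table
-- (Redshift also needs an authorization clause, e.g. IAM_ROLE)
COPY customer
FROM 's3://path/to/bucket/customer.parquet'
FORMAT AS PARQUET;
```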

#### Load to PostgreSQL
`pandas.DataFrame.to_sql()`

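```python
# Transformation on data (transform_find_recommendations is defined elsewhere)
recommendations = transform_find_recommendations(ratings_df)

# Load into the "store" schema of the PostgreSQL database,
# replacing the table if it already exists
recommendations.to_sql("recommendations",
                       db_engine,
                       schema="store",
                       if_exists="replace")
```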

```python
import pandas as pd
import sqlalchemy

# Connect to the data warehouse database
connection_uri = "postgresql://repl:password@localhost:5432/dwh"
db_engine_dwh = sqlalchemy.create_engine(connection_uri)

# Transformation step, join with recommendations data
# (film_pdf and recommendations are DataFrames from earlier steps)
film_pdf_joined = film_pdf.join(recommendations)

# Write to store.film, replacing the table if it exists
film_pdf_joined.to_sql("film", db_engine_dwh, schema="store", if_exists="replace")

# Run the query to fetch the data
pd.read_sql("SELECT film_id, recommended_film_ids FROM store.film", db_engine_dwh)
```

### Putting it all together

#### The ETL function

```python
import pandas as pd

def extract_table_to_df(tablename, db_engine):
    return pd.read_sql("SELECT * FROM {}".format(tablename), db_engine)

def split_columns_transform(df, column, pat, suffixes):
    # Converts column into str and splits it on pat, storing the parts in
    # new columns named column + suffix (one possible implementation)
    split_df = df[column].astype(str).str.split(pat, expand=True)
    df = df.copy()
    for i, suffix in enumerate(suffixes):
        df[column + suffix] = split_df[i]
    return df

def load_df_into_dwh(film_df, tablename, schema, db_engine):
    return film_df.to_sql(tablename, db_engine, schema=schema, if_exists="replace")

db_engines = { ... } # Needs to be configured

def etl():
    # Extract
    film_df = extract_table_to_df("film", db_engines["store"])

    # Transform
    film_df = split_columns_transform(film_df, "rental_rate", ".", ["_dollar", "_cents"])

    # Load
    load_df_into_dwh(film_df, "film", "store", db_engines["dwh"])
```
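The same extract, transform, load shape can be tried without PostgreSQL. The sketch below swaps pandas and SQLAlchemy for the standard-library `sqlite3` module; the two in-memory databases, the `film` rows, and the helper names `extract_table`, `split_column` and `load_rows` are hypothetical stand-ins for the functions above.

```python
import sqlite3

def extract_table(conn, tablename):
    """Extract: read every row of a table as a list of dicts."""
    # tablename is formatted into the SQL, so only pass trusted names
    cursor = conn.execute(f"SELECT * FROM {tablename}")
    names = [col[0] for col in cursor.description]
    return [dict(zip(names, row)) for row in cursor.fetchall()]

def split_column(rows, column, pat, suffixes):
    """Transform: split column on pat into new columns named column + suffix."""
    out = []
    for row in rows:
        parts = str(row[column]).split(pat)
        new_row = dict(row)
        for suffix, part in zip(suffixes, parts):
            new_row[column + suffix] = part
        out.append(new_row)
    return out

def load_rows(conn, tablename, rows):
    """Load: replace tablename in the target database with rows."""
    names = list(rows[0])
    conn.execute(f"DROP TABLE IF EXISTS {tablename}")
    conn.execute(f"CREATE TABLE {tablename} ({', '.join(names)})")
    placeholders = ", ".join("?" for _ in names)
    conn.executemany(
        f"INSERT INTO {tablename} VALUES ({placeholders})",
        [tuple(row[name] for name in names) for row in rows],
    )
    conn.commit()

# Two in-memory databases stand in for the "store" and "dwh" engines
store = sqlite3.connect(":memory:")
dwh = sqlite3.connect(":memory:")
store.execute("CREATE TABLE film (film_id INTEGER, rental_rate TEXT)")
store.executemany("INSERT INTO film VALUES (?, ?)", [(1, "4.99"), (2, "0.99")])

def etl():
    film_rows = extract_table(store, "film")
    film_rows = split_column(film_rows, "rental_rate", ".", ["_dollar", "_cents"])
    load_rows(dwh, "film", film_rows)

etl()
print(dwh.execute("SELECT * FROM film").fetchall())
```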

#### Airflow refresher
* Workflow scheduler
* Python
* DAGs
* Tasks defined in operators (e.g. `BashOperator`)

#### Scheduling with DAGs in Airflow

```python
from airflow.models import DAG

# "0 0 * * *" runs every day at midnight
dag = DAG(dag_id="sample",
          ...,
          schedule_interval="0 0 * * *")

# cron expression fields, in order:
#   * minute (0 - 59)
#   * hour (0 - 23)
#   * day of the month (1 - 31)
#   * month (1 - 12)
#   * day of the week (0 - 6, Sunday = 0)
```

To learn more about cron jobs, go to https://crontab.guru
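As a rough sketch of how a scheduler reads these five fields, the hypothetical `cron_matches` function below checks whether a given time matches an expression. It only understands `*` and single numbers and combines all fields with AND, whereas real cron also accepts ranges, lists and steps, and runs a job when either the day-of-month or the day-of-week field matches if both are restricted.

```python
from datetime import datetime

def cron_matches(expression, moment):
    """Return True if moment matches a simplified 5-field cron expression.

    Minimal sketch: each field must be "*" or a single integer. Ranges,
    lists and steps (e.g. "1-5", "1,15", "*/10") are not supported.
    """
    fields = expression.split()
    # cron counts Sunday as 0; Python's weekday() counts Monday as 0
    cron_weekday = (moment.weekday() + 1) % 7
    actual = [moment.minute, moment.hour, moment.day, moment.month, cron_weekday]
    return all(field == "*" or int(field) == value
               for field, value in zip(fields, actual))

# "0 0 * * *": every day at midnight
print(cron_matches("0 0 * * *", datetime(2024, 1, 15, 0, 0)))   # True
print(cron_matches("0 0 * * *", datetime(2024, 1, 15, 12, 0)))  # False
# "0 * * * *": minute 0 of every hour
print(cron_matches("0 * * * *", datetime(2024, 1, 15, 13, 0)))  # True
```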

#### The DAG definition file

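```python
from airflow.models import DAG
# Airflow 1.x import path; Airflow 2 moved PythonOperator to airflow.operators.python
from airflow.operators.python_operator import PythonOperator

# Run the pipeline every day at midnight
dag = DAG(dag_id="etl_pipeline",
          schedule_interval="0 0 * * *")

# Wrap the etl() function defined above in a task
etl_task = PythonOperator(task_id="etl_task",
                          python_callable=etl,
                          dag=dag)

# Run etl_task only after wait_for_this_task (another task defined elsewhere)
etl_task.set_upstream(wait_for_this_task)
```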