## Building ETL Pipelines

> This walkthrough of a basic ETL pipeline draws directly on [freeCodeCamp's article on building ETL pipelines with SQLAlchemy](https://www.freecodecamp.org/news/sqlalchemy-makes-etl-magically-easy-ab2bd0df928/), written by Peter Gleeson.

## Defining a schema

A __database schema defines the structure of a database system, in terms of tables, columns, fields, and the relationships between them.__ 

Schemas can be defined in raw SQL, or through the use of SQLAlchemy’s ORM feature.

Below is an example showing how to define a schema of two tables for an imaginary blogging platform. __One is a table of users, and the other is a table of posts uploaded.__

1. First, __import__ everything you need from SQLAlchemy. Then, use __`create_engine(connection_string)` to connect to your database.__


2. Next, __start defining your table classes.__ 
    - __The first one in the example is `Users`.__ __Each column in this table is defined as a class variable__ using SQLAlchemy’s `Column(type)`, where type is a data type (such as `Integer`, `String`, `DateTime` and so on). __Use `primary_key=True` to denote columns which will be used as primary keys.__
    - __The next table defined here is `Uploads`.__ It’s very much the same idea — each column is defined as before.
    
The final two lines actually create the tables. The __`checkfirst=True` parameter ensures that new tables are only created if they do not currently exist in the database.__

In [1]:
%rm demo.db
from sqlalchemy import *
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
from sqlalchemy.sql import *

engine = create_engine('sqlite:///demo.db')
Base = declarative_base()

class Users(Base):
    __tablename__ = "users"
    UserId = Column(Integer, primary_key=True)
    Title = Column(String)
    FirstName = Column(String)
    LastName = Column(String)
    Email = Column(String)
    Username = Column(String)
    DOB = Column(DateTime)

class Uploads(Base):
    __tablename__ = "uploads"
    UploadId = Column(Integer, primary_key=True)
    UserId = Column(Integer)
    Title = Column(String)
    Body = Column(String)
    Timestamp = Column(DateTime)

Users.__table__.create(bind=engine, checkfirst=True)
Uploads.__table__.create(bind=engine, checkfirst=True)

In [2]:
Users.__table__

Table('users', MetaData(bind=None), Column('UserId', Integer(), table=<users>, primary_key=True, nullable=False), Column('Title', String(), table=<users>), Column('FirstName', String(), table=<users>), Column('LastName', String(), table=<users>), Column('Email', String(), table=<users>), Column('Username', String(), table=<users>), Column('DOB', DateTime(), table=<users>), schema=None)

In [3]:
Uploads.__table__

Table('uploads', MetaData(bind=None), Column('UploadId', Integer(), table=<uploads>, primary_key=True, nullable=False), Column('UserId', Integer(), table=<uploads>), Column('Title', String(), table=<uploads>), Column('Body', String(), table=<uploads>), Column('Timestamp', DateTime(), table=<uploads>), schema=None)
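
For comparison, the same two tables can be sketched in raw SQL, which this section mentions as the alternative to the ORM. The snippet below is only an illustration: it uses Python's built-in `sqlite3` module and an in-memory database (an assumption made here so that `demo.db` is not touched), and the `TEXT` column types are rough SQLite stand-ins rather than the exact DDL SQLAlchemy emits. `CREATE TABLE IF NOT EXISTS` plays a role similar to `checkfirst=True`.

```python
import sqlite3

# In-memory database: an assumption for this sketch, so demo.db is untouched.
conn = sqlite3.connect(":memory:")

SCHEMA = """
CREATE TABLE IF NOT EXISTS users (
    UserId    INTEGER PRIMARY KEY,
    Title     TEXT,
    FirstName TEXT,
    LastName  TEXT,
    Email     TEXT,
    Username  TEXT,
    DOB       TEXT
);
CREATE TABLE IF NOT EXISTS uploads (
    UploadId  INTEGER PRIMARY KEY,
    UserId    INTEGER,
    Title     TEXT,
    Body      TEXT,
    Timestamp TEXT
);
"""

# Running the script twice is harmless: the second pass finds the tables
# already present and creates nothing.
conn.executescript(SCHEMA)
conn.executescript(SCHEMA)

# List the tables that now exist in the database.
tables = [name for (name,) in conn.execute(
    "SELECT name FROM sqlite_master WHERE type = 'table' ORDER BY name")]
print(tables)
```

Without the `IF NOT EXISTS` clause, the second `executescript` call would raise an error because the tables already exist.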

## Extract

Once the schema has been defined, the next task is to __extract the raw data from its source.__ The exact details can vary wildly from case to case, depending on how the raw data is provided. Maybe your app calls an in-house or third-party API, or perhaps you need to read data logged in a CSV file.

The example below __uses two APIs to simulate data for the fictional blogging platform__ described above. 

- __The Users table will be populated with profiles randomly generated at `randomuser.me`__, and 

- __the Uploads table will contain Lorem Ipsum-style placeholder data courtesy of `JSONPlaceholder`.__

Python’s __`Requests` module can be used to call these APIs__, as shown below:

In [4]:
import requests

url = 'https://randomuser.me/api/?results=10'
users_json = requests.get(url).json()
url2 = 'https://jsonplaceholder.typicode.com/posts/'
uploads_json = requests.get(url2).json()

__The data is currently held in two objects (`users_json` and `uploads_json`) in JSON format.__ The next step will be to transform and load this data into the tables defined earlier.

In [24]:
uploads_json[0:2]

[{'userId': 1,
  'id': 1,
  'title': 'sunt aut facere repellat provident occaecati excepturi optio reprehenderit',
  'body': 'quia et suscipit\nsuscipit recusandae consequuntur expedita et cum\nreprehenderit molestiae ut ut quas totam\nnostrum rerum est autem sunt rem eveniet architecto'},
 {'userId': 1,
  'id': 2,
  'title': 'qui est esse',
  'body': 'est rerum tempore vitae\nsequi sint nihil reprehenderit dolor beatae ea dolores neque\nfugiat blanditiis voluptate porro vel nihil molestiae ut reiciendis\nqui aperiam non debitis possimus qui neque nisi nulla'}]
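
The extract step does not have to call an API. As a rough sketch of the CSV case mentioned at the start of this section, the snippet below reads rows with Python's `csv.DictReader`. The CSV content and its column names are made up for illustration, and an in-memory string stands in for a real file.

```python
import csv
import io

# Hypothetical CSV export; in practice this would come from an opened file.
raw_csv = io.StringIO(
    "id,userId,title,body\n"
    "1,1,first post,hello world\n"
    "2,1,second post,more text\n"
)

# DictReader yields one dict per row, keyed by the header line.
# All values arrive as strings, so numeric fields are converted explicitly.
uploads_from_csv = []
for record in csv.DictReader(raw_csv):
    record["id"] = int(record["id"])
    record["userId"] = int(record["userId"])
    uploads_from_csv.append(record)

print(uploads_from_csv[0])
```

The resulting list has the same shape as `uploads_json`, so the later transform step would not need to know which source the rows came from.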

## Transform

__Before the data can be loaded into the database, it is important to ensure that it is in the correct format.__ 

- __The JSON objects created in the code above are nested, and contain more data than is required for the tables defined.__


- __An important intermediate step is to "transform" the data <u>from its current nested JSON format</u> to <u>a flat format</u> that can be safely written to the database without error.__

However, our data are relatively simple, and won’t need much transformation. The code below creates two lists, `users` and `uploads`, which are used in the final steps.


1. __The main step here is to iterate through the JSON objects created before.__ 

    - __For each result, create a new Python dictionary object with keys corresponding to each column defined for the relevant table in the schema.__ 
    
    - __This ensures that the data is no longer nested, and keeps only the data needed for the tables.__
    
    
2. __The other step is to use Python’s `datetime` module to manipulate dates, and transform them into `datetime` objects that can be written to the database.__ 

    - For the sake of this example, random timestamps are generated using the `timedelta` class from Python’s `datetime` module.

In [5]:
from datetime import datetime, timedelta
from random import randint

users, uploads = [], []

for i, result in enumerate(users_json['results']):
    row = {}
    row['UserId'] = i
    row['Title'] = result['name']['title']
    row['FirstName'] = result['name']['first']
    row['LastName'] = result['name']['last']
    row['Email'] = result['email']
    row['Username'] = result['login']['username']
    dob = datetime.strptime(result['dob']['date'],'%Y-%m-%dT%H:%M:%S.%fZ')
    row['DOB'] = dob.date()
    
    users.append(row)
    
for result in uploads_json:
    row = {}
    row['UploadId'] = result['id']
    row['UserId'] = result['userId']
    row['Title'] = result['title']
    row['Body'] = result['body']
    delta = timedelta(seconds=randint(1,86400))
    row['Timestamp'] = datetime.now() - delta
    
    uploads.append(row)

In [6]:
dob.date()

datetime.date(1980, 9, 16)
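
To make the flattening step easier to follow in isolation, here is a minimal sketch that applies the same logic to a single record. The `sample_result` values are invented, the record only mimics the fields the loop above reads from `users_json['results']`, and `flatten_user` is a hypothetical helper name that does not appear in the original code.

```python
from datetime import datetime

# Hypothetical record shaped like one entry of users_json['results'];
# only the fields used by the transform step are included.
sample_result = {
    "name": {"title": "Mrs", "first": "Ada", "last": "Example"},
    "email": "ada.example@example.com",
    "login": {"username": "bluecat123"},
    "dob": {"date": "1980-09-16T04:12:33.123Z", "age": 42},
}


def flatten_user(user_id, result):
    """Return a flat dict whose keys match the columns of the users table."""
    dob = datetime.strptime(result["dob"]["date"], "%Y-%m-%dT%H:%M:%S.%fZ")
    return {
        "UserId": user_id,
        "Title": result["name"]["title"],
        "FirstName": result["name"]["first"],
        "LastName": result["name"]["last"],
        "Email": result["email"],
        "Username": result["login"]["username"],
        "DOB": dob.date(),  # keep the calendar date, drop the time of day
    }


flat = flatten_user(0, sample_result)
print(flat)
```

Every value in `flat` is a scalar, and fields that have no matching column (such as `age`) are dropped.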

## Load

Finally, the __data is in a form that can be loaded into the database.__ SQLAlchemy __makes this step straightforward through its Session API.__

- The __Session API__ acts a bit like a middleman, or __“holding zone,” for Python objects__ that you have either loaded from or associated with the database. 
__These objects can be manipulated within the session before being committed (i.e. carried out) to the database.__


- __The code below creates a new session object, adds rows to it, then commits them to the database:__

In [7]:
Session = sessionmaker(bind=engine)
session = Session()

for user in users:
    row = Users(**user)
    session.add(row)
    
for upload in uploads:
    row = Uploads(**upload)
    session.add(row)

session.commit()

In [8]:
session

<sqlalchemy.orm.session.Session at 0x10301fd50>

- __The `sessionmaker` factory is used to generate newly-configured `Session` classes.__ 
    `Session` is an everyday Python class that is instantiated on the second line as `session`.
    
    
- __Next up are two loops which iterate through the `users` and `uploads` lists created earlier.__ __The elements of these lists are dictionary objects__ whose keys correspond to the columns __given in the `Users` and `Uploads`__ classes defined previously.


- __Each object is used to instantiate a new instance of the relevant class__ (using Python’s handy `some_function(**some_dict)` trick). __This object is added to the current session with `session.add()`.__


__Finally__, when the session contains the rows to be added, __`session.commit()` is used to commit the transaction to the database.__
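
The two ideas in this section, dictionary unpacking and committing pending work, can be tried without SQLAlchemy. The sketch below is an analogy rather than the Session API itself: `UserRow` is a hypothetical dataclass standing in for the `Users` class, and a plain `sqlite3` connection on an in-memory database stands in for the session.

```python
import sqlite3
from dataclasses import dataclass


@dataclass
class UserRow:
    """Hypothetical stand-in for the Users class, limited to two columns."""
    UserId: int
    Username: str


rows = [{"UserId": 0, "Username": "bigfrog691"},
        {"UserId": 1, "Username": "redbear183"}]

# UserRow(**row) is equivalent to UserRow(UserId=..., Username=...).
objects = [UserRow(**row) for row in rows]

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE users (UserId INTEGER PRIMARY KEY, Username TEXT)")
conn.commit()


def count_users():
    """Return the number of rows currently in the users table."""
    return conn.execute("SELECT COUNT(*) FROM users").fetchone()[0]


insert = "INSERT INTO users VALUES (?, ?)"
params = [(o.UserId, o.Username) for o in objects]

# Pending changes can still be discarded before they are committed.
conn.executemany(insert, params)
conn.rollback()
after_rollback = count_users()

# Once committed, the rows are part of the database.
conn.executemany(insert, params)
conn.commit()
after_commit = count_users()

print(after_rollback, after_commit)
```

The rollback leaves the table empty, while the commit makes both rows permanent. This is the same distinction the session draws between added objects and committed ones.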

## Aggregating

Different versions of SQL have somewhat incompatible syntaxes, but __SQLAlchemy’s Expression Language__ acts as a __lingua franca (common language)__ between them.


Once the basic `Users` and `Uploads` tables have been created and populated, a next step might be to __create an aggregated table:__

- __for instance, showing how many articles each user has posted, and the time they were last active. First, define a class for the aggregated table:__

In [9]:
class UploadCounts(Base):
    __tablename__ = "upload_counts"
    UserId = Column(Integer, primary_key=True)
    LastActive = Column(DateTime)
    PostCount = Column(Integer)

UploadCounts.__table__.create(bind=engine, checkfirst=True)

This table will have __three columns__. __For each `UserId`, it will store the timestamp of when they were last active, and a count of how many posts they have uploaded.__

In plain SQL, this table would be populated using a query along the lines of:

In [None]:
"""
INSERT INTO upload_counts
SELECT
  UserId,
  MAX(Timestamp) AS LastActive,
  COUNT(UploadId) AS PostCount
FROM
  uploads
GROUP BY 1;
"""
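
To see what this statement produces, the sketch below runs it with Python's built-in `sqlite3` module against a few hand-made rows (not real data) in an in-memory database. Timestamps are stored as ISO-formatted text, which is an assumption of this sketch, and the query groups by the column name rather than by its position.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
CREATE TABLE uploads (UploadId INTEGER PRIMARY KEY, UserId INTEGER,
                      Title TEXT, Body TEXT, Timestamp TEXT);
CREATE TABLE upload_counts (UserId INTEGER PRIMARY KEY,
                            LastActive TEXT, PostCount INTEGER);
""")

# Hand-made rows: two uploads for user 1, one for user 2.
conn.executemany(
    "INSERT INTO uploads VALUES (?, ?, ?, ?, ?)",
    [
        (1, 1, "a", "body a", "2022-11-10 21:04:44"),
        (2, 1, "b", "body b", "2022-11-11 13:04:23"),
        (3, 2, "c", "body c", "2022-11-11 15:59:20"),
    ],
)

# Populate the aggregate table: one row per user.
conn.execute("""
INSERT INTO upload_counts
SELECT UserId, MAX(Timestamp) AS LastActive, COUNT(UploadId) AS PostCount
FROM uploads
GROUP BY UserId
""")
conn.commit()

aggregated = conn.execute(
    "SELECT * FROM upload_counts ORDER BY UserId").fetchall()
print(aggregated)
```

Because ISO-formatted timestamps sort in chronological order as text, `MAX(Timestamp)` picks the most recent upload for each user.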

In SQLAlchemy, this would be written as:

In [10]:
connection = engine.connect()

query = select([Uploads.UserId,
    func.max(Uploads.Timestamp).label('LastActive'),
    func.count(Uploads.UploadId).label('PostCount')]).\
    group_by('UserId')

results = connection.execute(query)

for result in results:
    row = UploadCounts(**result)
    session.add(row)
    
session.commit()

__The first line creates a `Connection` object using the engine object’s `connect()` method.__ Next, a query is defined using the `select()` function.

- Like the SQL version given above, it selects the `UserId` column from the `uploads` table. It also applies `func.max()` to the `Timestamp` column, which identifies the most recent timestamp. This is labelled `LastActive` using the `label()` method.


- Likewise, the query applies `func.count()` to count the number of records in the `UploadId` column. This is labelled `PostCount`.


- Finally, the query uses `group_by()` to group results by `UserId`.



__To use the results of the query, a for-loop iterates over the row objects returned by `connection.execute(query)`.__

__Each row is used to instantiate an instance of the `UploadCounts` table class. As before, each row is added to the `session` object, and finally the `session` is `committed` to the database.__
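
One way to sanity-check an aggregate table is to recompute the same figures in plain Python and compare. The sketch below does this for a few hand-made rows shaped like the dicts in the `uploads` list. The sample data and the `aggregate_uploads` helper are hypothetical and not part of the original pipeline.

```python
from datetime import datetime

# Hand-made rows shaped like the dicts in the `uploads` list (not real data).
sample_uploads = [
    {"UploadId": 1, "UserId": 1, "Timestamp": datetime(2022, 11, 10, 21, 4, 44)},
    {"UploadId": 2, "UserId": 1, "Timestamp": datetime(2022, 11, 11, 13, 4, 23)},
    {"UploadId": 3, "UserId": 2, "Timestamp": datetime(2022, 11, 11, 15, 59, 20)},
]


def aggregate_uploads(rows):
    """Return {UserId: {"LastActive": latest timestamp, "PostCount": n}}."""
    summary = {}
    for row in rows:
        # Start a new entry the first time a user is seen.
        entry = summary.setdefault(
            row["UserId"], {"LastActive": row["Timestamp"], "PostCount": 0})
        entry["LastActive"] = max(entry["LastActive"], row["Timestamp"])
        entry["PostCount"] += 1
    return summary


summary_by_user = aggregate_uploads(sample_uploads)
print(summary_by_user)
```

The keys of each inner dict match the `LastActive` and `PostCount` columns, so the result can be compared row by row with what the query wrote to `upload_counts`.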

*** 

## Checking the database

Now we can check that the data have been written correctly to the `demo.db` database created earlier, as shown below:

In [11]:
ls

[1m[36mETL Pipeline[m[m/            [34mdb_credentials.py[m[m*       [1m[36mpythondataanalysis-main[m[m/
[34mETL Pipelines.ipynb[m[m*     demo.db                  [1m[36msqlalchemy-demo[m[m/
[1m[36m__pycache__[m[m/             [34metl.py[m[m*                  [34msub_queries.py[m[m*
data.db                  [34mmain.py[m[m*


In [12]:
import sqlite3
import pandas as pd   
dat = sqlite3.connect('demo.db')
dat

<sqlite3.Connection at 0x11924ab90>

In [13]:
df_users = pd.read_sql_query("SELECT * FROM users", dat)
df_users.head()

| | UserId | Title | FirstName | LastName | Email | Username | DOB |
|---|---|---|---|---|---|---|---|
| 0 | 0 | Mrs | Sumana | Mathew | sumana.mathew@example.com | bigfrog691 | 1994-08-24 00:00:00.000000 |
| 1 | 1 | Miss | Deusete | Nunes | deusete.nunes@example.com | redbear183 | 1964-05-02 00:00:00.000000 |
| 2 | 2 | Mr | Richard | Davis | richard.davis@example.com | redpeacock709 | 1978-11-02 00:00:00.000000 |
| 3 | 3 | Monsieur | Ahmad | Renard | ahmad.renard@example.com | tinysnake321 | 1996-06-11 00:00:00.000000 |
| 4 | 4 | Mr | Quinn | Johnson | quinn.johnson@example.com | smalltiger254 | 1945-11-06 00:00:00.000000 |


In [14]:
df_uploads = pd.read_sql_query("SELECT * FROM uploads", dat)
df_uploads.head()

| | UploadId | UserId | Title | Body | Timestamp |
|---|---|---|---|---|---|
| 0 | 1 | 1 | sunt aut facere repellat provident occaecati e... | quia et suscipit\nsuscipit recusandae consequu... | 2022-11-10 21:04:44.167559 |
| 1 | 2 | 1 | qui est esse | est rerum tempore vitae\nsequi sint nihil repr... | 2022-11-10 19:53:12.167580 |
| 2 | 3 | 1 | ea molestias quasi exercitationem repellat qui... | et iusto sed quo iure\nvoluptatem occaecati om... | 2022-11-11 13:04:23.167589 |
| 3 | 4 | 1 | eum et est occaecati | ullam et saepe reiciendis voluptatem adipisci\... | 2022-11-11 00:16:46.167598 |
| 4 | 5 | 1 | nesciunt quas odio | repudiandae veniam quaerat sunt sed\nalias aut... | 2022-11-11 10:49:18.167607 |


In [15]:
df_upload_cnt = pd.read_sql_query("SELECT * FROM upload_counts", dat)
df_upload_cnt.head()

| | UserId | LastActive | PostCount |
|---|---|---|---|
| 0 | 1 | 2022-11-11 13:04:23.167589 | 10 |
| 1 | 2 | 2022-11-11 15:59:20.167729 | 10 |
| 2 | 3 | 2022-11-11 10:44:19.167756 | 10 |
| 3 | 4 | 2022-11-11 12:19:15.167872 | 10 |
| 4 | 5 | 2022-11-11 11:27:47.168112 | 10 |


Now we could use these tables to write queries for further analysis, or to build dashboards for visualisation purposes in the future.
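
As a starting point for such follow-up queries, a quick row count per table can confirm that each load step wrote something. The sketch below uses a hypothetical helper, `table_row_counts`, and an in-memory database with a few made-up rows standing in for `demo.db`.

```python
import sqlite3


def table_row_counts(conn):
    """Return {table_name: row_count} for every table in the database."""
    names = [name for (name,) in conn.execute(
        "SELECT name FROM sqlite_master WHERE type = 'table' ORDER BY name")]
    # Table names come from sqlite_master itself, so quoting them is enough here.
    return {name: conn.execute(f'SELECT COUNT(*) FROM "{name}"').fetchone()[0]
            for name in names}


# In-memory stand-in for demo.db, filled with a few made-up rows.
demo = sqlite3.connect(":memory:")
demo.execute("CREATE TABLE users (UserId INTEGER PRIMARY KEY)")
demo.execute("CREATE TABLE uploads (UploadId INTEGER PRIMARY KEY, UserId INTEGER)")
demo.executemany("INSERT INTO users VALUES (?)", [(0,), (1,)])
demo.executemany("INSERT INTO uploads VALUES (?, ?)", [(1, 1), (2, 1), (3, 2)])

counts = table_row_counts(demo)
print(counts)
```

Passing `sqlite3.connect('demo.db')` instead of the in-memory connection would report on the three tables created in this document.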

***

## Summary

In [20]:
%rm demo.db
from sqlalchemy import *
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
from sqlalchemy.sql import *
from random import randint

# connect to sqlite database
engine = create_engine('sqlite:///demo.db')

# define schema
Base = declarative_base()

class Users(Base):
    __tablename__ = "users"
    UserId = Column(Integer, primary_key=True)
    Title = Column(String)
    FirstName = Column(String)
    LastName = Column(String)
    Email = Column(String)
    Username = Column(String)
    DOB = Column(DateTime)

class Uploads(Base):
    __tablename__ = "uploads"
    UploadId = Column(Integer, primary_key=True)
    UserId = Column(Integer)
    Title = Column(String)
    Body = Column(String)
    Timestamp = Column(DateTime)

# create tables
Users.__table__.create(bind=engine, checkfirst=True)
Uploads.__table__.create(bind=engine, checkfirst=True)

# extract simulated data
import requests

url = 'https://randomuser.me/api/?results=10'
users_json = requests.get(url).json()

url2 = 'https://jsonplaceholder.typicode.com/posts/'
uploads_json = requests.get(url2).json()

# transform data, ready for loading stage
from datetime import datetime, timedelta

users, uploads = [], []

for i, result in enumerate(users_json['results']):
    row = {}
    row['UserId'] = i
    row['Title'] = result['name']['title']
    row['FirstName'] = result['name']['first']
    row['LastName'] = result['name']['last']
    row['Email'] = result['email']
    row['Username'] = result['login']['username']
    dob = datetime.strptime(result['dob']['date'],'%Y-%m-%dT%H:%M:%S.%fZ') 
    row['DOB'] = dob.date()
    users.append(row)

for result in uploads_json:
    row = {}
    row['UploadId'] = result['id']
    row['UserId'] = result['userId']
    row['Title'] = result['title']
    row['Body'] = result['body']
    delta = timedelta(seconds=randint(1,86400))
    row['Timestamp'] = datetime.now() - delta
    uploads.append(row)

# create new Session and commit to database
Session = sessionmaker(bind=engine)
session = Session()

for user in users:
    row = Users(**user)
    session.add(row)

for upload in uploads:
    row = Uploads(**upload)
    session.add(row)

session.commit()

# Aggregations
# define new table
class UploadCounts(Base):
    __tablename__ = "upload_counts"
    UserId = Column(Integer, primary_key=True)
    LastActive = Column(DateTime)
    PostCount = Column(Integer)

# create table
UploadCounts.__table__.create(bind=engine, checkfirst=True)

# connect to database and execute query
connection = engine.connect()

query = select([Uploads.UserId,
    func.max(Uploads.Timestamp).label('LastActive'),
    func.count(Uploads.UploadId).label('PostCount')]).group_by('UserId')

results = connection.execute(query)

# loop through results and commit to aggregates table
for result in results:  
    row = UploadCounts(**result)
    session.add(row)

session.commit()
session.close()


# Display the aggregated table
import sqlite3
import pandas as pd

dat = sqlite3.connect('demo.db')
df_upload_cnt = pd.read_sql_query("SELECT * FROM upload_counts", dat)
df_upload_cnt.head()

| | UserId | LastActive | PostCount |
|---|---|---|---|
| 0 | 1 | 2022-11-11 17:17:37.627078 | 10 |
| 1 | 2 | 2022-11-11 17:45:56.627194 | 10 |
| 2 | 3 | 2022-11-11 15:54:07.627306 | 10 |
| 3 | 4 | 2022-11-11 17:52:15.627390 | 10 |
| 4 | 5 | 2022-11-11 17:44:07.627469 | 10 |


***