# Implementing ETL Pipeline Using SQLAlchemy

SQLAlchemy is a Python module that allows data engineers and developers to define schemas, write queries, and manipulate SQL databases entirely through Python. SQLAlchemy’s Object Relational Mapper (ORM) and Expression Language functionalities allowing us to associate Python classes and constructs with data tables and expressions.

For this project, I will be extracting data from two different mock APIs which are:
- [Random User Generator](https://randomuser.me/) - to get user profiles data
- [JSONPlaceholder](https://jsonplaceholder.typicode.com/) - to get albums and photos data

The extracted data will be in JSON format, which will be transformed into a flat format that can be safely written to the database without error.

It will then be loaded into its database which is `gallery.db`. This database should contain three tables: **users, albums, photos** <br><br>

![Entity Relation Diagram](entity-relation-diagram.png)
<p style="text-align: center;"><i>Entity Relation Diagram for Photo Gallery database</i></p><br>

## Defining a database schema
A database schema defines the structure of a database system, in terms of tables, columns, fields, and the relationships between them. Schemas can be defined in raw SQL, or through the use of SQLAlchemy’s ORM feature.

In [1]:
# Install sqlalchemy
! pip install sqlalchemy



In [2]:
from sqlalchemy import *
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
from sqlalchemy.sql import *

engine = create_engine('sqlite:///gallery.db')

Base = declarative_base()

class Users(Base):
    __tablename__ = "users"
    UserId = Column(Integer, primary_key=True)
    Title = Column(String)
    FirstName = Column(String)
    LastName = Column(String)
    Email = Column(String)
    Username = Column(String)
    DOB = Column(DateTime)
    
class Albums(Base):
    __tablename__="albums"
    AlbumId = Column(Integer, primary_key=True)
    UserId = Column(Integer)
    Title = Column(String)
    
class Photos(Base):
    __tablename__="photos"
    PhotoId = Column(Integer, primary_key=True)
    AlbumId = Column(Integer)
    Title = Column(String)
    Url = Column(String)
    
Users.__table__.create(bind=engine, checkfirst=True)
Albums.__table__.create(bind=engine, checkfirst=True)
Photos.__table__.create(bind=engine, checkfirst=True)

## Extract

Once the schema has been defined, the next task is to extract the raw data from its source.
The **Users** table will be populated with profiles randomly generated at `randomuser.me`, while the **Albums** and **Photos** table will contain data courtesy of JSONPlaceholder.

Python’s `Requests` module is used to call these APIs.

In [3]:
import requests

url = 'https://randomuser.me/api/?results=10'
users_json = requests.get(url).json()

url2 = 'https://jsonplaceholder.typicode.com/albums/'
albums_json = requests.get(url2).json()

url3 = 'https://jsonplaceholder.typicode.com/photos/'
photos_json = requests.get(url3).json()

In [4]:
import pprint
pprint.pprint(users_json)

{'info': {'page': 1,
          'results': 10,
          'seed': 'da82295f31b344a3',
          'version': '1.3'},
 'results': [{'cell': '(69) 1141-3075',
              'dob': {'age': 70, 'date': '1951-05-05T04:03:23.515Z'},
              'email': 'benta.barros@example.com',
              'gender': 'female',
              'id': {'name': '', 'value': None},
              'location': {'city': 'Várzea Paulista',
                           'coordinates': {'latitude': '-72.4530',
                                           'longitude': '107.8648'},
                           'country': 'Brazil',
                           'postcode': 77002,
                           'state': 'Rondônia',
                           'street': {'name': 'Rua Doze ', 'number': 2256},
                           'timezone': {'description': 'Brazil, Buenos Aires, '
                                                       'Georgetown',
                                        'offset': '-3:00'}},
              'login': {'

In [None]:
#pprint.pprint(albums_json)

In [None]:
#pprint.pprint(photos_json)

The data is currently held in three objects (`users_json`,`albums_json`, and `photos_json`) in JSON format. The next step will be to transform and load this data into the tables defined earlier.

## Transform

Transform the data from its current nested JSON format to a flat format that can be safely written to the database without error.
The code below creates three lists, `users`,`albums` and `photos`, which will be used in the final step:

In [5]:
from datetime import datetime, timedelta
import dateutil.parser

users, albums, photos = [],[],[]

for i, result in enumerate(users_json['results']):
    row = {}
    row['UserId'] = i
    row['Title'] = result['name']['title']
    row['FirstName'] = result['name']['first']
    row['LastName'] = result['name']['last']
    row['Email'] = result['email']
    row['Username'] = result['login']['username']
    d = dateutil.parser.parse(result['dob']['date'])
    dob = d.strftime('%Y-%m-%d %H:%M:&S')
    dob = datetime.strptime(dob, '%Y-%m-%d %H:%M:&S' )
    row['DOB']=dob
    users.append(row)

for result in albums_json:
    row = {}
    row['AlbumId'] = result['id']
    row['UserId'] = result['userId']
    row['Title'] = result['title']
    albums.append(row)
    
for result in photos_json:
    row = {}
    row['PhotoId'] = result['id']
    row['AlbumId'] = result['albumId']
    row['Title'] = result['title']
    row['Url'] = result['url']
    photos.append(row)

In [6]:
print(users)

[{'UserId': 0, 'Title': 'Mrs', 'FirstName': 'Benta', 'LastName': 'Barros', 'Email': 'benta.barros@example.com', 'Username': 'smallmouse460', 'DOB': datetime.datetime(1951, 5, 5, 4, 3)}, {'UserId': 1, 'Title': 'Mr', 'FirstName': 'Miroslaw', 'LastName': 'Weidmann', 'Email': 'miroslaw.weidmann@example.com', 'Username': 'blueduck690', 'DOB': datetime.datetime(1996, 5, 20, 10, 38)}, {'UserId': 2, 'Title': 'Mrs', 'FirstName': 'Nicoline', 'LastName': 'Mortensen', 'Email': 'nicoline.mortensen@example.com', 'Username': 'bigbird669', 'DOB': datetime.datetime(1987, 1, 3, 0, 19)}, {'UserId': 3, 'Title': 'Mrs', 'FirstName': 'Filippa', 'LastName': 'Poulsen', 'Email': 'filippa.poulsen@example.com', 'Username': 'heavywolf486', 'DOB': datetime.datetime(1970, 2, 20, 8, 39)}, {'UserId': 4, 'Title': 'Mrs', 'FirstName': 'Camille', 'LastName': 'Chow', 'Email': 'camille.chow@example.com', 'Username': 'beautifulleopard910', 'DOB': datetime.datetime(1954, 8, 28, 7, 40)}, {'UserId': 5, 'Title': 'Monsieur', 'Fir

In [None]:
#print(albums)

In [None]:
#print(photos)

## Load

Finally, the data is in a form that can be loaded into the database. SQLAlchemy makes this step straightforward through its Session API.
The code below creates a new session object, adds rows to it, then merges and commits them to the database:

In [7]:
Session = sessionmaker(bind=engine)

session = Session()  

for user in users:
    row = Users(**user)
    session.add(row)
    
for album in albums:
    row = Albums(**album)
    session.add(row)
    
for photo in photos:
    row = Photos(**photo)
    session.add(row)
    
session.commit()

## Check existing tables in database

In [8]:
%load_ext sql
%sql sqlite:///gallery.db

In [9]:
%sql SELECT count(*) FROM users;

 * sqlite:///gallery.db
Done.


count(*)
10


In [10]:
%sql SELECT * FROM users LIMIT 5;

 * sqlite:///gallery.db
Done.


UserId,Title,FirstName,LastName,Email,Username,DOB
0,Mrs,Benta,Barros,benta.barros@example.com,smallmouse460,1951-05-05 04:03:00.000000
1,Mr,Miroslaw,Weidmann,miroslaw.weidmann@example.com,blueduck690,1996-05-20 10:38:00.000000
2,Mrs,Nicoline,Mortensen,nicoline.mortensen@example.com,bigbird669,1987-01-03 00:19:00.000000
3,Mrs,Filippa,Poulsen,filippa.poulsen@example.com,heavywolf486,1970-02-20 08:39:00.000000
4,Mrs,Camille,Chow,camille.chow@example.com,beautifulleopard910,1954-08-28 07:40:00.000000


In [11]:
%sql SELECT count(*) FROM albums;

 * sqlite:///gallery.db
Done.


count(*)
100


In [12]:
%sql SELECT * FROM albums LIMIT 5;

 * sqlite:///gallery.db
Done.


AlbumId,UserId,Title
1,1,quidem molestiae enim
2,1,sunt qui excepturi placeat culpa
3,1,omnis laborum odio
4,1,non esse culpa molestiae omnis sed optio
5,1,eaque aut omnis a


In [13]:
%sql SELECT count(*) FROM photos;

 * sqlite:///gallery.db
Done.


count(*)
5000


In [14]:
%sql SELECT * FROM photos LIMIT 5;

 * sqlite:///gallery.db
Done.


PhotoId,AlbumId,Title,Url
1,1,accusamus beatae ad facilis cum similique qui sunt,https://via.placeholder.com/600/92c952
2,1,reprehenderit est deserunt velit ipsam,https://via.placeholder.com/600/771796
3,1,officia porro iure quia iusto qui ipsa ut modi,https://via.placeholder.com/600/24f355
4,1,culpa odio esse rerum omnis laboriosam voluptate repudiandae,https://via.placeholder.com/600/d32776
5,1,natus nisi omnis corporis facere molestiae rerum in,https://via.placeholder.com/600/f66b97


The End.