In [None]:
%%capture
import requests



Extract Data

In [10]:
API_URL = "https://jsonplaceholder.typicode.com/users"  # Replace with your API URL

def extract_data():
    response = requests.get(API_URL)
    if response.status_code == 200:
        return response.json()  # Assuming the API returns JSON
    else:
        raise Exception(f"Failed to fetch data: {response.status_code}")

In [11]:
user_data = extract_data()

In [16]:
user_data[1]

{'id': 2,
 'name': 'Ervin Howell',
 'username': 'Antonette',
 'email': 'Shanna@melissa.tv',
 'address': {'street': 'Victor Plains',
  'suite': 'Suite 879',
  'city': 'Wisokyburgh',
  'zipcode': '90566-7771',
  'geo': {'lat': '-43.9509', 'lng': '-34.4618'}},
 'phone': '010-692-6593 x09125',
 'website': 'anastasia.net',
 'company': {'name': 'Deckow-Crist',
  'catchPhrase': 'Proactive didactic contingency',
  'bs': 'synergize scalable supply-chains'}}

Create database based on user_data:
Three tables are created to store the data obtained from the API.
- users: Stores user information (id, name,username, email, phone, website)
- address : Stores user information (id, user_id(fk),street, suite, city, zipcode, latitiude,longitude )
- company: Stores user information (id, user_id(fk), name, catchPhrase, bs)

In [None]:
create database ETL_database;

use ETL_database;

create table users (
id int primary key,
name varchar(100),
username varchar(50),
email varchar(100),
phone varchar(50),
website varchar(100)
);

create table address(
id INT auto_increment primary key,
user_id INT,
street varchar(100),
suite varchar(50),
city varchar(100),
zipcode varchar(20),
latitude float,
longitude float,
foreign key (user_id) references users(id)
);

create table company(
id INT auto_increment primary key,
user_id int,
name varchar(100),
catch_phrase varchar(255),
bs varchar(255),
foreign key (user_id) references users(id)
);

In [30]:
from sqlalchemy.orm import sessionmaker, relationship
from sqlalchemy import create_engine, Column, Integer, String, ForeignKey, Float
from sqlalchemy.ext.declarative import declarative_base

In [32]:
Base = declarative_base()

  Base = declarative_base()


In [33]:
class User(Base):
    __tablename__ = 'users'
    id = Column(Integer, primary_key=True)
    name = Column(String(100))
    username = Column(String(50))
    email = Column(String(100))
    phone = Column(String(50))
    website = Column(String(100))
    addresses = relationship("Address", back_populates="user")
    companies = relationship("Company", back_populates="user")

class Address(Base):
    __tablename__ = 'address'
    id = Column(Integer, primary_key=True, autoincrement=True)
    user_id = Column(Integer, ForeignKey('users.id'))
    street = Column(String(100))
    suite = Column(String(50))
    city = Column(String(100))
    zipcode = Column(String(20))
    latitude = Column(Float)
    longitude = Column(Float)
    user = relationship("User", back_populates="addresses")

class Company(Base):
    __tablename__ = 'company'
    id = Column(Integer, primary_key=True, autoincrement=True)
    user_id = Column(Integer, ForeignKey('users.id'))
    name = Column(String(100))
    catch_phrase = Column(String(255))
    bs = Column(String(255))
    user = relationship("User", back_populates="companies")

In [None]:
try:
    engine = create_engine('mysql+pymysql://bishnu:bishow%4023@localhost/ETL_database')
    Base.metadata.create_all(engine)
    Session = sessionmaker(bind=engine)
    session = Session()
    print("Succesfully connected to database")
except:
    print("Error for creating engine")


SUccesfully connected to database


Transform Data

In [37]:
import re
def transform_data(raw_data):
    users = []
    addresses = []
    companies =[]
    for user in raw_data:
        cleaned_phone = re.sub(r'\s*x\d*$', '', user['phone'])
        users.append({
        'id': user['id'],
        'name': user['name'],
        'username': user['username'],
        'email': user['email'],
        'phone': cleaned_phone,
        'website': user['website']
        })


        addresses.append({
        'user_id': user['id'],
        'street': user['address']['street'],
        'suite': user['address']['suite'],
        'city': user['address']['city'],
        'zipcode': user['address']['zipcode'],
        'latitude': float(user['address']['geo']['lat']),
        'longitude': float(user['address']['geo']['lng'])
        })

        companies.append({
        'user_id': user['id'],
        'name': user['company']['name'],
        'catch_phrase': user['company']['catchPhrase'],
        'bs': user['company']['bs']
        })

    return users, addresses, companies

Load Data

In [38]:
def load_data(users, addresses, companies):
    for user in users:
        session.add(User(**user))

    for address in addresses:
        session.add(Address(**address))
    
    for company in companies:
        session.add(Company(**company))

    session.commit()

In [39]:
def etl_pipeline():
    raw_data = extract_data()
    users, addresses, companies = transform_data(raw_data)
    load_data(users, addresses, companies)
    print("ETL pipeline completed successfully")

In [40]:
etl_pipeline()

ETL pipeline completed successfully


Checking the tables in mysql server they are filled with information fetched from the API.
Therefore, the ETL pipeline works as expected.