In [12]:
import requests
import pandas as pd
from sqlalchemy import create_engine, Column, Integer, String, Float, ForeignKey
from sqlalchemy.orm import declarative_base
from sqlalchemy.orm import sessionmaker, relationship
from sqlalchemy.exc import IntegrityError
from sqlalchemy import inspect

[[34m2024-12-18T17:51:30.850+0200[0m] {[34mutils.py:[0m160} INFO[0m - NumExpr defaulting to 8 threads.[0m


### Database connection

In [13]:
import os

# Connection string. Set the DATABASE_URL environment variable to avoid keeping
# credentials in the notebook; the fallback is the local development database.
DATABASE_URL = os.environ.get(
    "DATABASE_URL", "postgresql://etl:etl@localhost:5432/etlpipline_db"
)
engine = create_engine(DATABASE_URL)
Session = sessionmaker(bind=engine)
# Notebook-level session used by the load step.
session = Session()

# Single declarative base for all models. A fresh Base starts with empty MetaData,
# so re-running this cell before re-running the model cell lets the table classes
# be redefined without "Table ... is already defined" errors.
Base = declarative_base()

### Define Database Models

In [14]:
class User(Base):
    """ORM model for the ``users`` table.

    Rows are built by ``load_data`` from records returned by ``extract_users``.
    """
    __tablename__ = 'users'
    # Primary key; load_data assigns it explicitly from the API's "id" field.
    id = Column(Integer, primary_key=True)
    first_name = Column(String)
    last_name = Column(String)
    age = Column(Integer)
    email = Column(String)

    # Relationships
    # Both are one-to-many collections (lists); load_data appends at most one child
    # to each. cascade="all, delete-orphan" propagates session operations to the
    # children and deletes a child row once it is removed from its collection.
    hair = relationship("Hair", back_populates="user", cascade="all, delete-orphan")
    address = relationship("Address", back_populates="user", cascade="all, delete-orphan")

# Hair Table
class Hair(Base):
    """ORM model for the ``hair`` table; each row belongs to one ``users`` row."""
    __tablename__ = 'hair'
    id = Column(Integer, primary_key=True)
    # FK to users.id. No nullable=False and no ondelete rule are declared, so child
    # cleanup relies on the ORM cascade configured on User.hair.
    user_id = Column(Integer, ForeignKey('users.id'))
    # Values come from the API record's nested "hair" dict ("color" / "type" keys).
    color = Column(String)
    type = Column(String)

    # Many-to-one side of User.hair.
    user = relationship("User", back_populates="hair")

# Address Table
class Address(Base):
    """ORM model for the ``address`` table; each row belongs to one ``users`` row."""
    __tablename__ = 'address'
    id = Column(Integer, primary_key=True)
    # FK to users.id. No nullable=False and no ondelete rule are declared, so child
    # cleanup relies on the ORM cascade configured on User.address.
    user_id = Column(Integer, ForeignKey('users.id'))
    # Values come from the API record's nested "address" dict; only these three keys
    # are stored, any other keys in that dict are ignored by load_data.
    address = Column(String)
    city = Column(String)
    state = Column(String)

    # Many-to-one side of User.address.
    user = relationship("User", back_populates="address")

# Create any tables that are missing. create_all() defaults to checkfirst=True, so it
# only emits CREATE TABLE for tables that do not exist yet. Gating this on "users"
# alone would never create "hair"/"address" once "users" already existed.
Base.metadata.create_all(engine)

### Extract function to fetch data from the API

In [15]:
def extract_users(url="https://dummyjson.com/users", timeout=10, params=None):
    """Fetch the list of user records from the dummyjson users endpoint.

    Args:
        url: Endpoint to query. Defaults to the dummyjson users API.
        timeout: Seconds to wait for the server before giving up. Without a
            timeout, ``requests.get`` can block indefinitely and hang the task.
        params: Optional query parameters forwarded to ``requests.get``. The
            dummyjson API paginates via ``limit``/``skip``; NOTE(review): only the
            first page is returned when no params are given — confirm whether all
            users should be fetched.

    Returns:
        The ``"users"`` list from the JSON response, or ``[]`` if that key is absent.

    Raises:
        Exception: if the response status code is not 200.
    """
    response = requests.get(url, params=params, timeout=timeout)
    if response.status_code != 200:
        raise Exception(f"Failed to fetch data: {response.status_code}")
    return response.json().get("users", [])

### Transform function to clean and prepare data

In [16]:
def transform_user(user):
    """Split one raw API user record into its flat and nested parts.

    Args:
        user: A single user dict as returned by the API. It must contain the keys
            ``id``, ``firstName``, ``lastName``, ``age`` and ``email``; a missing key
            raises ``KeyError``.

    Returns:
        A 3-tuple ``(user_data, hair_data, address_data)``:
        ``user_data`` maps the ``users`` column names to the record's values;
        ``hair_data`` / ``address_data`` are the record's nested ``"hair"`` /
        ``"address"`` values as-is (``{}`` when the key is absent).
    """
    # (column name, API key) pairs, read in this order.
    renames = (
        ("id", "id"),
        ("first_name", "firstName"),
        ("last_name", "lastName"),
        ("age", "age"),
        ("email", "email"),
    )
    flat = {column: user[api_key] for column, api_key in renames}
    return flat, user.get("hair", {}), user.get("address", {})

### Load (transform each record and insert it into the database)

In [17]:
def _build_user(user_data, hair_data, address_data):
    """Build a ``User`` ORM object plus its optional ``Hair``/``Address`` children.

    ``user_data`` is the flat dict produced by ``transform_user``. ``hair_data`` and
    ``address_data`` are the raw nested dicts; when empty or ``None`` no child row
    is created.
    """
    user_obj = User(
        id=user_data["id"],
        first_name=user_data["first_name"],
        last_name=user_data["last_name"],
        age=user_data["age"],
        email=user_data["email"],
    )
    if hair_data:
        user_obj.hair.append(
            Hair(color=hair_data.get("color"), type=hair_data.get("type"))
        )
    if address_data:
        user_obj.address.append(
            Address(
                address=address_data.get("address"),
                city=address_data.get("city"),
                state=address_data.get("state"),
            )
        )
    return user_obj


def load_data(users, db_session=None):
    """Insert API user records (and their hair/address rows) into the database.

    Each record is transformed with ``transform_user`` and staged inside its own
    SAVEPOINT. A record that fails, for example with an ``IntegrityError`` on flush,
    is rolled back alone without discarding records staged earlier in the batch.
    Users whose ID already exists are skipped. Everything is committed once at the end.

    Args:
        users: Iterable of raw user dicts as returned by ``extract_users``.
        db_session: SQLAlchemy session to use; defaults to the notebook-level
            ``session``.

    This is a best-effort load: per-record errors and a failed final commit are
    reported with ``print`` and do not propagate. A failed commit is rolled back.
    """
    if db_session is None:
        db_session = session

    for user in users:
        try:
            user_data, hair_data, address_data = transform_user(user)

            # Skip users already present (identity map first, then the database).
            if db_session.get(User, user_data["id"]) is not None:
                print(f"User with ID {user_data['id']} already exists. Skipping.")
                continue

            # Per-user SAVEPOINT: leaving the block flushes this user's rows, and on
            # failure only this savepoint is rolled back. A plain session.rollback()
            # here would also throw away every earlier, not-yet-committed user.
            with db_session.begin_nested():
                db_session.add(_build_user(user_data, hair_data, address_data))

        except IntegrityError as e:
            print(f"IntegrityError: {e}. Skipping this user.")
        except Exception as e:
            print(f"Unexpected error: {e}. Skipping this user.")

    # Commit all staged users in one transaction.
    try:
        db_session.commit()
        print("All data committed successfully.")
    except Exception as e:
        print(f"Error committing data: {e}. Rolling back.")
        db_session.rollback()


### ETL Pipeline

In [18]:
def etl_pipeline():
    """Run the ETL flow end to end: fetch users from the API, then load them.

    There is no separate transform step here; ``load_data`` transforms each record
    with ``transform_user`` before inserting it.
    """
    print("Starting ETL pipeline...")
    load_data(extract_users())
    print("ETL pipeline completed successfully.")


if __name__ == "__main__":
    etl_pipeline()

Starting ETL pipeline...
All data committed successfully.
ETL pipeline completed successfully.


### Automate with Airflow

In [19]:
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime

In [20]:
# ETL function for Airflow task
def run_etl():
    """Airflow task callable: fetch users from the API and load them into the database.

    ``load_data`` transforms each record itself via ``transform_user``, so there is no
    separate transform call. The helpers used here are the ones defined earlier in
    this notebook (``extract_users`` / ``load_data``).
    """
    users = extract_users()
    load_data(users)

# Define Airflow DAG
# Runs run_etl on a daily schedule. catchup=False tells Airflow not to create backfill
# runs for past intervals between start_date and now.
# NOTE(review): start_date is a naive datetime, which Airflow interprets in its
# configured default timezone — confirm this is intended.
# NOTE(review): Airflow discovers DAGs from files in its dags folder; defining the DAG
# in this notebook does not schedule it unless the code is exported there.
with DAG(
    "dummyjson_etl_pipeline",
    start_date=datetime(2024, 6, 8),
    schedule="@daily",  # Adjust the interval as needed
    catchup=False
) as dag:

    # Single task; operators created inside the `with DAG(...)` block are attached to it.
    etl_task = PythonOperator(
        task_id="run_etl_pipeline",
        python_callable=run_etl
    )

# Bare expression so the notebook displays the task repr.
etl_task

<Task(PythonOperator): run_etl_pipeline>