In [65]:
import os
from datetime import datetime
from io import StringIO
from urllib.parse import quote_plus

import pandas as pd
import psycopg2
from sqlalchemy import create_engine

from airflow import DAG
from airflow.operators.email_operator import EmailOperator
from airflow.operators.python import BranchPythonOperator, PythonOperator
from airflow.utils.dates import days_ago

In [5]:
# psycopg2 connections to both PostgreSQL databases (used for the DDL cells below).
# Credentials are read from PG_USER / PG_PASSWORD when set, so no secret has to live
# in the notebook; the fallbacks are the localhost development credentials.
pg_user = os.environ.get("PG_USER", "user")
pg_password = os.environ.get("PG_PASSWORD", "password")

# Transactional (OLTP) source database
conn_transactional = psycopg2.connect(
    host="localhost",
    port="5433",
    database="transactional_db",
    user=pg_user,
    password=pg_password,
)

# Data warehouse database
conn_dwh = psycopg2.connect(
    host="localhost",
    port="5434",
    database="dwh_db",
    user=pg_user,
    password=pg_password,
)


In [6]:
# One cursor per connection: the OLTP source first, then the warehouse.
cur_transactional, cur_dwh = conn_transactional.cursor(), conn_dwh.cursor()

# Smoke-test message, reached only once both cursors have been created.
print("Connected to both transactional and DWH databases successfully!")

Connected to both transactional and DWH databases successfully!


In [33]:
# DDL for the transactional (OLTP) schema.
# user_name, author_name, authors.email and genre are VARCHAR(100) because the source
# data does not fit in 50 characters (the ALTER TABLE cell widens existing tables the
# same way); declaring the final sizes here means a fresh database is created correctly.
create_users_table = """
CREATE TABLE IF NOT EXISTS users (
    user_id SERIAL PRIMARY KEY,
    user_name VARCHAR(100),
    email VARCHAR(50) UNIQUE,
    wallet_balance NUMERIC(10, 2),
    phone_number VARCHAR(20),
    address VARCHAR(100)
);
"""
create_author_table = """
CREATE TABLE IF NOT EXISTS authors (
    author_id SERIAL PRIMARY KEY,
    author_name VARCHAR(100),
    email VARCHAR(100) UNIQUE,
    nationality VARCHAR(50)
);
"""
create_book_table = """
CREATE TABLE IF NOT EXISTS books (
    book_id SERIAL PRIMARY KEY,
    title VARCHAR(100),
    publish_date DATE,
    isbn VARCHAR(20),
    genre VARCHAR(100),
    price NUMERIC(10, 2)
);
"""
# Many-to-many link between books and authors.
create_book_author_table = """
CREATE TABLE IF NOT EXISTS book_author (
    book_id INT,
    author_id INT,
    CONSTRAINT fk_book_id FOREIGN KEY (book_id) REFERENCES books(book_id),
    CONSTRAINT fk_author_id FOREIGN KEY (author_id) REFERENCES authors(author_id),
    PRIMARY KEY (book_id, author_id)
);
"""
create_review_table = """
CREATE TABLE IF NOT EXISTS reviews (
    review_id SERIAL PRIMARY KEY,
    user_id INT,
    book_id INT,
    rate INT CHECK (rate >= 1 AND rate <= 5),
    review_text TEXT,
    review_date DATE,
    FOREIGN KEY (user_id) REFERENCES users(user_id),
    FOREIGN KEY (book_id) REFERENCES books(book_id)
);
"""
create_order_table = """
CREATE TABLE IF NOT EXISTS orders (
    order_id SERIAL PRIMARY KEY,
    user_id INT,
    order_date DATE,
    total_amount NUMERIC(10, 2),
    order_created DATE,
    order_completed DATE,
    FOREIGN KEY (user_id) REFERENCES users(user_id)
);
"""
# Order line items: one row per (order, book) with the purchased quantity.
create_order_book_table = """
CREATE TABLE IF NOT EXISTS order_book (
    order_id INT,
    book_id INT,
    quantity INT,
    PRIMARY KEY (order_id, book_id),
    FOREIGN KEY (order_id) REFERENCES orders(order_id),
    FOREIGN KEY (book_id) REFERENCES books(book_id)
);
"""

In [34]:
# Execute the table creation queries on the transactional database.
# Order matters: every table is created after the tables its foreign keys reference.
transactional_ddl = (
    create_users_table,
    create_book_table,
    create_author_table,
    create_book_author_table,
    create_review_table,
    create_order_table,
    create_order_book_table,
)

try:
    for statement in transactional_ddl:
        cur_transactional.execute(statement)
except Exception:
    # A failed statement aborts the whole psycopg2 transaction; roll back so the
    # connection stays usable, then surface the error instead of hiding it.
    conn_transactional.rollback()
    raise

In [35]:
# Persist the transactional-database DDL executed in the previous cell.
conn_transactional.commit()

In [36]:
# SQL statements to create the star schema tables in the data warehouse.
# Dimensions are created before fact_sales because its foreign keys reference them.
create_dim_user = """
CREATE TABLE IF NOT EXISTS dim_user (
    user_id SERIAL PRIMARY KEY,
    user_name VARCHAR(100),
    email VARCHAR(100) UNIQUE,
    phone_number VARCHAR(20),
    address VARCHAR(255)
);
"""

create_dim_author = """
CREATE TABLE IF NOT EXISTS dim_author (
    author_id SERIAL PRIMARY KEY,
    author_name VARCHAR(100),
    nationality VARCHAR(50)
);
"""

# NOTE(review): a single author_id column cannot represent books with several
# authors, which the transactional book_author link table allows.
create_dim_book = """
CREATE TABLE IF NOT EXISTS dim_book (
    book_id SERIAL PRIMARY KEY,
    title VARCHAR(255),
    author_id INT,
    genre VARCHAR(50),
    price NUMERIC(10, 2),
    publish_date DATE,
    FOREIGN KEY (author_id) REFERENCES dim_author(author_id)
);
"""

create_dim_time = """
CREATE TABLE IF NOT EXISTS dim_time (
    time_id SERIAL PRIMARY KEY,
    date DATE,
    year INT,
    month INT,
    day INT,
    quarter INT
);
"""

create_fact_sales = """
CREATE TABLE IF NOT EXISTS fact_sales (
    sales_id SERIAL PRIMARY KEY,
    book_id INT,
    user_id INT,
    time_id INT,
    quantity_sold INT,
    total_amount NUMERIC(10, 2),
    FOREIGN KEY (book_id) REFERENCES dim_book(book_id),
    FOREIGN KEY (user_id) REFERENCES dim_user(user_id),
    FOREIGN KEY (time_id) REFERENCES dim_time(time_id)
);
"""

try:
    for statement in (create_dim_user, create_dim_author, create_dim_book,
                      create_dim_time, create_fact_sales):
        cur_dwh.execute(statement)
    conn_dwh.commit()
except Exception:
    # Roll back so conn_dwh stays usable, then re-raise: printing and carrying on
    # would let later cells run against a warehouse with missing tables.
    conn_dwh.rollback()
    raise

print("Star schema tables created successfully in the DWH database!")

Star schema tables created successfully in the DWH database!


In [37]:
# Widen columns on existing tables so the data fits.
# The ALTERs run inside the psycopg2 transaction and hold locks on the altered
# tables until the transaction ends. Without a commit they are never persisted, and
# readers on other connections (e.g. the SQLAlchemy engines) would block.
column_alterations = (
    "ALTER TABLE authors ALTER COLUMN author_name TYPE VARCHAR(100);",
    "ALTER TABLE users ALTER COLUMN user_name TYPE VARCHAR(100);",
    "ALTER TABLE authors ALTER COLUMN email TYPE VARCHAR(100);",
    "ALTER TABLE books ALTER COLUMN genre TYPE VARCHAR(100);",
)

try:
    for statement in column_alterations:
        cur_transactional.execute(statement)
    conn_transactional.commit()
except Exception:
    conn_transactional.rollback()
    raise


In [21]:
# SQLAlchemy engine for the transactional database.
# Credentials come from PG_USER / PG_PASSWORD when set; the fallbacks are the
# localhost development credentials.
DATABASE_TYPE = 'postgresql'
DBAPI = 'psycopg2'
USER = os.environ.get('PG_USER', 'user')
PASSWORD = os.environ.get('PG_PASSWORD', 'password')
HOST = 'localhost'
PORT = '5433'
DATABASE = 'transactional_db'

# URL-escape the credentials: characters such as '@', ':' or '/' in a password
# would otherwise corrupt the connection URL.
db_uri = (
    f"{DATABASE_TYPE}+{DBAPI}://{quote_plus(USER)}:{quote_plus(PASSWORD)}"
    f"@{HOST}:{PORT}/{DATABASE}"
)

engine = create_engine(db_uri)

In [61]:
# SQLAlchemy engine for the data warehouse database (target of the ETL load step).
# Credentials come from PG_USER / PG_PASSWORD when set; the fallbacks are the
# localhost development credentials.
DATABASE_TYPE = 'postgresql'
DBAPI = 'psycopg2'
USER = os.environ.get('PG_USER', 'user')
PASSWORD = os.environ.get('PG_PASSWORD', 'password')
HOST = 'localhost'
PORT = '5434'
DATABASE = 'dwh_db'

# URL-escape the credentials: characters such as '@', ':' or '/' in a password
# would otherwise corrupt the connection URL.
db_uri = (
    f"{DATABASE_TYPE}+{DBAPI}://{quote_plus(USER)}:{quote_plus(PASSWORD)}"
    f"@{HOST}:{PORT}/{DATABASE}"
)

engine_dwh = create_engine(db_uri)

In [None]:
# SQLAlchemy engine for the transactional database (source of the ETL extract step).
# Credentials come from PG_USER / PG_PASSWORD when set; the fallbacks are the
# localhost development credentials.
DATABASE_TYPE = 'postgresql'
DBAPI = 'psycopg2'
USER = os.environ.get('PG_USER', 'user')
PASSWORD = os.environ.get('PG_PASSWORD', 'password')
HOST = 'localhost'
PORT = '5433'
DATABASE = 'transactional_db'

# URL-escape the credentials: characters such as '@', ':' or '/' in a password
# would otherwise corrupt the connection URL.
db_uri = (
    f"{DATABASE_TYPE}+{DBAPI}://{quote_plus(USER)}:{quote_plus(PASSWORD)}"
    f"@{HOST}:{PORT}/{DATABASE}"
)

engine_tr = create_engine(db_uri)

In [66]:
# Extract -> transform -> load DAG: copies the transactional tables into the DWH and
# then sends a success e-mail.
#
# Each PythonOperator runs as a separate task instance, so DataFrames cannot be handed
# between tasks as Python objects. They are passed through XCom as JSON strings:
# Airflow 2 does not pickle XCom values by default, and a Jinja-templated op_arg would
# render to a single string rather than the six frames the callables expect.
ETL_TABLES = ("users", "authors", "books", "reviews", "orders", "order_book")

# DATE columns of the transactional schema. JSON carries them as ISO strings, so they
# are parsed back into datetimes after the XCom round-trip.
ETL_DATE_COLUMNS = ["publish_date", "review_date", "order_date", "order_created", "order_completed"]


def frames_to_xcom(frames):
    """Encode DataFrames (ordered like ETL_TABLES) into a JSON-serializable dict.

    Args:
        frames: iterable of DataFrames in ETL_TABLES order.

    Returns:
        dict mapping table name -> DataFrame serialized with ``orient="split"``.
    """
    return {
        name: frame.to_json(orient="split", date_format="iso", index=False)
        for name, frame in zip(ETL_TABLES, frames)
    }


def frames_from_xcom(payload):
    """Decode a dict produced by ``frames_to_xcom`` into a tuple in ETL_TABLES order.

    ``dtype=False`` stops pandas from coercing digit-only strings (ISBNs, phone
    numbers) into numbers; only the known DATE columns are converted.
    """
    return tuple(
        pd.read_json(
            StringIO(payload[name]),
            orient="split",
            dtype=False,
            convert_dates=ETL_DATE_COLUMNS,
        )
        for name in ETL_TABLES
    )


with DAG("ETL_DAG", start_date=datetime(2021, 1, 1), schedule_interval="@daily", catchup=False) as dag:

    def extract():
        """Read every table in ETL_TABLES from the transactional database.

        Returns:
            tuple of DataFrames: (users, authors, books, reviews, orders, order_book).
        """
        # Table names come from the fixed ETL_TABLES constant, never from user input.
        return tuple(pd.read_sql(f"SELECT * FROM {table};", engine_tr) for table in ETL_TABLES)

    def transform(users_df, authors_df, books_df, reviews_df, orders_df, order_book_df):
        """Transform the extracted frames; currently a pass-through.

        The source column names already match what ``load`` writes, so no renaming
        is needed.
        TODO(review): build dim_user/dim_author/dim_book/dim_time/fact_sales frames
        here if the star-schema tables are meant to be populated.
        """
        return users_df, authors_df, books_df, reviews_df, orders_df, order_book_df

    def load(users_df, authors_df, books_df, reviews_df, orders_df, order_book_df):
        """Write each frame to the same-named DWH table, replacing any existing table."""
        frames = (users_df, authors_df, books_df, reviews_df, orders_df, order_book_df)
        for table, frame in zip(ETL_TABLES, frames):
            frame.to_sql(table, engine_dwh, if_exists="replace", index=False)

    # Task callables: Airflow passes the TaskInstance as ``ti`` because it is a
    # parameter name of the callable.
    def run_extract():
        """Extract the source tables and push them to XCom."""
        return frames_to_xcom(extract())

    def run_transform(ti):
        """Pull the extracted frames from XCom, transform them and push the result."""
        extracted = frames_from_xcom(ti.xcom_pull(task_ids="extract_phase"))
        return frames_to_xcom(transform(*extracted))

    def run_load(ti):
        """Pull the transformed frames from XCom and load them into the DWH."""
        load(*frames_from_xcom(ti.xcom_pull(task_ids="transform_phase")))

    # Define tasks
    extract_phase = PythonOperator(task_id="extract_phase", python_callable=run_extract)
    transform_phase = PythonOperator(task_id="transform_phase", python_callable=run_transform)
    load_phase = PythonOperator(task_id="load_phase", python_callable=run_load)

    email_status = EmailOperator(
        task_id='email_on_success',
        to='Kariemg05@gmail.com',
        subject='Pipeline Success',
        html_content='The pipeline has completed successfully.',
    )

    # Set task dependencies
    extract_phase >> transform_phase >> load_phase >> email_status


ArgumentError: Could not parse SQLAlchemy URL from string 'your_database_url'