# ETL Pipeline and Data Warehouse Using Python and PostgreSQL

## About This ETL pipeline
1. **Extract data** from two CSV files, product_info.csv and product_reviews.csv. The data come from an e-commerce website: product_info.csv contains product descriptions, and product_reviews.csv contains the reviews customers gave for those products. The CSV files are loaded into dataframes using the Pandas library.
2. **Transform data**. After extraction, the dataset is transformed into a form suitable for loading. Transformation includes steps such as data cleaning and generating new columns, and is done using the Pandas library.
3. **Load data**. Finally, the transformed data is loaded into the data warehouse, which is created in PostgreSQL.

## About the Data Warehouse
In this project, I have created a simple data warehouse stored in PostgreSQL. It follows a star schema with one fact table and a few dimension tables.  
The data model for the data warehouse is shown below:  
![Data Warehouse Data Model](diagrams/data_warehouse_schema.jpg)

**Fact Table:** tbl_product_reviews: customer reviews of products  
**Dimension Tables:**  
1. tbl_product: details of the product reviewed
2. tbl_brand: details of the brand of the product reviewed
3. tbl_reviewer: details of the reviewer who reviewed the product
4. tbl_date: details of the date when the product was reviewed
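
To illustrate how a star schema like this is typically queried, the sketch below builds a cut-down fact table and one dimension table, then aggregates review ratings per brand. It uses an in-memory SQLite database purely as a stand-in so that it runs without a PostgreSQL server, and the sample rows are invented for illustration.

```python
import sqlite3

# In-memory SQLite is used here only as a stand-in for PostgreSQL.
conn = sqlite3.connect(":memory:")
cur = conn.cursor()

# Cut-down dimension and fact tables (a column subset of the real schema).
cur.execute("CREATE TABLE tbl_brand(brand_id INTEGER PRIMARY KEY, brand_name TEXT UNIQUE)")
cur.execute(
    """
    CREATE TABLE tbl_product_reviews(
        review_id INTEGER PRIMARY KEY,
        brand_id INTEGER REFERENCES tbl_brand(brand_id),
        review_rating INTEGER
    )
    """
)

# Made-up sample rows, for illustration only.
cur.executemany("INSERT INTO tbl_brand VALUES (?, ?)", [(1, "Brand A"), (2, "Brand B")])
cur.executemany(
    "INSERT INTO tbl_product_reviews VALUES (?, ?, ?)",
    [(1, 1, 5), (2, 1, 3), (3, 2, 4)],
)

# Typical star-schema query: join the fact table to a dimension and aggregate.
cur.execute(
    """
    SELECT b.brand_name, COUNT(*) AS review_count, AVG(r.review_rating) AS avg_rating
    FROM tbl_product_reviews AS r
    JOIN tbl_brand AS b ON b.brand_id = r.brand_id
    GROUP BY b.brand_name
    ORDER BY b.brand_name
    """
)
brand_summary = cur.fetchall()
print(brand_summary)
conn.close()
```

The same join pattern extends to the other dimensions by adding one `JOIN` per dimension key.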

### Importing necessary libraries

In [1]:
# For postgresql db connection
import psycopg2
# for preprocessing
import pandas as pd 
import numpy as np
# for loading data from env file
import os
from dotenv import load_dotenv
# To check time
import time 

#### Dataset paths

In [2]:
dataset1_path = "datasets/product_info.csv"
dataset2_path = "datasets/product_reviews.csv"

#### Environment variables

In [3]:
load_dotenv()
db_host = os.environ.get('DB_HOST')
db_user = os.environ.get('DEFAULT_PG_USER')
db_pwd = os.environ.get('DEFAULT_PG_PASSWORD')
db_default_db = os.environ.get('DEFAULT_DB_NAME')
datawarehouse_name = os.environ.get('DATAWAREHOUSE_NAME')
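
The cell above assumes a `.env` file that defines the five variables it reads. `os.environ.get` returns `None` for a missing key, so a forgotten variable would only surface later as a connection error. A minimal sketch of an early check follows; `find_missing_settings` is a hypothetical helper, not part of the original notebook.

```python
import os

# Names of the variables the notebook reads from the environment.
REQUIRED_SETTINGS = [
    "DB_HOST",
    "DEFAULT_PG_USER",
    "DEFAULT_PG_PASSWORD",
    "DEFAULT_DB_NAME",
    "DATAWAREHOUSE_NAME",
]

def find_missing_settings(environ=os.environ, required=REQUIRED_SETTINGS):
    """Return the required variable names that are unset or empty."""
    return [name for name in required if not environ.get(name)]

# Example with a plain dict standing in for os.environ.
example_env = {"DB_HOST": "localhost", "DEFAULT_PG_USER": "postgres"}
missing = find_missing_settings(example_env)
print(missing)
```

In the notebook, calling `find_missing_settings()` right after `load_dotenv()` and stopping if the list is non-empty would be one way to fail early.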

#### SQL Queries

In [4]:
CREATE_TABLE_QUERIES = [
    """ 
    CREATE TABLE IF NOT EXISTS tbl_product(
        product_id TEXT PRIMARY KEY,
        product_name TEXT,
        avg_product_rating FLOAT,
        product_price FLOAT,
        product_reviews_count FLOAT,
        favorites_count INT,
        variations_count INT,
        product_category TEXT
    )
    """,
    """
    CREATE TABLE IF NOT EXISTS tbl_brand(
        brand_id INT PRIMARY KEY,
        brand_name TEXT UNIQUE
    )
    """,
    """ 
    CREATE TABLE IF NOT EXISTS tbl_reviewer(
        reviewer_id BIGINT PRIMARY KEY,
        reviewer_skin_tone TEXT,
        reviewer_skin_type TEXT,
        reviewer_eye_color TEXT,
        reviewer_hair_color TEXT
    )
    """,
    """ 
    CREATE TABLE IF NOT EXISTS tbl_date(
        date_id TIMESTAMP PRIMARY KEY,
        year INT,
        month INT,
        day INT
    )
    """,
    """ 
    CREATE TABLE IF NOT EXISTS tbl_product_reviews(
        review_id INT PRIMARY KEY,
        product_id TEXT REFERENCES tbl_product(product_id),
        brand_id INT REFERENCES tbl_brand(brand_id),
        reviewer_id BIGINT REFERENCES tbl_reviewer(reviewer_id),
        date_id TIMESTAMP REFERENCES tbl_date(date_id),
        review_title TEXT,
        review_text TEXT,
        review_rating INT
    )
    """
]

DROP_TABLE_QUERIES = [
    """ 
    DROP TABLE tbl_product_reviews;
    """,
    """ 
    DROP TABLE tbl_product;
    """,
    """ 
    DROP TABLE tbl_brand;
    """,
    """ 
    DROP TABLE tbl_reviewer;
    """,
    """ 
    DROP TABLE tbl_date;
    """
]

TRUNCATE_TABLE_QUERIES = [
    """ 
    TRUNCATE TABLE tbl_product_reviews;
    """,
    """ 
    TRUNCATE TABLE tbl_product CASCADE;
    """,
    """ 
    TRUNCATE TABLE tbl_brand CASCADE;
    """,
    """ 
    TRUNCATE TABLE tbl_reviewer CASCADE;
    """,
    """ 
    TRUNCATE TABLE tbl_date CASCADE;
    """
]

INSERT_TABLE_QUERIES = {
    "product": 
        """ 
        INSERT INTO tbl_product(product_id, product_name, avg_product_rating, product_price, product_reviews_count, favorites_count, variations_count, product_category) VALUES (%s,%s,%s,%s,%s,%s,%s,%s)
        """,
    "brand":
        """ 
        INSERT INTO tbl_brand(brand_id, brand_name) VALUES (%s,%s)
        """,
    "reviewer":
        """ 
        INSERT INTO tbl_reviewer(reviewer_id, reviewer_skin_tone, reviewer_skin_type, reviewer_eye_color, reviewer_hair_color) VALUES (%s,%s,%s,%s,%s)
        """,
    "date":
        """ 
        INSERT INTO tbl_date(date_id, year, month, day) VALUES (%s,%s,%s,%s)
        """,
    "reviews":
        """ 
        INSERT INTO tbl_product_reviews(review_id, product_id, brand_id, reviewer_id, date_id, review_title, review_text, review_rating) VALUES (%s,%s,%s,%s,%s,%s,%s,%s)
        """
}


#### Utility Functions

In [5]:
# This function checks whether the string form of a value consists only of digits
def checkNumeric(val):
    return str(val).isdigit()
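
Because `str.isdigit` is applied to the string form of the value, only non-negative whole numbers written without a sign or decimal point pass the check. The calls below illustrate this with made-up values; the function is repeated (in an equivalent form) so that the snippet runs on its own.

```python
def checkNumeric(val):
    return str(val).isdigit()

# Made-up sample values, for illustration only.
samples = ["1741593524", 42, "orderGen309293", "-5", 12.0, float("nan")]
results = [checkNumeric(value) for value in samples]
print(results)  # [True, True, False, False, False, False]
```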

In [6]:
# This function is used for creating datawarehouse
def create_datawarehouse():
    print("Started creating data warehouse")
    # connection to connect to default postgresql database
    db_conn_string = f"host={db_host} dbname={db_default_db} user={db_user} password={db_pwd}"
    conn = psycopg2.connect(db_conn_string)
    conn.set_session(autocommit=True)
    cur = conn.cursor() 
    
    # creating the datawarehouse 
    cur.execute(f"DROP DATABASE IF EXISTS {datawarehouse_name}")
    cur.execute(f"CREATE DATABASE {datawarehouse_name}")
    # close connection to default database
    print("Data warehouse created succesfully")
    conn.close()
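
`create_datawarehouse` interpolates `datawarehouse_name` directly into the SQL text, so an unexpected value in the `.env` file would end up inside the `DROP DATABASE` statement. One hedged option is to validate the name before using it. `is_safe_identifier` below is a hypothetical helper, and its rule (lowercase letters, digits and underscores, not starting with a digit, at most 63 characters) is an assumption that is deliberately stricter than what PostgreSQL itself accepts.

```python
import re

# Conservative pattern: a lowercase letter or underscore first,
# then up to 62 more lowercase letters, digits, or underscores.
_IDENTIFIER_RE = re.compile(r"[a-z_][a-z0-9_]{0,62}")

def is_safe_identifier(name):
    """Return True if name is a plain identifier under the rule above."""
    return isinstance(name, str) and _IDENTIFIER_RE.fullmatch(name) is not None

print(is_safe_identifier("etl_dw"))
print(is_safe_identifier("etl_dw; DROP DATABASE postgres"))
print(is_safe_identifier(None))
```

psycopg2 also ships a `psycopg2.sql` module whose `Identifier` class quotes identifiers when composing statements, which may be preferable to a hand-written check.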

In [7]:
# This function is used for connecting to the created datawarehouse
def connect_datawarehouse():
    # creating connection to datawarehouse 
    dw_string = f"host={db_host} dbname={datawarehouse_name} user={db_user} password={db_pwd}"
    dw_conn = psycopg2.connect(dw_string)
    # creating datawarehouse cursor
    dw_cur = dw_conn.cursor()
    print("Connected to datawarehouse")
    return dw_conn, dw_cur

In [8]:
# This function is used to restart the datawarehouse connection in case of errors
def restart_connection(conn):
    # close the current datawarehouse connection
    conn.close()
    # start new connection
    dw_conn, dw_cur = connect_datawarehouse() 
    return dw_conn, dw_cur
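
If connection attempts can fail transiently (for example, while the database server is still starting), a small retry loop is one possible complement to restarting the connection. The sketch below is an assumption-laden illustration: `connect_with_retry` is a hypothetical helper, and a fake connector stands in for `connect_datawarehouse` so the snippet runs without a database.

```python
import time

def connect_with_retry(connect, attempts=3, delay_seconds=0.0):
    """Call connect() until it succeeds or the attempts run out."""
    last_error = None
    for attempt in range(1, attempts + 1):
        try:
            return connect()
        except Exception as e:  # in real use, narrow this to the driver's error type
            last_error = e
            print(f"Attempt {attempt} failed: {e}")
            time.sleep(delay_seconds)
    raise last_error

# Fake connector that fails twice and then succeeds (for demonstration only).
calls = {"count": 0}

def flaky_connect():
    calls["count"] += 1
    if calls["count"] < 3:
        raise ConnectionError("server not ready")
    return "connection"

connection = connect_with_retry(flaky_connect)
print(connection, calls["count"])
```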

In [9]:
# Creating tables for fact table and dimension table in the datawarehouse
def create_tables(cur,conn,create_table_queries):
    print("Creating tables inside data warehouse")
    try:
        for query in create_table_queries:
            cur.execute(query)
        conn.commit() 
        print("Tables created successfully")
    except Exception as e:
        conn.rollback()
        print(f"Tables weren't created.\n Following error encountered: {e}")

In [10]:
# This function is used to drop all the existing tables in datawarehouse
def drop_existing_tables(cur,conn,drop_table_queries):
    print("Dropping existing tables from datawarehouse")
    try:
        for query in drop_table_queries: 
            cur.execute(query)
        conn.commit() 
        print("Existing tables dropped")
    except Exception as e:
        conn.rollback() 
        print(f"Tables weren't dropped.\n Following error encountered: {e}")

In [11]:
# This function is used to truncate all the existing tables in datawarehouse
def truncate_existing_tables(cur,conn,truncate_table_queries):
    print("Truncating existing tables from datawarehouse")
    try:
        for query in truncate_table_queries: 
            cur.execute(query)
        conn.commit() 
        print("Existing tables truncated")
    except Exception as e:
        conn.rollback() 
        print(f"Tables weren't truncated.\n Following error encountered: {e}")

In [22]:
# This function is used to insert data to the tables in datawarehouse
def insert_data_to_tables(cur, conn, insert_table_queries,product_df,brand_df,reviewer_df,date_df,reviews_df):
    last_val = 0
    try:
        # insert into product table from product df
        for i, row in product_df.iterrows():
            cur.execute(insert_table_queries["product"],list(row))
        print("Loaded data to product table")
        # insert into brands table from brand df
        for i, row in brand_df.iterrows():
            cur.execute(insert_table_queries["brand"],list(row))
        print("Loaded data to Brand table")
        # insert into reviewer table from reviewer df
        for i, row in reviewer_df.iterrows():
            last_val = i
            cur.execute(insert_table_queries["reviewer"],list(row))
        print("Loaded data to Reviewer table")
        # insert into date table from date df 
        for i,row in date_df.iterrows():
            cur.execute(insert_table_queries["date"],list(row))
        print("Loaded data to Date table")
        # insert into reviews table from reviews df 
        for i,row in reviews_df.iterrows():
            last_val = i
            cur.execute(insert_table_queries["reviews"],list(row))
        print("Loaded data to Product Reviews table")
        conn.commit()
        print("All Data was inserted successfully into tables")
    except Exception as e:
        print(f"Last dataframe row index recorded before the failure: {last_val}")
        print("Query execution failed, so data wasn't inserted")
        print(f"Following error occurred: {e}")
        conn.rollback()
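
The function above runs all inserts in a single transaction and rolls everything back if any statement fails. The sketch below shows the same commit-or-rollback pattern on a small scale, using `executemany` to send a batch of rows in one call. It uses the standard-library `sqlite3` module as a stand-in so that it runs without a server; note that `sqlite3` uses `?` placeholders where psycopg2 uses `%s`. The `load_rows` helper and the sample rows are hypothetical.

```python
import sqlite3

def load_rows(conn, query, rows):
    """Insert all rows in one transaction; undo everything on failure."""
    cur = conn.cursor()
    try:
        cur.executemany(query, rows)
        conn.commit()
        return True
    except sqlite3.Error as e:
        conn.rollback()
        print(f"Load failed, transaction rolled back: {e}")
        return False

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE tbl_brand(brand_id INTEGER PRIMARY KEY, brand_name TEXT UNIQUE)")
insert_brand = "INSERT INTO tbl_brand(brand_id, brand_name) VALUES (?, ?)"

ok = load_rows(conn, insert_brand, [(1, "Brand A"), (2, "Brand B")])
# The second batch repeats brand_id 2, so the whole batch is rolled back.
failed = load_rows(conn, insert_brand, [(3, "Brand C"), (2, "Brand D")])
row_count = conn.execute("SELECT COUNT(*) FROM tbl_brand").fetchone()[0]
print(ok, failed, row_count)
conn.close()
```

psycopg2 cursors also offer `executemany`, and `psycopg2.extras.execute_values` is usually the faster choice for large batches; measuring on the real data is advisable before switching.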

#### Extract Step

In [13]:
def extract_data():
    print("Starting Extract Step . . .")
    df1 = pd.read_csv(dataset1_path)
    df2 = pd.read_csv(dataset2_path,index_col=0)
    # considering only the required columns from both dataframes
    df1 = df1[['product_id', 'brand_id', 'loves_count','rating', 'reviews','primary_category','child_count']]
    df2 = df2[['author_id', 'rating', 'submission_time', 'review_text',
               'review_title', 'skin_tone', 'eye_color', 'skin_type', 'hair_color',
               'product_id', 'product_name', 'brand_name', 'price_usd']]
    print("Extract Step Completed!!!")
    return df1, df2

#### Transform Step

In [14]:
def transform_data(df1,df2):
    print("Starting Transform Step . . .")
    # merge two datasets based on product_id 
    df_merged = pd.merge(df1,df2,how="inner",on="product_id")
    # Rename Columns 
    rename_dict = {
        "loves_count": "favorites_count",
        "rating_x": "avg_product_rating",
        "reviews": "product_reviews_count",
        "primary_category": "product_category",
        "child_count": "variations_count",
        "rating_y": "review_rating",
        "submission_time": "full_date",
        "skin_tone": "reviewer_skin_tone",
        "skin_type": "reviewer_skin_type",
        "eye_color": "reviewer_eye_color",
        "hair_color": "reviewer_hair_color",
        "price_usd": "product_price",
        "author_id": "reviewer_id"
    }
    df_merged = df_merged.rename(columns=rename_dict)
    # cleaning nan values
    # 1. drop rows with nan values in review_text as it is most essential for product_reviews
    df_merged = df_merged.dropna(subset=["review_text"])
    # 2. review_title is optional, so where review_title is nan but review_text is present, fill it with a default placeholder value
    df_merged["review_title"] = df_merged["review_title"].fillna("Review Provided")
    # 3. For categorical columns like reviewer_skin_tone, reviewer_skin_type, reviewer_hair_color, reviewer_eye_color replace the nan values with mode
    df_merged["reviewer_skin_tone"] = df_merged["reviewer_skin_tone"].fillna(df_merged["reviewer_skin_tone"].mode()[0])
    df_merged["reviewer_eye_color"] = df_merged["reviewer_eye_color"].fillna(df_merged["reviewer_eye_color"].mode()[0])
    df_merged["reviewer_skin_type"] = df_merged["reviewer_skin_type"].fillna(df_merged["reviewer_skin_type"].mode()[0])
    df_merged["reviewer_hair_color"] = df_merged["reviewer_hair_color"].fillna(df_merged["reviewer_hair_color"].mode()[0])
    # Creating new columns for date 
    df_merged["full_date"] = pd.to_datetime(df_merged["full_date"])
    df_merged["year"] = df_merged["full_date"].dt.year
    df_merged["month"] = df_merged["full_date"].dt.month
    df_merged["day"] = df_merged["full_date"].dt.day
    # Converting the values in reviewer_id column to int from object
    # checking for non-numeric reviewer_id
    df_merged["is_numeric"] = df_merged["reviewer_id"].apply(checkNumeric)
    # removing non-numeric reviewer ids and dropping the helper is_numeric column
    df_merged = df_merged[df_merged["is_numeric"]].drop(columns=["is_numeric"])
    # converting reviewer_id to int
    df_merged["reviewer_id"] = df_merged["reviewer_id"].apply(int)
    # Creating multiple dataframes for fact and dimension tables
    # Product Reviews Table: Fact Table
    reviews_df = df_merged[['product_id','brand_id','reviewer_id','full_date','review_title','review_text','review_rating']]
    reviews_df = reviews_df.rename(columns={'full_date': 'date_id'})
    reviews_df = reviews_df.reset_index(drop=True)
    reviews_df.insert(0, 'review_id', reviews_df.index + 1)
    # Product table: Dimension table
    product_df = df_merged[['product_id', 'product_name', 'avg_product_rating', 'product_price', 'product_reviews_count', 'favorites_count', 'variations_count', 'product_category']]
    # To keep only unique product descriptions in product_df
    product_df = product_df.drop_duplicates("product_id").reset_index(drop=True)
    # Brand table: dimension table
    brand_df = df_merged[['brand_id', 'brand_name']]
    # To keep only the unique brand details in brands dataframe
    brand_df = brand_df.drop_duplicates("brand_id").reset_index(drop=True)
    # Reviewer table: dimension table
    reviewer_df = df_merged[['reviewer_id', 'reviewer_skin_tone', 'reviewer_skin_type', 'reviewer_eye_color', 'reviewer_hair_color']]
    # To keep only unique reviewer details
    reviewer_df = reviewer_df.drop_duplicates("reviewer_id").reset_index(drop=True)
    # Date table: dimension table
    date_df = df_merged[['full_date', 'year', 'month', 'day']]
    date_df = date_df.rename(columns={"full_date": "date_id"})
    date_df = date_df.drop_duplicates("date_id").reset_index(drop=True)
    print("Transform Step Completed")
    # Return the dataframes for fact and dimension tables
    return reviews_df, product_df, brand_df, reviewer_df, date_df
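
The `rating_x` and `rating_y` keys in `rename_dict` come from `pd.merge`: both source dataframes have a `rating` column, and pandas by default appends `_x` and `_y` to overlapping column names that are not merge keys. A small sketch with made-up rows shows this, along with the effect of the inner join:

```python
import pandas as pd

# Made-up rows; only the column names mirror the real datasets.
products = pd.DataFrame({"product_id": ["P1", "P2"], "rating": [4.5, 3.9]})
reviews = pd.DataFrame({"product_id": ["P1", "P1", "P3"], "rating": [5, 4, 2]})

# Inner join keeps only product_ids present in both frames (here: P1).
merged = pd.merge(products, reviews, how="inner", on="product_id")
columns = list(merged.columns)
print(columns)
print(len(merged))
```

Passing `suffixes=("_product", "_review")` to `pd.merge` would be one way to get more descriptive names directly, at the cost of updating the keys in `rename_dict`.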

#### Load Step

In [23]:
def load_data(reviews_df,product_df,brand_df,reviewer_df,date_df):
    print("Starting Load Step . . .")
    # firstly datawarehouse named etl_dw is created
    create_datawarehouse()
    try:
        # Then we connect to this datawarehouse using the user credentials
        dw_conn, dw_cur = connect_datawarehouse()
    except psycopg2.Error:
        # The first attempt failed and left no open connection to close, so retry once
        dw_conn, dw_cur = connect_datawarehouse()
    # Then create tables in etl_dw for fact and dimension tables
    create_tables(dw_cur,dw_conn,CREATE_TABLE_QUERIES)
    #----------------------------------------------------------------------
    # # In some cases we may need to truncate the tables
    # truncate_existing_tables(dw_cur,dw_conn,TRUNCATE_TABLE_QUERIES)
    # # In some cases we may need to drop the tables
    # drop_existing_tables(dw_cur,dw_conn,DROP_TABLE_QUERIES)
    #----------------------------------------------------------------------
    # To insert the data to database tables from dataframe
    insert_data_to_tables(dw_cur, dw_conn, INSERT_TABLE_QUERIES,product_df,brand_df,reviewer_df,date_df,reviews_df)
    # To close the connection with data warehouse
    dw_conn.close()

    print("Load Step Completed.\nCheck your datawarehouse")

### Complete ETL Pipeline

In [20]:
def run_etl_pipeline():
    df1,df2 = extract_data()
    reviews_df, product_df, brand_df, reviewer_df, date_df = transform_data(df1,df2)
    load_data(reviews_df, product_df, brand_df, reviewer_df, date_df)

### Running the ETL pipeline

In [24]:
start_time = time.time()
print("Started running etl pipeline")
run_etl_pipeline() 
end_time = time.time()
print(f"ETL pipeline ran succesfully\nData was succesfully loaded\n Total time taken: {end_time - start_time} seconds")

Started running etl pipeline
Starting Extract Step . . .


Extract Step Completed!!!
Starting Transform Step . . .
Transform Step Completed
Starting Load Step . . .
Started creating data warehouse
Data warehouse created succesfully
Connected to datawarehouse
Creating tables inside data warehouse
Tables created successfully
Loaded data to product table
Loaded data to Brand table
Loaded data to Reviewer table
Loaded data to Date table
Loaded data to Product Reviews table
All Data was inserted successfully into tables
Load Step Completed.
Check your datawarehouse
ETL pipeline ran succesfully
Data was succesfully loaded
 Total time taken: 33.44708037376404 seconds
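
The run above measures elapsed time with `time.time()`, which reads the wall clock. For measuring intervals, the standard library also offers `time.perf_counter()`, a monotonic high-resolution clock. A minimal sketch follows, with a hypothetical `timed` helper and a placeholder workload in place of `run_etl_pipeline`:

```python
import time

def timed(func, *args, **kwargs):
    """Call func and return (result, elapsed_seconds)."""
    start = time.perf_counter()
    result = func(*args, **kwargs)
    elapsed = time.perf_counter() - start
    return result, elapsed

# Placeholder workload standing in for run_etl_pipeline().
result, elapsed = timed(sum, range(1_000_000))
print(f"Result: {result}, total time taken: {elapsed:.2f} seconds")
```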


### Final Outputs
#### 1. Data Warehouse
![Data Warehouse](diagrams/op1.png)
#### 2. Tables in Data Warehouse
![Data warehouse tables](diagrams/op2.png)
#### 3. ER Diagram of Data Warehouse
![ERD](diagrams/erd_datawarehouse.png)
#### 4. Query results for Data Warehouse
![Query1](diagrams/product_table_query_op.png)
![Query2](diagrams/brand_table_query_op.png)
![Query3](diagrams/reviewer_table_query_op.png)
![Query4](diagrams/date_table_query_op.png)
![Query5](diagrams/reviews_table_query_op.png)