In [1]:
from sqlalchemy import create_engine
from sqlalchemy.exc import OperationalError

import psycopg2
from psycopg2 import OperationalError

import pandas as pd

In [2]:
def create_db_connection():
    """
    Build a SQLAlchemy engine for the PostgreSQL database and check that it can connect.

    A short-lived test connection is opened and closed again; the engine itself
    is what gets returned to the caller.

    Returns:
        engine: SQLAlchemy engine if the test connection succeeds, None otherwise
    """
    # Imported locally: the module-level name `OperationalError` is rebound by
    # the later `from psycopg2 import OperationalError`, and SQLAlchemy reports
    # failures through its own exception hierarchy rather than psycopg2's.
    from sqlalchemy.exc import SQLAlchemyError

    # NOTE(review): credentials are hard-coded in the URL below; consider
    # reading them from the environment instead.
    try:
        engine = create_engine('postgresql://postgres:superadmin@localhost:5432/db-ecommerce')
        # Open a throw-away connection to prove the database is reachable and
        # close it straight away so it is not left checked out.
        with engine.connect():
            pass
        print("Connection to PostgreSQL DB successful")
    except SQLAlchemyError as e:
        print("Error occurred during connection:", e)
        # Honour the documented contract: no usable engine on failure.
        return None

    return engine

# test connection
# NOTE: despite its name, `connection` is bound to the SQLAlchemy engine that
# create_db_connection() returns. It is later passed to insert_data_to_db()
# and released with connection.dispose().
connection = create_db_connection()

Connection to PostgreSQL DB successful


In [3]:
def create_conn():
    """
    Open a psycopg2 connection to the local PostgreSQL database.

    Returns:
        connection: psycopg2 connection object when the connection attempt
        succeeds, None when psycopg2 raises an OperationalError
    """
    # Connection settings, collected in one place and expanded into connect().
    settings = dict(
        database="db-ecommerce",
        user="postgres",
        password="superadmin",
        host="localhost",
        port="5432",
    )
    try:
        pg_conn = psycopg2.connect(**settings)
    except OperationalError as e:
        print(f"The error '{e}' occurred")
        return None

    print("Connection to PostgreSQL DB successful")
    return pg_conn

# NOTE: despite its name, `engine` is bound to the psycopg2 connection returned
# by create_conn(), not to a SQLAlchemy engine. This is the object handed to
# query_data_from_db() in the cells below and closed with engine.close().
engine = create_conn()

Connection to PostgreSQL DB successful


In [4]:
def query_data_from_db(engine, query):
    """
    Run a SQL query and load its result set into a DataFrame.

    Args:
        engine: Database handle forwarded to pandas.read_sql_query as `con`
        query (str): SQL query to execute
    Returns:
        df: A dataframe with the query result, or None if fetching raised
        an exception (the error is printed, not re-raised)
    """
    try:
        result = pd.read_sql_query(query, con=engine)
    except Exception as e:
        # Best-effort helper: report the problem and signal failure with None.
        print(f"Error occurred during data fetching: {e}")
        return None

    print("Data fetched successfully.")
    return result

In [5]:
# Query information_schema.columns for the table name, column name and data
# type of every column in the `public` schema, ordered by table and by the
# column's ordinal position.
query = """
    SELECT 
        table_name, column_name, data_type 
    FROM 
        information_schema.columns
    WHERE 
        table_schema = 'public'
    ORDER BY 
        table_name, ordinal_position;
"""
# `engine` is the psycopg2 connection returned by create_conn().
table_df = query_data_from_db(engine, query)

table_df

Data fetched successfully.


  df = pd.read_sql_query(query, con=engine)


Unnamed: 0,table_name,column_name,data_type
0,product_information,sku_product,character varying
1,product_information,product_name,character varying
2,product_information,ordered_qty,integer
3,product_information,stock_lvl,integer
4,product_information,restocking_lead_time,integer
5,product_information,sentiment_score,real
6,product_information,sentiment_magnitude,real
7,product_list,sku_product,character varying
8,product_list,product_name_v2,character varying
9,transaction_records,id_unique_visitor,character varying


In [6]:
# Select every row and column of transaction_records (there is no LIMIT, so
# the whole table is fetched). Rebinds `query` and `table_df` from the
# previous cell.
query = """
    SELECT 
        *
    FROM 
        transaction_records;
"""
table_df = query_data_from_db(engine, query)

table_df

Data fetched successfully.


  df = pd.read_sql_query(query, con=engine)


Unnamed: 0,id_unique_visitor,channel_group,hits_time,geo_network_country,geo_network_city,total_transaction_revenue,number_of_transactions,time_spent,page_views,date,...,product_category_v2,product_variant,item_currency_code,item_qty,item_revenue,transaction_revenue,transaction_id,page_title,page_search_keyword,page_path_lvl_1


In [7]:
# load the raw csv file for each table into its own dataframe
# (the paths are relative, pointing at ../raw-datasets/)
df_products = pd.read_csv("../raw-datasets/products.csv")
df_productlist = pd.read_csv("../raw-datasets/product_list.csv")
df_transactions = pd.read_csv("../raw-datasets/rev_transactions.csv")

# display the first 3 rows of each dataframe
display(df_products.head(3), df_productlist.head(3), df_transactions.head(3))

Unnamed: 0,SKU,name,orderedQuantity,stockLevel,restockingLeadTime,sentimentScore,sentimentMagnitude
0,GGADFBSBKS42347,PC gaming speakers,0,100,1,,
1,GGOEGAAX0581,Women's Colorblock Tee White,0,0,8,0.8,2.0
2,GGOEGAAX0596,Men's Quilted Insulated Vest Black,26,32,8,0.8,2.0


Unnamed: 0,productSKU,v2ProductName
0,10 55401,Lip Balm
1,9180842,Maze Pen
2,GGOEGGOA017399,Maze Pen


Unnamed: 0,fullVisitorId,channelGrouping,hits_time,geoNetwork_country,geoNetwork_city,totals_totalTransactionRevenue,totals_transactions,totals_timeOnSite,totals_pageviews,date,...,hits_product_v2ProductCategory,hits_product_productVariant,hits_item_currencyCode,hits_item_itemQuantity,hits_item_itemRevenue,hits_transaction_transactionRevenue,hits_transaction_transactionId,hits_page_pageTitle,hits_page_searchKeyword,hits_page_pagePathLevel1
0,7851878234225373633,Organic Search,1819831,Peru,La Victoria,42000000,1,1834,40,20160807,...,(not set),BLUE,USD,,,42000000,ORD20160807983,Checkout Confirmation,,/ordercompleted.html
1,7851878234225373633,Organic Search,1819831,Peru,La Victoria,42000000,1,1834,40,20160807,...,(not set),Single Option Only,USD,,,42000000,ORD20160807983,Checkout Confirmation,,/ordercompleted.html
2,7851878234225373633,Organic Search,1819831,Peru,La Victoria,42000000,1,1834,40,20160807,...,(not set),GREEN,USD,,,42000000,ORD20160807983,Checkout Confirmation,,/ordercompleted.html


In [8]:
# displaying information for each dataframe
# DataFrame.info() prints its summary itself and returns None, so the calls are
# made directly; wrapping them in display() only added three stray "None"
# lines after the summaries.
df_products.info()
df_productlist.info()
df_transactions.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 1092 entries, 0 to 1091
Data columns (total 7 columns):
 #   Column              Non-Null Count  Dtype  
---  ------              --------------  -----  
 0   SKU                 1092 non-null   object 
 1   name                1092 non-null   object 
 2   orderedQuantity     1092 non-null   int64  
 3   stockLevel          1092 non-null   int64  
 4   restockingLeadTime  1092 non-null   int64  
 5   sentimentScore      1091 non-null   float64
 6   sentimentMagnitude  1091 non-null   float64
dtypes: float64(2), int64(3), object(2)
memory usage: 59.8+ KB
<class 'pandas.core.frame.DataFrame'>
RangeIndex: 2273 entries, 0 to 2272
Data columns (total 2 columns):
 #   Column         Non-Null Count  Dtype 
---  ------         --------------  ----- 
 0   productSKU     2273 non-null   object
 1   v2ProductName  2273 non-null   object
dtypes: object(2)
memory usage: 35.6+ KB
<class 'pandas.core.frame.DataFrame'>
RangeIndex: 20000 entries, 0 to 1

None

None

None

In [9]:
# Rename the columns of each dataframe so they match the column names of the
# corresponding database tables.
df_products = df_products.rename(columns={
    'SKU': 'sku_product',
    'name': 'product_name',
    'orderedQuantity': 'ordered_qty',
    'stockLevel': 'stock_lvl',
    'restockingLeadTime': 'restocking_lead_time',
    'sentimentScore': 'sentiment_score',
    'sentimentMagnitude': 'sentiment_magnitude',
})

df_productlist = df_productlist.rename(columns={
    'productSKU': 'sku_product',
    'v2ProductName': 'product_name_v2',
})

df_transactions = df_transactions.rename(columns={
    'fullVisitorId': 'id_unique_visitor',
    'channelGrouping': 'channel_group',
    'geoNetwork_country': 'geo_network_country',
    'geoNetwork_city': 'geo_network_city',
    'totals_totalTransactionRevenue': 'total_transaction_revenue',
    'totals_transactions': 'number_of_transactions',
    'totals_timeOnSite': 'time_spent',
    'totals_pageviews': 'page_views',
    'visitId': 'id_visit',
    'hits_type': 'hits_type',
    'hits_product_productRefundAmount': 'product_refund_amount',
    'hits_product_productQuantity': 'product_qty',
    'hits_product_productPrice': 'product_price',
    'hits_product_productRevenue': 'product_revenue',
    'hits_product_productSKU': 'sku_product',
    'hits_product_v2ProductName': 'product_name_v2',
    'hits_product_v2ProductCategory': 'product_category_v2',
    'hits_product_productVariant': 'product_variant',
    'hits_item_currencyCode': 'item_currency_code',
    'hits_item_itemQuantity': 'item_qty',
    'hits_item_itemRevenue': 'item_revenue',
    'hits_transaction_transactionRevenue': 'transaction_revenue',
    'hits_transaction_transactionId': 'transaction_id',
    'hits_page_pageTitle': 'page_title',
    'hits_page_searchKeyword': 'page_search_keyword',
    'hits_page_pagePathLevel1': 'page_path_lvl_1',
})

# Show the first 3 rows of each renamed dataframe.
display(df_products.head(3), df_productlist.head(3), df_transactions.head(3))

Unnamed: 0,sku_product,product_name,ordered_qty,stock_lvl,restocking_lead_time,sentiment_score,sentiment_magnitude
0,GGADFBSBKS42347,PC gaming speakers,0,100,1,,
1,GGOEGAAX0581,Women's Colorblock Tee White,0,0,8,0.8,2.0
2,GGOEGAAX0596,Men's Quilted Insulated Vest Black,26,32,8,0.8,2.0


Unnamed: 0,sku_product,product_name_v2
0,10 55401,Lip Balm
1,9180842,Maze Pen
2,GGOEGGOA017399,Maze Pen


Unnamed: 0,id_unique_visitor,channel_group,hits_time,geo_network_country,geo_network_city,total_transaction_revenue,number_of_transactions,time_spent,page_views,date,...,product_category_v2,product_variant,item_currency_code,item_qty,item_revenue,transaction_revenue,transaction_id,page_title,page_search_keyword,page_path_lvl_1
0,7851878234225373633,Organic Search,1819831,Peru,La Victoria,42000000,1,1834,40,20160807,...,(not set),BLUE,USD,,,42000000,ORD20160807983,Checkout Confirmation,,/ordercompleted.html
1,7851878234225373633,Organic Search,1819831,Peru,La Victoria,42000000,1,1834,40,20160807,...,(not set),Single Option Only,USD,,,42000000,ORD20160807983,Checkout Confirmation,,/ordercompleted.html
2,7851878234225373633,Organic Search,1819831,Peru,La Victoria,42000000,1,1834,40,20160807,...,(not set),GREEN,USD,,,42000000,ORD20160807983,Checkout Confirmation,,/ordercompleted.html


In [10]:
def rename_columns(csv_file, df):
    """
    Translate the raw CSV headers of a known source file into database column names.

    Args:
        csv_file (str): Path the DataFrame was loaded from. It is compared
            verbatim against the three known dataset paths.
        df (pandas.DataFrame): DataFrame whose columns should be renamed.

    Returns:
        pandas.DataFrame: A renamed DataFrame when csv_file is one of the known
        paths; otherwise the DataFrame that was passed in, unchanged.
    """
    # One header-translation table per known source file.
    column_maps = {
        '../raw-datasets/products.csv': {
            'SKU': 'sku_product',
            'name': 'product_name',
            'orderedQuantity': 'ordered_qty',
            'stockLevel': 'stock_lvl',
            'restockingLeadTime': 'restocking_lead_time',
            'sentimentScore': 'sentiment_score',
            'sentimentMagnitude': 'sentiment_magnitude',
        },
        '../raw-datasets/product_list.csv': {
            'productSKU': 'sku_product',
            'v2ProductName': 'product_name_v2',
        },
        '../raw-datasets/rev_transactions.csv': {
            'fullVisitorId': 'id_unique_visitor',
            'channelGrouping': 'channel_group',
            'geoNetwork_country': 'geo_network_country',
            'geoNetwork_city': 'geo_network_city',
            'totals_totalTransactionRevenue': 'total_transaction_revenue',
            'totals_transactions': 'number_of_transactions',
            'totals_timeOnSite': 'time_spent',
            'totals_pageviews': 'page_views',
            'visitId': 'id_visit',
            'hits_type': 'hits_type',
            'hits_product_productRefundAmount': 'product_refund_amount',
            'hits_product_productQuantity': 'product_qty',
            'hits_product_productPrice': 'product_price',
            'hits_product_productRevenue': 'product_revenue',
            'hits_product_productSKU': 'sku_product',
            'hits_product_v2ProductName': 'product_name_v2',
            'hits_product_v2ProductCategory': 'product_category_v2',
            'hits_product_productVariant': 'product_variant',
            'hits_item_currencyCode': 'item_currency_code',
            'hits_item_itemQuantity': 'item_qty',
            'hits_item_itemRevenue': 'item_revenue',
            'hits_transaction_transactionRevenue': 'transaction_revenue',
            'hits_transaction_transactionId': 'transaction_id',
            'hits_page_pageTitle': 'page_title',
            'hits_page_searchKeyword': 'page_search_keyword',
            'hits_page_pagePathLevel1': 'page_path_lvl_1',
        },
    }

    # Equality scan (rather than a dict lookup) keeps the plain `==` matching
    # on the path exactly as before.
    for known_path, mapping in column_maps.items():
        if csv_file == known_path:
            return df.rename(columns=mapping)

    # Unknown source file: hand the frame back untouched.
    return df

In [11]:
def insert_data_to_db(connection, csv_file, table_name):
    """
    Read a CSV file, normalise it and append its rows to a database table.

    The frame is renamed with rename_columns(), its 'date' column (if any) is
    parsed as %Y%m%d, uint64 columns are converted to strings, and rows are
    de-duplicated on 'sku_product' (if present) before being appended.

    Args:
        connection: Database handle passed to DataFrame.to_sql
        csv_file (str): Path to the CSV file to be inserted
        table_name (str): Name of the database table the rows are appended to
    Returns:
        None. Any exception is caught and reported with print().
    """
    try:
        # Load the file and translate its headers to the table's column names.
        frame = rename_columns(csv_file, pd.read_csv(csv_file))

        # Parse 'date' values such as 20160807 into datetimes when present.
        if 'date' in frame.columns:
            frame['date'] = pd.to_datetime(frame['date'], format='%Y%m%d')

        # Convert every uint64 column to string.
        uint64_cols = [name for name in frame.columns if frame[name].dtype == 'uint64']
        for name in uint64_cols:
            frame[name] = frame[name].astype(str)

        # Keep only the first row per 'sku_product' when that column exists.
        # NOTE(review): this applies to every frame that has a 'sku_product'
        # column after renaming - confirm that is intended for all tables.
        if 'sku_product' in frame.columns:
            frame = frame.drop_duplicates(subset='sku_product', keep='first')

        # Rows are appended, so calling this twice for the same file submits
        # the same rows again.
        frame.to_sql(table_name, connection, if_exists='append', index=False)
        print(f"Data inserted successfully into table {table_name}")
    except Exception as e:
        print(f"Error occurred during data insertion into table {table_name}:", e)

In [12]:
# Map each source CSV path to the database table it is loaded into.
# The keys are the same path strings that rename_columns() compares against.
csv_files_and_tables = {
    '../raw-datasets/products.csv': 'product_information',
    '../raw-datasets/product_list.csv': 'product_list',
    '../raw-datasets/rev_transactions.csv': 'transaction_records',
}

# Insert each CSV into its table. `connection` is the object returned by
# create_db_connection() in an earlier cell. insert_data_to_db() uses
# if_exists='append', so re-running this cell attempts to insert the same
# rows again.
for csv_file, table_name in csv_files_and_tables.items():
    insert_data_to_db(connection, csv_file, table_name)

Data inserted successfully into table product_information
Data inserted successfully into table product_list
Data inserted successfully into table transaction_records


In [13]:
# Read back up to 100 rows of transaction_records after the insert above.
# `engine` is the psycopg2 connection returned by create_conn().
transaction_query = "SELECT * FROM transaction_records LIMIT 100;"
transaction_df = query_data_from_db(engine, transaction_query)

transaction_df

Data fetched successfully.


  df = pd.read_sql_query(query, con=engine)


Unnamed: 0,id_unique_visitor,channel_group,hits_time,geo_network_country,geo_network_city,total_transaction_revenue,number_of_transactions,time_spent,page_views,date,...,product_category_v2,product_variant,item_currency_code,item_qty,item_revenue,transaction_revenue,transaction_id,page_title,page_search_keyword,page_path_lvl_1
0,7851878234225373633,Organic Search,1819831,Peru,La Victoria,42000000.0,1,1834,40,2016-08-07,...,(not set),BLUE,USD,,,42000000.0,ORD20160807983,Checkout Confirmation,,/ordercompleted.html
1,7851878234225373633,Organic Search,1819831,Peru,La Victoria,42000000.0,1,1834,40,2016-08-07,...,(not set),Single Option Only,USD,,,42000000.0,ORD20160807983,Checkout Confirmation,,/ordercompleted.html
2,7851878234225373633,Organic Search,1819831,Peru,La Victoria,42000000.0,1,1834,40,2016-08-07,...,(not set),GREEN,USD,,,42000000.0,ORD20160807983,Checkout Confirmation,,/ordercompleted.html
3,7851878234225373633,Organic Search,1819831,Peru,La Victoria,42000000.0,1,1834,40,2016-08-07,...,(not set),Single Option Only,USD,,,42000000.0,ORD20160807983,Checkout Confirmation,,/ordercompleted.html
4,7851878234225373633,Organic Search,1819831,Peru,La Victoria,42000000.0,1,1834,40,2016-08-07,...,(not set),RED,USD,,,42000000.0,ORD20160807983,Checkout Confirmation,,/ordercompleted.html
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
95,3490941383536951175,Direct,3105868,Japan,Minato,29100000.0,1,3106,21,2017-05-23,...,Headgear,S/M,USD,,,29100000.0,ORD201705232232,Checkout Confirmation,,/ordercompleted.html
96,1246338221591989045,Referral,2198455,Japan,Minato,102950000.0,1,2198,23,2017-05-25,...,Apparel,12M,USD,,,102950000.0,ORD201705252263,Checkout Confirmation,,/ordercompleted.html
97,1246338221591989045,Referral,2198455,Japan,Minato,102950000.0,1,2198,23,2017-05-25,...,Apparel,18M,USD,,,102950000.0,ORD201705252263,Checkout Confirmation,,/ordercompleted.html
98,6392388121934645207,Referral,815704,Japan,Minato,26350000.0,1,1738,35,2017-05-30,...,Apparel,MD,USD,,,26350000.0,ORD201705302342,Checkout Confirmation,,/ordercompleted.html


In [14]:
# `engine` is the psycopg2 connection returned by create_conn(); close it.
engine.close()

In [15]:
# `connection` is the SQLAlchemy engine returned by create_db_connection();
# dispose() releases the engine's pooled connections.
connection.dispose()