## Creating the Data Warehouse in a different schema

In [2]:
import pymysql

In [3]:
# Connecting to MySQL schema: lep_shop
connection = pymysql.connect(host ='localhost',
                             port=int(3306),
                             user='root',passwd='12345678')
# Creating a Cursor object
cursor = connection.cursor()

In [4]:
# Create a new schema called lep_store_dw
cursor.execute('''
drop schema if exists lep_store_dw;
''')

cursor.execute('''
create schema lep_store_dw;
''')

# Use the new schema
cursor.execute('''
use lep_store_dw;
''')

0

## Generating the Date Dimension

In [6]:
import numpy as np
import pandas as pd

In [13]:
# Create dates
start_date = pd.to_datetime('2020-07-07').date()
end_date = pd.to_datetime('2100-01-01').date()

# Get all the dates from 2020-07-07 to 2100-01-01
dates = pd.date_range(start=start_date, end=end_date, freq='D')

# Insert into a dataframe
dates_df = pd.DataFrame(dates, columns=['Date'])

In [30]:
# Date Key Column
dates_df['Date_key'] = (
    dates_df.Date.dt.year.astype(str) +
    dates_df.Date.dt.month.map("{:02d}".format) +
    dates_df.Date.dt.day.map("{:02d}".format)
)

# Day_name Column
dates_df['Day_name'] = dates.day_name()

# Month Column
month_dict = {1:'Jan',2:'Feb',3:'Mar',4:'Apr',
             5:'May',6:'Jun',7:'Jul',8:'Aug',9:'Sep',10:'Oct',
             11:'Nov',12:'Dec'}
dates_df['Month'] = dates_df.Date.dt.month
dates_df.Month.replace(month_dict,inplace=True)

# Year Column
dates_df['Year'] = dates_df.Date.dt.year

In [41]:
dates_df[dates_df['Date_key'].duplicated()]

Unnamed: 0,Date,Date_key,Day_name,Month,Year


In [33]:
len(dates_df.Date_key.unique())

29033

In [24]:
len(dates_df.Date)

29033

In [43]:
dates_df.dtypes

Date        datetime64[ns]
Date_key            object
Day_name            object
Month               object
Year                 int64
dtype: object

### Loading the Date Dimension to MySQL

In [44]:
query = '''
drop table if exists lep_dwh_date;
'''

cursor.execute(query)

query = '''
CREATE TABLE lep_dwh_date (
    Date_key VARCHAR(50) NOT NULL PRIMARY KEY,
    Dates DATE NOT NULL,
    Day_name VARCHAR(50) NOT NULL,
    Month_name VARCHAR(50) NOT NULL,
    Year_name INT NOT NULL
);
'''

cursor.execute(query)

0

In [46]:
# populate data to the dates_df table
records = [tuple(row) for row in dates_df.values]

# Create the parameterized query
query = "INSERT INTO lep_dwh_date (Dates, Date_key, Day_name, Month_name, Year_name) VALUES (%s, %s, %s, %s, %s)"

# Execute the bulk insert query with the list of tuples
cursor.executemany(query, records)

# Commit the transaction
connection.commit()

## Generating the Customers Dimension

In [47]:
# Drop the dwh_customers table if it exists
cursor.execute("DROP TABLE IF EXISTS lep_dwh_customers;")

# Create the dwh_customers table
cursor.execute("""
    CREATE TABLE lep_dwh_customers AS
    SELECT 
        c1.customer_id,
        c1.first_name,
        c1.last_name,
        c1.full_name,
        c2.country_id
    FROM lep_store.customers AS c1
    JOIN lep_store.countries AS c2 ON c1.country_id = c2.country_id
    ORDER BY customer_id;
""")

# Set customer_id as the primary key
cursor.execute("ALTER TABLE lep_dwh_customers MODIFY COLUMN customer_id INT NOT NULL PRIMARY KEY;")

# Commit the changes and close the connection
connection.commit()


## Generating the Employees Dimension

In [48]:
# Drop the table if it exists
cursor.execute("DROP TABLE IF EXISTS lep_dwh_employees;")

# Create the dwh_employees table
cursor.execute("""
    CREATE TABLE lep_dwh_employees AS
    SELECT 
        e.employee_id,
        e.first_name,
        e.last_name,
        e.full_name,
        d.department
    FROM lep_store.employees AS e
    JOIN lep_store.departments AS d ON e.department_id = d.department_id
    ORDER BY employee_id;
""")

# Set employee_id as the primary key
cursor.execute("ALTER TABLE lep_dwh_employees MODIFY COLUMN employee_id INT NOT NULL PRIMARY KEY;")

# Commit the changes and close the connection
connection.commit()

## Generating the Products Dimension

In [49]:
# Drop the table if it exists
cursor.execute("DROP TABLE IF EXISTS lep_dwh_products;")

# Create the dwh_employees table
cursor.execute("""
    CREATE TABLE lep_dwh_products AS
    SELECT *
    FROM lep_store.products
    ORDER BY product_id;
""")

# Set product_id as the primary key
cursor.execute("ALTER TABLE lep_dwh_products MODIFY COLUMN product_id VARCHAR(100) NOT NULL PRIMARY KEY;")

# Commit the changes and close the connection
connection.commit()

## Generate Fact table

In [50]:
# Drop table if exists
cursor.execute("DROP TABLE IF EXISTS lep_dwh_fact;")

# Create a new fact table
query = """
CREATE TABLE lep_dwh_fact AS 
SELECT c1.customer_id,
    e1.employee_id,
    p2.product_id,
    p2.price,
    p2.name,
    c2.country,
    d.Date_key,
    p1.date 
FROM
    lep_store.payments AS p1
        JOIN
    lep_store.customers AS c1 ON p1.customer_id = c1.customer_id
        JOIN
    lep_store.employees AS e1 ON p1.employee_id = e1.employee_id
        JOIN
    lep_store.products AS p2 ON p1.product_id = p2.product_id
        JOIN
    lep_store.countries AS c2 ON c1.country_id = c2.country_id
        JOIN
    lep_dwh_date AS d ON p1.date = d.Dates
ORDER BY d.Dates;
"""
cursor.execute(query)

# Setting the foreign keys for each dimension table
alter_table_queries = [
    "ALTER TABLE lep_dwh_fact ADD FOREIGN KEY (customer_id) REFERENCES lep_dwh_customers (customer_id);",
    "ALTER TABLE lep_dwh_fact ADD FOREIGN KEY (employee_id) REFERENCES lep_dwh_employees (employee_id);",
    "ALTER TABLE lep_dwh_fact ADD FOREIGN KEY (product_id) REFERENCES lep_dwh_products (product_id);",
    "ALTER TABLE lep_dwh_fact ADD FOREIGN KEY (Date_key) REFERENCES lep_dwh_date (Date_key);"
]

for alter_query in alter_table_queries:
    cursor.execute(alter_query)

# Commit the changes and close the cursor and connection
connection.commit()

## Create Triggers for Automating the Transfer of Data

Create various triggers that will load data from the central database into the Data Warehouse and sync the database and the Data Warehouse when certain situations occur in the organization.

**Trigger 1. insert_customer**

In [58]:
# Use the lep_store database
cursor.execute('''
use lep_store;
''')
# Create the trigger
create_trigger_query = """
CREATE TRIGGER insert_customer
AFTER INSERT ON lep_store.customers
FOR EACH ROW
INSERT INTO lep_store_dw.lep_dwh_customers
SELECT 
    c1.customer_id,
    c1.first_name,
    c1.last_name,
    c1.full_name,
    c2.country_id
FROM customers AS c1
JOIN countries AS c2 ON c1.country_id = c2.country_id
WHERE c1.customer_id = new.customer_id;
"""

cursor.execute(create_trigger_query)

# Commit the changes and close the cursor and connection
connection.commit()

In [59]:
# Insert a new value into the customers table
insert_query = """
INSERT INTO customers (customer_id, first_name, last_name, full_name, birthday, email, gender, city, street, country_id)
VALUES (1, 'John', 'Doe', 'John Doe', '1990-01-01', 'johndoe@example.com', 'Male', 'New York', '123 Main St', 1)
"""

# Execute the insert query
cursor.execute(insert_query)

# Commit the changes
connection.commit()

**Check new customer in the central database**

In [60]:
# Check the new customer in the central database
customer_id = 1  # Assuming customer_id is 1 for the new customer
select_query = f"SELECT * FROM customers WHERE customer_id = {customer_id}"

# Execute the select query
cursor.execute(select_query)

# Fetch the result
result = cursor.fetchone()

if result:
    print("New customer found:")
    print(result)
else:
    print("New customer not found.")

New customer found:
(1, 'John', 'Doe', 'John Doe', datetime.date(1990, 1, 1), 'johndoe@example.com', 'Male', 'New York', '123 Main St', 1)


**Check new customer in the data warehouse**

In [64]:
#use the data warehouse  
cursor.execute('''
use lep_store_dw;
''')
# Check the new customer in the data warehouse
customer_id = 1  # Assuming customer_id is 1 for the new customer
select_query = f"SELECT * FROM lep_store_dw.lep_dwh_customers WHERE customer_id = {customer_id}"

# Execute the select query
cursor.execute(select_query)

# Fetch the result
result = cursor.fetchone()

if result:
    print("New customer found in the data warehouse:")
    print(result)
else:
    print("New customer not found in the data warehouse.")

New customer found in the data warehouse:
(1, 'John', 'Doe', 'John Doe', 1)


**Trigger 2. insert_employee**

In [65]:
# Use the lep_store database
cursor.execute('''
use lep_store;
''')
# Create the trigger

create_trigger_query = """
CREATE TRIGGER insert_employee
AFTER INSERT ON lep_store.employees
FOR EACH ROW
INSERT INTO lep_store_dw.lep_dwh_employees
SELECT 
    e.employee_id,
    e.first_name,
    e.last_name,
    e.full_name,
    d.department
FROM employees AS e
JOIN departments AS d ON e.department_id = d.department_id
WHERE e.employee_id = NEW.employee_id;
"""

# Execute the create trigger query
cursor.execute(create_trigger_query)

# Commit the changes
connection.commit()

In [66]:
# Insert a new value into the employees table
insert_query = """
INSERT INTO employees (employee_id, first_name, last_name, full_name, email, birthday, 
                      employee_phone_number, gender, department_id, marital_status, salary, hire_date)
VALUES (1, 'John', 'Joe', 'John Joe', 'johne@example.com', '1999-01-01', '0326963733',
        'Male', 1, 'Single', 5000, '2022-01-01');
"""

# Execute the insert query
cursor.execute(insert_query)

# Commit the changes
connection.commit()

**Check new employee in the central database**

In [67]:
# Check the new customer in the central database
employee_id = 1  # Assuming customer_id is 1 for the new customer
select_query = f"SELECT * FROM employees WHERE employee_id = {employee_id}"

# Execute the select query
cursor.execute(select_query)

# Fetch the result
result = cursor.fetchone()

if result:
    print("New employee found:")
    print(result)
else:
    print("New employee not found.")

New employee found:
(1, 'John', 'Joe', 'John Joe', 'johne@example.com', datetime.date(1999, 1, 1), '0326963733', 'Male', 1, 'Single', 5000, datetime.date(2022, 1, 1))


**Check new employee in the data warehouse**

In [68]:
#use the data warehouse  
cursor.execute('''
use lep_store_dw;
''')
# Check the new customer in the data warehouse
employee_id = 1  # Assuming customer_id is 1 for the new customer
select_query = f"SELECT * FROM lep_store_dw.lep_dwh_employees WHERE employee_id = {employee_id}"

# Execute the select query
cursor.execute(select_query)

# Fetch the result
result = cursor.fetchone()

if result:
    print("New employee found in the data warehouse:")
    print(result)
else:
    print("New employee not found in the data warehouse.")

New employee found in the data warehouse:
(1, 'John', 'Joe', 'John Joe', 'IT')


**Trigger 3. new_payment**

In [69]:
# Use the lep_store database
cursor.execute('''
use lep_store;
''')
# Create the trigger
create_trigger_query = """
CREATE TRIGGER new_payment
AFTER INSERT ON payments
FOR EACH ROW
INSERT INTO lep_store_dw.lep_dwh_fact
SELECT 
    c.customer_id,
    e.employee_id,
    pr.product_id,
    pr.price,
    pr.name,
    co.country,
    d.Date_key,
    d.Dates
FROM payments AS p
JOIN customers AS c ON p.customer_id = c.customer_id
JOIN countries AS co ON c.country_id = co.country_id
JOIN employees AS e ON p.employee_id = e.employee_id
JOIN products AS pr ON p.product_id = pr.product_id
JOIN lep_store_dw.lep_dwh_date AS d ON d.Dates = p.date
WHERE p.payment_id = new.payment_id;
"""

# Execute the create trigger query
cursor.execute(create_trigger_query)

# Commit the changes
connection.commit()

In [88]:
cursor.execute('''
use lep_store;
''')

0

In [89]:
# Insert a new payment
insert_payment_query = """
INSERT INTO payments(payment_id, date, customer_id, employee_id, product_id, price)
VALUES(15825, CURDATE(), 1, 80, '1VA01927XT', 695000)
"""

# Execute the insert query
cursor.execute(insert_payment_query)

# Commit the changes
connection.commit()

**Check new payment in the central database**

In [90]:
# Check the new customer in the central database
payment_id = 15825  # Assuming customer_id is 1 for the new customer
select_query = f"SELECT * FROM payments WHERE payment_id = {payment_id}"

# Execute the select query
cursor.execute(select_query)

# Fetch the result
result = cursor.fetchone()

if result:
    print("New payment found:")
    print(result)
else:
    print("New payment not found.")

New payment found:
(15825, datetime.date(2023, 7, 8), 1, 80, '1VA01927XT', 695000.0)


**Check new employee in the data warehouse**

In [91]:
# Use the data warehouse
cursor.execute('''
USE lep_store_dw;
''')

# Check the new transaction in the data warehouse
customer_id = 1
employee_id = 80
product_id = '1VA01927XT'
select_query = f"SELECT * FROM lep_dwh_fact WHERE customer_id = {customer_id} AND employee_id = {employee_id} AND product_id = '{product_id}'"

# Execute the select query
cursor.execute(select_query)

# Fetch the result
result = cursor.fetchone()

if result:
    print("New payment found in the data warehouse:")
    print(result)
else:
    print("New payment not found in the data warehouse.")


New payment not found in the data warehouse.
