### Data pipeline project
- step1 : create database and tables | add dataset
- step2 : connect to the data warehouse and set up a schedule (timer) in Airflow
- step3 : add data for checking settings
- step4 : build the data mart and connect its data with the data warehouse
- step5 : connect BI and make a dashboard for C-levels

In [1]:
import pymysql
import pandas as pd
import numpy as np
import warnings
# Suppress every warning category for the rest of the notebook session.
warnings.filterwarnings('ignore')

In [2]:
from scripts.create_db_and_tables import (
    create_database,
    create_tables,
    alter_data_bulk,
    insert_data_bulk
)
from config.sql_queries import(
    insert_customer_table_sql,
    insert_order_table_sql,
    insert_product_table_sql,
    alt_customer_table_sql,
    alt_product_table_sql,
    alt_order_table_sql_1,
    alt_order_table_sql_2,
    alt_order_table_sql_3
)
from data.initial_data import(
    customer_data,
    product_data,
    order_data,
    customer_data_additional,
    product_data_additional,
    order_data_additional
)

from utils.db_helper import DBManager
# Shared DB helper created for the `logistics_project` database; the cells
# below use it to run SQL and to fetch tables.
db = DBManager(database='logistics_project')

### Step1. Create database and tables | add dataset


In [3]:
#### 1_1. Create DB
create_database()

In [4]:
#### 1_2. Create tables
create_tables()

In [3]:
#### 1_3. Alter table settings
# Only the customer and product ALTER scripts are run here; the order-table
# changes are issued by hand in the next cell.
alter_data_bulk(alt_customer_table_sql)
alter_data_bulk(alt_product_table_sql)

In [20]:
# Foreign keys to create on op_order_table: constraint name, the local
# column, and the table/column it references.
foreign_keys = [
    dict(
        name="fk_customer",
        column="customer_id",
        ref_table="op_customer_table",
        ref_column="customer_id",
    ),
    dict(
        name="fk_product",
        column="product_id",
        ref_table="op_product_table",
        ref_column="product_id",
    ),
]

# # Step 1: drop the foreign keys (disabled; enable if the constraints already exist)
# for fk in foreign_keys:
#     sql = f"ALTER TABLE op_order_table DROP FOREIGN KEY {fk['name']};"
#     db.execute_sql(sql)

# Step 2: modify the key columns and index the referenced product column.
alter_sqls = [
    "ALTER TABLE op_order_table MODIFY COLUMN customer_id VARCHAR(20) NOT NULL;",
    "ALTER TABLE op_order_table MODIFY COLUMN product_id VARCHAR(20) NOT NULL;",
    "ALTER TABLE op_product_table ADD INDEX (product_id);",
]
for sql in alter_sqls:
    db.execute_sql(sql)

# Step 3: add the foreign keys back.
# The template is spelled out piece by piece so the statement text (including
# its leading/trailing whitespace) does not depend on editor whitespace handling.
ADD_FK_TEMPLATE = (
    "\n    ALTER TABLE op_order_table \n"
    "    ADD CONSTRAINT {name} FOREIGN KEY ({column}) \n"
    "    REFERENCES {ref_table}({ref_column});\n"
    "    "
)
for fk in foreign_keys:
    sql = ADD_FK_TEMPLATE.format(**fk)
    db.execute_sql(sql)


In [22]:
#### 1_4. insert data into tables
# Statements and datasets are paired by position: customer, product, order.
# The order table comes last because its foreign keys reference the other two.
sql_list = [insert_customer_table_sql, insert_product_table_sql, insert_order_table_sql]
table_list = [customer_data, product_data, order_data]

for sql, table in zip(sql_list, table_list):
    insert_data_bulk(sql, table)

In [23]:
# Show the first rows of each table to check the initial load.
display(db.fetch_table("op_customer_table").head())
display(db.fetch_table("op_product_table").head())
display(db.fetch_table("op_order_table").head())

Unnamed: 0,customer_id,phone,country,address,level_royalty
0,CUS001,010-1111-1111,Korea,"Seoul, Gangnam-gu",Gold
1,CUS002,010-2222-2222,USA,"New York, Manhattan",Silver
2,CUS003,010-3333-3333,Japan,"Tokyo, Shibuya",Platinum
3,CUS004,010-4444-4444,UK,"London, Soho",Gold
4,CUS005,010-5555-5555,France,"Paris, Champs-Elysees",Silver


Unnamed: 0,product_id,product_name,price,weight_kg,seller_company
0,PRD001,Wireless Earbuds,120.0,0.2,Samsung
1,PRD002,Smartphone Case,25.0,0.05,LG
2,PRD003,Portable Charger,50.0,0.3,Anker
3,PRD004,Bluetooth Speaker,85.0,1.2,Sony
4,PRD005,Smartwatch,250.0,0.5,Apple


Unnamed: 0,order_id,customer_id,delivery_id,delivery_fee,product_id,order_date,shipping_date,request
0,ORD001,CUS001,101,15.0,PRD001,2024-04-25 10:00:00,2024-04-26 12:00:00,Handle with care
1,ORD002,CUS002,102,20.0,PRD002,2024-04-25 11:00:00,2024-04-27 15:00:00,
2,ORD003,CUS003,103,18.0,PRD003,2024-04-25 12:30:00,2024-04-28 10:00:00,Gift wrap
3,ORD004,CUS004,104,25.0,PRD004,2024-04-25 14:00:00,2024-04-29 09:00:00,
4,ORD005,CUS005,105,22.0,PRD005,2024-04-25 15:00:00,2024-04-30 13:00:00,Urgent delivery


In [None]:
#### 1_4_alpha. insert 10 more rows into the tables
# Reuses sql_list from step 1_4, so the datasets must stay in the same
# customer, product, order sequence.
table_list_add = [customer_data_additional, product_data_additional, order_data_additional]

for sql, table in zip(sql_list, table_list_add):
    insert_data_bulk(sql, table)

In [16]:
# Show the last rows of each table to check that the additional rows landed.
display(db.fetch_table("op_customer_table").tail())
display(db.fetch_table("op_product_table").tail())
display(db.fetch_table("op_order_table").tail())

Unnamed: 0,customer_id,phone,country,address,level_royalty
15,CUS016,010-6666-7777,Germany,"Munich, Marienplatz",Platinum
16,CUS017,010-7777-8888,Canada,"Vancouver, Downtown",Bronze
17,CUS018,010-8888-9999,Australia,"Melbourne, CBD",Gold
18,CUS019,010-9999-0000,Vietnam,"Hanoi, Old Quarter",Silver
19,CUS020,010-0000-1111,Singapore,Sentosa Island,Platinum


Unnamed: 0,product_id,product_name,price,weight_kg,seller_company
15,PRD016,Wireless Keyboard,70.0,0.9,Logitech
16,PRD017,e-Book Reader,130.0,0.5,Amazon
17,PRD018,Drone Mini,600.0,1.1,DJI
18,PRD019,Smart Lighting Kit,200.0,1.8,Philips Hue
19,PRD020,4K Webcam,150.0,0.3,Logitech


Unnamed: 0,order_id,customer_id,delivery_id,delivery_fee,product_id,order_date,shipping_date,request
15,ORD016,CUS016,116,23.0,PRD016,2024-04-27 15:00:00,2024-05-01 12:00:00,
16,ORD017,CUS017,117,20.0,PRD017,2024-04-27 16:30:00,2024-05-01 14:00:00,Quick shipping
17,ORD018,CUS018,118,28.0,PRD018,2024-04-27 17:00:00,2024-05-02 16:00:00,
18,ORD019,CUS019,119,24.0,PRD019,2024-04-27 18:00:00,2024-05-02 18:00:00,
19,ORD020,CUS020,120,26.0,PRD020,2024-04-27 19:30:00,2024-05-03 10:00:00,Urgent delivery


In [46]:
# Distinct countries currently present in the customer table; the synthetic
# data cell below samples customer countries from this result.
country = db.make_dataframe(""" select
                  distinct country
                  from op_customer_table
""")

### Adding Dataset
- make data with random package
- drop dataset
- insert new data

In [52]:
from faker import Faker
import random
import pandas as pd
from datetime import datetime, timedelta

fake = Faker()
# Seed BOTH generators. `random.seed` only affects the stdlib `random` module;
# Faker draws from its own generator, so without `Faker.seed` the phone
# numbers, addresses, names and dates changed on every run.
Faker.seed(42)
random.seed(42)

# Candidate countries, taken from the `country` frame queried in the previous cell.
country_list = country['country'].tolist()

loyalty_levels = ['Bronze', 'Silver', 'Gold', 'Platinum']

# NOTE: customer_data / product_data / order_data rebind the names imported
# from data.initial_data at the top of the notebook; re-running the earlier
# insert cells after this one would pick up the synthetic rows instead.

# 1. Customer rows: [customer_id, phone, country, address, level_royalty]
customer_data = [
    [
        f"CUST{i:04d}",
        fake.phone_number(),
        random.choice(country_list),
        fake.address().replace('\n', ' '),  # flatten multi-line addresses
        random.choice(loyalty_levels),
    ]
    for i in range(1, 301)
]

# 2. Product rows: [product_id, product_name, price, weight_kg, seller_company]
product_data = [
    [
        f"PROD{i:03d}",
        fake.word().capitalize(),
        round(random.uniform(5, 200), 2),
        round(random.uniform(0.1, 5.0), 2),
        fake.company(),
    ]
    for i in range(1, 51)
]

# 3. Order rows; each order references one existing customer and product so
#    the generated ids always match rows in the two lists above.
order_data = []
for i in range(1, 10001):
    cust = random.choice(customer_data)
    prod = random.choice(product_data)
    # Dates are relative to "now" (between two and one years ago), so the
    # absolute timestamps still depend on when the cell is run.
    order_date = fake.date_time_between(start_date='-2y', end_date='-1y')
    shipping_date = order_date + timedelta(days=random.randint(1, 7))
    delivery_fee = round(random.uniform(3, 15), 2)
    # 30% chance of no request (None -> NULL); otherwise a short fake sentence.
    request_text = None if random.random() < 0.3 else fake.sentence(nb_words=5)
    order_data.append([
        f"ORD{i:05d}",
        cust[0],  # customer_id
        prod[0],  # product_id
        f"DLV{i:05d}",
        delivery_fee,
        order_date.strftime('%Y-%m-%d %H:%M:%S'),
        shipping_date.strftime('%Y-%m-%d %H:%M:%S'),
        request_text
    ])

# Wrap the rows in DataFrames; column names follow the row layouts above.
df_customer = pd.DataFrame(customer_data, columns=['customer_id', 'phone', 'country', 'address', 'level_royalty'])
df_product = pd.DataFrame(product_data, columns=['product_id', 'product_name', 'price', 'weight_kg', 'seller_company'])
df_order = pd.DataFrame(order_data, columns=['order_id', 'customer_id', 'product_id', 'delivery_id', 'delivery_fee', 'order_date', 'shipping_date', 'request'])


In [None]:
#### 1_4. insert data into tables
# sql_list and data_list are paired by position: customer, product, order.
sql_list = [insert_customer_table_sql, insert_product_table_sql, insert_order_table_sql]
data_list = [df_customer, df_product, df_order]
# NOTE: table_list is in a different order (order, customer, product). It is
# only iterated for display further down and is never zipped with sql_list.
table_list = ['op_order_table', 'op_customer_table', 'op_product_table']

In [71]:
# Bulk-insert each generated DataFrame with its matching INSERT statement.
for sql, data in zip(sql_list, data_list):
    insert_data_bulk(sql, data)

In [74]:
# Show the first rows of every table listed in table_list.
for table in table_list:
    display(db.fetch_table(table).head())

Unnamed: 0,order_id,customer_id,delivery_id,delivery_fee,product_id,order_date,shipping_date,request
0,ORD00001,CUST0155,DLV00001,9.41,PROD001,2024-04-04 09:58:03,2024-04-10 09:58:03,
1,ORD00002,CUST0028,DLV00002,11.37,PROD036,2024-01-28 00:51:17,2024-01-31 00:51:17,
2,ORD00003,CUST0252,DLV00003,3.15,PROD007,2024-02-23 15:13:40,2024-03-01 15:13:40,
3,ORD00004,CUST0246,DLV00004,5.21,PROD029,2023-11-14 02:10:38,2023-11-17 02:10:38,
4,ORD00005,CUST0245,DLV00005,3.78,PROD008,2023-12-25 01:14:16,2024-01-01 01:14:16,President now hit rather.


Unnamed: 0,customer_id,phone,country,address,level_royalty
0,CUST0001,001-830-979-4371x8855,USA,"6359 James Crossroad Ronaldside, PR 56083",Bronze
1,CUST0002,001-347-734-1533,France,"5443 Tamara Drive Apt. 464 South Chloe, FM 30925",Silver
2,CUST0003,749.833.4638x06228,UK,"8952 Jeffrey Mountain West Jonathan, AZ 29024",Silver
3,CUST0004,+1-732-249-8018x3500,USA,"8839 Tammy Walk Boyerfurt, AL 17488",Bronze
4,CUST0005,975.827.5971x3401,Singapore,Unit 9255 Box 9197 DPO AA 56889,Platinum


Unnamed: 0,product_id,product_name,price,weight_kg,seller_company
0,PROD001,Thus,123.15,4.82,"Rodriguez, Roberts and Morales"
1,PROD002,Put,145.08,3.91,Barrett Group
2,PROD003,Four,36.93,1.62,Abbott-Davidson
3,PROD004,Require,117.84,4.65,Velasquez and Sons
4,PROD005,Good,117.28,4.55,Robinson-Valdez


### Step2. Airflow & redshift

In [3]:
import boto3
import os

# S3 bucket settings: each source table maps to the key prefix it is uploaded under.
BUCKET_NAME = 'logistics-data-pipeline'
PREFIX_MAP = {
    'op_customer_table': 'customer/',
    'op_product_table': 'product/',
    'op_order_table': 'order/'
}

# RDS connection settings.
# SECURITY: the database password used to be hard-coded here. Secrets must not
# be committed with the notebook, so it is now read from the environment; the
# previously committed password should be treated as leaked and rotated.
# Non-secret settings keep their old values as defaults and can be overridden
# with RDS_HOST / RDS_USER / RDS_DATABASE / RDS_PORT.
rds_config = {
    'host': os.environ.get(
        'RDS_HOST',
        'db-project-pipeline.ctooemc4iu29.ap-northeast-2.rds.amazonaws.com',
    ),
    'user': os.environ.get('RDS_USER', 'damlalee'),
    # Deliberately no default: a missing RDS_PASSWORD raises KeyError right
    # here instead of failing later with an opaque authentication error.
    'password': os.environ['RDS_PASSWORD'],
    'database': os.environ.get('RDS_DATABASE', 'logistics_project'),
    'port': int(os.environ.get('RDS_PORT', '3306')),
}

# create S3 client
s3 = boto3.client('s3')

In [5]:
def upload_table_to_s3(table_name):
    """Export one RDS table to a local CSV file and upload that file to S3.

    Pipeline: read the table -> save it as ``data/<table_name>.csv`` ->
    upload the file under the table's prefix from ``PREFIX_MAP``.

    Args:
        table_name: Name of the table to export; must be a key of
            ``PREFIX_MAP``.

    Raises:
        KeyError: If ``table_name`` has no entry in ``PREFIX_MAP``. The lookup
            happens before any database or file work is done.
    """
    # Resolve the S3 key first: unknown tables fail fast, and only names
    # registered in PREFIX_MAP ever get interpolated into the SQL text below.
    s3_key = PREFIX_MAP[table_name] + f"{table_name}.csv"

    # get DF from RDS; always release the connection, even if the query fails
    conn = pymysql.connect(**rds_config)
    try:
        df = pd.read_sql(f"SELECT * FROM {table_name}", conn)
    finally:
        conn.close()

    # saving files to .csv
    local_path = f'data/{table_name}.csv'
    df.to_csv(local_path, index=False)

    # S3 upload
    s3.upload_file(local_path, BUCKET_NAME, s3_key)
    print(f"Successfully {table_name} -> S3 upload: s3://{BUCKET_NAME}/{s3_key}")


# Export and upload every table registered in PREFIX_MAP.
for table in PREFIX_MAP:
    upload_table_to_s3(table)

Successfully op_customer_table -> S3 upload: s3://logistics-data-pipeline/customer/op_customer_table.csv
Successfully op_product_table -> S3 upload: s3://logistics-data-pipeline/product/op_product_table.csv
Successfully op_order_table -> S3 upload: s3://logistics-data-pipeline/order/op_order_table.csv


### Save the dataset for the data mart (DM)

In [10]:
# AmazonRedshift-CommandsAccessPolicy-20250502T212337
# Data-mart extract: one row per order, enriched with customer and product
# attributes plus the delivery lead time in days and in hours.
# The query ends with a single ';' -- the previous stray second ';' formed an
# empty extra statement, which may be rejected when multi-statements are off.
query = """
SELECT
    od.order_id,
    od.customer_id,
    cu.level_royalty,
    cu.country,
    od.product_id,
    po.product_name,
    po.price,
    od.delivery_fee,
    ABS(DATEDIFF(od.order_date, od.shipping_date)) AS delivery_days,
    ROUND(TIMESTAMPDIFF(SECOND, od.order_date, od.shipping_date) / 3600, 2) AS delivery_hours

FROM op_order_table AS od
JOIN op_customer_table AS cu ON od.customer_id = cu.customer_id
JOIN op_product_table AS po ON od.product_id = po.product_id;
"""
# DATEDIFF(order_date, shipping_date) is order minus shipping, i.e. negative
# for a normal delivery; ABS() turns it into a positive day count.

# Always release the connection, even if the query raises.
conn = pymysql.connect(**db.config)
try:
    dm_dataset = pd.read_sql(query, conn)
finally:
    conn.close()

dm_dataset.to_csv("data/dm_dataset.csv", index=False)


In [11]:
# Read the exported CSV back to check what was written.
pd.read_csv('data/dm_dataset.csv')

Unnamed: 0,order_id,customer_id,level_royalty,country,product_id,product_name,price,delivery_fee,delivery_days,delivery_hours
0,ORD00001,CUST0155,Gold,Vietnam,PROD001,Thus,123.15,9.41,6,144.0
1,ORD00029,CUST0264,Bronze,Canada,PROD001,Thus,123.15,13.43,6,144.0
2,ORD00048,CUST0234,Bronze,Singapore,PROD001,Thus,123.15,4.73,6,144.0
3,ORD00129,CUST0029,Silver,USA,PROD001,Thus,123.15,6.62,2,48.0
4,ORD00299,CUST0096,Silver,Vietnam,PROD001,Thus,123.15,7.05,6,144.0
...,...,...,...,...,...,...,...,...,...,...
9995,ORD09796,CUST0154,Bronze,Singapore,PROD050,Start,62.77,12.95,1,24.0
9996,ORD09831,CUST0115,Bronze,Vietnam,PROD050,Start,62.77,11.58,1,24.0
9997,ORD09882,CUST0250,Gold,Australia,PROD050,Start,62.77,3.39,3,72.0
9998,ORD09949,CUST0296,Silver,UK,PROD050,Start,62.77,8.33,5,120.0
