## 1. DBConnector.py & settings.py
    - DB connection 모듈과 공통 설정 정보를 각각 별도의 파일로 분리하여 관리

In [1]:
from db.connector import DBconnector
from settings import DB_SETTINGS

In [2]:
DB_SETTINGS['POSTGRES']

{'engine': 'postgresql',
 'orm_engine': 'postgresql',
 'host': 'localhost',
 'database': 'postgres',
 'user': 'hyunsoo',
 'password': '150808',
 'port': '5432'}

In [7]:
from db.connector import DBconnector
from settings import DB_SETTINGS

db_connector = DBconnector(**DB_SETTINGS['POSTGRES'])

with db_connector as connected:

    con = connected.conn
    cursor = con.cursor()

    cursor.execute("SELECT * FROM lecture LIMIT 5")

    print(cursor.fetchall())

{'engine': 'postgresql',
 'orm_engine': 'postgresql',
 'conn_params': {'host': 'localhost',
  'dbname': 'postgres',
  'user': 'hyunsoo',
  'password': '150808',
  'port': '5432'},
 'mysql_conn_params': {'host': 'localhost',
  'db': 'postgres',
  'user': 'hyunsoo',
  'passwd': '150808',
  'port': 5432,
  'charset': 'utf8'},
 'orm_conn_params': 'postgresql://hyunsoo:150808@localhost:5432/postgres',
 'orm_conn': Engine(postgresql://hyunsoo:***@localhost:5432/postgres),
 'conn': <connection object at 0x00000294E9C70040; dsn: 'user=hyunsoo password=xxx dbname=postgres host=localhost port=5432', closed: 0>,
 'connect': None,
 'queries': {}}

## 2. query.py
    - 쿼리들을 파일로 관리하여 쉽게 호출할 수 있도록 작성

##### 쿼리 내용 조회하는 부분을 Class 내에 통합

In [3]:
import psycopg2
import db.postgresql_query as postgresql_query
from settings import DB_SETTINGS

class DBconnector:
    """PostgreSQL connection wrapper that can be used as a context manager.

    A connection is opened when the instance is created and closed when a
    ``with`` block exits. Entering a ``with`` block again after the
    connection was closed opens a fresh connection, so a single connector
    instance can be handed to several pipeline steps in a row.

    Args:
        host: Database host name.
        database: Database name (passed to psycopg2 as ``dbname``).
        user: Login user.
        password: Login password.
        port: Server port.
        engine: Optional engine label. Accepted so that the whole
            ``DB_SETTINGS['POSTGRES']`` dict, which carries this key, can be
            unpacked into the constructor. Stored but not used here.
        orm_engine: Optional ORM engine label; same purpose as ``engine``.
    """

    def __init__(self, host, database, user, password, port,
                 engine=None, orm_engine=None):
        self.engine = engine
        self.orm_engine = orm_engine
        # Keyword arguments in the shape psycopg2.connect() expects.
        self.conn_params = dict(
            host=host,
            dbname=database,
            user=user,
            password=password,
            port=port,
        )
        # postgres_connect() stores the live connection on self.conn and
        # returns self, so self.connect refers to this same instance.
        self.connect = self.postgres_connect()
        # Mapping of table name -> SQL text.
        self.queries = postgresql_query.queries

    def __enter__(self):
        # psycopg2 exposes ``closed`` as 0 while the connection is open and
        # non-zero afterwards; reconnect so the instance is reusable after a
        # previous ``with`` block closed it.
        if getattr(self.conn, "closed", 0):
            self.postgres_connect()
        print("접속")
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        # Always release the connection; exceptions from the body propagate.
        self.conn.close()
        print("종료")

    def postgres_connect(self):
        """Open a new connection from ``conn_params`` and return ``self``."""
        self.conn = psycopg2.connect(**self.conn_params)
        return self

    def get_query(self, table_name):
        """Return the SQL text registered for ``table_name``.

        Raises:
            KeyError: If no query is registered under ``table_name``.
        """
        return self.queries[table_name]

In [8]:
db_connector = DBconnector(**DB_SETTINGS['POSTGRES'])

db_connector.get_query('lecture')

'SELECT * FROM lecture LIMIT 5'

In [7]:
from db.postgresql_query import queries

# Open one connection for the whole loop. Creating a connector per table
# opened a new database connection on every iteration and never closed any.
db_connector = DBconnector(**DB_SETTINGS['POSTGRES'])
try:
    for tbl in queries:
        _query = db_connector.get_query(tbl)
        print(_query)
finally:
    # Release the connection even if a lookup fails.
    db_connector.conn.close()

SELECT * FROM lecture LIMIT 5
SELECT * FROM first LIMIT 5
SELECT * FROM second LIMIT 5
SELECT * FROM third LIMIT 5
SELECT * FROM fourth LIMIT 5


## 3. extract.py
    - 쿼리를 받아 DB에 조회를 하여 결과를 pandas dataframe으로 변환

In [1]:
from db.connector import DBconnector
from settings import DB_SETTINGS
import pandas as pd

In [None]:
db_connector = DBconnector(**DB_SETTINGS['POSTGRES'])

with db_connector as connected:
    _query = connected.get_query('lecture')
    con = connected.conn
    df = pd.read_sql(_query, con)

print(df), print(type(df))

In [4]:
# extractor 함수 생성

def extractor(db_connector, table_name):
    """Run the query registered for ``table_name`` and return the result.

    Args:
        db_connector: Context manager whose entered value provides
            ``get_query(table_name)`` and a ``conn`` attribute.
        table_name: Key used to look up the SQL text.

    Returns:
        The DataFrame produced by ``pd.read_sql`` on success, or ``False``
        when the lookup or the read raises (the error is printed).
    """
    with db_connector as session:
        try:
            frame = pd.read_sql(session.get_query(table_name), session.conn)
        except Exception as err:
            print(f"Error MSG : {err}")
            return False
        else:
            return frame

In [None]:
db_connector = DBconnector(**DB_SETTINGS['POSTGRES'])

return_extractor = extractor(db_connector, 'lecture')
return_extractor.head()

In [2]:
# 모듈 형태로 통합

from db.connector import DBconnector
from settings import DB_SETTINGS
from pipeline.extract import extractor

db_connector = DBconnector(**DB_SETTINGS['POSTGRES'])
table_name = 'lecture'

return_extractor = extractor(db_connector, table_name)

return_extractor.head()

접속
종료


  df = pd.read_sql(_query, con)


Unnamed: 0,id,name,year,gender,count
0,1,Mary,1880,F,7065
1,3,Emma,1880,F,2003
2,4,Elizabeth,1880,F,1939
3,5,Minnie,1880,F,1746
4,6,Margaret,1880,F,1578


## 4. transform.py
  - Batch 날짜(년/월/일)별 저장 경로 생성 및 해당 경로 이하에 dataframe 저장
  - 이행 환경에 따라 다르게 구성될 수 있음
    - Database -> Staging Server -> Cloud / Database
    - Database -- Direct Connection --> Cloud / Database
  - 목적지 Database의 성격에 따라 추가적인 처리 함수가 포함될 수도 있음
    - Data Lake -> 거의 가공 없이 이행
    - Data Warehouse -> 결측치/공백 등 간단한 전처리를 거쳐 이행
    - Data Mart -> Group by/filter 등 성격에 맞는 데이터 처리를 거쳐 이행

#### 1) 저장 경로 생성
- Database 이름 / Table 이름 / yyyy=년 / mm=월 / dd=일 / [Table 이름].csv

In [9]:
# Batch 날짜 생성

from datetime import datetime

batch_date = datetime.now()
format_date = batch_date.strftime('%Y%m%d')


_y = format_date[:4]
_m = format_date[4:6]
_d = format_date[6:]

_y, _m, _d

('2024', '03', '22')

In [11]:
f"{batch_date:%Y}", f"{batch_date:%m}", f"{batch_date:%d}"

('2024', '03', '22')

In [22]:
# 전체 저장 경로 생성

import os

temp_path = 'C:\\Users\\user\\Desktop\\데엔 기초\\공유용\\day2\\temp_storage'

_path = os.path.join(temp_path, 'postgres', 'lecture')
_path


'C:\\Users\\user\\Desktop\\데엔 기초\\공유용\\day2\\temp_storage\\postgres\\lecture'

In [25]:
# 함수 생성

from datetime import datetime
batch_date = datetime.now().strftime('%Y%m%d')
temp_path = 'C:\\Users\\user\\Desktop\\데엔 기초\\공유용\\day2\\temp_storage'

def create_path(temp_path, batch_date, table_name='lecture'):
    """Build the date-partitioned directory path for one batch run.

    Args:
        temp_path: Root directory of the temporary storage.
        batch_date: Batch date as a ``YYYYMMDD`` string.
        table_name: Directory name placed directly under ``temp_path``.
            Defaults to ``'lecture'``, the value that was previously
            hard-coded, so existing two-argument calls are unaffected.

    Returns:
        ``<temp_path>/<table_name>/yyyy=<YYYY>/mm=<MM>/dd=<DD>`` joined with
        the platform's path separator.
    """
    # Slice the fixed-width YYYYMMDD string into its three components.
    _y = batch_date[:4]
    _m = batch_date[4:6]
    _d = batch_date[6:]

    _path = os.path.join(temp_path, table_name, f'yyyy={_y}', f'mm={_m}', f'dd={_d}')

    return _path

create_path(temp_path, batch_date)

'C:\\Users\\user\\Desktop\\데엔 기초\\공유용\\day2\\temp_storage\\lecture\\yyyy=2023\\mm=11\\dd=27'

#### 2) pandas dataframe을 csv/parquet 형태로 저장

In [None]:
# Create the destination directory

path = create_path(temp_path, batch_date)

# The mode must be an octal literal: decimal 777 equals 0o1411, which sets
# unintended permission bits.
os.makedirs(path, mode=0o777, exist_ok=True)

In [30]:
# CSV format

save_path = os.path.join(path, 'lecture.csv')
save_path

df.to_csv(save_path)

In [None]:
# JSON format

save_path = os.path.join(path, 'lecture.json')
save_path

df.to_json(save_path, orient = 'records', indent=4, force_ascii=False)

In [None]:
# parquet format

save_path = os.path.join(path, 'lecture.parquet')
save_path

df.to_parquet(save_path, engine = 'pyarrow', compression = 'gzip', index=False)

In [34]:
# 함수 생성

def save_to_file(df, path, table_name):
    """Write ``df`` to ``<path>/<table_name>.csv``, creating ``path`` if needed.

    Args:
        df: Object supporting ``len()`` and ``to_csv()`` (a pandas DataFrame
            in this notebook).
        path: Directory the CSV file is written into.
        table_name: Base name of the CSV file.

    Returns:
        ``True`` after the file has been written; ``False`` when ``df`` has
        no rows, in which case nothing is created and "EMPTY FILE" is printed.
    """
    if len(df) > 0:
        # Create the directory. The mode is octal (decimal 777 would be
        # 0o1411), and exist_ok lets the same batch date be saved again
        # without FileExistsError.
        os.makedirs(path, mode=0o777, exist_ok=True)
        save_path = os.path.join(path, f'{table_name}.csv')

        # Save to file
        df.to_csv(save_path)
        return True
    else:
        print("EMPTY FILE")
        return False

In [35]:
save_to_file(df, path, table_name)

True

#### 3) 저장 경로 생성 함수 + Dataframe 저장 함수 통합

In [36]:
# transformer(create_path + save_to_file) 함수 생성

def transformer(temp_path, batch_date, df, table_name):
    """Resolve the batch directory and save ``df`` into it.

    Args:
        temp_path: Root directory handed to ``create_path``.
        batch_date: Batch date handed to ``create_path``.
        df: Data handed to ``save_to_file``.
        table_name: File base name handed to ``save_to_file``.

    Returns:
        Whatever ``save_to_file`` returns for the resolved directory.
    """
    target_dir = create_path(temp_path, batch_date)
    return save_to_file(df, target_dir, table_name)

In [38]:
transformer(temp_path, batch_date, df, table_name)

True

In [2]:
# 모듈 형태로 통합

from db.connector import DBconnector
from settings import DB_SETTINGS, TEMP_PATH
from pipeline.extract import extractor
from pipeline.transform import transformer
from datetime import datetime

db_connector = DBconnector(**DB_SETTINGS['POSTGRES'])
table_name = 'lecture'
batch_date = datetime.now().strftime('%Y%m%d')

return_extractor = extractor(db_connector, table_name)

# extractor (as defined earlier in this notebook) returns a DataFrame on
# success and False on failure. A DataFrame has no unambiguous truth value,
# so `if return_extractor:` raises ValueError; compare against False instead.
if return_extractor is not False:
    return_transformer = transformer(TEMP_PATH, batch_date, return_extractor, table_name)

접속
종료


  df = pd.read_sql(_query, con)


True

## 5. load.py

    - 저장된 파일을 특정한 저장소에 적재

##### 1) Pandas to_sql() 메소드를 활용한 테이블 적재 ( Local File -> Database )

[PANDAS > to_sql()](https://pandas.pydata.org/docs/reference/api/pandas.DataFrame.to_sql.html)

  - [dataframe].to_sql(name='테이블 이름', con='sqlalchemy connection', if_exists='replace')

In [1]:
from sqlalchemy import create_engine

engine = 'postgresql'
user = 'postgres'
passwd = '910506'
host = 'localhost'
port = '5432'
database = 'postgres'

db = create_engine(f'{engine}://{user}:{passwd}@{host}:{port}/{database}')

In [None]:
import pandas as pd

# Raw string: the backslashes of the Windows path must not be parsed as
# escape sequences (`\K`, `\L`, `\m`, `\s` are invalid escapes).
df = pd.read_csv(r'D:\KDT_Lecture\Lecture\mysql_data\sample_point.csv')

In [None]:
df.to_sql(name='point', con=db, if_exists='replace')
                                # fail : 테이블 존재하면 실패
                                # append : 테이블 존재하면 아래에 row 추가
                                # replace : 테이블 존재하면 명령 내린 테이블 데이터로 교체

In [None]:
# loader 함수 생성

def loader(db_connector, df, table_name):
    """Write ``df`` into the table ``table_name``, replacing existing data.

    Args:
        db_connector: Context manager whose entered value exposes an
            ``orm_conn`` attribute that is passed to ``to_sql`` as ``con``.
        df: Object providing ``to_sql`` (a pandas DataFrame in this notebook).
        table_name: Target table name.

    Returns:
        ``True`` when ``to_sql`` completes; ``False`` when reading
        ``orm_conn`` or the write raises (the error is printed).
    """
    with db_connector as session:
        try:
            df.to_sql(name=table_name, con=session.orm_conn, if_exists='replace')
        except Exception as err:
            print(f"Error MSG : {err}")
            return False
        else:
            return True

In [None]:
# 모듈 형태로 통합

from db.connector import DBconnector
from settings import DB_SETTINGS, TEMP_PATH
from pipeline.extract import extractor
from pipeline.transform import transformer
from pipeline.load import loader
from datetime import datetime

db_connector = DBconnector(**DB_SETTINGS['POSTGRES'])
table_name = 'lecture'
batch_date = datetime.now().strftime('%Y%m%d')

return_extractor = extractor(db_connector, table_name)

# Default the step results so a failed step skips the remaining ones
# instead of raising NameError on an unbound variable.
return_transformer = False
return_loader = False

# extractor (as defined earlier in this notebook) returns a DataFrame on
# success and False on failure. A DataFrame has no unambiguous truth value,
# so compare against False explicitly.
if return_extractor is not False:
    # Pass the frame that was just extracted, not a leftover notebook `df`.
    return_transformer = transformer(TEMP_PATH, batch_date, return_extractor, table_name)

if return_transformer:
    return_loader = loader(db_connector, return_extractor, table_name)

##### 2) AWS python SDK를 활용한 전송 ( Local File -> Cloud Storage )

- 최소한의 권한만 부여된 임시 API KEY 사용.
- 실습 후 KEY 삭제 예정
  - KEY_ID : 
  - KEY_SECRET : 

> https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/s3/client/upload_file.html

In [None]:
import boto3

# client 연결 생성
s3_client = boto3.client(
    's3', 
    region_name='ap-northeast-2', 
    aws_access_key_id='', 
    aws_secret_access_key=''
    )

In [None]:
# s3_client.upload_file(파일의 로컬 경로, 버킷, 저장될 S3 경로)

s3_client.upload_file(
    './temporary_storage/sample_point.csv',
    'chunjae-edu-external-datastore',
    'kdt_temp_storage/hyunsoo/kdt_sample_datasets/temp/point.csv'
    )

## 6. remove.py

    - 임시 저장된 파일을 삭제

In [None]:
import shutil, os
from settings import TEMP_DIR

shutil.rmtree(TEMP_DIR)

os.makedirs(TEMP_DIR)

In [None]:
# remover 함수 생성

def remover(path):
    """Delete ``path`` with everything in it, then recreate it empty.

    Args:
        path: Directory to reset.

    Returns:
        ``True`` when the directory was removed and recreated; ``False``
        when either step raises (for example when ``path`` does not exist),
        in which case the error is printed.
    """
    try:
        shutil.rmtree(path)
        os.makedirs(path)
    except Exception as err:
        print(f"Error MSG : {err}")
        return False
    else:
        return True


In [None]:
# 모듈 형태로 통합

from db.connector import DBconnector
from settings import DB_SETTINGS, TEMP_PATH
from pipeline.extract import extractor
from pipeline.transform import transformer
from pipeline.load import loader
from datetime import datetime

db_connector = DBconnector(**DB_SETTINGS['POSTGRES'])
table_name = 'lecture'
batch_date = datetime.now().strftime('%Y%m%d')

return_extractor = extractor(db_connector, table_name)

# Default the step results so a failed step skips the remaining ones
# instead of raising NameError on an unbound variable.
return_transformer = False
return_loader = False

# extractor (as defined earlier in this notebook) returns a DataFrame on
# success and False on failure. A DataFrame has no unambiguous truth value,
# so compare against False explicitly.
if return_extractor is not False:
    # Pass the frame that was just extracted, not a leftover notebook `df`.
    return_transformer = transformer(TEMP_PATH, batch_date, return_extractor, table_name)

if return_transformer:
    return_loader = loader(db_connector, return_extractor, table_name)

# Only clear the temporary storage once the data has been loaded.
if return_loader:
    remover(TEMP_PATH)