## 1. DBconnector.py & setting.py

In [1]:
from db.connector import DBconnector
from settings import DB_SETTINGS

ModuleNotFoundError: No module named 'psycopg2'

In [2]:
DB_SETTINGS["POSTGRES"]

{'host': '127.0.0.1',
 'database': 'postgres',
 'user': 'postgres',
 'password': '1234',
 'port': '5432'}

In [3]:
from db.connector import DBconnector
from settings import DB_SETTINGS

db_connector = DBconnector(**DB_SETTINGS["POSTGRES"])

with db_connector as connected:
    conn = connected.conn
    cursor = conn.cursor()

    cursor.execute("SELECT * FROM lecture LiMIT 5")
    print(cursor.fetchall())

Enter
[(6, 'Margaret', 1880, 'F', 1578), (7, 'Ida', 1880, 'F', 1472), (8, 'Alice', 1880, 'F', 1414), (9, 'Bertha', 1880, 'F', 1320), (10, 'Sarah', 1880, 'F', 1288)]
Exit


In [4]:
import os

os.getcwd()

'c:\\Users\\user\\Desktop\\Memo\\1028\\천재교육\\DAY2'

In [5]:
import sys

sys.path.append(r"c:\\Users\\user\\Desktop\\Memo\\1028\\천재교육\\DAY2")

## 2. query.py

    - 쿼리들은 파일로 관리하여 쉽게 호출할 수 있도록 작성

### 쿼리 내용 조회하는 부분을 class 내에 통합

In [6]:
import psycopg2
import db.pgsql_query as postgresql_qurey
from settings import DB_SETTINGS

In [7]:
class DBconnector:
    def __init__(self, host, database, user, password, port):
        self.conn_params = dict(
            host=host, dbname=database, user=user, password=password, port=port
        )

        self.connect = self.postgres_connect()
        self.queries = postgresql_qurey.queries

    def __enter__(self):
        print("Enter")
        return self

    def __exit__(self, exe_type, exe_value, traceback):
        self.conn.close()
        print("Exit")

    def postgres_connect(self):
        self.conn = psycopg2.connect(**self.conn_params)
        return self

    def get_query(self, table_name):
        try:
            _query = self.queries[table_name]
            return _query
        except KeyError:
            raise KeyError(
                f"'{table_name}' 키가 queries 에 존재하지 않습니다. 현재 있는 키 리스트 : {list(self.queries.keys)}"
            )

In [8]:
db_connector = DBconnector(**DB_SETTINGS["POSTGRES"])

db_connector.get_query("lecture")

'SELECT * FROM lecture'

In [9]:
from db.pgsql_query import queries

for tbl in queries.keys():
    db_connector = DBconnector(**DB_SETTINGS["POSTGRES"])
    _query = db_connector.get_query(tbl)
    print(_query)

SELECT * FROM lecture
SELECT * FROM tbl LIMIT 5


## extract.py
- 쿼리를 받아 DB에 조회하여 결과를 pandas dataframe으로 변환

In [10]:
from db.connector import DBconnector
from settings import DB_SETTINGS
import pandas as pd

In [15]:
db_connector = DBconnector(**DB_SETTINGS["POSTGRES"])

with db_connector as connected:
    _query = db_connector.get_query("lecture")
    con = connected.conn
    df = pd.read_sql(_query, con)

print(df), print(type(df))

Enter
Exit
    id       name  year gender  count
0    6   Margaret  1880      F   1578
1    7        Ida  1880      F   1472
2    8      Alice  1880      F   1414
3    9     Bertha  1880      F   1320
4   10      Sarah  1880      F   1288
5   11      Annie  1880      F   1258
6   12      Clara  1880      F   1226
7   13       Ella  1880      F   1156
8   14   Florence  1880      F   1063
9   15       Cora  1880      F   1045
10  16     Martha  1880      F   1040
11  17      Laura  1880      F   1012
12  18     Nellie  1880      F    995
13  19      Grace  1880      F    982
14  20     Carrie  1880      F    949
15  21      Maude  1880      F    858
16  22      Mabel  1880      F    808
17  23     Bessie  1880      F    796
18  24     Jennie  1880      F    793
19  25   Gertrude  1880      F    787
20  26      Julia  1880      F    783
21  27     Hattie  1880      F    769
22  28      Edith  1880      F    768
23  29     Mattie  1880      F    704
24  30       Rose  1880      F    700
2

  df = pd.read_sql(_query, con)


(None, None)

In [12]:
def extractor(db_connector, table_name):
    with db_connector as connected:
        try:
            _query = connected.get_query(table_name)
            con = connected.conn
            df = pd.read_sql(_query, con)
            return df
        except Exception as e:
            print(f"Extract MSG: {e}")
            return False

In [13]:
db_connector = DBconnector(**DB_SETTINGS["POSTGRES"])

return_extractor = extractor(db_connector, "lecture")
return_extractor.head()

Enter
Exit


  df = pd.read_sql(_query, con)


Unnamed: 0,id,name,year,gender,count
0,6,Margaret,1880,F,1578
1,7,Ida,1880,F,1472
2,8,Alice,1880,F,1414
3,9,Bertha,1880,F,1320
4,10,Sarah,1880,F,1288


### 4. transform.py

- Batch 날짜별 저장 경로 생성 및 해당 경로 이하에 df 저장
- 이행 환경에 따라 다르게 구성될 수 있음
    - Database -> Stragins Server -> Cloud/Database
    - Database -- Directory Connection -> Cloud/Database

- 목적지 database의 성격에 따라 추가적인 처리 함수가 포함될 수 있음.
    - Data Lake -> 거의 가공 없이 이행
    - Data Warehouse -> 결측치/ 공백 등 간단한 전처리를 거쳐 이행
    - Data Mart -> Group by/filter 등 성격에 맞는 데이터 처리를 거쳐 이행
    

#### 1) 저장 경로 생성
- Database 이름/ table 이름 / yyyy={}/ mm = {}/ dd = {}/ {table_name}.csv

In [6]:
import pandas as pd

from db.connector import DBconnector
from settings import DB_SETTINGS
from pipeline.extract import extractor


db_connector = DBconnector(**DB_SETTINGS["POSTGRES"])
table_name = "lecture"

return_extractor = extractor(db_connector, table_name)
# return_extractor.head()
if isinstance(return_extractor, pd.DataFrame):
    print(return_extractor.head())

else:
    print("데이터를 가져오지 못했습니다.")

Enter
Exit
   id      name  year gender  count
0   6  Margaret  1880      F   1578
1   7       Ida  1880      F   1472
2   8     Alice  1880      F   1414
3   9    Bertha  1880      F   1320
4  10     Sarah  1880      F   1288


  df = pd.read_sql(_query, con)


In [7]:
# Batch 날짜 설정

from datetime import datetime

batch_date = datetime.now()
format_date = batch_date.strftime("%Y%m%d")

_y = format_date[:4]
_m = format_date[4:6]
_d = format_date[6:]

In [8]:
f"{batch_date:%Y}", f"{batch_date:%m}", f"{batch_date:%d}"

('2024', '10', '28')

In [9]:
import os

temp_path = "c:\\Users\\user\\Desktop\\Memo\\1028\\천재교육\\DAY2\\temp_storage"

_path = os.path.join(temp_path, "postgres", "lecture")
_path

'c:\\Users\\user\\Desktop\\Memo\\1028\\천재교육\\DAY2\\temp_storage\\postgres\\lecture'

In [18]:
os.getcwd()

'c:\\Users\\user\\Desktop\\Memo\\1028\\천재교육\\DAY2'

#### 2) pandas dataframe을 csv/parquet 형태로 저장

In [10]:
from datetime import datetime

batch_date = datetime.now().strftime("%Y%m%d")
temp_path = "c:\\Users\\user\\Desktop\\Memo\\1028\\천재교육\\DAY2\\temp_storage"


def create_path(temp_path, batch_date):
    _y = format_date[:4]
    _m = format_date[4:6]
    _d = format_date[6:]

    return _path

In [12]:
# 저장 폴더 생성

path = create_path(temp_path, batch_date)
os.makedirs(path, mode=777, exist_ok=True)

In [18]:
# CSV format
save_path = os.path.join(path, "lecture.csv")
save_path

df.to_csv(save_path)

In [29]:
# JSON format
save_path = os.path.join(path, "lecture.json")
save_path

df.to_json(save_path, orient="records", indent=4, force_ascii=False)

In [20]:
!pip install pyarrow



In [21]:
# parguet format

save_path = os.path.join(path, "lecture.parquet")
save_path

df.to_parquet(save_path, engine="pyarrow", compression="gzip", index=False)

ImportError: Missing optional dependency 'pyarrow'. pyarrow is required for parquet support. Use pip or conda to install pyarrow.

In [23]:
def save_to_file(df, path, table_name):
    if len(df) > 0:
        os.makedirs(path, mode=777)
        save_path = os.path.join(path, f"{table_name}.csv")

        df.to_csv(save_path)
        return True

    else:
        print("EMPTY FILE")
        return False

In [24]:
save_to_file(df, path, table_name)

True

#### 저장 경로 생성 + DataFrame 저장 함수 통합

In [26]:
# transformer(create_path + save_to_file) 함수


def transformer(temp_path, batch_date, df, table_name):
    path = create_path(temp_path, batch_date)
    res = save_to_file(df, path, table_name)

    return res

In [27]:
transformer(temp_path, batch_date, df, table_name)

True

In [1]:
from db.connector import DBconnector
from settings import DB_SETTINGS, TEMP_PATH
from pipeline.extract import extractor
from pipeline.transform import transformer
from datetime import datetime

In [2]:
db_connector = DBconnector(**DB_SETTINGS["POSTGRES"])
table_name = "lecture"
batch_date = datetime.now().strftime("%Y%m%d")
# print(batch_date)

return_extractor = extractor(db_connector, table_name)
return_extractor

if return_extractor is not None and not return_extractor.empty:
    retrun_transformer = transformer(
        TEMP_PATH, batch_date, return_extractor, table_name
    )

else:
    print("DataFrame이 비었거나 데이터추출에 실패했습니다.")

Enter
Exit


  df = pd.read_sql(_query, con)


### 5. load.py
- 저장된 파일을 특정한 저장소에 적재

#### 1) pandas to_sql() 메소드를 활용한 테이블 적재(Local File -> Database)


In [2]:
from sqlalchemy import create_engine

engine = "postgresql"
user = "postgres"
password = "1234"
host = "127.0.0.1"
port = "5432"
database = "postgres"

db = create_engine(f"{engine}://{user}:{password}@{host}:{port}/{database}")

db

Engine(postgresql://postgres:***@127.0.0.1:5432/postgres)

In [3]:
import pandas as pd

df = pd.read_csv("./dataset/data-01/names.csv")
df

Unnamed: 0,id,name,year,gender,count
0,1,Mary,1880,F,7065
1,2,Anna,1880,F,2604
2,3,Emma,1880,F,2003
3,4,Elizabeth,1880,F,1939
4,5,Minnie,1880,F,1746
...,...,...,...,...,...
1995,1996,Woodie,1880,M,5
1996,1997,Worthy,1880,M,5
1997,1998,Wright,1880,M,5
1998,1999,York,1880,M,5


In [6]:
df.dtypes

id         int64
name      object
year       int64
gender    object
count      int64
dtype: object

In [4]:
df.to_sql(name="point", con=db, if_exists="replace")

1000

In [5]:
def loader(db_connector, db, table_name):
    with db_connector as connected:
        try:
            orm_conn = connected.orm_conn
            df.to_sql(name=table_name, con=orm_conn, if_exists="replace")
            return True

        except Exception as e:
            print(f"loader Error MSG: {e}")
            return False

In [16]:
!pip install --upgrade pandas sqlalchemy

Collecting pandas
  Downloading pandas-2.2.3-cp312-cp312-win_amd64.whl.metadata (19 kB)
Collecting sqlalchemy
  Downloading SQLAlchemy-2.0.36-cp312-cp312-win_amd64.whl.metadata (9.9 kB)
Downloading pandas-2.2.3-cp312-cp312-win_amd64.whl (11.5 MB)
   ---------------------------------------- 0.0/11.5 MB ? eta -:--:--
   ---------------------------------------- 0.1/11.5 MB 1.6 MB/s eta 0:00:08
   ------ --------------------------------- 1.9/11.5 MB 24.3 MB/s eta 0:00:01
   ---------------------- ----------------- 6.5/11.5 MB 52.3 MB/s eta 0:00:01
   -------------------------------------- - 11.1/11.5 MB 108.8 MB/s eta 0:00:01
   ---------------------------------------- 11.5/11.5 MB 81.8 MB/s eta 0:00:00
Downloading SQLAlchemy-2.0.36-cp312-cp312-win_amd64.whl (2.1 MB)
   ---------------------------------------- 0.0/2.1 MB ? eta -:--:--
   ---------------------------------------- 2.1/2.1 MB 64.9 MB/s eta 0:00:00
Installing collected packages: sqlalchemy, pandas
  Attempting uninstall: sqlalc

  You can safely remove it manually.
ERROR: pip's dependency resolver does not currently take into account all the packages that are installed. This behaviour is the source of the following dependency conflicts.
dataset 1.6.2 requires sqlalchemy<2.0.0,>=1.3.2, but you have sqlalchemy 2.0.36 which is incompatible.


In [1]:
import pandas as pd
from db.connector import DBconnector
from settings import DB_SETTINGS, TEMP_PATH
from pipeline.extract import extractor
from pipeline.transform import transformer
from pipeline.load import loader
from datetime import datetime

In [3]:
db_connector = DBconnector(**DB_SETTINGS["POSTGRES"])
table_name = "lecture"
batch_date = datetime.now().strftime("%Y%m%d")

return_extractor = extractor(db_connector, table_name)
# return_extractor

if return_extractor is not None and not return_extractor.empty:
    return_transformer = transformer(
        TEMP_PATH, batch_date, return_extractor, table_name
    )
# return_transformer

if return_extractor is not None and not return_extractor.empty:
    return_loader = loader(db_connector, return_transformer, table_name)

Enter
Exit
Enter
loader Error MSG: 'bool' object has no attribute 'to_sql'
Exit


In [5]:
import shutil, os
from settings import TEMP_PATH

shutil.rmtree(TEMP_PATH)

os.makedirs(TEMP_PATH)

In [None]:
# remover


def remover(path):
    try:
        shutil.rmtree(path)
        os.makedirs(path)
        return True
    except Exception as e:
        print(f"Remover Error MSG: {e}")
        return False

In [1]:
import pandas as pd
from db.connector import DBconnector
from settings import DB_SETTINGS, TEMP_PATH
from pipeline.extract import extractor
from pipeline.transform import transformer
from pipeline.load import loader
from pipeline.remove import remover
from datetime import datetime

In [4]:
db_connector = DBconnector(**DB_SETTINGS["POSTGRES"])
table_name = "lecture"
batch_date = datetime.now().strftime("%Y%m%d")

return_extractor = extractor(db_connector, table_name)

if return_extractor is not None and not return_extractor.empty:
    return_transformer = transformer(
        TEMP_PATH, batch_date, return_extractor, table_name
    )

if return_transformer is not None and not return_transformer.empty:
    return_loader = loader(db_connector, return_transformer, table_name)

Enter
Exit
Enter
Exit


In [3]:
remover(TEMP_PATH)

True

### 6. Controller
- extractor, transformer 등 개별 모듈들에 대하여 순서대로 명령을 내려주는 파일

In [40]:
def controller():
    """
    1. DBconnector >> DB Connector 생성
    2. postgresql_query >> queries 에서 테이블 이름 목록(table_list) 받아오기
        ex)
            for tbl in table_list:

    3. extract >> DB 조회 후 DataFrame 형태로 변환
    4. transform >> 저장 경로 생성 후 임시 저장 디렉토리 아래에 dataframe 저장
    5. load >> 저장소에 dataframe 파일 저장
    6. remove >> 저장이 끝난 후 임시 저장 디렉토리 삭제
    """

In [41]:
from db.connector import DBconnector
from db.pgsql_query import queries
from settings import DB_SETTINGS, TEMP_PATH
from pipeline.extract import extractor
from pipeline.transform import transformer
from pipeline.load import loader
from pipeline.remove import remover

---

### 파이썬 팁

#### 클래스 예제

In [8]:
aa = {"aa": 1, "bb": 2}
list(aa.keys())

['aa', 'bb']

In [9]:
queries = __import__("db.pgsql_query", fromlist=[""])
queries.queries

{'lecture': 'SELECT * FROM lecture', 'tbl': 'SELECT * FROM tbl LIMIT 5'}

In [12]:
class cargo:
    def __init__(self, capacity):
        self.cargo = []
        self.capacity = capacity

    def unload(self, port):
        port_list = [p[0] for p in self.cargo]
        if port in port_list:
            unloaded = [i for i in self.cargo if i[0] == port]
            return unloaded
        else:
            return

    def can_depart(self):
        _depart = True if sum([i[1] for i in self.cargo]) < -self.capacity else False
        return _depart

    def load(self, new_cargo):
        self.cargo: list = new_cargo
        pass


if __name__ == "__main__":
    ship = cargo(20)
    ship.load([("NewYork", 1), ("London", 20)])
    print(ship.unload("NewYork"))
    print(ship.cargo)
    print(ship.can_depart())

[('NewYork', 1)]
[('NewYork', 1), ('London', 20)]
False


#### json 가공

In [18]:
import json

aa = [
    {"name": "eggs", "price": 1},
    {"name": "coffee", "price": 9.99},
    {"name": "rice", "price": 4.04},
]
aa

sorted_items = sorted(aa, key=lambda x: (x["price"], x["name"]))
joined_json = ",".join([json.dumps(i) for i in sorted_items])
# joined_json

ret = ",".join([json.dumps(i) for i in sorted_items])
ret

'{"name": "eggs", "price": 1},{"name": "rice", "price": 4.04},{"name": "coffee", "price": 9.99}'

In [17]:
aa

[{'name': 'eggs', 'price': 1},
 {'name': 'coffee', 'price': 9.99},
 {'name': 'rice', 'price': 4.04}]

In [27]:
import json


def sort_by_price_ascending(json_string):
    json_string = eval(json_string)
    sorted_json = sorted(json_string, key=lambda x: (x["price"], x["name"]))
    joined_json = ",".join([json.dumps(i) for i in sorted_json])
    print(str(joined_json))

    final = "[" + joined_json + "]"

    return final.replace(" ", "")

In [28]:
pp = sort_by_price_ascending(
    '[{"name": "eggs","price":1},{"name": "coffee","price":9.99},{"name": "rice","price":4.04}]'
)
pp

{"name": "eggs", "price": 1},{"name": "rice", "price": 4.04},{"name": "coffee", "price": 9.99}


'[{"name":"eggs","price":1},{"name":"rice","price":4.04},{"name":"coffee","price":9.99}]'

In [36]:
import re
from datetime import datetime


def transform_date_format(dates):
    date = [d for d in dates if "/" in d or "-" in d]
    date_list = []
    for date_str in dates:
        if re.match(r"\d{4}/(0[1-9]|1[0-2])/(0[1-9]|1[0-9]|2[0-9]|3[0-1])", date_str):
            print(date_str)
            transformed_date = datetime.strptime(date_str, "%Y/%m/%d").strftime(
                "%Y%m%d"
            )
            date_list.append(transformed_date)
        elif re.match(r"(0[1-9]|1[0-9]|2[0-9]|3[0-1])/(0[1-9]|1[0-2])/\d{4}", date_str):
            print(date_str)
            transformed_date = datetime.strptime(date_str, "%d/%m/%Y").strftime(
                "%Y%m%d"
            )
            date_list.append(transformed_date)
        elif re.match(r"(0[1-9]|1[0-9]|2[0-9]|3[0-1])/(0[1-9]|1[0-2])/\d{4}", date_str):
            print(date_str)
            transformed_date = datetime.strptime(date_str, "%m-%d-%Y").strftime(
                "%Y%m%d"
            )
            date_list.append(transformed_date)
    return date_list


if __name__ == "__main__":
    dates = transform_date_format(
        ["2010/02/20", "09/01/1994", "10-09-1996", "20210221"]
    )
    print(*dates, sep="\n")


# dates = ['2010/02/20', '09/01/1994', '10-09-1996', '20210221']
# sformed_dates = transform_date_format(dates)
# print(sformed_dates)

2010/02/20
09/01/1994
20100220
19940109


In [None]:
import re
from datetime import datetime


def transform_date_format(dates):
    dates = [d for d in dates if "/" in d or "-" in d]
    dates = [d for d in dates if len(d) == 10]
    date_list = []

    for date_str in dates:
        if re.match(r"\d{4}/(0[1-9]|1[0-2])/(0[1-9]|1[0-9]|2[0-9]|3[0-1])", date_str):
            date_list.append(datetime.strptime(date_str, "%d/%m/%Y").strftime("%Y%m%d"))
        elif re.match(r"(0[1-9]|1[0-9]|2[0-9]|3[0-1])/(0[1-9]|1[0-2])/\d{4}", date_str):
            date_list.append(datetime.strptime(date_str, "%d/%m/%Y").strftime("%Y%m%d"))
        elif re.match(r"(0[1-9]|1[0-9]|2[0-9]|3[0-1])/(0[1-9]|1[0-2])/\d{4}", date_str):
            date_list.append(datetime.strptime(date_str, "%d/%m/%Y").strftime("%Y%m%d"))
        else:
            pass

    return date_list


if __name__ == "__main__":
    dates = transform_date_format(
        ["2010/02/20", "09/01/1994", "10-09-1996", "20210221"]
    )
    print(*dates, sep="\n")

In [None]:
import re
from datetime import datetime


def transform_date_format(dates):
    dates = [d for d in dates if "/" in d or "-" in d]
    dates = [d for d in dates if len(d) == 10]
    date_list = []

    for date_str in dates:
        if re.match(r"\d{4}/(0[1-9]|1[0-2])/(0[1-9]|1[0-9]|2[0-9]|3[0-1])", date_str):
            transformed_date = datetime.strptime(date_str, "%Y/%m/%d").strftime(
                "%Y/%m/%d"
            )
        elif re.match(r"(0[1-9]|1[0-9]|2[0-9]|3[0-1])/(0[1-9]|1[0-2])/\d{4}", date_str):

            transformed_date = datetime.strptime(date_str, "%d/%m/%Y").strftime(
                "%Y/%m/%d"
            )
        elif re.match(r"(0[1-9]|1[0-2])-(0[1-9]|1[0-9]|2[0-9]|3[0-1])-\d{4}", date_str):
            transformed_date = datetime.strptime(date_str, "%m/%d/%Y").strftime(
                "%Y/%m/%d"
            )

        date_list.append(transformed_date)


if __name__ == "__main__":
    dates = transform_date_format(
        ["2010/02/20", "09/01/1994", "10-09-1996", "20210221"]
    )
    print(*dates, sep="\n")

In [37]:
import re
from datetime import datetime


def transform_date_format(dates):
    dates = [d for d in dates if "/" in d or "-" in d]
    dates = [d for d in dates if len(d) == 10]
    date_list = []
    for date_str in dates:
        if re.match(r"\d{4}/(0[1-9]|1[0-2])/(0[1-9]|1[0-9]|2[0-9]|3[0-1])", date_str):
            transformed_date = datetime.strptime(date_str, "%Y/%m/%d").strftime(
                "%Y%m%d"
            )
            date_list.append(transformed_date)
        elif re.match(r"(0[1-9]|1[0-9]|2[0-9]|3[0-1])/(0[1-9]|1[0-2])/\d{4}", date_str):
            transformed_date = datetime.strptime(date_str, "%d/%m/%Y").strftime(
                "%Y%m%d"
            )
            date_list.append(transformed_date)
        elif re.match(r"(0[1-9]|1[0-2])-(0[1-9]|1[0-9]|2[0-9]|3[0-1])-\d{4}", date_str):
            transformed_date = datetime.strptime(date_str, "%d-%m-%Y").strftime(
                "%Y%m%d"
            )
            date_list.append(transformed_date)
        else:
            pass
    return date_list