### 저장 데이터 네이밍

- 원천 데이터 : `user_data.parquet`
- MySQl 테이블 : `user_data`
- Postgres 테이블 : `user_data_summary`

### 테이블 형상

#### user_data
<img src="https://velog.velcdn.com/images/newnew_daddy/post/84ddff4a-943a-4a8a-b0cf-e18f01b07f31/image.png" width="70%">

#### user_data_summary
<img src="https://velog.velcdn.com/images/newnew_daddy/post/7868a1bd-0857-40bc-808b-d9f160c67768/image.png" width="50%">

#### 1. DB 연결 정보 import

In [1]:
from settings import DB_SETTINGS
from db.connector import DBconnector

In [10]:
DBconnector(**DB_SETTINGS['mysql_params']).__dict__

{'engine_name': 'mysql+pymysql',
 'user': 'codeit',
 'password': 'sprint',
 'host': 'localhost',
 'port': '3300',
 'database': 'docker_mysql',
 'pymysql_conn': <pymysql.connections.Connection at 0x2b252cf7f20>,
 'sql_conn': Engine(mysql+pymysql://codeit:***@localhost:3300/docker_mysql)}

#### 2. source.py 모듈 개발

1) user_data.parquet 파일을 dataframe으로 변환
2) dataframe에서 특정 날짜값으로 필터링
3) 필터링된 dataframe을 MySQL 데이터베이스에 저장 -> to_sql() 사용

In [3]:
## 1
import pandas as pd

df = pd.read_parquet('source_data/user_data.parquet')

## 2
batch_date = '2025-03-02'

batch_df = df[df['credate'] == batch_date]

## 3

mysql_conn = DBconnector(**DB_SETTINGS['mysql_params']).sql_conn

batch_df.to_sql(
    name='user_data',
    con=mysql_conn, 
    if_exists='append',
    index=False
)

76

In [None]:
## source 함수 개발
"""
1. 로깅!
2. 예외 처리! (try - except)
"""
from loguru import logger

def source_to_db(db_conn, source_path, file_name, batch_date):
    """
    source data에 저장된 parquet 파일을 추출하여 MySQL에 저장하는 함수
    """
    try:
        logger.info(f"SOURCE 단계 시작! : {file_name} 파일 데이터를 MYSQL에 저장")
        
        df = pd.read_parquet(f'{source_path}/{file_name}.parquet')

        batch_df = df[df['credate'] == batch_date]

        batch_df.to_sql(
            name=file_name,
            con=db_conn, 
            if_exists='append',
            index=False
        )
        
        logger.success(f"SOURCE 작업 완료 : {batch_df.shape} 형상의 데이터 MYSQL에 저장")
        
        return True
        
    except Exception as e:
        logger.error(f"SOURCE 단계 오류 발생! : {e}")
        return False

In [11]:
## source 함수 실행

from pipeline.source import source_to_db

mysql_conn = DBconnector(**DB_SETTINGS['mysql_params']).sql_conn
source_path = 'source_data'
file_name = 'user_data'
batch_date = '2025-03-03'

source_to_db(
    db_conn=mysql_conn,
    source_path=source_path,
    file_name=file_name,
    batch_date=batch_date
    )

[32m2025-07-22 13:38:15.615[0m | [1mINFO    [0m | [36mpipeline.source[0m:[36msource_to_db[0m:[36m9[0m - [1mSOURCE 단계 시작! : user_data 파일 데이터를 MYSQL에 저장[0m
[32m2025-07-22 13:38:15.891[0m | [32m[1mSUCCESS [0m | [36mpipeline.source[0m:[36msource_to_db[0m:[36m22[0m - [32m[1mSOURCE 작업 완료 : (85, 9) 형상의 데이터 MYSQL에 저장[0m


True

#### 2. extract.py 모듈 개발

1) MySQL에 저장된 user_data 테이블을 dataframe으로 불러오기
2) 불러온 dataframe을 return!

In [None]:
def extractor(db_conn, table_name):
    """
    MySQL 데이터베이스에서 데이터를 추출하여 dataframe으로 변환하는 함수
    """
    try:
        logger.info(f"EXTRACT 단계 시작! : MYSQL에서 {table_name} 데이터 추출")
        df = pd.read_sql_table(
            table_name = table_name,
            con = db_conn
        )
        logger.success(f"EXTRACT 작업 완료 : {df.shape} 형상의 데이터 추출")
        
        return df
    
    except Exception as e:
        logger.error(f"EXTRACT 단계 오류 발생! : {e}")
        
        return False

In [16]:
from pipeline.extract import extractor

mysql_conn = DBconnector(**DB_SETTINGS['mysql_params']).sql_conn
table_name = 'user_data'

df = extractor(
    db_conn=mysql_conn,
    table_name=table_name
)

[32m2025-07-22 14:18:56.259[0m | [1mINFO    [0m | [36mpipeline.extract[0m:[36mextractor[0m:[36m9[0m - [1mEXTRACT 단계 시작! : MYSQL에서 user_data 데이터 추출[0m
[32m2025-07-22 14:18:56.833[0m | [32m[1mSUCCESS [0m | [36mpipeline.extract[0m:[36mextractor[0m:[36m14[0m - [32m[1mEXTRACT 작업 완료 : (219, 9) 형상의 데이터 추출[0m


In [15]:
help(extractor)

Help on function extractor in module pipeline.extract:

extractor(db_conn, table_name)
    MySQL 데이터베이스에서 데이터를 추출하여 dataframe으로 변환하는 함수



#### 3. transform.py 모듈 개발

In [14]:
from loguru import logger

def transformer(pandas_df):
    """
    MySQL에서 추출한 dataframe을 가공하는 함수
    """
    current_year = 2025

    try:
        logger.info(f"TRANSFORM 단계 시작! : MYSQL {table_name} 데이터 변환")
        # 도시 이름 컬럼 생성
        pandas_df["city"] = pandas_df["residence"].str.split().str[0]

        # 출생년도 컬럼 생성
        pandas_df['birthdate'] = pandas_df['birthdate'].astype(str)
        pandas_df["birthyear"] = pandas_df["birthdate"].str.slice(0, 4)

        # 혈액형 컬럼 생성
        pandas_df["blood"] = pandas_df["blood_group"].str.slice(0, -1)

        # 나이 컬럼 생성
        pandas_df["age"] = current_year - pandas_df["birthyear"].astype(int)

        # 나이가 0 이하인 데이터 제거
        pandas_df = pandas_df[pandas_df["age"] > 0].copy()

        # 나이대 컬럼 생성
        pandas_df["age_category"] = pandas_df["age"].apply(categorize_age)
        
        # 컬럼 순서 세팅
        df = pandas_df[["name", 'job', "sex", "city", "birthyear", "age", "blood", "age_category"]]
        
        logger.success(f"TRANSFORM 작업 완료 : {df.shape} 형상의 데이터 변환")
        return df

    except Exception as e:
        logger.error(f"TRANSFORM 단계 오류 발생! : {e}")
        
        return False


def categorize_age(age):
    if age >= 100:
        return "90대 이상"
    else:
        return str(age // 10 * 10) + "대"

In [17]:
from pipeline.transform import transformer

tdf = transformer(df)

tdf

[32m2025-07-22 14:20:22.486[0m | [1mINFO    [0m | [36mpipeline.transform[0m:[36mtransformer[0m:[36m10[0m - [1mTRANSFORM 단계 시작! : MYSQL 데이터 변환[0m
[32m2025-07-22 14:20:23.396[0m | [32m[1mSUCCESS [0m | [36mpipeline.transform[0m:[36mtransformer[0m:[36m33[0m - [32m[1mTRANSFORM 작업 완료 : (218, 8) 형상의 데이터 변환[0m


Unnamed: 0,name,job,sex,city,birthyear,age,blood,age_category
0,김중수,임상병리사,M,충청남도,1977,48,B,40대
1,서예진,청소원,F,대전광역시,1922,103,B,90대 이상
2,조서연,신발제조기 조작원 및 조립원,F,경기도,1913,112,O,90대 이상
3,윤성수,유치원 교사,M,인천광역시,1999,26,O,20대
4,김정훈,법률/경찰/소방 및 교도 관리자,M,세종특별자치시,1994,31,A,30대
...,...,...,...,...,...,...,...,...
214,이현준,기타 전기/전자기기 설치 및 수리원,M,광주광역시,2006,19,A,10대
215,박민서,섬유공학 기술자 및 연구원,F,대구광역시,2008,17,AB,10대
216,노지현,철도 및 전동차 기관사,F,대전광역시,1950,75,O,70대
217,송지아,건설 및 광업기계 설치 및 정비원,F,제주특별자치도,1967,58,O,50대


#### 4. load.py 모듈 개발

1) transform에서 가공된 dataframe을 받음
2) dataframe을 postgresql 데이터베이스에 'user_data_summary' 테이블로 저장!

In [None]:
def loader(pandas_df, db_conn, table_name):
    """
    가공된 dataframe을 PostgreSQl 데이터베이스에 저장하는 함수
    """
    try:
        logger.info(f"LOAD 단계 시작! : Postgres 데이터베이스의 {table_name} 테이블로 저장")
        
        pandas_df.to_sql(
            name = table_name,
            con = db_conn,
            if_exists='append',
            index=False
        )
        
        logger.success(f"LOAD 작업 완료 : {pandas_df.shape} 형상의 데이터 저장 완료")
        return True

    except Exception as e:
        logger.error(f"LOAD 단계 오류 발생! : {e}")
        
        return False

In [18]:
from pipeline.load import loader

postgres_conn = DBconnector(**DB_SETTINGS['postgres_params']).sql_conn

loader(
    pandas_df = tdf,
    db_conn = postgres_conn,
    table_name = 'user_data_summary'
)

[32m2025-07-22 14:36:26.279[0m | [1mINFO    [0m | [36mpipeline.load[0m:[36mloader[0m:[36m8[0m - [1mLOAD 단계 시작! : Postgres 데이터베이스의 user_data_summary 테이블로 저장[0m
[32m2025-07-22 14:36:27.214[0m | [32m[1mSUCCESS [0m | [36mpipeline.load[0m:[36mloader[0m:[36m17[0m - [32m[1mLOAD 작업 완료 : (218, 8) 형상의 데이터 저장 완료[0m


True

#### 5. controller.py 모듈 개발

In [None]:
from settings import DB_SETTINGS
from db.connector import DBconnector
from pipeline.source import source_to_db
from pipeline.extract import extractor
from pipeline.transform import transformer
from pipeline.load import loader
from loguru import logger

def run_pipeline(batch_date):
    
    logger.info("="*50)
    logger.info(f"{batch_date} 일자 데이터에 대한 파이프라인 시작")
    logger.info("="*50)

    ## 1. 관련 변수 선언 & 데이터베이스 연결
    mysql_conn = DBconnector(**DB_SETTINGS['mysql_params']).sql_conn
    postgres_conn = DBconnector(**DB_SETTINGS['postgres_params']).sql_conn
    source_path = 'source_data'
    file_name = 'user_data'

    ## 2. SOURCE
    source_res = source_to_db(
        db_conn=mysql_conn,
        source_path=source_path,
        file_name=file_name,
        batch_date=batch_date
        )
    if not source_res:
        return False

    ## 3. EXTRACT
    table_name = 'user_data'
    
    df = extractor(
        db_conn=mysql_conn,
        table_name=table_name
    )

    ## 4. TRANSFORM
    tdf = transformer(df)

    ## 5. LOAD
    load_res = loader(
        pandas_df = tdf,
        db_conn = postgres_conn,
        table_name = 'user_data_summary'
    )
    if not load_res:
        return False
    
    logger.success("="*50)
    logger.success(f"파이프라인 작동 완료!")
    logger.success("="*50)

In [19]:
from controller import run_pipeline

batch_date = '2025-03-04'

run_pipeline(batch_date)

[32m2025-07-22 15:15:38.043[0m | [1mINFO    [0m | [36mcontroller[0m:[36mrun_pipeline[0m:[36m12[0m - [1m2025-03-04 일자 데이터에 대한 파이프라인 시작[0m
[32m2025-07-22 15:15:39.790[0m | [1mINFO    [0m | [36mpipeline.source[0m:[36msource_to_db[0m:[36m9[0m - [1mSOURCE 단계 시작! : user_data 파일 데이터를 MYSQL에 저장[0m
[32m2025-07-22 15:15:43.982[0m | [32m[1mSUCCESS [0m | [36mpipeline.source[0m:[36msource_to_db[0m:[36m22[0m - [32m[1mSOURCE 작업 완료 : (91, 9) 형상의 데이터 MYSQL에 저장[0m
[32m2025-07-22 15:15:43.983[0m | [1mINFO    [0m | [36mpipeline.extract[0m:[36mextractor[0m:[36m9[0m - [1mEXTRACT 단계 시작! : MYSQL에서 user_data 데이터 추출[0m
[32m2025-07-22 15:15:44.572[0m | [32m[1mSUCCESS [0m | [36mpipeline.extract[0m:[36mextractor[0m:[36m14[0m - [32m[1mEXTRACT 작업 완료 : (310, 9) 형상의 데이터 추출[0m
[32m2025-07-22 15:15:44.573[0m | [1mINFO    [0m | [36mpipeline.transform[0m:[36mtransformer[0m:[36m10[0m - [1mTRANSFORM 단계 시작! : MYSQL 데이터 변환[0m
[32m2025-07-22 15:15:44.

#### 6. start.py 모듈 개발

1. controller에 batch_date 넘겨주기
2. log 파일로 저장되도록 설정!

In [None]:
from controller import run_pipeline
from loguru import logger

logger.add(
        "log_file/pipeline.log", # 로그 파일 저장 경로
        level="DEBUG", # DEBUG 이상 수준의 로그를 모두 기록
    )

def main(batch_date):
    
    run_pipeline(batch_date)