## 의약품 데이터 수집

의약품 데이터를 식약처 오픈 API(공공데이터포털)로 부터 수집한 뒤 csv로 저장한다.

### 라이브러리 import 및 환경설정

In [None]:
%reload_ext autoreload
%autoreload 2
%matplotlib inline

In [None]:
from IPython.display import display
from tqdm.notebook import tqdm, trange
from pathlib import Path
import pandas as pd
import numpy as np
import urllib.request
import json
import os

#### API 요청 및 응답 정보

    api_dat: {
        "서비스명": {
            "오퍼레이션명: {
                "base_url": str,
                "total_count": int,
                "dataframe": pandas.Dataframe,
            },
            ...
        },
        ...
    }

In [None]:

api_dat = {
    "의약품허가정보": {
        "허가목록": {
            "base_url": "http://apis.data.go.kr/1471000/DrugPrdtPrmsnInfoService04" \
                    + "/getDrugPrdtPrmsnInq04",
            "timeout": 3,
            "dataframe": None,
        },
        "허가상세정보": {
            "base_url": "http://apis.data.go.kr/1471000/DrugPrdtPrmsnInfoService04" \
                    + "/getDrugPrdtPrmsnDtlInq03",
            "timeout": 12,
            "dataframe": None,
        },
        "주성분상세정보": {
            "base_url": "http://apis.data.go.kr/1471000/DrugPrdtPrmsnInfoService04" \
                    + "/getDrugPrdtMcpnDtlInq03",
            "timeout": 2.5,
            "dataframe": None,
        },
    },
    "낱알식별정보": {
        "낱알식별정보": {
            "base_url": "http://apis.data.go.kr/1471000/MdcinGrnIdntfcInfoService01" \
                    + "/getMdcinGrnIdntfcInfoList01",
            "timeout": 3,
            "dataframe": None,
        },
    },
    "DUR성분정보": {
        "병용금기": {
            "base_url": "http://apis.data.go.kr/1471000/DURIrdntInfoService02" \
                    + "/getUsjntTabooInfoList01",
            "timeout": 2.5,
            "dataframe": None,
        },
        "특정연령대금기": {
            "base_url": "http://apis.data.go.kr/1471000/DURIrdntInfoService02" \
                    + "/getSpcifyAgrdeTabooInfoList01",
            "timeout": 2.5,
            "dataframe": None,
        },
        "임부금기": {
            "base_url": "http://apis.data.go.kr/1471000/DURIrdntInfoService02" \
                    + "/getPwnmTabooInfoList01",
            "timeout": 2.5,
            "dataframe": None,
        },
        "용량주의": {
            "base_url": "http://apis.data.go.kr/1471000/DURIrdntInfoService02" \
                    + "/getCpctyAtentInfoList01",
            "timeout": 2.5,
            "dataframe": None,
        },
        "투여기간주의": {
            "base_url": "http://apis.data.go.kr/1471000/DURIrdntInfoService02" \
                    + "/getMdctnPdAtentInfoList01",
            "timeout": 2.5,
            "dataframe": None,
        },
        "효능군중복": {
            "base_url": "http://apis.data.go.kr/1471000/DURIrdntInfoService02" \
                    + "/getEfcyDplctInfoList01",
            "timeout": 2.5,
            "dataframe": None,
        },
        "노인주의": {
            "base_url": "http://apis.data.go.kr/1471000/DURIrdntInfoService02" \
                    + "/getOdsnAtentInfoList01",
            "timeout": 2.5,
            "dataframe": None,
        },
    },
    "DUR품목정보": {
        "병용금기": {
            "base_url": "http://apis.data.go.kr/1471000/DURPrdlstInfoService02" \
                    + "/getUsjntTabooInfoList02",
            "timeout": 3,
            "dataframe": None,
        },
        "특정연령대금기": {
            "base_url": "http://apis.data.go.kr/1471000/DURPrdlstInfoService02" \
                    + "/getSpcifyAgrdeTabooInfoList2",
            "timeout": 2.5,
            "dataframe": None,
        },
        "임부금기": {
            "base_url": "http://apis.data.go.kr/1471000/DURPrdlstInfoService02" \
                    + "/getPwnmTabooInfoList2",
            "timeout": 2.5,
            "dataframe": None,
        },
        "용량주의": {
            "base_url": "http://apis.data.go.kr/1471000/DURPrdlstInfoService02" \
                    + "/getCpctyAtentInfoList2",
            "timeout": 2.5,
            "dataframe": None,
        },
        "투여기간주의": {
            "base_url": "http://apis.data.go.kr/1471000/DURPrdlstInfoService02" \
                    + "/getMdctnPdAtentInfoList2",
            "timeout": 2.5,
            "dataframe": None,
        },
        "효능군중복": {
            "base_url": "http://apis.data.go.kr/1471000/DURPrdlstInfoService02" \
                    + "/getEfcyDplctInfoList2",
            "timeout": 2.5,
            "dataframe": None,
        },
        "서방정분할주의": {
            "base_url": "http://apis.data.go.kr/1471000/DURPrdlstInfoService02" \
                    + "/getSeobangjeongPartitnAtentInfoList2",
            "timeout": 2.5,
            "dataframe": None,
        },
        "노인주의": {
            "base_url": "http://apis.data.go.kr/1471000/DURPrdlstInfoService02" \
                    + "/getOdsnAtentInfoList2",
            "timeout": 2.5,
            "dataframe": None,
        },
        "DUR품목정보": {
            "base_url": "http://apis.data.go.kr/1471000/DURPrdlstInfoService02" \
                    + "/getDurPrdlstInfoList2",
            "timeout": 2.5,
            "dataframe": None,
        },
    },
}

#### 요청 URL 생성 메소드 정의

    [공공데이터 포털 오픈 API에 대하여]

    공공데이터포털의 오픈 API의 경우, 검색 필터링에 사용되는 파라미터가 없다면
    가능한 모든 결과를 '페이징'하여 반환한다. 즉, 어떤 오픈 API 서비스의 특정 오퍼레이션의
    전체 레코드 수가 12,345개 이고 주어진 num_of_rows 파라미터가 100이면
        - 전체 페이지 수는 124개이며
        - 마지막 페이지(124페이지)의 결과 레코드 수는 45개이다.

    한편, 식약처 오픈 API의 응답 형식은 다음과 같다.  

    {
        Header: {
            resultMsg: _,
            resultCode: _,
        },
        body: {
            numOfRows: _,
            pageNo: _,
            total_count: _m
            
            items: [item: {}, item: {}, ... item: {}]
            혹은
            items: [{}, {}, ..., {}]

            => items 리스트 요소가 item 으로 레이블링 되어 있냐의 여부를 제외하면 동일한 형식을 가진다.

        }
    }
    
    간혹 서버에서 오류가 발생한 경우, 요청 파라미터 상의 응답 형식이 json으로 설정되어 있음에도 불구하고
    에러 메시지가 xml 형식으로 반환되기도 한다. 만약 에러의 유형에 따라 다른 예외처리를 해 주어야 하는 경우
    이것이 문제가 될 수 있으니 주의해야한다.

In [None]:
def render_url(base_url: str, num_of_rows: int, page_no: int, response_type: str) -> str:
    """ 파라미터가 적용된 요청 가능한 url 문자열을 만들어 반환한다.\n
    식약처 오픈 API는 모든 검색 결과를 한 번에 반환하지 않고 페이징하므로, 이에 필요한 정보가 파라미터로 주어져야한다.\n
\n
    Args:\n
        base_url        : 베이스 url\n
        num_of_rows     : 한 페이지에 포함될 최대 레코드 수\n
        page_no         : 조회할 페이지 번호\n
        response_type   : 응답 형식("json" 또는 "xml")\n
\n
    Returns:\n
        요청 가능한 url 주소 문자열\n
    """

    #공공데이터포털 개인인증키
    service_key = r"ohlypaQx%2FWBhHdAlLzqu%2B3lcYq0yf8yFFCNO%2BFSew%2FU6AiJUyn9%2Fp0IZ5QskCJbEuJRhsHX242w%2F3GfR5KLt5Q%3D%3D"
    
    if num_of_rows < 1 or 100 < num_of_rows:
        raise Exception("파라미터 `num_of_rows`의 값은 1이상 100이하의 정수여야 합니다.")
    if response_type != "json":
        raise Exception("파라미터 `response_type`의 값은 \"json\" 이거나 \"xml\" 여야 합니다.")
    if page_no < 1:
        raise Exception("파라미터 `page_no`의 값은 양의 정수여야 합니다.")

    # 오픈 api 요청 파라미터
    url = base_url \
        + "?" + "serviceKey=" + service_key \
        + "&" + "numOfRows=" + str(num_of_rows) \
        + "&" + "type=" + response_type \
        + "&" + "pageNo=" + str(page_no)
    return url

### 3. 비동기 API 요청 메소드 정의

In [None]:
# 일자
from datetime import datetime
date_str = datetime.today().strftime("%Y-%m-%d_%Hh%Mm%Ss")

# 로깅
import logging
logger = logging.getLogger()
logger.setLevel(logging.WARNING)
formatter = logging.Formatter(u'%(asctime)s [%(levelname)8s] %(message)s')

if not os.path.exists("./log/"):
    os.mkdir("./log/")
    
file_handler = logging.FileHandler(f"./log/download_log_{date_str}.log")
logger.addHandler(file_handler)

In [None]:
import asyncio
import nest_asyncio
import urllib.request as urlreq

# 주피터 노트북 에서 이벤트 루프를 사용할 때 발생하는 에러 방지
#   참조: Jupyter: RuntimeError: this event loop is already running
#   (https://www.markhneedham.com/blog/2019/05/10/jupyter-runtimeerror-this-event-loop-is-already-running/)
nest_asyncio.apply()

async def download_a_page(base_url: str, num_of_rows: int, page_no: int, response_type: str, 
                          timeout: int, retry_cap: int) -> pd.DataFrame:
    """
    Args:\n
        base_url        : 공공데이터포털 오픈 API 서비스 및 오퍼레이션이 명시된 베이스 url\n
        num_of_rows     : 한 페이지에 포함될 레코드 수\n
        page_no         : 조회할 페이지 번호\n
        response_type   : 응답 메시지 형식\n
        timeout         : 응답시간초과 시간기준\n
        retry_cap       : 에러 발생 시 재시도 횟수 한계\n
\n
    Returns:\n
        만약, 정상적인 결과가 반환된 경우 해당 응답 메시지의 body.items를\n
        pandas 데이터프레임 형식으로 변환하여 반환한다.\n
        만약, 예외가 발생한 경우 몇 번 페이지에서 어떤 예외가 발생했는지 콘솔에 출력하고\n
        재시도한다.\n
            만약, 재시도 횟수가 retry_cap를 넘어간 경우 이벤트 루프를 정지시킨다.\n
            이에 따라 실행 종료된다.\n
    """
    
    retry_cnt = 0
    loop = asyncio.get_event_loop()

    while True:
        # 데이터 요청 -> json 파싱 -> 응답 성공 여부 검사
        try:
            url = render_url(base_url=base_url, num_of_rows=num_of_rows, page_no=page_no, response_type=response_type)
            request = urlreq.Request(
                url=url,
                headers={"Accept": "*/*;q=0.9"}
            )
            # urllib.request.openurl에 timeout을 설정해주기 위해 람다로 랩핑하여 전달
            response = await loop.run_in_executor(None, lambda: urlreq.urlopen(url=request, timeout=timeout))

            # 2. json 객체로 역직렬화
            json_str = await loop.run_in_executor(None, response.read)  # 왜 여기에 run_in_executer를 썼지?
            json_obj = json.loads(json_str)

            # 3. 헤더 정보 추출
            result_code: str = json_obj['header']['resultCode']
            result_msg: str = json_obj['header']['resultMsg']

            # 4. 응답 성공 여부 검사
            if result_code != "00":
                raise Exception(f"요청실패({result_code}), 사유: {result_msg}")
        
        # 예외 메시지 출력 및 재시도
        except Exception as e:
            logger.exception(f"#{page_no}번 페이지에서 예외 발생: {e}")
            if retry_cnt < retry_cap:
                retry_cnt += 1
                logger.warning(f" -> 해당 페이지에 대해 {retry_cnt} 번 재시도 중입니다...")
                continue
            else:
                logger.error(f"해당 페이지에 대한 재시도 횟수 ({retry_cap}번)을 초과하였습니다. 이벤트 루프를 중지합니다.")
                loop.stop()
        
        # 성공적인 응답에 대하여, 데이터프레임으로 변환 후 반환
        else:
            # 데이터프레임 생성
            df = pd.json_normalize(json_obj['body']['items'])

            # 인덱스 설정
            # 페이지 번호에 따른 유일한 인덱스를 부여하기 위한 절차
            df_len = len(df)
            idx = [*range(  1         + (num_of_rows * (page_no - 1)), 
                            df_len    + (num_of_rows * (page_no - 1)) + 1)]       
            df.index = idx
            return df

async def download_pages(base_url: str, num_of_rows: int, from_page_no: int, to_page_no: int, response_type: json, 
                         timeout: int, retry_cap: int) -> list[pd.DataFrame]:
    """
    Args:\n
        base_url        : 공공데이터포털 오픈 API 서비스 및 오퍼레이션이 명시된 베이스 url\n
        num_of_rows     : 한 페이지에 포함될 레코드 수\n
        from_page_no    : 시작 페이지 번호\n
        to_page_no      : 끝 페이지 번호\n
        response_type   : 응답 메시지 형식\n
        timeout         : 응답시간초과 시간기준\n
        retry_cap       : 에러 발생 시 재시도 횟수 한계\n
\n
    Returns:\n
        from_page_no 번 페이지 부터, to_page_no 번 페이지 까지 비동기적으로 요청한다.\n
        모든 페이지가 전부 다운로드 될 때까지 대기한다.\n
        다운로드 과정에서(download_a_page 메소드에서) 발생한 예외는 본 메소드에서 처리되지 않고\n
        그대로 전달된다.\n
    """
    futures = (
        asyncio.ensure_future(download_a_page(base_url=base_url, num_of_rows=num_of_rows, page_no=pno, 
                response_type=response_type, timeout=timeout, retry_cap=retry_cap))
                 
        for pno in range(from_page_no, to_page_no + 1)
    )

    result = await asyncio.gather(*futures)
    return result

### 3. API의 전체 결과 수 조회 메소드

 - 비동기적으로 모든 페이지를 요청하기 위해서는 해당 API의 전체 레코드 수를 먼저 알아야한다.

In [None]:
def get_total_count(base_url: str) -> int:
    """주어진 base_url에 해당하는 오픈 API 서비스 오퍼레이션의 전체 결과 수를 반환
    Args:\n
        base_url: 공공데이터 포털의 서비스 및 오퍼레이션이 명시된 base-url
    """

    retry_cap = 5

    try:
        url = render_url(base_url, 1, 1, "json")

        response = urllib.request.urlopen(url)
        json_str = response.read().decode("utf-8")
        json_obj = json.loads(json_str)

        result_code = json_obj['header']['resultCode']  #str
        result_msg = json_obj['header']['resultMsg']    #str
        total_count = json_obj['body']['totalCount']    #int

        if result_code != "00":
            raise Exception(result_msg)

    except Exception as e:
        if retry_cap <= 0:
            raise Exception(f"URL: {url} 로부터 예외가 반환됨.\nException: {e}")
        else:
            retry_cap -= 1
            
    else:
        print(f"url             : {url}")
        print(f"결과코드        : {result_code}")
        print(f"결과메시지      : {result_msg}")
        print(f"검색 결과 수    : {total_count}")
        return total_count

#예시
print("[예시]")
get_total_count(api_dat["의약품허가정보"]["허가목록"]["base_url"])

#### 4.3. 다운로드 실행 및 데이터프레임 생성

In [None]:
# 진행도 표시 위젯
pbar = tqdm([op for service in api_dat for op in api_dat[service]], desc= "전체 진행도")

# 다운로드 실행
for service, operations in api_dat.items():
    for operation, dat in operations.items():
        print(f"===== {service} 중 {operation} =====")
        logger.info(f"===== {service} 중 {operation} =====")

        base_url = dat["base_url"]

        while True:
            total_count = get_total_count(base_url)
            if total_count is not None:
                break
            else:
                print(f"전체 결과 수 획득 실패.\n재시도...")
        num_of_rows = 100
        last_page_no = (total_count - 1) // num_of_rows + 1

        timeout = dat["timeout"]
        retry_cap = 5

        ##### No chunking #####
        dat["dataframe"] = pd.concat(
            asyncio.run(
                download_pages(base_url, 100, 1, last_page_no, "json", timeout, retry_cap)
            )
        )

        # ##### Do chunking #####
        # chunk_size = 100
        # chunk_idxs = [(pno, min(pno + chunk_size - 1, last_page_no)) for pno in range(1, last_page_no+1, chunk_size)]
        # chunks = []
        # for i in tqdm(range(0, len(chunk_idxs))):
        #     print(f"========== chunk{i+1} ==========")
        #     chunk = asyncio.run(download_pages(num_of_rows, chunk_idxs[i][0], chunk_idxs[i][1]))
        #     chunks.append(chunk)

        # loose = []
        # for chunk in tqdm(chunks):
        #     for df in chunk:
        #         loose.append(df)
        # dat["dataframe"] = pd.concat(loose)

        # progress
        pbar.update(1)


### 5. 형식 오류 수정

#### 5.1. 컬럼 명 변경

    1) 낱알식별정보, 2) 의약품허가목록, 그리고 3) DUR'품목'정보 API의 응답의 경우  
    json 바디의 items 리스트가 _[{}, {}, ..., {}]_ 와 같은 구조를 가지고 있다면,  
    
    4) DUR 성분정보의 경우 [{"item":{...}}, {"item":{...}}, ..., {"item":{...}}] 와 같은 구조를 가진다.  
    따라서 DUR 성분 정보 API에 대해 컬럼 이름을 아래과 같이 처리해주어야 한다.

In [None]:
for operation, operation_dat in api_dat["DUR성분정보"].items():

    new_columns = []
    for col_name in tqdm(operation_dat["dataframe"].columns, desc=f"DUR성분정보 중 {operation}: "):
        new_columns.append(col_name.replace("item.", ""))
    operation_dat["dataframe"].columns = new_columns

#### 5.2. 형식 오류 수정(필요한 경우)

In [None]:
for service, operations in api_dat.items():
    for operation, dat in tqdm(operations.items(), desc=f"{service}"):        
        # csv 파일 형식을 깨뜨리는 특수 문자 제거
        # 개행(\n), 캐리지리턴(\r)
        target_df = dat["dataframe"]
        for col in target_df.columns:
            target_df[col] = target_df[col].replace("\n", "", regex=True)
            target_df[col] = target_df[col].replace("\r", "", regex=True)


# EDI_CODE 컬럼이 존재하는 데이터프레임의 어떤 레코드가
# 여러 개의 EDI_CODE 값을 가지는 경우 각 EDI_CODE 값을 컴마로 구분한다.
# 그런데 csv 파일 상에서 각 컬럼의 구분을 컴마로 하기때문에 인코딩 시 다른 문자로 대치해야 한다.
print("항목 구분 문자 변경: ',' => '/'")
edi_target = api_dat["낱알식별정보"]["낱알식별정보"]["dataframe"]
edi_target['EDI_CODE'] = edi_target['EDI_CODE'].replace(",", "/", regex=True)
display(edi_target[edi_target["EDI_CODE"].notnull()].tail()[["ITEM_SEQ", "ITEM_ENG_NAME", "EDI_CODE"]])

### 6. 데이터프레임 csv 파일로 내보내기

In [None]:
# 디렉토리 생성
dir = f"..\\dat\\{date_str}\\csv\\raw\\"
if not os.path.exists(dir):
    os.makedirs(dir)

# 진행도 표시 바
pbar = tqdm([op for service in api_dat for op in api_dat[service]], desc="csv로 내보내기")

for service, operations in api_dat.items():
    for operation, dat in operations.items():
        # csv 파일 저장 경로
        filename = f"{service}-{operation}.csv"
        print(f"{filename} 쓰는 중...")

        # 내보내기
        dat["dataframe"].to_csv(dir + filename)

        # progress
        pbar.update(1)