<a href="https://colab.research.google.com/github/gksrb2656/AlgoPractice/blob/master/Data_collector.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
!pip install PublicDataReader --upgrade

Collecting PublicDataReader
  Downloading PublicDataReader-1.0.25-py3-none-any.whl (3.2 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m3.2/3.2 MB[0m [31m16.1 MB/s[0m eta [36m0:00:00[0m
Collecting xmltodict (from PublicDataReader)
  Downloading xmltodict-0.13.0-py2.py3-none-any.whl (10.0 kB)
Installing collected packages: xmltodict, PublicDataReader
Successfully installed PublicDataReader-1.0.25 xmltodict-0.13.0


In [2]:
"""
금융위원회 OpenAPI
semas(Small Enterprise And Market Service)

1. soleinfo 클래스: 금융위원회_개인사업자기본정보_API
    01.개인사업자개요정보조회
    02.개인사업자휴폐업정보조회
"""
import pandas as pd
import logging
import requests
import xmltodict
from bs4 import BeautifulSoup

requests.packages.urllib3.disable_warnings()


class Soleinfo:

    def __init__(self, service_key=None):

        self.service_key = service_key
        self.meta_dict = {
            "개인사업자개요정보": {
                "url": f"https://apis.data.go.kr/1160100/service/GetSBProfileInfoService/getOtlInfo",
                "columns": ['basYm', 'rprSexNm', 'rprAggrNm', 'estbYr', 'bizAreaNm', 'bizBzcCd', 'bizBzcCdNm', 'empeCntNm']
            },
            "개인사업자휴폐업정보": {
                "url": f"https://apis.data.go.kr/1160100/service/GetSBProfileInfoService/getCsdoStatus",
                "columns": ['basYm', 'rprSexNm', 'rprAggrNm', 'estbYr', 'bizAreaNm', 'bizBzcCd', 'bizBzcCdNm', 'empeCntNm', 'csdoClsfNm']
            }
        }

    def get_data(self,
                 service_name,
                 key=None,
                 resultType=None,
                 basYm=None,
                 bizAreaNm=None,
                 bizBzcCdNm=None,
                 bizBzcCd =None,
                 estbYr=None,
                 rprSexNm=None,
                 verbose=False,
                 translate=True,
                 **kwargs
                 ):
        try:
            # 서비스명으로 API URL 선택 (ex. 지정상권, 반경상권, 사각형상권 등)
            url = self.meta_dict.get(service_name).get("url")
            # 서비스명으로 API 컬럼 선택 (ex. 지정상권, 반경상권, 사각형상권 등)
            columns = self.meta_dict.get(service_name).get("columns")
        except AttributeError:
            raise AttributeError("서비스명을 확인해주세요.")
        # 서비스키, 행수, 시군구코드, 법정동코드 설정
        params = {
            "serviceKey": requests.utils.unquote(self.service_key),
            "pageNo": 1,
            "numOfRows": 9999,
            "key": key,
            "resultType": "json",
            "basYm": basYm,
            "bizAreaNm": bizAreaNm,
            "bizBzcCdNm": bizBzcCdNm,
            "bizBzcCd": bizBzcCd,
            "estbYr": estbYr,
            "rprSexNm": rprSexNm
        }
        # 선택 파라미터 추가 설정
        params.update(kwargs)
        # 빈 데이터 프레임 생성
        df = pd.DataFrame(columns=columns)
        # API 요청
        res = requests.get(url, params=params, verify=False)
        # 요청 결과 JSON 변환
        res_json = xmltodict.parse(res.text)
        # 응답 키 존재 확인
        if not res_json.get("response"):
            if verbose:
                print(res_json)
            raise Exception("API 요청이 실패했습니다.")
        # 결과코드가 정상이 아닌 경우
        if res_json['response']['header']['resultCode'] != '00':
            if res_json['response']['header']['resultCode'] == '03':
                if verbose:
                    print("조회 결과가 없습니다.")
                return pd.DataFrame(columns=columns)
            else:
                error_message = res_json['response']['header']['resultMsg']
                raise Exception(error_message)
        items = res_json['response']['body']['items']
        if not items:
            return pd.DataFrame(columns=columns)
        data = items['item']
        if isinstance(data, list):
            sub = pd.DataFrame(data)
        elif isinstance(data, dict):
            sub = pd.DataFrame([data])
        df = pd.concat([df, sub], axis=0, ignore_index=True)
        if len(df) >= 99999:
            print("행수가 99999개를 초과했습니다. 다음 페이지를 조회하세요.")
        # 컬럼명 한글로 변경
        if translate:
            df = self.translate_columns(df)
        return df

    def translate_columns(self, df):
        """
        영문 컬럼명을 한글로 변경
        """
        rename_columns = {
            'basYm': '기준년월',
            'rprSexNm': '대표자성별명',
            'rprAggrNm': '대표자연령대명',
            'estbYr': '설립년도',
            'bizAreaNm': '사업지역명',
            'bizBzcCd': '사업업종코드',
            'bizBzcCdNm': '종업원수구분명',
        }
        return df.rename(columns=rename_columns)