In [169]:
# 데이터 처리 모듈
import pandas as pd
# 행렬 등 통계 연산 모듈
import numpy as np
# 지수형 표기법 e를 연속형 변환
pd.options.display.float_format = '{:.4f}'.format
# 타입 어노테이션(Any, Sequence 등의 메서드 활용)
from typing import *

# 구글 드라이브 마운트
from google.colab import drive
drive.mount("/content/drive")
import os

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [168]:
class MergeData():

  def __init__(self, soho_credit_data = None):
    self.soho_credit_data = soho_credit_data

  def mergeSohoCredit(self, date_time: int, file_path: str, column_name_list: List):
    # date_time에 202201 ~ 202212를 입력값으로 받고, file_path에 해당 데이터가 있는 드라이브 경로 설정
    # file_path = # /content/drive/MyDrive/2023BigContest/Archive/"

    credit_data_list: List = []
      # 월별 데이터프레임을 담을 리스트 생성
    date_time_list = [str(date_time + n) for n in range(0, 12, 1)]
    for date in date_time_list:
      data = pd.read_csv(
          os.path.join(file_path + date + "_INCOME_230907.csv"),
          index_col = 0, sep = ",",
          names = column_name_list)
          # header = None)
      data.drop("index", axis = 0, inplace = True)
      data[column_name_list[1:]] = data[column_name_list[1:]].astype("float")

      # data["손익분기점_매출액_STD"] = data["월임대료"] / (1 - (data["부가가치세_STD"] + data["매입액"]) / V["매출액"]) = 고정비용 / (1-변동비율)
      data["객단가_STD"] = data["매출액_STD"] / data["고객수_STD"]
      data["신규고객단가_STD"] = data["매출액_STD"] / data["신규고객수_STD"]
      data["전체대비_주말매출액비중_STD"] = data["주말매출액_STD"] / data["매출액_STD"]
        # 파생컬럼 생성

      credit_data_list.append(data)
        # os.path.join을 통한 구글 드라이브 파일 내 경로로 csv데이터 로드하여 판다스 데이터프레임으로 저장 후
        # credit_data_list에 0 ~ 11번째 인덱스의 원소로 추가

    return credit_data_list
      # 2022년 12개월의 소호 상권별 신용거래정보 데이터프레임을 담은 리스트 반환
      # 병합하여 self.soho_credit_data라는 멤버변수에 저장할 것

In [None]:
Preprocess = MergeData()

In [None]:
credit_data_list = Preprocess.mergeSohoCredit(
    date_time = 202201,
    file_path = "/content/drive/MyDrive/2023BigContest/data/KCD_CREDIT_DATA/SOHO_",
    column_name_list = ["상권명", "매출액_STD", "매입액_STD", "매출총이익_STD", "부가가치세_STD", "부가세차감전영업이익_STD", "영업이익_STD", "주말매출액_STD", "고객수_STD", "신규고객수_STD"])

```
* standard_deviation_population(kcd.transactions.sum_sales_card + kcd.transactions.sum_sales_delivery)
* standard_deviation_population(kcd.transactions.sum_purchase_card + kcd.transactions.sum_purchase_cash)
* standard_deviation_population((kcd.transactions.sum_sales_card + kcd.transactions.sum_sales_delivery) - (kcd.transactions.sum_purchase_card + kcd.transactions.sum_purchase_cash))
* standard_deviation_population(kcd.transactions.sum_sales_invoice - kcd.transactions.sum_purchase_invoice)
* standard_deviation_population(((kcd.transactions.sum_sales_card + kcd.transactions.sum_sales_delivery) - (kcd.transactions.sum_purchase_card + kcd.transactions.sum_purchase_cash)) - kcd.transactions.monthly_rental_fee)
* standard_deviation_population((((kcd.transactions.sum_sales_card + kcd.transactions.sum_sales_delivery) - (kcd.transactions.sum_purchase_card + kcd.transactions.sum_purchase_cash)) - kcd.transactions.monthly_rental_fee) - (kcd.transactions.sum_sales_invoice - kcd.transactions.sum_purchase_invoice))
* standard_deviation_population(kcd.transactions.sum_weekend_sales_card + kcd.transactions.sum_weekend_sales_delivery)
* standard_deviation_population(sum_customer_cnt)
* standard_deviation_population(sum_new_customer_cnt)
```


In [164]:
display(credit_data_list[11].info())

<class 'pandas.core.frame.DataFrame'>
Index: 93 entries, 0 to 92
Data columns (total 13 columns):
 #   Column            Non-Null Count  Dtype  
---  ------            --------------  -----  
 0   상권명               93 non-null     object 
 1   매출액_STD           93 non-null     float64
 2   매입액_STD           93 non-null     float64
 3   매출총이익_STD         93 non-null     float64
 4   부가가치세_STD         93 non-null     float64
 5   부가세차감전영업이익_STD    93 non-null     float64
 6   영업이익_STD          93 non-null     float64
 7   주말매출액_STD         93 non-null     float64
 8   고객수_STD           93 non-null     float64
 9   신규고객수_STD         93 non-null     float64
 10  객단가_STD           93 non-null     float64
 11  신규고객단가_STD        93 non-null     float64
 12  전체대비_주말매출액비중_STD  93 non-null     float64
dtypes: float64(12), object(1)
memory usage: 12.2+ KB


None

In [193]:
class LoadData():

  def __init__(self,
               commerce_district_data_path: str, commerce_district_data = None, soho_commerce_data = None):
    # 생성자를 활용하여 초기화하고 클래스 내부에 캡슐화

    commerce_district_data_chunk = pd.read_csv(
        commerce_district_data_path, chunksize = 10**5,
        index_col = False, na_values = "NaN", encoding = "utf-8")
    commerce_district_data_list = list(commerce_district_data_chunk)
      # 용량이 큰 csv 파일 읽어오기(fopen - fread와 유사한 방식)
    self.commerce_district_data = pd.concat(commerce_district_data_list)
    self.soho_commerce_data = soho_commerce_data


  def chunkDataMerge(self, column_name_list: List, codebook_filepath: str, district_file_path: str,
                     left_join_key1: str, right_join_key1: str, left_join_key2: str, right_join_key2: str):
    self.commerce_district_data.columns = column_name_list
    codebook = pd.read_csv(codebook_filepath, index_col = False)
    district_code = pd.read_csv(district_file_path, index_col = False, encoding = "euc-kr")

    self.commerce_district_data = pd.merge(
        left = self.commerce_district_data, right = codebook,
        left_on = left_join_key1, right_on = right_join_key1, how = "left")
    # display(self.commerce_district_data.info())
    # display(district_code.info())

    self.commerce_district_data.drop(right_join_key1, axis = 1, inplace = True)
    self.commerce_district_data = pd.merge(
        left = self.commerce_district_data, right = district_code[["상권_구분_코드_명", "상권_코드", "상권_코드_명"]],
        left_on = left_join_key2, right_on = right_join_key2, how = "left")

    return self.commerce_district_data


  def splitMonthlyData(self, month: int):
    soho_sales_grade_list: List = []
    while month < 202213:
      soho_sales_grade_list.append(
          self.commerce_district_data.loc[self.commerce_district_data["데이터기준연월"] == month])
      month += 1

    return soho_sales_grade_list


  def mergeData(self, soho_credit_list: List, sales_grade_list: List,
                left_join_key: str, right_join_key: str):
    self.soho_commerce_data: List = []
    for index in range(0, 11, 1):
      self.soho_commerce_data(
          pd.merge(left = soho_credit_list[index], right = sales_grade_list[index],
                  left_on = left_join_key, right_on = right_join_key, how = "left"))

    return self.soho_commerce_data

In [195]:
soho_sales_grade = LoadData(
    commerce_district_data_path = "/content/drive/MyDrive/2023BigContest/data/대회제공/필지단위 소상공인 매출등급 정보.csv")
commerce_district_sales = soho_sales_grade.chunkDataMerge(
    column_name_list = ["데이터기준연월", "필지고유번호", "법정동읍면동코드", "업종코드", "매출등급"],
    codebook_filepath = "/content/drive/MyDrive/2023BigContest/data/market_loc_map.csv",
    left_join_key1 = "법정동읍면동코드", right_join_key1 = "STDG_EMD_CD",
    district_file_path = "/content/drive/MyDrive/2023BigContest/data/서울시 상권분석서비스(상권영역).csv",
    left_join_key2 = "상권_코드", right_join_key2 = "상권_코드")

<class 'pandas.core.frame.DataFrame'>
Int64Index: 13138513 entries, 0 to 13138512
Data columns (total 8 columns):
 #   Column       Dtype  
---  ------       -----  
 0   데이터기준연월      int64  
 1   필지고유번호       float64
 2   법정동읍면동코드     int64  
 3   업종코드         object 
 4   매출등급         int64  
 5   상권_코드        float64
 6   행정동_코드       float64
 7   STDG_EMD_CD  float64
dtypes: float64(4), int64(3), object(1)
memory usage: 902.2+ MB


None

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 1650 entries, 0 to 1649
Data columns (total 10 columns):
 #   Column      Non-Null Count  Dtype  
---  ------      --------------  -----  
 0   기준_년월_코드    1650 non-null   int64  
 1   상권_구분_코드    1650 non-null   object 
 2   상권_구분_코드_명  1650 non-null   object 
 3   상권_코드       1650 non-null   int64  
 4   상권_코드_명     1650 non-null   object 
 5   엑스좌표_값      1650 non-null   int64  
 6   와이좌표_값      1650 non-null   int64  
 7   시군구_코드      1650 non-null   int64  
 8   행정동_코드      1650 non-null   int64  
 9   형태정보        0 non-null      float64
dtypes: float64(1), int64(6), object(3)
memory usage: 129.0+ KB


None

In [196]:
soho_sales_grade_list = soho_sales_grade.splitMonthlyData(month = 202201)

```
STDG_EMD_CD_codebook = pd.read_excel(
    "/content/drive/MyDrive/2023BigContest/data/대회제공/소상공인 매출등급 예상 대상 필지.xlsx",
    sheet_name = 0,
    index_col = None)
STDG_EMD_CD_codebook.drop(["SLS_GRD_2301", "SLS_GRD_2302"], axis = 1, inplace = True)
  # [법정동읍면동코드] 시, 구 아래에 두며 그 아래에는 통과 반이 있는, 법률로 지정한 행정 구역의 정보를 나타내는 기호 체계
  # 법정동(10자리)의 구성: 광역시(2자리)+시군구(3자리)+읍면동(3자리)+리(2자리)
  # 법정동코드 10자리 중 앞6번째부터 8번째자리(101:청운동, 101:신교동…)
display(STDG_EMD_CD_codebook)
  # 고유식별자는 총 10,000행
  # [매출등급] 특정 필지의 '평당추정매출/평당월임대료'
```

In [None]:
display(credit_data_list[10]), display(soho_sales_grade_list[10])
  # 2022년과 2023의 상권코드는 체계가 다른것으로 추정되어 TABLE JOIN하였으나 모두 NULL값으로 반환

In [None]:
soho_commerce_merge_data = soho_sales_grade.mergeData(
    soho_credit_list = credit_data_list, sales_grade_list = soho_sales_grade_list,
    left_join_key = "상권코드", right_join_key = "상권_코드")

```
# VarianceInflationFactor()
# VIF(분산팽창인자) 값에 따른 최소한 모형 적합 시 입력할 독립변수의 수 파악을 위한 반복문 수행
  def VarainceInflationFactor(self,
                              X_train_data: pd.DataFrame(), X_test_data: pd.DataFrame(),
                              # num_variables: int,
                              Y_train_data: pd.DataFrame(), Y_test_data: pd.DataFrame()):

    evaluation_train = pd.DataFrame()
    evaluation_test = pd.DataFrame()

    for index in tqdm( range(1, len(X_train_data.columns) + 1)):

    # 분산팽창인자를 피처 수별로 구한 후 오름차순하여 vif 데이터프레임에 저장(다중공선성 문제로 나머지 변수 제외)
    # X_train_data에서 변수 index 수만큼의 피처(독립변수)만 남기고 나머지 변수 제외하여 X_train_fe_scaling_multiple 생성
    # X_train_test도 이하 상동
      vif = pd.DataFrame()
      vif["VIF_factor"] = [variance_inflation_factor(X_train_data.values, index)
                          for index in range(X_train_data.shape[1]) ]
      vif["Features"] = X_train_data.columns
      # print(vif)
      X_column_vif = vif.sort_values(by = "VIF_factor", ascending = True)["Features"][:index].values
        # TypeError: 'NoneType' object is not subscriptable

      X_train_fe_scaling_multiple, X_test_fe_scaling_multiple = X_train_data[X_column_vif].copy(), X_test_data[X_column_vif].copy()
```