## 보험이력 추출 코드

In [7]:
import csv
import time
import re
import numpy as np
import pandas as pd
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.chrome.service import Service
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from webdriver_manager.chrome import ChromeDriverManager

# Chrome 드라이버 생성시 옵션을 설정
driver = webdriver.Chrome(service=Service(ChromeDriverManager().install()))

print("작업 완료")

작업 완료


### 크롤링 셋업
#### `MAX_FETCH_COUNT`
- 보험이력 조회 횟수 조절
#### `INPUT_FILE_PATH`
- 매물 `id` 가 있는 `csv` 파일의 경로를 설정
#### `OUTPUT_FILE_PATH`
- 파일 저장 경로를 설정

In [8]:
MAX_FETCH_COUNT = 25600
INPUT_FILE_PATH = "../export/vehicles_price.csv"
OUTPUT_FILE_PATH = "../export/vehicles_history.csv"

In [9]:
def fetch_insurance_history(driver, vehicle_ids, output_file):
    # ID에 부합하는 보험이력 정보 저장
    '''
    id : Vehicle ID (매물 ID)
    n : Car Number (차량등록번호)
    model : Car Model (모델명)
    fuel : Fuel Type (연료)
    cc : Engine displacement (배기량)
    date : First date of registration (최초등록일)
    nc : Number Change History (번호 변경 이력)
    oc : Owner Change History (소유자 변경 이력)
    tl : Total Loss History (전손 사고 이력)
    fd : Flood Damage History (침수 사고 이력)
    tf : Theft History (도난 사고 이력)
    cm : Insurance Claim of My Car (내 차 피해 이력)
    co : Insurance Claim of Other Car (타 차 가해 이력)
    '''
    fieldnames = ["id", "license", "model", "fuel", "cc", "registration_date", "license_changed", "owner_changed", "total_loss", "flood", "theft", "cm", "co"]
    data = []
    count = 0
    log_count = 0
    for i, vid in enumerate(vehicle_ids):
        try:
            log_count += 1
            if log_count >= 20: 
                print(f"\n누적 조회 수: {len(vehicle_ids)}개 중 {i + 1}개\n")
                log_count = 0

            driver.get(f"https://fem.encar.com/cars/report/accident/{vid}")
            details = {
                "id": vid,
                "license": WebDriverWait(driver, 1).until(
                    EC.presence_of_element_located((By.XPATH, "//*[@id='wrap']/div/div[2]/div[2]/dl/dd[1]"))
                ).text,
                "model": driver.find_element(By.XPATH, "/html/body/div[1]/div/div[2]/div[2]/div[2]/dl[1]/dd/div/table/tbody/tr[1]/td[2]").get_attribute('innerText'),
                "fuel": driver.find_element(By.XPATH, "/html/body/div[1]/div/div[2]/div[2]/div[2]/dl[1]/dd/div/table/tbody/tr[2]/td[2]").get_attribute('innerText'),
                "cc": driver.find_element(By.XPATH, "/html/body/div[1]/div/div[2]/div[2]/div[2]/dl[1]/dd/div/table/tbody/tr[3]/td[1]").get_attribute('innerText'),  # /text()[1] 제거
                "registration_date": driver.find_element(By.XPATH, "/html/body/div[1]/div/div[2]/div[2]/div[2]/dl[1]/dd/div/table/tbody/tr[4]/td[2]").get_attribute('innerText'),
                "license_changed": driver.find_element(By.XPATH, "//*[@id='wrap']/div/div[2]/div[2]/div[1]/ul/li[3]/span/span[1]").text,
                "owner_changed": driver.find_element(By.XPATH, "//*[@id='wrap']/div/div[2]/div[2]/div[1]/ul/li[3]/span/span[2]").text,
                "total_loss": driver.find_element(By.XPATH, "//*[@id='wrap']/div/div[2]/div[2]/div[1]/ul/li[4]/span/span[1]").text,
                "flood": driver.find_element(By.XPATH, "//*[@id='wrap']/div/div[2]/div[2]/div[1]/ul/li[4]/span/span[2]").text,
                "theft": driver.find_element(By.XPATH, "//*[@id='wrap']/div/div[2]/div[2]/div[1]/ul/li[4]/span/span[3]").text,
                "cm": driver.find_element(By.XPATH, "//*[@id='wrap']/div/div[2]/div[2]/div[1]/ul/li[5]/span").text,
                "co": driver.find_element(By.XPATH, "//*[@id='wrap']/div/div[2]/div[2]/div[1]/ul/li[6]/span").text
            }
            data.append(details)
            count += 1

            if(count > MAX_FETCH_COUNT) : 
                print(f"{count}개 보험 데이터 추출 완료")
                break;
            
        except Exception:
            print(f"ID '{vid}': 판매자가 보험이력을 공개하지 않았습니다.")
            
    with open(output_file, "w", newline="", encoding="utf-8") as f:
        writer = csv.DictWriter(f, fieldnames=fieldnames)
        writer.writeheader()
        writer.writerows(data)
        print(f"{count}개 보험이력 데이터가 저장되었습니다.")

def merge_csv_files(files, output_file):
    # csv파일 최종적으로 하나로 병합
    combined_df = pd.concat([pd.read_csv(file) for file in files], ignore_index=True)
    combined_df.to_csv(output_file, index=False, encoding="utf-8")
    print(f"'{output_file}'파일에 병합되어 저장되었습니다.")


def main():
    try:
        df = pd.read_csv(INPUT_FILE_PATH)
        id_list = df['id'].tolist()
        csv_files = []
        fetch_insurance_history(driver, id_list, OUTPUT_FILE_PATH)
        csv_files.append(OUTPUT_FILE_PATH)
        print(f"===========================================================")
        merge_csv_files(csv_files, OUTPUT_FILE_PATH)

        # 병합된 CSV 파일 읽기
        final_output_file = OUTPUT_FILE_PATH
        df = pd.read_csv(final_output_file)

        # 숫자 추출 함수 정의
        def extract_number(value):
            # 숫자만 추출
            import re
            match = re.search(r'\d+', str(value))
            return int(match.group()) if match else 0

        def process_cm(value):
            if value == "없음":
                return "0/0"
            elif "미확정" in value:
                match = re.search(r'(\d+)회', value)
                if match:
                    count = match.group(1)
                    return f"{count}/None"
                else:
                    return "None/None"
            else:
                match = re.search(r'(\d+)회[^\d]*(\d+)', value.replace(",", ""))
                if match:
                    count = match.group(1)
                    amount = match.group(2)
                    return f"{count}/{amount}"
                else:
                    return "None/None"

        # 'cm' 컬럼 변환 적용
        df['cm'] = df['cm'].apply(process_cm)

        # '/' 기준으로 컬럼 나누기
        df[['damaged_count', 'damaged_total']] = df['cm'].str.split('/', expand=True)
        
        # 기존 'cm' 컬럼 제거
        df.drop(columns=['cm'], inplace=True)

        # 'cm_price' 컬럼에서 None 값을 NaN으로 변환 (숫자 처리 가능하도록)
        df['damaged_total'] = df['damaged_total'].replace("None", np.nan)  # pd.NA 대신 np.nan 사용
        
        # 평균값 계산 (NaN 값 제외)
        average_price = df['damaged_total'].astype(float).mean()  # float 변환 후 평균값 계산
        
        # 평균값을 반올림
        average_price = round(average_price)
        
        # None (NaN) 값을 반올림된 평균값으로 대체
        df['damaged_total'] = df['damaged_total'].astype(float).fillna(average_price)

        print(f"damaged_total 대체값 : {average_price}")

        # 'cm_price' 컬럼을 정수로 변환 (소수점 제거)
        df['damaged_total'] = df['damaged_total'].round().astype(int)

        print("==================== co 값 정제 =====================")

        # 'co' 컬럼 처리 함수
        def process_co(value):
            if value == "없음":
                return "0/0"
            elif "미확정" in value:
                match = re.search(r'(\d+)회', value)
                if match:
                    count = match.group(1)
                    return f"{count}/None"
                else:
                    return "None/None"
            else:
                match = re.search(r'(\d+)회[^\d]*(\d+)', value.replace(",", ""))
                if match:
                    count = match.group(1)
                    amount = match.group(2)
                    return f"{count}/{amount}"
                else:
                    return "None/None"
        
        # 'co' 컬럼 정제
        df['co'] = df['co'].apply(process_co)
        
        # 'co' 컬럼을 'co_num'과 'co_price'로 분리
        df[['damaged_by_other_count', 'damaged_by_other_total']] = df['co'].str.split('/', expand=True)
        
        # 기존 'co' 컬럼 삭제
        df.drop(columns=['co'], inplace=True)

        # 'co_price' 컬럼에서 None 값을 NaN으로 변환
        df['damaged_by_other_total'] = df['damaged_by_other_total'].replace('None', np.nan).astype(float)
        
        # 'co_price'의 평균값 계산
        co_price_mean = round(df['damaged_by_other_total'].mean())
        
        # 평균값으로 NaN 대체
        df['damaged_by_other_total'].fillna(co_price_mean, inplace=True)

        print(f"damaged_by_other_total 대체값 : {co_price_mean}")

        df['damaged_by_other_total'] = df['damaged_by_other_total'].astype(int)
        # 변환할 일반 숫자 추출 컬럼
        columns_to_convert = ["license_changed", "owner_changed", "total_loss", "flood", "theft"]

        # 일반 숫자 추출 수행
        for col in columns_to_convert:
            df[col] = df[col].apply(extract_number)

        # 변환된 결과를 다시 CSV 파일로 저장
        df.to_csv(final_output_file, index=False, encoding="utf-8")
        print(f"숫자 형식으로 변환된 결과가 '{final_output_file}'에 저장되었습니다.")
        
    finally:
        driver.quit()

if __name__ == "__main__":
    main()


11개 보험 데이터 추출 완료
11개 보험이력 데이터가 저장되었습니다.
'./vehicles_history.csv'파일에 병합되어 저장되었습니다.
damaged_total 대체값 : 1532917
damaged_by_other_total 대체값 : 166963
숫자 형식으로 변환된 결과가 './vehicles_history.csv'에 저장되었습니다.


The behavior will change in pandas 3.0. This inplace method will never work because the intermediate object on which we are setting values always behaves as a copy.

For example, when doing 'df[col].method(value, inplace=True)', try using 'df.method({col: value}, inplace=True)' or df[col] = df[col].method(value) instead, to perform the operation inplace on the original object.


  df['damaged_by_other_total'].fillna(co_price_mean, inplace=True)
