In [15]:
import os
import glob
import shutil
import logging
import re
from tqdm import tqdm
import pandas as pd
import time
from datetime import datetime

from bs4 import BeautifulSoup
from selenium import webdriver
from selenium.webdriver.common.by import By
from webdriver_manager.chrome import ChromeDriverManager
from selenium.webdriver.chrome.service import Service
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.support.ui import WebDriverWait, Select
from selenium.common.exceptions import UnexpectedAlertPresentException

from sqlalchemy import create_engine, text
from sqlalchemy.exc import SQLAlchemyError

음식점정보 CSV전처리

In [16]:
 # 작업경로 초기화
def init_folder(download_path):
    """Ensure ``download_path`` exists and is empty.

    A missing folder is created. An existing folder is kept, but each entry
    in it is removed: regular files and symlinks are unlinked, directories
    are deleted recursively. A failure on a single entry is logged as a
    warning and the remaining entries are still processed.

    Args:
        download_path: Path of the working folder to reset.
    """
    if not os.path.exists(download_path):
        # Nothing to clean up yet: just create the folder.
        os.makedirs(download_path)
    else:
        for entry_name in os.listdir(download_path):
            target = os.path.join(download_path, entry_name)
            try:
                removable_as_file = os.path.isfile(target) or os.path.islink(target)
                if removable_as_file:
                    os.unlink(target)
                elif os.path.isdir(target):
                    shutil.rmtree(target)
            except Exception as err:
                logging.warning(f'Failed to delete {target}. Reason: {err}')

    logging.info("'downloads' 폴더가 정리되었습니다.")

# Download the filtered CSV from the dataset page into the download folder
def _wait_for_csv_download(download_path, timeout=30.0, poll_interval=0.5):
    """Poll ``download_path`` until a finished ``*.csv`` file is present.

    Chrome writes an in-progress download to a ``*.crdownload`` file, so the
    download is treated as finished once at least one CSV exists and no
    ``*.crdownload`` file is left.

    Returns:
        The list of CSV paths found, or an empty list if ``timeout`` seconds
        pass first.
    """
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        finished = glob.glob(os.path.join(download_path, "*.csv"))
        partial = glob.glob(os.path.join(download_path, "*.crdownload"))
        if finished and not partial:
            return finished
        time.sleep(poll_interval)
    return []


def downloadCsv(download_url, download_path):
    """Download the filtered dataset CSV into ``download_path`` using Chrome.

    Opens ``download_url``, selects the ``TRDSTATENM`` filter column, types
    ``영업`` into the filter box, clicks the search button and then the CSV
    download button, accepting the confirmation alert if one appears. The
    browser is always closed before returning.

    Args:
        download_url: URL of the dataset page to open.
        download_path: Directory Chrome is configured to download into.

    Returns:
        Path of the first ``*.csv`` file found in ``download_path``.

    Raises:
        FileNotFoundError: If no CSV file is present after the attempt,
            including when the page interaction itself failed (the cause is
            logged as a warning).
    """
    # Function-scope import: these exception classes are not part of the
    # file-level imports, and selenium itself is already a dependency.
    from selenium.common.exceptions import (
        NoAlertPresentException,
        TimeoutException,
    )

    input_fieldcode = "TRDSTATENM"  # field code of the business-status column
    input_text = "영업"

    # Configure Chrome to download silently into download_path.
    options = webdriver.ChromeOptions()
    options.add_experimental_option("prefs", {
        "download.default_directory": download_path,
        "download.prompt_for_download": False,
        "download.directory_upgrade": True,
        "safebrowsing.enabled": True,
        "profile.default_content_settings.popups": 0
    })

    driver = webdriver.Chrome(options=options)

    # Bound before the try block so the code after it can always read it.
    csv_files = []

    try:
        # Navigation is inside the try so a failure still closes the browser.
        driver.get(download_url)
        driver.maximize_window()

        select_element = WebDriverWait(driver, 10).until(
            EC.presence_of_element_located((By.ID, "selt_filterCol"))
        )
        Select(select_element).select_by_value(input_fieldcode)
        get_fieldcode = select_element.get_attribute('value')
        if get_fieldcode == input_fieldcode:
            logging.info(f"{input_fieldcode} 값이 정상적으로 입력되었습니다.")
        else:
            logging.warning(f"값이 올바르게 입력되지 않았습니다. 예상값: {input_fieldcode}, 실제값: {get_fieldcode}")

        text_element = WebDriverWait(driver, 10).until(
            EC.presence_of_element_located((By.ID, "txtFilter"))
        )
        text_element.clear()
        text_element.send_keys(input_text)
        get_text = text_element.get_attribute('value')
        if get_text == input_text:
            logging.info(f"{input_text} 값이 정상적으로 입력되었습니다.")
        else:
            logging.warning(f"값이 올바르게 입력되지 않았습니다. 예상값: {input_text}, 실제값: {get_text}")

        # Find and click the search button.
        inquire_button = driver.find_element(By.XPATH, "//button[@class='btn-base-s'][.//span[text()='조회']]")
        inquire_button.click()

        # Give the result grid time to refresh before requesting the export.
        time.sleep(3)

        # Find and click the CSV download button.
        csv_button = WebDriverWait(driver, 10).until(
            EC.element_to_be_clickable((By.ID, "btnCsv"))
        )
        csv_button.click()

        # Accept the alert if one shows up; its absence is not an error.
        try:
            WebDriverWait(driver, 5).until(EC.alert_is_present())
            alert = driver.switch_to.alert
            logging.info(f"알림 메시지: {alert.text}")
            alert.accept()
        except (TimeoutException, NoAlertPresentException):
            logging.warning("알림 창이 나타나지 않았습니다.")

        logging.info("CSV 파일 다운로드가 시작되었습니다.")

        # Wait for the file to be fully written instead of checking once.
        csv_files = _wait_for_csv_download(download_path)
        if csv_files:
            logging.info(f"CSV 파일 다운로드가 완료되었습니다: {csv_files[0]}")
        else:
            logging.warning("CSV 파일 다운로드가 실패했거나 아직 완료되지 않았습니다.")

    except UnexpectedAlertPresentException as e:
        logging.warning(f"예상치 못한 알림 창 발생: {e.alert_text}")
        try:
            driver.switch_to.alert.accept()
        except NoAlertPresentException:
            # The alert is already gone (the driver may have dismissed it).
            pass

    except Exception as e:
        logging.warning(f"오류 발생: {e}")

    finally:
        # Always close the browser.
        driver.quit()

    if not csv_files:
        raise FileNotFoundError(
            f"No CSV file was downloaded into {download_path}"
        )
    return csv_files[0]

# csv파일을 읽어서 df생성
def read_csv(csv_name):
    """Load the restaurant CSV, trying several text encodings in turn.

    Only the six columns used downstream are read, all as strings. The
    encodings ``utf-8``, ``cp949`` and ``euc-kr`` are tried in that order and
    the first one that parses wins.

    Args:
        csv_name: Path of the CSV file.

    Returns:
        The loaded DataFrame, or ``None`` if every encoding failed.
    """
    wanted_columns = ['관리번호', '전화번호', '지번주소', '도로명주소', '사업장명', '업태구분명']
    string_dtypes = dict.fromkeys(wanted_columns, str)

    for candidate in ('utf-8', 'cp949', 'euc-kr'):
        try:
            frame = pd.read_csv(
                csv_name,
                usecols=wanted_columns,
                dtype=string_dtypes,
                encoding=candidate,
            )
        except Exception as err:
            logging.warning(f"{candidate} 인코딩으로 읽기 실패: {err}")
            continue
        logging.info(f"파일을 {candidate} 인코딩으로 성공적으로 읽었습니다.")
        return frame

    logging.warning("모든 인코딩 시도 실패")
    return None

def csv_main():
    """Reset the download folder, fetch the dataset CSV and load it.

    The working folder is ``downloads`` under the current working directory.

    Returns:
        The DataFrame produced by ``read_csv``, or ``None`` if reading failed.
    """
    dataset_url = "https://data.seoul.go.kr/dataList/OA-18665/S/1/datasetView.do"
    target_dir = os.path.join(os.getcwd(), "downloads")

    init_folder(target_dir)
    downloaded_file = downloadCsv(dataset_url, target_dir)
    frame = read_csv(downloaded_file)

    if frame is None:
        logging.warning("CSV 파일 읽기에 실패했습니다.")
    else:
        logging.info(f"총 {len(frame)} 개의 행이 로드되었습니다.")
    return frame

카카오맵에서 추출

In [21]:
def search_restaurant_kakao(driver, item):
    """Open Kakao Map and submit ``item`` as the search keyword.

    A warning is logged if the search box does not hold exactly ``item``
    after typing. The submit button is clicked through JavaScript and the
    function then sleeps two seconds before returning.

    Args:
        driver: Selenium WebDriver to drive.
        item: Keyword to search for.
    """
    driver.get("https://map.kakao.com/")

    box_locator = (By.XPATH, "//input[@id='search.keyword.query']")
    query_box = WebDriverWait(driver, 10).until(
        EC.presence_of_element_located(box_locator)
    )
    query_box.clear()
    query_box.send_keys(item)

    typed = query_box.get_attribute('value')
    if typed != item:
        logging.warning(f"값이 올바르게 입력되지 않았습니다. 예상값: {item}, 실제값: {typed}")

    submit_locator = (By.XPATH, "//button[@id='search.keyword.submit']")
    submit = WebDriverWait(driver, 10).until(
        EC.element_to_be_clickable(submit_locator)
    )
    # Click via script rather than a native element click.
    driver.execute_script("arguments[0].click();", submit)
    time.sleep(2)

def _parse_element_number(element, cast, default, drop=()):
    """Convert ``element.text`` with ``cast`` after removing ``drop`` substrings.

    Returns ``default`` when the text cannot be read or converted.
    """
    try:
        raw = element.text
        for token in drop:
            raw = raw.replace(token, "")
        return cast(raw)
    except Exception:
        return default


def extract_data(driver):
    """Read link, rating and review counts from the first search result.

    Each value is looked up with its own CSS selector (10 second wait each)
    under the first item of the place list.

    Args:
        driver: Selenium WebDriver showing a Kakao Map search result.

    Returns:
        dict with keys ``href`` (str), ``rating`` (float), ``review`` (int)
        and ``blog_review`` (int). A numeric value whose text cannot be
        parsed falls back to ``0.0`` / ``0``; a value whose element cannot be
        located is ``None``.
    """
    base_selector = "#info\\.search\\.place\\.list > li:nth-child(1)"
    selectors = {
        'href': f"{base_selector} > div.info_item > div.contact.clickArea > a.moreview",
        'rating': f"{base_selector} > div.rating.clickArea > span.score > em",
        'review': f"{base_selector} > div.rating.clickArea > span.score > a",
        'blog_review': f"{base_selector} > div.rating.clickArea > a > em"
    }
    # One reader per key. Thousands separators are removed from the counts,
    # the same way extract_user_info treats its review count, so a value such
    # as "1,234" is not silently turned into 0.
    readers = {
        'href': lambda el: el.get_attribute("href"),
        'rating': lambda el: _parse_element_number(el, float, 0.0),
        'review': lambda el: _parse_element_number(el, int, 0, drop=("건", ",")),
        'blog_review': lambda el: _parse_element_number(el, int, 0, drop=(",",)),
    }

    result = {}
    for key, selector in selectors.items():
        try:
            element = WebDriverWait(driver, 10).until(
                EC.presence_of_element_located((By.CSS_SELECTOR, selector))
            )
            value = readers[key](element)
            result[key] = value
            logging.info(f"추출된 {key} 값: {value}")
        except Exception as e:
            logging.warning(f"{key} 요소를 찾을 수 없습니다: {str(e)}")
            result[key] = None

    return result

def get_reviews(driver, shop_ID, shop_name, result, latest_review_ids):
    """Collect reviews newer than the stored latest one from a shop's detail page.

    The detail page (``result["href"]``) is opened in a new tab, the
    "후기 더보기" link is clicked until it is no longer offered, and the review
    list is parsed. Parsing stops at the first review whose id is not greater
    than the latest id already stored for this shop. Every tab other than the
    original one is closed before returning.

    Args:
        driver: Selenium WebDriver positioned on the search result page.
        shop_ID: Shop identifier; must be convertible to ``int``.
        shop_name: Shop name stored with each review.
        result: Mapping with ``review`` (review count, may be ``None``) and
            ``href`` (detail page URL, may be ``None``).
        latest_review_ids: Mapping of shop id to the latest stored review id.

    Returns:
        Tuple ``(review_df, user_info_df)``. Both are empty when the shop has
        no reviews, has no detail URL, or the page could not be processed.
    """
    now_date = datetime.now().strftime("%Y.%m.%d")
    review_df = pd.DataFrame(columns=["review_id", "user_id", "user_name", "shop_id", "shop_name", "content", "write_date", "crawl_date", "review_ratings",])
    # NOTE(review): these column names differ from the keys produced by
    # extract_user_info, so collected rows add their own columns next to
    # these. Kept as-is so the returned shape does not change.
    user_info_df = pd.DataFrame(columns=["UserID", "UserName", "Reviews", "Ratings",])

    # extract_data stores None when an element could not be located; treat a
    # missing review count as "no reviews" instead of crashing on int(None).
    reviewCount = int(result.get("review") or 0)
    url = result.get("href")
    latest_review_id = int(latest_review_ids.get(int(shop_ID), 0))

    if reviewCount == 0:
        logging.info("리뷰가 없습니다.")
        return review_df, user_info_df

    if not url:
        logging.warning("상세 페이지 URL이 없어 리뷰를 수집할 수 없습니다.")
        return review_df, user_info_df

    logging.info("리뷰 존재")
    # Remember the original tab so cleanup never closes it by mistake.
    main_window = driver.current_window_handle
    review_rows = []
    user_rows = []
    try:
        # Pass the URL as a script argument rather than splicing it into the
        # JavaScript source.
        driver.execute_script("window.open(arguments[0], '_blank');", url)
        driver.switch_to.window(driver.window_handles[-1])
        WebDriverWait(driver, 10).until(
            lambda driver: driver.execute_script('return document.readyState') == 'complete'
        )

        # Keep expanding the review list while the "more reviews" link is offered.
        while True:
            try:
                next_page = WebDriverWait(driver, 5).until(
                    EC.element_to_be_clickable((By.CSS_SELECTOR, "#mArticle > div.cont_evaluation > div.evaluation_review > a"))
                )
                if next_page.text == "후기 더보기":
                    next_page.click()
                    time.sleep(0.5)
                else:
                    break
            except Exception as e:
                logging.warning(f"오류 발생: {e}")
                break

        soup = BeautifulSoup(driver.page_source, 'html.parser')

        for review in soup.select('.list_evaluation > li'):
            try:
                review_id = int(review.get('data-id'))
                if review_id <= latest_review_id:
                    # Presumably the list is ordered newest first, so the
                    # remaining reviews were collected on an earlier run.
                    logging.info(f"리뷰 ID {review_id}부터는 이미 수집되었습니다. 건너뜁니다.")
                    break

                review_data = extract_review_data(review, shop_ID, shop_name, now_date)
                user_info = extract_user_info(review, now_date)

                if review_data and any(review_data.values()):
                    review_rows.append(review_data)
                if user_info and any(user_info.values()):
                    user_rows.append(user_info)
            except Exception as e:
                logging.warning(f"리뷰 처리 중 오류 발생: {e}")

    except Exception as e:
        logging.error(f"리뷰 페이지 접근 중 오류 발생: {e}")
    finally:
        # Close only the tabs opened here; if opening the tab failed there is
        # nothing to close and the original tab stays alive.
        for handle in driver.window_handles:
            if handle != main_window:
                driver.switch_to.window(handle)
                driver.close()
        driver.switch_to.window(main_window)

    # Build the frames once instead of concatenating row by row.
    if review_rows:
        review_df = pd.concat([review_df, pd.DataFrame(review_rows)], ignore_index=True)
    if user_rows:
        user_info_df = pd.concat([user_info_df, pd.DataFrame(user_rows)], ignore_index=True)

    return review_df, user_info_df

def extract_review_data(review, shop_ID, shop_name, now_date):
    """Turn one parsed review list item into a row dict.

    The star rating is derived from the first integer found in the star
    element's ``style`` attribute, divided by 20.

    Args:
        review: Parsed review element (one ``li`` of the review list).
        shop_ID: Shop identifier; converted to ``int``.
        shop_name: Shop name stored with the review.
        now_date: Crawl date string stored with the review.

    Returns:
        dict with keys ``review_id``, ``user_id``, ``user_name``, ``shop_id``,
        ``shop_name``, ``content``, ``write_date``, ``crawl_date`` and
        ``review_ratings``.
    """
    content = review.select_one('.txt_comment > span').text
    author = review.select_one('.txt_username').text

    star_style = review.select_one('.ico_star.inner_star').get("style")
    first_number = re.findall(r'\d+', star_style)[0]
    stars = float(first_number) / 20

    author_id = review.get('data-userid')
    written_at = review.select_one(' div > span.time_write').text

    row = {}
    row["review_id"] = int(review.get('data-id'))
    row["user_id"] = int(author_id)
    row["user_name"] = author
    row["shop_id"] = int(shop_ID)
    row["shop_name"] = shop_name
    row["content"] = content
    row["write_date"] = written_at
    row["crawl_date"] = now_date
    row["review_ratings"] = stars
    return row

def extract_user_info(review, now_date):
    """Turn the author section of one parsed review into a user row dict.

    The review count and the average rating are read from the ``txt_desc``
    span that follows the ``후기`` and ``별점평균`` label spans inside the
    ``unit_info`` div. Commas are removed from the review count.

    Args:
        review: Parsed review element (one ``li`` of the review list).
        now_date: Date string stored as both insert and update date.

    Returns:
        dict with keys ``user_id``, ``user_name``, ``stnd_ratings``,
        ``total_reviews_cnt``, ``insert_date`` and ``update_date``.
    """
    uid = int(review.get('data-userid'))
    display_name = review.select_one('.txt_username').text
    info_box = review.find('div', class_='unit_info')

    def stat_text(label):
        # Value span that sits next to the label span carrying ``label``.
        label_span = info_box.find('span', class_='txt_item', string=label)
        return label_span.find_next_sibling('span', class_='txt_desc').text

    total_reviews = int(stat_text('후기').replace(",", ""))
    average_rating = float(stat_text('별점평균'))

    return dict(
        user_id=uid,
        user_name=display_name,
        stnd_ratings=average_rating,
        total_reviews_cnt=total_reviews,
        insert_date=now_date,
        update_date=now_date,
    )

def extract_main(items):
    """Crawl Kakao Map reviews for every shop in ``items``.

    For each shop the name is searched on Kakao Map. When exactly one
    ``moreview`` link is present in the result, the summary values and the
    new reviews are collected; otherwise the shop is skipped with a warning.

    Args:
        items: Iterable of ``(shop_ID, shop_name)`` pairs.

    Returns:
        Tuple ``(all_reviews, all_users)`` of DataFrames; each is an empty
        DataFrame when nothing was collected.
    """
    latest_review_ids = get_latest_review_ids()
    driver = initialize_driver()
    review_frames = []
    user_frames = []

    try:
        for shop_ID, shop_name in tqdm(items, desc="식당 처리 중"):
            search_restaurant_kakao(driver, shop_name)
            if len(driver.find_elements(By.XPATH, "//a[@class='moreview']")) != 1:
                logging.warning(f'{shop_name}: 식당이 존재하지 않거나 검색결과가 하나가 아님')
                continue

            logging.info(f'{shop_name}: 식당 존재')
            result = extract_data(driver)
            review_df, user_info_df = get_reviews(driver, shop_ID, shop_name, result, latest_review_ids)

            # Skip empty frames and drop rows whose values are all NA.
            if not review_df.empty:
                review_frames.append(review_df.dropna(how='all'))
            if not user_info_df.empty:
                user_frames.append(user_info_df.dropna(how='all'))
    finally:
        # Quit the browser even when a shop raises, so no Chrome process
        # is left behind.
        driver.quit()

    # Concatenate once at the end instead of once per shop.
    all_reviews = pd.concat(review_frames, ignore_index=True) if review_frames else pd.DataFrame()
    all_users = pd.concat(user_frames, ignore_index=True) if user_frames else pd.DataFrame()
    return all_reviews, all_users

DB관련 함수

In [18]:
# 데이터베이스 연결 함수
def get_db_connection():
    """Create the SQLAlchemy engine used by the DB helper functions.

    The connection URL is read from the ``KAKAOMAP_DB_URL`` environment
    variable so that credentials are not stored in the source, e.g.
    ``mysql+pymysql://user:password@host:3306/dbname``.
    NOTE(review): the original body was missing from this file; the queries
    use ``ON DUPLICATE KEY UPDATE``, so a MySQL-compatible database is
    presumably expected — confirm the URL and driver against the deployment.

    Returns:
        A SQLAlchemy ``Engine``; callers open connections with
        ``engine.connect()``.

    Raises:
        RuntimeError: If ``KAKAOMAP_DB_URL`` is not set or is empty.
    """
    db_url = os.environ.get("KAKAOMAP_DB_URL")
    if not db_url:
        raise RuntimeError(
            "KAKAOMAP_DB_URL is not set; cannot create the database engine"
        )
    return create_engine(db_url)


def insert_reviews_to_db(reviews_df):
    """Insert every row of ``reviews_df`` into the ``kakaomap_review`` table.

    Dots are removed from ``write_date`` and ``crawl_date`` before insertion.
    All rows are executed on one connection and committed together. A
    ``SQLAlchemyError`` is logged and not re-raised.

    Args:
        reviews_df: DataFrame with the review columns named in the INSERT
            statement.
    """
    copied_fields = ("review_id", "user_id", "user_name", "shop_id", "shop_name", "content", "review_ratings")
    dotted_date_fields = ("write_date", "crawl_date")

    try:
        engine = get_db_connection()
        statement = text("""
                INSERT INTO kakaomap_review 
                (review_id, user_id, user_name, shop_id, shop_name, content, write_date, crawl_date, review_ratings)
                VALUES (:review_id, :user_id, :user_name, :shop_id, :shop_name, :content, :write_date, :crawl_date, :review_ratings)
                """)
        with engine.connect() as connection:
            for _, record in reviews_df.iterrows():
                # Dates are stored without the dot separators.
                params = {name: record[name].replace('.', '') for name in dotted_date_fields}
                params.update({name: record[name] for name in copied_fields})
                connection.execute(statement, params)

            connection.commit()

        logging.info(f"{len(reviews_df)} 개의 리뷰가 성공적으로 삽입되었습니다.")

    except SQLAlchemyError as err:
        logging.error(f"데이터베이스 오류: {err}")
    finally:
        logging.info("데이터베이스 작업이 완료되었습니다.")

def get_latest_review_ids():
    """Fetch the highest stored review id for each shop.

    Returns:
        dict mapping ``shop_id`` to its maximum ``review_id`` in
        ``kakaomap_review``; an empty dict when a ``SQLAlchemyError`` occurs
        (the error is logged).
    """
    latest_by_shop = {}
    sql = "SELECT shop_id, MAX(review_id) AS max_review_id FROM kakaomap_review GROUP BY shop_id"
    try:
        engine = get_db_connection()
        with engine.connect() as connection:
            frame = pd.read_sql(sql, connection)
            logging.info(f"최신 리뷰 ID 조회 결과:\n{frame}")

        indexed = frame.set_index('shop_id')
        latest_by_shop = indexed['max_review_id'].to_dict()
    except SQLAlchemyError as err:
        logging.error(f"데이터베이스 오류: {err}")
    finally:
        logging.info("데이터베이스 작업이 완료되었습니다.")
    return latest_by_shop

def insert_users_to_db(user_df):
    """Upsert every row of ``user_df`` into the ``kakaomap_user`` table.

    A row whose key already exists has its name, rating, review count and
    update date overwritten (``ON DUPLICATE KEY UPDATE``); its insert date is
    left untouched. Dots are removed from both date values before they are
    sent. All rows are executed on one connection and committed together.
    A ``SQLAlchemyError`` is logged and not re-raised.

    Args:
        user_df: DataFrame with the columns ``user_id``, ``user_name``,
            ``stnd_ratings``, ``total_reviews_cnt``, ``insert_date`` and
            ``update_date``.
    """
    try:
        engine = get_db_connection()
        # The statement is the same for every row, so build it once.
        insert_query = text("""
                INSERT INTO kakaomap_user 
                (user_id, user_name, stnd_ratings, total_reviews_cnt, insert_date, update_date)
                VALUES (:user_id, :user_name, :stnd_ratings, :total_reviews_cnt, :insert_date, :update_date)
                ON DUPLICATE KEY UPDATE
                user_name = VALUES(user_name),
                stnd_ratings = VALUES(stnd_ratings),
                total_reviews_cnt = VALUES(total_reviews_cnt),
                update_date = VALUES(update_date)
                """)
        with engine.connect() as connection:
            for _, row in user_df.iterrows():
                values = {
                    "user_id": row['user_id'],
                    "user_name": row['user_name'],
                    "stnd_ratings": row['stnd_ratings'],
                    "total_reviews_cnt": row['total_reviews_cnt'],
                    # Dates are stored without the dot separators.
                    "insert_date": row['insert_date'].replace('.', ''),
                    "update_date": row['update_date'].replace('.', ''),
                }
                connection.execute(insert_query, values)

            connection.commit()

        # This message used to say "reviews" although users are written here.
        logging.info(f"{len(user_df)} 개의 사용자 정보가 성공적으로 삽입되었습니다.")

    except SQLAlchemyError as e:
        logging.error(f"데이터베이스 오류: {e}")
    finally:
        logging.info("데이터베이스 작업이 완료되었습니다.")

Main

In [19]:
"""웹드라이버 초기화"""
def initialize_driver():
    """Create the Chrome WebDriver used for crawling.

    Chrome is started with the ``--headless``, ``--no-sandbox`` and
    ``--disable-dev-shm-usage`` arguments, using the chromedriver binary
    installed by ``ChromeDriverManager``, and the window is maximized.

    Returns:
        The started WebDriver instance.
    """
    launch_flags = ("--headless", "--no-sandbox", "--disable-dev-shm-usage")

    options = Options()
    for flag in launch_flags:
        options.add_argument(flag)

    chromedriver_path = ChromeDriverManager().install()
    browser = webdriver.Chrome(service=Service(chromedriver_path), options=options)
    browser.maximize_window()
    return browser

def main(region="마포구", limit=8):
    """Run the pipeline: download the CSV, crawl reviews, store them.

    Args:
        region: Keyword put in front of every shop name to form the Kakao
            Map search term.
        limit: Number of leading CSV rows to process.
    """
    csv_data = csv_main()
    if csv_data is None:
        # csv_main returns None when the CSV could not be read.
        logging.error("CSV 데이터를 불러오지 못해 작업을 중단합니다.")
        return

    # Work on a copy so the column assignments below do not write into a
    # slice of csv_data.
    extracted_data = csv_data.head(limit)[['관리번호', '사업장명']].copy()
    # Remove the dashes from the management number and drop its first seven
    # characters (presumably the issuing-office code — TODO confirm).
    extracted_data['관리번호'] = (
        extracted_data['관리번호'].str.replace("-", "", regex=False).str[7:]
    )
    # Prefix the shop name with the region keyword.
    extracted_data['사업장명'] = extracted_data['사업장명'].apply(lambda x: f"{region} {x}")
    # List of [management number, search term] pairs.
    result_list = extracted_data.values.tolist()

    try:
        reviews, users = extract_main(result_list)
        insert_reviews_to_db(reviews)
        insert_users_to_db(users)
        logging.info("데이터 추출 및 삽입이 성공적으로 완료되었습니다.")
    except Exception as e:
        logging.error(f"데이터 처리 중 오류 발생: {e}")
    finally:
        logging.info("프로그램 실행이 완료되었습니다.")

In [22]:
# Root logger: timestamped INFO-level output.
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
# Silence third-party loggers. webdriver_manager logs through a logger named
# "WDM", so setting only "webdriver_manager" leaves its INFO lines
# ("Get LATEST chromedriver version ...") in the output.
logging.getLogger('selenium').setLevel(logging.CRITICAL)
logging.getLogger('webdriver_manager').setLevel(logging.CRITICAL)
logging.getLogger('WDM').setLevel(logging.CRITICAL)

if __name__ == "__main__":
    main()

2024-09-10 13:51:39,509 - INFO - 'downloads' 폴더가 정리되었습니다.
2024-09-10 13:51:49,923 - INFO - TRDSTATENM 값이 정상적으로 입력되었습니다.
2024-09-10 13:51:50,115 - INFO - 영업 값이 정상적으로 입력되었습니다.
2024-09-10 13:51:58,652 - INFO - CSV 파일 다운로드가 시작되었습니다.
2024-09-10 13:51:58,653 - INFO - CSV 파일 다운로드가 완료되었습니다: c:\Start\python_basic\KakaoMap\downloads\서울시 마포구 일반음식점 인허가 정보.csv
2024-09-10 13:52:08,078 - INFO - 파일을 cp949 인코딩으로 성공적으로 읽었습니다.
2024-09-10 13:52:08,079 - INFO - 총 8234 개의 행이 로드되었습니다.
2024-09-10 13:52:09,488 - INFO - 최신 리뷰 ID 조회 결과:
        shop_id  max_review_id
0  1.011971e+11       10572330
1  1.011973e+11       10025393
2  1.011976e+11        8031798
3  1.011976e+11       10225080
2024-09-10 13:52:09,492 - INFO - 데이터베이스 작업이 완료되었습니다.
2024-09-10 13:52:10,571 - INFO - Get LATEST chromedriver version for google-chrome
2024-09-10 13:52:10,901 - INFO - Get LATEST chromedriver version for google-chrome
2024-09-10 13:52:11,239 - INFO - Driver [C:\Users\moon\.wdm\drivers\chromedriver\win64\128.0.6613.119\chromedr