In [None]:
!pip install beautifulsoup4
!pip install requests
!pip install fake_useragent
!pip install selenium


In [None]:
import psutil
import shutil
from fake_useragent import UserAgent
from selenium import webdriver
import subprocess
import os

# Shared fake_useragent generator; `ua.random` is read further down to obtain
# a User-Agent string for the browser options.
ua = UserAgent()

class Scrapper:
    """Start and tear down the Chrome session used for scraping.

    ``start()`` wipes the profile folder, opens a stand-alone Chrome window with
    remote debugging enabled and then creates a Selenium ``webdriver.Chrome``
    using the options built in ``__init__``. ``end()`` releases both again and
    is safe to call more than once or after a failed ``start()``.
    """

    # Selenium driver; None until start() succeeds and again after end().
    driver = None
    # Chrome profile directory; deleted and recreated on every start().
    chrome_data_folder = os.path.join(os.getcwd(), "ChromeData")
    # Chrome executable launched by start(); override for non-default installs.
    chrome_binary_path = "C:\\Program Files\\Google\\Chrome\\Application\\chrome.exe"
    # PID of the stand-alone Chrome window started by start().
    debug_window_pid = None
    debug_window_port = 9222
    options = None

    def __init__(self):
        """Build the ``ChromeOptions`` passed to the Selenium driver."""
        chrome_options = {
            '--disable-gpu': None,
            '--disable-dev-shm-usage': None,
            '--disable-setuid-sandbox': None,
            '--no-first-run': None,
            '--no-sandbox': None,
            '--no-zygote': None,
            '--disable-blink-features': 'AutomationControlled',
            'start-maximized': None,
            'disable-infobars': None,
            '--disable-extensions': None,
            # Single source of truth for the port (was a second hard-coded '9222').
            '--remote-debugging-port': str(self.debug_window_port),
            # Chrome parses this switch as "width,height"; "1920x1080" is ignored.
            'window-size': '1920,1080',
            'lang': 'ko_KR',
            'user-agent': ua.random
        }
        self.options = webdriver.ChromeOptions()
        for option, value in chrome_options.items():
            if value is not None:
                self.options.add_argument(f"{option}={value}")
            else:
                self.options.add_argument(option)

    def start(self):
        """Launch Chrome and the Selenium driver.

        Returns:
            This ``Scrapper`` instance, so calls can be chained.

        Raises:
            Whatever ``subprocess.Popen`` / ``webdriver.Chrome`` raise. If the
            driver cannot be created, the already started Chrome window is
            terminated before the error is re-raised.
        """
        if os.path.exists(self.chrome_data_folder):
            shutil.rmtree(self.chrome_data_folder)
            print('기존 캐쉬 삭제 완료')
        debug_window = subprocess.Popen([
            self.chrome_binary_path,
            f"--remote-debugging-port={self.debug_window_port}",
            f"--user-data-dir={self.chrome_data_folder}"])
        self.debug_window_pid = debug_window.pid
        try:
            self.driver = webdriver.Chrome(options=self.options)
            # Hide navigator.webdriver from every document loaded afterwards.
            self.driver.execute_cdp_cmd("Page.addScriptToEvaluateOnNewDocument", {"source": """ Object.defineProperty(navigator, 'webdriver', { get: () => undefined }) """})
        except Exception:
            # Do not leave the Chrome window (or a half-started driver) behind.
            self.end()
            raise
        return self

    def end(self):
        """Quit the driver (if any) and terminate the Chrome debug window."""
        try:
            if self.driver is not None:
                self.driver.quit()
        finally:
            # Runs even when quit() raises, so the Chrome window never leaks.
            self.driver = None
            self._terminate_debug_window()

    def _terminate_debug_window(self):
        """Terminate the Chrome process started by start(), if it still exists."""
        pid = self.debug_window_pid
        self.debug_window_pid = None
        if pid is None:
            return
        try:
            proc = psutil.Process(pid)
            # Guard against PID reuse: only kill a process carrying our debug port.
            if f"--remote-debugging-port={self.debug_window_port}" in proc.cmdline():
                proc.terminate()
        except psutil.NoSuchProcess:
            # Already gone (e.g. the user closed the window) - nothing to do.
            pass
        except psutil.AccessDenied as err:
            print(f"could not terminate debug window (pid={pid}): {err}")

# Marks the end of the setup cell; no Scrapper has been instantiated and no
# browser has been started at this point.
print('setting chrome options done')

# 1. 스크랩 시작

In [None]:
# Read the scrape parameters interactively and fail fast on values that would
# only launch a browser to scrape nothing.
search_query = input("검색 쿼리를 입력하세요 : ").strip()
if not search_query:
    raise ValueError("검색 쿼리가 비어 있습니다.")
max_count = int(input("최대 스크랩 개수를 입력하세요(1 ~ 25개 추천) : "))
if max_count < 1:
    # The scrape loop runs while len(results) < max_count, so it would never start.
    raise ValueError("최대 스크랩 개수는 1 이상이어야 합니다.")
start_page = int(input("시작 페이지 번호를 입력하세요 : "))
print("검색 쿼리: ", search_query)
print("최대 스크랩 개수: ", max_count)
print("시작 페이지 번호: ", start_page)
print("스크랩 시작 준비 완료")

In [None]:
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
import random
import time

# Scraped products accumulate here; get_data_from() appends one dict per product.
results = []

# start() returns the Scrapper itself, so the driver is read from the instance
# once the browser is up.
scrapper = Scrapper()
scrapper.start()
driver = scrapper.driver

def get_sub_urls(start_url, by: By, selector: str):
    """Open ``start_url`` and return the ``href`` of every element matching the locator.

    Waits up to 10 seconds for at least one matching element to be present
    before collecting; the wait's timeout error propagates to the caller.
    """
    driver.get(start_url)
    locator = (by, selector)
    # Product links on this result page.
    WebDriverWait(driver, 10).until(EC.presence_of_all_elements_located(locator))
    sub_urls = []
    for anchor in driver.find_elements(*locator):
        sub_urls.append(anchor.get_attribute('href'))
    print(f'found {len(sub_urls)} sub_urls')
    return sub_urls

def get_data_from(target_url):
    """Scrape one product page and append the result to the global ``results``.

    On success a dict with the keys ``item_name``, ``item_detail``, ``price``
    and ``image_urls`` (list, page order, duplicates removed) is appended.
    The product is skipped (nothing appended) when an element does not show up
    within 10 seconds or when fewer than two images are found.

    Args:
        target_url: URL of the product detail page.

    Returns:
        None.

    Raises:
        Exception: if an image URL is an inline ``data:image`` placeholder,
            which this notebook treats as "scrape limit reached, restart".
    """
    # Selenium's wait timeout is its own exception type, NOT a subclass of the
    # builtin TimeoutError, so it has to be caught explicitly.
    from selenium.common.exceptions import TimeoutException

    try:
        print()
        driver.get(target_url)
        wait = WebDriverWait(driver, 10)
        print(f'[start]  {len(results)+1}번째 상품 가져오는 중....')

        # Product name
        wait.until(EC.presence_of_all_elements_located((By.CSS_SELECTOR, ".prod-buy-header__title")))
        item_name_element = driver.find_element(By.CSS_SELECTOR, ".prod-buy-header__title")
        item_name = item_name_element.text

        print(f'상품명 : {item_name}')

        # Product description
        wait.until(EC.presence_of_all_elements_located((By.CSS_SELECTOR, ".prod-attr-item")))
        item_detail_elements = driver.find_elements(By.CSS_SELECTOR, ".prod-attr-item")
        item_details = [element.text for element in item_detail_elements]
        print('상품 설명 : ')
        for detail in item_details:
            print(f'- {detail}')

        # Product price
        wait.until(EC.presence_of_all_elements_located((By.CSS_SELECTOR, ".total-price strong")))
        price_element = driver.find_element(By.CSS_SELECTOR, ".total-price strong")
        item_price = int(price_element.text.replace(",","").replace("원",""))
        print(f'가격 : {item_price}')

        # Product image urls (random pause before reading them)
        delay = random.uniform(5,10)
        time.sleep(delay)
        wait.until(EC.presence_of_all_elements_located((By.CSS_SELECTOR, ".lazy-load-img")))
        image_elements = driver.find_elements(By.CSS_SELECTOR, ".lazy-load-img")
        if len(image_elements) < 2:
            print('[failed] 이미지 개수 부족으로 실패')
            return
        # De-duplicate while keeping page order: consumers treat the first URL
        # as the thumbnail, which a set would make arbitrary. Elements without
        # a src attribute are skipped instead of crashing on None.replace().
        image_sources = (image.get_attribute('src') for image in image_elements)
        image_urls = list(dict.fromkeys(
            src.replace("48x48ex","492x492ex") for src in image_sources if src))
        print(f'이미지 개수 : {len(image_urls)}')
        for l, each_url in enumerate(image_urls):
            print(f' {l} : {each_url}')
            if each_url.startswith("data:image"):
                raise Exception("스크랩 한계치 초과, 재시작 필요")

    except (TimeoutException, TimeoutError) as er:
        print(f'[failed] {len(results)+1}번째 상품 가져오기 실패!')
        print(f"An error occurred: {type(er)}")
        print()
        return

    result = {
        "item_name": item_name,
        "item_detail": "  \n".join(item_details),
        "price": item_price,
        "image_urls": image_urls
    }
    results.append(result)
    print(f'[success] {len(results)}개 완료')
    print()

# Page loop: walk the search result pages until max_count products are collected.
from urllib.parse import quote_plus

try:
    print(f'검색 쿼리({search_query})로 스크랩 시작')
    # Percent-encode the query so spaces, '&', '#', '+' ... cannot corrupt the URL.
    encoded_query = quote_plus(search_query)
    page = start_page
    while len(results) < max_count:
        start_url = f'https://www.coupang.com/np/search?component=&q={encoded_query}&channel=user&page={page}'
        sub_urls = get_sub_urls(start_url, By.CSS_SELECTOR, ".search-product-link")
        for url in sub_urls:
            if len(results) >= max_count:
                break
            get_data_from(url)
        page += 1
    print("스크랩 완료")

except Exception as e:
    print("스크랩 실패")
    # Include the message as well: get_data_from() raises a plain Exception
    # whose text is the only hint that a restart is required.
    print(f"An error occurred: {type(e)} {e}")

finally:
    # Always release the browser, whether the scrape finished or failed.
    scrapper.end()
    print("스크랩 종료")


# 2.1 DB로 저장 (Option A)

## DB 정보 입력

In [None]:
# Collect the DB connection settings, the image storage root and the account
# recorded as `created_by`.
from getpass import getpass

host = input("DB 호스트: ")
port = input("DB 포트: ")
user = input("DB 유저: ")
# getpass keeps the password from being echoed while typing.
password = getpass("DB 비밀번호: ")
database = input("DB 이름: ")
# Never print the real password: cell output is stored inside the notebook file.
print(f"DB 정보: {host}, {port}, {user}, ********, {database}")
# '\\i' instead of '\i': same text at runtime, without the invalid-escape warning.
image_storage_path = input("저장 루트 경로(예:'C:\\Users\\shj92\\resources', 이미지는 'image\\item'에 저장됨): ")
print(f"저장 경로: {image_storage_path}")
account = input("계정(email): ")
print(f"계정: {account}")
print("세팅 완료")

## 저장

In [None]:
import mysql.connector
import time
from datetime import datetime
import requests
from urllib.parse import urlparse
import uuid

# 데이터 가공해서 Item 테이블에 저장
# Persist every scraped product: one `item` row plus, per image, one downloaded
# file and one `item_image` row. Each product is its own transaction; when it
# fails, its rows are rolled back and the files it already wrote are removed.
item_insert_query = (
    "INSERT INTO item ("
    "created_at, created_by, item_detail, item_name, item_status, "
    "price, stock, sold_count, is_deleted) "
    "VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s)"
)
image_insert_query = (
    "INSERT INTO item_image ("
    "item_id, item_image_type, original_filename, filename, sub_path, "
    "created_at, created_by) "
    "VALUES (%s, %s, %s, %s, %s, %s, %s)"
)

item_image_sub_path = 'image\\item'
image_dir = os.path.join(image_storage_path, item_image_sub_path)
# open() does not create missing directories, so make sure the target exists.
os.makedirs(image_dir, exist_ok=True)

for i, item in enumerate(results):
    conn = mysql.connector.connect(
        host=host,
        port=port,
        user=user,
        password=password,
        database=database
    )
    cursor = conn.cursor()
    written_files = []  # image files created for this product so far
    try:
        # Item row
        cursor.execute(item_insert_query, (
            datetime.now(),        # created_at
            account,               # created_by
            item["item_detail"],
            item["item_name"],
            "ON_SALE",             # item_status
            item["price"],
            100,                   # stock
            0,                     # sold_count
            0))                    # is_deleted

        # Auto-generated id of the row inserted above
        new_item_id = cursor.lastrowid
        print(f"상품 {i}번째 ID : {new_item_id} ")

        # Image rows; `j` (not `i`) so the outer product index is not shadowed.
        for j, image_url in enumerate(item['image_urls']):
            image_type = "THUMBNAIL" if j == 0 else "PRODUCT"
            original_image_name = os.path.basename(urlparse(image_url).path)
            image_name = f"{str(uuid.uuid4())}_{original_image_name}"

            # Download to storage; raise_for_status() keeps HTTP error pages
            # from being saved as if they were images.
            response = requests.get(image_url, headers={'User-Agent': ua.random}, timeout=10)
            response.raise_for_status()
            image_path = os.path.join(image_dir, image_name)
            print(image_path)
            with open(image_path, 'wb') as f:
                f.write(response.content)
            written_files.append(image_path)

            cursor.execute(image_insert_query, (
                new_item_id,
                image_type,
                original_image_name,
                image_name,
                item_image_sub_path,
                datetime.now(),    # created_at
                account))          # created_by

            print(f"이미지 {j}번째 저장")
            time.sleep(random.randint(3, 7))

        conn.commit()
        print("commit")

    except Exception as ex:
        print(ex)
        conn.rollback()
        # The DB rows are gone after the rollback; drop the matching files too.
        for orphan_path in written_files:
            try:
                os.remove(orphan_path)
            except OSError as remove_error:
                print(remove_error)
        print("rollback the change, there was an error")

    finally:
        # Release the connection even on KeyboardInterrupt or a failed rollback.
        cursor.close()
        conn.close()


# 2.2 엑셀(xlsx)로 저장 (Option B)

## 엑셀 저장 정보 입력

In [None]:
# Collect the image storage root and the account recorded as `created_by`.
# '\\i' instead of '\i': same text at runtime, without the invalid-escape warning.
image_storage_path = input("이미지 저장 루트 경로(예:'C:\\Users\\shj92\\resources', 이미지는 'image\\item'에 저장됨): ")
print(f"저장 경로: {image_storage_path}")
account = input("계정(email): ")
print(f"계정: {account}")
print("세팅 완료")

In [None]:
import pandas as pd
import time
import random
from fake_useragent import UserAgent

# Export the scraped products to items.xlsx / images.xlsx and download the
# images next to them.
#
# These names are otherwise only imported by the DB cell (Option A); importing
# them here lets Option B run without Option A having been executed.
import os
import uuid
from datetime import datetime
from urllib.parse import urlparse

import requests

ua = UserAgent()

item_image_sub_path = 'image\\item'
# Store images under <root>\image\item, as the input prompt announces.
image_dir = os.path.join(image_storage_path, item_image_sub_path)
os.makedirs(image_dir, exist_ok=True)

# Rows are collected in plain lists and turned into DataFrames once at the
# end; DataFrame.append no longer exists in pandas >= 2.0.
item_records = []
image_records = []

for i, item in enumerate(results):

    item_records.append({
        'created_at': datetime.now(),
        'created_by': account,
        'item_detail': item["item_detail"],
        'item_name': item["item_name"],
        'item_sell_status': "ON_SALE",
        'price': item["price"],
        'stock': 100,
        'sold_count': 0,
    })

    for j, image_url in enumerate(item['image_urls']):

        original_image_name = os.path.basename(urlparse(image_url).path)
        image_name = f"{str(uuid.uuid4())}_{original_image_name}"
        success = False  # whether the image file was downloaded and written

        try:
            response = requests.get(image_url, headers={'User-Agent': ua.random}, timeout=10)
            # Do not store an HTTP error page as if it were an image.
            response.raise_for_status()
            with open(os.path.join(image_dir, image_name), 'wb') as f:
                f.write(response.content)
            success = True
        except (requests.exceptions.RequestException, OSError) as e:
            print(e)

        image_records.append({
            'created_at': datetime.now(),
            'created_by': account,
            'filename': image_name,
            # NOTE(review): 'root_path' was declared but never filled in the
            # original; the storage root is assumed to be intended - confirm.
            'root_path': image_storage_path,
            'original_filename': original_image_name,
            'item_image_type': "THUMBNAIL" if j == 0 else "PRODUCT",
            # Position of the product in `results` (0-based), not a DB id.
            'item_id': i,
            'sub_path': item_image_sub_path,
            'success': success,
            'image_url': image_url,
        })
        time.sleep(random.randint(1, 5))

items_df = pd.DataFrame(item_records, columns=[
    'created_at', 'created_by', 'item_detail', 'item_name',
    'item_sell_status', 'price', 'stock', 'sold_count'])
images_df = pd.DataFrame(image_records, columns=[
    'created_at', 'created_by', 'filename', 'root_path', 'original_filename',
    'item_image_type', 'item_id', 'sub_path', 'success', 'image_url'])

items_df.to_excel('items.xlsx', index=False)
images_df.to_excel('images.xlsx', index=False)
print("Data saved to excel successfully.")
