In [None]:

from selenium import webdriver
from webdriver_manager.chrome import ChromeDriverManager
from selenium.webdriver import Keys, ActionChains
from selenium.webdriver.common.by import By
from selenium.webdriver.chrome.service import Service
from bs4 import BeautifulSoup as bs
from selenium.webdriver.chrome.options import Options
from kafka import KafkaConsumer, KafkaProducer
from collections import deque
import json

import re
import random
import time
import concurrent.futures
from datetime import datetime
# Entry-point URLs of the hot-deal boards; each crawler class below loads one
# of these as its `hot_deal_page`.
ARCA_LIVE_LINK = "https://arca.live/b/hotdeal"
RULI_WEB_LINK = "https://bbs.ruliweb.com/market/board/1020?view=default"
PPOM_PPU_LINK = "https://www.ppomppu.co.kr/zboard/zboard.php?id=ppomppu"
QUASAR_ZONE_LINK = "https://quasarzone.com/bbs/qb_saleinfo"
FM_KOREA_LINK = "https://www.fmkorea.com/hotdeal"

# Module-level Kafka producer; PAGES.pub_hot_deal_page sends item links through it.
# Values are JSON-encoded to UTF-8 bytes; objects json cannot encode are
# stringified via default=str.
producer = KafkaProducer(
    acks=0, # acknowledgement setting for message delivery
    compression_type='gzip', # compression used when sending messages (None, gzip, snappy, lz4, ...)
    bootstrap_servers=['localhost:29092', 'localhost:39092', 'localhost:49092'], # list of Kafka broker addresses to send to
    value_serializer=lambda x:json.dumps(x, default=str).encode('utf-8') # serialize the message value
)

# Module-level Kafka consumer subscribed to the same 'test' topic the producer
# writes to; values are decoded from UTF-8 JSON.
consumer = KafkaConsumer(
    'test', # topic name
    bootstrap_servers=['localhost:29092', 'localhost:39092', 'localhost:49092'], # list of Kafka broker addresses
    auto_offset_reset='earliest', # offset position (earliest: from the beginning, latest: most recent)
    enable_auto_commit=True, # whether offsets are committed automatically
    group_id='test-group', # consumer group identifier
    value_deserializer=lambda x: json.loads(x.decode('utf-8')), # deserialize the message value
    consumer_timeout_ms=10000 # maximum time to wait for data
)

class PAGES:
    """Shared plumbing for the hot-deal board crawlers.

    Holds the per-site work queue, builds headless Chrome drivers, publishes
    discovered links to Kafka and prints error records.

    NOTE(review): the subclasses call ``self.insert_to_db(...)``, which is not
    defined in this class in the visible source -- confirm where it is meant
    to come from before running ``crawling``.
    """

    def __init__(self):
        self.refresh_delay = 60  # sec
        # (item_link, retry_attempt) tuples waiting to be crawled.
        self.item_link_queue = deque()
        # Links already seen, newest on the left; used for de-duplication.
        self.previous_items_queue = deque()

    def set_drvier(self, site_name):
        """Start a headless Chrome session and open ``site_name``.

        Args:
            site_name: URL to load right after the browser starts.

        Returns:
            The Selenium driver, already positioned on ``site_name``.

        Raises:
            Whatever Selenium raises while starting the browser or loading
            the page; the browser is shut down first so that a failed start
            does not leave a Chrome process behind.
        """
        chrome_options = Options()
        chrome_options.add_argument("--headless")
        chrome_options.add_argument('--blink-settings=imagesEnabled=false')
        chrome_options.add_argument('--block-new-web-contents')
        driver = webdriver.Chrome(options=chrome_options)
        try:
            driver.implicitly_wait(10)
            driver.get(site_name)
        except Exception:
            driver.quit()
            raise
        return driver

    def pub_hot_deal_page(self, page, item_link):
        """Publish ``item_link`` to the ``test`` topic and flush immediately.

        Args:
            page: Board URL (``str``); sent UTF-8 encoded in the ``page`` header.
            item_link: Message value, serialized by the module-level producer.
        """
        producer.send(topic='test', headers=[("page", page.encode("utf-8"))], value=item_link)
        producer.flush()

    def error_logging(self, e: Exception, error_type, **kwargs):
        """Build an error record and print it.

        Args:
            e: The exception being reported; its string form is stored.
            error_type: Short label describing what failed.
            **kwargs: Extra fields (e.g. ``item_link``) merged into the record.
        """
        error_log = {"error_log": str(e), "time": time.ctime(), "error_type": error_type}
        # Iterating the kwargs dict directly yields only its keys, so the
        # original ``for k, v in kwargs`` raised ValueError for every extra
        # field; merge the key/value pairs instead.
        error_log.update(kwargs)
        # db.error_log.insert_one(error_log)
        print(error_log)
        
class ARCA_LIVE(PAGES):
    """Crawler for the arca.live hot-deal board.

    Fields scraped per post: shopping_mall_link, shopping_mall, item_name,
    price, delivery, content, comment.
    """

    def __init__(self):
        super().__init__()
        self.hot_deal_page = ARCA_LIVE_LINK

    def get_item_links(self):
        """Poll the board list forever and enqueue links not seen before.

        Every pass reads list rows ``nth-child(4)`` .. ``nth-child(48)``,
        queues each new link as ``(link, 0)``, then sleeps 30 seconds and
        refreshes the page. The method does not return on its own; the
        browser is closed if the loop is interrupted by an exception.
        """
        get_item_driver = self.set_drvier(self.hot_deal_page)
        try:
            while True:
                for i in range(4, 49):
                    try:
                        item = get_item_driver.find_element(By.CSS_SELECTOR, f"body > div.root-container > div.content-wrapper.clearfix > article > div > div.article-list > div.list-table.hybrid > div:nth-child({i}) > div > div > span.vcol.col-title > a")
                        item_link = item.get_attribute("href")
                        print(i, item_link)
                    except Exception as e:
                        self.error_logging(e, f"fail get item links {self.__class__}")
                        # Without this, the code below would read an unbound
                        # name (first row) or the previous row's link.
                        continue

                    if item_link not in self.previous_items_queue:
                        self.item_link_queue.append((item_link, 0))
                        self.previous_items_queue.appendleft(item_link)
                        # Keep only the 100 most recent links for de-duplication.
                        if len(self.previous_items_queue) > 100:
                            self.previous_items_queue.pop()
                time.sleep(30)
                get_item_driver.refresh()
        finally:
            get_item_driver.quit()

    def crawling(self):
        """Drain the link queue, scrape each post and hand it to ``insert_to_db``.

        A post that fails to load or parse is re-queued until it has been
        attempted with ``retry_attempt`` 0..3; after that the error is logged
        and the post is dropped. Returns once the queue is empty.
        """
        driver = self.set_drvier(self.hot_deal_page)
        try:
            while True:
                try:
                    item_link, retry_attempt = self.item_link_queue.popleft()
                except IndexError:
                    print("Empty Queue")
                    break
                print(item_link, retry_attempt)

                try:  # reported/removed posts, security checks, etc.
                    driver.get(item_link)
                    time.sleep(5)
                    table = driver.find_element(By.TAG_NAME, "table")
                    rows = table.find_elements(By.TAG_NAME, "tr")
                    # Drop the first whitespace-separated token of each row
                    # and join the remainder without spaces.
                    details = ["".join(row.text.split()[1:]) for row in rows]
                    # Exactly five rows are expected; any other count raises
                    # and goes through the retry path below.
                    shopping_mall_link, shopping_mall, item_name, price, delivery = details
                    content = driver.find_element(By.CSS_SELECTOR, "body > div.root-container > div.content-wrapper.clearfix > article > div > div.article-wrapper > div.article-body > div.fr-view.article-content").text
                    comment_box = driver.find_element(By.CSS_SELECTOR, "#comment > div.list-area")
                    comment = [node.text for node in comment_box.find_elements(By.CLASS_NAME, "text")]
                except Exception as e:
                    if retry_attempt >= 3:
                        self.error_logging(e, f"fail crawling {self.__class__}", item_link=item_link)
                    else:
                        self.item_link_queue.append((item_link, retry_attempt + 1))
                    continue

                # NOTE(review): insert_to_db is not defined in the visible source.
                self.insert_to_db(item_link=item_link, shopping_mall_link=shopping_mall_link, shopping_mall=shopping_mall, price=price, item_name=item_name, delivery=delivery, content=content, comment=comment)
        finally:
            driver.quit()

# Posts on this board are sometimes uploaded with shopping_mall_link missing.
class RULI_WEB(PAGES):
    """Crawler for the ruliweb hot-deal board.

    Fields scraped per post: shopping_mall_link, item_name, content, comment.
    """

    def __init__(self):
        super().__init__()
        self.hot_deal_page = RULI_WEB_LINK

    def get_item_links(self):
        """Poll the board list forever and print the link of every regular post.

        Rows whose class is not exactly ``table_body blocktarget`` (notices,
        "best" deals, ...) are skipped. The page is refreshed every 30
        seconds; the browser is closed if the loop is interrupted.

        NOTE(review): the enqueue/de-duplication step is commented out, so
        this method currently only prints links and never feeds ``crawling``.
        """
        get_item_driver = self.set_drvier(self.hot_deal_page)
        try:
            while True:
                item_table = get_item_driver.find_elements(By.CSS_SELECTOR, "#board_list > div > div.board_main.theme_default.theme_white.theme_white > table > tbody > tr")
                for i, item in enumerate(item_table):
                    try:
                        if item.get_attribute("class") != "table_body blocktarget":
                            # notices, "best" hot deals, etc.
                            continue
                        item_link = item.find_element(By.CSS_SELECTOR, "td.subject > div > a.deco").get_attribute("href")
                        print(i, item_link)

                        # if item_link not in self.previous_items_queue:
                        #     self.item_link_queue.append((item_link, 0))
                        #     self.previous_items_queue.appendleft(item_link)
                        #     if len(self.previous_items_queue) > 100:
                        #         self.previous_items_queue.pop()
                    except Exception as e:
                        self.error_logging(e, f"fail get item links {self.__class__}")

                time.sleep(30)
                get_item_driver.refresh()
        finally:
            get_item_driver.quit()

    def crawling(self):
        """Drain the link queue, scrape each post and hand it to ``insert_to_db``.

        A post that fails to load or parse is re-queued until it has been
        attempted with ``retry_attempt`` 0..3; after that the error is logged
        and the post is dropped. Returns once the queue is empty.
        """
        driver = self.set_drvier(self.hot_deal_page)
        try:
            while True:
                try:
                    item_link, retry_attempt = self.item_link_queue.popleft()
                except IndexError:
                    print("Empty Queue")
                    break
                print(item_link, retry_attempt)

                try:  # reported/removed posts, security checks, etc.
                    driver.get(item_link)
                    time.sleep(5)
                    item_name = driver.find_element(By.CSS_SELECTOR, "#board_read > div > div.board_main > div.board_main_top > div.user_view > div:nth-child(1) > div > h4 > span > span.subject_inner_text").text
                    # The link is taken from the anchor's visible text, not its href.
                    shopping_mall_link = driver.find_element(By.CSS_SELECTOR, "#board_read > div > div.board_main > div.board_main_view > div.row.relative > div > div.source_url.box_line_with_shadow > a").text
                    content = driver.find_element(By.TAG_NAME, "article").text
                    comment = [node.text for node in driver.find_elements(By.CLASS_NAME, "comment")]
                except Exception as e:
                    if retry_attempt >= 3:
                        self.error_logging(e, f"fail crawling {self.__class__}", item_link=item_link)
                    else:
                        self.item_link_queue.append((item_link, retry_attempt + 1))
                    continue

                # NOTE(review): insert_to_db is not defined in the visible source.
                self.insert_to_db(item_link=item_link, shopping_mall_link=shopping_mall_link, item_name=item_name, content=content, comment=comment)
        finally:
            driver.quit()
        
class FM_KOREA(PAGES):
    """Crawler for the fmkorea hot-deal board.

    Fields scraped per post: shopping_mall_link, shopping_mall, item_name,
    price, delivery, content, comment.
    """

    def __init__(self):
        super().__init__()
        self.hot_deal_page = FM_KOREA_LINK

    def get_item_links(self):
        """Poll the board list forever and enqueue links not seen before.

        Every pass reads list entries ``li:nth-child(1)`` .. ``li:nth-child(20)``,
        queues each new link as ``(link, 0)``, then sleeps 30 seconds and
        refreshes the page. The method does not return on its own; the
        browser is closed if the loop is interrupted by an exception.
        """
        get_item_driver = self.set_drvier(self.hot_deal_page)
        try:
            while True:
                for i in range(1, 21):
                    try:
                        item = get_item_driver.find_element(By.CSS_SELECTOR, f"#bd_1196365581_0 > div > div.fm_best_widget._bd_pc > ul > li:nth-child({i}) > div > h3 > a")
                        item_link = item.get_attribute("href")
                        print(i, item_link)
                    except Exception as e:
                        self.error_logging(e, f"fail get item links {self.__class__}")
                        # Without this, the code below would read an unbound
                        # name (first entry) or the previous entry's link.
                        continue

                    if item_link not in self.previous_items_queue:
                        self.item_link_queue.append((item_link, 0))
                        self.previous_items_queue.appendleft(item_link)
                        # Keep only the 100 most recent links for de-duplication.
                        if len(self.previous_items_queue) > 100:
                            self.previous_items_queue.pop()
                time.sleep(30)
                get_item_driver.refresh()
        finally:
            get_item_driver.quit()

    def crawling(self):
        """Drain the link queue, scrape each post and hand it to ``insert_to_db``.

        A post that fails to load or parse is re-queued until it has been
        attempted with ``retry_attempt`` 0..3; after that the error is logged
        and the post is dropped. Returns once the queue is empty.
        """
        driver = self.set_drvier(self.hot_deal_page)
        try:
            while True:
                try:
                    item_link, retry_attempt = self.item_link_queue.popleft()
                except IndexError:
                    print("Empty Queue")
                    break
                print(item_link, retry_attempt)

                try:  # reported/removed posts, security checks, etc.
                    driver.get(item_link)
                    time.sleep(5)
                    details = driver.find_elements(By.CLASS_NAME, "xe_content")
                    # The first six "xe_content" elements are treated as the
                    # post fields, everything after them as comments; fewer
                    # than six raises and goes through the retry path.
                    *fields, content_el = details[:6]
                    shopping_mall_link, shopping_mall, item_name, price, delivery = (el.text for el in fields)
                    content = content_el.text
                    comment = [el.text for el in details[6:]]
                except Exception as e:
                    if retry_attempt >= 3:
                        self.error_logging(e, f"fail crawling {self.__class__}", item_link=item_link)
                    else:
                        self.item_link_queue.append((item_link, retry_attempt + 1))
                    continue

                # NOTE(review): insert_to_db is not defined in the visible source.
                self.insert_to_db(item_link=item_link, shopping_mall_link=shopping_mall_link, shopping_mall=shopping_mall, item_name=item_name, price=price, delivery=delivery, content=content, comment=comment)
        finally:
            driver.quit()
            
            
def super_crawling():
    """Poll the module-level Kafka consumer forever and print what arrives.

    Each pass polls for up to one second, prints every record of the returned
    batch (or the placeholder "123123" when nothing arrived) and then sleeps
    for one second. The function never returns on its own.
    """
    while True:
        # The original read from `page_consumer`, a name that is not defined
        # anywhere in the visible file; `consumer` is the module-level
        # KafkaConsumer subscribed to the topic the crawlers publish to.
        messages = consumer.poll(timeout_ms=1000)

        if messages:
            # poll() maps each topic partition to its list of records; only
            # the records are needed here.
            for records in messages.values():
                for record in records:
                    print(record)
        else:
            print("123123")

        time.sleep(1)
        
class QUASAR_ZONE(PAGES):
    """Crawler for the quasarzone sale-info board.

    Fields scraped per post: shopping_mall_link, shopping_mall, item_name,
    price, delivery, content, comment.
    """

    def __init__(self):
        super().__init__()
        self.hot_deal_page = QUASAR_ZONE_LINK

    def get_item_links(self):
        """Poll the board list forever; publish and enqueue links not seen before.

        Every pass reads table rows ``tr:nth-child(1)`` .. ``tr:nth-child(30)``.
        Each new link is published to Kafka, queued as ``(link, 0)`` and
        remembered for de-duplication; then the method sleeps 30 seconds and
        refreshes the page. It does not return on its own; the browser is
        closed if the loop is interrupted by an exception.
        """
        print("get_item_links")
        get_item_driver = self.set_drvier(self.hot_deal_page)
        try:
            while True:
                for i in range(1, 31):
                    try:
                        item = get_item_driver.find_element(By.CSS_SELECTOR, f"#frmSearch > div > div.list-board-wrap > div.market-type-list.market-info-type-list.relative > table > tbody > tr:nth-child({i}) > td:nth-child(2) > div > div.market-info-list-cont > p > a")
                        item_link = item.get_attribute("href")
                        print(i, item_link)
                    except Exception as e:
                        self.error_logging(e, f"fail get item links {self.__class__}")
                        # Without this, the code below would read an unbound
                        # name (first row) or the previous row's link.
                        continue

                    if item_link not in self.previous_items_queue:
                        self.pub_hot_deal_page(page=self.hot_deal_page, item_link=item_link)
                        self.item_link_queue.append((item_link, 0))
                        self.previous_items_queue.appendleft(item_link)
                        # Keep only the 1000 most recent links for de-duplication.
                        if len(self.previous_items_queue) > 1000:
                            self.previous_items_queue.pop()

                time.sleep(30)
                get_item_driver.refresh()
        finally:
            get_item_driver.quit()

    def crawling(self):
        """Scrape queued posts forever and hand each to ``insert_to_db``.

        When the queue is empty the worker sleeps 60 seconds and checks
        again, so this method does not return on its own. A post that fails
        to load or parse is re-queued until it has been attempted with
        ``retry_attempt`` 0..3; after that the error is logged and the post
        is dropped.
        """
        print("crawling")
        driver = self.set_drvier(self.hot_deal_page)
        try:
            while True:
                try:
                    item_link, retry_attempt = self.item_link_queue.popleft()
                except IndexError:
                    print("Empty Queue")
                    time.sleep(60)
                    continue
                print(item_link, retry_attempt)

                try:  # reported/removed posts, security checks, etc.
                    driver.get(item_link)
                    title = driver.find_element(By.CSS_SELECTOR, "#content > div > div.sub-content-wrap > div.left-con-wrap > div.common-view-wrap.market-info-view-wrap > div > dl > dt > div:nth-child(1) > h1").text
                    # The first two whitespace-separated tokens of the
                    # heading are not part of the item name.
                    item_name = " ".join(title.split()[2:])
                    table = driver.find_element(By.TAG_NAME, "table")
                    rows = table.find_elements(By.TAG_NAME, "tr")
                    # Parsed inside the try so that a table with fewer than
                    # four rows is retried instead of killing the worker.
                    details = ["".join(row.text.split()[1:]) for row in rows]
                    shopping_mall_link, shopping_mall, price, delivery, *_ = details
                    content = driver.find_element(By.CSS_SELECTOR, "#new_contents").text
                    comment = [node.text for node in driver.find_elements(By.CSS_SELECTOR, "#content > div.sub-content-wrap > div.left-con-wrap > div.reply-wrap > div.reply-area > div.reply-list")]
                except Exception as e:
                    if retry_attempt >= 3:
                        self.error_logging(e, f"fail crawling {self.__class__}", item_link=item_link)
                    else:
                        self.item_link_queue.append((item_link, retry_attempt + 1))
                    continue

                # NOTE(review): insert_to_db is not defined in the visible source.
                self.insert_to_db(item_link=item_link, shopping_mall_link=shopping_mall_link, shopping_mall=shopping_mall, item_name=item_name, price=price, delivery=delivery, content=content, comment=comment)
        finally:
            driver.quit()

# Posts on this board are sometimes uploaded without the shopping mall tagged.
class PPOM_PPU(PAGES):
    """Crawler for the ppomppu hot-deal board.

    Fields scraped per post: item_name, content, comments, shopping_mall,
    shopping_mall_link.
    """

    def __init__(self):
        super().__init__()
        self.hot_deal_page = PPOM_PPU_LINK

    def get_item_links(self):
        """Poll the board list forever and print the link of every listed post.

        Every pass walks rows ``tr:nth-child(9)`` .. ``tr:nth-child(33)`` of
        ``#revolution_main_table`` and stops the pass at the first row that
        cannot be read. The page is refreshed every 30 seconds; the browser
        is closed if the loop is interrupted.

        NOTE(review): the enqueue/de-duplication step is commented out, so
        this method currently only prints links and never feeds ``crawling``.
        """
        get_item_driver = self.set_drvier(self.hot_deal_page)
        try:
            while True:
                for i in range(9, 34):
                    try:
                        item = get_item_driver.find_element(By.CSS_SELECTOR, f"#revolution_main_table > tbody > tr:nth-child({i}) > td.baseList-space.title > div > div > a")
                        item_link = item.get_attribute("href")
                        # Printed index is 1-based relative to the first row read.
                        print(i - 8, item_link)
                    except Exception as e:
                        self.error_logging(e, f"fail get item links {self.__class__}")
                        break
                    # if item_link not in self.previous_items_queue:
                    #     self.item_link_queue.append((item_link, 0))
                    #     self.previous_items_queue.appendleft(item_link)
                    #     if len(self.previous_items_queue) > 100:
                    #         self.previous_items_queue.pop()
                time.sleep(30)
                get_item_driver.refresh()
        finally:
            get_item_driver.quit()

    def crawling(self):
        """Drain the link queue, scrape each post and hand it to ``insert_to_db``.

        A post that fails to load or parse is re-queued until it has been
        attempted with ``retry_attempt`` 0..3; after that the error is logged
        and the post is dropped. Returns once the queue is empty.
        """
        driver = self.set_drvier(self.hot_deal_page)
        try:
            while True:
                try:
                    item_link, retry_attempt = self.item_link_queue.popleft()
                except IndexError:
                    print("Empty Queue")
                    break
                print(item_link, retry_attempt)

                try:  # reported/removed posts, security checks, etc.
                    driver.get(item_link)
                    time.sleep(5)
                    item_name = driver.find_element(By.CSS_SELECTOR, "#topTitle > h1").text
                    content = driver.find_element(By.CSS_SELECTOR, "body > div.wrapper > div.contents > div.container > div > table:nth-child(14) > tbody > tr:nth-child(1) > td > table > tbody > tr > td").text
                    comments = driver.find_element(By.ID, "quote").text
                    shopping_mall_link = driver.find_element(By.CSS_SELECTOR, "#topTitle > div > ul > li.topTitle-link > a").get_attribute("href")
                    shopping_mall = driver.find_element(By.CSS_SELECTOR, "#topTitle > h1 > span.subject_preface.type2").text
                    print(item_name, content, comments, shopping_mall, shopping_mall_link)
                except Exception as e:
                    if retry_attempt >= 3:
                        self.error_logging(e, f"fail crawling {self.__class__}", item_link=item_link)
                    else:
                        self.item_link_queue.append((item_link, retry_attempt + 1))
                    continue

                # NOTE(review): insert_to_db is not defined in the visible
                # source. This crawler passes a single string as `comments`,
                # while the other crawlers pass a list as `comment` --
                # confirm which keyword and shape the storage layer expects.
                self.insert_to_db(item_link=item_link, item_name=item_name, content=content, comments=comments, shopping_mall=shopping_mall, shopping_mall_link=shopping_mall_link)
        finally:
            driver.quit()


In [None]:
# One crawler instance per board; constructing them only sets up the queues
# and the board URL, no browser is started here.
quasar_zone = QUASAR_ZONE()
ppom_ppu = PPOM_PPU()
fm_korea = FM_KOREA()
ruli_web = RULI_WEB()
arca_live = ARCA_LIVE()


In [None]:
# Runs the quasarzone link poller in the foreground. get_item_links loops in
# `while True`, so this call does not return unless an exception escapes it.
quasar_zone.get_item_links()

In [None]:
from concurrent.futures import ThreadPoolExecutor

# Use a ThreadPoolExecutor to run the link poller and the Kafka reader side by side.
# Leaving the `with` block waits for every submitted callable; both loop in
# `while True`, so the print below is only reached if both of them stop
# (e.g. by raising). The returned futures are not kept, so such exceptions
# are not surfaced here.
with ThreadPoolExecutor(max_workers=4) as executor:
    executor.submit(quasar_zone.get_item_links)
    # executor.submit(quasar_zone.crawling)
    executor.submit(super_crawling)
print("Both functions have completed.")
