In [1]:
import time
import os

In [2]:
from selenium import webdriver
from selenium.common.exceptions import (
    NoSuchElementException,
    TimeoutException,
    WebDriverException,
)
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.chrome.service import Service
from selenium.webdriver.common.by import By

In [4]:
class TechCrunchScraper:
    """Selenium-based scraper for TechCrunch category pages.

    A single Chrome WebDriver is created lazily by :meth:`run_driver` and
    reused by every other method. Call :meth:`quit_driver` (or use the
    instance in a ``with`` block) to close the browser when finished.
    """

    # Class-level defaults; ``__init__`` overrides the first three per instance.
    MODE = 'normal' # or headless
    BLOCK_IMAGES = True
    BLOCK_JS = False
    DRIVER_PATH = '/Users/hadi/Documents/workspace/daneshkar/week 11 (project sraping)/shared/selenium/chromedriver'
    WEBSITE_MAIN_PAGE = 'https://techcrunch.com'
    HTML_FOLDER_PATH = '/Users/hadi/Documents/workspace/daneshkar/week 11 (project sraping)/scraped_html'

    # Modes understood by run_driver().
    _SUPPORTED_MODES = ('normal', 'headless')

    # (XPath relative to an article <header>, value stored under 'type'),
    # tried in this order when classifying an article header.
    _ARTICLE_HEADER_KINDS = (
        ('./div[@class="article__primary-category"]/a', 'Category'),
        ('./div[@class="featured-article__label"]/div[contains(@class, "featured-article__label__text")]', 'Label'),
        ('./h3[@class="article__event-title"]/a', 'Event'),
    )

    def __init__(self, mode='normal', block_images=True, block_js=False) -> None:
        """Store the browser settings; the browser itself is not started here.

        Args:
            mode: ``'normal'`` or ``'headless'``.
            block_images: Ask Chrome not to load images.
            block_js: Ask Chrome not to run JavaScript.
        """
        self.MODE = mode
        self.BLOCK_IMAGES = block_images
        self.BLOCK_JS = block_js

        self.driver = None
        self.service = None
        self.all_categories_list = None

    def __enter__(self):
        """Return the scraper itself so it can be used in a ``with`` block."""
        return self

    def __exit__(self, exc_type, exc_value, traceback) -> None:
        """Close the browser when leaving a ``with`` block."""
        self.quit_driver()

    def run_driver(self, page_load_timeout=10):
        """Start Chrome if it is not running yet and return the driver.

        Args:
            page_load_timeout: Seconds passed to ``set_page_load_timeout``.

        Returns:
            The WebDriver instance (the existing one if already started).

        Raises:
            ValueError: If ``self.MODE`` is not ``'normal'`` or ``'headless'``.
        """
        if self.driver is not None:
            return self.driver
        # Validate up front; otherwise an unknown mode would only surface
        # later as an unbound ``options`` variable.
        if self.MODE not in self._SUPPORTED_MODES:
            raise ValueError(
                'Unsupported mode {!r}; expected one of {!r}'.format(
                    self.MODE, self._SUPPORTED_MODES
                )
            )

        options = webdriver.ChromeOptions()
        if self.MODE == 'headless':
            # The ``options.headless`` setter is deprecated/removed in recent
            # Selenium 4 releases; the command-line flag works everywhere.
            options.add_argument("--headless")
            options.add_argument("--window-size=1920,1200")

        ### This blocks images and javascript requests
        blocked_content = {}
        if self.BLOCK_IMAGES:
            blocked_content['images'] = 2
        if self.BLOCK_JS:
            blocked_content['javascript'] = 2
        if blocked_content:
            options.add_experimental_option(
                "prefs",
                {"profile.default_content_setting_values": blocked_content},
            )

        self.service = Service(executable_path=self.DRIVER_PATH)
        self.driver = webdriver.Chrome(service=self.service, options=options)
        self.driver.set_page_load_timeout(page_load_timeout)
        return self.driver

    def quit_driver(self) -> None:
        """Close the browser, if one is running, and forget the driver."""
        if self.driver is None:
            return
        try:
            self.driver.quit()
        finally:
            self.driver = None
            self.service = None

    def open_link_in_driver(self, link, try_again_if_timeout=True):
        """Load *link* in the browser, starting the driver if necessary.

        Args:
            link: URL to open.
            try_again_if_timeout: When true, reload the page after every
                page-load ``TimeoutException`` until a load succeeds; when
                false, give up silently after the first timeout.
        """
        if self.driver is None:
            self.run_driver()
        while True:
            try:
                self.driver.get(link)
                # Successful load: stop here instead of reloading forever.
                return
            except TimeoutException:
                if not try_again_if_timeout:
                    return

    def scroll_to_bottom(self):
        """Scroll the current page down by a large fixed offset."""
        javaScript = "window.scrollBy(0, 100000);"
        self.driver.execute_script(javaScript)

    def _collect_category_links(self, links_xpath):
        """Return ``(href, text)`` for anchors matching *links_xpath* whose href contains '/category/'."""
        found = []
        for anchor in self.driver.find_elements(By.XPATH, links_xpath):
            href = anchor.get_attribute('href')
            # Anchors without an href yield None; skip them instead of
            # failing on the substring test.
            if href and '/category/' in href:
                found.append((href, anchor.text))
        return found

    def get_list_of_all_categories(self):
        """Return the category links found in the site navigation.

        The main page is opened, the visible menu is read, the "more" menu
        entry is clicked and the flyout menu is read as well. The result is
        cached on the instance, so later calls do not touch the browser.

        Returns:
            A list of ``(href, link_text)`` tuples.
        """
        if self.all_categories_list is not None:
            return self.all_categories_list

        self.open_link_in_driver(self.WEBSITE_MAIN_PAGE)

        categories_links_path = '//header[contains(@class, "site-navigation")]//ul[contains(@class, "menu")]/li[@class="menu__item"]/a'
        main_categories_list = self._collect_category_links(categories_links_path)

        more_categories_btn = '//header[contains(@class, "site-navigation")]//ul[contains(@class, "menu")]/li[@class="menu__item more-link"]/a'
        self.driver.find_element(By.XPATH, more_categories_btn).click()

        more_categories_links_path = '//header[contains(@class, "site-navigation")]//div[@class="desktop-nav navigation-desktop__flyout"]//li[@class="menu__item"]/a'
        more_categories_list = self._collect_category_links(more_categories_links_path)

        self.all_categories_list = main_categories_list + more_categories_list
        return self.all_categories_list

    def _get_article_header_info(self, article_header):
        """Classify an article header as Category, Label or Event.

        The candidates in ``_ARTICLE_HEADER_KINDS`` are tried in order and
        the first one present wins.

        Returns:
            A dict with keys ``'type'``, ``'text'`` and ``'href'``.

        Raises:
            NoSuchElementException: If none of the candidates is present.
        """
        last_error = None
        for xpath, kind in self._ARTICLE_HEADER_KINDS:
            try:
                element = article_header.find_element(By.XPATH, xpath)
            except NoSuchElementException as exc:
                last_error = exc
                continue
            return {
                'type': kind,
                'text': element.text,
                # NOTE(review): the 'Label' candidate is a <div>, so its href
                # is presumably None - confirm the database accepts that.
                'href': element.get_attribute('href'),
            }
        raise last_error

    def get_article_data_from_html(self, article_header):
        """Extract the listing data from one article ``<header>`` element.

        Args:
            article_header: WebElement of the article's ``<header>``.

        Returns:
            A dict with keys ``'title'``, ``'article_link'``, ``'header'``
            (see :meth:`_get_article_header_info`), ``'author_name'``,
            ``'author_link'`` and ``'date_and_time'`` (the ``datetime``
            attribute of the ``<time>`` element).

        Raises:
            NoSuchElementException: If any expected element is missing.
        """
        title = article_header.find_element(By.XPATH, './h2[@class="post-block__title"]').text
        # Full path: //div[contains(@class, "river")]/div//article/header//div[@class="post-block__meta"]//span[@class="river-byline__authors"]//a
        author_name_el = article_header.find_element(By.XPATH, './/div[@class="post-block__meta"]//span[@class="river-byline__authors"]//a')
        author_name, author_link = author_name_el.text, author_name_el.get_attribute('href')
        # Full path: //div[contains(@class, "river")]/div//article/header//div[@class="post-block__meta"]//div[@class="river-byline__full-date-time__wrapper"]//time
        date_and_time = article_header.find_element(By.XPATH, './/div[@class="post-block__meta"]//div[@class="river-byline__full-date-time__wrapper"]//time').get_attribute('datetime')
        article_canonical_link = article_header.find_element(By.XPATH, './h2[@class="post-block__title"]/a').get_attribute('href')
        return {
            'title': title,
            'article_link': article_canonical_link,
            'header': self._get_article_header_info(article_header),
            'author_name': author_name,
            'author_link': author_link,
            'date_and_time': date_and_time,
        }

    def _find_article_elements(self):
        """Return the ``<article>`` elements of the currently loaded page."""
        category_river_div_path = '//div[contains(@class, "river")]/div'
        river_div = self.driver.find_element(By.XPATH, category_river_div_path)
        # NOTE(review): an XPath starting with '//' is evaluated from the
        # document root even when called on an element, so this matches every
        # <article> on the page, not only those inside river_div. './/article'
        # would restrict the search; kept as-is until the intended page
        # structure is confirmed.
        return river_div.find_elements(By.XPATH, '//article')

    def scrape_new_articles_of_category_link(self, category_page_link, already_scraped_articles_num=0):
        """Save the articles of a category page that were not handled yet.

        Args:
            category_page_link: URL of the category page. It is only opened
                when ``already_scraped_articles_num`` is 0; otherwise the
                already open page is scrolled down instead.
            already_scraped_articles_num: Number of leading articles to skip.

        Returns:
            The total number of articles seen on the page in this call, which
            is the value to pass as ``already_scraped_articles_num`` next time.
        """
        if already_scraped_articles_num == 0:
            self.open_link_in_driver(category_page_link)
        else:
            self.scroll_to_bottom()
        articles_elements = self._find_article_elements()
        for article in articles_elements[already_scraped_articles_num:]:
            article_header = article.find_element(By.XPATH, './header')
            self.save_article_data_in_database(self.get_article_data_from_html(article_header))
        return len(articles_elements)

    def click_load_more_in_category_page(self, wait_after=10, try_until_success=True, max_attempts=None):
        """Click the "load more" button of the current category page.

        Args:
            wait_after: Seconds to sleep before every attempt.
            try_until_success: Retry after a failed attempt instead of giving
                up immediately.
            max_attempts: Upper bound on attempts when retrying; ``None``
                (the default) retries without limit.

        Returns:
            True if the button was clicked, False if the method gave up.
        """
        load_more_btn_xpath = '//*[@id="tc-main-content"]//button[contains(@class, "load-more")]'
        attempts = 0
        while True:
            time.sleep(wait_after)
            attempts += 1
            try:
                self.driver.find_element(By.XPATH, load_more_btn_xpath).click()
            except WebDriverException:
                # Button missing, covered or stale: retry if allowed.
                if not try_until_success:
                    return False
                if max_attempts is not None and attempts >= max_attempts:
                    return False
                continue
            return True

    def get_number_of_current_articles_in_page(self):
        """Return how many ``<article>`` elements the current page contains."""
        return len(self._find_article_elements())

    def scrape_category_scroll_down(self, category_link, max_load_more_attempts=None):
        """Scrape a category page, loading more articles round after round.

        Args:
            category_link: URL of the category page.
            max_load_more_attempts: Maximum attempts per round at clicking the
                "load more" button. With ``None`` (the default) each round
                retries indefinitely, so this method only ends on an error.

        Returns:
            The number of articles seen when the button could no longer be
            clicked (only reachable when ``max_load_more_attempts`` is set).
        """
        already_scraped_articles_num = 0
        while True:
            # Use the count from the very list that was scraped; counting
            # again afterwards could include articles that loaded in between
            # and would then be skipped.
            already_scraped_articles_num = self.scrape_new_articles_of_category_link(
                category_link, already_scraped_articles_num
            )
            print(already_scraped_articles_num)
            self.scroll_to_bottom()
            clicked = self.click_load_more_in_category_page(
                try_until_success=True, max_attempts=max_load_more_attempts
            )
            if not clicked:
                # Pick up anything that finished loading while waiting for
                # the button. Skipped when nothing was ever found, because a
                # zero count would re-open the page.
                if already_scraped_articles_num:
                    already_scraped_articles_num = self.scrape_new_articles_of_category_link(
                        category_link, already_scraped_articles_num
                    )
                return already_scraped_articles_num

    def save_article_data_in_database(self, article_data):
        """Persist one article through ``app_database``.

        Args:
            article_data: Dict as returned by
                :meth:`get_article_data_from_html`. Only ``article_link``,
                ``author_name``, ``author_link``, ``header['href']`` and
                ``title`` are forwarded, in that order.
        """
        # Imported here rather than at module level, so the class can be
        # defined without the database module being importable.
        from app_database import save_article_data_to_database
        save_article_data_to_database(
            article_data['article_link'],
            article_data['author_name'],
            article_data['author_link'],
            article_data['header']['href'],
            article_data['title'],
        )