In [None]:
from selenium import webdriver
from selenium.webdriver.common.by import By
import time
import json
import undetected_chromedriver as uc
import time
from random import randint
import dataclasses as dc
from datetime import datetime
import datetime

## General functions

Here we define general functions to scroll pages and to detect the type of an article.

In [None]:
@dc.dataclass
class WebPage:
    website: str = None
    url: str = None
    link: str = None
    title: str = None
    media_type: str = None
    date: str =None
    content: str = None
   
class Scraper:
        
    def scroll_method(self, driver):
        try:
            last_height = 0
            while True:
                driver.execute_script("window.scrollTo(0, document.body.scrollHeight);")
                time.sleep(1)
                new_height = driver.execute_script("return document.body.scrollHeight")
                if new_height == last_height:
                    break
                else:
                    last_height = new_height
        except Exception as e:
            print("Error:", e)

    def write_to_jsonl(self, result: list[WebPage] | WebPage | Exception, filename: str):
        """Writes the result of the scraper function to a jsonl file. 
        If result is a list, each element is written to the file. 
        If result is a dict, it is written as a single line.
        """
        if result:
            with open(f'output/{filename}.jsonl', 'a') as f:        
                if isinstance(result, list):
                    result = [dc.asdict(r) for r in result]
                    [f.write(json.dumps(r, ensure_ascii=False) + '\n') for r in result]

                elif isinstance(result, WebPage):
                    f.write(json.dumps(dc.asdict(result), ensure_ascii=False) + '\n')
                    f.close()
        
                elif isinstance(result, Exception):
                    with open(f'logs/{filename}_error.jsonl', 'a') as f:
                        f.write(json.dumps({'error': str(result)}, ensure_ascii=False) + '\n')
                        f.close()

        else:
            with open(f'logs/{filename}_captcha.jsonl', 'a') as f:
                if isinstance(result, list):
                    for r in result:
                        f.write(r + '\n')
                    f.close()
                else:    
                    f.write(result + '\n')
                    f.close()

    def run(self, scrape_object: WebPage, scraper_function, filename: str):
        """Open ``scrape_object.link`` in a fresh browser, scrape it, save the result.

        Args:
            scrape_object: Record whose ``link`` is the page to load.
            scraper_function: Callable invoked as
                ``scraper_function(driver, scrape_object)``; its return value
                is handed to ``write_to_jsonl``.
            filename: Base name of the files the result is written to.

        The browser is kept on ``self.driver`` for the duration of the call
        and is always shut down afterwards, including when loading, scraping
        or writing raises, so a failure does not leave a Chrome process behind.
        """
        self.driver = uc.Chrome(headless=False, use_subprocess=False)
        try:
            self.driver.get(scrape_object.link)
            self.scroll_method(self.driver)
            result = scraper_function(self.driver, scrape_object)
            self.write_to_jsonl(result, filename)
        finally:
            self.driver.quit()


class Database:
    """Lookup helper over the JSON-lines file of links collected so far."""

    @staticmethod
    def check_if_exists(link: str, path: str = 'output/israeli_times_links.jsonl') -> bool:
        """Return True if a record whose ``link`` equals *link* is stored in *path*.

        Args:
            link: URL to look for.
            path: JSON-lines file to search; defaults to the links file.

        A missing file means nothing has been stored yet, so the answer is
        False rather than an error. Blank or unparsable lines (for example a
        line cut short by an interrupted run) are skipped.
        """
        try:
            # Only the `link` value is compared, so undecodable bytes
            # elsewhere in a line are replaced instead of aborting the lookup.
            with open(path, 'r', encoding='utf-8', errors='replace') as f:
                for line in f:
                    line = line.strip()
                    if not line:
                        continue
                    try:
                        record = json.loads(line)
                    except json.JSONDecodeError:
                        continue
                    if isinstance(record, dict) and record.get('link') == link:
                        return True
        except FileNotFoundError:
            return False
        return False


class IsraeliTimesScraper():
    def __init__(self):
        """Create the generic Scraper that the page-specific methods are run through."""
        self.scraper = Scraper()

    def detect_type_article(self, link: str) -> str:
        # This function is used to detect the type of article
        if 'liveblog' in link:
            return 'liveblog'
        elif 'blogs.timesofisrael.com' in link:
            return 'blog'
        elif 'https://jewishchronicle.timesofisrael.com/' in link:
            return 'jewishchronicle'
        else:
            return 'article'

    def collect_page_titles(self, driver, _: WebPage) -> list[WebPage] | Exception:
        """Collect the headline links on the current page that are not stored yet.

        Args:
            driver: Browser driver positioned on the page to inspect.
            _: Unused; present so the method has the
                ``(driver, scrape_object)`` shape that ``Scraper.run`` calls.

        Returns:
            One WebPage per new, distinct headline link (title, link and
            detected media type filled in), or an ``Exception`` instance
            (returned, not raised) when the page has no new links.
        """
        anchors = driver.find_elements(By.XPATH, '//div[@class="headline"]/a')
        page_url = driver.current_url

        new_pages = []
        seen_links = set()
        already_scraped_count = 0
        for anchor in anchors:
            href = anchor.get_attribute('href')
            # Skip anchors without a target (they cannot be classified or
            # visited) and repeats of a link already handled on this page,
            # so the database is consulted once per distinct link.
            if not href or href in seen_links:
                continue
            seen_links.add(href)

            if Database.check_if_exists(href):
                already_scraped_count += 1
                continue

            new_pages.append(
                WebPage(
                    website='timesofisrael',
                    url=page_url,
                    date=None,
                    title=anchor.text,
                    link=href,
                    media_type=self.detect_type_article(href),
                    content=None,
                )
            )

        print("Already scraped:", already_scraped_count)

        if not new_pages:
            return Exception('Already scraped all articles on this page')

        # Reported after de-duplication, so it matches what is returned.
        print('Collecting:', len(new_pages), 'articles from the page')

        return new_pages

    def scrape_article(self, driver, scrape_object: WebPage) -> WebPage:
        """Scrapes title, content, date from the article page."""
        try:
            title = driver.find_element(By.XPATH, '//h1[@class="headline"]').text
            content = driver.find_element(By.XPATH, '//div[@class="the-content"]').text
            date = driver.find_element(By.XPATH, '//span[@class="date"]').text
            article = WebPage(
                website='timesofisrael',
                title =title,
                date=date,
                link=scrape_object.link,
                media_type=scrape_object.media_type,
                content=content
            )

            if "Today" in article.date:
                new_date = datetime.date.today().strftime("%Y-%m-%d")
                article.date = new_date

            return article
        
        except Exception as e:
            return f"'Error': {e}, 'title', {scrape_object.title}, 'link', {scrape_object.link} \n"

    def collect_liveblog(self, driver, scrape_object: WebPage) -> list[WebPage]:
        """Scrapes title, content, date from the liveblog page."""
        try:
            title = driver.find_elements(By.XPATH, '//div[@class="liveblog-paragraph"]//h4')
            content = driver.find_elements(By.XPATH, '//div[@class="liveblog-paragraph"]//p')
            href = driver.find_elements(By.XPATH, '//div[@class="liveblog-paragraph"]//h4//a')
            dates = driver.find_elements(By.XPATH, '//div[@class="liveblog-date"]//a//span')
            result = []
            print(
                "title:", len(title), 
                "content:", len(content), 
                "link:", len(href), 
                "date:", len(dates)
                )

            for t, i, h, d in zip(title, content, href, dates): 
                
                # Convert epoch in timestamp to datetime
                title = t.text
                content = ''.join(i.text)
                href = h
                timestamp = int(d.get_attribute('data-timestamp'))
                dt_object = datetime.datetime.utcfromtimestamp(timestamp)
                epoch = dt_object.strftime('%Y-%m-%d %H:%M:%S')

            
                result.append(WebPage(
                    website='timesofisrael',
                    url=driver.current_url,
                    title =title,
                    date=epoch,
                    link=href.get_attribute('href'),
                    media_type='liveblog',
                    content=content
                ))
            
            return result
        except Exception as e:
            print("Error:", e)
            return f"'Error': {e}, 'title', {scrape_object.title}, 'link', {scrape_object.link} \n"

    def collect_blogs(self, driver, scrape_object: WebPage) -> WebPage:
        """Scrapes title, content, date from the blog page."""
        try:
            title = driver.find_element(By.XPATH, '//h1[@class="headline"]').text
            content = driver.find_element(By.XPATH, '//div[@class="article-content"]').text
            date = driver.find_element(By.XPATH, '//aside[@class="block cols1"]//div[@class="date"]').text
            article = WebPage(
                website='timesofisrael',
                title =title,
                date=date,
                link=scrape_object.link,
                media_type=scrape_object.media_type,
                content=content
            )  
            return article
            
        except Exception as e:
            return f"'Error': {e}, 'title', {scrape_object.title}, 'link', {scrape_object.link} \n"



    def run(self):
        """Collect new links from the homepage, then scrape every stored link.

        The homepage pass appends newly found links under the
        'israeli_times_links' file name. Each line of that file is then
        dispatched on its ``media_type`` to the matching scrape method, whose
        result is written under the 'data' file name. Media types without a
        scrape method are skipped.
        """
        homepage = WebPage(link='https://www.timesofisrael.com/', media_type='homepage')
        self.scraper.run(homepage, self.collect_page_titles, 'israeli_times_links')

        handlers = {
            'article': self.scrape_article,
            'liveblog': self.collect_liveblog,
            'blog': self.collect_blogs,
        }

        try:
            # Only the ASCII `link` / `media_type` values are used, so
            # undecodable bytes elsewhere in a line are replaced rather than
            # aborting the run.
            links_file = open('output/israeli_times_links.jsonl', 'r',
                              encoding='utf-8', errors='replace')
        except FileNotFoundError:
            # No link has ever been stored: nothing to scrape.
            print('No links file found, nothing to scrape')
            return

        with links_file as f:
            for line in f:
                if not line.strip():
                    continue
                record = json.loads(line)
                page = WebPage(link=record['link'], media_type=record['media_type'])

                handler = handlers.get(page.media_type)
                if handler is None:
                    # No page was requested, so there is nothing to throttle.
                    continue

                self.scraper.run(page, handler, 'data')
                # Random pause between page loads.
                time.sleep(randint(1, 3))



# Start a full run: collect new homepage links, then scrape the stored links.
IsraeliTimesScraper().run()




In [16]:
def collect_blogs(driver, scrape_object: WebPage) -> WebPage | Exception:
    """Scrape title, content and date from the blog page the driver is on.

    Stand-alone counterpart of ``IsraeliTimesScraper.collect_blogs``.

    Args:
        driver: Browser driver positioned on the blog post.
        scrape_object: Record being scraped; its ``link`` and ``media_type``
            are copied to the result, its ``title`` and ``link`` are used in
            the error message.

    Returns:
        A WebPage on success. On any failure an ``Exception`` instance is
        returned (not raised): ``Scraper.write_to_jsonl`` records exceptions
        in the error log, whereas a plain string result is not written
        anywhere.
    """
    try:
        title = driver.find_element(By.XPATH, '//h1[@class="headline"]').text
        content = driver.find_element(By.XPATH, '//div[@class="article-content"]').text
        date = driver.find_element(By.XPATH, '//aside[@class="block cols1"]//div[@class="date"]').text
        return WebPage(
            website='timesofisrael',
            title=title,
            date=date,
            link=scrape_object.link,
            media_type=scrape_object.media_type,
            content=content,
        )

    except Exception as e:
        return Exception(
            f"'Error': {e}, 'title', {scrape_object.title}, 'link', {scrape_object.link} \n"
        )


# Manual check: run the stand-alone collect_blogs on a single blog post;
# the result is written under the 'blog_data' file name.
blog = WebPage(link='https://blogs.timesofisrael.com/when-non-jews-help-jews/', media_type='blog')
IsraeliTimesScraper().scraper.run(blog, collect_blogs, 'blog_data')

In [13]:
# NOTE(review): `x` is not defined in any cell shown here; presumably left over
# from an earlier session. Confirm or remove this cell.
print(x)

None
