In [1]:
import os
import requests
import json
from hashlib import sha256
from urllib.parse import urlparse, quote
from bs4 import BeautifulSoup
from datetime import datetime
from time import sleep

from langchain_core.documents import Document
from langchain_google_genai import ChatGoogleGenerativeAI
from langchain_core.messages import HumanMessage

from src.prompts import IMAGE_EXTRACTOR_PROMPT

from dotenv import load_dotenv
load_dotenv()

## Utils

### Image extractor

In [2]:
class InvalidAPIKey(Exception):
    """Error raised when the Google API key cannot be read from the environment.

    The exception takes no arguments; its text is always the class-level
    ``message``.
    """

    message = 'Invalid API Key'

    def __init__(self):
        # Forward the fixed class-level text as the only exception argument.
        Exception.__init__(self, self.message)
        

class InvalidURL(Exception):
    """Error raised when a URL is not absolute (it lacks a scheme or a host).

    The exception takes no arguments; its text is always the class-level
    ``message``.
    """

    message = 'Invalid URL'

    def __init__(self):
        # Forward the fixed class-level text as the only exception argument.
        Exception.__init__(self, self.message)

In [3]:
class GeminiImageExtractor:
    """Send an image plus a fixed prompt to a Gemini chat model and return its reply.

    The prompt is ``IMAGE_EXTRACTOR_PROMPT`` (imported from ``src.prompts``).
    The API key is read from the ``GOOGLE_API_KEY`` environment variable when
    the extractor is constructed.
    """

    prompt = IMAGE_EXTRACTOR_PROMPT
    # Model names this notebook is known to use; not enforced by the constructor.
    model_list = ['gemini-1.5-flash-latest',
                  'gemini-1.5-pro-latest']

    def __init__(self, model_name='gemini-1.5-pro-latest'):
        """Create the chat model client.

        Args:
            model_name: Name of the Gemini model passed to ``ChatGoogleGenerativeAI``.

        Raises:
            InvalidAPIKey: If ``GOOGLE_API_KEY`` is unset or empty.
        """
        self.api = self._get_api_()
        self.chat_model = ChatGoogleGenerativeAI(model=model_name, google_api_key=self.api)
        # The text part of every request is the same, so build it once.
        self.text_content = {
            "type": "text",
            "text": self.prompt
        }

    def extract(self, image_path, sleep_time=0):
        """Ask the model to describe/extract the content of one image.

        Args:
            image_path: Value sent as the ``image_url`` part of the message.
            sleep_time: Seconds to sleep after the model call returns
                (useful for spacing out consecutive requests).

        Returns:
            The ``content`` attribute of the model's response.
        """
        image_content = {
            "type": "image_url",
            "image_url": image_path
        }
        message = HumanMessage(content=[self.text_content, image_content])

        result = self.chat_model.invoke([message])

        sleep(sleep_time)
        return result.content

    @staticmethod
    def _get_api_():
        """Return the Google API key from the environment.

        This is a static method, so it must not declare ``self``; with the
        extra parameter the call ``self._get_api_()`` in ``__init__`` failed
        with a missing-argument TypeError.

        Raises:
            InvalidAPIKey: If ``GOOGLE_API_KEY`` is unset or empty.
        """
        key = os.getenv('GOOGLE_API_KEY')
        if not key:
            raise InvalidAPIKey

        return key

### Helper functions

In [4]:
def write_json(path, items):
    """Write ``items`` to ``path`` as JSON with a 4-space indent.

    The payload is serialized *before* the file is opened, so a value that
    cannot be serialized raises without truncating an existing file. The file
    is written as UTF-8, matching how the sitemap is read back in this notebook.

    Args:
        path: Destination file path; an existing file is overwritten.
        items: Any JSON-serializable object.

    Raises:
        TypeError: If ``items`` contains a value ``json`` cannot serialize.
    """
    serialized = json.dumps(items, indent=4)
    with open(path, 'w', encoding='utf-8') as file:
        file.write(serialized)

In [5]:
def is_subdirectory(href):
    """Tell whether ``href`` is something other than a full absolute URL.

    Returns:
        True when the parsed link is missing a scheme, a network location, or
        both; False only when both are present.
    """
    parts = urlparse(href)
    has_scheme = bool(parts.scheme)
    has_host = bool(parts.netloc)
    # De Morgan form of "not (scheme and host)".
    return not has_scheme or not has_host

In [6]:
def is_attachment(url, extensions=('.pdf',)):
    """Tell whether ``url`` points at a downloadable attachment.

    The check is a case-insensitive suffix match, so ``report.PDF`` counts as
    a PDF just like ``report.pdf``.

    Args:
        url: The link to test.
        extensions: File suffixes (including the dot) that count as
            attachments. A single string is accepted as well. Defaults to
            PDF only.

    Returns:
        True if ``url`` ends with one of ``extensions``, otherwise False.
    """
    if isinstance(extensions, str):
        # A bare string would otherwise be iterated character by character.
        extensions = (extensions,)
    suffixes = tuple(ext.lower() for ext in extensions)
    return url.lower().endswith(suffixes)

In [None]:
def get_web_soup(url, timeout=30):
    """Fetch ``url`` and parse the response body with BeautifulSoup.

    Args:
        url: Page to download.
        timeout: Seconds to wait for the server before giving up. Without a
            timeout ``requests`` can block indefinitely on an unresponsive host.

    Returns:
        A ``BeautifulSoup`` document when the server answers with status 200,
        otherwise None.

    Raises:
        requests.exceptions.RequestException: On connection errors or timeout.
    """
    # NOTE(review): REQUEST_HEADER is not defined in any cell visible in this
    # notebook — confirm it is defined before this function is called.
    response = requests.get(url, headers=REQUEST_HEADER, timeout=timeout)
    if response.status_code != 200:
        return None

    return BeautifulSoup(response.content, 'html.parser')

In [None]:
def preprocess_soup(soup):
    """Isolate the page's main body and strip elements that change between visits.

    Removes the pager (next/previous navigation) inside the main body and every
    hit-counter span in the document, then returns the main-body element.
    The passed-in ``soup`` is modified in place.

    Returns:
        The ``<section id="sp-main-body">`` element of ``soup``.
    """
    body = soup.find('section', {'id': 'sp-main-body'})

    # Drop the next/previous page navigator, if the page has one.
    pager = body.find('ul', {'class': 'pager pagenav'})
    if pager:
        pager.decompose()

    # Drop hit counters everywhere in the document.
    hit_counters = soup.find_all('span', {'class': 'mod-articles-category-hits'})
    for counter in hit_counters:
        counter.decompose()

    return body

In [None]:
def website_is_updated(url, hash_value):
    """Check whether the main content of ``url`` differs from a stored digest.

    Args:
        url: Page to download.
        hash_value: SHA-256 hex digest recorded on a previous crawl.

    Returns:
        True when the digest of the freshly preprocessed main body is not
        equal to ``hash_value``.

    Raises:
        Exception: If the page could not be fetched.
    """
    soup = get_web_soup(url)
    if not soup:
        raise Exception('Can not connect to destination URL')

    current_digest = sha256(preprocess_soup(soup).encode()).hexdigest()
    return current_digest != hash_value

In [None]:
def encode_url(url):
    """Percent-encode the path (and query) of an absolute URL.

    The function is idempotent: existing ``%XX`` escapes are left untouched,
    so encoding an already-encoded URL returns it unchanged and never turns
    ``%`` into ``%25``. The query string is kept, so URLs that differ only in
    their parameters stay distinct. The fragment (``#...``) is dropped.

    Args:
        url: Absolute URL (must have both a scheme and a host).

    Returns:
        The URL with non-ASCII and unsafe characters percent-encoded.

    Raises:
        InvalidURL: If ``url`` lacks a scheme or a network location
            (the same rule ``is_subdirectory`` applies).
    """
    parsed_url = urlparse(url)
    if not (parsed_url.scheme and parsed_url.netloc):
        raise InvalidURL

    # '%' is marked safe so that already-encoded sequences are not re-encoded.
    encoded_path = quote(parsed_url.path, safe='/%')
    encoded_url = f'{parsed_url.scheme}://{parsed_url.netloc}{encoded_path}'

    if parsed_url.query:
        encoded_url += '?' + quote(parsed_url.query, safe='=&%+/:;,@')

    return encoded_url

In [None]:
def get_title(soup):
    """Return the page title, preferring the Open Graph title.

    The lookup order is the ``content`` of ``<meta property="og:title">``,
    then the text of the ``<title>`` element. A page without the meta tag now
    falls back to ``<title>`` instead of failing with AttributeError.

    Returns:
        The title string, or None when the page has neither source.
    """
    meta_tag = soup.find('meta', {'property': 'og:title'})
    if meta_tag is not None:
        og_title = meta_tag.get('content')
        if og_title:
            return og_title

    title_tag = soup.title
    if title_tag is None:
        return None

    return title_tag.get_text()

In [None]:
def get_links(soup, internal_link=True, external_link=False, attachment=True, start_url=None):
    """Collect the links found in ``soup``.

    Args:
        soup: Parsed document (or element) to scan for ``<a href=...>`` tags.
        internal_link: Include links that ``is_subdirectory`` classifies as
            relative; they are prefixed with ``start_url`` and encoded.
        external_link: Include absolute links, encoded.
        attachment: Include attachment links (see ``is_attachment``). These
            are added exactly as written in the page, without encoding.
        start_url: ``scheme://host`` prefix for relative links. Required when
            ``internal_link`` is True and the page contains relative links;
            otherwise the string concatenation raises TypeError.

    Returns:
        A set of link strings.
    """
    links = set()
    for a in soup.find_all('a', href=True):
        href = a['href']

        # Attachments take priority over the internal/external classification.
        # (The helper is `is_attachment`; `is_attachment_file` does not exist.)
        if attachment and is_attachment(href):
            links.add(href)
            continue

        if is_subdirectory(href):
            if internal_link:
                links.add(encode_url(start_url + href))
        elif external_link:
            links.add(encode_url(href))

    return links

In [None]:
def get_images(url, timeout=30):
    """Download ``url`` and return the raw response body.

    Args:
        url: Resource to download.
        timeout: Seconds to wait for the server before giving up. Without a
            timeout ``requests`` can block indefinitely on an unresponsive host.

    Returns:
        The response body as bytes.

    Raises:
        requests.exceptions.HTTPError: If the server answers with an error status.
        requests.exceptions.RequestException: On connection errors or timeout.
    """
    # NOTE(review): REQUEST_HEADER is not defined in any cell visible in this
    # notebook — confirm it is defined before this function is called.
    response = requests.get(url, headers=REQUEST_HEADER, timeout=timeout)
    # raise_for_status() is a no-op for successful responses, so no extra
    # `response.ok` check is needed.
    response.raise_for_status()

    return response.content

In [None]:
def is_table(table_soup):
    """Tell whether a ``<table>`` element is a content table.

    A table counts as content unless it has a ``<caption>`` whose text starts
    with ``Attachments``.

    Returns:
        True for content tables, False for attachment listings.
    """
    caption = table_soup.find('caption')
    if caption is None:
        # No caption at all: treat as a regular content table.
        return True

    return not caption.get_text().startswith('Attachments')

In [None]:
def get_table(soup):
    """Pull every content table out of ``soup``.

    Each table accepted by ``is_table`` is captured as its HTML string and
    then removed from ``soup``, so it no longer contributes to the page text.

    Returns:
        A list of HTML strings, in document order.
    """
    extracted = []
    for candidate in soup.find_all('table'):
        if not is_table(candidate):
            continue

        # Serialize first: decompose() destroys the element.
        extracted.append(str(candidate))
        candidate.decompose()

    return extracted

In [None]:
def parse_website_image(soup, extractor, start_url=None):
    """Replace the informative images of a page with text produced by ``extractor``.

    For each ``<img src=...>`` that is not a GIF, the image URL is sent to
    ``extractor.extract`` and the reply is handled by its prefix:

    * ``nothing...`` — the image is ignored;
    * ``others...``  — the image URL is recorded as not parsed;
    * anything else  — the reply is inserted right after the ``<img>`` tag.

    Args:
        soup: Parsed document (or element); modified in place.
        extractor: Object exposing ``extract(url)`` that returns a string,
            such as ``GeminiImageExtractor``.
        start_url: ``scheme://host`` prefix for relative image paths.

    Returns:
        A tuple ``(soup, not_parsed_imgs)`` where ``not_parsed_imgs`` is the
        list of image URLs the extractor answered ``others`` for.

    Raises:
        Exception: If an image path is relative and ``start_url`` cannot be
            prepended to it (for example when it is None).
    """
    not_parsed_imgs = []
    for img in soup.find_all('img', src=True):
        url = img['src']
        if url.split('.')[-1] == 'gif':
            continue

        if is_subdirectory(url):
            try:
                url = start_url + url
            except TypeError as err:
                raise Exception("Can not access incomplete URL") from err

        # The extractor class in this notebook exposes `extract`, not `invoke`.
        parse_content = extractor.extract(url)

        # NOTE(review): the original chain tested 'others' first, which made the
        # 'nothing' test unreachable and inserted "nothing..." replies into the
        # page. The order below assumes 'nothing' means "no useful content" —
        # confirm against IMAGE_EXTRACTOR_PROMPT.
        if parse_content.startswith('nothing'):
            continue
        if parse_content.startswith('others'):
            not_parsed_imgs.append(url)
        else:
            img.insert_after(parse_content)

    return soup, not_parsed_imgs

In [None]:
def parse_website_url(soup, start_url):
    """Append each link's target to its visible text, as ``text (url)``.

    Only anchors that have a single string child are touched, and only when
    that text (stripped) is not already identical to the ``href``. Relative
    targets are prefixed with ``start_url``.

    Returns:
        The same ``soup``, modified in place.
    """
    for anchor in soup.find_all('a', href=True):
        label = anchor.string
        href = anchor['href']
        if not label or label.strip() == href:
            continue

        if is_subdirectory(href):
            original_url = start_url + href
        else:
            original_url = href

        anchor.string = label + f' ({original_url})'

    return soup

In [None]:
def parse_website(soup, parse_reference=True, parse_image=False, start_url=None):
    """Turn a page into plain text plus its tables and unparsed images.

    Steps, in order: drop ``<script>``/``<style>`` elements, optionally
    describe images with a ``GeminiImageExtractor``, optionally annotate links
    with their targets, pull out content tables, then flatten what is left
    into newline-separated text.

    Args:
        soup: Parsed document (or element); modified in place.
        parse_reference: Annotate link text with the link target.
        parse_image: Run the image extractor over the page's images.
        start_url: ``scheme://host`` prefix for relative links and images.

    Returns:
        A tuple ``(text, tables, imgs)``: the cleaned page text, the list of
        table HTML strings, and the list of image URLs that were not parsed
        (empty when ``parse_image`` is False).
    """
    # Scripts and styles carry no readable content.
    for unwanted in soup.find_all(['script', 'style']):
        unwanted.decompose()

    if parse_image:
        soup, imgs = parse_website_image(soup, GeminiImageExtractor(), start_url=start_url)
    else:
        imgs = []

    if parse_reference:
        soup = parse_website_url(soup, start_url)

    tables = get_table(soup)

    # Split every line on double spaces, trim each piece, and keep the
    # non-empty ones.
    pieces = []
    for raw_line in soup.get_text().splitlines():
        for phrase in raw_line.strip().split('  '):
            cleaned = phrase.strip()
            if cleaned:
                pieces.append(cleaned)

    return '\n'.join(pieces), tables, imgs

## Crawling

**Rule:** No two stored articles have different release years, i.e. all stored articles share a single release year.

In [None]:
# crawl functions
def crawl_webpage(url, parse_reference=True, parse_image=False, hash=False):
    """Download one page and extract its title, text, tables, images and links.

    Args:
        url: Page to crawl.
        parse_reference: Forwarded to ``parse_website``.
        parse_image: Forwarded to ``parse_website``.
        hash: When true, also return the SHA-256 hex digest of the
            preprocessed main body (taken before any parsing modifies it).

    Returns:
        None if the page could not be fetched. Otherwise the tuple
        ``(title, text, tables, images, reference_urls)``, with the digest
        appended as a sixth element when ``hash`` is true.
    """
    soup = get_web_soup(url)
    if not soup:
        return None

    title = get_title(soup)
    main_soup = preprocess_soup(soup)

    # The digest must be taken now: the parsing steps below mutate main_soup.
    digest = sha256(main_soup.encode()).hexdigest() if hash else None

    # scheme://host of the page, used to resolve relative links
    url_parts = urlparse(url)
    start_url = f"{url_parts.scheme}://{url_parts.netloc}"

    reference_urls = set(get_links(main_soup,
                                   internal_link=True,
                                   external_link=False,
                                   attachment=True,
                                   start_url=start_url))

    text, tables, images = parse_website(main_soup, parse_reference, parse_image, start_url)

    page = (title, text, tables, images, reference_urls)
    return (page + (digest,)) if hash else page

In [None]:
def _build_page_entry(title, text, tables, image_paths):
    """Package the parsed parts of one page into the record stored in the crawl result."""
    return {
        'text': text,
        'table': tables,
        'image_path': image_paths,
        'title': title,
    }


def crawl(storage_urls: dict):
    """Crawl new articles from the news listing and refresh changed base pages.

    ``storage_urls`` is updated in place and must contain:

    * ``'base_url'``: dict mapping URL -> ``{'hash_value': ...}``;
    * ``'article_url'``: dict mapping encoded URL ->
      ``{'release_date': ..., 'hash_value': ...}``, where the last four
      characters of ``release_date`` are the year.

    Stored articles are assumed to all share one release year. When the newest
    listed article is not stored yet, the stored articles are kept if their
    year matches the newest article's year, and discarded otherwise.

    Args:
        storage_urls: The sitemap described above.

    Returns:
        ``(storage_urls, data_dict)`` where ``data_dict`` maps each crawled URL
        to its ``text``, ``table``, ``image_path`` and ``title``; or None when
        the news listing could not be fetched.
    """
    # essential variables
    start_url = 'https://tuyensinh.hcmus.edu.vn'
    news_url = 'https://tuyensinh.hcmus.edu.vn/th%C3%B4ng-tin-tuy%E1%BB%83n-sinh-%C4%91%E1%BA%A1i-h%E1%BB%8Dc'
    # difference() expects an iterable of elements; a bare string would be
    # treated as a collection of single characters.
    need2crawl_url = set(storage_urls['base_url']).difference({news_url})
    crawled_url = set()
    data_dict = {}

    soup = get_web_soup(news_url)
    if not soup:
        return None

    main_soup = preprocess_soup(soup)
    stored_articles = storage_urls['article_url']

    # check latest article
    latest_article_tag = main_soup.find('li')
    latest_article_url = latest_article_tag.find('a')['href']
    if is_subdirectory(latest_article_url):
        latest_article_url = start_url + latest_article_url
    # Stored keys are produced by encode_url, so compare in the same form.
    latest_article_url = encode_url(latest_article_url)

    if latest_article_url not in stored_articles:
        latest_article_release_date = latest_article_tag.find('span', class_="mod-articles-category-date").text.strip()
        valid_year = datetime.now().year

        # Any stored article is representative, since all share one year.
        stored_year = None
        if stored_articles:
            stored_year = next(iter(stored_articles.values()))['release_date'][-4:]

        if stored_year is not None and stored_year == latest_article_release_date[-4:]:
            # Same year as what is stored: keep the stored articles and only
            # add the missing ones. The year is kept as an int so that the
            # comparison below can succeed.
            valid_year = int(stored_year)
        else:
            # Different year (or nothing stored): drop the old articles.
            stored_articles.clear()

        # get new articles
        new_article_urls = set()
        for tag in main_soup.find_all('li'):
            date_tag = tag.find('span', class_="mod-articles-category-date")
            link_tag = tag.find('a', class_="mod-articles-category-title")
            if date_tag is None or link_tag is None or not link_tag.get('href'):
                # Not an article entry (no date or no title link).
                continue

            release_date = date_tag.text.strip()
            url = link_tag['href']
            if is_subdirectory(url):
                url = start_url + url
            url = encode_url(url)

            if url not in stored_articles and int(release_date[-4:]) == valid_year:
                new_article_urls.add((url, release_date))

        # Track URLs only, not the (url, release_date) pairs.
        need2crawl_url.update(url for url, _ in new_article_urls)

        # crawl new article
        for url, release_date in new_article_urls:
            crawled_url.add(url)

            page = crawl_webpage(url, parse_reference=True, parse_image=True, hash=True)
            if page is None:
                # The page could not be fetched; nothing to store for it.
                continue

            title, text, tables, image_paths, references, hash_value = page
            stored_articles[url] = {
                'release_date': release_date,
                'hash_value': hash_value,
            }
            data_dict[url] = _build_page_entry(title, text, tables, image_paths)

            need2crawl_url.update(references)

    # crawl base url
    for url in list(storage_urls['base_url'].keys()):
        hash_value = storage_urls['base_url'][url].get('hash_value')
        if not website_is_updated(url, hash_value):
            continue

        page = crawl_webpage(url, parse_reference=True, parse_image=True, hash=True)
        if page is None:
            # The page could not be fetched; keep the previously stored hash.
            continue

        title, text, tables, image_paths, references, hash_value = page
        storage_urls['base_url'][url] = {
            'hash_value': hash_value,
        }
        data_dict[url] = _build_page_entry(title, text, tables, image_paths)

        need2crawl_url.update(references)

    crawled_url.update(storage_urls['base_url'].keys())

    # check if complete all references
    # TODO: Handle remaining URLs
    need2crawl_url = need2crawl_url.difference(crawled_url)

    return storage_urls, data_dict

In [18]:
# Load the stored sitemap; crawl() reads its 'base_url' and 'article_url' entries.
with open('../data/sitemap.json', 'r', encoding='utf-8') as file:
    sitemap = json.load(file)
    
# crawl() returns None when the news listing cannot be fetched, in which case
# this unpacking raises TypeError.
updated_sitemap, data = crawl(sitemap)