In [1]:
from selenium import webdriver
from selenium.webdriver.chrome.service import Service
from selenium.webdriver.common.by import By
from selenium.common.exceptions import WebDriverException, NoSuchElementException, TimeoutException
import time
import pandas as pd
from pprint import pprint
from bs4 import BeautifulSoup

In [2]:
import time
from pprint import pprint
from typing import Dict, List

import json

from selenium.webdriver.chrome.service import Service
from selenium import webdriver
from bs4 import BeautifulSoup
from concurrent.futures import ThreadPoolExecutor, as_completed

import threading


def store_cache(cache_path, cache: Dict[str, str]):
    """
    Stores the provided cache dictionary into a JSON file at the specified path.

    Args:
        cache_path (str): The file path where the cache will be stored.
        cache (dict): The cache data to store.

    Returns:
        None
    """
    with open(cache_path, "a", encoding="utf-8") as cache_file:
        for key, value in cache.items():
            if isinstance(key, tuple):
                key = str(key)

            json_string = json.dumps({key: value}) + "\n"  # Convert to JSON string with newline
            cache_file.write(json_string)



In [3]:
class USNewsRankingCrawl:
    base_url = "https://www.usnews.com"
    ranking_page_url = base_url + "/education/best-global-universities/canada"
    front_url = "?name="
    # back_url = "&country=api&subject=search"

    def __init__(self, total_num: int = 2165):
        self.total_num = total_num
        self.driver = webdriver.Chrome(service=Service())
        self.ranking_info = {}
        self.lock = threading.Lock()

    # TODO: fix this, can't load all pages, dynamically, at once
    def keep_scrolling_to_the_bottom(self):
        while True:
            previous_scrollY = self.driver.execute_script( 'return window.scrollY' )
            self.driver.execute_script( 'window.scrollBy( 0, 230 )' )
            time.sleep( 10 )
            if previous_scrollY == self.driver.execute_script( 'return window.scrollY' ):
                button = self.driver.find_element(By.CSS_SELECTOR, "#rankings > div.pager__Container-sc-1i8e93j-0.hqeWub > button")
                button.click()
                time.sleep( 2 )
                if previous_scrollY == self.driver.execute_script( 'return window.scrollY' ):
                    break


    def get_page_content_by_name(self, name:str):
        if not name:
            raise ValueError("Expect the name of a university but got none.")
        try:
            self.driver.get(f"{self.ranking_page_url}{self.front_url}{name}")
        except TimeoutException:
            print("Page load timed out. Check your internet connection or website accessibility.")
            return None  # or handle as needed
        except WebDriverException as e:
            print(f"An error occurred while trying to navigate: {e}")
            return None
        return self.driver.page_source

    def get_page(self):
        self.driver.get("/home/ivan/Uforse/university_crawl/university_info_generator/fetcher/ranking_fetcher/ranking_data/usnews_canada_2023.html")

    def parse_page(self, page_content):
        page_data = {}
        soup = BeautifulSoup("/home/ivan/Uforse/university_crawl/university_info_generator/fetcher/ranking_fetcher/ranking_data/usnews_canada_2023.html", "html.parser")
        try:
            rows = soup.find_all(name="li", attrs={"class": "item-list__ListItemStyled-sc-18yjqdy-1 boZDDO"})
        except AttributeError:
            return
        for row in rows:
            with self.lock:
                row_bs = BeautifulSoup(str(row), "html.parser")
                try:
                    rank = row_bs.find(name="div", attrs={"class": "RankList__Rank-sc-2xewen-2 ieuiBj ranked has-badge"}).text.strip()
                    uni_link = row_bs.find(name="a", attrs={"class": "Anchor-byh49a-0 DetailCardGlobalUniversities__StyledAnchor-sc-1v60hm5-5 eMEqFO bFdMFJ"})
                    university_name = uni_link.text.strip()
                    link = uni_link["href"]
                    if "#" in rank:
                        rank = rank[rank.index("#")+1:]
                except AttributeError:
                    print("Not enough info skipped\n" + str(row))
                    continue
                page_data[university_name] = {"rank": rank, "uni_link": f"{link}"}
                # print(page_data[university_name])

                self.ranking_info.update(page_data)
        return page_data

    def get_all_ranking_by_list(self, lst:List[str]):
            self.parse_page(self.get_page())


    def close(self):
        if self.driver:
            self.driver.quit()

    def to_dataframe(self):
        df = pd.DataFrame.from_dict(self.ranking_info, orient="index").reset_index()
        df.columns = ["university_name", "rank", "arwu_uni_link"]
        return df

    def to_csv(self):
        df = self.to_dataframe()
        file_path = "/home/ivan/Uforse/university_crawl/university_info_generator/fetcher/ranking_fetcher/ranking_data/us_news_ranking_2023.csv"
        df.to_csv(file_path, index=False, encoding="utf-8")

In [4]:
# df = pd.read_csv("all_universities.csv")
# lst = df["university_name"].to_list()
# print(lst[-20:])

In [5]:


if __name__ == "__main__":
    cra = USNewsRankingCrawl()
    # pprint(cra.get_page_content(0))
    page = cra.parse_page(None)
    try:
        file_path = "/home/ivan/Uforse/university_crawl/university_info_generator/fetcher/ranking_fetcher/ranking_data/us_news_ranking_2023.jsonl"
        store_cache(f"{file_path}", cra.ranking_info)
    except IOError as exc:
        raise IOError(f"An error occurred while writing to the file: {file_path}") from exc
    finally:
        cra.close()





In [6]:
store_cache(f"{file_path}", cra.ranking_info)

In [7]:
print(cra.ranking_info)

{}


In [8]:
soup = BeautifulSoup("/home/ivan/Uforse/university_crawl/university_info_generator/fetcher/ranking_fetcher/ranking_data/usnews_canada_2023.html", "html.parser")

In [10]:
rows = soup.find_all(name="li", attrs={"class": "item-list__ListItemStyled-sc-18yjqdy-1 boZDDO"})

In [11]:
print(len(rows))

0
