In [None]:
import platform as python_platform

class Platform:
    """Operating-system names this notebook distinguishes between."""
    # The values are compared against the string returned by platform.system().
    WINDOWS = "Windows"
    MACOS = "Darwin"

# Operating-system name of the machine running the notebook; compared against the Platform constants.
platform = python_platform.system()

In [None]:
# Project root on each machine; it is appended to sys.path and used as the
# root_path for every data directory resolved in this notebook.
windows_root_directory = "D:/Vector A/0. KHTN/Nam 4/HKII/Thesis/Brainstorming/DataCrawling"
macos_root_directory = "???"  # NOTE(review): placeholder — set the real path before running on macOS

# Anything that is not Windows falls back to the macOS root.
if platform == Platform.WINDOWS:
    root_directory = windows_root_directory
else:
    root_directory = macos_root_directory

In [None]:
from selenium import webdriver
from selenium.webdriver.common.keys import Keys
from selenium.webdriver.common.by import By

import sys
sys.path.append(root_directory)

import os
from os.path import join, getsize
import threading
import time
import subprocess
import signal

import re

import file_utils as fu

In [None]:
class BrowserStatus:
    """Values stored under the "browser_status" key of a webdriver pool entry."""
    # AVAILABLE: the browser can take a new URL; BUSY: a download is in progress on it.
    AVAILABLE = "available"
    BUSY = "busy"

In [None]:
# Directory containing the JSON files of song links, resolved against the project root.
json_dir_path = fu.relative_to_absolute_path(
    "GetAllSongHTML/song_list_link_by_artist/json",
    root_path=root_directory
)

print(json_dir_path)

In [None]:
# Paths of every song-link JSON file found in json_dir_path.
song_link_file_paths = fu.get_file_path_list_in_dir(json_dir_path)

# Map the lower-cased file-name stem (text before the first dot) to the file path.
# os.path.basename is used instead of splitting on "/" so that paths returned
# with the platform's native separator (backslash on Windows) still resolve
# to the bare file name.
song_links_by_alphabet = {
    os.path.basename(path).split(".")[0].lower(): path
    for path in song_link_file_paths
}

print(song_links_by_alphabet)

In [None]:
def song_link_to_relative_html_file_path(link):
    """Build the relative path of the HTML file in which a song page is stored.

    The last two "/"-separated segments of ``link`` are taken as the artist
    and the song; the result is ``<first char of artist>/<artist>/<song>.html``.

    Raises:
        IndexError: if ``link`` has fewer than two segments or the artist
            segment is empty.
    """
    segments = link.split("/")
    artist, song = segments[-2], segments[-1]

    return f"{artist[0]}/{artist}/{song}.html"


# Smoke test: show the relative HTML path computed for the first link of the "a" file.
print(
    song_link_to_relative_html_file_path(
        fu.read_data_from_json_file(
            song_links_by_alphabet["a"])[0]["link"]
        )
    )

In [None]:
# Shared pool of browser sessions; each entry is a dict with the keys
# "port", "browser" and "browser_status".
webdriver_pool = []
# Held while appending to the pool and while changing an entry's status from worker threads.
lock = threading.Lock()

In [None]:
def scroll_to_bottom(webdriver_pool_index, scroll_pause_time = 0.5):
    """Scroll the current page of a pooled browser down, one screen height at a time.

    Args:
        webdriver_pool_index: index of the entry in ``webdriver_pool`` whose
            browser is scrolled.
        scroll_pause_time: seconds to sleep after every scroll step.

    Raises:
        Exception: "Error while scrolling" when the scroll script fails (the
            original error is chained as ``__cause__``), or
            "Max scroll times reached" after more than 100 scroll steps.
    """
    # Resolve the browser once so that every step drives the same session even
    # if the pool list is rebuilt while this function is running.
    browser = webdriver_pool[webdriver_pool_index]["browser"]

    # Get the screen height of the web
    screen_height = browser.execute_script("return window.screen.height;")
    i = 1

    max_scroll_times = 100

    while True:
        print(f"Scrolled: {i} time(s)")

        # Scroll one screen height each time
        try:
            browser.execute_script(f"window.scrollTo(0, {screen_height}*{i});")
        except Exception as error:
            # Only ordinary errors are translated; KeyboardInterrupt/SystemExit propagate untouched.
            print("Error while scrolling")
            raise Exception("Error while scrolling") from error

        time.sleep(scroll_pause_time)

        # Re-read the scroll height after every step because it can change once the page has scrolled.
        scroll_height = browser.execute_script("return document.body.scrollHeight;")

        i += 1
        if screen_height * i > scroll_height:
            break

        if i > max_scroll_times:
            print("Max scroll times reached")
            raise Exception("Max scroll times reached")

In [None]:
def scroll_and_get_html_content(url, webdriver_pool_index):
    """Open ``url`` in a pooled browser, scroll to the bottom and return the page source.

    Args:
        url: address to load.
        webdriver_pool_index: index of the entry in ``webdriver_pool`` to use.

    Returns:
        The browser's ``page_source`` after scrolling.

    Raises:
        Exception: "Failed to get <url>" when loading the page fails (the
            original error is chained as ``__cause__``); errors raised by
            ``scroll_to_bottom`` propagate unchanged.
    """
    try:
        webdriver_pool[webdriver_pool_index]["browser"].get(url)
    except Exception as error:
        # Keep the underlying error attached instead of discarding it.
        print(f"Failed to get {url}")
        raise Exception(f"Failed to get {url}") from error

    scroll_to_bottom(webdriver_pool_index)

    return webdriver_pool[webdriver_pool_index]["browser"].page_source

In [None]:
def get_song_raw_html(url, webdriver_pool_index):
    """Download the page at ``url`` with a pooled browser and save it as an HTML file.

    The file is written below "GetAllSongHTML/song_list/raw_html" (resolved
    against ``root_directory``) at the relative path derived from ``url``;
    missing parent directories are created first.
    """
    print(f"{url}: processing...")

    html = scroll_and_get_html_content(url, webdriver_pool_index)

    relative_html_path = song_link_to_relative_html_file_path(url)
    raw_html_root = fu.relative_to_absolute_path(
        "GetAllSongHTML/song_list/raw_html",
        root_path=root_directory
    )
    target_path = fu.relative_to_absolute_path(relative_html_path, root_path=raw_html_root)

    # Ensure the <letter>/<artist> directory exists before writing.
    target_dir = os.path.dirname(target_path)
    os.makedirs(target_dir, exist_ok=True)

    fu.write_data_to_html_file(html, target_path)

    print(f"{url}: done")

In [None]:
def open_edge_in_remote_debugging_mode(port, platform, browser_instance_data_dir):
    """Launch a Microsoft Edge instance that accepts remote-debugging connections.

    Args:
        port: value passed as ``--remote-debugging-port``.
        platform: ``Platform.MACOS`` selects the ``open -na`` launcher; any
            other value uses the Windows ``start msedge.exe`` launcher.
        browser_instance_data_dir: value passed as ``--user-data-dir``.
    """
    print(f"Opening Edge in remote debugging mode on port {port}...")

    # The data directory is wrapped in a complete pair of double quotes because
    # it may contain spaces (the configured project root does).
    edge_arguments = (
        f'--remote-debugging-port={port} '
        f'--user-data-dir="{browser_instance_data_dir}"'
    )

    if platform == Platform.MACOS:
        command = f'open -na "Microsoft Edge.app" --args {edge_arguments}'
    else:
        command = f"start msedge.exe {edge_arguments}"

    os.system(command)

In [None]:
def init_selenium_in_remote_debugging_mode(port):
    """Attach a Selenium Edge driver to ``localhost:<port>`` and add it to the pool.

    The new pool entry records the port, the driver and an initial status of
    ``BrowserStatus.AVAILABLE``. The driver is created and appended while
    ``lock`` is held.
    """
    edge_options = webdriver.EdgeOptions()
    edge_options.add_experimental_option("debuggerAddress", f"localhost:{port}")

    with lock:
        pool_entry = {
            "port": port,
            "browser": webdriver.Edge(options=edge_options),
            "browser_status": BrowserStatus.AVAILABLE,
        }
        webdriver_pool.append(pool_entry)

In [None]:
def windows_get_PID_of_process_running_on_port(port):
    """Return the PID of the process listening on ``127.0.0.1:<port>`` (Windows).

    Runs ``netstat -a -n -o | findstr :<port>`` and parses its output.

    Args:
        port: local TCP port to look up.

    Returns:
        The PID as a string of digits, or ``None`` when no matching
        LISTENING line is found.
    """
    command = f"netstat -a -n -o | findstr :{port}"

    result = subprocess.run(command, shell=True, capture_output=True, text=True)

    # Example of a matching line:
    #   TCP    127.0.0.1:40000        0.0.0.0:0              LISTENING       6920
    # findstr matches substrings (":4000" also matches ":40000"), so the exact
    # port is pinned here; dots are escaped and only digits are captured.
    regex_pattern = (
        rf"^\s*TCP\s+127\.0\.0\.1:{re.escape(str(port))}"
        r"\s+\d+\.\d+\.\d+\.\d+:\d+\s+LISTENING\s+(\d+)"
    )

    match = re.search(regex_pattern, result.stdout or "", flags=re.MULTILINE)

    return match.group(1) if match else None

In [None]:
def kill_process_running_on_port(port, platform=Platform.WINDOWS):
    """Best-effort kill of whatever process is listening on ``port``.

    Args:
        port: TCP port whose owning process should be terminated.
        platform: ``Platform.MACOS`` uses ``lsof``/``kill -9``; any other value
            uses the Windows ``netstat``/``taskkill`` path.

    Errors are reported on stdout and never raised, so callers can use this
    as a clean-up step.
    """
    print(f"Killing process running on port {port}...")

    try:
        if platform == Platform.MACOS:
            os.system(f"lsof -ti tcp:{port} | xargs kill -9")
            return

        pid = windows_get_PID_of_process_running_on_port(port)

        # Nothing to do when no process is listening on the port.
        if pid is not None:
            subprocess.run(
                f"taskkill /PID {pid} /F",
                shell=True,
                capture_output=True,
                text=True
            )
    except Exception as error:
        # Stay best-effort, but report the cause and let KeyboardInterrupt/SystemExit through.
        print(f"Error when killing process running on port {port}: {error!r}")

In [None]:
def start_webdriver(port, platform):
    """Start an Edge instance plus Selenium driver for ``port`` unless the pool already has one.

    When the port is not in the pool yet: kill whatever runs on the port,
    launch Edge in remote-debugging mode with a per-port data directory, then
    attach Selenium to it.
    """
    ports_in_pool = [pool_entry["port"] for pool_entry in webdriver_pool]

    if port in ports_in_pool:
        print(f"Webdriver on port {port} is already started")
        return

    kill_process_running_on_port(port, platform)

    browser_instance_data_dir = fu.relative_to_absolute_path(
        f"GetAllSongHTML/song_list/browser_instance_data/{port}",
        root_path=root_directory
    )
    open_edge_in_remote_debugging_mode(port, platform, browser_instance_data_dir)

    init_selenium_in_remote_debugging_mode(port)

In [None]:

def init_webdriver_pool(n_webdriver=5, max_attempts_per_port=3):
    """(Re)build the webdriver pool with browsers on ports 40000, 40001, ...

    Args:
        n_webdriver: number of browsers to start.
        max_attempts_per_port: how many times starting a browser on one port
            is tried before moving on to the next port.

    The pool is cleared first. On Windows all running ``msedgedriver.exe`` and
    ``msedge.exe`` processes are force-killed before starting.
    """
    base_port = 40000
    start_timeout_seconds = 15

    webdriver_pool.clear()

    if platform == Platform.WINDOWS:
        os.system("taskkill /F /IM msedgedriver.exe")
        os.system("taskkill /F /IM msedge.exe")

    for offset in range(n_webdriver):
        port = base_port + offset

        # Rebinding the index of a `for ... in range()` loop does not repeat an
        # iteration, so retries are an explicit, bounded inner loop.
        for _attempt in range(max_attempts_per_port):
            kill_process_running_on_port(port, platform)

            start_webdriver_thread = threading.Thread(
                target=start_webdriver,
                args=(port, platform, )
            )

            start_webdriver_thread.start()

            # Give the browser a bounded amount of time to come up.
            start_webdriver_thread.join(start_timeout_seconds)

            # Success means this port made it into the pool; checking the port
            # (not the pool length) stays correct after an earlier port failed.
            if any(pool_entry["port"] == port for pool_entry in list(webdriver_pool)):
                print(f"Webdriver on port {port} started successfully")
                break

            print(f"Failed to start webdriver on port {port}. Retrying...")
        else:
            print(f"Giving up on port {port} after {max_attempts_per_port} attempt(s)")

In [None]:
def restart_webdriver_pool():
    """Kill the process behind every pooled browser, then rebuild a pool of the same size."""
    pool_size = len(webdriver_pool)

    for pool_entry in webdriver_pool:
        kill_process_running_on_port(pool_entry["port"], platform)

    init_webdriver_pool(pool_size)

In [None]:
def change_port_status(port, status):
    """Set "browser_status" to ``status`` on every pool entry whose "port" equals ``port``."""
    for pool_entry in webdriver_pool:
        if pool_entry["port"] == port:
            pool_entry["browser_status"] = status

In [None]:
# Serialises pool restarts. It is deliberately separate from `lock`: restarting
# starts threads that acquire `lock` themselves in
# init_selenium_in_remote_debugging_mode, so restarting while holding `lock`
# would block every one of them until the restart gives up.
_restart_lock = threading.Lock()


def get_raw_html_with_webdriver_from_pool(url, webdriver_pool_index):
    """Download ``url`` with the pooled browser at ``webdriver_pool_index`` if it is available.

    The browser is marked BUSY for the duration of the download and AVAILABLE
    again afterwards. When the download fails, the whole pool is restarted;
    if another thread is already restarting it, this call skips the restart.
    Nothing happens when the browser is not AVAILABLE.
    """
    pool_entry = webdriver_pool[webdriver_pool_index]

    if pool_entry["browser_status"] != BrowserStatus.AVAILABLE:
        return

    port = pool_entry["port"]

    try:
        with lock:
            change_port_status(port, BrowserStatus.BUSY)

        get_song_raw_html(url, webdriver_pool_index)

        with lock:
            change_port_status(port, BrowserStatus.AVAILABLE)
    except Exception as error:
        print(f"{url}: failed ({error!r}); restarting webdriver pool")

        # Non-blocking acquire: one restart at a time is enough.
        if _restart_lock.acquire(blocking=False):
            try:
                restart_webdriver_pool()
            finally:
                _restart_lock.release()

In [None]:
def should_song_html_be_downloaded(song_link):
    """Tell whether the HTML for ``song_link`` still has to be fetched.

    Returns:
        True when the local HTML file does not exist or is smaller than
        150 * 1000 bytes; False otherwise.
    """
    size_in_kilobytes_threshold = 150

    relative_html_path = song_link_to_relative_html_file_path(song_link)
    raw_html_root = fu.relative_to_absolute_path(
        "GetAllSongHTML/song_list/raw_html",
        root_path=root_directory
    )
    local_html_file_path = fu.relative_to_absolute_path(
        relative_html_path,
        root_path=raw_html_root
    )

    # A missing file always needs downloading.
    if not os.path.exists(local_html_file_path):
        return True

    # An existing but undersized file is downloaded again.
    return getsize(local_html_file_path) < size_in_kilobytes_threshold * 1000

In [None]:
def get_song_raw_html_by_alphabet(letter, song_per_batch=5):
    """Download the HTML of every not-yet-downloaded song listed for ``letter``.

    Args:
        letter: key into ``song_links_by_alphabet`` selecting the JSON file of links.
        song_per_batch: maximum number of songs dispatched per round.

    Each round hands one song to every available pooled browser (up to
    ``song_per_batch``), waits for the worker threads, then pauses 15 seconds.
    Songs are only removed from the queue when a browser is actually assigned
    to them. The function returns early, with a message, when no browser
    becomes available for 10 consecutive rounds.
    """
    # Load the list of song links for this letter.
    song_links = fu.read_data_from_json_file(song_links_by_alphabet[letter])

    # Drop the songs that have already been crawled.
    song_links = [
        song
        for song in song_links
        if should_song_html_be_downloaded(song["link"])
    ]

    n_song_processed = 0
    thread_timeout = 60
    total_time_to_wait = 15
    max_idle_rounds = 10
    idle_rounds = 0

    while len(song_links) > 0:
        thread_batch = []

        # Iterate over a snapshot: worker threads may rebuild the pool concurrently.
        for webdriver_pool_index, pool_entry in enumerate(list(webdriver_pool)):
            # Stop as soon as the queue is empty or the batch is full, so a
            # song is never popped without a browser to process it.
            if len(song_links) == 0 or len(thread_batch) >= song_per_batch:
                break

            if pool_entry["browser_status"] != BrowserStatus.AVAILABLE:
                continue

            song = song_links.pop(0)
            worker = threading.Thread(
                target=get_raw_html_with_webdriver_from_pool,
                args=(song["link"], webdriver_pool_index, )
            )
            worker.start()
            thread_batch.append(worker)

        if thread_batch:
            idle_rounds = 0
        else:
            print("All webdrivers are busy")
            idle_rounds += 1

            # Songs stay queued when nothing was dispatched, so bound the wait.
            if idle_rounds >= max_idle_rounds:
                print(
                    f"Letter {letter} aborted: no webdriver became available after "
                    f"{max_idle_rounds} rounds, {len(song_links)} song(s) left"
                )
                return

        for worker in thread_batch:
            worker.join(thread_timeout)

        # Count only the songs that were actually handed to a browser.
        n_song_processed += len(thread_batch)

        # Pause between rounds to stay below the site's rate limit.
        for anti_rate_limiting_waited_time in range(1, total_time_to_wait + 1):
            time.sleep(1)
            print(f"{n_song_processed} songs processed. Waited {anti_rate_limiting_waited_time} of {total_time_to_wait} seconds")

    print(f"Letter {letter} done")


In [None]:
# Start as many browsers as there are songs in one batch.
song_per_batch = 8
init_webdriver_pool(n_webdriver=song_per_batch)

In [None]:
# get_song_raw_html_by_alphabet('a', song_per_batch)

In [None]:
# Crawl every letter from start_letter to end_letter (inclusive).
# The run starts at 'b'; set start_letter to 'a' to include the first letter.
# NOTE(review): presumably 'a' was already crawled in an earlier run — confirm.
start_letter = 'b'
end_letter = 'z'

for letter in range(ord(start_letter), ord(end_letter) + 1):
    get_song_raw_html_by_alphabet(chr(letter), song_per_batch)

In [None]:
# Kill all Edge instances
# The loop variable must not be called `webdriver`: at module level that would
# rebind the imported selenium `webdriver` module to a pool entry.
for webdriver_instance in webdriver_pool:
    kill_process_running_on_port(webdriver_instance["port"], platform)

In [None]:
# Import under an alias: a plain `import platform` would rebind the global
# `platform` string that the pool functions compare against Platform.*.
import platform as python_platform

# Get information about the operating system
operating_system = python_platform.system()

print(operating_system)

In [None]:
# For debugging: on Windows, force-terminate every msedge.exe process.
import os
import platform as python_platform

if python_platform.system() == "Windows":
    os.system("taskkill /F /IM msedge.exe")