diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml new file mode 100644 index 00000000..5fe092a6 --- /dev/null +++ b/.github/workflows/tests.yml @@ -0,0 +1,29 @@ +name: PyDoll Tests Suite + +on: push + +jobs: + tests: + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v3 + - name: Set up Python 3.12 + uses: actions/setup-python@v4 + with: + python-version: "3.12" + - name: Install dependencies + run: | + python -m pip install poetry + poetry install + - name: Run tests with coverage + run: | + poetry run pytest -s -x --cov=pydoll -vv --cov-report=xml + + - name: Upload coverage to Codecov + uses: codecov/codecov-action@v5 + with: + file: ./coverage.xml + flags: tests + name: PyDoll Tests + fail_ci_if_error: true + token: ${{ secrets.CODECOV_TOKEN }} \ No newline at end of file diff --git a/codecov.yml b/codecov.yml new file mode 100644 index 00000000..9f319886 --- /dev/null +++ b/codecov.yml @@ -0,0 +1,7 @@ +coverage: + status: + project: + default: + target: 90% + threshold: 0% + base: auto \ No newline at end of file diff --git a/poetry.lock b/poetry.lock index 24e38404..2fad1068 100644 --- a/poetry.lock +++ b/poetry.lock @@ -120,6 +120,21 @@ yarl = ">=1.17.0,<2.0" [package.extras] speedups = ["Brotli", "aiodns (>=3.2.0)", "brotlicffi"] +[[package]] +name = "aioresponses" +version = "0.7.7" +description = "Mock out requests made by ClientSession from aiohttp package" +optional = false +python-versions = "*" +files = [ + {file = "aioresponses-0.7.7-py2.py3-none-any.whl", hash = "sha256:6975f31fe5e7f2113a41bd387221f31854f285ecbc05527272cd8ba4c50764a3"}, + {file = "aioresponses-0.7.7.tar.gz", hash = "sha256:66292f1d5c94a3cb984f3336d806446042adb17347d3089f2d3962dd6e5ba55a"}, +] + +[package.dependencies] +aiohttp = ">=3.3.0,<4.0.0" +packaging = ">=22.0" + [[package]] name = "aiosignal" version = "1.3.1" @@ -1211,4 +1226,4 @@ propcache = ">=0.2.0" [metadata] lock-version = "2.0" python-versions = "^3.10" -content-hash = "2f56c6a3241851e92303fdd53f1b131f2867ce4bb615ef4f61d2d069d47d360a" +content-hash = "df43804661aa82720a45f099256276ec22c1e0b2526eb57a869c50a92a3e1682" diff --git a/pydoll/browser/base.py b/pydoll/browser/base.py index e4f12f6e..97f0a813 100644 --- a/pydoll/browser/base.py +++ b/pydoll/browser/base.py @@ -1,14 +1,15 @@ import asyncio -import os -import shutil -import subprocess from abc import ABC, abstractmethod -from contextlib import suppress from functools import partial from random import randint -from tempfile import TemporaryDirectory from pydoll import exceptions +from pydoll.browser.managers import ( + BrowserOptionsManager, + BrowserProcessManager, + ProxyManager, + TempDirectoryManager, +) from pydoll.browser.options import Options from pydoll.browser.page import Page from pydoll.commands.browser import BrowserCommands @@ -18,7 +19,7 @@ from pydoll.commands.page import PageCommands from pydoll.commands.storage import StorageCommands from pydoll.commands.target import TargetCommands -from pydoll.connection import ConnectionHandler +from pydoll.connection.connection import ConnectionHandler from pydoll.events.fetch import FetchEvents @@ -31,7 +32,9 @@ class Browser(ABC): # noqa: PLR0904 """ def __init__( - self, options: Options | None = None, connection_port: int = None + self, + options: Options | None = None, + connection_port: int = None, ): """ Initializes the Browser instance. @@ -39,14 +42,21 @@ def __init__( Args: options (Options | None): An instance of the Options class to configure the browser. If None, default options will be used. + connection_port (int): The port to connect to the browser. + + Raises: + TypeError: If any of the arguments are not callable. """ + self.options = BrowserOptionsManager.initialize_options(options) + self._proxy_manager = ProxyManager(self.options) self._connection_port = ( connection_port if connection_port else randint(9223, 9322) ) - self.connection_handler = ConnectionHandler(self._connection_port) - self.options = self._initialize_options(options) - self.process = None - self.temp_dirs = [] + self._browser_process_manager = BrowserProcessManager() + self._temp_directory_manager = TempDirectoryManager() + self._connection_handler = ConnectionHandler(self._connection_port) + BrowserOptionsManager.add_default_arguments(self.options) + self._pages = [] async def __aenter__(self): @@ -54,81 +64,27 @@ async def __aenter__(self): async def __aexit__(self, exc_type, exc_val, exc_tb): await self.stop() - for temp_dir in self.temp_dirs: - with suppress(OSError): - shutil.rmtree(temp_dir.name) - await self.connection_handler.close() + await self._connection_handler.close() async def start(self) -> None: - """ - Starts the browser process with the specified options, - including proxy configurations. - - This method initializes and launches the browser, setting up the - necessary command-line arguments. It checks for a specified user data - directory, creating a temporary directory if none is provided, - and configures the browser to run in a controlled environment. - - Returns: - Page: The Page instance for the browser. - """ - + """Método principal para iniciar o navegador.""" binary_location = ( self.options.binary_location or self._get_default_binary_location() ) - self.options.arguments.append('--no-first-run') - self.options.arguments.append('--no-default-browser-check') - - temp_dir = self._get_temp_dir() - - if '--user-data-dir' not in [ - arg.split('=')[0] for arg in self.options.arguments - ]: - self.options.arguments.append(f'--user-data-dir={temp_dir.name}') - - private_proxy, proxy_credentials = self._configure_proxy() + self._setup_user_dir() - self.process = subprocess.Popen( - [ - binary_location, - f'--remote-debugging-port={self._connection_port}', - *self.options.arguments, - ], - stdout=subprocess.PIPE, + self._browser_process_manager.start_browser_process( + binary_location, + self._connection_port, + self.options.arguments, ) - if not await self._is_browser_running(): - raise exceptions.BrowserNotRunning('Failed to start browser') + await self._verify_browser_running() - if private_proxy: - await self.enable_fetch_events(handle_auth_requests=True) - await self.on( - FetchEvents.REQUEST_PAUSED, - self._continue_request, - temporary=True, - ) - # partial is used to send extra arguments to the callback - # and keep the callback as a coroutine function - await self.on( - FetchEvents.AUTH_REQUIRED, - partial( - self._continue_request_auth_required, - proxy_username=proxy_credentials[0], - proxy_password=proxy_credentials[1], - ), - temporary=True, - ) + proxy_config = self._proxy_manager.get_proxy_credentials() + await self._configure_proxy(proxy_config[0], proxy_config[1]) - pages = await self.get_pages() - try: - valid_page = [ - page - for page in pages - if page['type'] == 'page' and 'chrome://newtab/' in page['url'] - ][0]['targetId'] - self._pages.append(valid_page) - except IndexError: - await self.new_page() + await self._init_first_page() async def set_download_path(self, path: str): """ @@ -143,10 +99,9 @@ async def get_page(self) -> Page: Retrieves a Page instance for an existing page in the browser. If no pages are open, a new page will be created. """ - if not self._pages: - await self.new_page() - - page_id = self._pages.pop() + page_id = ( + await self.new_page() if not self._pages else self._pages.pop() + ) return Page(self._connection_port, page_id) async def delete_all_cookies(self): @@ -201,8 +156,7 @@ async def callback_wrapper(event): function_to_register = callback_wrapper else: function_to_register = callback - - return await self.connection_handler.register_callback( + return await self._connection_handler.register_callback( event_name, function_to_register, temporary ) @@ -217,9 +171,9 @@ async def new_page(self, url: str = ''): TargetCommands.create_target(url) ) page_id = response['result']['targetId'] - self._pages.append(page_id) + return page_id - async def get_pages(self): + async def _get_targets(self): """ Retrieves the list of open pages in the browser. @@ -238,7 +192,8 @@ async def stop(self): """ if await self._is_browser_running(): await self._execute_command(BrowserCommands.CLOSE) - self.process.terminate() + self._browser_process_manager.stop_process() + self._temp_directory_manager.cleanup() else: raise exceptions.BrowserNotRunning('Browser is not running') @@ -282,82 +237,6 @@ async def set_window_minimized(self): BrowserCommands.set_window_minimized(window_id) ) - async def _is_browser_running(self): - """ - Checks if the browser process is currently running. - Attempts to connect to the browser to verify its status. - - Returns: - bool: True if the browser is running, False otherwise. - """ - for _ in range(10): - if await self._check_browser_connection(): - return True - await asyncio.sleep(1) - return False - - async def _check_browser_connection(self): - """ - Checks if the browser process is currently running. - - Returns: - bool: True if the browser is running, False otherwise. - """ - try: - await self.connection_handler.connection - return True - except Exception as exc: - print(f'Browser is not running: {exc}') - return False - - async def _execute_command(self, command: str): - """ - Executes a command through the connection handler. - - Args: - command (str): The command to be executed. - - Returns: - The response from executing the command. - """ - return await self.connection_handler.execute_command( - command, timeout=60 - ) - - def _configure_proxy(self) -> tuple[bool, tuple[str, str]]: - """ - Configures the proxy settings for the browser. If the proxy - is private, the credentials will be extracted from the proxy - string and returned. - - Returns: - tuple[bool, tuple[str, str]]: A tuple containing a boolean - indicating if the proxy is private and a tuple with the proxy - username and password - """ - private_proxy = False - proxy_username, proxy_password = None, None - - if any('--proxy-server' in arg for arg in self.options.arguments): - proxy_index = next( - index - for index, arg in enumerate(self.options.arguments) - if '--proxy-server' in arg - ) - proxy = self.options.arguments[proxy_index].replace( - '--proxy-server=', '' - ) - - if '@' in proxy: - credentials, proxy_server = proxy.split('@') - self.options.arguments[proxy_index] = ( - f'--proxy-server={proxy_server}' - ) - proxy_username, proxy_password = credentials.split(':') - private_proxy = True - - return private_proxy, (proxy_username, proxy_password) - async def enable_page_events(self): """ Enables listening for page-related events over the websocket @@ -377,7 +256,7 @@ async def enable_page_events(self): Returns: None """ - await self.connection_handler.execute_command( + await self._connection_handler.execute_command( PageCommands.enable_page() ) @@ -399,7 +278,7 @@ class documentation. Returns: None """ - await self.connection_handler.execute_command( + await self._connection_handler.execute_command( NetworkCommands.enable_network_events() ) @@ -431,7 +310,7 @@ async def enable_fetch_events( Returns: None """ - await self.connection_handler.execute_command( + await self._connection_handler.execute_command( FetchCommands.enable_fetch_events( handle_auth_requests, resource_type ) @@ -454,7 +333,7 @@ async def enable_dom_events(self): Returns: None """ - await self.connection_handler.execute_command( + await self._connection_handler.execute_command( DomCommands.enable_dom_events() ) @@ -474,7 +353,7 @@ async def disable_fetch_events(self): Returns: None """ - await self.connection_handler.execute_command( + await self._connection_handler.execute_command( FetchCommands.disable_fetch_events() ) @@ -532,54 +411,96 @@ async def _continue_request_auth_required( ) await self.disable_fetch_events() - @staticmethod - def _validate_browser_path(path: str): - """ - Validates the provided browser path. + async def _init_first_page(self): + pages = await self._get_targets() + valid_page = await self._get_valid_page(pages) + self._pages.append(valid_page) - Args: - path (str): The file path to the browser executable. + async def _verify_browser_running(self): + """Verifica se o navegador está rodando.""" + if not await self._is_browser_running(): + raise exceptions.BrowserNotRunning('Failed to start browser') - Raises: - ValueError: If the browser path does not exist. + async def _configure_proxy(self, private_proxy, proxy_credentials): + """Configura o proxy, se necessário.""" + if private_proxy: + await self.enable_fetch_events(handle_auth_requests=True) + await self.on( + FetchEvents.REQUEST_PAUSED, + self._continue_request, + temporary=True, + ) + await self.on( + FetchEvents.AUTH_REQUIRED, + partial( + self._continue_request_auth_required, + proxy_username=proxy_credentials[0], + proxy_password=proxy_credentials[1], + ), + temporary=True, + ) - Returns: - str: The validated browser path. + @staticmethod + def _is_valid_page(page: dict) -> bool: + """Verifica se uma página é uma nova aba válida.""" + return page.get('type') == 'page' and 'chrome://newtab/' in page.get( + 'url', '' + ) + + async def _get_valid_page(self, pages) -> str: """ - if not os.path.exists(path): - raise ValueError(f'Browser not found: {path}') - return path + Obtém o ID de uma página válida ou cria uma nova. - @staticmethod - def _initialize_options(options: Options | None) -> Options: + Returns: + str: targetId da página existente ou nova """ - Initializes the options for the browser. + valid_page = next( + (page for page in pages if self._is_valid_page(page)), None + ) - Args: - options (Options | None): An instance of the Options class or None. + if valid_page: + try: + return valid_page['targetId'] + except KeyError: + pass - Raises: - ValueError: If the provided options are invalid. + return await self.new_page() + + async def _is_browser_running(self, timeout: int = 10) -> bool: + """ + Checks if the browser process is currently running. + Attempts to connect to the browser to verify its status. Returns: - Options: The initialized options instance. + bool: True if the browser is running, False otherwise. """ - if options is None: - return Options() - if not isinstance(options, Options): - raise ValueError('Invalid options') - return options + for _ in range(timeout): + if await self._connection_handler.ping(): + return True + await asyncio.sleep(1) + return False - def _get_temp_dir(self): + async def _execute_command(self, command: str): """ - Retrieves a temporary directory for the browser instance. + Executes a command through the connection handler. + + Args: + command (str): The command to be executed. Returns: - TemporaryDirectory: The temporary directory. + The response from executing the command. """ - temp_dir = TemporaryDirectory() - self.temp_dirs.append(temp_dir) - return temp_dir + return await self._connection_handler.execute_command( + command, timeout=60 + ) + + def _setup_user_dir(self): + """Prepara o diretório de dados do usuário, se necessário.""" + temp_dir = self._temp_directory_manager.create_temp_dir() + if '--user-data-dir' not in [ + arg.split('=')[0] for arg in self.options.arguments + ]: + self.options.arguments.append(f'--user-data-dir={temp_dir.name}') @abstractmethod def _get_default_binary_location(self) -> str: diff --git a/pydoll/browser/chrome.py b/pydoll/browser/chrome.py index d461b480..8dbc91ad 100644 --- a/pydoll/browser/chrome.py +++ b/pydoll/browser/chrome.py @@ -1,23 +1,31 @@ import os from pydoll.browser.base import Browser +from pydoll.browser.managers import BrowserOptionsManager from pydoll.browser.options import Options class Chrome(Browser): - def __init__(self, options: Options | None = None): - super().__init__(options) + def __init__( + self, options: Options | None = None, connection_port: int = 9222 + ): + super().__init__(options, connection_port) - def _get_default_binary_location(self): + @staticmethod + def _get_default_binary_location(): os_name = os.name match os_name: case 'nt': browser_path = ( r'C:\Program Files\Google\Chrome\Application\chrome.exe' ) - return self._validate_browser_path(browser_path) + return BrowserOptionsManager.validate_browser_path( + browser_path + ) case 'posix': browser_path = '/usr/bin/google-chrome' - return self._validate_browser_path(browser_path) + return BrowserOptionsManager.validate_browser_path( + browser_path + ) case _: raise ValueError('Unsupported OS') diff --git a/pydoll/browser/managers.py b/pydoll/browser/managers.py new file mode 100644 index 00000000..c430d96b --- /dev/null +++ b/pydoll/browser/managers.py @@ -0,0 +1,154 @@ +import os +import shutil +import subprocess +from contextlib import suppress +from tempfile import TemporaryDirectory + +from pydoll.browser.options import Options + + +class ProxyManager: + def __init__(self, options): + self.options = options + + def get_proxy_credentials(self) -> tuple[bool, tuple[str, str]]: + """ + Configura as configurações de proxy e extrai credenciais se presentes. + + Returns: + tuple[bool, tuple[str, str]]: (private_proxy, (username, password)) + """ + private_proxy = False + credentials = (None, None) + + proxy_arg = self._find_proxy_argument() + + if proxy_arg is not None: + index, proxy_value = proxy_arg + has_credentials, username, password, clean_proxy = ( + self._parse_proxy(proxy_value) + ) + + if has_credentials: + self._update_proxy_argument(index, clean_proxy) + private_proxy = True + credentials = (username, password) + + return private_proxy, credentials + + def _find_proxy_argument(self) -> tuple[int, str] | None: + """Encontra o primeiro argumento --proxy-server válido""" + for index, arg in enumerate(self.options.arguments): + if arg.startswith('--proxy-server='): + return index, arg.split('=', 1)[1] + return None + + @staticmethod + def _parse_proxy(proxy_value: str) -> tuple[bool, str, str, str]: + """Extrai credenciais e limpa o valor do proxy""" + if '@' not in proxy_value: + return False, None, None, proxy_value + + try: + creds_part, server_part = proxy_value.split('@', 1) + username, password = creds_part.split(':', 1) + return True, username, password, server_part + except ValueError: + return False, None, None, proxy_value + + def _update_proxy_argument(self, index: int, clean_proxy: str) -> None: + """Atualiza a lista de argumentos com proxy limpo""" + self.options.arguments[index] = f'--proxy-server={clean_proxy}' + + +class BrowserProcessManager: + def __init__(self, process_creator=None): + self._process_creator = ( + process_creator or self._default_process_creator + ) + self._process = None + + def start_browser_process( + self, binary_location: str, port: int, arguments: list + ) -> None: + """Inicia o processo do navegador""" + self._process = self._process_creator([ + binary_location, + f'--remote-debugging-port={port}', + *arguments, + ]) + return self._process + + @staticmethod + def _default_process_creator(command: list[str]): + return subprocess.Popen( + command, stdout=subprocess.PIPE, stderr=subprocess.PIPE + ) + + def stop_process(self): + """Para o processo do navegador se estiver em execução""" + if self._process: + self._process.terminate() + + +class TempDirectoryManager: + def __init__(self, temp_dir_factory=TemporaryDirectory): + self._temp_dir_factory = temp_dir_factory + self._temp_dirs = [] + + def create_temp_dir(self): + """ + Cria um diretório temporário para a instância do navegador. + + Returns: + TemporaryDirectory: O diretório temporário. + """ + temp_dir = self._temp_dir_factory() + self._temp_dirs.append(temp_dir) + return temp_dir + + def cleanup(self): + """Limpa todos os diretórios temporários""" + for temp_dir in self._temp_dirs: + with suppress(OSError): + shutil.rmtree(temp_dir.name) + + +class BrowserOptionsManager: + @staticmethod + def initialize_options(options: Options | None) -> Options: + """ + Inicializa as opções para o navegador. + + Args: + options (Options | None): Uma instância da classe Options ou None. + + Returns: + Options: A instância de opções inicializada. + """ + if options is None: + return Options() + if not isinstance(options, Options): + raise ValueError('Invalid options') + return options + + @staticmethod + def add_default_arguments(options: Options): + """Adiciona argumentos padrão aos argumentos fornecidos""" + options.arguments.append('--no-first-run') + options.arguments.append('--no-default-browser-check') + + @staticmethod + def validate_browser_path(path: str) -> str: + """ + Valida o caminho fornecido do navegador. + + Args: + path (str): O caminho do arquivo executável do navegador. + + Returns: + str: O caminho do navegador validado. + """ + if not os.path.exists(path): + raise ValueError(f'Browser not found: {path}') + return path diff --git a/pydoll/browser/page.py b/pydoll/browser/page.py index 185c9396..0b54ca2c 100644 --- a/pydoll/browser/page.py +++ b/pydoll/browser/page.py @@ -9,7 +9,7 @@ from pydoll.commands.page import PageCommands from pydoll.commands.runtime import RuntimeCommands from pydoll.commands.storage import StorageCommands -from pydoll.connection import ConnectionHandler +from pydoll.connection.connection import ConnectionHandler from pydoll.element import WebElement from pydoll.mixins.find_elements import FindElementsMixin from pydoll.utils import decode_image_to_bytes @@ -90,6 +90,8 @@ async def page_source(self) -> str: Returns: str: The source code of the page. + + TODO: tix this """ response = await self._execute_command( RuntimeCommands.evaluate_script( diff --git a/pydoll/commands/__init__.py b/pydoll/commands/__init__.py new file mode 100644 index 00000000..374cf61c --- /dev/null +++ b/pydoll/commands/__init__.py @@ -0,0 +1,18 @@ +# global imports +from pydoll.commands.dom import DomCommands +from pydoll.commands.fetch import FetchCommands +from pydoll.commands.input import InputCommands +from pydoll.commands.network import NetworkCommands +from pydoll.commands.page import PageCommands +from pydoll.commands.runtime import RuntimeCommands +from pydoll.commands.storage import StorageCommands + +__all__ = [ + 'DomCommands', + 'FetchCommands', + 'InputCommands', + 'NetworkCommands', + 'PageCommands', + 'RuntimeCommands', + 'StorageCommands', +] diff --git a/pydoll/commands/dom.py b/pydoll/commands/dom.py index 1dce6c47..ec60cf61 100644 --- a/pydoll/commands/dom.py +++ b/pydoll/commands/dom.py @@ -17,7 +17,9 @@ class DomCommands: SelectorType (Literal): A type definition for supported selector types. """ - SelectorType = Literal[By.CSS, By.XPATH, By.CLASS_NAME, By.ID, By.TAG_NAME] + SelectorType = Literal[ + By.CSS_SELECTOR, By.XPATH, By.CLASS_NAME, By.ID, By.TAG_NAME + ] ENABLE = {'method': 'DOM.enable'} DOM_DOCUMENT = {'method': 'DOM.getDocument'} @@ -113,7 +115,7 @@ def find_element( selector = escaped_value if object_id and not by == By.XPATH: script = Scripts.RELATIVE_QUERY_SELECTOR.replace( - '{selector}', escaped_value + '{selector}', selector ) command = RuntimeCommands.call_function_on( object_id, @@ -168,21 +170,19 @@ def _find_element_by_xpath(cls, xpath: str, object_id: str) -> dict: escaped_value = xpath.replace('"', '\\"') if object_id: escaped_value = cls._ensure_relative_xpath(escaped_value) - command = copy.deepcopy(RuntimeCommands.CALL_FUNCTION_ON_TEMPLATE) - command['params']['objectId'] = object_id - command['params']['functionDeclaration'] = ( - Scripts.FIND_RELATIVE_XPATH_ELEMENT.replace( - '{escaped_value}', escaped_value - ) + script = Scripts.FIND_RELATIVE_XPATH_ELEMENT.replace( + '{escaped_value}', escaped_value + ) + command = RuntimeCommands.call_function_on( + object_id, + script, + return_by_value=False, ) - command['params']['returnByValue'] = False else: - command = copy.deepcopy(RuntimeCommands.EVALUATE_TEMPLATE) - command['params']['expression'] = ( - Scripts.FIND_XPATH_ELEMENT.replace( - '{escaped_value}', escaped_value - ) + script = Scripts.FIND_XPATH_ELEMENT.replace( + '{escaped_value}', escaped_value ) + command = RuntimeCommands.evaluate_script(script) return command @classmethod @@ -191,20 +191,19 @@ def _find_elements_by_xpath(cls, xpath: str, object_id: str) -> dict: escaped_value = xpath.replace('"', '\\"') if object_id: escaped_value = cls._ensure_relative_xpath(escaped_value) - command = copy.deepcopy(RuntimeCommands.CALL_FUNCTION_ON_TEMPLATE) - command['params']['objectId'] = object_id - command['params']['functionDeclaration'] = ( - Scripts.FIND_RELATIVE_XPATH_ELEMENTS.replace( - '{escaped_value}', escaped_value - ) + script = Scripts.FIND_RELATIVE_XPATH_ELEMENTS.replace( + '{escaped_value}', escaped_value + ) + command = RuntimeCommands.call_function_on( + object_id, + script, + return_by_value=False, ) else: - command = copy.deepcopy(RuntimeCommands.EVALUATE_TEMPLATE) - command['params']['expression'] = ( - Scripts.FIND_XPATH_ELEMENTS.replace( - '{escaped_value}', escaped_value - ) + script = Scripts.FIND_XPATH_ELEMENTS.replace( + '{escaped_value}', escaped_value ) + command = RuntimeCommands.evaluate_script(script) return command @staticmethod diff --git a/pydoll/commands/network.py b/pydoll/commands/network.py index 368fe2c3..70a571ec 100644 --- a/pydoll/commands/network.py +++ b/pydoll/commands/network.py @@ -1,3 +1,6 @@ +import copy + + class NetworkCommands: """ This class encapsulates the network commands of the @@ -88,7 +91,7 @@ def delete_cookies(cls, name: str, url: str = ''): Returns: dict: A command to delete the specified cookie. """ - delete_cookies_template = cls.DELETE_COOKIES_TEMPLATE.copy() + delete_cookies_template = copy.deepcopy(cls.DELETE_COOKIES_TEMPLATE) delete_cookies_template['params']['name'] = name if url: delete_cookies_template['params']['url'] = url @@ -130,7 +133,7 @@ def get_cookies(cls, urls: list[str] = []): Returns: dict: A command to get cookies associated with the specified URLs. """ - get_cookies_template = cls.GET_COOKIES_TEMPLATE.copy() + get_cookies_template = copy.deepcopy(cls.GET_COOKIES_TEMPLATE) if urls: get_cookies_template['params']['urls'] = urls return get_cookies_template @@ -148,8 +151,8 @@ def get_request_post_data(cls, request_id: str): Returns: dict: A command to get the POST data for the specified request. """ - get_request_post_data_template = ( - cls.GET_REQUEST_POST_DATA_TEMPLATE.copy() + get_request_post_data_template = copy.deepcopy( + cls.GET_REQUEST_POST_DATA_TEMPLATE ) get_request_post_data_template['params']['requestId'] = request_id return get_request_post_data_template @@ -168,7 +171,9 @@ def get_response_body(cls, request_id: str): dict: A command to get the response body associated with the specified request. """ - get_response_body_template = cls.GET_RESPONSE_BODY_TEMPLATE.copy() + get_response_body_template = copy.deepcopy( + cls.GET_RESPONSE_BODY_TEMPLATE + ) get_response_body_template['params']['requestId'] = request_id return get_response_body_template @@ -184,7 +189,9 @@ def set_cache_disabled(cls, cache_disabled: bool): Returns: dict: A command to set the cache state in the browser. """ - set_cache_disabled_template = cls.SET_CACHE_DISABLED_TEMPLATE.copy() + set_cache_disabled_template = copy.deepcopy( + cls.SET_CACHE_DISABLED_TEMPLATE + ) set_cache_disabled_template['params']['cacheDisabled'] = cache_disabled return set_cache_disabled_template @@ -202,7 +209,7 @@ def set_cookie(cls, name: str, value: str, url: str = ''): Returns: dict: A command to set the specified cookie in the browser. """ - set_cookie_template = cls.SET_COOKIE_TEMPLATE.copy() + set_cookie_template = copy.deepcopy(cls.SET_COOKIE_TEMPLATE) set_cookie_template['params']['name'] = name set_cookie_template['params']['value'] = value if url: @@ -221,7 +228,7 @@ def set_cookies(cls, cookies: list[dict]): Returns: dict: A command to set the specified cookies in the browser. """ - set_cookies_template = cls.SET_COOKIES_TEMPLATE.copy() + set_cookies_template = copy.deepcopy(cls.SET_COOKIES_TEMPLATE) set_cookies_template['params']['cookies'] = cookies return set_cookies_template @@ -239,8 +246,8 @@ def set_extra_http_headers(cls, headers: dict): dict: A command to set extra HTTP headers for the browser's network requests. """ - set_extra_http_headers_template = ( - cls.SET_EXTRA_HTTP_HEADERS_TEMPLATE.copy() + set_extra_http_headers_template = copy.deepcopy( + cls.SET_EXTRA_HTTP_HEADERS_TEMPLATE ) set_extra_http_headers_template['params']['headers'] = headers return set_extra_http_headers_template @@ -259,8 +266,8 @@ def set_useragent_override(cls, user_agent: str): dict: A command to override the browser's user agent for network requests. """ - set_useragent_override_template = ( - cls.SET_USERAGENT_OVERRIDE_TEMPLATE.copy() + set_useragent_override_template = copy.deepcopy( + cls.SET_USERAGENT_OVERRIDE_TEMPLATE ) set_useragent_override_template['params']['userAgent'] = user_agent return set_useragent_override_template @@ -300,7 +307,9 @@ def search_in_response( dict: A command to search the specified query within the response body of the given request. """ - search_in_response_template = cls.SEARCH_IN_RESPONSE_TEMPLATE.copy() + search_in_response_template = copy.deepcopy( + cls.SEARCH_IN_RESPONSE_TEMPLATE + ) search_in_response_template['params']['requestId'] = request_id search_in_response_template['params']['query'] = query search_in_response_template['params']['caseSensitive'] = case_sensitive @@ -320,6 +329,6 @@ def set_blocked_urls(cls, urls: list[str]): Returns: dict: A command to set the specified URLs as blocked. """ - set_blocked_urls_template = cls.SET_BLOCKED_URLS.copy() + set_blocked_urls_template = copy.deepcopy(cls.SET_BLOCKED_URLS) set_blocked_urls_template['params']['urls'] = urls return set_blocked_urls_template diff --git a/pydoll/connection.py b/pydoll/connection.py deleted file mode 100644 index 55f2dc03..00000000 --- a/pydoll/connection.py +++ /dev/null @@ -1,284 +0,0 @@ -import asyncio -import json -import logging -from typing import Callable - -import websockets - -from pydoll import exceptions -from pydoll.utils import get_browser_ws_address - -logger = logging.getLogger(__name__) -logger.setLevel(logging.INFO) - - -class ConnectionHandler: - """ - A class to handle WebSocket connections for browser automation. - - This class manages the connection to the browser and the associated page, - providing methods to execute commands and register event callbacks. - """ - - def __init__(self, connection_port: int, page_id: str = 'browser'): - """ - Initializes the ConnectionHandler instance. - - Args: - connection_port (int): The port to connect to the browser. - - Sets up the internal state including WebSocket addresses, - connection instance, event callbacks, and command ID. - """ - self._connection_port = connection_port - self._page_id = page_id - self._connection = None - self._event_callbacks = {} - self._id = 1 - self._callback_id = 0 - self._pending_commands: dict[int, asyncio.Future] = {} - self.network_logs = [] - self.dialog = {} - logger.info('ConnectionHandler initialized.') - - @property - async def connection(self) -> websockets.WebSocketClientProtocol: - """ - Returns the WebSocket connection to the browser. - - If the connection is not established, it is created first. - - Returns: - websockets.WebSocketClientProtocol: The WebSocket connection. - - Raises: - ValueError: If the connection cannot be established. - """ - if self._connection is None or self._connection.closed: - await self.connect_to_page() - return self._connection - - async def execute_command(self, command: dict, timeout: int = 10) -> dict: - """ - Sends a command to the browser and awaits its response. - - Args: - command (dict): The command to send, structured as a dictionary. - timeout (int, optional): Time in seconds to wait for a response. - Defaults to 10. - - Returns: - dict: The response from the browser. - - Raises: - InvalidCommand: If the command is not a dictionary. - TimeoutError: If the command execution exceeds the timeout. - """ - if not isinstance(command, dict): - logger.error('Command must be a dictionary.') - raise exceptions.InvalidCommand('Command must be a dictionary') - - command['id'] = self._id - command_str = json.dumps(command) - future = asyncio.Future() - self._pending_commands[self._id] = future - self._id += 1 - - connection = await self.connection - await connection.send(command_str) - logger.info(f'Sent command with ID {command["id"]}: {command}') - - try: - response: str = await asyncio.wait_for(future, timeout) - logger.info( - f'Received response for command ID {command["id"]}: {response}' - ) - del self._pending_commands[command['id']] - return json.loads(response) - except asyncio.TimeoutError: - del self._pending_commands[command['id']] - logger.warning( - f'Command execution timed out for ID {command["id"]}' - ) - raise TimeoutError('Command execution timed out') - - async def connect_to_page(self) -> websockets.WebSocketClientProtocol: - """ - Establishes a WebSocket connection to the browser page. - - Returns: - websockets.WebSocketClientProtocol: The WebSocket connection. - - Initiates a task to listen for events from the page WebSocket. - """ - if 'browser' in self._page_id: - ws_address = await get_browser_ws_address(self._connection_port) - else: - ws_address = ( - f'ws://localhost:{self._connection_port}/devtools/page/' - + self._page_id - ) - - connection = await websockets.connect(ws_address) - logger.info(f'Connected to page WebSocket at {ws_address}') - asyncio.create_task(self._receive_events()) - self._connection = connection - - async def register_callback( - self, event_name: str, callback: Callable, temporary: bool = False - ) -> None: - """ - Registers a callback function for a specific event. - - Args: - event_name (str): The name of the event to register. - callback (Callable): The function to call when the event - is received. - temporary (bool, optional): If True, the callback will be - removed after one use. Defaults to False. - - Raises: - InvalidCallback: If the callback is not callable. - """ - if not callable(callback): - logger.error('Callback must be a callable function.') - raise exceptions.InvalidCallback( - 'Callback must be a callable function' - ) - self._callback_id += 1 - self._event_callbacks[self._callback_id] = { - 'event': event_name, - 'callback': callback, - 'temporary': temporary, - } - logger.info( - f"Registered callback for event '{event_name}'" - f'with ID {self._callback_id}' - ) - return self._callback_id - - async def remove_callback(self, callback_id: int) -> bool: - """ - Removes a registered event callback by its ID. - - Args: - callback_id (int): The ID of the callback to remove. - - Raises: - InvalidCallback: If the callback ID is not found. - """ - if callback_id not in self._event_callbacks: - logger.warning(f'Callback with ID {callback_id} not found.') - return False - - del self._event_callbacks[callback_id] - logger.info(f'Removed callback with ID {callback_id}') - return True - - async def _receive_events(self): - """ - Listens for incoming events from the WebSocket connection - and processes them. - - Matches responses to pending commands and handles - events based on registered callbacks. - """ - try: - while True: - connection = await self.connection - event = await connection.recv() - try: - event_json = json.loads(event) - except json.JSONDecodeError: - logger.warning(f'Received malformed JSON message: {event}') - continue - - if ( - 'id' in event_json - and event_json['id'] in self._pending_commands - ): - logger.info( - 'Received response for pending ' - f'command ID {event_json["id"]}' - ) - self._pending_commands[event_json['id']].set_result(event) - continue - - logger.info(f'Received event: {event_json["method"]}') - await self._handle_event(event_json) - except websockets.ConnectionClosed: - logger.warning('WebSocket connection closed.') - except Exception as exc: - logger.error(f'Error while receiving event: {exc}', exc_info=True) - - async def _handle_event(self, event: dict): - """ - Processes a received event and triggers the appropriate callback(s). - - Args: - event (dict): The event data in dictionary form. - """ - event_name = event.get('method') - - if event_name: - logger.info(f'Handling event {event}') - else: - logger.warning('Event without a method received.') - - if 'Network.requestWillBeSent' in event_name: - self.network_logs.append(event) - self.network_logs = self.network_logs[-10000:] - - if 'Page.javascriptDialogOpening' in event_name: - self.dialog = event - - if 'Page.javascriptDialogClosed' in event_name: - self.dialog = {} - - event_callbacks = self._event_callbacks.copy() - for callback_id, callback_data in event_callbacks.items(): - if callback_data['event'] == event_name: - callback_func = callback_data['callback'] - - if asyncio.iscoroutinefunction(callback_func): - await callback_func(event) - else: - callback_func(event) - - if callback_data['temporary']: - del self._event_callbacks[callback_id] - logger.info( - f'Removed temporary callback with ID {callback_id}' - ) - - def clear_callbacks(self): - """ - Clears all registered event callbacks. - - Removes all event callbacks from the internal dictionary. - """ - self._event_callbacks = {} - logger.info('All event callbacks cleared.') - - async def close(self): - """ - Closes the WebSocket connection. - - Closes the WebSocket connection and clears all event callbacks. - """ - self.clear_callbacks() - await self._connection.close() - logger.info('WebSocket connection closed.') - - def __repr__(self): - return f'ConnectionHandler(port={self._connection_port})' - - def __str__(self): - return f'ConnectionHandler(port={self._connection_port})' - - def __enter__(self): - return self - - def __exit__(self, exc_type, exc_val, exc_tb): - self.close() - return False diff --git a/pydoll/connection/__init__.py b/pydoll/connection/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/pydoll/connection/connection.py b/pydoll/connection/connection.py new file mode 100644 index 00000000..23e55e18 --- /dev/null +++ b/pydoll/connection/connection.py @@ -0,0 +1,228 @@ +import asyncio +import json +import logging +from typing import Callable + +import websockets + +from pydoll import exceptions +from pydoll.connection.managers import CommandManager, EventsHandler +from pydoll.utils import get_browser_ws_address + +logger = logging.getLogger(__name__) +logger.setLevel(logging.INFO) + + +class ConnectionHandler: + """ + A class to handle WebSocket connections for browser automation. + + This class manages the connection to the browser and the associated page, + providing methods to execute commands and register event callbacks. + """ + + def __init__( + self, + connection_port: int, + page_id: str = 'browser', + ws_address_resolver: Callable[[int], str] = get_browser_ws_address, + ws_connector: Callable = websockets.connect, + ): + """ + Initializes the ConnectionHandler instance. + + Args: + connection_port (int): The port to connect to the browser. + + Sets up the internal state including WebSocket addresses, + connection instance, event callbacks, and command ID. + """ + self._connection_port = connection_port + self._page_id = page_id + self._ws_address_resolver = ws_address_resolver + self._ws_connector = ws_connector + self._ws_connection = None + self._command_manager = CommandManager() + self._events_handler = EventsHandler() + logger.info('ConnectionHandler initialized.') + + @property + def network_logs(self): + return self._events_handler.network_logs + + async def ping(self) -> bool: + """ + Sends a ping message to the browser. + + Returns: + bool: True if the ping was successful, False otherwise. + """ + try: + await self._ensure_active_connection() + await self._ws_connection.ping() + return True + except Exception: + return False + + async def execute_command(self, command: dict, timeout: int = 10) -> dict: + """ + Sends a command to the browser and awaits its response. + + Args: + command (dict): The command to send, structured as a dictionary. + timeout (int, optional): Time in seconds to wait for a response. + Defaults to 10. + + Returns: + dict: The response from the browser. + + Raises: + InvalidCommand: If the command is not a dictionary. + TimeoutError: If the command execution exceeds the timeout. + """ + if not isinstance(command, dict): + logger.error('Command must be a dictionary.') + raise exceptions.InvalidCommand('Command must be a dictionary') + + await self._ensure_active_connection() + future = self._command_manager.create_command_future(command) + command_str = json.dumps(command) + + try: + await self._ws_connection.send(command_str) + response: str = await asyncio.wait_for(future, timeout) + return json.loads(response) + except asyncio.TimeoutError as exc: + self._command_manager.remove_pending_command(command['id']) + raise exc + except websockets.ConnectionClosed as exc: + await self._handle_connection_loss() + raise exc + + async def register_callback( + self, event_name: str, callback: Callable, temporary: bool = False + ): + return self._events_handler.register_callback( + event_name, callback, temporary + ) + + async def remove_callback(self, callback_id: int): + return self._events_handler.remove_callback(callback_id) + + async def clear_callbacks(self): + return self._events_handler.clear_callbacks() + + async def close(self): + """ + Closes the WebSocket connection. + + Closes the WebSocket connection and clears all event callbacks. + """ + await self.clear_callbacks() + await self._ws_connection.close() + logger.info('WebSocket connection closed.') + + async def _ensure_active_connection(self): + """Guarantee an active connection exists.""" + if self._ws_connection is None or self._ws_connection.closed: + await self._establish_new_connection() + + async def _establish_new_connection(self): + """Create fresh connection and start listening.""" + ws_address = await self._resolve_ws_address() + logger.info(f'Connecting to {ws_address}') + self._ws_connection = await self._ws_connector(ws_address) + self._receive_task = asyncio.create_task(self._receive_events()) + logger.debug('WebSocket connection established') + + async def _resolve_ws_address(self): + """Determine correct WebSocket address.""" + if 'browser' in self._page_id: + return await self._ws_address_resolver(self._connection_port) + return ( + f'ws://localhost:{self._connection_port}/devtools/page/' + f'{self._page_id}' + ) + + async def _handle_connection_loss(self): + """Clean up after connection loss.""" + if self._ws_connection and not self._ws_connection.closed: + await self._ws_connection.close() + self._ws_connection = None + + if self._receive_task and not self._receive_task.done(): + self._receive_task.cancel() + + logger.info('Connection resources cleaned up') + + async def _receive_events(self): + """ + Main loop for receiving and processing incoming WebSocket messages. + Delegates processing to specialized handlers based on message type. + """ + try: + async for raw_message in self._incoming_messages(): + await self._process_single_message(raw_message) + except websockets.ConnectionClosed as e: + logger.info(f'Connection closed gracefully: {e}') + except Exception as e: + logger.error(f'Unexpected error in event loop: {e}') + raise + + async def _incoming_messages(self): + """Generator that yields raw messages while connection is open""" + while not self._ws_connection.closed: + yield await self._ws_connection.recv() + + async def _process_single_message(self, raw_message: str): + """Orchestrates processing of a single raw WebSocket message""" + message = self._parse_message(raw_message) + if not message: + return + + if self._is_command_response(message): + await self._handle_command_message(message) + else: + await self._handle_event_message(message) + + @staticmethod + def _parse_message(raw_message: str) -> dict | None: + """ + Attempts to parse raw message string into JSON. + Returns parsed dict or None if parsing fails. + """ + try: + return json.loads(raw_message) + except json.JSONDecodeError: + logger.warning(f'Failed to parse message: {raw_message[:200]}...') + return None + + @staticmethod + def _is_command_response(message: dict) -> bool: + """Determines if message is a response to a command""" + return 'id' in message and isinstance(message['id'], int) + + async def _handle_command_message(self, message: dict): + """Processes messages that are command responses""" + logger.debug(f'Processing command response: {message.get("id")}') + self._command_manager.resolve_command( + message['id'], json.dumps(message) + ) + + async def _handle_event_message(self, message: dict): + """Processes messages that are spontaneous events""" + event_type = message.get('method', 'unknown-event') + logger.debug(f'Processing {event_type} event') + await self._events_handler.process_event(message) + + def __repr__(self): + return f'ConnectionHandler(port={self._connection_port})' + + def __str__(self): + return f'ConnectionHandler(port={self._connection_port})' + + async def __aenter__(self): + return self + + async def __aexit__(self, exc_type, exc_val, exc_tb): + await self.close() diff --git a/pydoll/connection/managers.py b/pydoll/connection/managers.py new file mode 100644 index 00000000..52339cd5 --- /dev/null +++ b/pydoll/connection/managers.py @@ -0,0 +1,129 @@ +import asyncio +import logging +from typing import Callable, Dict + +from pydoll import exceptions + +logger = logging.getLogger(__name__) + + +class CommandManager: + def __init__(self): + self._pending_commands: dict[int, asyncio.Future] = {} + self._id = 1 + + def create_command_future(self, command: dict) -> asyncio.Future: + command['id'] = self._id + future = asyncio.Future() + self._pending_commands[self._id] = future + self._id += 1 + return future + + def resolve_command(self, response_id: int, result: str): + if response_id in self._pending_commands: + self._pending_commands[response_id].set_result(result) + del self._pending_commands[response_id] + + def remove_pending_command(self, command_id: int): + """ + Remove um comando pendente sem resolvê-lo (útil para timeouts). + + Args: + command_id: ID do comando a ser removido + """ + if command_id in self._pending_commands: + del self._pending_commands[command_id] + + +class EventsHandler: + """ + Gerencia registro de callbacks, processamento de eventos e logs de rede. + """ + + def __init__(self): + self._event_callbacks: Dict[int, dict] = {} + self._callback_id = 0 + self.network_logs = [] + logger.info('EventsHandler initialized') + + def register_callback( + self, event_name: str, callback: Callable, temporary: bool = False + ) -> int: + """ + Registra um callback para um tipo específico de evento. + + Retorna: + int: ID do callback registrado + """ + if not callable(callback): + logger.error('Callback must be a callable function.') + raise exceptions.InvalidCallback('Callback must be callable') + + self._callback_id += 1 + self._event_callbacks[self._callback_id] = { + 'event': event_name, + 'callback': callback, + 'temporary': temporary, + } + logger.info( + f"Registered callback '{event_name}' with ID {self._callback_id}" + ) + return self._callback_id + + def remove_callback(self, callback_id: int) -> bool: + """Remove um callback pelo ID.""" + if callback_id not in self._event_callbacks: + logger.warning(f'Callback ID {callback_id} not found') + return False + + del self._event_callbacks[callback_id] + logger.info(f'Removed callback ID {callback_id}') + return True + + def clear_callbacks(self): + """Reseta todos os callbacks registrados.""" + self._event_callbacks.clear() + logger.info('All callbacks cleared') + + async def process_event(self, event_data: dict): + """ + Processa um evento recebido e dispara os callbacks correspondentes. + + Args: + event_data: Dados do evento no formato dicionário + """ + event_name = event_data.get('method') + logger.debug(f'Processing event: {event_name}') + + # Atualiza logs de rede se necessário + if 'Network.requestWillBeSent' in event_name: + self._update_network_logs(event_data) + + # Processa callbacks + await self._trigger_callbacks(event_name, event_data) + + def _update_network_logs(self, event_data: dict): + """Mantém os logs de rede atualizados.""" + self.network_logs.append(event_data) + self.network_logs = self.network_logs[-10000:] # Mantém tamanho máximo + + async def _trigger_callbacks(self, event_name: str, event_data: dict): + """Dispara todos os callbacks registrados para o evento.""" + callbacks_to_remove = [] + + for cb_id, cb_data in list(self._event_callbacks.items()): + if cb_data['event'] == event_name: + try: + if asyncio.iscoroutinefunction(cb_data['callback']): + await cb_data['callback'](event_data) + else: + cb_data['callback'](event_data) + except Exception as e: + logger.error(f'Error in callback {cb_id}: {str(e)}') + + if cb_data['temporary']: + callbacks_to_remove.append(cb_id) + + # Remove callbacks temporários após processamento + for cb_id in callbacks_to_remove: + self.remove_callback(cb_id) diff --git a/pydoll/constants.py b/pydoll/constants.py index 593f0b8d..035be134 100644 --- a/pydoll/constants.py +++ b/pydoll/constants.py @@ -2,7 +2,7 @@ class By(str, Enum): - CSS = 'css' + CSS_SELECTOR = 'css' XPATH = 'xpath' CLASS_NAME = 'class_name' ID = 'id' diff --git a/pydoll/element.py b/pydoll/element.py index 2055aeee..a349228a 100644 --- a/pydoll/element.py +++ b/pydoll/element.py @@ -9,7 +9,7 @@ from pydoll.commands.input import InputCommands from pydoll.commands.page import PageCommands from pydoll.commands.runtime import RuntimeCommands -from pydoll.connection import ConnectionHandler +from pydoll.connection.connection import ConnectionHandler from pydoll.constants import Scripts from pydoll.mixins.find_elements import FindElementsMixin from pydoll.utils import decode_image_to_bytes diff --git a/pydoll/events/__init__.py b/pydoll/events/__init__.py new file mode 100644 index 00000000..c39ef38a --- /dev/null +++ b/pydoll/events/__init__.py @@ -0,0 +1,13 @@ +from pydoll.events.browser import BrowserEvents +from pydoll.events.dom import DomEvents +from pydoll.events.fetch import FetchEvents +from pydoll.events.network import NetworkEvents +from pydoll.events.page import PageEvents + +__all__ = [ + 'BrowserEvents', + 'DomEvents', + 'FetchEvents', + 'NetworkEvents', + 'PageEvents', +] diff --git a/pydoll/mixins/__init__.py b/pydoll/mixins/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/pydoll/utils.py b/pydoll/utils.py index 39348758..d827ff4d 100644 --- a/pydoll/utils.py +++ b/pydoll/utils.py @@ -39,19 +39,12 @@ async def get_browser_ws_address(port: int) -> str: ) as response: response.raise_for_status() data = await response.json() - logger.info('Browser WebSocket address fetched successfully.') return data['webSocketDebuggerUrl'] except aiohttp.ClientError as e: - logger.error( - 'Failed to fetch browser WebSocket address due to network error.' - ) raise exceptions.NetworkError(f'Failed to get browser ws address: {e}') except KeyError as e: - logger.error( - 'Failed to get browser WebSocket address due to missing data.' - ) raise exceptions.InvalidResponse( f'Failed to get browser ws address: {e}' ) diff --git a/pyproject.toml b/pyproject.toml index f6168dbb..55b492aa 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -20,6 +20,7 @@ pytest = "^8.3.3" taskipy = "^1.14.0" pytest-asyncio = "^0.24.0" pytest-cov = "^6.0.0" +aioresponses = "^0.7.7" [build-system] requires = ["poetry-core"] @@ -31,6 +32,7 @@ line-length = 79 [tool.ruff.lint] preview = true select = ['I', 'F', 'E', 'W', 'PL', 'PT'] +exclude = ['tests', 'tests/*'] [tool.ruff.format] preview = true diff --git a/tests/conftest.py b/tests/conftest.py deleted file mode 100644 index 4b28d854..00000000 --- a/tests/conftest.py +++ /dev/null @@ -1,50 +0,0 @@ -import asyncio -import json - -import pytest_asyncio -import websockets - -from pydoll.connection import ConnectionHandler - - -@pytest_asyncio.fixture -async def ws_server(): - async def echo_server(websocket, path): - try: - # Função para enviar um evento - async def send_event(): - await asyncio.sleep(0.1) - await websocket.send( - json.dumps({ - 'method': 'Network.requestWillBeSent', - 'params': {}, - }) - ) - - # Envio de evento em paralelo com a recepção de mensagens - send_event_task = asyncio.create_task(send_event()) - - async for message in websocket: - data = json.loads(message) - if 'id' in data: - response = json.dumps({ - 'id': data['id'], - 'result': 'success', - }) - await websocket.send(response) - - # Espera a tarefa do evento ser concluída antes de fechar a conexão - await send_event_task - except websockets.ConnectionClosed: - pass - - server = await websockets.serve(echo_server, 'localhost', 9222) - - yield server - server.close() - await server.wait_closed() - - -@pytest_asyncio.fixture(scope='function') -async def handler(ws_server): - return ConnectionHandler(connection_port=9222) diff --git a/tests/test_browser_commands.py b/tests/test_browser_commands.py new file mode 100644 index 00000000..4a57f063 --- /dev/null +++ b/tests/test_browser_commands.py @@ -0,0 +1,57 @@ +from pydoll.commands.browser import BrowserCommands + + +def test_close(): + expected_command = {'method': 'Browser.close'} + assert BrowserCommands.close() == expected_command + + +def test_get_window_id(): + expected_command = {'method': 'Browser.WindowID'} + assert BrowserCommands.get_window_id() == expected_command + + +def test_set_download_path(): + path = '/path/to/download' + expected_command = { + 'method': 'Browser.setDownloadBehavior', + 'params': {'behavior': 'allow', 'downloadPath': path}, + } + assert BrowserCommands.set_download_path(path) == expected_command + + +def test_set_window_bounds(): + window_id = 1 + bounds = {'width': 800, 'height': 600} + expected_command = { + 'method': 'Browser.setWindowBounds', + 'params': {'windowId': window_id, 'bounds': bounds}, + } + assert ( + BrowserCommands.set_window_bounds(window_id, bounds) + == expected_command + ) + + +def test_set_window_maximized(): + window_id = 1 + expected_command = { + 'method': 'Browser.setWindowBounds', + 'params': { + 'windowId': window_id, + 'bounds': {'windowState': 'maximized'}, + }, + } + assert BrowserCommands.set_window_maximized(window_id) == expected_command + + +def test_set_window_minimized(): + window_id = 1 + expected_command = { + 'method': 'Browser.setWindowBounds', + 'params': { + 'windowId': window_id, + 'bounds': {'windowState': 'minimized'}, + }, + } + assert BrowserCommands.set_window_minimized(window_id) == expected_command diff --git a/tests/test_browser_managers.py b/tests/test_browser_managers.py new file mode 100644 index 00000000..c14f37e7 --- /dev/null +++ b/tests/test_browser_managers.py @@ -0,0 +1,150 @@ +from unittest.mock import MagicMock, Mock, patch + +import pytest + +from pydoll.browser.managers import ( + BrowserOptionsManager, + BrowserProcessManager, + ProxyManager, + TempDirectoryManager, +) +from pydoll.browser.options import Options + + +@pytest.fixture +def proxy_options(): + return Options() + + +@pytest.fixture +def temp_manager(): + mock_dir = MagicMock() + mock_dir.name = '/fake/temp/dir' + return TempDirectoryManager(temp_dir_factory=lambda: mock_dir) + + +@pytest.fixture +def process_manager(): + mock_creator = Mock(return_value=MagicMock()) + return BrowserProcessManager(process_creator=mock_creator) + + +def test_proxy_manager_no_proxy(proxy_options): + manager = ProxyManager(proxy_options) + result = manager.get_proxy_credentials() + + assert result[0] is False + assert result[1] == (None, None) + + +def test_proxy_manager_with_credentials(proxy_options): + proxy_options.add_argument('--proxy-server=user:pass@example.com') + manager = ProxyManager(proxy_options) + result = manager.get_proxy_credentials() + + assert result[0] is True + assert result[1] == ('user', 'pass') + assert proxy_options.arguments == ['--proxy-server=example.com'] + + +def test_proxy_manager_invalid_credentials_format(proxy_options): + proxy_options.add_argument('--proxy-server=invalidformat@example.com') + manager = ProxyManager(proxy_options) + result = manager.get_proxy_credentials() + + assert result[0] is False + assert result[1] == (None, None) + assert proxy_options.arguments == [ + '--proxy-server=invalidformat@example.com' + ] + + +def test_proxy_manager_invalid_proxy_format(proxy_options): + proxy_options.add_argument('--proxy-server=invalidformat') + manager = ProxyManager(proxy_options) + result = manager.get_proxy_credentials() + + assert result[0] is False + assert result[1] == (None, None) + + +def test_start_browser_process(process_manager): + binary = '/fake/path/browser' + port = 9222 + args = ['--test-arg'] + + process_manager.start_browser_process(binary, port, args) + + expected_command = [binary, f'--remote-debugging-port={port}', *args] + process_manager._process_creator.assert_called_once_with(expected_command) + assert process_manager._process is not None + + +def test_stop_process(process_manager): + mock_process = MagicMock() + process_manager._process = mock_process + + process_manager.stop_process() + + mock_process.terminate.assert_called_once() + + +def test_create_temp_dir(temp_manager): + temp_dir = temp_manager.create_temp_dir() + + assert len(temp_manager._temp_dirs) == 1 + assert temp_dir.name == '/fake/temp/dir' + + +def test_cleanup_temp_dirs(temp_manager): + mock_dir1 = MagicMock() + mock_dir2 = MagicMock() + temp_manager._temp_dirs = [mock_dir1, mock_dir2] + + with patch('shutil.rmtree') as mock_rmtree: + temp_manager.cleanup() + + assert mock_rmtree.call_count == 2 + mock_rmtree.assert_any_call(mock_dir1.name) + mock_rmtree.assert_any_call(mock_dir2.name) + + +def test_initialize_options_with_none(): + result = BrowserOptionsManager.initialize_options(None) + + assert isinstance(result, Options) + assert result.arguments == [] + + +def test_initialize_options_with_valid_options(): + options = Options() + options.add_argument('--test') + result = BrowserOptionsManager.initialize_options(options) + + assert result is options + assert result.arguments == ['--test'] + + +def test_initialize_options_with_invalid_type(): + with pytest.raises(ValueError): + BrowserOptionsManager.initialize_options('invalid') + + +def test_add_default_arguments(): + options = Options() + BrowserOptionsManager.add_default_arguments(options) + + assert '--no-first-run' in options.arguments + assert '--no-default-browser-check' in options.arguments + + +def test_validate_browser_path_valid(): + with patch('os.path.exists', return_value=True): + result = BrowserOptionsManager.validate_browser_path('/fake/path') + assert result == '/fake/path' + + +def test_validate_browser_path_invalid(): + with patch('os.path.exists', return_value=False): + with pytest.raises(ValueError): + BrowserOptionsManager.validate_browser_path('/fake/path') diff --git a/tests/test_browser_options.py b/tests/test_browser_options.py new file mode 100644 index 00000000..30425938 --- /dev/null +++ b/tests/test_browser_options.py @@ -0,0 +1,41 @@ +import pytest + +from pydoll.browser.options import Options + + +def test_initial_arguments(): + options = Options() + assert options.arguments == [] + + +def test_initial_binary_location(): + options = Options() + assert not options.binary_location + + +def test_set_binary_location(): + options = Options() + options.binary_location = '/path/to/browser' + assert options.binary_location == '/path/to/browser' + + +def test_add_argument(): + options = Options() + options.add_argument('--headless') + assert options.arguments == ['--headless'] + + +def test_add_duplicate_argument(): + options = Options() + options.add_argument('--headless') + with pytest.raises( + ValueError, match='Argument already exists: --headless' + ): + options.add_argument('--headless') + + +def test_add_multiple_arguments(): + options = Options() + options.add_argument('--headless') + options.add_argument('--no-sandbox') + assert options.arguments == ['--headless', '--no-sandbox'] diff --git a/tests/test_browser_page.py b/tests/test_browser_page.py new file mode 100644 index 00000000..51debac5 --- /dev/null +++ b/tests/test_browser_page.py @@ -0,0 +1,422 @@ +import pytest +import pytest_asyncio +import asyncio +from unittest.mock import AsyncMock, MagicMock, patch + +from pydoll.browser.page import Page +from pydoll.element import WebElement + +from pydoll.commands import ( + DomCommands, + RuntimeCommands, + NetworkCommands, + StorageCommands, + PageCommands, + FetchCommands, +) + + +@pytest_asyncio.fixture +async def mock_connection_handler(): + with patch('pydoll.browser.page.ConnectionHandler', autospec=True) as mock: + handler = mock.return_value + handler.execute_command = AsyncMock() + handler.register_callback = AsyncMock() + handler.network_logs = [] + yield handler + + +@pytest_asyncio.fixture +async def page(mock_connection_handler): + page = Page(connection_port=9223, page_id='test_page') + page._connection_handler = mock_connection_handler + return page + + +@pytest.mark.asyncio +async def test_page_initialization(page): + assert page._connection_handler is not None + assert not page.page_events_enabled + assert not page.network_events_enabled + assert not page.fetch_events_enabled + assert not page.dom_events_enabled + + +@pytest.mark.asyncio +async def test_current_url(page): + page._connection_handler.execute_command.return_value = { + 'result': {'result': {'value': 'https://example.com'}} + } + + url = await page.current_url + assert url == 'https://example.com' + page._connection_handler.execute_command.assert_called_once_with( + DomCommands.get_current_url(), timeout=60 + ) + + +@pytest.mark.asyncio +async def test_page_source(page): + page._connection_handler.execute_command.return_value = { + 'result': {'result': {'value': 'Test'}} + } + + source = await page.page_source + assert source == 'Test' + page._connection_handler.execute_command.assert_called_once() + + +@pytest.mark.asyncio +async def test_get_cookies(page): + test_cookies = [{'name': 'test', 'value': 'value'}] + page._connection_handler.execute_command.return_value = { + 'result': {'cookies': test_cookies} + } + + cookies = await page.get_cookies() + assert cookies == test_cookies + page._connection_handler.execute_command.assert_called_once_with( + NetworkCommands.get_all_cookies(), timeout=60 + ) + + +@pytest.mark.asyncio +async def test_set_cookies(page): + test_cookies = [{'name': 'test', 'value': 'value'}] + await page.set_cookies(test_cookies) + page._connection_handler.execute_command.assert_called_once_with( + NetworkCommands.set_cookies(test_cookies), timeout=60 + ) + + +@pytest.mark.asyncio +async def test_delete_all_cookies(page): + await page.delete_all_cookies() + assert page._connection_handler.execute_command.call_count == 2 + page._connection_handler.execute_command.assert_any_call( + StorageCommands.clear_cookies(), timeout=60 + ) + page._connection_handler.execute_command.assert_any_call( + NetworkCommands.clear_browser_cookies(), timeout=60 + ) + + +@pytest.mark.asyncio +async def test_go_to_success(page): + page._wait_page_load = AsyncMock(return_value=None) + page._connection_handler.execute_command.return_value = { + 'result': {'result': {'value': 'https://another.com'}} + } + await page.go_to('https://example.com') + page._connection_handler.execute_command.assert_called_with( + PageCommands.go_to('https://example.com'), timeout=60 + ) + + +@pytest.mark.asyncio +async def test_go_to_timeout(page): + page._connection_handler.execute_command.return_value = { + 'result': {'result': {'value': 'loading'}} + } + page._wait_page_load = AsyncMock( + side_effect=asyncio.TimeoutError('Timeout') + ) + with pytest.raises(TimeoutError): + await page.go_to('https://example.com', timeout=0) + + +@pytest.mark.asyncio +async def test_refresh(page): + page._connection_handler.execute_command.side_effect = [ + {'result': {'result': {'value': 'complete'}}}, + ] + page._wait_page_load = AsyncMock(return_value=None) + await page.refresh() + page._connection_handler.execute_command.assert_called_with( + PageCommands.refresh(), timeout=60 + ) + + +@pytest.mark.asyncio +async def test_get_screenshot(page, tmp_path): + test_image = b'iVBORw0KGgoAAAANSUhEUgAAAAEAAAABCAQAAAC1HAwCAAAAC0lEQVR42mP8/wcAAgAB/edzE+oAAAAASUVORK5CYII=' + page._connection_handler.execute_command.return_value = { + 'result': {'data': test_image.decode()} + } + + screenshot_path = tmp_path / 'screenshot.png' + with patch('aiofiles.open') as mock_open: + mock_open.return_value.__aenter__.return_value.write = AsyncMock() + await page.get_screenshot(str(screenshot_path)) + + page._connection_handler.execute_command.assert_called_once_with( + PageCommands.screenshot(), timeout=60 + ) + + +@pytest.mark.asyncio +async def test_enable_events(page): + await page.enable_page_events() + assert page.page_events_enabled + page._connection_handler.execute_command.assert_called_once_with( + PageCommands.enable_page(), timeout=60 + ) + + await page.enable_network_events() + assert page.network_events_enabled + page._connection_handler.execute_command.assert_any_call( + NetworkCommands.enable_network_events(), timeout=60 + ) + + await page.enable_fetch_events() + assert page.fetch_events_enabled + page._connection_handler.execute_command.assert_any_call( + FetchCommands.enable_fetch_events(False, 'Document'), timeout=60 + ) + + await page.enable_dom_events() + assert page.dom_events_enabled + page._connection_handler.execute_command.assert_any_call( + DomCommands.enable_dom_events(), timeout=60 + ) + + +@pytest.mark.asyncio +async def test_disable_events(page): + await page.disable_fetch_events() + assert not page.fetch_events_enabled + page._connection_handler.execute_command.assert_called_once_with( + FetchCommands.disable_fetch_events(), timeout=60 + ) + + await page.disable_page_events() + assert not page.page_events_enabled + page._connection_handler.execute_command.assert_any_call( + PageCommands.disable_page(), timeout=60 + ) + + +@pytest.mark.asyncio +async def test_execute_script(page): + test_script = 'return document.title' + page._connection_handler.execute_command.return_value = { + 'result': {'result': {'value': 'Test Page'}} + } + + result = await page.execute_script(test_script) + page._connection_handler.execute_command.assert_called_once_with( + RuntimeCommands.evaluate_script(test_script), timeout=60 + ) + + # Test with element context + element = WebElement( + object_id='test_id', connection_handler=page._connection_handler + ) + await page.execute_script('argument.click()', element) + page._connection_handler.execute_command.assert_called_with( + RuntimeCommands.call_function_on( + 'test_id', 'function(){ this.click() }', return_by_value=True + ), + timeout=60, + ) + + +@pytest.mark.asyncio +async def test_get_network_logs(page): + page._connection_handler.network_logs = [ + {'params': {'request': {'url': 'https://example.com/api'}}}, + {'params': {'request': {'url': 'https://example.com/other'}}}, + ] + + logs = await page.get_network_logs(['api']) + assert len(logs) == 1 + assert logs[0]['params']['request']['url'] == 'https://example.com/api' + + with pytest.raises(LookupError): + await page.get_network_logs(['nonexistent']) + + +@pytest.mark.asyncio +async def test_get_network_response_body(page): + page._connection_handler.execute_command.return_value = { + 'result': {'body': '{"key": "value"}', 'base64Encoded': False} + } + + body, encoded = await page.get_network_response_body('request_id') + assert body == '{"key": "value"}' + assert not encoded + page._connection_handler.execute_command.assert_called_once_with( + NetworkCommands.get_response_body('request_id'), timeout=60 + ) + + +@pytest.mark.asyncio +async def test_has_dialog(page): + page._connection_handler.dialog = {'params': {'type': 'alert'}} + + result = await page.has_dialog() + assert result is True + + page._connection_handler.dialog = None + result = await page.has_dialog() + assert result is False + + +@pytest.mark.asyncio +async def test_get_dialog_message(page): + page._connection_handler.dialog = {'params': {'message': 'Test message'}} + + message = await page.get_dialog_message() + assert message == 'Test message' + + page._connection_handler.dialog = None + with pytest.raises(LookupError): + await page.get_dialog_message() + + +@pytest.mark.asyncio +async def test_accept_dialog(page): + page._connection_handler.dialog = {'params': {'type': 'alert'}} + await page.accept_dialog() + page._connection_handler.execute_command.assert_called_once_with( + PageCommands.handle_dialog(True), timeout=60 + ) + + +@pytest.mark.asyncio +async def test_accept_dialog_no_dialog(page): + page._connection_handler.dialog = None + with pytest.raises(LookupError): + await page.accept_dialog() + + +@pytest.mark.asyncio +async def test_go_to_same_url(page): + page._connection_handler.execute_command.return_value = { + 'result': {'result': {'value': 'https://example.com'}} + } + page._wait_page_load = AsyncMock(return_value=None) + await page.go_to('https://example.com') + page._connection_handler.execute_command.assert_called_with( + PageCommands.refresh(), timeout=60 + ) + + +@pytest.mark.asyncio +async def test_refresh_timeout(page): + page._wait_page_load = AsyncMock( + side_effect=asyncio.TimeoutError('Timeout') + ) + with pytest.raises(TimeoutError): + await page.refresh() + + +@pytest.mark.asyncio +async def test_set_download_path(page): + await page.set_download_path('/tmp') + page._connection_handler.execute_command.assert_called_once_with( + PageCommands.set_download_path('/tmp'), timeout=60 + ) + + +@pytest.mark.asyncio +async def test_get_pdf_base64(page): + response = {'result': {'data': 'test_pdf'}} + page._connection_handler.execute_command.return_value = response + pdf = await page.get_pdf_base64() + assert pdf == 'test_pdf' + page._connection_handler.execute_command.assert_called_once_with( + PageCommands.print_to_pdf(), timeout=60 + ) + + +@pytest.mark.asyncio +async def test_print_to_pdf(page): + response = { + 'result': { + 'data': 'iVBORw0KGgoAAAANSUhEUgAAAAEAAAABCAQAAAC1HAwCAAAAC0lEQVR42mP8/wcAAgAB/edzE+oAAAAASUVORK5CYII=' + } + } + page._connection_handler.execute_command.return_value = response + with patch('aiofiles.open') as mock_open: + mock_open.return_value.__aenter__.return_value.write = AsyncMock() + await page.print_to_pdf('/tmp/test.pdf') + page._connection_handler.execute_command.assert_called_once_with( + PageCommands.print_to_pdf('/tmp/test.pdf'), timeout=60 + ) + + +@pytest.mark.asyncio +async def test_get_network_logs(page): + page._connection_handler.network_logs = [ + {'params': {'request': {'url': 'https://example.com/request'}}}, + {'params': {'otherkey': {}}}, + ] + + logs = await page.get_network_logs(['request']) + assert logs[0]['params']['request']['url'] == 'https://example.com/request' + + with pytest.raises(LookupError): + await page.get_network_logs(['nonexistent']) + + +@pytest.mark.asyncio +async def test_get_network_response_bodies(page): + page._connection_handler.network_logs = [ + { + 'params': { + 'request': {'url': 'https://example.com/api'}, + 'requestId': 'request_id', + } + }, + { + 'params': { + 'request': {'url': 'https://example.com/other'}, + 'requestId': 'other_id', + } + }, + ] + page.get_network_response_body = AsyncMock( + return_value=('{"key": "value"}', False) + ) + matches = ['api'] + + responses = await page.get_network_response_bodies(matches) + assert responses[0] == {'key': 'value'} + + with pytest.raises(LookupError): + await page.get_network_response_bodies(['nonexistent']) + + +@pytest.mark.asyncio +async def test_get_network_response_bodies_keyerror(page): + page._connection_handler.network_logs = [ + {'params': {'request': {'url': 'https://example.com/api'}}}, + {'params': {'request': {'url': 'https://example.com/other'}}}, + ] + + matches = ['api'] + + assert await page.get_network_response_bodies(matches) == [] + + +@pytest.mark.asyncio +async def test__wait_page_load(page): + page._connection_handler.execute_command.return_value = { + 'result': {'result': {'value': 'complete'}} + } + await page._wait_page_load() + page._connection_handler.execute_command.assert_called_once_with( + RuntimeCommands.evaluate_script('document.readyState'), timeout=60 + ) + + +@pytest.mark.asyncio +async def test__wait_page_load_timeout(page): + page._connection_handler.execute_command.return_value = { + 'result': {'result': {'value': 'loading'}} + } + + with patch('pydoll.browser.page.asyncio.sleep', AsyncMock()): + with pytest.raises(asyncio.TimeoutError): + await page._wait_page_load(timeout=0.1) diff --git a/tests/test_chrome.py b/tests/test_chrome.py new file mode 100644 index 00000000..a64caa9e --- /dev/null +++ b/tests/test_chrome.py @@ -0,0 +1,359 @@ +from unittest.mock import ANY, AsyncMock, MagicMock, patch + +import pytest +import pytest_asyncio + +from pydoll import exceptions +from pydoll.browser.base import Browser +from pydoll.browser.managers import ( + ProxyManager, +) +from pydoll.browser.options import Options +from pydoll.browser.page import Page +from pydoll.commands.browser import BrowserCommands +from pydoll.commands.dom import DomCommands +from pydoll.commands.fetch import FetchCommands +from pydoll.commands.network import NetworkCommands +from pydoll.commands.page import PageCommands +from pydoll.commands.storage import StorageCommands +from pydoll.commands.target import TargetCommands +from pydoll.events.fetch import FetchEvents + + +class ConcreteBrowser(Browser): + def _get_default_binary_location(self) -> str: + return '/fake/path/to/browser' + + +@pytest_asyncio.fixture +async def mock_browser(): + with patch.multiple( + Browser, + _get_default_binary_location=MagicMock( + return_value='/fake/path/to/browser' + ), + ), patch( + 'pydoll.browser.managers.BrowserProcessManager', + autospec=True, + ) as mock_process_manager, patch( + 'pydoll.browser.managers.TempDirectoryManager', + autospec=True, + ) as mock_temp_dir_manager, patch( + 'pydoll.connection.connection.ConnectionHandler', + autospec=True, + ) as mock_conn_handler, patch( + 'pydoll.browser.managers.ProxyManager', + autospec=True, + ) as mock_proxy_manager: + options = Options() + options.binary_location = None + + browser = ConcreteBrowser(options=options) + browser._browser_process_manager = mock_process_manager.return_value + browser._temp_directory_manager = mock_temp_dir_manager.return_value + browser._proxy_manager = mock_proxy_manager.return_value + browser._connection_handler = mock_conn_handler.return_value + browser._connection_handler.execute_command = AsyncMock() + browser._connection_handler.register_callback = AsyncMock() + + mock_temp_dir_manager.return_value.create_temp_dir.return_value = ( + MagicMock(name='temp_dir') + ) + browser._pages = ['page1'] + + yield browser + + +@pytest.mark.asyncio +async def test_browser_initialization(mock_browser): + assert isinstance(mock_browser.options, Options) + assert isinstance(mock_browser._proxy_manager, ProxyManager) + assert mock_browser._connection_port in range(9223, 9323) + assert mock_browser._pages == ['page1'] + + +@pytest.mark.asyncio +async def test_start_browser_success(mock_browser): + mock_browser._connection_handler.ping.return_value = True + + await mock_browser.start() + + mock_browser._browser_process_manager.start_browser_process.assert_called_once_with( + '/fake/path/to/browser', + mock_browser._connection_port, + mock_browser.options.arguments, + ) + + assert '--user-data-dir=' in str(mock_browser.options.arguments), ( + 'Diretório temporário não configurado' + ) + + assert 'page1' in mock_browser._pages + + +@pytest.mark.asyncio +async def test_start_browser_failure(mock_browser): + mock_browser._connection_handler.ping.return_value = False + with patch('pydoll.browser.base.asyncio.sleep', AsyncMock()) as mock_sleep: + mock_sleep.return_value = False + with pytest.raises(exceptions.BrowserNotRunning): + await mock_browser.start() + + +@pytest.mark.asyncio +async def test_proxy_configuration(mock_browser): + mock_browser._proxy_manager.get_proxy_credentials = MagicMock( + return_value=(True, ('user', 'pass')) + ) + + await mock_browser.start() + + mock_browser._connection_handler.execute_command.assert_any_call( + FetchCommands.enable_fetch_events(True, '') + ) + mock_browser._connection_handler.register_callback.assert_any_call( + FetchEvents.REQUEST_PAUSED, ANY, True + ) + mock_browser._connection_handler.register_callback.assert_any_call( + FetchEvents.AUTH_REQUIRED, + ANY, + True, + ) + + +@pytest.mark.asyncio +async def test_get_page_existing(mock_browser): + page = await mock_browser.get_page() + assert isinstance(page, Page) + assert len(mock_browser._pages) == 0 + + +@pytest.mark.asyncio +async def test_get_page_new(mock_browser): + mock_browser._pages = [] + mock_browser._connection_handler.execute_command.return_value = { + 'result': {'targetId': 'new_page'} + } + + page = await mock_browser.get_page() + assert isinstance(page, Page) + assert len(mock_browser._pages) == 0 + + +@pytest.mark.asyncio +async def test_get_existing_page(mock_browser): + mock_browser._pages = [Page(1234, 'page1')] + mock_browser._connection_handler.execute_command.return_value = { + 'result': {'targetId': 'new_page'} + } + + page = await mock_browser.get_page() + assert isinstance(page, Page) + assert len(mock_browser._pages) == 0 + + +@pytest.mark.asyncio +async def test_cookie_management(mock_browser): + cookies = [{'name': 'test', 'value': '123'}] + await mock_browser.set_cookies(cookies) + mock_browser._connection_handler.execute_command.assert_any_await( + StorageCommands.set_cookies(cookies), timeout=60 + ) + mock_browser._connection_handler.execute_command.assert_any_await( + NetworkCommands.set_cookies(cookies), timeout=60 + ) + + mock_browser._connection_handler.execute_command.return_value = { + 'result': {'cookies': cookies} + } + result = await mock_browser.get_cookies() + assert result == cookies + + await mock_browser.delete_all_cookies() + mock_browser._connection_handler.execute_command.assert_any_await( + StorageCommands.clear_cookies(), timeout=60 + ) + mock_browser._connection_handler.execute_command.assert_any_await( + NetworkCommands.clear_browser_cookies(), timeout=60 + ) + + +@pytest.mark.asyncio +async def test_event_registration(mock_browser): + callback = MagicMock() + mock_browser._connection_handler.register_callback.return_value = 123 + + callback_id = await mock_browser.on('test_event', callback, temporary=True) + assert callback_id == 123 + + mock_browser._connection_handler.register_callback.assert_called_with( + 'test_event', ANY, True + ) + + +@pytest.mark.asyncio +async def test_window_management(mock_browser): + mock_browser._connection_handler.execute_command.return_value = { + 'result': {'windowId': 'window1'} + } + + bounds = {'width': 800, 'height': 600} + await mock_browser.set_window_bounds(bounds) + mock_browser._connection_handler.execute_command.assert_any_await( + BrowserCommands.set_window_bounds('window1', bounds), timeout=60 + ) + + await mock_browser.set_window_maximized() + mock_browser._connection_handler.execute_command.assert_any_await( + BrowserCommands.set_window_maximized('window1'), timeout=60 + ) + + await mock_browser.set_window_minimized() + mock_browser._connection_handler.execute_command.assert_any_await( + BrowserCommands.set_window_minimized('window1'), timeout=60 + ) + + +@pytest.mark.asyncio +async def test_stop_browser(mock_browser): + await mock_browser.stop() + mock_browser._connection_handler.execute_command.assert_any_await( + BrowserCommands.CLOSE, timeout=60 + ) + mock_browser._browser_process_manager.stop_process.assert_called_once() + mock_browser._temp_directory_manager.cleanup.assert_called_once() + + +@pytest.mark.asyncio +async def test_stop_browser_not_running(mock_browser): + mock_browser._connection_handler.ping.return_value = False + with patch('pydoll.browser.base.asyncio.sleep', AsyncMock()) as mock_sleep: + mock_sleep.return_value = False + with pytest.raises(exceptions.BrowserNotRunning): + await mock_browser.stop() + + +@pytest.mark.asyncio +async def test_context_manager(mock_browser): + async with mock_browser as browser: + assert browser == mock_browser + + mock_browser._temp_directory_manager.cleanup.assert_called_once() + mock_browser._browser_process_manager.stop_process.assert_called_once() + + +@pytest.mark.asyncio +async def test_enable_events(mock_browser): + await mock_browser.enable_page_events() + mock_browser._connection_handler.execute_command.assert_called_with( + PageCommands.enable_page() + ) + + await mock_browser.enable_network_events() + mock_browser._connection_handler.execute_command.assert_called_with( + NetworkCommands.enable_network_events() + ) + + await mock_browser.enable_dom_events() + mock_browser._connection_handler.execute_command.assert_called_with( + DomCommands.enable_dom_events() + ) + + await mock_browser.enable_fetch_events( + handle_auth_requests=True, resource_type='XHR' + ) + mock_browser._connection_handler.execute_command.assert_called_with( + FetchCommands.enable_fetch_events(True, 'XHR') + ) + + +@pytest.mark.asyncio +async def test_disable_events(mock_browser): + await mock_browser.disable_fetch_events() + mock_browser._connection_handler.execute_command.assert_called_with( + FetchCommands.disable_fetch_events() + ) + + +@pytest.mark.asyncio +async def test__continue_request(mock_browser): + await mock_browser._continue_request({'params': {'requestId': 'request1'}}) + mock_browser._connection_handler.execute_command.assert_called_with( + FetchCommands.continue_request('request1'), timeout=60 + ) + + +@pytest.mark.asyncio +async def test__continue_request_auth_required(mock_browser): + await mock_browser._continue_request_auth_required( + event={'params': {'requestId': 'request1'}}, + proxy_username='user', + proxy_password='pass', + ) + + mock_browser._connection_handler.execute_command.assert_any_call( + FetchCommands.continue_request_with_auth('request1', 'user', 'pass'), + timeout=60, + ) + + mock_browser._connection_handler.execute_command.assert_any_call( + FetchCommands.disable_fetch_events() + ) + + +def test__is_valid_page(mock_browser): + result = mock_browser._is_valid_page({ + 'type': 'page', + 'url': 'chrome://newtab/', + }) + assert result is True + + +def test__is_valid_page_not_a_page(mock_browser): + result = mock_browser._is_valid_page({ + 'type': 'tab', + 'url': 'chrome://newtab/', + }) + assert result is False + + +@pytest.mark.asyncio +async def test__get_valid_page(mock_browser): + pages = [ + { + 'type': 'page', + 'url': 'chrome://newtab/', + 'targetId': 'valid_page_id', + }, + { + 'type': 'page', + 'url': 'https://example.com/', + 'targetId': 'invalid_page_id', + }, + { + 'type': 'tab', + 'url': 'chrome://newtab/', + 'targetId': 'invalid_page_id', + }, + ] + + result = await mock_browser._get_valid_page(pages) + assert result == 'valid_page_id' + + +@pytest.mark.asyncio +async def test__get_valid_page_key_error(mock_browser): + pages = [ + {'type': 'page', 'url': 'chrome://newtab/'}, + {'type': 'page', 'url': 'https://example.com/'}, + {'type': 'tab', 'url': 'chrome://newtab/'}, + ] + + mock_browser._connection_handler.execute_command.return_value = { + 'result': {'targetId': 'new_page'} + } + result = await mock_browser._get_valid_page(pages) + assert result == 'new_page' + mock_browser._connection_handler.execute_command.assert_called_with( + TargetCommands.create_target(''), timeout=60 + ) diff --git a/tests/test_connection.py b/tests/test_connection.py deleted file mode 100644 index 2d29377e..00000000 --- a/tests/test_connection.py +++ /dev/null @@ -1,153 +0,0 @@ -import asyncio -from unittest.mock import AsyncMock, MagicMock, patch - -import pytest - - -@pytest.mark.asyncio -async def test_connection_initialization(handler): - DEFAULT_PORT = 9222 - assert handler._connection_port == DEFAULT_PORT - assert handler._page_id == 'browser' - assert handler._connection is None - - -@pytest.mark.asyncio -async def test_connect_to_page(handler): - with patch( - 'pydoll.connection.get_browser_ws_address', new_callable=AsyncMock - ) as mock_get_browser_ws_address: - mock_get_browser_ws_address.return_value = 'ws://localhost:9222' - await handler.connect_to_page() - assert handler._connection is not None - - -@pytest.mark.asyncio -async def test_execute_command(handler): - with patch( - 'pydoll.connection.get_browser_ws_address', new_callable=AsyncMock - ) as mock_get_browser_ws_address: - mock_get_browser_ws_address.return_value = 'ws://localhost:9222' - await handler.connect_to_page() - response = await handler.execute_command({'method': 'test'}) - assert response == {'id': 1, 'result': 'success'} - - -@pytest.mark.asyncio -async def test_id_increment(handler): - EXPECTED_ID = 3 - with patch( - 'pydoll.connection.get_browser_ws_address', new_callable=AsyncMock - ) as mock_get_browser_ws_address: - mock_get_browser_ws_address.return_value = 'ws://localhost:9222' - await handler.connect_to_page() - await handler.execute_command({'method': 'test'}) - await handler.execute_command({'method': 'test'}) - assert handler._id == EXPECTED_ID - - -@pytest.mark.asyncio -async def test_execute_command_timeout(handler): - with patch( - 'pydoll.connection.get_browser_ws_address', new_callable=AsyncMock - ) as mock_get_browser_ws_address: - mock_get_browser_ws_address.return_value = 'ws://localhost:9222' - await handler.connect_to_page() - with pytest.raises(asyncio.TimeoutError): - with patch( - 'pydoll.connection.asyncio.wait_for', - side_effect=asyncio.TimeoutError, - ): - await handler.execute_command({'method': 'test'}) - - -@pytest.mark.asyncio -async def test_register_callback(handler): - EXPECTED_CALLBACK_ID = 2 - with patch( - 'pydoll.connection.get_browser_ws_address', new_callable=AsyncMock - ) as mock_get_browser_ws_address: - mock_get_browser_ws_address.return_value = 'ws://localhost:9222' - await handler.connect_to_page() - callback = MagicMock() - await handler.register_callback('test', callback) - assert handler._callback_id == EXPECTED_CALLBACK_ID - assert handler._event_callbacks[1] == { - 'event': 'test', - 'callback': callback, - 'temporary': False, - } - - -@pytest.mark.asyncio -async def test_register_temporary_callback(handler): - EXPECTED_CALLBACK_ID = 2 - with patch( - 'pydoll.connection.get_browser_ws_address', new_callable=AsyncMock - ) as mock_get_browser_ws_address: - mock_get_browser_ws_address.return_value = 'ws://localhost:9222' - await handler.connect_to_page() - callback = MagicMock() - await handler.register_callback('test', callback, temporary=True) - assert handler._callback_id == EXPECTED_CALLBACK_ID - assert handler._event_callbacks[1] == { - 'event': 'test', - 'callback': callback, - 'temporary': True, - } - - -@pytest.mark.asyncio -async def test_callback_id_increment(handler): - EXPECTED_CALLBACK_ID = 3 - with patch( - 'pydoll.connection.get_browser_ws_address', new_callable=AsyncMock - ) as mock_get_browser_ws_address: - mock_get_browser_ws_address.return_value = 'ws://localhost:9222' - await handler.connect_to_page() - callback = MagicMock() - await handler.register_callback('test', callback) - await handler.register_callback('test', callback) - assert handler._callback_id == EXPECTED_CALLBACK_ID - - -@pytest.mark.asyncio -async def test_callback_execution(handler): - with patch( - 'pydoll.connection.get_browser_ws_address', new_callable=AsyncMock - ) as mock_get_browser_ws_address: - mock_get_browser_ws_address.return_value = 'ws://localhost:9222' - callback = MagicMock() - await handler.register_callback('Network.requestWillBeSent', callback) - await handler.connect_to_page() - await asyncio.sleep(0.2) - callback.assert_called_once() - - -@pytest.mark.asyncio -async def test_callback_removal(handler): - with patch( - 'pydoll.connection.get_browser_ws_address', new_callable=AsyncMock - ) as mock_get_browser_ws_address: - mock_get_browser_ws_address.return_value = 'ws://localhost:9222' - callback = MagicMock() - await handler.register_callback( - 'Network.requestWillBeSent', callback, temporary=True - ) - await handler.connect_to_page() - await asyncio.sleep(0.2) - callback.assert_called_once() - assert handler._event_callbacks == {} - - -@pytest.mark.asyncio -async def test_network_events_are_being_saved(handler): - with patch( - 'pydoll.connection.get_browser_ws_address', new_callable=AsyncMock - ) as mock_get_browser_ws_address: - mock_get_browser_ws_address.return_value = 'ws://localhost:9222' - await handler.connect_to_page() - await asyncio.sleep(0.2) - assert handler.network_logs == [ - {'method': 'Network.requestWillBeSent', 'params': {}} - ] diff --git a/tests/test_connection_handler.py b/tests/test_connection_handler.py new file mode 100644 index 00000000..638097f8 --- /dev/null +++ b/tests/test_connection_handler.py @@ -0,0 +1,320 @@ +import asyncio +import json +from unittest.mock import AsyncMock, MagicMock + +import pytest +import pytest_asyncio +import websockets + +from pydoll import exceptions +from pydoll.connection.connection import ConnectionHandler + + +@pytest_asyncio.fixture +async def connection_handler(): + handler = ConnectionHandler(connection_port=9222) + handler._ws_connection = AsyncMock() + handler._ws_connection.closed = False + return handler + + +@pytest_asyncio.fixture +async def connection_handler_closed(): + handler = ConnectionHandler( + connection_port=9222, + ws_address_resolver=AsyncMock(return_value='ws://localhost:9222'), + ws_connector=AsyncMock(), + ) + handler._ws_connection = AsyncMock() + handler._ws_connection.closed = True + return handler + + +@pytest_asyncio.fixture +async def connection_handler_with_page_id(): + handler = ConnectionHandler( + page_id='ABCD', + connection_port=9222, + ws_address_resolver=AsyncMock(return_value='ws://localhost:9222'), + ws_connector=AsyncMock(), + ) + handler._ws_connection = AsyncMock() + handler._ws_connection.closed = True + return handler + + +@pytest.mark.asyncio +async def test_ping_success(connection_handler): + connection_handler._ws_connection.ping = AsyncMock() + result = await connection_handler.ping() + assert result is True + + +@pytest.mark.asyncio +async def test_ping_failure(connection_handler): + connection_handler._ws_connection.ping = AsyncMock( + side_effect=Exception('Ping failed') + ) + result = await connection_handler.ping() + assert result is False + + +@pytest.mark.asyncio +async def test_execute_command_success(connection_handler): + command = {'id': 1, 'method': 'SomeMethod'} + response = json.dumps({'id': 1, 'result': 'success'}) + + connection_handler._ws_connection.send = AsyncMock() + future = asyncio.Future() + future.set_result(response) + connection_handler._command_manager.create_command_future = MagicMock( + return_value=future + ) + result = await connection_handler.execute_command(command) + assert result == {'id': 1, 'result': 'success'} + + +@pytest.mark.asyncio +async def test_execute_command_invalid_command(connection_handler): + with pytest.raises(exceptions.InvalidCommand): + await connection_handler.execute_command('invalid') + + +@pytest.mark.asyncio +async def test_execute_command_timeout(connection_handler): + command = {'id': 2, 'method': 'TimeoutMethod'} + + connection_handler._ws_connection.send = AsyncMock() + connection_handler._command_manager.create_command_future = MagicMock( + return_value=asyncio.Future() + ) + + with pytest.raises(asyncio.TimeoutError): + await connection_handler.execute_command(command, timeout=0.1) + + +@pytest.mark.asyncio +async def test_execute_command_connection_closed_exception(connection_handler): + connection_handler._ws_connection.send = AsyncMock( + side_effect=websockets.ConnectionClosed( + 1000, 'Normal Closure', rcvd_then_sent=True + ) + ) + connection_handler._ws_connection.close = AsyncMock() + connection_handler._receive_task = AsyncMock(spec=asyncio.Task) + connection_handler._receive_task.done = MagicMock(return_value=False) + with pytest.raises(websockets.ConnectionClosed): + await connection_handler.execute_command({ + 'id': 1, + 'method': 'SomeMethod', + }) + + +@pytest.mark.asyncio +async def test_register_callback(connection_handler): + connection_handler._events_handler.register_callback = MagicMock( + return_value=123 + ) + callback_id = await connection_handler.register_callback( + 'event', lambda x: x + ) + assert callback_id == 123 + + +@pytest.mark.asyncio +async def test_remove_callback(connection_handler): + connection_handler._events_handler.remove_callback = MagicMock( + return_value=True + ) + result = await connection_handler.remove_callback(123) + assert result is True + + +@pytest.mark.asyncio +async def test_clear_callbacks(connection_handler): + connection_handler._events_handler.clear_callbacks = MagicMock( + return_value=None + ) + result = await connection_handler.clear_callbacks() + connection_handler._events_handler.clear_callbacks.assert_called_once() + assert result is None + + +@pytest.mark.asyncio +async def test_close(connection_handler): + connection_handler._ws_connection.close = AsyncMock() + connection_handler.clear_callbacks = AsyncMock() + + await connection_handler.close() + connection_handler.clear_callbacks.assert_awaited_once() + connection_handler._ws_connection.close.assert_awaited_once() + + +@pytest.mark.asyncio +async def test_execute_command_connection_closed(connection_handler_closed): + mock_connector = AsyncMock( + return_value=connection_handler_closed._ws_connection + ) + connection_handler_closed._ws_connector = mock_connector + + command = {'id': 1, 'method': 'SomeMethod'} + response = json.dumps({'id': 1, 'result': 'success'}) + + connection_handler_closed._ws_connection.send = AsyncMock() + future = asyncio.Future() + future.set_result(response) + connection_handler_closed._command_manager.create_command_future = ( + MagicMock(return_value=future) + ) + result = await connection_handler_closed.execute_command(command) + mock_connector.assert_awaited_once() # Verifica se tentou reconectar + connection_handler_closed._ws_connection.send.assert_awaited_once_with( + json.dumps(command) + ) + assert result == {'id': 1, 'result': 'success'} + + +@pytest.mark.asyncio +async def test__is_command_response_true(connection_handler): + command = {'id': 1, 'method': 'SomeMethod'} + result = connection_handler._is_command_response(command) + assert result is True + + +@pytest.mark.asyncio +async def test__is_command_response_false(connection_handler): + command = {'id': 'string', 'method': 'SomeMethod'} + result = connection_handler._is_command_response(command) + assert result is False + + +@pytest.mark.asyncio +async def test__resolve_ws_address_with_page_id( + connection_handler_with_page_id, +): + result = await connection_handler_with_page_id._resolve_ws_address() + assert result == 'ws://localhost:9222/devtools/page/ABCD' + + +@pytest.mark.asyncio +async def test__incoming_messages(connection_handler): + connection_handler._ws_connection.recv = AsyncMock( + return_value='{"id": 1, "method": "SomeMethod"}' + ) + async_generator = connection_handler._incoming_messages() + result = await anext(async_generator) + assert result == '{"id": 1, "method": "SomeMethod"}' + + +@pytest.mark.asyncio +async def test__process_single_message(connection_handler): + raw_message = '{"id": 1, "method": "SomeMethod"}' + connection_handler._command_manager.resolve_command = MagicMock() + await connection_handler._process_single_message(raw_message) + connection_handler._command_manager.resolve_command.assert_called_once_with( + 1, raw_message + ) + + +@pytest.mark.asyncio +async def test__process_single_message_invalid_command(connection_handler): + raw_message = 'not a valid JSON' + result = await connection_handler._process_single_message(raw_message) + assert result is None + + +@pytest.mark.asyncio +async def test__process_single_message_event(connection_handler): + event = {'method': 'SomeEvent'} + connection_handler._events_handler.process_event = AsyncMock() + await connection_handler._process_single_message(json.dumps(event)) + connection_handler._events_handler.process_event.assert_called_once_with( + event + ) + + +@pytest.mark.asyncio +async def test__process_single_message_event_with_callback(connection_handler): + event = {'method': 'SomeEvent'} + callback = MagicMock(return_value=None) + await connection_handler.register_callback('SomeEvent', callback) + await connection_handler._process_single_message(json.dumps(event)) + callback.assert_called_once_with(event) + + +@pytest.mark.asyncio +async def test__receive_events_flow(connection_handler): + async def fake_incoming_messages(): + yield '{"id": 1, "method": "TestCommand"}' + yield '{"method": "TestEvent"}' + + connection_handler._incoming_messages = fake_incoming_messages + + connection_handler._handle_command_message = AsyncMock() + connection_handler._handle_event_message = AsyncMock() + + await connection_handler._receive_events() + + connection_handler._handle_command_message.assert_awaited_once_with({ + 'id': 1, + 'method': 'TestCommand', + }) + connection_handler._handle_event_message.assert_awaited_once_with({ + 'method': 'TestEvent' + }) + + +@pytest.mark.asyncio +async def test__receive_events_connection_closed(connection_handler): + async def fake_incoming_messages_connection_closed(): + raise websockets.ConnectionClosed( + 1000, 'Normal Closure', rcvd_then_sent=True + ) + yield # Garante que seja um async generator + + connection_handler._incoming_messages = ( + fake_incoming_messages_connection_closed + ) + await connection_handler._receive_events() + + +@pytest.mark.asyncio +async def test__receive_events_unexpected_exception(connection_handler): + async def fake_incoming_messages_unexpected_error(): + raise ValueError('Unexpected error in async generator') + yield # Garante que seja um async generator + + connection_handler._incoming_messages = ( + fake_incoming_messages_unexpected_error + ) + + with pytest.raises( + ValueError, match='Unexpected error in async generator' + ): + await connection_handler._receive_events() + + +@pytest.mark.asyncio +async def test__aenter__(connection_handler): + result = await connection_handler.__aenter__() + assert result is connection_handler + + +@pytest.mark.asyncio +async def test__aexit__(connection_handler): + await connection_handler.register_callback('SomeEvent', MagicMock()) + connection_handler.clear_callbacks = AsyncMock() + connection_handler._ws_connection.close = AsyncMock() + await connection_handler.__aexit__(None, None, None) + connection_handler.clear_callbacks.assert_awaited_once() + connection_handler._ws_connection.close.assert_awaited_once() + + +def test__repr__(connection_handler): + result = connection_handler.__repr__() + assert result == 'ConnectionHandler(port=9222)' + + +def test__str__(connection_handler): + result = connection_handler.__str__() + assert result == 'ConnectionHandler(port=9222)' diff --git a/tests/test_connection_managers.py b/tests/test_connection_managers.py new file mode 100644 index 00000000..055c4159 --- /dev/null +++ b/tests/test_connection_managers.py @@ -0,0 +1,200 @@ +import pytest + +from pydoll import exceptions +from pydoll.connection.connection import CommandManager, EventsHandler + + +@pytest.fixture +def command_manager(): + """Retorna uma instância fresca de CommandManager para os testes.""" + return CommandManager() + + +@pytest.fixture +def events_handler(): + """Retorna uma instância fresca de EventsHandler para os testes.""" + return EventsHandler() + + +def test_create_command_future(command_manager): + test_command = {'method': 'TestMethod'} + future_result = command_manager.create_command_future(test_command) + + # Verifica se o ID foi atribuído corretamente + assert test_command['id'] == 1, 'The first command ID should be 1' + # Verifica se o future foi armazenado no dicionário de pendentes + assert 1 in command_manager._pending_commands + assert command_manager._pending_commands[1] is future_result + + # Cria um segundo comando e verifica o incremento do ID + second_command = {'method': 'SecondMethod'} + future_second = command_manager.create_command_future(second_command) + assert second_command['id'] == 2, 'The second command ID should be 2' + assert 2 in command_manager._pending_commands + assert command_manager._pending_commands[2] is future_second + + +def test_resolve_command(command_manager): + test_command = {'method': 'TestMethod'} + future_result = command_manager.create_command_future(test_command) + result_payload = '{"result": "success"}' + + # O future não deve estar concluído antes da resolução + assert not future_result.done(), ( + 'The future should not be completed before resolution' + ) + + # Resolve o comando e verifica o resultado + command_manager.resolve_command(1, result_payload) + assert future_result.done(), ( + 'The future should be completed after resolution' + ) + assert future_result.result() == result_payload, ( + 'The future result does not match the expected result' + ) + # O comando pendente deve ser removido + assert 1 not in command_manager._pending_commands + + +def test_resolve_unknown_command(command_manager): + test_command = {'method': 'TestMethod'} + future_result = command_manager.create_command_future(test_command) + + # Tenta resolver um ID inexistente; o future original deve permanecer pendente + command_manager.resolve_command(999, '{"result": "ignored"}') + assert not future_result.done(), ( + 'The future should not be completed after resolving an unknown command' + ) + + +def test_remove_pending_command(command_manager): + test_command = {'method': 'TestMethod'} + _ = command_manager.create_command_future(test_command) + + # Remove o comando pendente e verifica se ele foi removido + command_manager.remove_pending_command(1) + assert 1 not in command_manager._pending_commands, ( + 'The pending command should be removed' + ) + command_manager.remove_pending_command(1) + + +def test_register_callback_success(events_handler): + dummy_callback = lambda event: event + callback_id = events_handler.register_callback('TestEvent', dummy_callback) + + assert callback_id == 1, 'The first callback ID should be 1' + assert callback_id in events_handler._event_callbacks, ( + 'The callback must be registered' + ) + callback_info = events_handler._event_callbacks[callback_id] + assert callback_info['temporary'] is False, ( + 'The temporary flag should be False by default' + ) + + +def test_register_callback_invalid(events_handler): + with pytest.raises(exceptions.InvalidCallback): + events_handler.register_callback('TestEvent', 'Not a callback') + + +def test_remove_existing_callback(events_handler): + dummy_callback = lambda event: event + callback_id = events_handler.register_callback('TestEvent', dummy_callback) + removal_result = events_handler.remove_callback(callback_id) + + assert removal_result is True, ( + 'The removal of a existing callback should be successful' + ) + assert callback_id not in events_handler._event_callbacks, ( + 'The callback should be removed' + ) + + +def test_remove_nonexistent_callback(events_handler): + removal_result = events_handler.remove_callback(999) + assert removal_result is False, ( + 'The removal of a nonexistent callback should return False' + ) + + +def test_clear_callbacks(events_handler): + dummy_callback = lambda event: event + events_handler.register_callback('EventA', dummy_callback) + events_handler.register_callback('EventB', dummy_callback) + + events_handler.clear_callbacks() + assert len(events_handler._event_callbacks) == 0, ( + 'All callbacks should be cleared' + ) + + +@pytest.mark.asyncio +async def test_process_event_updates_network_logs(events_handler): + assert events_handler.network_logs == [] + network_event = { + 'method': 'Network.requestWillBeSent', + 'url': 'http://example.com', + } + + await events_handler.process_event(network_event) + + assert network_event in events_handler.network_logs, ( + 'The network event should be added to the logs' + ) + + +@pytest.mark.asyncio +async def test_process_event_triggers_callbacks(events_handler): + callback_results = [] + + def sync_callback(event): + callback_results.append(('sync', event.get('value'))) + + async def async_callback(event): + callback_results.append(('async', event.get('value'))) + + sync_callback_id = events_handler.register_callback( + 'MyCustomEvent', sync_callback, temporary=True + ) + async_callback_id = events_handler.register_callback( + 'MyCustomEvent', async_callback, temporary=False + ) + + test_event = {'method': 'MyCustomEvent', 'value': 123} + await events_handler.process_event(test_event) + + assert ('sync', 123) in callback_results, ( + 'The synchronous callback was not triggered correctly' + ) + assert ('async', 123) in callback_results, ( + 'The asynchronous callback was not triggered correctly' + ) + + assert sync_callback_id not in events_handler._event_callbacks, ( + 'The temporary callback should be removed after execution' + ) + + assert async_callback_id in events_handler._event_callbacks, ( + 'The permanent callback should remain registered' + ) + + +@pytest.mark.asyncio +async def test_trigger_callbacks_error_handling(events_handler, caplog): + def faulty_callback(event): + raise ValueError('Error in callback') + + faulty_callback_id = events_handler.register_callback( + 'ErrorEvent', faulty_callback, temporary=True + ) + test_event = {'method': 'ErrorEvent'} + + await events_handler.process_event(test_event) + assert faulty_callback_id not in events_handler._event_callbacks, ( + 'The callback with error should be removed after execution' + ) + error_logged = any( + 'Error in callback' in record.message for record in caplog.records + ) + assert error_logged, 'The error in the callback should be logged' diff --git a/tests/test_dom_commands.py b/tests/test_dom_commands.py new file mode 100644 index 00000000..30a76217 --- /dev/null +++ b/tests/test_dom_commands.py @@ -0,0 +1,468 @@ +# tests/test_dom_commands.py +import pytest +from unittest.mock import patch +from pydoll.commands.dom import ( + DomCommands, +) +from pydoll.commands.runtime import RuntimeCommands +from pydoll.constants import By + + +@pytest.fixture +def mock_runtime_commands(): + with patch('pydoll.commands.dom.RuntimeCommands') as mock: + yield mock + + +def test_enable_dom_events(): + expected = {'method': 'DOM.enable'} + result = DomCommands.enable_dom_events() + assert result == expected, ( + 'The enable_dom_events method did not return the expected dictionary.' + ) + + +def test_dom_document(): + expected = {'method': 'DOM.getDocument'} + result = DomCommands.dom_document() + assert result == expected, ( + 'The dom_document method did not return the expected dictionary.' + ) + + +def test_scroll_into_view(): + object_id = '12345' + expected = { + 'method': 'DOM.scrollIntoViewIfNeeded', + 'params': {'objectId': object_id}, + } + result = DomCommands.scroll_into_view(object_id) + assert result == expected, ( + 'The scroll_into_view method did not return the expected dictionary.' + ) + + +def test_get_outer_html(): + object_id = 67890 + expected = { + 'method': 'DOM.getOuterHTML', + 'params': {'objectId': object_id}, + } + result = DomCommands.get_outer_html(object_id) + assert result == expected, ( + 'The get_outer_html method did not return the expected dictionary.' + ) + + +def test_request_node(): + object_id = 'abcde' + expected = {'method': 'DOM.requestNode', 'params': {'objectId': object_id}} + result = DomCommands.request_node(object_id) + assert result == expected, ( + 'The request_node method did not return the expected dictionary.' + ) + + +def test_describe_node(): + object_id = 'fghij' + expected = { + 'method': 'DOM.describeNode', + 'params': {'objectId': object_id}, + } + result = DomCommands.describe_node(object_id) + assert result == expected, ( + 'The describe_node method did not return the expected dictionary.' + ) + + +def test_box_model(): + object_id = 'klmno' + expected = {'method': 'DOM.getBoxModel', 'params': {'objectId': object_id}} + result = DomCommands.box_model(object_id) + assert result == expected, ( + 'The box_model method did not return the expected dictionary.' + ) + + +def test_get_current_url(mock_runtime_commands): + expected_command = RuntimeCommands.evaluate_script('window.location.href') + mock_runtime_commands.evaluate_script.return_value = expected_command + result = DomCommands.get_current_url() + mock_runtime_commands.evaluate_script.assert_called_once_with( + 'window.location.href' + ) + assert result == expected_command, ( + 'The get_current_url method did not return the expected command.' + ) + + +def test_find_element_css(mock_runtime_commands): + by = By.CSS_SELECTOR + value = 'test-class' + expected_selector = 'test-class' + expected_expression = f'document.querySelector("{expected_selector}");' + expected_command = { + 'method': 'Runtime.evaluate', + 'params': { + 'expression': expected_expression, + }, + } + mock_runtime_commands.evaluate_script.return_value = expected_command + result = DomCommands.find_element(by, value) + mock_runtime_commands.evaluate_script.assert_called_once_with( + expected_expression + ) + assert result == expected_command, ( + 'The find_element method with CSS did not return the expected command.' + ) + + +def test_find_element_xpath(mock_runtime_commands): + by = By.XPATH + value = "//div[@id='test']" + expected_expression = ( + '\n var element = document.evaluate(\n' + ' "//div[@id=\'test\']", document, null,\n' + ' XPathResult.FIRST_ORDERED_NODE_TYPE, null\n' + ' ).singleNodeValue;\n' + ' element;\n ' + ) + expected_command = { + 'method': 'Runtime.evaluate', + 'params': { + 'expression': expected_expression, + }, + } + mock_runtime_commands.evaluate_script.return_value = expected_command + result = DomCommands.find_element(by, value) + mock_runtime_commands.evaluate_script.assert_called_once_with( + expected_expression + ) + assert result == expected_command, ( + 'The find_element method with XPATH did not return the expected command.' + ) + + +def test_find_element_id(mock_runtime_commands): + by = By.ID + value = 'test-id' + expected_selector = '#test-id' + expected_expression = f'document.querySelector("{expected_selector}");' + expected_command = { + 'method': 'Runtime.evaluate', + 'params': { + 'expression': expected_expression, + 'returnByValue': False, + }, + } + mock_runtime_commands.evaluate_script.return_value = expected_command + result = DomCommands.find_element(by, value) + mock_runtime_commands.evaluate_script.assert_called_once_with( + expected_expression + ) + assert result == expected_command, ( + 'The find_element method with ID did not return the expected command.' + ) + + +def test_find_element_class_name(mock_runtime_commands): + by = By.CLASS_NAME + value = 'test-class' + expected_selector = '.test-class' + expected_expression = f'document.querySelector("{expected_selector}");' + expected_command = { + 'method': 'Runtime.evaluate', + 'params': { + 'expression': expected_expression, + 'returnByValue': False, + }, + } + mock_runtime_commands.evaluate_script.return_value = expected_command + result = DomCommands.find_element(by, value) + mock_runtime_commands.evaluate_script.assert_called_once_with( + expected_expression + ) + assert result == expected_command, ( + 'The find_element method with CLASS_NAME did not return the expected command.' + ) + + +def test_find_element_relative_css(mock_runtime_commands): + by = By.CSS_SELECTOR + value = 'div[id="test"]' + object_id = '12345' + expected_expression = ( + '\n function() {\n' + ' return this.querySelector("div[id=\\"test\\"]");\n' + ' }\n ' + ) + expected_command = { + 'method': 'Runtime.callFunctionOn', + 'params': { + 'functionDeclaration': expected_expression, + 'objectId': object_id, + 'returnByValue': False, + }, + } + mock_runtime_commands.call_function_on.return_value = expected_command + result = DomCommands.find_element(by, value, object_id) + mock_runtime_commands.call_function_on.assert_called_once_with( + object_id, expected_expression, return_by_value=False + ) + + assert result == expected_command, ( + 'The find_element relative method did not return the expected command.' + ) + + +def test_find_element_relative_class_name(mock_runtime_commands): + by = By.CLASS_NAME + value = 'test-class' + object_id = '12345' + expected_selector = '.test-class' + expected_expression = ( + f'\n function() {{\n' + f' return this.querySelector("' + f'{expected_selector}");\n' + f' }}\n ' + ) + expected_command = { + 'method': 'Runtime.callFunctionOn', + 'params': { + 'functionDeclaration': expected_expression, + 'objectId': object_id, + 'returnByValue': False, + }, + } + mock_runtime_commands.call_function_on.return_value = expected_command + result = DomCommands.find_element(by, value, object_id) + mock_runtime_commands.call_function_on.assert_called_once_with( + object_id, expected_expression, return_by_value=False + ) + assert result == expected_command, ( + 'The find_element relative method did not return the expected command.' + ) + + +def test_find_element_relative_id(mock_runtime_commands): + by = By.ID + value = 'test-id' + object_id = '12345' + expected_selector = '#test-id' + expected_command = { + 'method': 'Runtime.callFunctionOn', + 'params': { + 'functionDeclaration': ( + f'function() {{ return this.querySelector("' + f'{expected_selector}"); }}' + ), + 'objectId': object_id, + 'returnByValue': False, + }, + } + mock_runtime_commands.call_function_on.return_value = expected_command + result = DomCommands.find_element(by, value, object_id) + mock_runtime_commands.call_function_on.assert_called_once_with( + object_id, + ( + f'\n function() {{\n' + f' return this.querySelector("' + f'{expected_selector}");\n' + f' }}\n ' + ), + return_by_value=False, + ) + assert result == expected_command, ( + 'The find_element relative method did not return the expected command.' + ) + + +def test_find_element_relative_xpath(mock_runtime_commands): + by = By.XPATH + value = '//div[@id="test"]' + object_id = '12345' + expected_command = { + 'method': 'Runtime.callFunctionOn', + 'params': { + 'functionDeclaration': ( + '\n function() {\n' + ' return document.evaluate(\n' + ' ".//div[@id=\\"test\\"]", this, null,\n' + ' XPathResult.FIRST_ORDERED_NODE_TYPE, null\n' + ' ).singleNodeValue;\n' + ' }\n ' + ), + 'objectId': object_id, + 'returnByValue': False, + }, + } + mock_runtime_commands.call_function_on.return_value = expected_command + result = DomCommands.find_element(by, value, object_id) + mock_runtime_commands.call_function_on.assert_called_once_with( + object_id, + ( + '\n function() {\n' + ' return document.evaluate(\n' + ' ".//div[@id=\\"test\\"]", this, null,\n' + ' XPathResult.FIRST_ORDERED_NODE_TYPE, null\n' + ' ).singleNodeValue;\n' + ' }\n ' + ), + return_by_value=False, + ) + assert result == expected_command, ( + 'The find_elements relative method did not return the expected command.' + ) + + +def test_find_elements_class_name(mock_runtime_commands): + by = By.CLASS_NAME + value = 'test-class' + expected_selector = '.test-class' + expected_expression = f'document.querySelectorAll("{expected_selector}");' + expected_command = { + 'method': 'Runtime.evaluate', + 'params': { + 'expression': expected_expression, + }, + } + + mock_runtime_commands.evaluate_script.return_value = expected_command + result = DomCommands.find_elements(by, value) + mock_runtime_commands.evaluate_script.assert_called_once_with( + expected_expression + ) + assert result == expected_command, ( + 'The find_elements method with CLASS_NAME did not return the expected command.' + ) + + +def test_find_elements_xpath(mock_runtime_commands): + by = By.XPATH + value = "//div[@class='test']" + expected_expression = ( + '\n var elements = document.evaluate(\n' + ' "//div[@class=\'test\']", document, null,\n' + ' XPathResult.ORDERED_NODE_SNAPSHOT_TYPE, null\n' + ' );\n var results = [];\n' + ' for (var i = 0; i < elements.snapshotLength; i++) {\n' + ' results.push(elements.snapshotItem(i));\n' + ' }\n results;\n ' + ) + expected_command = { + 'method': 'Runtime.evaluate', + 'params': { + 'expression': expected_expression, + }, + } + mock_runtime_commands.evaluate_script.return_value = expected_command + result = DomCommands.find_elements(by, value) + mock_runtime_commands.evaluate_script.assert_called_once_with( + expected_expression + ) + assert result == expected_command, ( + 'The find_elements method with XPATH did not return the expected command.' + ) + + +def test_find_elements_id(mock_runtime_commands): + by = By.ID + value = 'test-id' + expected_selector = '#test-id' + expected_command = { + 'method': 'Runtime.evaluate', + 'params': { + 'expression': f'document.querySelectorAll("{expected_selector}")' + }, + } + mock_runtime_commands.evaluate_script.return_value = expected_command + result = DomCommands.find_elements(by, value) + mock_runtime_commands.evaluate_script.assert_called_once_with( + 'document.querySelectorAll("#test-id");' + ) + assert result == expected_command, ( + 'The find_elements method with ID did not return the expected command.' + ) + + +def test_find_elements_css(mock_runtime_commands): + by = By.CSS_SELECTOR + value = 'test-class' + expected_selector = 'test-class' + expected_expression = f'document.querySelectorAll("{expected_selector}");' + expected_command = { + 'method': 'Runtime.evaluate', + 'params': { + 'expression': expected_expression, + }, + } + mock_runtime_commands.evaluate_script.return_value = expected_command + result = DomCommands.find_elements(by, value) + mock_runtime_commands.evaluate_script.assert_called_once_with( + expected_expression + ) + assert result == expected_command, ( + 'The find_elements method with CSS did not return the expected command.' + ) + + +def test_find_elements_relative_xpath(mock_runtime_commands): + by = By.XPATH + value = '//div[@id="test"]' + object_id = '12345' + expected_expression = ( + '\n function() {\n' + ' var elements = document.evaluate(\n' + ' ".//div[@id=\\"test\\"]", this, null,\n' + ' XPathResult.ORDERED_NODE_SNAPSHOT_TYPE, null\n' + ' );\n' + ' var results = [];\n' + ' for (var i = 0; i < elements.snapshotLength; i++) {\n' + ' results.push(elements.snapshotItem(i));\n' + ' }\n' + ' return results;\n' + ' }\n ' + ) + expected_command = { + 'method': 'Runtime.callFunctionOn', + 'params': { + 'functionDeclaration': expected_expression, + 'objectId': object_id, + 'returnByValue': False, + }, + } + mock_runtime_commands.call_function_on.return_value = expected_command + result = DomCommands.find_elements(by, value, object_id) + mock_runtime_commands.call_function_on.assert_called_once_with( + object_id, expected_expression, return_by_value=False + ) + assert result == expected_command, ( + 'The find_elements relative method did not return the expected command.' + ) + + +def test_find_elements_relative_css(mock_runtime_commands): + by = By.CSS_SELECTOR + value = 'div[id="test"]' + object_id = '12345' + expected_expression = ( + '\n function() {\n' + ' return this.querySelectorAll("div[id=\\"test\\"]");\n' + ' }\n ' + ) + expected_command = { + 'method': 'Runtime.callFunctionOn', + 'params': { + 'functionDeclaration': expected_expression, + 'objectId': object_id, + 'returnByValue': False, + }, + } + mock_runtime_commands.call_function_on.return_value = expected_command + result = DomCommands.find_elements(by, value, object_id) + mock_runtime_commands.call_function_on.assert_called_once_with( + object_id, expected_expression, return_by_value=False + ) + assert result == expected_command, ( + 'The find_elements relative method did not return the expected command.' + ) diff --git a/tests/test_events.py b/tests/test_events.py new file mode 100644 index 00000000..4853394a --- /dev/null +++ b/tests/test_events.py @@ -0,0 +1,87 @@ +from pydoll.events import ( + BrowserEvents, + DomEvents, + FetchEvents, + NetworkEvents, + PageEvents, +) + + +def test_browser_events(): + assert BrowserEvents.DOWNLOAD_PROGRESS == 'Browser.downloadProgress' + assert BrowserEvents.DOWNLOAD_WILL_BEGIN == 'Browser.downloadWillBegin' + + +def test_dom_events(): + assert DomEvents.ATTRIBUTE_MODIFIED == 'DOM.attributeModified' + assert DomEvents.ATTRIBUTE_REMOVED == 'DOM.attributeRemoved' + assert DomEvents.CHARACTER_DATA_MODIFIED == 'DOM.characterDataModified' + assert DomEvents.CHILD_NODE_COUNT_UPDATED == 'DOM.childNodeCountUpdated' + assert DomEvents.CHILD_NODE_INSERTED == 'DOM.childNodeInserted' + assert DomEvents.CHILD_NODE_REMOVED == 'DOM.childNodeRemoved' + assert DomEvents.DOCUMENT_UPDATED == 'DOM.documentUpdated' + assert DomEvents.SCROLLABLE_FLAG_UPDATED == 'DOM.scrollableFlagUpdated' + assert DomEvents.SHADOW_ROOT_POPPED == 'DOM.shadowRootPopped' + assert DomEvents.SHADOW_ROOT_PUSHED == 'DOM.shadowRootPushed' + assert ( + DomEvents.TOP_LAYER_ELEMENTS_UPDATED == 'DOM.topLayerElementsUpdated' + ) + + +def test_fetch_events(): + assert FetchEvents.AUTH_REQUIRED == 'Fetch.authRequired' + assert FetchEvents.REQUEST_PAUSED == 'Fetch.requestPaused' + + +def test_network_events(): + assert NetworkEvents.DATA_RECEIVED == 'Network.dataReceived' + assert NetworkEvents.REQUEST_WILL_BE_SENT == 'Network.requestWillBeSent' + assert NetworkEvents.RESPONSE_RECEIVED == 'Network.responseReceived' + assert NetworkEvents.WEB_SOCKET_CLOSED == 'Network.webSocketClosed' + assert NetworkEvents.WEB_SOCKET_CREATED == 'Network.webSocketCreated' + assert ( + NetworkEvents.WEB_SOCKET_FRAME_ERROR == 'Network.webSocketFrameError' + ) + assert ( + NetworkEvents.WEB_SOCKET_FRAME_RECEIVED + == 'Network.webSocketFrameReceived' + ) + assert NetworkEvents.WEB_SOCKET_FRAME_SENT == 'Network.webSocketFrameSent' + assert NetworkEvents.WEB_TRANSPORT_CLOSED == 'Network.webTransportClosed' + assert NetworkEvents.WEB_TRANSPORT_CONNECTION_ESTABLISHED == ( + 'Network.webTransportConnectionEstablished' + ) + assert NetworkEvents.WEB_TRANSPORT_CREATED == 'Network.webTransportCreated' + assert NetworkEvents.POLICY_UPDATED == 'Network.policyUpdated' + assert NetworkEvents.REQUEST_INTERCEPTED == 'Network.requestIntercepted' + assert ( + NetworkEvents.REQUEST_SERVED_FROM_CACHE + == 'Network.requestServedFromCache' + ) + assert NetworkEvents.LOADING_FAILED == 'Network.loadingFailed' + assert NetworkEvents.LOADING_FINISHED == 'Network.loadingFinished' + assert ( + NetworkEvents.EVENT_SOURCE_MESSAGE_RECEIVED + == 'Network.eventSourceMessageReceived' + ) + + +def test_page_events(): + assert PageEvents.PAGE_LOADED == 'Page.loadEventFired' + assert PageEvents.DOM_CONTENT_LOADED == 'Page.domContentEventFired' + assert PageEvents.FRAME_ATTACHED == 'Page.frameAttached' + assert PageEvents.FRAME_DETACHED == 'Page.frameDetached' + assert PageEvents.FRAME_NAVIGATED == 'Page.frameNavigated' + assert PageEvents.FRAME_STARTED_LOADING == 'Page.frameStartedLoading' + assert PageEvents.FRAME_STOPPED_LOADING == 'Page.frameStoppedLoading' + assert PageEvents.JS_DIALOG_CLOSED == 'Page.javascriptDialogClosed' + assert PageEvents.JS_DIALOG_OPENING == 'Page.javascriptDialogOpening' + assert ( + PageEvents.NAVIGATED_WITHIN_DOCUMENT == 'Page.navigatedWithinDocument' + ) + assert PageEvents.DOWNLOAD_PROGRESS == 'Page.downloadProgress' + assert PageEvents.DOWNLOAD_WILL_BEGIN == 'Page.downloadWillBegin' + assert PageEvents.LIFECYCLE_EVENT == 'Page.lifecycleEvent' + assert PageEvents.WINDOW_OPENED == 'Page.windowOpen' + assert PageEvents.DOCUMENT_OPENED == 'Page.documentOpened' + assert PageEvents.FILE_CHOOSER_OPENED == 'Page.fileChooserOpened' diff --git a/tests/test_fetch_commands.py b/tests/test_fetch_commands.py new file mode 100644 index 00000000..ff70ad8a --- /dev/null +++ b/tests/test_fetch_commands.py @@ -0,0 +1,163 @@ +from pydoll.commands.fetch import FetchCommands + + +def test_continue_request(): + request_id = '123' + url = 'http://example.com' + method = 'POST' + post_data = 'data' + headers = {'Content-Type': 'application/json'} + intercept_response = True + + expected_result = { + 'method': 'Fetch.continueRequest', + 'params': { + 'requestId': request_id, + 'url': url, + 'method': method, + 'postData': post_data, + 'headers': headers, + 'interceptResponse': intercept_response, + }, + } + + result = FetchCommands.continue_request( + request_id, url, method, post_data, headers, intercept_response + ) + assert result == expected_result + + +def test_continue_request_with_auth(): + request_id = '123' + proxy_username = 'user' + proxy_password = 'pass' + + expected_result = { + 'method': 'Fetch.continueWithAuth', + 'params': { + 'requestId': request_id, + 'authChallengeResponse': { + 'response': 'ProvideCredentials', + 'username': proxy_username, + 'password': proxy_password, + }, + }, + } + + result = FetchCommands.continue_request_with_auth( + request_id, proxy_username, proxy_password + ) + assert result == expected_result + + +def test_disable_fetch_events(): + expected_result = {'method': 'Fetch.disable', 'params': {}} + result = FetchCommands.disable_fetch_events() + assert result == expected_result + + +def test_enable_fetch_events(): + handle_auth_requests = True + resource_type = 'Document' + + expected_result = { + 'method': 'Fetch.enable', + 'params': { + 'patterns': [{'urlPattern': '*', 'resourceType': resource_type}], + 'handleAuthRequests': handle_auth_requests, + }, + } + + result = FetchCommands.enable_fetch_events( + handle_auth_requests, resource_type + ) + assert result == expected_result + + +def test_fail_request(): + request_id = '123' + error_reason = 'Failed' + + expected_result = { + 'method': 'Fetch.failRequest', + 'params': { + 'requestId': request_id, + 'errorReason': error_reason, + }, + } + + result = FetchCommands.fail_request(request_id, error_reason) + assert result == expected_result + + +def test_fulfill_request(): + request_id = '123' + response_code = 200 + response_headers = {'Content-Type': 'application/json'} + binary_response_headers = 'binary_headers' + body = 'response_body' + response_phrase = 'OK' + + expected_result = { + 'method': 'Fetch.fulfillRequest', + 'params': { + 'requestId': request_id, + 'responseCode': response_code, + 'responseHeaders': response_headers, + 'binaryResponseHeaders': binary_response_headers, + 'body': body, + 'responsePhrase': response_phrase, + }, + } + + result = FetchCommands.fulfill_request( + request_id, + response_code, + response_headers, + binary_response_headers, + body, + response_phrase, + ) + assert result == expected_result + + +def test_get_response_body(): + request_id = '123' + + expected_result = { + 'method': 'Fetch.getResponseBody', + 'params': { + 'requestId': request_id, + }, + } + + result = FetchCommands.get_response_body(request_id) + assert result == expected_result + + +def test_continue_response(): + request_id = '123' + response_code = 200 + response_headers = {'Content-Type': 'application/json'} + binary_response_headers = 'binary_headers' + response_phrase = 'OK' + + expected_result = { + 'method': 'Fetch.continueResponse', + 'params': { + 'requestId': request_id, + 'responseCode': response_code, + 'responseHeaders': response_headers, + 'binaryResponseHeaders': binary_response_headers, + 'responsePhrase': response_phrase, + }, + } + + result = FetchCommands.continue_response( + request_id, + response_code, + response_headers, + binary_response_headers, + response_phrase, + ) + assert result == expected_result diff --git a/tests/test_input_commands.py b/tests/test_input_commands.py new file mode 100644 index 00000000..d5dec062 --- /dev/null +++ b/tests/test_input_commands.py @@ -0,0 +1,56 @@ +from pydoll.commands.input import InputCommands + + +def test_mouse_press(): + x, y = 100, 200 + expected_command = { + 'method': 'Input.dispatchMouseEvent', + 'params': { + 'type': 'mousePressed', + 'button': 'left', + 'x': x, + 'y': y, + 'clickCount': 1, + 'modifiers': 0, + }, + } + assert InputCommands.mouse_press(x, y) == expected_command + + +def test_mouse_release(): + x, y = 100, 200 + expected_command = { + 'method': 'Input.dispatchMouseEvent', + 'params': { + 'type': 'mouseReleased', + 'button': 'left', + 'x': x, + 'y': y, + 'clickCount': 1, + 'modifiers': 0, + }, + } + assert InputCommands.mouse_release(x, y) == expected_command + + +def test_key_press(): + char = 'a' + expected_command = { + 'method': 'Input.dispatchKeyEvent', + 'params': { + 'type': 'char', + 'text': char, + }, + } + assert InputCommands.key_press(char) == expected_command + + +def test_insert_text(): + text = 'hello' + expected_command = { + 'method': 'Input.insertText', + 'params': { + 'text': text, + }, + } + assert InputCommands.insert_text(text) == expected_command diff --git a/tests/test_network_commands.py b/tests/test_network_commands.py new file mode 100644 index 00000000..bc631d97 --- /dev/null +++ b/tests/test_network_commands.py @@ -0,0 +1,172 @@ +from pydoll.commands.network import NetworkCommands + + +def test_clear_browser_cache(): + assert NetworkCommands.clear_browser_cache() == { + 'method': 'Network.clearBrowserCache' + } + + +def test_clear_browser_cookies(): + assert NetworkCommands.clear_browser_cookies() == { + 'method': 'Network.clearBrowserCookies' + } + + +def test_delete_cookies(): + name = 'test_cookie' + url = 'http://example.com' + expected_command = { + 'method': 'Network.deleteCookies', + 'params': {'name': name, 'url': url}, + } + assert NetworkCommands.delete_cookies(name, url) == expected_command + + expected_command_without_url = { + 'method': 'Network.deleteCookies', + 'params': {'name': name}, + } + assert NetworkCommands.delete_cookies(name) == expected_command_without_url + + +def test_disable_network_events(): + assert NetworkCommands.disable_network_events() == { + 'method': 'Network.disable' + } + + +def test_enable_network_events(): + assert NetworkCommands.enable_network_events() == { + 'method': 'Network.enable' + } + + +def test_get_cookies(): + urls = ['http://example.com'] + expected_command = { + 'method': 'Network.getCookies', + 'params': {'urls': urls}, + } + assert NetworkCommands.get_cookies(urls) == expected_command + + expected_command_without_urls = { + 'method': 'Network.getCookies', + 'params': {}, + } + assert NetworkCommands.get_cookies() == expected_command_without_urls + + +def test_get_request_post_data(): + request_id = '12345' + expected_command = { + 'method': 'Network.getRequestPostData', + 'params': {'requestId': request_id}, + } + assert ( + NetworkCommands.get_request_post_data(request_id) == expected_command + ) + + +def test_get_response_body(): + request_id = '12345' + expected_command = { + 'method': 'Network.getResponseBody', + 'params': {'requestId': request_id}, + } + assert NetworkCommands.get_response_body(request_id) == expected_command + + +def test_set_cache_disabled(): + cache_disabled = True + expected_command = { + 'method': 'Network.setCacheDisabled', + 'params': {'cacheDisabled': cache_disabled}, + } + assert ( + NetworkCommands.set_cache_disabled(cache_disabled) == expected_command + ) + + +def test_set_cookie(): + name = 'test_cookie' + value = 'test_value' + url = 'http://example.com' + expected_command = { + 'method': 'Network.setCookie', + 'params': {'name': name, 'value': value, 'url': url}, + } + assert NetworkCommands.set_cookie(name, value, url) == expected_command + + expected_command_without_url = { + 'method': 'Network.setCookie', + 'params': {'name': name, 'value': value}, + } + assert ( + NetworkCommands.set_cookie(name, value) == expected_command_without_url + ) + + +def test_set_cookies(): + cookies = [{'name': 'test_cookie', 'value': 'test_value'}] + expected_command = { + 'method': 'Network.setCookies', + 'params': {'cookies': cookies}, + } + assert NetworkCommands.set_cookies(cookies) == expected_command + + +def test_set_extra_http_headers(): + headers = {'Authorization': 'Bearer token'} + expected_command = { + 'method': 'Network.setExtraHTTPHeaders', + 'params': {'headers': headers}, + } + assert NetworkCommands.set_extra_http_headers(headers) == expected_command + + +def test_set_useragent_override(): + user_agent = 'Mozilla/5.0' + expected_command = { + 'method': 'Network.setUserAgentOverride', + 'params': {'userAgent': user_agent}, + } + assert ( + NetworkCommands.set_useragent_override(user_agent) == expected_command + ) + + +def test_get_all_cookies(): + assert NetworkCommands.get_all_cookies() == { + 'method': 'Network.getAllCookies' + } + + +def test_search_in_response(): + request_id = '12345' + query = 'test_query' + case_sensitive = True + is_regex = True + expected_command = { + 'method': 'Network.searchInResponseBody', + 'params': { + 'requestId': request_id, + 'query': query, + 'caseSensitive': case_sensitive, + 'isRegex': is_regex, + }, + } + assert ( + NetworkCommands.search_in_response( + request_id, query, case_sensitive, is_regex + ) + == expected_command + ) + + +def test_set_blocked_urls(): + urls = ['http://example.com'] + expected_command = { + 'method': 'Network.setBlockedURLs', + 'params': {'urls': urls}, + } + assert NetworkCommands.set_blocked_urls(urls) == expected_command diff --git a/tests/test_page_commands.py b/tests/test_page_commands.py new file mode 100644 index 00000000..e1508998 --- /dev/null +++ b/tests/test_page_commands.py @@ -0,0 +1,135 @@ +from pydoll.commands.page import PageCommands + + +def test_set_download_path(): + path = '/path/to/download' + expected_command = { + 'method': 'Page.setDownloadBehavior', + 'params': { + 'behavior': 'allow', + 'downloadPath': path, + }, + } + assert PageCommands.set_download_path(path) == expected_command + + +def test_screenshot_default(): + expected_command = { + 'method': 'Page.captureScreenshot', + 'params': { + 'format': 'jpeg', + 'quality': 100, + }, + } + assert PageCommands.screenshot() == expected_command + + +def test_screenshot_jpeg(): + expected_command = { + 'method': 'Page.captureScreenshot', + 'params': { + 'format': 'jpeg', + 'quality': 80, + }, + } + assert ( + PageCommands.screenshot(format='jpeg', quality=80) == expected_command + ) + + +def test_screenshot_png(): + expected_command = { + 'method': 'Page.captureScreenshot', + 'params': { + 'format': 'png', + 'quality': 100, + }, + } + assert PageCommands.screenshot(format='png') == expected_command + + +def test_screenshot_with_clip(): + clip = { + 'x': 10, + 'y': 20, + 'width': 30, + 'height': 40, + 'scale': 1, + } + expected_command = { + 'method': 'Page.captureScreenshot', + 'params': { + 'format': 'jpeg', + 'quality': 100, + 'clip': clip, + }, + } + assert PageCommands.screenshot(clip=clip) == expected_command + + +def test_go_to(): + url = 'https://example.com' + expected_command = { + 'method': 'Page.navigate', + 'params': { + 'url': url, + }, + } + assert PageCommands.go_to(url) == expected_command + + +def test_refresh_default(): + expected_command = { + 'method': 'Page.reload', + 'params': { + 'ignoreCache': False, + }, + } + assert PageCommands.refresh() == expected_command + + +def test_refresh_ignore_cache(): + expected_command = { + 'method': 'Page.reload', + 'params': { + 'ignoreCache': True, + }, + } + assert PageCommands.refresh(ignore_cache=True) == expected_command + + +def test_print_to_pdf_default(): + expected_command = { + 'method': 'Page.printToPDF', + 'params': { + 'scale': 1, + 'paperWidth': 8.5, + 'paperHeight': 11, + }, + } + assert PageCommands.print_to_pdf() == expected_command + + +def test_print_to_pdf_custom(): + expected_command = { + 'method': 'Page.printToPDF', + 'params': { + 'scale': 2, + 'paperWidth': 5.5, + 'paperHeight': 8.5, + }, + } + assert ( + PageCommands.print_to_pdf(scale=2, paper_width=5.5, paper_height=8.5) + == expected_command + ) + + +def test_enable_page(): + expected_command = {'method': 'Page.enable'} + assert PageCommands.enable_page() == expected_command + + +def test_disable_page(): + expected_command = {'method': 'Page.disable'} + assert PageCommands.disable_page() == expected_command diff --git a/tests/test_runtime_commands.py b/tests/test_runtime_commands.py new file mode 100644 index 00000000..823db2d2 --- /dev/null +++ b/tests/test_runtime_commands.py @@ -0,0 +1,39 @@ +from pydoll.commands.runtime import RuntimeCommands + + +def test_get_properties(): + object_id = '12345' + expected_command = { + 'method': 'Runtime.getProperties', + 'params': {'objectId': object_id, 'ownProperties': True}, + } + assert RuntimeCommands.get_properties(object_id) == expected_command + + +def test_call_function_on(): + object_id = '12345' + function_declaration = 'function() { return this; }' + return_by_value = True + expected_command = { + 'method': 'Runtime.callFunctionOn', + 'params': { + 'objectId': object_id, + 'functionDeclaration': function_declaration, + 'returnByValue': return_by_value, + }, + } + assert ( + RuntimeCommands.call_function_on( + object_id, function_declaration, return_by_value + ) + == expected_command + ) + + +def test_evaluate_script(): + expression = '2 + 2' + expected_command = { + 'method': 'Runtime.evaluate', + 'params': {'expression': expression, 'returnByValue': False}, + } + assert RuntimeCommands.evaluate_script(expression) == expected_command diff --git a/tests/test_storage_commands.py b/tests/test_storage_commands.py new file mode 100644 index 00000000..7933d641 --- /dev/null +++ b/tests/test_storage_commands.py @@ -0,0 +1,20 @@ +from pydoll.commands.storage import StorageCommands + + +def test_clear_cookies(): + expected = {'method': 'Storage.clearCookies', 'params': {}} + assert StorageCommands.clear_cookies() == expected + + +def test_set_cookies(): + cookies = [ + {'name': 'cookie1', 'value': 'value1'}, + {'name': 'cookie2', 'value': 'value2'}, + ] + expected = {'method': 'Storage.setCookies', 'params': {'cookies': cookies}} + assert StorageCommands.set_cookies(cookies) == expected + + +def test_get_cookies(): + expected = {'method': 'Storage.getCookies', 'params': {}} + assert StorageCommands.get_cookies() == expected diff --git a/tests/test_target_commands.py b/tests/test_target_commands.py new file mode 100644 index 00000000..0f6f2e61 --- /dev/null +++ b/tests/test_target_commands.py @@ -0,0 +1,39 @@ +from pydoll.commands.target import TargetCommands + + +def test_activate_target(): + target_id = 'test_target_id' + expected_result = { + 'method': 'Target.attachToTarget', + 'params': {'targetId': target_id}, + } + assert TargetCommands.activate_target(target_id) == expected_result + + +def test_attach_to_target(): + target_id = 'test_target_id' + expected_result = { + 'method': 'Target.attachToTarget', + 'params': {'targetId': target_id}, + } + assert TargetCommands.attach_to_target(target_id) == expected_result + + +def test_close_target(): + target_id = 'test_target_id' + expected_result = { + 'method': 'Target.closeTarget', + 'params': {'targetId': target_id}, + } + assert TargetCommands.close_target(target_id) == expected_result + + +def test_create_target(): + url = 'http://example.com' + expected_result = {'method': 'Target.createTarget', 'params': {'url': url}} + assert TargetCommands.create_target(url) == expected_result + + +def test_get_targets(): + expected_result = {'method': 'Target.getTargets', 'params': {}} + assert TargetCommands.get_targets() == expected_result diff --git a/tests/test_utils.py b/tests/test_utils.py new file mode 100644 index 00000000..c4030e3f --- /dev/null +++ b/tests/test_utils.py @@ -0,0 +1,74 @@ +import aiohttp +import pytest +from aioresponses import aioresponses + +from pydoll import exceptions +from pydoll.utils import decode_image_to_bytes, get_browser_ws_address + + +class TestUtils: + """ + Classe de testes para as funções utilitárias do módulo pydoll.utils. + Agrupa testes relacionados à decodificação de imagens e comunicação com o navegador. + """ + + def test_decode_image_to_bytes(self): + """ + Testa a função decode_image_to_bytes. + Verifica se a função consegue decodificar corretamente uma string base64 + para seus bytes originais. + """ + base64code = 'aGVsbG8gd29ybGQ=' # 'hello world' em base64 + assert decode_image_to_bytes(base64code) == b'hello world' + + @pytest.mark.asyncio + async def test_successful_response(self): + """ + Testa o cenário de sucesso ao obter o endereço WebSocket do navegador. + Verifica se a função retorna corretamente a URL do WebSocket quando + a resposta da API contém o campo esperado. + """ + port = 9222 + expected_url = 'ws://localhost:9222/devtools/browser/abc123' + + with aioresponses() as mocked: + mocked.get( + f'http://localhost:{port}/json/version', + payload={'webSocketDebuggerUrl': expected_url}, + ) + result = await get_browser_ws_address(port) + assert result == expected_url + + @pytest.mark.asyncio + async def test_network_error(self): + """ + Testa o comportamento da função quando ocorre um erro de rede. + Verifica se a função lança a exceção NetworkError apropriada + quando há falha na comunicação com o navegador. + """ + port = 9222 + + with pytest.raises(exceptions.NetworkError): + with aioresponses() as mocked: + mocked.get( + f'http://localhost:{port}/json/version', + exception=aiohttp.ClientError, + ) + await get_browser_ws_address(port) + + @pytest.mark.asyncio + async def test_missing_websocket_url(self): + """ + Testa o comportamento quando a resposta da API não contém a URL do WebSocket. + Verifica se a função lança a exceção InvalidResponse quando o campo + 'webSocketDebuggerUrl' está ausente na resposta. + """ + port = 9222 + + with aioresponses() as mocked: + mocked.get( + f'http://localhost:{port}/json/version', + payload={'someOtherKey': 'value'}, + ) + with pytest.raises(exceptions.InvalidResponse): + await get_browser_ws_address(port) diff --git a/tests/test_web_element.py b/tests/test_web_element.py new file mode 100644 index 00000000..e374f7be --- /dev/null +++ b/tests/test_web_element.py @@ -0,0 +1,473 @@ +import pytest +import pytest_asyncio +from unittest.mock import AsyncMock, MagicMock, patch +import json + +from pydoll.exceptions import ( + ElementNotVisible, + ElementNotInteractable, + ElementNotFound, +) +from pydoll.commands.dom import DomCommands +from pydoll.commands.input import InputCommands + +from pydoll.element import WebElement + + +@pytest_asyncio.fixture +async def mock_connection_handler(): + with patch( + 'pydoll.connection.connection.ConnectionHandler', autospec=True + ) as mock: + handler = mock.return_value + handler.execute_command = AsyncMock() + yield handler + + +@pytest.fixture +def web_element(mock_connection_handler): + attributes_list = [ + 'id', + 'test-id', + 'class', + 'test-class', + 'value', + 'test-value', + 'tag_name', + 'div', + ] + return WebElement( + object_id='test-object-id', + connection_handler=mock_connection_handler, + method='css', + selector='#test', + attributes_list=attributes_list, + ) + + +@pytest.mark.asyncio +async def test_web_element_initialization(web_element): + assert web_element._object_id == 'test-object-id' + assert web_element._search_method == 'css' + assert web_element._selector == '#test' + assert web_element._attributes == { + 'id': 'test-id', + 'class_name': 'test-class', + 'value': 'test-value', + 'tag_name': 'div', + } + + +def test_web_element_properties(web_element): + assert web_element.value == 'test-value' + assert web_element.class_name == 'test-class' + assert web_element.id == 'test-id' + assert web_element.is_enabled == True + + # Test disabled attribute + disabled_element = WebElement( + 'test-id', MagicMock(), attributes_list=['disabled', 'true'] + ) + assert disabled_element.is_enabled == False + + +@pytest.mark.asyncio +async def test_bounds_property(web_element): + expected_bounds = {'content': [0, 0, 100, 100]} + web_element._connection_handler.execute_command.return_value = { + 'result': {'model': expected_bounds} + } + + bounds = await web_element.bounds + assert bounds == expected_bounds['content'] + web_element._connection_handler.execute_command.assert_called_once_with( + DomCommands.box_model(object_id='test-object-id'), timeout=60 + ) + + +@pytest.mark.asyncio +async def test_inner_html(web_element): + expected_html = '
Test
' + web_element._connection_handler.execute_command.return_value = { + 'result': {'outerHTML': expected_html} + } + + html = await web_element.inner_html + assert html == expected_html + web_element._connection_handler.execute_command.assert_called_once_with( + DomCommands.get_outer_html('test-object-id'), timeout=60 + ) + + +@pytest.mark.asyncio +async def test_get_bounds_using_js(web_element): + expected_bounds = {'x': 0, 'y': 0, 'width': 100, 'height': 100} + web_element._connection_handler.execute_command.return_value = { + 'result': {'result': {'value': json.dumps(expected_bounds)}} + } + + bounds = await web_element.get_bounds_using_js() + assert bounds == expected_bounds + + +@pytest.mark.asyncio +async def test_get_screenshot(web_element, tmp_path): + bounds = {'x': 0, 'y': 0, 'width': 100, 'height': 100} + web_element._connection_handler.execute_command.side_effect = [ + {'result': {'result': {'value': json.dumps(bounds)}}}, + { + 'result': { + 'data': 'iVBORw0KGgoAAAANSUhEUgAAAAEAAAABCAQAAAC1HAwCAAAAC0lEQVR42mP8/wcAAgAB/edzE+oAAAAASUVORK5CYII=' + } + }, + ] + + screenshot_path = tmp_path / 'element.png' + with patch('aiofiles.open') as mock_open: + mock_open.return_value.__aenter__.return_value.write = AsyncMock() + await web_element.get_screenshot(str(screenshot_path)) + + assert web_element._connection_handler.execute_command.call_count == 2 + + +@pytest.mark.asyncio +async def test_get_element_text(web_element): + test_html = '
Test Text
' + web_element._connection_handler.execute_command.return_value = { + 'result': {'outerHTML': test_html} + } + + text = await web_element.get_element_text() + assert text == 'Test Text' + + +@pytest.mark.asyncio +async def test_scroll_into_view(web_element): + await web_element.scroll_into_view() + web_element._connection_handler.execute_command.assert_called_once_with( + DomCommands.scroll_into_view(object_id='test-object-id'), timeout=60 + ) + + +@pytest.mark.asyncio +async def test_click_using_js_not_visible(web_element): + web_element._execute_script = AsyncMock( + return_value={'result': {'result': {'value': False}}} + ) + + with pytest.raises(ElementNotVisible): + await web_element.click_using_js() + + +@pytest.mark.asyncio +async def test_click_using_js_not_interactable(web_element): + web_element._execute_script = AsyncMock( + side_effect=[ + {'result': {'result': {'value': True}}}, # _is_element_visible + {'result': {'result': {'value': False}}}, # click result + ] + ) + web_element.scroll_into_view = AsyncMock() + + with pytest.raises(ElementNotInteractable): + await web_element.click_using_js() + + +@pytest.mark.asyncio +async def test_click_using_js_option_tag(web_element): + option_element = WebElement( + 'test-id', + web_element._connection_handler, + method='css', + selector='#test', + attributes_list=[ + 'id', + 'test-id', + 'value', + 'test-value', + 'tag_name', + 'option', + ], + ) + option_element._execute_script = AsyncMock( + return_value={'result': {'result': {'value': False}}} + ) + + await option_element.click_using_js() + + web_element._connection_handler.execute_command.assert_called_once() + + +@pytest.mark.asyncio +async def test_click(web_element): + bounds = [0, 0, 100, 100, 100, 100, 0, 100] + web_element._connection_handler.execute_command.side_effect = [ + {'result': {'result': {'value': True}}}, # _is_element_visible + {'result': {'result': {'value': True}}}, # scroll_into_view + {'result': {'model': {'content': bounds}}}, # self.bounds + None, # mouse_press + None, # mouse_release + ] + await web_element.click() + assert web_element._connection_handler.execute_command.call_count == 5 + + +@pytest.mark.asyncio +async def test_click_element_not_visible(web_element): + web_element._is_element_visible = AsyncMock(return_value=False) + with pytest.raises(ElementNotVisible): + await web_element.click() + + +@pytest.mark.asyncio +async def test_click_bounds_key_error(web_element): + web_element._connection_handler.execute_command.side_effect = [ + {'result': {'result': {'value': True}}}, # _is_element_visible + {'result': {'result': {'value': True}}}, # scroll_into_view + {'result': {'model': {'invalid_key': [10]}}}, # self.bounds + { + 'result': { + 'result': { + 'value': '{"x": 0, "y": 0, "width": 100, "height": 100}' + } + } + }, # bounds_using_js + None, # mouse_press + None, # mouse_release + ] + + await web_element.click() + assert web_element._connection_handler.execute_command.call_count == 6 + + +@pytest.mark.asyncio +async def test_click_option_tag(web_element): + option_element = WebElement( + 'test-id', + web_element._connection_handler, + attributes_list=['tag_name', 'option', 'value', 'test-value'], + ) + + await option_element.click() + web_element._connection_handler.execute_command.assert_called_once() + + +@pytest.mark.asyncio +async def test__is_element_on_top(web_element): + web_element._connection_handler.execute_command.return_value = { + 'result': {'result': {'value': True}} + } + + result = await web_element._is_element_on_top() + assert result is True + + web_element._connection_handler.execute_command.return_value = { + 'result': {'result': {'value': False}} + } + + result = await web_element._is_element_on_top() + assert result is False + + +@pytest.mark.asyncio +async def test_send_keys(web_element): + test_text = 'Hello World' + await web_element.send_keys(test_text) + web_element._connection_handler.execute_command.assert_called_once_with( + InputCommands.insert_text(test_text), timeout=60 + ) + + +@pytest.mark.asyncio +async def test_type_keys(web_element): + test_text = 'Hi' + with patch('asyncio.sleep') as mock_sleep: + await web_element.type_keys(test_text) + + assert web_element._connection_handler.execute_command.call_count == len( + test_text + ) + web_element._connection_handler.execute_command.assert_any_call( + InputCommands.key_press('H'), timeout=60 + ) + web_element._connection_handler.execute_command.assert_any_call( + InputCommands.key_press('i'), timeout=60 + ) + + +def test_calculate_center(): + bounds = [0, 0, 100, 0, 100, 100, 0, 100] # Rectangle corners + x_center, y_center = WebElement._calculate_center(bounds) + assert x_center == 50 + assert y_center == 50 + + +def test_get_attribute(web_element): + assert web_element.get_attribute('id') == 'test-id' + assert web_element.get_attribute('class_name') == 'test-class' + assert web_element.get_attribute('nonexistent') is None + + +@pytest.mark.asyncio +async def test_wait_element_success(web_element): + mock_element = MagicMock() + web_element.find_element = AsyncMock( + side_effect=[None, None, mock_element] + ) + + result = await web_element.wait_element('css', '#test-selector') + assert result == mock_element + assert web_element.find_element.call_count == 3 + + +@pytest.mark.asyncio +async def test_wait_element_timeout(web_element): + web_element.find_element = AsyncMock(return_value=None) + + with pytest.raises(TimeoutError): + await web_element.wait_element('css', '#test-selector', timeout=1) + + +@pytest.mark.asyncio +async def test_wait_element_no_exception(web_element): + web_element.find_element = AsyncMock(return_value=None) + + result = await web_element.wait_element( + 'css', '#test-selector', timeout=1, raise_exc=False + ) + assert result is None + + +@pytest.mark.asyncio +async def test_find_element_success(web_element): + node_response = {'result': {'result': {'objectId': 'test-object-id'}}} + + describe_response = { + 'result': { + 'node': {'nodeName': 'DIV', 'attributes': ['class', 'test-class']} + } + } + + web_element._connection_handler.execute_command.side_effect = [ + node_response, + describe_response, + ] + + element = await web_element.find_element('css', '.test-selector') + + assert isinstance(element, WebElement) + assert element._object_id == 'test-object-id' + assert element._search_method == 'css' + assert element._selector == '.test-selector' + assert 'test-class' in element._attributes.values() + + +@pytest.mark.asyncio +async def test_find_element_not_found(web_element): + web_element._connection_handler.execute_command.return_value = { + 'result': {'result': {}} + } + + with pytest.raises(ElementNotFound): + await web_element.find_element('css', '.non-existent') + + +@pytest.mark.asyncio +async def test_find_element_no_exception(web_element): + web_element._connection_handler.execute_command.return_value = { + 'result': {'result': {}} + } + + result = await web_element.find_element( + 'css', '.non-existent', raise_exc=False + ) + assert result is None + + +@pytest.mark.asyncio +async def test_find_elements_success(web_element): + find_elements_response = { + 'result': {'result': {'objectId': 'parent-object-id'}} + } + + properties_response = { + 'result': { + 'result': [ + {'value': {'type': 'object', 'objectId': 'child-1'}}, + {'value': {'type': 'object', 'objectId': 'child-2'}}, + ] + } + } + + node_description = { + 'result': { + 'node': {'nodeName': 'DIV', 'attributes': ['class', 'test-class']} + } + } + + web_element._connection_handler.execute_command.side_effect = [ + find_elements_response, + properties_response, + node_description, + node_description, + ] + + elements = await web_element.find_elements('css', '.test-selector') + + assert len(elements) == 2 + assert all(isinstance(elem, WebElement) for elem in elements) + assert elements[0]._object_id == 'child-1' + assert elements[1]._object_id == 'child-2' + + +@pytest.mark.asyncio +async def test_find_elements_not_found(web_element): + web_element._connection_handler.execute_command.return_value = { + 'result': {'result': {}} + } + + with pytest.raises(ElementNotFound): + await web_element.find_elements('css', '.non-existent') + + +@pytest.mark.asyncio +async def test_find_elements_no_exception(web_element): + web_element._connection_handler.execute_command.return_value = { + 'result': {'result': {}} + } + + result = await web_element.find_elements( + 'css', '.non-existent', raise_exc=False + ) + assert result == [] + + +@pytest.mark.asyncio +async def test_describe_node(web_element): + expected_node = {'nodeName': 'DIV', 'attributes': ['class', 'test-class']} + + web_element._connection_handler.execute_command.return_value = { + 'result': {'node': expected_node} + } + + result = await web_element._describe_node('test-object-id') + assert result == expected_node + web_element._connection_handler.execute_command.assert_called_once_with( + DomCommands.describe_node(object_id='test-object-id'), timeout=60 + ) + + +@pytest.mark.asyncio +async def test_execute_command(web_element): + expected_response = {'result': 'test'} + web_element._connection_handler.execute_command.return_value = ( + expected_response + ) + + test_command = {'method': 'test', 'params': {}} + result = await web_element._execute_command(test_command) + + assert result == expected_response + web_element._connection_handler.execute_command.assert_called_once_with( + test_command, timeout=60 + )