In [6]:
import logging
import multiprocessing
from selenium import webdriver
from selenium.common.exceptions import WebDriverException
from selenium.webdriver.chrome.options import Options as ChromeOptions
from selenium.webdriver.common.by import By
from selenium.webdriver.edge.options import Options as EdgeOptions
from selenium.webdriver.firefox.options import Options as FirefoxOptions
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.support.ui import WebDriverWait

# Emit INFO-level (and above) records through the root logger.
logging.basicConfig(level=logging.INFO)

# One options object per supported browser, keyed by the browser name that
# setup_driver() receives.
browser_options = dict(
    chrome=ChromeOptions(),
    firefox=FirefoxOptions(),
    edge=EdgeOptions(),
)
# Endpoint that remote WebDriver sessions are created against.
hub_url = "http://localhost:4444/wd/hub"


# hub_url points at a Selenium Grid hub on this machine ('http://localhost:4444/wd/hub').
# To run the tests against a remote hub, replace 'localhost' with that hub's host name or IP address.
def setup_driver(browser):
    """Open a remote WebDriver session for ``browser`` on the hub at ``hub_url``.

    :param browser: key into ``browser_options`` selecting the options object
        to request the session with.
    :return: the ``webdriver.Remote`` instance, or ``None`` (after logging an
        error) when the browser name is not configured or the session could
        not be created.
    """
    options = browser_options.get(browser)
    if options is None:
        # Without this guard ``options=None`` would be handed to
        # webdriver.Remote. NOTE(review): in Selenium 4 that appears to fail
        # with an AttributeError/TypeError rather than a WebDriverException
        # (exact type depends on the installed version), so it would escape
        # the handler below instead of yielding the ``None`` callers test for.
        logging.error(f"Error setting up {browser} WebDriver: unsupported browser name")
        return None
    try:
        return webdriver.Remote(command_executor=hub_url, options=options)
    except WebDriverException as e:
        logging.error(f"Error setting up {browser} WebDriver: {e}")
        return None


def test_check_stable_python_version(browser):
    driver = setup_driver(browser)
    if driver is None:
        return
    try:
        driver.get("https://www.selenium.dev")
        downloads_link = WebDriverWait(driver, 10).until(
            EC.element_to_be_clickable((By.LINK_TEXT, "Downloads"))
        )
        downloads_link.click()
        WebDriverWait(driver, 10).until(
            EC.presence_of_element_located((By.XPATH, "//p[contains(text(), 'Python')]"))
        )
        # Take a screenshot when the condition is met
        driver.save_screenshot(f"{browser}_screenshot.png")
        logging.info(f"Screenshot taken for {browser}")
    except Exception as e:
        logging.error(f"Error in {browser} test: {e}")
    finally:
        driver.quit()


if __name__ == "__main__":
    # Browser names to exercise; setup_driver() looks each one up in browser_options.
    browsers = ["chrome", "firefox", "edge"]
    processes = []
    # Launch one worker process per browser so the runs proceed in parallel.
    for browser in browsers:
        processes.append(
            multiprocessing.Process(
                target=test_check_stable_python_version,
                args=(browser,),
            )
        )
        processes[-1].start()
    # Block until every browser run has finished.
    for process in processes:
        process.join()

INFO:root:Screenshot taken for chrome


In [39]:
import requests, json


def clear_sessions(session_id=None, url="http://127.0.0.1:4444", timeout=30):
    """
    Delete Selenium Grid sessions, e.g. orphan sessions left on the grid.

    docs: https://www.selenium.dev/documentation/grid/advanced_features/endpoints/

    :param session_id: id of the single session to delete; when falsy, every
        session listed by the grid's ``/status`` endpoint is deleted.
    :param url: base URL of the grid (a trailing slash is tolerated).
    :param timeout: seconds to wait for each HTTP request, so an unreachable
        grid raises instead of blocking the notebook indefinitely.
    :return: None
    :raises requests.exceptions.RequestException: on connection failure,
        timeout, or a non-2xx answer from ``/status``.
    """
    base_url = url.rstrip("/")
    if session_id:
        # Delete only the session named by the caller.
        session_ids = [session_id]
    else:
        status = requests.get("{}/status".format(base_url), timeout=timeout)
        # Fail with the HTTP error itself rather than a confusing KeyError
        # while walking an error payload below.
        status.raise_for_status()
        session_ids = [
            slot["session"]["sessionId"]
            for node in status.json()["value"]["nodes"]
            for slot in node["slots"]
            # Slots without an active session carry a falsy value here; a
            # missing key is tolerated as well.
            if slot.get("session")
        ]
    for sid in session_ids:
        # Best effort, as before: the DELETE response is not inspected.
        requests.delete("{}/session/{}".format(base_url, sid), timeout=timeout)


# No session_id is passed, so this takes the branch that deletes every session listed by the grid.
clear_sessions()