In [None]:
import pandas
import asyncio
import sys
import traceback
from playwright.async_api import async_playwright, Playwright, expect
from playwright_stealth import stealth_async
from collections.abc import Callable

wh = None
configs = {
    'NavigationDefaultTimeoutSec': 5,
    'PageDefaultTimeoutSec': 5,
}

class ManualLoopInterrupt(Exception):
    pass

In [None]:
class WebHandler:
    def __init__(self):
        self.callee = None
        
    async def main_loop(self):
        async with async_playwright() as playwright:
            global browser_context
            chromium = playwright.chromium
            browser_context = await chromium.launch_persistent_context('', headless=False)
            browser_context.set_default_navigation_timeout(configs['NavigationDefaultTimeoutSec'] * 1000)
            browser_context.set_default_timeout(configs['PageDefaultTimeoutSec'] * 1000)
            stealth_page = browser_context.pages[0]
            await stealth_async(stealth_page)
            
            print('Launched!')
            while True:
                try:
                    if isinstance(self.callee, Callable):
                        await self.callee(stealth_page, browser_context)
                except ManualLoopInterrupt:
                    break
                except:
                    print(''.join(traceback.format_exception(*sys.exc_info())))
                finally:
                    self.callee = None
                    await asyncio.sleep(1)
                    
            await browser_context.close()

In [None]:
global wh
wh = WebHandler()
asyncio.create_task(wh.main_loop())

In [None]:
async def test(stealth_page, browser_context):
    # raise ManualLoopInterrupt()
    print(type(browser_context))

wh.callee = test

In [None]:
async def test(stealth_page, browser_context):
    # raise ManualLoopInterrupt()
    print(f'stealth_page.url: {stealth_page.url}')
    await stealth_page.goto('https://www.google.com')
    print(f'stealth_page.url: {stealth_page.url}')

wh.callee = test

In [None]:
async def test(stealth_page, browser_context):
    # raise ManualLoopInterrupt()
    print(f'stealth_page.url: {stealth_page.url}')
    await stealth_page.goto('https://arh.antoinevastel.com/bots/areyouheadless')
    print(f'stealth_page.url: {stealth_page.url}')

wh.callee = test

In [None]:
async def test(stealth_page, browser_context):
    # raise ManualLoopInterrupt()
    msg_div = stealth_page.locator('#res')
    msg_contents = await msg_div.inner_text()
    print(f'msg_contents: {msg_contents}')

wh.callee = test

In [None]:
async def test(stealth_page, browser_context):
    # raise ManualLoopInterrupt()
    # stealth_page = await browser_context.new_page()
    # await stealth_async(stealth_page)
    user_agent = await stealth_page.evaluate("""() => navigator.userAgent""")
    print(f'user_agent: {user_agent}')

wh.callee = test

In [None]:
async def test(stealth_page, browser_context):
    # raise ManualLoopInterrupt()
    api_req_context = browser_context.request
    response = await api_req_context.get('https://www.pagina12.com.ar/robots.txt')
    print(response)
    text = await response.text()
    print(text)

wh.callee = test