In [8]:
from dataclasses import dataclass, field
import platformdirs, itertools
from pathlib import Path

In [3]:
from typing import ForwardRef

In [9]:
@dataclass
class App:
    """Plugin-driven application built around a ``pluggy`` plugin manager.

    Plugins implement the hooks declared on :class:`App.Api`. ``expand`` runs
    the configuration and request hooks, ``compact`` concatenates the frames
    returned by plugins, and ``finalize`` hands the combined frame back to
    plugins. ``main`` chains all of these together.
    """

    # These imports live in the class body so the names are available while
    # the field defaults below are evaluated. As a side effect they also
    # become class attributes (``App.pluggy``, ``App.partial``, ...).
    import platformdirs
    import pluggy
    import argparse
    from functools import partial

    # NOTE: the default factories, ``parser`` and ``impl`` below all bind the
    # class-level default of ``name`` ("a11yhood") at class-definition time.
    # Passing a different ``name=`` to the constructor does not change them.
    name: str = "a11yhood"
    # Not read by any code in this class.
    verbosity: int = 2
    # Defaults to ``platformdirs.user_cache_path(name)``.
    cache: Path = field(default_factory=partial(platformdirs.user_cache_path, name))
    # Forwarded to the ``set_config`` hook by ``expand``.
    argv: list | tuple | None = None
    # Defaults to a fresh ``pluggy.PluginManager(name)`` per instance.
    manager: pluggy.PluginManager = field(default_factory=partial(pluggy.PluginManager, name))
    # Passed to ``dotenv.load_dotenv`` in ``__post_init__``.
    dotenv: Path | None = None
    # Forwarded to the ``set_params`` hook by ``expand``.
    params: dict = field(default_factory=dict)

    @classmethod
    def from_argv(cls, *argv):
        """Build an ``App`` whose ``argv`` field holds the given arguments.

        Previously this was an empty stub that returned ``None``.
        """
        return cls(argv=argv)

    def __post_init__(self):
        """Load the dotenv file and register the hook specifications."""
        import dotenv

        # ``self.dotenv`` is the dataclass field; the local ``dotenv`` is the module.
        dotenv.load_dotenv(self.dotenv)
        self.manager.add_hookspecs(self.Api)

    # Shared, class-level argument parser (``name`` is used as ``prog``).
    parser = argparse.ArgumentParser(name)

    # Marker that plugins use to decorate their hook implementations.
    impl = pluggy.HookimplMarker(name)

    class Api:
        """Hook specifications that plugins may implement."""

        import pluggy
        spec = pluggy.HookspecMarker("a11yhood")
        impl = pluggy.HookimplMarker("a11yhood")

        @spec
        def set_config(self, argv, app, manager) -> None:
            """directly modify the configuration element"""

        # Carries both the spec and the impl marker. Presumably the impl
        # marker only matters if an ``Api`` object is itself registered as a
        # plugin -- nothing in this class does that.
        @spec
        @impl(trylast=True)
        def set_environment(self, environ, app, manager) -> None:
            """modify the os environment variables"""

        @spec
        def set_params(self, params, app, manager) -> None:
            """directly modify the parameters to pass to the requests made"""

        @spec
        def get_responses(self, options, app, manager) -> list | None:
            """get the responses from remote requests"""
            # there isn't a reason to return anything cause we cache to disc for reuse

        @spec
        def get_frames(self, options, app, manager) -> None | ForwardRef("pandas.DataFrame"):
            """expand the config to cache the responses"""

        @spec
        def finalize(self, df, options, app, manager) -> None:
            """receive the combined frame produced by ``App.compact``"""

    def initialize(self, *implementations, argv: tuple | list | None = None):
        """Register plugin objects with the manager and return ``self``.

        Args:
            *implementations: plugin objects handed to ``manager.register``.
            argv: when not ``None``, stored on ``self.argv`` so that the
                ``set_config`` hook receives it (it used to be ignored).
        """
        if argv is not None:
            self.argv = argv
        for implementation in implementations:
            self.manager.register(implementation)
        return self

    def expand(self, **options):
        """Run the configuration hooks, then collect the responses.

        Hooks run in this order: ``set_config``, ``set_environment``,
        ``set_params``, ``get_responses``. Every result list goes through
        ``process_async`` so coroutine results are awaited.

        Returns:
            A flat list chaining together what each ``get_responses``
            implementation returned.
        """
        import itertools, os

        hook = self.manager.hook
        process_async(hook.set_config(argv=self.argv, app=self, manager=self.manager))
        process_async(hook.set_environment(environ=os.environ, app=self, manager=self.manager))
        process_async(hook.set_params(params=self.params, app=self, manager=self.manager))
        responses = process_async(
            hook.get_responses(options=options, app=self, manager=self.manager)
        )
        return list(itertools.chain.from_iterable(responses))

    def finalize(self, df=None, **options):
        """Call the ``finalize`` hook with ``df`` and flatten what it returns.

        Args:
            df: frame handed to the plugins; computed with ``compact`` when
                omitted (and therefore possibly still ``None``).
            **options: forwarded to the hook as ``options``.
        """
        # Imported locally, as ``expand`` does, so the method does not rely
        # on a module-level import.
        import itertools

        if df is None:
            df = self.compact(**options)
        results = process_async(
            self.manager.hook.finalize(df=df, options=options, app=self, manager=self.manager)
        )
        return list(itertools.chain.from_iterable(results))

    def compact(self, **options):
        """Concatenate the frames returned by the ``get_frames`` hook.

        Returns:
            The result of ``pandas.concat`` over the non-``None`` frames, or
            ``None`` when no plugin returned a frame.
        """
        from pandas import concat

        # NOTE(review): unlike the other hook calls in this class, these
        # results are not passed through ``process_async``, so a coroutine
        # ``get_frames`` implementation is not awaited -- confirm intended.
        frames = [
            frame
            for frame in self.manager.hook.get_frames(options=options, app=self, manager=self.manager)
            if frame is not None
        ]
        if frames:
            return concat(frames)
        return None

    def main(self, *implementations, argv: tuple | list | None = None, run: bool = True, **options):
        """Register plugins, optionally run ``expand``, then compact and finalize.

        Args:
            *implementations: plugin objects to register.
            argv: forwarded to ``initialize``.
            run: when true, call ``expand`` before compacting.
            **options: forwarded to ``expand``, ``compact`` and ``finalize``.

        Returns:
            Whatever ``compact`` returned (a concatenated frame or ``None``).
        """
        # ``argv`` must go by keyword: passed positionally it was swallowed by
        # ``*implementations`` and registered with the manager as a plugin.
        self.initialize(*implementations, argv=argv)
        if run:
            self.expand(**options)
        results = self.compact(**options)
        self.finalize(results, **options)
        return results
        
        

def process_async(results):
    """Resolve awaitable entries of ``results`` in place.

    Coroutines and ``asyncio.Future`` objects found in ``results`` are run to
    completion on the current event loop, and each one is replaced by its
    value at the same index. Other entries are left untouched.

    Args:
        results: a mutable, index-assignable sequence (e.g. the list returned
            by a pluggy hook call).

    Returns:
        The same ``results`` object that was passed in.
    """
    from asyncio import Future, gather, get_event_loop
    from inspect import iscoroutine

    pending = {
        index: result
        for index, result in enumerate(results)
        if iscoroutine(result) or isinstance(result, Future)
    }
    if not pending:
        # Nothing to await: return before touching the event loop or
        # importing nest_asyncio, so the sync path has no extra dependency.
        return results

    # Imported only when needed; it patches asyncio so that
    # ``run_until_complete`` can be called while a loop is already running.
    import nest_asyncio

    loop = get_event_loop()
    if not getattr(loop, "_nest_patched", None):
        nest_asyncio.apply(loop)

    values = loop.run_until_complete(gather(*pending.values()))
    # ``gather`` preserves argument order, so values line up with the keys.
    for index, value in zip(pending, values):
        results[index] = value

    return results
        
            

In [10]:
def path_client(path, **kwargs):
    """Return a ``requests_cache.CachedSession`` created from ``path``.

    ``path`` is passed as the first positional argument of the session.
    Keyword arguments are forwarded unchanged, except that ``backend``
    falls back to ``"filesystem"`` when the caller does not supply one.
    """
    from requests_cache import CachedSession

    if "backend" not in kwargs:
        kwargs["backend"] = "filesystem"
    session = CachedSession(path, **kwargs)
    return session

# Attach `path_client` as a `client` method on the concrete class that
# `Path()` instantiates, so `some_path.client(**kwargs)` calls
# `path_client(some_path, **kwargs)`.
type(Path()).client = path_client
