# adulib.llm

In [None]:
#|default_exp llm

In [None]:
#|hide
import nblite; from nbdev.showdoc import show_doc; nblite.nbl_export()

In [None]:
#|export
try:
    import os
    import json
    import tempfile
    from pathlib import Path
    from openai import OpenAI, AsyncOpenAI
    import tiktoken
    from asynciolimiter import Limiter
    import diskcache
    from typing import List, Optional, Type, Dict, Any
    import asyncio
    import time
    from tqdm import tqdm
    from pydantic import BaseModel
except ImportError as e:
    raise ImportError(f"Install adulib[{__name__.split('.')[-1]}] to use this API.") from e

In [None]:
from dotenv import load_dotenv

In [None]:
import adulib.llm

In [None]:
# Load variables from a local .env file into the process environment, then read
# the OpenAI key for this notebook session.
# NOTE(review): `openai_api_key` is not passed to any call visible in this file,
# and the exported prompt helpers look up PROJ_OPENAI_API_KEY rather than
# OPENAI_API_KEY — confirm which variable is intended.
load_dotenv()
openai_api_key = os.getenv("OPENAI_API_KEY")

In [None]:
#|export
# Per-model token budgets: 80% of a 128,000-token window for every listed model.
# The multiplication makes the stored values floats.
# NOTE(review): presumably the 0.8 factor leaves headroom below the full
# context window — confirm.
model_context_windows = {
    model_name: 128000 * 0.8
    for model_name in ('gpt-4o', 'gpt-4o-mini')
}

# Per-model rate limits. async_prompt divides these by 60 before building its
# Limiter, so they are presumably per-minute figures — confirm the unit.
model_rate_limits = {'gpt-4o': 10_000, 'gpt-4o-mini': 30_000}

In [None]:
show_doc(adulib.llm.tokenize_text)

---

### tokenize_text

>      tokenize_text (text, llm_model)

In [None]:
#|export
# Encoder objects keyed by model name, so each model's encoding is looked up once.
__model_tokenisers = {}

def tokenize_text(text, llm_model):
    """Encode `text` with the tiktoken encoding registered for `llm_model`.

    Args:
        text: The string to encode.
        llm_model: Model name passed to ``tiktoken.encoding_for_model``.

    Returns:
        The result of the encoder's ``encode(text)`` call (token ids).
    """
    encoder = __model_tokenisers.get(llm_model)
    if encoder is None:
        # First request for this model: resolve its encoding and remember it.
        encoder = tiktoken.encoding_for_model(llm_model)
        __model_tokenisers[llm_model] = encoder
    return encoder.encode(text)

In [None]:
tokenize_text("hello world", 'gpt-4o')

[24912, 2375]

In [None]:
show_doc(adulib.llm.detokenize_text)

---

### detokenize_text

>      detokenize_text (tokens, llm_model)

In [None]:
#|export
def detokenize_text(tokens, llm_model):
    """Decode token ids back into text using the encoding for `llm_model`.

    The encoder is taken from the module-level ``__model_tokenisers`` cache and
    is only resolved through tiktoken when the model has not been seen yet.

    Args:
        tokens: Token ids, as produced by ``tokenize_text`` for the same model.
        llm_model: Model name passed to ``tiktoken.encoding_for_model``.

    Returns:
        The result of the encoder's ``decode(tokens)`` call (the decoded text).
    """
    if llm_model not in __model_tokenisers:
        __model_tokenisers[llm_model] = tiktoken.encoding_for_model(llm_model)
    # Reuse the cached encoder instead of resolving the encoding a second time.
    return __model_tokenisers[llm_model].decode(tokens)

In [None]:
llm_model = 'gpt-4o'
detokenize_text(tokenize_text("hello world", llm_model), llm_model)

'hello world'

In [None]:
show_doc(adulib.llm.get_token_count)

---

### get_token_count

>      get_token_count (text, llm_model)

In [None]:
#|export
def get_token_count(text, llm_model):
    """Return how many tokens `text` encodes to under `llm_model`'s encoding."""
    token_ids = tokenize_text(text, llm_model)
    return len(token_ids)

In [None]:
get_token_count("hello world", "gpt-4o")

2

In [None]:
show_doc(adulib.llm.async_prompt)

---

### async_prompt

>      async_prompt (model, prompt, context='You are a helpful assistant.',
>                    api_key=None, response_format=None, use_cache=True,
>                    cache_dir=None, include_model_in_cache_key=False,
>                    cache_key_prepend='')

In [None]:
#|export
# The async client lives in its own global so it can never be confused with a
# synchronous client object: awaiting a sync client's call would fail.
__async_client = None
__model_rate_limiters = {}
__llm_cache = None

async def async_prompt(model,
                       prompt,
                       context="You are a helpful assistant.",
                       api_key=None,
                       response_format=None,
                       use_cache=True,
                       cache_dir=None,
                       include_model_in_cache_key=False,
                       cache_key_prepend=''):
    """Send one system+user chat request asynchronously, with on-disk caching.

    Args:
        model: Model name. Must be a key of ``model_rate_limits``.
        prompt: Content of the user message.
        context: Content of the system message.
        api_key: API key for the client. Only consulted when the module-level
            async client is first created; when ``None`` the
            ``PROJ_OPENAI_API_KEY`` environment variable is used.
        response_format: Optional pydantic model class. When given, the request
            goes through ``beta.chat.completions.parse`` and the reply JSON is
            returned as an instance of this class.
        use_cache: When true, a stored reply for the same cache key is returned
            without calling the API. Fresh replies are stored either way.
        cache_dir: Directory for the disk cache. Only consulted when the
            module-level cache is first created; a new temporary directory is
            used when ``None``.
        include_model_in_cache_key: When false, ``'*'`` replaces the model name
            in the cache key, so replies are shared across models.
        cache_key_prepend: Prefix placed at the start of the cache key.

    Returns:
        The reply text, or a ``response_format`` instance when one was given.

    Raises:
        KeyError: If ``model`` has no entry in ``model_rate_limits``.
    """
    global __async_client, __llm_cache
    if __async_client is None:
        if api_key is None:
            api_key = os.environ.get("PROJ_OPENAI_API_KEY")
        __async_client = AsyncOpenAI(api_key=api_key)

    if model not in __model_rate_limiters:
        # Table values are divided by 60 to get the rate handed to the Limiter.
        __model_rate_limiters[model] = Limiter(model_rate_limits[model]/60)

    response_schema = str(response_format.model_json_schema()) if response_format else ""

    if __llm_cache is None:
        if cache_dir is None:
            cache_dir = tempfile.mkdtemp()
        abs_cache_path = Path(cache_dir).resolve().as_posix()
        __llm_cache = diskcache.Cache(abs_cache_path, eviction_policy="none", size_limit=2**40)

    _model_key = model if include_model_in_cache_key else '*'
    cache_key = f'{cache_key_prepend}:{_model_key}:{prompt}:{context}:{response_schema}'

    if use_cache and cache_key in __llm_cache:
        output = __llm_cache[cache_key]
    else:
        await __model_rate_limiters[model].wait()
        messages = [
            {"role": "system", "content": context},
            {"role": "user", "content": prompt},
        ]
        if not response_schema:
            chat_completion = await __async_client.chat.completions.create(
                messages=messages,
                model=model,
            )
        else:
            chat_completion = await __async_client.beta.chat.completions.parse(
                messages=messages,
                model=model,
                response_format=response_format
            )
        output = chat_completion.choices[0].message.content
        # Never store a missing reply: a cached None would make every later
        # call for this key fail without contacting the API again.
        if output is not None:
            __llm_cache[cache_key] = output

    if response_format:
        return response_format(**json.loads(output))
    else:
        return output

In [None]:
await async_prompt(
    model='gpt-4o',
    context='You are a helpful assistant.',
    prompt='Hello, how are you?',
    cache_dir='./tmp/llm_cache'
)

"Hello! I'm just a computer program, so I don't have feelings, but I'm here and ready to help you with whatever you need. How can I assist you today?"

In [None]:
from pydantic import BaseModel
from pprint import pprint

In [None]:
# Structured-output demo. `Step` is one line of working; `MathReasoning` is the
# full answer: an ordered list of steps plus the final result.
class Step(BaseModel):
    explanation: str
    output: str

class MathReasoning(BaseModel):
    steps: list[Step]
    final_answer: str
    
# Because response_format is given, async_prompt returns a MathReasoning
# instance built from the reply JSON rather than a plain string.
res = await async_prompt(
    model='gpt-4o',
    context='You are a helpful math tutor. Guide the user through the solution step by step.',
    prompt='How can I solve 8x + 7 = -23?',
    response_format=MathReasoning,
    cache_dir='./tmp/llm_cache'
)

pprint(res.model_dump())

{'final_answer': 'x = -\\frac{15}{4}',
 'steps': [{'explanation': 'Subtract 7 from both sides to isolate the term '
                           'with x.',
            'output': '8x + 7 - 7 = -23 - 7'},
           {'explanation': 'Simplify both sides: 7 - 7 is 0 on the left, and '
                           'we calculate -23 - 7 on the right.',
            'output': '8x = -30'},
           {'explanation': 'Divide both sides by 8 to solve for x.',
            'output': 'x = -30 / 8'},
           {'explanation': 'Simplify the fraction by dividing both the '
                           'numerator and the denominator by 2.',
            'output': 'x = -15 / 4'}]}


In [None]:
show_doc(adulib.llm.async_prompts)

---

### async_prompts

>      async_prompts (model:str, context:str, items:List[str],
>                     prompt_template:str, response_format:Optional[Type[pydanti
>                     c.main.BaseModel]]=None, api_key:Optional[str]=None,
>                     cache_dir:Optional[str]=None,
>                     include_model_in_cache_key:bool=False,
>                     cache_key_prepend:str='', max_retries:int=3,
>                     timeout:int=30, time_window=20,
>                     concurrency_limit:Optional[int]=None)

In [None]:
#|export
async def async_prompts(
    model: str,
    context: str,
    items: List[str],
    prompt_template: str,
    response_format: Optional[Type[BaseModel]] = None,
    api_key: Optional[str] = None,
    cache_dir: Optional[str] = None,
    include_model_in_cache_key: bool = False,
    cache_key_prepend: str = '',
    max_retries: int = 3,
    timeout: int = 30,
    time_window = 20,
    concurrency_limit: Optional[int] = None
    ):
    """Run one prompt per item concurrently and collect the successful replies.

    Each item is substituted into ``prompt_template`` (as ``{item}``) and sent
    through ``async_prompt`` with caching enabled. Every item is scheduled
    exactly once; results are gathered in the order of ``items``.

    Args:
        model: Model name forwarded to ``async_prompt``.
        context: System message forwarded to ``async_prompt``.
        items: Values to substitute into ``prompt_template``.
        prompt_template: Format string containing an ``{item}`` placeholder.
        response_format: Optional pydantic model class forwarded to
            ``async_prompt``.
        api_key: Forwarded to ``async_prompt``.
        cache_dir: Forwarded to ``async_prompt``.
        include_model_in_cache_key: Forwarded to ``async_prompt``.
        cache_key_prepend: Forwarded to ``async_prompt``.
        max_retries: Maximum number of attempts per item.
        timeout: Seconds to wait for a single attempt before cancelling it.
        time_window: Minimum number of seconds, measured from the start of a
            failed attempt, that must pass before the next attempt begins.
        concurrency_limit: Maximum number of attempts in flight at once;
            when falsy, all items may run concurrently.

    Returns:
        A list of ``{"item": ..., "response": ...}`` dicts, one per item that
        produced a reply. ``response`` is ``model_dump()`` output when the
        reply has that method, otherwise the reply itself. Items that fail on
        every attempt are omitted.
    """
    semaphore = asyncio.Semaphore(concurrency_limit or len(items))

    async def process_item(item: str):
        prompt = prompt_template.format(item=item)

        for attempt in range(max_retries):
            async with semaphore:
                start_time = time.time()
                try:
                    task = asyncio.create_task(
                        async_prompt(
                            model=model,
                            prompt=prompt,
                            context=context,
                            response_format=response_format,
                            api_key=api_key,
                            use_cache=True,
                            cache_dir=cache_dir,
                            include_model_in_cache_key=include_model_in_cache_key,
                            cache_key_prepend=cache_key_prepend
                        )
                    )

                    done, pending = await asyncio.wait({task}, timeout=timeout)

                    if task in done:
                        # Re-raises here if the attempt itself failed.
                        return task.result()

                    task.cancel()
                    print(f"[Timeout] '{item}' attempt {attempt + 1}/{max_retries} exceeded {timeout}s")

                # NOTE(review): a cancellation is logged and then retried like
                # any other failure, so cancelling the caller does not stop
                # this loop — confirm that is intended.
                except asyncio.CancelledError:
                    print(f"[Cancelled] '{item}' attempt {attempt + 1}/{max_retries}")

                except Exception as e:
                    print(f"[Error] '{item}' attempt {attempt + 1}/{max_retries}: {e}")

                if attempt + 1 >= max_retries:
                    # Nothing left to retry, so waiting would only delay the failure report.
                    break

                await asyncio.sleep(2 ** attempt)  # exponential backoff

                elapsed = time.time() - start_time
                if elapsed < time_window:
                    await asyncio.sleep(time_window - elapsed)

        print(f"[Fail] '{item}' max retries exceeded.")
        return None

    # One task per position in `items` (a list, not a dict keyed by item, so
    # duplicate items each keep their own task and none is orphaned).
    tasks = [asyncio.create_task(process_item(item)) for item in items]
    output = []
    for item, task in tqdm(zip(items, tasks), total=len(tasks), desc="Processing"):
        try:
            # Await the task already running for this item rather than
            # starting a second, sequential run of the same work.
            result = await task
            # process_item signals failure with None; any other value,
            # including an empty string, is a real reply and is kept.
            if result is not None:
                output.append({
                    "item": item,
                    "response": result.model_dump() if hasattr(result, "model_dump") else result
                })
        except Exception as e:
            print(f"[Unhandled Error] '{item}': {e}")

    return output

In [None]:
# Batch demo. Each reply is requested in the shape of this model.
class FilmRecommendation(BaseModel):
    title: str
    blurb: str
    review: str

# Every entry of `items` is substituted into the `{item}` placeholder of
# prompt_template and sent as its own request; the result is a list of
# {"item": ..., "response": ...} dicts.
res = await async_prompts(
    model='gpt-4o',
    context="""You are an eager member of staff at the last surviving Blockbuster store.
    Recommend a film to the user based on their stated preferences.
    You should give them the film title, the blurb in a sentence and then a quick one 
    sentence long review to hype them up for it.""",
    items = ["  a psychological horror film that will actually unsettle me",
                "  something very romantic, Y2K, nostalgic",
                "  strange, obscure and quite old",
                "  a film where I don't have to pay attention and can doomscroll on my phone"],
    prompt_template = "User preference: {item}",
    response_format=FilmRecommendation,
    cache_dir='./tmp/llm_cache'
)

Processing: 100%|██████████| 4/4 [00:00<00:00, 1160.17it/s]


In [None]:
res

[{'item': '  a psychological horror film that will actually unsettle me',
  'response': {'title': 'Hereditary',
   'blurb': 'When a family begins to unravel after the death of their secretive grandmother, they uncover terrifying truths about their ancestry that will test their sanity.',
   'review': 'This chilling masterpiece leaves you in a state of dread long after the credits roll, with haunting performances and an unforgettable atmosphere that digs deep under your skin.'}},
 {'item': '  something very romantic, Y2K, nostalgic',
  'response': {'title': 'Serendipity',
   'blurb': 'A romantic tale of fate and destiny, where two strangers, Jonathan and Sara, impulsively explore a connection over Christmas in New York City, only to leave their future up in the air to chance.',
   'review': 'This movie will sweep you off your feet with its enchanting story of love and destiny, leaving you with a warm nostalgia for the early 2000s romance genre.'}},
 {'item': '  strange, obscure and quite

In [None]:
show_doc(adulib.llm.prompt)

---

### prompt

>      prompt (model, prompt, context='You are a helpful assistant.',
>              api_key=None, response_format=None, use_cache=True,
>              cache_dir=None, include_model_in_cache_key=False,
>              cache_key_prepend='')

In [None]:
#|export
# The synchronous client lives in its own global so it is never mistaken for an
# async client object (and vice versa).
__sync_client = None

def prompt(model,
           prompt,
           context="You are a helpful assistant.",
           api_key=None,
           response_format=None,
           use_cache=True,
           cache_dir=None,
           include_model_in_cache_key=False,
           cache_key_prepend=''):
    """Send one system+user chat request synchronously, with on-disk caching.

    Args:
        model: Model name passed to the API.
        prompt: Content of the user message.
        context: Content of the system message.
        api_key: API key for the client. Only consulted when the module-level
            sync client is first created; when ``None`` the
            ``PROJ_OPENAI_API_KEY`` environment variable is used.
        response_format: Optional pydantic model class. When given, the request
            goes through ``beta.chat.completions.parse`` and the reply JSON is
            returned as an instance of this class.
        use_cache: When true, a stored reply for the same cache key is returned
            without calling the API. Fresh replies are stored either way.
        cache_dir: Directory for the disk cache. Only consulted when the
            module-level cache is first created; a new temporary directory is
            used when ``None``.
        include_model_in_cache_key: When false, ``'*'`` replaces the model name
            in the cache key, so replies are shared across models.
        cache_key_prepend: Prefix placed at the start of the cache key.

    Returns:
        The reply text, or a ``response_format`` instance when one was given.
    """
    global __sync_client, __llm_cache
    if __sync_client is None:
        if api_key is None:
            api_key = os.environ.get("PROJ_OPENAI_API_KEY")
        __sync_client = OpenAI(api_key=api_key)

    response_schema = str(response_format.model_json_schema()) if response_format else ""

    if __llm_cache is None:
        if cache_dir is None:
            cache_dir = tempfile.mkdtemp()
        abs_cache_path = Path(cache_dir).resolve().as_posix()
        __llm_cache = diskcache.Cache(abs_cache_path, eviction_policy="none", size_limit=2**40)

    _model_key = model if include_model_in_cache_key else '*'
    cache_key = f'{cache_key_prepend}:{_model_key}:{prompt}:{context}:{response_schema}'

    if use_cache and cache_key in __llm_cache:
        output = __llm_cache[cache_key]
    else:
        messages = [
            {"role": "system", "content": context},
            {"role": "user", "content": prompt},
        ]
        if not response_schema:
            chat_completion = __sync_client.chat.completions.create(
                messages=messages,
                model=model,
            )
        else:
            chat_completion = __sync_client.beta.chat.completions.parse(
                messages=messages,
                model=model,
                response_format=response_format
            )

        output = chat_completion.choices[0].message.content
        # Never store a missing reply: a cached None would make every later
        # call for this key fail without contacting the API again.
        if output is not None:
            __llm_cache[cache_key] = output

    if response_format:
        return response_format(**json.loads(output))
    else:
        return output

In [None]:
prompt(
    model='gpt-4o',
    context='You are a helpful assistant.',
    prompt='Hello, how are you?',
    cache_dir='./tmp/llm_cache'
)

"Hello! I'm just a computer program, so I don't have feelings, but I'm here and ready to help you with whatever you need. How can I assist you today?"