In [11]:
from dotenv import load_dotenv
load_dotenv(override=True)


True

In [12]:
from langsmith import traceable

inputs = [
  {"role": "system", "content": "You are a mysterious tavern keeper who speaks in riddles and has seen countless adventures unfold in your establishment."},
  {"role": "user", "content": "I seek a table for two weary travelers who have journeyed far through enchanted forests."},
]

output = {
  "choices": [
      {
          "message": {
              "role": "assistant",
              "content": "Ah, two souls seeking respite... The corner table by the flickering candlelight awaits, but tell me, do you prefer to dine when the moon reaches its zenith, or when the shadows grow long with twilight's embrace?"
          }
      }
  ]
}

@traceable(
  run_type="llm",
  metadata={
      "ls_provider": "openai",
      "ls_model_name": "gpt-4"
  }
)
def chat_model(messages: list):
  return output

chat_model(inputs)

{'choices': [{'message': {'role': 'assistant',
    'content': "Ah, two souls seeking respite... The corner table by the flickering candlelight awaits, but tell me, do you prefer to dine when the moon reaches its zenith, or when the shadows grow long with twilight's embrace?"}}]}

In [13]:
def _reduce_chunks(chunks: list):
    all_text = "".join([chunk["choices"][0]["message"]["content"] for chunk in chunks])
    return {"choices": [{"message": {"content": all_text, "role": "assistant"}}]}

@traceable(
    run_type="llm",
    metadata={"ls_provider": "my_provider", "ls_model_name": "my_model"},
    reduce_fn=_reduce_chunks,
)
def my_streaming_chat_model(messages: list):
    for chunk in ["Greetings, noble " + messages[1]["content"] + "! Welcome to our magical realm where adventure awaits!"]:
        yield {
            "choices": [
                {
                    "message": {
                        "content": chunk,
                        "role": "assistant",
                    }
                }
            ]
        }

list(
    my_streaming_chat_model(
        [
            {"role": "system", "content": "You are an ancient wizard's talking mirror that greets all who dare approach with mystical salutations."},
            {"role": "user", "content": "Zara the Dragon Whisperer"},
        ],
    )
)

[{'choices': [{'message': {'content': 'Greetings, noble Zara the Dragon Whisperer! Welcome to our magical realm where adventure awaits!',
     'role': 'assistant'}}]}]

In [14]:
	
from langsmith import traceable

def _convert_docs(results):
  return [
      {
          "page_content": r,
          "type": "Document",
          "metadata": {"ancient_library": "Alexandria", "scroll_condition": "mystical"}
      }
      for r in results
  ]

@traceable(
    run_type="retriever"
)
def retrieve_docs(query):
  contents = [
      "The Sacred Tome of Elemental Magic reveals the secrets of fire, water, earth, and air...", 
      "Chronicle of the Last Dragon: Here lies the tale of Pyraxis, the golden drake who guarded the Crystal of Eternity...", 
      "Alchemist's Handbook: Chapter XVII - The Philosopher's Stone and the Art of Transmutation..."
  ]
  return _convert_docs(contents)

retrieve_docs("Searching the forbidden archives for ancient spells of protection")

[{'page_content': 'The Sacred Tome of Elemental Magic reveals the secrets of fire, water, earth, and air...',
  'type': 'Document',
  'metadata': {'ancient_library': 'Alexandria',
   'scroll_condition': 'mystical'}},
 {'page_content': 'Chronicle of the Last Dragon: Here lies the tale of Pyraxis, the golden drake who guarded the Crystal of Eternity...',
  'type': 'Document',
  'metadata': {'ancient_library': 'Alexandria',
   'scroll_condition': 'mystical'}},
 {'page_content': "Alchemist's Handbook: Chapter XVII - The Philosopher's Stone and the Art of Transmutation...",
  'type': 'Document',
  'metadata': {'ancient_library': 'Alexandria',
   'scroll_condition': 'mystical'}}]

In [15]:
from langsmith import traceable
from openai import OpenAI
from typing import List, Optional
import json

openai_client = OpenAI()

@traceable(
  run_type="tool"
)
def get_current_temperature(location: str, unit: str):
    return 65 if unit == "Fahrenheit" else 17

@traceable(run_type="llm")
def call_openai(
    messages: List[dict], tools: Optional[List[dict]]
) -> str:
  return openai_client.chat.completions.create(
    model="gpt-4o-mini",
    messages=messages,
    temperature=0,
    tools=tools
  )

@traceable(run_type="chain")
def ask_about_the_weather(inputs, tools):
  response = call_openai(inputs, tools)
  tool_call_args = json.loads(response.choices[0].message.tool_calls[0].function.arguments)
  location = tool_call_args["location"]
  unit = tool_call_args["unit"]
  tool_response_message = {
    "role": "tool",
    "content": json.dumps({
        "location": location,
        "unit": unit,
        "temperature": get_current_temperature(location, unit),
    }),
    "tool_call_id": response.choices[0].message.tool_calls[0].id
  }
  inputs.append(response.choices[0].message)
  inputs.append(tool_response_message)
  output = call_openai(inputs, None)
  return output

tools = [
    {
      "type": "function",
      "function": {
        "name": "get_current_temperature",
        "description": "Consult the mystical weather spirits to divine the atmospheric conditions",
        "parameters": {
          "type": "object",
          "properties": {
            "location": {
              "type": "string",
              "description": "The kingdom or realm you wish to know about, e.g., The Floating City of Nimbus, The Frost Peaks of Valdris"
            },
            "unit": {
              "type": "string",
              "enum": ["Celsius", "Fahrenheit"],
              "description": "Choose your preferred mystical temperature measurement. The ancient scrolls suggest inferring from regional customs."
            }
          },
          "required": ["location", "unit"]
        }
      }
    }
]
inputs = [
  {"role": "system", "content": "You are Atmospherica, the Weather Oracle of the Seven Realms, capable of sensing the elemental forces across all lands."},
  {"role": "user", "content": "O wise Oracle, what do the wind spirits whisper about today's conditions in the great metropolis of New York?"},
]

ask_about_the_weather(inputs, tools)

ChatCompletion(id='chatcmpl-CMrBMgZTqB594rZJFV6chvzdHNXHr', choices=[Choice(finish_reason='stop', index=0, logprobs=None, message=ChatCompletionMessage(content="The wind spirits whisper of a temperate day in the great metropolis of New York, where the air is a gentle 17 degrees Celsius. A soft breeze dances through the streets, carrying with it the scents of autumn. The skies may be adorned with wisps of clouds, but the sun's warmth breaks through, inviting the denizens to embrace the day. It is a time for exploration and connection with the vibrant life of the city.", refusal=None, role='assistant', annotations=[], audio=None, function_call=None, tool_calls=None))], created=1759563684, model='gpt-4o-mini-2024-07-18', object='chat.completion', service_tier='default', system_fingerprint='fp_51db84afab', usage=CompletionUsage(completion_tokens=88, prompt_tokens=107, total_tokens=195, completion_tokens_details=CompletionTokensDetails(accepted_prediction_tokens=0, audio_tokens=0, reasoning

In [16]:
from langsmith import traceable
import random

@traceable(
    run_type="tool"
)
def consult_storm_crystals(region: str):
    """Divine weather patterns by gazing into enchanted storm crystals"""
    mystical_conditions = {
        "sunny": "Golden rays of the Sun God illuminate the realm",
        "rainy": "The Sky Serpent weeps tears of silver across the land", 
        "cloudy": "Ancient cloud dragons dance through twilight skies",
        "stormy": "Thunder spirits clash in celestial battles above",
        "snowy": "Ice phoenixes scatter their frozen feathers from the heavens"
    }
    condition = random.choice(list(mystical_conditions.keys()))
    return {
        "region": region,
        "mystical_forecast": mystical_conditions[condition],
        "magical_intensity": random.randint(1, 10)
    }

@traceable(
    run_type="chain"
)
def forecast_the_seven_realms():
    """Grand weather divination across all mystical kingdoms"""
    realms = [
        "The Crystal Peaks of Eldoria",
        "Shadowmere Swamplands", 
        "The Floating Islands of Aethermoor",
        "Dragonfire Desert",
        "The Whispering Woods of Luna"
    ]
    
    prophecies = []
    for realm in realms:
        vision = consult_storm_crystals(realm)
        prophecies.append(f"In {vision['region']}: {vision['mystical_forecast']} (Intensity: {vision['magical_intensity']}/10)")
    
    return {
        "grand_prophecy": "Behold! The Great Weather Oracle has spoken...",
        "realm_forecasts": prophecies,
        "prophetic_wisdom": "The elements dance differently in each realm, mortal. Heed these visions well."
    }

divine_weather_vision = forecast_the_seven_realms()
print(divine_weather_vision)

{'grand_prophecy': 'Behold! The Great Weather Oracle has spoken...', 'realm_forecasts': ['In The Crystal Peaks of Eldoria: Ancient cloud dragons dance through twilight skies (Intensity: 2/10)', 'In Shadowmere Swamplands: Ice phoenixes scatter their frozen feathers from the heavens (Intensity: 2/10)', 'In The Floating Islands of Aethermoor: Thunder spirits clash in celestial battles above (Intensity: 2/10)', 'In Dragonfire Desert: The Sky Serpent weeps tears of silver across the land (Intensity: 8/10)', 'In The Whispering Woods of Luna: Ancient cloud dragons dance through twilight skies (Intensity: 2/10)'], 'prophetic_wisdom': 'The elements dance differently in each realm, mortal. Heed these visions well.'}
