In [2]:
from openai import OpenAI
import json
from dotenv import load_dotenv, find_dotenv

# Load variables from the .env file located by find_dotenv(); the boolean
# result of load_dotenv is deliberately discarded into `_`.
_ : bool = load_dotenv(find_dotenv()) # read local .env file

## Step 1. Create Function And Pydantic Models To Control The Map

#### A. Pydantic Models

In [3]:
# models layer
from pydantic import AfterValidator, BaseModel, model_validator
from typing_extensions import Annotated

# Validator functions
def validate_latitude(v: float) -> float:
    """Return ``v`` unchanged when it is a valid latitude.

    Args:
        v: Candidate latitude.

    Returns:
        The same value, so the function can be used as a validator.

    Raises:
        ValueError: If ``v`` is not within the closed range [-90, 90].
            NaN fails both comparisons and is rejected as well.
    """
    # An explicit raise is used instead of `assert` so the check still runs
    # when Python is started with -O (which strips assert statements).
    if not (-90 <= v <= 90):
        raise ValueError('Invalid latitude')
    return v

def validate_longitude(v: float) -> float:
    """Return ``v`` unchanged when it is a valid longitude.

    Args:
        v: Candidate longitude.

    Returns:
        The same value, so the function can be used as a validator.

    Raises:
        ValueError: If ``v`` is not within the closed range [-180, 180].
            NaN fails both comparisons and is rejected as well.
    """
    # An explicit raise is used instead of `assert` so the check still runs
    # when Python is started with -O (which strips assert statements).
    if not (-180 <= v <= 180):
        raise ValueError('Invalid longitude')
    return v

# Annotated types
# The validator functions must be wrapped in AfterValidator: pydantic ignores
# a bare callable placed in Annotated metadata, so without the wrapper the
# range checks would never run on model fields.
Latitude = Annotated[float, AfterValidator(validate_latitude)]
Longitude = Annotated[float, AfterValidator(validate_longitude)]

# Pydantic models
class MapState(BaseModel):
    """View of the map: the point it is centered on and its zoom level."""
    # Latitude of the map center
    latitude: Latitude
    # Longitude of the map center
    longitude: Longitude
    # Zoom level of the map
    zoom: float

class MarkersState(BaseModel):
    """Markers shown on the map, stored as three parallel lists.

    The lists are plotted together (latitudes/longitudes as positions, labels
    as text), so they are required to have the same length.
    """
    latitudes: list[Latitude]
    longitudes: list[Longitude]
    labels: list[str]

    @model_validator(mode='after')
    def validate_marker_length(self):
        """Reject a state whose three lists do not all have the same length."""
        distinct_lengths = {
            len(self.latitudes),
            len(self.longitudes),
            len(self.labels),
        }
        # More than one distinct length means at least two lists disagree.
        if len(distinct_lengths) > 1:
            raise ValueError(
                "Latitudes, longitudes, and labels must have the same number of elements")
        return self

#### B. Function To Get Updated Map Coordinates and an Initial Map State

In [4]:
# data layer
from typing import Optional, Any
from pydantic import ValidationError

# Initial map state with class instances
# Module-level state that update_map_and_markers mutates in place. The values
# are built through the pydantic models but stored as plain dicts (model_dump()).
# NOTE(review): latitude 39.949610 with longitude +75.150282 — if the intended
# starting point is in the western hemisphere the longitude should be negative;
# confirm the intended location.
ai_powered_map: dict[str, Any] = {
    "map_state": MapState(latitude=39.949610, longitude=75.150282, zoom=16).model_dump(),
    "markers_state": MarkersState(latitudes=[], longitudes=[], labels=[]).model_dump()
}

# Function to update map and markers
def update_map_and_markers(
    map_state: Optional[MapState] = None,
    markers_state: Optional[MarkersState] = None
) -> dict[str, Any]:
    """Overwrite the shared ``ai_powered_map`` state with the given models.

    Args:
        map_state: New map center/zoom; left untouched when ``None``.
        markers_state: New marker lists; left untouched when ``None``.

    Returns:
        A dict with two keys: ``"status"``, a message intended to be fed back
        to the model, and ``"values"``, the ``ai_powered_map`` dict itself
        (the same object, not a copy).
    """
    response_format = {"status": "", "values": ai_powered_map}
    try:
        if map_state is not None:
            ai_powered_map["map_state"] = map_state.model_dump()

        if markers_state is not None:
            ai_powered_map["markers_state"] = markers_state.model_dump()

        response_format["status"] = "Map location and markers updated Now continue answering my last question"

    except ValidationError as e:
        # The replacement field is kept on a single line: a line break inside
        # the braces of a single-quoted f-string is a SyntaxError before
        # Python 3.12.
        response_format["status"] = f"Error update map: {e}, continue answering my last question"

    return response_format

## Step 2. Building the Open AI Streaming Travel AI Service

#### A. OpenAI Schema for the Function & Base Prompt

In [5]:
from openai.types.shared_params import FunctionDefinition

# Function definition advertised to the model for tool calling.
# The arguments are a flat object: longitude/latitude/zoom describe the map
# view and longitudes/latitudes/labels describe the markers;
# process_streaming_tool_call regroups them into MapState and MarkersState.
# Note that "zoom" is declared as an integer here while MapState.zoom is a float.
map_ai_control_schema = FunctionDefinition(
    name="update_map_and_markers",
    description="Update map to center on a particular location and add list of markers to the map",
    parameters={
        "type": "object",
        "properties": {
            "longitude": {
                "type": "number",
                "description": "Longitude of the location to center the map on"
            },
            "latitude": {
                "type": "number",
                "description": "Latitude of the location to center the map on"
            },
            "zoom": {
                "type": "integer",
                "description": "Zoom level of the map"
            },
            "longitudes": {
                "type": "array",
                "items": {
                    "type": "number"
                },
                "description": "List of longitudes for each marker"
            },
            "latitudes": {
                "type": "array",
                "items": {
                    "type": "number"
                },
                "description": "List of latitudes for each marker"
            },
            "labels": {
                "type": "array",
                "items": {
                    "type": "string"
                },
                "description": "List of labels for each marker"
            }
        },
        # Every argument is mandatory, so each call carries both view and markers.
        "required": ["longitude", "latitude", "zoom", "longitudes", "latitudes", "labels"]
    }
)

In [6]:
from openai.types.chat.chat_completion_tool_param import ChatCompletionToolParam

# Wrap the function definition as a "function"-type tool entry; this is the
# object passed in the `tools` list of the chat completion request.
map_ai_control_tool: ChatCompletionToolParam = ChatCompletionToolParam(
    function=map_ai_control_schema, type="function")

In [7]:
# Dispatch table: tool name received from the model -> Python callable to run
available_functions = dict(
    update_map_and_markers=update_map_and_markers,
)

In [8]:
## Seed Prompt
# Used as the content of the "system" message that starts every chat history
# (both when no history is stored yet and when the history is reset).
BASE_PROMPT: str = """You are an AI Travel Assistant who make global travellers traval planning fun and interactive:

Before replying perform the following steps:

1. If user share any travel location name, update the map to go to that place and Add markers on the place.
2. if user shared any travel suggestions update them map.

If user sends any general message share with them you are a helpful AI Travel Assistant and you can help them with travel planning.

"""

In [9]:
# data layer to manage chat_history
import shelve

class Database:
    """Minimal persistence layer that keeps the chat history in a ``shelve`` file.

    The whole message list is stored under the single key ``"messages"``.
    """

    def __init__(self, dbName: str = "chat_history") -> None:
        """Remember the shelve file name; the file is only opened on load/save."""
        self.dbName = dbName

    # Load chat history from shelve file
    def load_chat_history(self) -> list[dict]:
        """Return the stored messages.

        When nothing has been saved yet, a fresh history containing only the
        system message built from ``BASE_PROMPT`` is returned.
        """
        with shelve.open(self.dbName) as db:
            return db.get("messages", [{"role": "system", "content": BASE_PROMPT}])

    # Save chat history to shelve file
    def save_chat_history(self, messages: list[dict]) -> None:
        """Replace the stored history with ``messages`` (also echoed to stdout)."""
        print("Database: Save", messages)
        with shelve.open(self.dbName) as db:
            db["messages"] = messages



In [10]:
# Service Layer 

# A Class to Call the Assistant, Stream Text and Function Calling and send the final response back
from dotenv import load_dotenv, find_dotenv
from openai import OpenAI
from typing import Any
from openai import Stream
from openai.types.chat.chat_completion_chunk import ChatCompletionChunk
# Service Layer

# A Class to Call the Assistant, Stream Text and Function Calling and send the final response back
class OpenAITravelBotModel:
    def __init__(self, name: str, model: str = "gpt-3.5-turbo-1106") -> None:
        self.name: str = name
        self.model: str = model
        load_dotenv(find_dotenv())
        self.client: OpenAI = OpenAI()
        self.db: Database = Database()
        self.messages = self.load_chat_history()
        self.map_control_values = ai_powered_map

    def load_chat_history(self) -> []:
        return self.db.load_chat_history()

    def save_chat_history(self):
        # print("Model: Save", self.messages)
        self.db.save_chat_history(messages=self.messages)

    def delete_chat_history(self):
        print("Model: Delete")
        self.messages = [{"role": "system", "content": BASE_PROMPT}]
        self.save_chat_history()

    def get_messages(self) -> [dict]:
        return self.messages

    def append_message(self, message: dict):
        self.messages.append(message)

    def get_map_control_values(self):
        return self.map_control_values

    def send_message(self, message: dict | None = None, append_message: bool = True, include_func_messages: bool = False, func_message_list: list[dict] | None = None) -> Any:
        # Appending our own message to the chat history
        if append_message and message is not None:
            self.append_message(message=message)

        # We will not include the function calling cycle message in our chat array i.e Assistant : Call Function | User: Complted
        if include_func_messages and func_message_list is not None:
            combined_list = self.messages + func_message_list
            message_stream = combined_list
            # print("Combined message_stream", message_stream)
        else:
            message_stream = self.messages
            # print("Regular message_stream", message_stream)


        stream: Stream[ChatCompletionChunk] = self.client.chat.completions.create(
            model=self.model,
            messages=message_stream,
            stream=True,
            tools=[map_ai_control_tool],
        )
        return stream


    def run_streaming_assistant(self, prompt: str):
        response_stream = self.send_message(
            {"role": "user", "content": prompt})

        current_tool_name = None
        accumulated_args = ""

        for message in response_stream:
            response_message = message.choices[0].delta

            if response_message.content:
                yield response_message.content
            elif response_message.tool_calls:
                for tool_call in response_message.tool_calls:
                    tool_function = tool_call.function

                    if tool_function:
                        if tool_function.name:
                            if current_tool_name:
                                # Process the previous tool call before starting a new one
                                yield self.process_streaming_tool_call(current_tool_name, accumulated_args)

                            current_tool_name = tool_function.name
                            accumulated_args = tool_function.arguments
                        else:
                            # Append fragment to the accumulated arguments string
                            accumulated_args += tool_function.arguments

        # Process the last tool call if it exists
        if current_tool_name:
            function_call_response = self.process_streaming_tool_call(
                current_tool_name, accumulated_args)

            # Calling OpenAI Again to get the final response
            print("Calling OpenAI Again to get the final response")
            second_response_stream = self.send_message(
                message=None,
                append_message=False,
                include_func_messages=True,
                func_message_list=[{"role": "assistant", "content": f'Call {current_tool_name} with arguments: {accumulated_args}'}, {"role": "user", "content": function_call_response}]

            )
            for message in second_response_stream:
                response_message = message.choices[0].delta

                if response_message.content:
                    print("response_message", response_message.content)
                    yield response_message.content

        # Signal the end of the stream
        yield "__END__"

    def process_streaming_tool_call(self, tool_name, args_str):
        print(f"Processed {tool_name} with arguments: {args_str}")
        # Append the tool call to the chat history - assistant response
        # self.messages.append({"role": "assistant", "content": f'Call {tool_name} with arguments: {args_str}'})

        try:
            # Parse the argument string into a dictionary
            args = json.loads(args_str)
            function_to_call = available_functions[tool_name]

            map_state: MapState | None = None
            markers_state: MarkersState | None = None

            # Create MapState object if map-related args are present
            if 'latitude' in args and 'longitude' in args and 'zoom' in args:
                map_state = MapState(
                    latitude=args['latitude'],
                    longitude=args['longitude'],
                    zoom=args['zoom']
                )
                # print ("map_state", map_state)

            # Create MarkersState object if marker-related args are present
            if 'latitudes' in args and 'longitudes' in args and 'labels' in args:
                markers_state = MarkersState(
                    latitudes=args['latitudes'],
                    longitudes=args['longitudes'],
                    labels=args['labels']
                )

                print

            update_map_res = function_to_call(
                map_state=map_state,
                markers_state=markers_state
            )

            print("map_update_call", update_map_res['status'])
            # print ("map_update_call", update_map_res['values'])
            self.map_control_values = update_map_res['values']

            return update_map_res['status']

        except KeyError:
            return f"Error: {tool_name} is not a valid tool name"

        except ValidationError as e:
            return f"Error: {e}"

        except AssertionError as e:
            return f"Error: {e}"

        except Exception as e:
            return f"Error: {e}"

In [11]:
# Display the shared map state before the assistant has been asked anything
ai_powered_map

{'map_state': {'latitude': 39.94961, 'longitude': 75.150282, 'zoom': 16.0},
 'markers_state': {'latitudes': [], 'longitudes': [], 'labels': []}}

#### Testing The OpenAITravelBotModel Class

In [13]:
# Create the assistant; the constructor builds the OpenAI client and loads any stored chat history
bot = OpenAITravelBotModel("My Travel Assistant")

In [14]:
def openai_streaming_travel_ai(prompt: str):
    """Stream the bot's answer to ``prompt`` and record it in the chat history.

    Chunks from ``bot.run_streaming_assistant`` are yielded as they arrive so a
    web layer can forward them. The accumulated answer is appended to
    ``bot.messages`` as the assistant message *before* the ``"__END__"``
    sentinel is yielded, so a consumer that stops iterating as soon as it sees
    the sentinel still leaves the history complete.

    Yields:
        Text chunks of the answer, then ``"__END__"`` if the bot emitted it.
    """
    # Collect streaming chunks for the full model response
    complete_response = ""
    end_marker_seen = False

    for response in bot.run_streaming_assistant(prompt):
        if response == "__END__":
            end_marker_seen = True
            break
        complete_response += response  # Accumulate the response
        yield response

    print('complete_response', complete_response)
    bot.messages.append({"role": "assistant", "content": complete_response})

    if end_marker_seen:
        yield "__END__"

In [15]:
# Drive the generator end to end: each streamed chunk (and finally the
# "__END__" sentinel) is printed as it arrives.
for part_res in openai_streaming_travel_ai("Show Kulala Lumpur on Map"):
        # Print the chunk that was just received
        print(part_res)

Processed update_map_and_markers with arguments: {"longitude":101.6869,"latitude":3.139,"zoom":12,"longitudes":[101.6869],"latitudes":[3.139],"labels":["Kuala Lumpur"]}
map_update_call Map location and markers updated Now continue answering my last question
Calling OpenAI Again to get the final response
response_message As
As
response_message  an
 an
response_message  AI
 AI
response_message  Travel
 Travel
response_message  Assistant
 Assistant
response_message ,
,
response_message  I
 I
response_message 'm
'm
response_message  here
 here
response_message  to
 to
response_message  help
 help
response_message  you
 you
response_message  with
 with
response_message  travel
 travel
response_message  planning
 planning
response_message  and
 and
response_message  any
 any
response_message  questions
 questions
response_message  you
 you
response_message  have
 have
response_message  about
 about
response_message  your
 your
response_message  travel
 travel
response_message  destinations
 

In [16]:
# Display the map state held by the bot after the tool call above
bot.get_map_control_values()

{'map_state': {'latitude': 3.139, 'longitude': 101.6869, 'zoom': 12.0},
 'markers_state': {'latitudes': [3.139],
  'longitudes': [101.6869],
  'labels': ['Kuala Lumpur']}}

### Additional Functions to Create Seamless AI Travel Agent Experience

In [17]:
# a. Get Map Control Values
def get_map_coordinates():
    """Return the map and marker state currently held by the module-level ``bot``."""
    return bot.get_map_control_values()


# b. Save Current Response to Database (Shelf currently)
def save_chat_to_db():
    """Persist the bot's in-memory chat history through its database layer."""
    return bot.save_chat_history()


# c. Get Database Chat (Shelf currently)
def load_database_chat_history():
    """Return the chat history as stored in the database (not the in-memory list)."""
    return bot.load_chat_history()


# d. Get All Messages Present In Class Instance
def get_all_messages():
    """Return the bot's in-memory chat history."""
    return bot.get_messages()
    
def delete_chat_history():
    """Reset the bot's chat history to just the system prompt and persist the reset."""
    return bot.delete_chat_history()

## Create AI Powered Map Interactions using Plotly

Now whenever we call the bot.run_streaming_assistant() function, it will return a generator that yields the assistant’s responses as they come in. 

We can then use these responses to update the chat interface. Function calling is decided and carried out by the OpenAI LLM, after which the ai_powered_map state is updated.

Now let's plot the map using Plotly and create a function that renders it from the current map state, so it can be redrawn after every update.

In [None]:
%pip install plotly

### Function To Plot and Update the Map

In [18]:
import plotly.graph_objects as go
from typing import Union

# Function to create and display the map
def create_map(map_state: dict[str, Union[float, str]], markers_state: dict[str, list[Union[float, str]]]) -> None:
    """Render the map with Plotly and display it.

    Args:
        map_state: Mapping with ``"latitude"``, ``"longitude"`` and ``"zoom"``
            describing where the view is centered and how far it is zoomed.
        markers_state: Mapping with the parallel lists ``"latitudes"``,
            ``"longitudes"`` and ``"labels"``, one entry per marker.
    """
    # The figure starts from an empty marker trace; the real markers are added
    # as a second trace.
    placeholder_trace = go.Scattermapbox(mode="markers")
    marker_trace = go.Scattermapbox(
        mode="markers",
        marker={"color": 'red', "size": 14},
        lat=markers_state["latitudes"],
        lon=markers_state["longitudes"],
        text=markers_state["labels"],
    )

    view_center = go.layout.mapbox.Center(
        lat=map_state["latitude"],
        lon=map_state["longitude"],
    )
    # To use Mapbox tiles instead, supply accesstoken=MAPBOX_ACCESS_TOKEN here
    # in place of the "open-street-map" style.
    mapbox_settings = {
        "style": "open-street-map",
        "center": view_center,
        "zoom": map_state["zoom"],
    }
    no_margin = {"l": 0, "r": 0, "t": 0, "b": 0}

    figure = go.Figure(placeholder_trace)
    figure.add_trace(marker_trace)
    figure.update_layout(mapbox=mapbox_settings, margin=no_margin)
    figure.show()

In [19]:
# Render the shared map state, which reflects any update made by the assistant's tool calls
create_map(map_state=ai_powered_map['map_state'], markers_state=ai_powered_map['markers_state'])


Congrats on building the Streaming Travel AI Assistant proof of concept. Currently the concept is well suited to a single user.

- We have used shelve to store user messages and create a stateful Travel AI Assistant experience.
- For general queries we immediately start getting the streamed response.
- For travel queries the assistant uses function calling to update the map coordinates and then returns the streamed response.
- Finally, we have used Plotly to visualize what AI-powered map interactions will look like.

Now we have the complete code and we can move on to designing the FastAPI microservice. We will use the above Plotly map to create AI-powered map interactions with Streamlit.

Does the map look outdated? Go to Mapbox, sign up and get your access token.

Next, in the create_map function, pass this token and comment out the map-style argument:

    figure.update_layout(
        mapbox=dict(
            accesstoken=MAPBOX_ACCESS_TOKEN, # If you don't have MAPBOX token, replace with style="open-street-map".
            center=go.layout.mapbox.Center(
                lat=map_state["latitude"],
                lon=map_state["longitude"]
            ),
            zoom=map_state["zoom"]
        ),
        margin=dict(l=0, r=0, t=0, b=0)
    )