In [None]:
import time

# Wall-clock timestamp taken before any other cell runs; a later cell subtracts
# it from time.time() to report the notebook's total run time.
start = time.time()

In [None]:
import concurrent.futures
import json
import os
import re
import sys
import tkinter as tk
import typing as T
from string import punctuation
from threading import Lock
from tkinter import ttk
from typing import Dict, List, TypedDict

import dotenv
import googlemaps
import pandas as pd
import plotly.express as px
import plotly.graph_objects as go
from geopy.geocoders import Nominatim
from langchain.output_parsers import PydanticOutputParser, ResponseSchema, StructuredOutputParser
from langchain.output_parsers.openai_functions import JsonOutputFunctionsParser
from langchain.prompts import PromptTemplate
from langchain_community.utils.openai_functions import convert_pydantic_to_openai_function
from langchain_core.pydantic_v1 import BaseModel, Field, validator
from langchain_openai import ChatOpenAI
from openai import OpenAI

# Load variables from a local `.env` file into the process environment.
dotenv.load_dotenv()

# Secrets: fail fast with a clear message when a required key is missing.
GOOGLE_API_KEY = os.getenv("GOOGLE_PLACES_API_KEY")
assert GOOGLE_API_KEY is not None, "GOOGLE_PLACES_API_KEY is not set in the `.env` file"
OPEN_AI_API_KEY = os.getenv("OPEN_AI_API_KEY")
assert OPEN_AI_API_KEY is not None, "OPEN_AI_API_KEY is not set in the `.env` file"
MAPBOX_API_KEY = os.getenv("MAPBOX_API_KEY")
assert MAPBOX_API_KEY is not None, "MAPBOX_API_KEY is not set in the `.env` file"

# Project layout: the `src` directory is a sibling of the notebook's directory.
# os.getcwd() is the plain-Python equivalent of the `%pwd` magic.
CURRENT_DIR = os.getcwd()
ROOT_DIR = os.path.dirname(CURRENT_DIR)
SRC_DIR = os.path.join(ROOT_DIR, "src")

# Make the cell idempotent: re-running it must not keep appending duplicates.
if SRC_DIR not in sys.path:
    sys.path.append(SRC_DIR)

In [None]:
# API clients shared by the rest of the notebook.
gmaps = googlemaps.Client(key=GOOGLE_API_KEY)
px.set_mapbox_access_token(MAPBOX_API_KEY)

# Place type used when a looked-up place does not report any type of its own.
DEFAULT_TYPE = "restaurant"

# Place types accepted for the nearby search; the default is listed first.
TYPES = [
    DEFAULT_TYPE,
    "bakery",
    "sandwich_shop",
    "coffee_shop",
    "cafe",
    "fast_food_restaurant",
    "store",
    "food",
    "point_of_interest",
    "establishment",
]

In [None]:
# Hard-coded trip request used when the interactive form is skipped.
# The keys match the placeholders of the prompt template.
inputs = {
    "location": "South Beach Miami, FL",
    "number_of_people": 6,
    "date": "November 2026",
    "duration_days": 3,
    "group_type": "bachelorette party",
    # Adjacent string literals are concatenated verbatim, so every fragment
    # except the last must end with a space to keep the words separated.
    "description": (
        "include boutique hotel options must 4 stars higher onsite spa beach clubs djs "
        "day high end nightclubs list least six restaurant options dinner include least "
        "one nice steakhouse one nice sushi restaurant"
    ),
}

In [None]:
##############################################################################
# Optional interactive form.
# Skip this cell to keep the hard-coded `inputs` defined above. Running it
# opens a Tk window and, on submit, overwrites the entries of `inputs` with
# the values typed into the form.
##############################################################################
root = tk.Tk()
root.title("User Form")


def on_submit():
    """Store the form values in `inputs`, echo them, and close the window.

    Values are read from the module-level widgets exactly as entered (the
    spinbox values are not converted); only the free-text description is
    stripped of surrounding whitespace.
    """
    submitted = {
        "location": location_entry.get(),
        "number_of_people": adults_spinbox.get(),
        "date": date_entry.get(),
        "duration_days": duration_spinbox.get(),
        "group_type": group_entry.get(),
        "description": description_text.get("1.0", tk.END).strip(),
    }
    inputs.update(submitted)

    print("Form submitted with the following details:")
    print(f"Location: {submitted['location']}")
    print(f"Number of Adults: {submitted['number_of_people']}")
    print(f"Date of Month: {submitted['date']}")
    print(f"Duration (days): {submitted['duration_days']}")
    print(f"Group Information: {submitted['group_type']}")
    print(f"Description: {submitted['description']}")

    # Closing the root window ends `root.mainloop()` so the notebook continues.
    root.destroy()


main_frame = ttk.Frame(root)
main_frame.grid(padx=10, pady=10, sticky="ew")
# Column 1 holds the inputs and absorbs any extra horizontal space.
main_frame.columnconfigure(1, weight=1)


def _labeled_input(row, label_text, widget_class, label_grid=None, **widget_options):
    """Add a label (column 0) and a newly created input widget (column 1) on `row`.

    The label is created before the input widget. `label_grid` overrides the
    label's default grid options; `widget_options` are forwarded to the
    widget constructor. Returns the input widget.
    """
    label_grid = label_grid if label_grid is not None else {"sticky": "w"}
    ttk.Label(main_frame, text=label_text).grid(row=row, column=0, **label_grid)
    widget = widget_class(main_frame, **widget_options)
    widget.grid(row=row, column=1, sticky="ew")
    return widget


location_entry = _labeled_input(0, "Location: *", ttk.Entry)
adults_spinbox = _labeled_input(1, "Number of Adults: *", ttk.Spinbox, from_=1, to=100)
date_entry = _labeled_input(2, "Date of month: *", ttk.Entry)
duration_spinbox = _labeled_input(3, "Durations (day): *", ttk.Spinbox, from_=1, to=30)
group_entry = _labeled_input(4, "Tell us about your group: *", ttk.Entry)
description_text = _labeled_input(
    5,
    (
        "Briefly describe the trip you envision including "
        "the group's interests in activities, food etc:"
    ),
    tk.Text,
    label_grid={"sticky": "nw", "pady": (10, 0)},
    height=5,
)

submit_button = ttk.Button(main_frame, text="FIND", command=on_submit)
submit_button.grid(row=6, column=1, sticky="ew", pady=10)

# Blocks until `on_submit` destroys the root window (or the window is closed).
root.mainloop()

In [None]:
# Column names of the itinerary DataFrame. They must match the field names of
# the Itinerary class, because the DataFrame is built directly from the
# model's parsed output.
DAY_COLUMN = "day"
ACTIVITY_TYPE_COLUMN = "activity_type"
LOCATION_COLUMN = "location"
DESCRIPTION_COLUMN = "description"


# Schema of the structured itinerary requested from the model.
# Every field is a list of strings; entries at the same index across the four
# lists are read together as one activity (one DataFrame row).
# NOTE(review): documented with `#` comments on purpose. A class docstring
# would presumably be picked up as the function description by
# convert_pydantic_to_openai_function; confirm before adding one.
# NOTE(review): "existis" in the `location` description is a typo for
# "exists"; left untouched here because the text is part of the schema.
class Itinerary(BaseModel):
    # Day label for each activity.
    day: List[str] = Field(
        description="a list that indicates the day of the corresponding activity"
    )
    # Kind of activity (breakfast, dinner, ...).
    activity_type: List[str] = Field(
        description="a list of the activities type of the itinerary like breakfast, dinner etc"
    )
    # Place name for each activity; later used as the Google Places query.
    location: List[str] = Field(
        description="a list of the location that existis in google places of the corresponding activity type"
    )
    # Short free-text description of each activity.
    description: List[str] = Field(
        description="a brief description of the activity. The description must be 2-3 word phrases"
    )

In [None]:
# Conversion factor used by miles_to_meters().
METERS_PER_MILE = 1609.34


def get_model_fields(model: BaseModel):
    """Summarise a model's declared fields.

    Returns a dict keyed by field name; each value holds the field's
    ``type`` and its ``description`` as found in ``model.__fields__``.
    """
    return {
        name: {
            "type": declared.type_,
            "description": declared.field_info.description,
        }
        for name, declared in model.__fields__.items()
    }


def calculate_tokens(data: Dict[str, str], output: Dict[str, str]) -> T.Tuple[int, int, int]:
    """Estimate token counts by counting whitespace-separated words.

    Each value of ``data`` and ``output`` is converted with ``str()`` and
    split on whitespace; keys are ignored. This is a rough word count, not a
    model tokenizer.

    Returns:
        ``(input_count, output_count, input_count + output_count)``.
    """

    def _word_count(mapping) -> int:
        return sum(len(str(item).split()) for item in mapping.values())

    prompt_words = _word_count(data)
    response_words = _word_count(output)
    return prompt_words, response_words, prompt_words + response_words


def miles_to_meters(miles: float) -> float:
    """Convert a distance in miles to meters using METERS_PER_MILE."""
    meters = miles * METERS_PER_MILE
    return meters


def get_city_center_coordinates(city_name: str) -> T.Optional[T.Tuple[float, float]]:
    """Geocode ``city_name`` with Nominatim.

    Returns:
        ``(latitude, longitude)`` of the geocoding result, or ``None`` when
        the geocoder returns nothing for the name.
    """
    match = Nominatim(user_agent="itinerary-planner").geocode(city_name)
    return (match.latitude, match.longitude) if match else None


def clean_text(text: str) -> str:
    """
    Lower-case the text, turn newlines into spaces, and strip ASCII
    punctuation as well as straight and curly double quotes.
    """
    # One translation table: "\n" maps to a space, everything in the third
    # argument is deleted (the straight double quote is part of `punctuation`).
    translation = str.maketrans("\n", " ", punctuation + "“”")
    return text.lower().translate(translation)


def parse_itenerary_day(lines: T.List[str]) -> T.List[T.Tuple[str, str]]:
    """Parse the activity lines of one itinerary day.

    Each line is expected to look like ``<activity> <delimiter> <place>``,
    optionally prefixed with ``-``. The delimiter is ``:``, ``-`` or the
    word ``at``.

    Args:
        lines: The lines of a single day, without the day header.

    Returns:
        A list of ``(activity_type, place)`` tuples, one per line that could
        be parsed. Blank lines are skipped; other unparseable lines are
        reported with ``print`` and skipped.
    """
    day_plan = []
    # Two captured groups separated by a delimiter. "at" must be a whole word
    # (\b...\b); otherwise it would match inside words such as "Water" and
    # split the activity name in the middle.
    pattern = re.compile(r"-?\s*([\w\s]+?)\s*(?::|\bat\b|-)\s*([\w\s'&]+)")

    for line in lines:
        if not line.strip():
            continue  # nothing to parse, and not worth a warning

        match = pattern.match(line)
        if match:
            activity_type, place = match.groups()
            activity_type = activity_type.strip()
            place = place.split("(")[0].strip()  # Removes anything within parentheses
            day_plan.append((activity_type, place))
        else:
            print(f"Could not parse line: {line}")

    return day_plan


def parse_itinerary_content(content: str) -> T.List[T.Dict[str, str]]:
    """Split free-text itinerary content into one record per activity.

    Days are separated by blank lines. The first line of each day is its
    header, whose second whitespace-separated token is taken as the day
    number (e.g. ``"Day 1"`` or ``"Day 1:"`` -> ``"1"``); the remaining lines
    are parsed with ``parse_itenerary_day``.

    Args:
        content: The raw itinerary text.

    Returns:
        A list of dicts with the DAY_COLUMN, ACTIVITY_TYPE_COLUMN and
        LOCATION_COLUMN keys. Day blocks that are empty or whose header has
        no day number are skipped instead of raising ``IndexError``.
    """
    data = []
    for day in content.strip().split("\n\n"):
        lines = day.strip().split("\n")
        header_tokens = lines[0].split()
        if len(header_tokens) < 2:
            # Empty block or a header without a day number: nothing to key the
            # activities on, so skip it rather than crash.
            if lines[0]:
                print(f"Could not parse day header: {lines[0]}")
            continue

        day_number = header_tokens[1].rstrip(":")
        for activity_type, place in parse_itenerary_day(lines[1:]):
            data.append(
                {
                    DAY_COLUMN: day_number,
                    ACTIVITY_TYPE_COLUMN: activity_type,
                    LOCATION_COLUMN: place,
                }
            )

    return data

In [None]:
# Expose the Itinerary schema as an OpenAI function and parse the function
# call arguments returned by the model as JSON.
openai_functions = [convert_pydantic_to_openai_function(Itinerary)]
parser = JsonOutputFunctionsParser()

# Every name in `input_variables` must match a `{placeholder}` in the template
# (and a key of `inputs`): the template uses `{duration_days}`, not `{duration}`.
prompt = PromptTemplate(
    template="""Given the user's input below:
Location: {location}
Number of People: {number_of_people}
Date: {date}
Duration (Days): {duration_days}
Tell Us About Your Group: {group_type}
Briefly describe the trip you envision including the groups interests in activities, food etc.: {description}

Create an itinerary providing the activities for breakfast, morning activity, lunch, afternoon activity, dinner and evening activity, the location which must exist in google places and a brief description of the activity or the place

Make sure to include all the itinerary days""",
    input_variables=[
        "location",
        "number_of_people",
        "date",
        "duration_days",
        "group_type",
        "description",
    ],
)

In [None]:
model = ChatOpenAI(api_key=OPEN_AI_API_KEY, temperature=0, model="gpt-3.5-turbo-0125")

# Pipeline: fill the prompt -> call the model with the Itinerary function
# bound -> parse the function-call arguments.
function_calling_model = model.bind(functions=openai_functions)
chain = prompt | function_calling_model | parser
output = chain.invoke(inputs)

print(output)

# calculate_tokens() counts whitespace-separated words, so these figures are
# only a rough estimate.
input_tokens, output_tokens, total_tokens = calculate_tokens(inputs, output)
print(
    f"Input Tokens: {input_tokens}",
    f"Output Tokens: {output_tokens}",
    f"Total Tokens: {total_tokens}",
    sep="\n",
)

In [None]:
# Turn the parsed model output into a table: one column per Itinerary field,
# one row per activity.
df = pd.DataFrame(data=output)

print(df)

In [None]:
# Settings for the nearby-places lookup.
# MAX_NEARBY_PLACES caps how many nearby places are plotted per itinerary stop.
MAX_NEARBY_PLACES = 3

# `keyword` and `radius_meters` are passed to the nearby search as-is.
keyword = "breakfast"
store_type = None  # None falls back to the first type reported for the place being searched around
radius_meters = 1500
# Alternative: express the radius in miles and convert it.
# radius_from_place_miles = 1.0
# radius_meters = miles_to_meters(radius_from_place_miles)

In [None]:
# Coordinates of the requested location; passed to the place search and used
# later as the map center. None when geocoding finds nothing.
city_coordinates = get_city_center_coordinates(inputs["location"])
print(f"{inputs['location']} coordinates: {city_coordinates}")

# None is a valid setting: it means "use the type of the place being searched
# around". Only an explicit value has to be one of the known types.
assert (
    store_type is None or store_type in TYPES
), f"Invalid store type: {store_type}, must be one of {','.join(TYPES)}"

# Shared result containers filled by the worker threads:
#   itinerary_place_details: list of (itinerary location name, place result)
#   nearby_place_details: itinerary location name -> list of nearby place results
itinerary_place_details = []
nearby_place_details = {}

# Guards both containers above while the lookups run concurrently.
lock = Lock()


def get_place_details(row, store_type=None):
    """Look up one itinerary location and the places near it.

    Searches Google Places for ``row[LOCATION_COLUMN]`` (passing
    ``city_coordinates`` as the search location), records the first hit in
    ``itinerary_place_details``, then runs a nearby search around that hit and
    stores its results in ``nearby_place_details`` under the location name.
    Both shared containers are only modified while holding ``lock``.

    Lookups are best effort: any failure is reported with ``print`` and the
    function returns early instead of raising.

    Args:
        row: Itinerary row; only ``row[LOCATION_COLUMN]`` is read.
        store_type: Place type for the nearby search. When falsy, the first
            type reported for the found place is used, falling back to
            ``DEFAULT_TYPE``.

    Returns:
        Always ``None``; results are published through the shared containers.
    """
    place_query = row[LOCATION_COLUMN]

    try:
        result = gmaps.places(query=place_query, location=city_coordinates)
    except Exception as error:  # keep going, but say what actually went wrong
        print(f"Unable to get places info for {place_query}: {error}")
        return None

    if not result or result.get("status") != "OK" or not result.get("results"):
        print(f"Unable to get places info for {place_query}")
        return None

    place_result = result["results"][0]
    with lock:
        itinerary_place_details.append((place_query, place_result))

    if not store_type:
        # `or` also covers a present-but-empty "types" list.
        store_type = (place_result.get("types") or [DEFAULT_TYPE])[0]

    place_location = place_result["geometry"]["location"]
    print(f"Getting nearby places for {place_query} at {place_location}")
    try:
        nearby_places = gmaps.places_nearby(
            location=place_location,
            radius=radius_meters,
            keyword=keyword if keyword else None,
            type=store_type,
        )
    except Exception as error:  # keep going, but say what actually went wrong
        print(f"Unable to get nearby places info for {place_query}: {error}")
        return None

    if not nearby_places or nearby_places.get("status") != "OK":
        print(f"Unable to get nearby places info for {place_query}")
        return None

    with lock:
        nearby_place_details[place_query] = nearby_places["results"]

    print(f"Found {len(nearby_places['results'])} nearby places for {place_query}")
    return None


# Look up every itinerary row concurrently. Each future is inspected as it
# completes so that an unexpected exception inside a worker is reported
# instead of being silently discarded.
with concurrent.futures.ThreadPoolExecutor() as executor:
    futures = [executor.submit(get_place_details, row, store_type) for _, row in df.iterrows()]
    for future in concurrent.futures.as_completed(futures):
        error = future.exception()
        if error is not None:
            print(f"Place lookup failed with an unexpected error: {error!r}")

In [None]:
# Summary: each itinerary place followed by the names of its nearby places.
print(f"Num results: {len(itinerary_place_details)}")
for place, item in itinerary_place_details:
    print(item["name"])
    nearby_entries = nearby_place_details.get(place, [])
    for nearby_name in [entry["name"] for entry in nearby_entries]:
        print(f"  - {nearby_name}")

In [None]:
# Elapsed wall-clock time since `start` was set in the first cell. This also
# includes any time spent waiting in the optional input form.
total_time = time.time() - start

print(f"Total time taken: {total_time:.2f} seconds")

In [None]:
# Create a map showing the places in the itinerary and nearby places
print("Creating map...")

# One DataFrame per itinerary location: the place itself followed by up to
# MAX_NEARBY_PLACES nearby places, all sharing one colour value in (0, 1].
places_dfs = {}
color = 0
# Avoid a ZeroDivisionError when no place could be looked up at all.
num_places = len(itinerary_place_details)
color_increment = 1.0 / num_places if num_places else 0.0
for name, place in itinerary_place_details:
    color += color_increment
    # Slicing already copes with lists shorter than MAX_NEARBY_PLACES.
    nearby = nearby_place_details.get(name, [])[:MAX_NEARBY_PLACES]
    rows = [
        (
            entry["name"],
            # Read the coordinates by key instead of relying on dict ordering.
            entry["geometry"]["location"]["lat"],
            entry["geometry"]["location"]["lng"],
            color,
        )
        for entry in [place, *nearby]
    ]
    places_dfs[name] = pd.DataFrame(rows, columns=["name", "latitude", "longitude", "color"])


fig = go.Figure()

# One trace per itinerary row so each activity gets its own legend entry.
for _, row in df.iterrows():
    location_name = row[LOCATION_COLUMN]
    places_coords_df = places_dfs.get(location_name)
    if places_coords_df is None:
        # The place lookup failed for this row, so there is nothing to plot.
        print(f"Skipping {location_name}: no place details available")
        continue

    trace_name = f"Day {row[DAY_COLUMN]}: {row[ACTIVITY_TYPE_COLUMN]} at {location_name}"
    fig.add_trace(
        go.Scattermapbox(
            lat=places_coords_df["latitude"],
            lon=places_coords_df["longitude"],
            mode="markers",
            marker=go.scattermapbox.Marker(
                size=5, color=places_coords_df["color"], colorscale="Rainbow", cmin=0, cmax=1
            ),
            text=places_coords_df["name"],
            name=trace_name,
        )
    )

# Map view centred on the geocoded trip location.
map_center = go.layout.mapbox.Center(lat=city_coordinates[0], lon=city_coordinates[1])
mapbox_settings = go.layout.Mapbox(
    accesstoken=os.getenv("MAPBOX_API_KEY"),
    bearing=0,
    center=map_center,
    pitch=0,
    zoom=12,
)

# Caption anchored to the bottom edge of the figure.
footer_annotation = dict(
    text=(
        f"Nearby {MAX_NEARBY_PLACES} places for "
        f"`{keyword}` near the places for each itinerary"
    ),
    showarrow=False,
    xref="paper",
    yref="paper",
    x=0.5,
    y=0.0,
    xanchor="center",
    yanchor="top",
    font=dict(size=14, color="black"),
)

fig.update_layout(
    autosize=True,
    hovermode="closest",
    mapbox=mapbox_settings,
    height=800,
    width=1000,
    title_text=f"Itinerary for `{inputs['group_type']}` in {inputs['location']}",
    title_x=0.5,
    annotations=[footer_annotation],
)

fig.show()