In [1]:
from dotenv import load_dotenv
from crewai import Crew
from textwrap import dedent
from datetime import date
import json
import os
import requests
from crewai import Agent, Task
from langchain.llms import OpenAI
from langchain_openai import ChatOpenAI
from langchain.tools import tool
import gradio as gr

* 'allow_population_by_field_name' has been renamed to 'populate_by_name'
* 'smart_union' has been removed


In [2]:
# Load environment variables.
load_dotenv()
# Set up API keys
SERPER_API_KEY = os.getenv("SERPER_API_KEY")
OPENAI_API_KEY = os.getenv("OPENAI_API_KEY")
print(SERPER_API_KEY)
print(OPENAI_API_KEY)

OPENAI_MODEL = "gpt-3.5-turbo"


None
None


In [3]:
# SearchTools class with search_internet method
class SearchTools:
    def search_internet(self, query):
        """Search the internet and return relevant results"""
        top_result_to_return = 4
        url = "https://google.serper.dev/search"
        payload = json.dumps({"q": query})
        headers = {
            'X-API-KEY': SERPER_API_KEY,
            'content-type': 'application/json'
        }
        response = requests.post(url, headers=headers, data=payload)

        if response.status_code != 200:
            return {"error": "Error occurred while searching the internet."}

        try:
            results = response.json().get('organic', [])
        except KeyError:
            return {"error": "No relevant search results found."}

        if not results:
            return {"error": "Sorry, no relevant results were found for this search."}

        output = []
        for result in results[:top_result_to_return]:
            output.append({
                "title": result.get('title', 'No title available'),
                "link": result.get('link', 'No link available'),
                "snippet": result.get('snippet', 'No snippet available')
            })

        return {"results": output}

In [4]:
# Define agent classes that execute tasks using the tools provided
class CaviarExpert:
    def __init__(self, search_tool):
        self.search_tool = search_tool

    def recommend_caviar(self, taste_profile, texture_preference, budget):
        query = f"Luxury caviar suppliers for taste profile '{taste_profile}' and texture '{texture_preference}' under budget {budget} USD."
        result = self.search_tool.search_internet(query)
        return result

class Sommelier:
    def __init__(self, search_tool):
        self.search_tool = search_tool

    def recommend_champagne(self, taste_profile, texture_preference):
        query = f"Best luxury champagne pairing for caviar with taste profile '{taste_profile}' and texture '{texture_preference}'."
        result = self.search_tool.search_internet(query)
        return result

class ContentCurator:
    def __init__(self, search_tool):
        self.search_tool = search_tool

    def curate_content(self):
        query = "Luxury dining trends for caviar and champagne pairings."
        result = self.search_tool.search_internet(query)
        return result

In [5]:
def format_results(results, limit=2):
    """Format the search results into a readable text, showing only the titles."""
    if 'results' in results:
        formatted_output = []
        for result in results['results'][:limit]:  # Limit the number of results
            title = result.get('title', 'No title available')
            formatted_output.append(f"• {title}")
        return "\n".join(formatted_output)
    else:
        return "No results available."

In [6]:
class CaviarCrewSystem:
    def __init__(self, taste_profile, texture_preference, budget):
        self.taste_profile = taste_profile
        self.texture_preference = texture_preference
        self.budget = budget

        # Instantiate the search tool and agents
        self.search_tool = SearchTools()
        self.caviar_expert = CaviarExpert(self.search_tool)
        self.sommelier = Sommelier(self.search_tool)
        self.content_curator = ContentCurator(self.search_tool)

    def run(self):
        try:
            # Execute the tasks using agents
            print("Requesting caviar recommendations...")
            caviar_recommendation = self.caviar_expert.recommend_caviar(
                self.taste_profile, self.texture_preference, self.budget
            )
            formatted_caviar_recommendation = format_results(caviar_recommendation)

            print("Requesting champagne recommendations...")
            champagne_recommendation = self.sommelier.recommend_champagne(
                self.taste_profile, self.texture_preference
            )
            formatted_champagne_recommendation = format_results(champagne_recommendation)

            print("Requesting luxury dining content...")
            luxury_content = self.content_curator.curate_content()
            formatted_luxury_content = format_results(luxury_content)

            # Format the response based on agent results
            response = f"""
            Your Imperial Highness,

            Based on your exquisite preferences, I am honored to present our recommendation:

            🥄 Caviar Selection:
            {formatted_caviar_recommendation}

            🍾 Champagne Pairing:
            {formatted_champagne_recommendation}

            📜 Exclusive Insights:
            {formatted_luxury_content}

            May this experience elevate your senses and leave an indelible mark on your palate.

            Bon appétit, Your Highness.
            """

            # Log the full response for debugging
            print(f"Final Response: {response}")
            return response

        except Exception as e:
            return f"An error occurred: {e}"

In [7]:
# Gradio interface to interact with the CaviarCrewSystem class
def caviar_coach(taste_profile, texture_preference, budget):
    caviar_crew = CaviarCrewSystem(taste_profile, texture_preference, int(budget))
    final_response = caviar_crew.run()
    
    # Log the response passed to Gradio for debugging
    print(f"Response Passed to Gradio: {final_response}")
    
    return final_response

In [8]:
# Create Gradio Interface
iface = gr.Interface(
    fn=caviar_coach,
    inputs=[
        gr.Dropdown(["Mild & Delicate", "Rich & Buttery", "Bold & Intense"], label="Preferred Taste Profile"),
        gr.Dropdown(["Soft & Creamy", "Firm & Distinct"], label="Preferred Texture"),
        gr.Slider(minimum=50, maximum=1000, step=50, label="Budget (USD)")
    ],
    outputs=gr.Textbox(label="Your Royal Recommendation"),
    title="🍾 Imperial Caviar Connoisseur 🥂",
    description="Your Imperial Highness, please share your preferences for an exquisite caviar experience."
)

In [9]:

if __name__ == "__main__":
    iface.launch()

Running on local URL:  http://127.0.0.1:7861

To create a public link, set `share=True` in `launch()`.
