In [1]:
#initialize
#     model
#     environment variables

import sys
sys.path.append('..')
from src.utils.llamaindex_retriever import LlamaIndexRetriever
from langchain.vectorstores import FAISS
from langchain.embeddings.azure_openai import AzureOpenAIEmbeddings
from langchain.embeddings.openai import OpenAIEmbeddings
from dotenv import load_dotenv
import os
from langchain.chat_models import AzureChatOpenAI

def load_env_variables(file_path):
    load_dotenv(file_path)
    print("Environment variables loaded successfully!")

env_file_path = "../.env"
load_env_variables = load_env_variables(env_file_path)
max_tokens = 3500
temperature = 0.1

# embeddings = AzureOpenAIEmbeddings(azure_deployment=azure_deployment, openai_api_version=openai_api_version)
embeddings =  AzureOpenAIEmbeddings(
        deployment=os.getenv("EMB_DEPLOYMENT"),
        openai_api_version=os.getenv("EMB_OPENAI_API_VERSION"),
        model=os.getenv("EMB_MODEL"),
        openai_api_key=os.getenv("EMB_OPENAI_API_KEY"),
        openai_api_base=os.getenv("EMB_OPENAI_ENDPOINT"),
        openai_api_type=os.getenv("EMB_API_TYPE"),
    )

llm_gpt = AzureChatOpenAI(deployment_name=os.getenv('AZURE_OPENAI_DEPLOYMENT_NAME'), openai_api_version=os.getenv("OPENAI_API_VERSION"),
                        openai_api_base=os.getenv("OPENAI_API_BASE"), 
                        openai_api_type= os.getenv("OPENAI_API_TYPE"),
                        openai_api_key=os.getenv("OPENAI_API_KEY"),
                        max_tokens=max_tokens,
                        temperature=temperature)

Environment variables loaded successfully!


  warn_deprecated(
  warn_deprecated(


In [2]:
#define utils
from src.utils.cube_semantic_custom import CubeSemanticLoader
def fetch_cube_metadata(*args, **kwargs):
    try:
        # # Load document from Cube meta api
        loader = CubeSemanticLoader(os.getenv("CUBE_API_URL"), os.getenv("CUBE_TOKEN"), False)
        documents = loader.load()
        # to_json()
        return documents
    except Exception as e:
        # Handle exceptions gracefully and return an error response
        print("Error in fetching metadata from cube: " + str(e))
        return 0

def create_vector_store(documents, local_vector_store_path, *args, **kwargs):
    print("Loaded documents: " + str(documents))
    vectorstore = FAISS.from_documents(documents, embeddings)
    vectorstore.save_local(local_vector_store_path)
    print("Vector store created and saved successfully!")

def load_vector_store(vector_store_path, embeddings, *args, **kwargs):
    # Load the vector store from the local file system
    vectorstore = FAISS.load_local(vector_store_path, embeddings, allow_dangerous_deserialization=True)
    print("Vector store loaded successfully!")
    
    return vectorstore

In [3]:
#load existing vector store
vector_store_path = "/Users/k.abhishek/Documents/experiments/metric_store/metric_store_gen_ai/data/vector_store/cube_meta_faiss_index"
vectorstore = load_vector_store(vector_store_path, embeddings)

Vector store loaded successfully!


Tools 

In [4]:
import json
from crewai import Agent, Task, Crew
from langchain.tools import tool
from src.utils.llamaindex_retriever import LlamaIndexRetriever
from typing import Optional, Type
from langchain.callbacks.manager import (
    AsyncCallbackManagerForToolRun,
    CallbackManagerForToolRun,
)
# Import things that are needed generically
from langchain.pydantic_v1 import BaseModel, Field
# from langchain.tools import BaseTool, StructuredTool, tool
from crewai_tools import BaseTool


def get_similar_documents_faiss(query, max_number_documents=3):
  vectorstore = FAISS.load_local(vector_store_path, embeddings, allow_dangerous_deserialization=True)
  docs = vectorstore.similarity_search_with_relevance_scores(query, max_number_documents)
  relevant_documents = []
  for doc in docs:
      doc = doc[0]
      meta = {'text':doc.page_content, 'table_metadata': doc.metadata}
      relevant_documents.append(meta)
  return relevant_documents

def get_similar_documents(query, max_number_documents=3):
    return get_similar_documents_faiss(query, max_number_documents)


class QueryInput(BaseModel):
    query: str = Field(description="should be enquiry query")

class TerminationHandler(CallbackManagerForToolRun):
    pass

class RephraseInputQuery(BaseTool):
    name:str = "rephrase_input_query"
    description :str = "Useful to rephrase the query to capture the intent of the user regarding metric information"
    args_schema: Type[BaseModel] = QueryInput

    def _run(
        self, query: str, run_manager: Optional[CallbackManagerForToolRun] = None
    ) -> str:
        """Use the tool."""
        metric_description = self.rephrase_input_query(query)
        return metric_description

    async def _arun(
        self, query: str, run_manager: Optional[AsyncCallbackManagerForToolRun] = None
    ) -> str:
        """Use the tool asynchronously."""
        raise NotImplementedError("custom_search does not support async")
    
    
    def rephrase_input_query(self, query, *args, **kwargs):
        agent = Agent(
                role='Intent Capturer',
                goal=
                'Rephrasing the query to capture the intent of the user regarding metric information',
                backstory=
                "You are an expert to understand the user's intent and rephrase the query to capture the intent of the user accurately.",
                llm = llm_gpt,
                allow_delegation=False)
        task = Task(
                agent=agent,
                description=
                f'Rephrase the query to capture the intent of the user regarding metric information. The query is {query}. Donot add any noise to the response',
                expected_output="some string",
        
            )
        extracted_metrics = task.execute()

        return extracted_metrics
    

    
class MetricDiscovery(BaseTool):
    name :str = "metric_discovery"
    description:str = """Useful for general user questions related to discovery, explaination, description, interpretation of metrics/measures/KPIs, tables or columns."""
    args_schema: Type[BaseModel] = QueryInput

    def _run(
        self, query: str, run_manager: Optional[CallbackManagerForToolRun] = None
    ) -> str:
        """Use the tool."""
        metric_description = self.metric_discovery(query)
        return metric_description

    async def _arun(
        self, query: str, run_manager: Optional[AsyncCallbackManagerForToolRun] = None
    ) -> str:
        """Use the tool asynchronously."""
        raise NotImplementedError("custom_search does not support async")
    
    
    def metric_discovery(self, query, *args, **kwargs):
        """Useful for general user questions related to discovery, explaination, description, interpretation of metrics/measures/KPIs, tables or columns."""
        relevant_documents = get_similar_documents(query)
        agent = Agent(
                role='Data Analyst Assistant',
                goal=
                'Empower users to understand and utilize data effectively. This includes helping them discover relevant metrics, interpreting their meaning',
                backstory=
                "The primary purpose is to bridge the gap between raw data and user comprehension, fostering a data-driven culture within the organization.",
                llm = llm_gpt,
                allow_delegation=False)  
        
        task = Task(
                agent=agent,
                description=
                """ You are responding to  question {metric_description} with answer from the Metadata provided to you as {relevant_documents}}.
                    Strictly answer the question with the information present in metadata only.
                    Respond with "Sorry, the query is out of scope." if the answer is not present in metadata and terminate further reasoning and prcoess.
                    """,
                expected_output="some string",
        
            )
        output = task.execute()

        return output




Tasks

def callback_function(output: TaskOutput):
    # Do something after the task is completed
    # Example: Send an email to the manager
    print(f"""
        Task completed!
        Task: {output.description}
        Output: {output.raw_output}
    """)

In [20]:
# def check_termination(task_output):
#     agent = Agent(
#                 role='Determine query completion',
#                 goal=
#                 'Given task output, determine termination of crew workflow',
#                 backstory=
#                 "Given user query request from user determine if no more new task is needed to be executed",
#                 llm = llm_gpt,
#                 allow_delegation=False)
#     task = Task(
#                 agent=agent,
#                 description=
#                 f'Given task output, if the query is completed and the task result determines that the query is out of scope then terminate the workflow or else the required query is satisfied. Following is the task result {task_output}',
#                 expected_output="some string",
        
#             )
#     termination_status = task.execute()
#     return termination_status
class MetricDiscoveryTasks():
  def metric_isolation(self, agent, query):
    return Task(description=(f"""
        Rephrase the query to capture the intent of the user regarding metric information. The query is {query}. Donot add any noise to the response.
        {self.__tip_section()}"""),
      agent=agent,
      expected_output="Reformatted query to capture the intent of the user regarding metric information.",
      # callback=callback_function,
    )
  
  def metric_discovery(self, agent):
    return Task(description=(f"""
        Answer the general user questions related to discovery, explaination, description, interpretation of metrics/measures/KPIs, tables or columns.
        {self.__tip_section()}"""),
      agent=agent,
      expected_output="If relevant metric exists in metadata, provide the answer. Else, respond with 'Sorry, the query is out of scope.' and terminate process.",
      # callback=callback_function,
    )
  def __tip_section(self):
    return "If you do your BEST WORK, I'll give you a $10,000 commission!"
  
  

Agents

In [21]:
class MetricDiscoveryAgent():
  def user_intent_capture(self):
    return Agent(
      role='Intent Capturer',
      goal=
      'Rephrasing the query to capture the intent of the user regarding metric information',
      backstory=
      "You are an expert to understand the user's intent and rephrase the query to capture the intent of the user accurately.",
      verbose=True,
      tools=[
        RephraseInputQuery()
      ],
      llm = llm_gpt,
    )
  def discover_metric_info(self):
    return Agent(
     role='Data Analyst Assistant',
      goal=
      'Empower users to understand and utilize data effectively. This includes helping them discover relevant metrics, interpreting their meaning',
      backstory=
      "The primary purpose is to bridge the gap between raw data and user comprehension, fostering a data-driven culture within the organization.",
      llm = llm_gpt,
      verbose=True,
      tools=[
        MetricDiscovery()
      ],
    )

Crew

In [22]:
def after_task_callback(output):
  # Perform actions after the task, 
  # for example, logging or updating agent state
  
  print(f"Agent completed task with result: {output}")

# Assigning the function to task_callback
task_callback = after_task_callback

In [23]:
def test_callback():
        # Accessing local variables
        print("Local variables:", locals())
        
        # Accessing global variables
        print("Global variables:", globals())    
        
        # Accessing built-in variables
        print("Built-in variables:", dir(__builtins__))
        after_task_callback("output")
  

In [34]:
from abc import ABC, abstractmethod
class Expert(ABC):
    name: str = ""
    description: str = ""
    public_description: str = ""
    arg_description: str = "The argument to the function."

    @abstractmethod
    def call(
        self,
        goal: str,
        task: str,
        input_str: str,
    ) -> str:
        pass

In [37]:
from crewai.process import Process
from typing import Any
class MetricDiscoveryInputCrew(Expert):
  name = "MetricDiscovery"
  description = (
       "Use this to answer the general user questions related to discovery, explaination, description, interpretation of metrics/measures/KPIs, tables or columns."
    )
  public_description = "Metric discovery crew"
  arg_description = "Query related to metrics."

  def __init__(self, query):
    self.query = query
    self.crew_process = self.crew_process()
    # self.llm = llm_gpt
  def crew_process(self):
    agents = MetricDiscoveryAgent()
    tasks = MetricDiscoveryTasks()
    # print(agents)
    # print(tasks)
    user_intent_capture_agent = agents.user_intent_capture()
    discover_metric_info_agent = agents.discover_metric_info()
    # print(metric_isolator_agent)
    # metric_isolator_task = tasks.metric_isolation(metric_isolator_agent, self.query)
    metric_isolator_task = tasks.metric_isolation(user_intent_capture_agent, self.query)
    metric_discover_task = tasks.metric_discovery(discover_metric_info_agent)
    
    # print("Metric_isolator_task", metric_isolator_task)
    crew = Crew(
      agents=[
        user_intent_capture_agent,
        discover_metric_info_agent,
      ],
      tasks=[
        metric_isolator_task,
        metric_discover_task
      ],
      verbose=False,
      process=Process.sequential,
      step_callback=test_callback()
    )
    return crew

  def run(self):
    result = self.crew_process.kickoff()
    return result
  
  def call(
        self, goal: str, task: str, input_str: str, *args: Any, **kwargs: Any
    ) -> str:
    pass

In [36]:
query = "What is the most popular feature used by our paid subscribers"
# formatted_query = input(
#     dedent("""
#       {What is the most popular feature used by our paid subscribers}
#     """))
# print(formatted_query)
crew = MetricDiscoveryInputCrew(query)
result = crew.run()



Local variables: {}
Global variables: {'__name__': '__main__', '__doc__': 'Automatically created module for IPython interactive environment', '__package__': None, '__loader__': None, '__spec__': None, '__builtin__': <module 'builtins' (built-in)>, '__builtins__': <module 'builtins' (built-in)>, '_ih': ['', '#initialize\n#     model\n#     environment variables\n\nimport sys\nsys.path.append(\'..\')\nfrom src.utils.llamaindex_retriever import LlamaIndexRetriever\nfrom langchain.vectorstores import FAISS\nfrom langchain.embeddings.azure_openai import AzureOpenAIEmbeddings\nfrom langchain.embeddings.openai import OpenAIEmbeddings\nfrom dotenv import load_dotenv\nimport os\nfrom langchain.chat_models import AzureChatOpenAI\n\ndef load_env_variables(file_path):\n    load_dotenv(file_path)\n    print("Environment variables loaded successfully!")\n\nenv_file_path = "../.env"\nload_env_variables = load_env_variables(env_file_path)\nmax_tokens = 3500\ntemperature = 0.1\n\n# embeddings = AzureOp

AttributeError: 'NoneType' object has no attribute 'kickoff'

In [65]:
from langchain import PromptTemplate
# from langchain.prompts import ChatPromptTemplate, SystemMessagePromptTemplate
# Define the prompt template
prompt_create_objectives = PromptTemplate(
    input_variables=["goal", "chat_history"],
    template="""
You are an intelligent assistant that helps users break down their goals into high-level, smaller objectives. The system you are part of serves as an interface between users and a database containing various metrics. You have access to metadata of these metrics, including table columns information.
Goal: {goal}
{chat_history}
Using the provided goal and any relevant chat history, break down the goal into high-level, smaller objectives whose intents are different. Make sure each objective is clear and distinct.
) """)

goal ="Can you show me the average user signup rate for the past month, segmented by device type?"
chat_history = ""

prompt = prompt_create_objectives.format_prompt(
                goal=goal,
                chat_history=chat_history)


In [67]:
str(prompt)

"text='\\nYou are an intelligent assistant that helps users break down their goals into high-level, smaller objectives. The system you are part of serves as an interface between users and a database containing various metrics. You have access to metadata of these metrics, including table columns information.\\nGoal: Can you show me the average user signup rate for the past month, segmented by device type?\\n\\nUsing the provided goal and any relevant chat history, break down the goal into high-level, smaller objectives whose intents are different. Make sure each objective is clear and distinct.\\n) '"

In [68]:

response = llm_gpt.invoke(str(prompt))


In [72]:
response = response.content

In [73]:
response

"To achieve the goal of showing the average user signup rate for the past month, segmented by device type, we can break down the task into the following high-level, smaller objectives:\n\n1. **Data Retrieval Objective**: Access the database to retrieve user signup data for the past month. This involves:\n   - Identifying the correct database and table(s) that contain user signup information.\n   - Determining the relevant columns that store signup dates and device types.\n   - Constructing and executing a query to fetch the data within the date range of the past month.\n\n2. **Data Segmentation Objective**: Segment the retrieved data by device type. This requires:\n   - Grouping the signup data based on the device type column.\n   - Ensuring that each group represents a unique device type.\n\n3. **Calculation Objective**: Calculate the average signup rate for each device type. This involves:\n   - Counting the number of signups for each device type.\n   - Calculating the total number o

In [77]:


# Define the prompt template
prompt_template = PromptTemplate.from_template("""
You are an intelligent assistant that breaks down user's query {query} into high-level, smaller objectives. The system you are part of serves as an interface between users and a database containing various metrics/KPIs/measures.
Each smaller objective should strictly fall into one of the following intent:
* **Discover Metrics:** If you'd like to measure something specific, I can identify relevant metrics. For example, are you interested in suggest some metrics based on objective?
* **Fetch Data:** If you already know the metric you're interested in, I can retrieve the data from the database.
* **Interpret Metrics:** Once you have the data, I can explain what it means in the context of your objective. 
* **Understand Metrics:** If you're unsure about a metric's definition or purpose, I can provide more information.

However, if the query involves something else entirely, like action outside current intents, give empty list.""")


def complete(prompt, user_query):
  """
  Completes the prompt with the user objective.

  Args:
      prompt: The prompt template.
      user_objective: The user's objective.

  Returns:
      The completed prompt.
  """
  return prompt.format(query=user_query)

# Example usage
user_objective = "CPU utilization"
completed_prompt = complete(prompt_template, user_objective)

# Use the completed prompt with the AzureChatOpenAI model
response = llm_gpt.invoke(completed_prompt)

print(response)


content="Based on your query regarding CPU utilization, here are the high-level, smaller objectives broken down by intent:\n\n**Discover Metrics:**\n- Identify metrics that can provide insights into CPU utilization, such as average CPU load, peak CPU usage, and CPU idle time.\n\n**Fetch Data:**\n- Retrieve the latest data on average CPU load from the database.\n- Obtain historical data on peak CPU usage over the last month.\n- Access information on CPU idle time percentages during off-peak hours.\n\n**Interpret Metrics:**\n- Analyze the trend of CPU utilization over time to understand system performance.\n- Compare CPU usage during peak and off-peak hours to identify potential bottlenecks.\n- Evaluate the correlation between CPU idle time and system efficiency.\n\n**Understand Metrics:**\n- Explain the significance of average CPU load as a measure of system performance.\n- Clarify what peak CPU usage indicates about the system's capacity.\n- Describe how CPU idle time can reflect on sy

In [46]:
#Available experts
from typing import Any, Dict, List, Optional, Union
from typing import Type, TypedDict

class ExpertDescription(TypedDict):
    """Representation of a callable expert"""
    name: str
    """The name of the expert."""
    description: str
    """A description of the expert."""
    parameters: dict[str, object]
    """The parameters of the expert."""

def get_expert_function(expert: Type[Expert]) -> ExpertDescription:
    """A function that will return the tool's function specification"""
    name = get_expert_name(expert)
    return {
        "name": name,
        "description": expert.description,
        "parameters": {
            "type": "object",
            "properties": {
                "reasoning": {
                    "type": "string",
                    "description": (
                        f"Reasoning is how the task will be accomplished with the current expert. "
                        "Detail your overall plan along with any concerns you have."
                        "Ensure this reasoning value is in the user defined langauge "
                    ),
                },
                "arg": {
                    "type": "string",
                    "description": expert.arg_description,
                },
            },
            "required": ["reasoning", "arg"],
        },
    }

def get_expert_name(expert: Type[Expert]) -> str:
    return format_expert_name(expert.name)

def format_expert_name(expert_name: str) -> str:
    return expert_name.lower()

def get_available_experts() -> List[Type[Expert]]:
    return [
        MetricDiscoveryInputCrew
    ]

In [47]:
available_experts = get_available_experts()
experts = list(map(get_expert_function, available_experts))
expert_descriptions = {expert['name'] : expert['description'] for expert in experts}

In [48]:
expert_descriptions

{'metricdiscovery': 'Use this to answer the general user questions related to discovery, explaination, description, interpretation of metrics/measures/KPIs, tables or columns.'}

In [33]:
experts

[{'name': 'tool',
  'description': 'Use this to answer the general user questions related to discovery, explaination, description, interpretation of metrics/measures/KPIs, tables or columns.',
  'parameters': {'type': 'object',
   'properties': {'reasoning': {'type': 'string',
     'description': 'Reasoning is how the task will be accomplished with the current expert. Detail your overall plan along with any concerns you have.Ensure this reasoning value is in the user defined langauge '},
    'arg': {'type': 'string',
     'description': 'Query for metrics to be processed by the crew.'}},
   'required': ['reasoning', 'arg']}}]