In [1]:
import pandas as pd
import re
import yaml
import sqlparse
import os
import pandas as pd
import numpy as np
import requests
from IPython.display import display, Markdown

from langchain.vectorstores import Chroma
from langchain_openai import OpenAIEmbeddings, ChatOpenAI

In [2]:
def add_repo_root_path():
    """Make the parent of the working directory importable.

    The absolute path of ``<cwd>/..`` is appended to ``sys.path`` unless it is
    already listed there, so calling this repeatedly adds it only once.
    """
    import os
    import sys

    parent_dir = os.path.abspath(os.path.join(os.getcwd(), ".."))
    if parent_dir in sys.path:
        return
    sys.path.append(parent_dir)


add_repo_root_path()
from src import generate_knowledge
from src import create_rag_db
from src import llm_chain_tools
from src.enhanced_retriever import EnhancedRetriever

In [3]:
# Cap DataFrame rendering in this notebook at 100 rows and 50 columns.
pd.options.display.max_rows = 100
pd.options.display.max_columns = 50
#pd.set_option('display.width', None)
#pd.set_option('display.max_colwidth', 10) 

### INIT

In [4]:
# The local ``openai_setup`` module is imported right after the repo-root helper runs.
generate_knowledge.add_repo_root_path()
import openai_setup

import os

# Model and vector-store settings shared by the cells below.
DEFAULT_LLM_MODEL = "gpt-4o-mini"
CHROMADB_DIRECTORY = '../chromadb'
COLLECTION_NAME = "my_chromadb"

# Credentials come from the ``openai_setup.conf`` mapping, never from literals here.
OPENAI_API_KEY = openai_setup.conf['key']
OPENAI_PROJECT = openai_setup.conf['project']
OPENAI_ORGANIZATION = openai_setup.conf['organization']

# Export the key and default model name through the process environment.
os.environ.update({
    "OPENAI_API_KEY": OPENAI_API_KEY,
    'OPENAI_MODEL_NAME': DEFAULT_LLM_MODEL,
})

In [5]:
# Shared OpenAI clients: one embedding model and one chat model.
langchain_openai_embeddings = OpenAIEmbeddings(
    model="text-embedding-ada-002",
    openai_api_key=OPENAI_API_KEY,
)
langchain_openai_llm = ChatOpenAI(
    model=DEFAULT_LLM_MODEL,
    temperature=0.1,
    openai_api_key=OPENAI_API_KEY,
    openai_organization=OPENAI_ORGANIZATION,
)

In [6]:
def update_tasks_and_agents_config(files, verbose=True):
    """Load the agent and task definitions from YAML files.

    Args:
        files: Mapping of configuration type to YAML file path. It must
            contain the keys ``'agents'`` and ``'tasks'``; any other entry is
            parsed as well but not returned.
        verbose: When true (the default, which keeps the previous behaviour)
            both parsed configurations are printed.

    Returns:
        A ``(agents_config, tasks_config)`` tuple with the parsed YAML content.

    Raises:
        KeyError: If ``files`` has no ``'agents'`` or no ``'tasks'`` entry.
        OSError: If one of the files cannot be opened.
    """
    # Fail before touching the disk, with a message that names the missing entry.
    missing = [kind for kind in ('agents', 'tasks') if kind not in files]
    if missing:
        raise KeyError(f"Missing configuration file entries: {missing}")

    # Load configurations from YAML files
    configs = {}
    for config_type, file_path in files.items():
        # Decode as UTF-8 explicitly instead of relying on the platform default
        # encoding, which is not UTF-8 everywhere.
        with open(file_path, 'r', encoding='utf-8') as file:
            configs[config_type] = yaml.safe_load(file)

    # Assign loaded configurations to specific variables
    agents_config = configs['agents']
    tasks_config = configs['tasks']

    if verbose:
        print(agents_config)
        print(tasks_config)
    return agents_config, tasks_config

# Locations of the agent and task YAML definitions, relative to the notebook folder.
files = dict(
    agents='../config/agents.yml',
    tasks='../config/tasks.yml',
)
agents_config, tasks_config = update_tasks_and_agents_config(files)

{'interpretation_agent': {'role': 'Request Interpreter\n', 'goal': 'Interpret user requests related to dbt projects and translate them into actionable decisions. Use expertise in dbt, data modeling, and analytics engineering to determine the type of action required.\n', 'backstory': "You specialize in analyzing requests to identify whether the action involves adding a field, modifying an existing model, or retrieving specific information. Your goal is to provide concise and actionable outputs tailored to the user's needs.\n", 'verbose': True, 'allow_delegation': False}, 'evaluation_agent': {'role': 'Evaluation Specialist\n', 'goal': 'Evaluate user requests related to dbt projects and provide concise, actionable insights and steps required to address the request. Leverage expertise in data modeling, dbt project structure, and dependency analysis to ensure accurate evaluations.\n', 'backstory': 'You specialize in analyzing interpreted requests and breaking them down into specific, action

In [7]:
from crewai import Agent, Task, Crew

### TESTS

#### Agents

In [8]:

# Instantiate one agent per entry of the agents configuration.
interpretation_agent = Agent(config=agents_config['interpretation_agent'])
evaluation_agent = Agent(config=agents_config['evaluation_agent'])
lineage_agent = Agent(config=agents_config['lineage_agent'])
plan_agent = Agent(config=agents_config['plan_agent'])

In [9]:
# Bind each task definition to the agent that executes it.
interpretation_task = Task(config=tasks_config['interpretation_task'], agent=interpretation_agent)
evaluation_task = Task(config=tasks_config['evaluation_task'], agent=evaluation_agent)
lineage_task = Task(config=tasks_config['lineage_task'], agent=lineage_agent)
plan_task = Task(config=tasks_config['plan_task'], agent=plan_agent)

In [10]:
# One crew holding the interpretation step followed by the evaluation step.
crew = Crew(
    agents=[interpretation_agent, evaluation_agent],
    tasks=[interpretation_task, evaluation_task],
    verbose=True,
)

In [None]:
user_input = 'Give me all the information about the models related with customers'

# The crew receives the user text under the 'request' key.
inputs = dict(request=user_input)

# Run the crew
result = crew.kickoff(inputs=inputs)

#### Flows

In [17]:
import nest_asyncio
# Patch asyncio so event loops can be nested; presumably required because the
# notebook kernel already runs a loop when the flows below are kicked off — TODO confirm.
nest_asyncio.apply()

In [12]:
from crewai import Flow
from crewai.flow.flow import listen, start

class dbtChatFlow(Flow):
    """Two-step test flow: interpret the user prompt, then evaluate it.

    Expects ``user_input`` in the flow state (supplied through
    ``kickoff(inputs=...)``). Results are stored in the state under
    ``interpretation_result`` and ``evaluation_result``.
    """

    @start()
    def interpret_prompt(self):
        """Run the notebook-level ``crew`` on the user prompt.

        Returns:
            The crew output, also stored as ``state["interpretation_result"]``.
        """
        user_prompt = self.state["user_input"]
        print(user_prompt)
        interpretation_result = crew.kickoff(inputs={'request': user_prompt})
        self.state["interpretation_result"] = interpretation_result
        return interpretation_result

    # The listener must reference the triggering method itself. A lambda is
    # registered under the name "<lambda>", which no method ever emits, so the
    # step would never run.
    @listen(interpret_prompt)
    def evaluate_interpretation(self):
        """Evaluate the stored interpretation with the evaluation agent.

        Returns:
            The evaluation output, also stored as ``state["evaluation_result"]``.
        """
        interpretation_result = self.state.get("interpretation_result")
        # Run the evaluation agent through a single-task crew. Agents are not
        # kicked off with a crew-style inputs dict; the inputs mirror the ones
        # the later flow in this notebook passes to its evaluation crew.
        # crew.agents[1] / crew.tasks[1] are the evaluation agent and task.
        evaluation_crew = Crew(agents=[crew.agents[1]], tasks=[crew.tasks[1]], verbose=True)
        evaluation_result = evaluation_crew.kickoff(
            inputs={
                "request": self.state["user_input"],
                "interpretation": str(interpretation_result),
            }
        )
        self.state["evaluation_result"] = evaluation_result
        return evaluation_result

flow = dbtChatFlow()
#flow.plot()

In [None]:
# Kick off the test flow; the prompt travels under the 'user_input' state key.
user_input = 'Give me all the information about the models related with customers'
flow.kickoff(inputs=dict(user_input=user_input))

### CREATE AGENT CHAIN

#### Configure Prerequisites

In [None]:
from langchain_openai import ChatOpenAI

# Open the persisted Chroma collection with the shared embedding function.
loaded_vectorstore = Chroma(
    embedding_function=langchain_openai_embeddings,
    persist_directory=CHROMADB_DIRECTORY,
    collection_name=COLLECTION_NAME,
)

In [10]:
# Only the repository name is needed to locate the exported CSV files.
_, repo_name = generate_knowledge.extract_owner_and_repo('https://github.com/dbt-labs/jaffle-shop')

dbt_models_df = pd.read_csv(f'../data/dbt_models_{repo_name}.csv')
dbt_project_df = pd.read_csv(f'../data/dbt_project_{repo_name}.csv')

# Combine the model and project frames into the knowledge frame used by the flows.
dbt_repo_knowledge_df = create_rag_db.merge_dbt_models_and_project_dfs(
    dbt_models_df,
    dbt_project_df,
)

In [11]:
# Build a retriever over the loaded vector store and try it with a sample query.
retriever = EnhancedRetriever(
    vectorstore=loaded_vectorstore,
    embedding_function=langchain_openai_embeddings,
)

query = "give me all the models related with the dbt model orders"
final_context, top_documents = retriever.retrieve(query)

#### Create agents, tasks and flow

In [12]:
# Instantiate one agent per entry of the agents configuration.
interpretation_agent = Agent(config=agents_config['interpretation_agent'])
evaluation_agent = Agent(config=agents_config['evaluation_agent'])
lineage_agent = Agent(config=agents_config['lineage_agent'])
plan_agent = Agent(config=agents_config['plan_agent'])

In [13]:
# Bind each task definition to the agent that executes it.
interpretation_task = Task(config=tasks_config['interpretation_task'], agent=interpretation_agent)
evaluation_task = Task(config=tasks_config['evaluation_task'], agent=evaluation_agent)
lineage_task = Task(config=tasks_config['lineage_task'], agent=lineage_agent)
plan_task = Task(config=tasks_config['plan_task'], agent=plan_agent)

In [None]:
# One single-task crew per flow step, so each step can be kicked off on its own.
interpretation_crew = Crew(
    agents=[interpretation_agent],
    tasks=[interpretation_task],
    verbose=True,
)
evaluation_crew = Crew(
    agents=[evaluation_agent],
    tasks=[evaluation_task],
    verbose=True,
)
lineage_crew = Crew(
    agents=[lineage_agent],
    tasks=[lineage_task],
    verbose=True,
)
plan_crew = Crew(
    agents=[plan_agent],
    tasks=[plan_task],
    verbose=True,
)

In [None]:
import nest_asyncio
# Patch asyncio so event loops can be nested; presumably required because the
# notebook kernel already runs a loop when the flow is kicked off — TODO confirm.
nest_asyncio.apply()

from crewai import Flow
from crewai.flow.flow import listen, start

class dbtChatFlow(Flow):
    """Interpret a dbt request, resolve its lineage and produce a change plan.

    The flow reads ``request``, ``vectorstore``, ``embedding_function`` and
    ``dbt_repo_knowledge_df`` from its state (supplied through
    ``kickoff(inputs=...)``). Every step stores its result back in the state.
    """

    @staticmethod
    def _parse_llm_json(raw_output):
        """Turn an agent's fenced JSON answer into a Python object.

        Markdown code fences are stripped first. The text is parsed as JSON
        and, if that fails, as a Python literal. Unlike ``eval`` it never
        executes the model output, and JSON ``true``/``false``/``null`` work.

        Raises:
            ValueError or SyntaxError: If the text is neither valid JSON nor a
                valid Python literal.
        """
        import ast
        import json

        cleaned = raw_output.replace("```json", "").replace("```", "").strip()
        try:
            return json.loads(cleaned)
        except json.JSONDecodeError:
            # Fall back for Python-style answers (single quotes, True/None).
            return ast.literal_eval(cleaned)

    @start()
    def interpret_prompt(self):
        """Run the interpretation crew; stores ``state["interpretation"]``."""
        request = self.state["request"]
        interpretation = interpretation_crew.kickoff(inputs = {'request': request})
        self.state["interpretation"] = interpretation
        return interpretation

    @listen(interpret_prompt)
    def evaluate_interpretation(self):
        """Run the evaluation crew; stores ``state["evaluation"]``."""
        request = self.state["request"]
        interpretation = self.state.get("interpretation")
        evaluation = evaluation_crew.kickoff(inputs = {'request': request, "interpretation": interpretation})
        self.state["evaluation"] = evaluation
        return evaluation
    
    @listen(evaluate_interpretation)
    def retrieve_general_context_for_lineage_calculation(self):
        """Retrieve context for the request; stores ``state["retrieved_context"]``.

        Returns:
            The page contents of the retrieved documents joined by newlines.
        """
        request = self.state["request"]
        interpretation = self.state.get("interpretation")
        vectorstore = self.state["vectorstore"]
        embedding_function = self.state["embedding_function"]
        retriever = EnhancedRetriever(vectorstore = vectorstore, embedding_function= embedding_function)
        # f-string: without the prefix the literal "{request}" placeholders
        # would be sent to the retriever instead of the actual values.
        retriever_input = f"""
            USER REQUEST: {request}
            REQUEST FINALITY: {interpretation}
        """
        # Only the documents are used; the context is rebuilt from them below.
        _, retrieved_documents = retriever.retrieve(retriever_input)
        retrieved_context = "\n".join([doc.page_content for doc in retrieved_documents if hasattr(doc, 'page_content')])
        self.state["retrieved_context"] = retrieved_context
        return retrieved_context

    @listen(retrieve_general_context_for_lineage_calculation)
    def get_lineage(self):
        """Run the lineage crew and parse its answer; stores ``state["lineage_analysis"]``."""
        request = self.state["request"]
        evaluation = self.state.get("evaluation")
        
        retrieved_context = self.state.get("retrieved_context")
        lineage_analysis = lineage_crew.kickoff(inputs = {'request': request, 'evaluation': str(evaluation), 'retrieved_context':retrieved_context})
        # Parse once and reuse the result for both the state and the return value.
        parsed_lineage = self._parse_llm_json(lineage_analysis.raw)
        self.state["lineage_analysis"] = parsed_lineage
        return parsed_lineage
    
    @listen(get_lineage)
    def get_lineage_documents(self):
        """Select the stored documents of the models in the requested lineage.

        Reads ``model`` and ``scope`` from ``state["lineage_analysis"]``.
        Scope ``UP`` keeps upstream models and ``DOWN`` keeps downstream
        models. ``ALL``, or a missing or unrecognised scope, keeps both. The
        model itself is always included. Stores ``state["lineage_documents"]``.
        """
        lineage_analysis = self.state.get("lineage_analysis")
        vectorstore = self.state["vectorstore"]
        dbt_repo_knowledge_df = self.state["dbt_repo_knowledge_df"]

        model_name = lineage_analysis.get("model")
        # ``or ""`` also covers an explicit null scope in the agent answer.
        scope = str(lineage_analysis.get("scope") or "").upper()

        # NOTE(review): a later flow in this notebook calls
        # create_rag_db.calculate_dbt_lineage here instead — confirm which one applies.
        lineage_df = create_rag_db.plot_dbt_lineage(dbt_repo_knowledge_df)
        affected_models = llm_chain_tools.get_affected_models(lineage_df, model_name)

        if scope == "UP":
            filtered_models = affected_models["upstream"]
        elif scope == "DOWN":
            filtered_models = affected_models["downstream"]
        else:
            # Previously an unexpected scope left ``filtered_models`` unbound
            # and crashed; fall back to the widest selection instead.
            filtered_models = affected_models["upstream"] + affected_models["downstream"]
        filtered_models = list(set(f"{model}.sql" for model in filtered_models + [model_name]))
        
        documents = llm_chain_tools.extract_documents_from_vectorstore(vectorstore)
        lineage_documents = llm_chain_tools.select_documents(documents, filtered_models)
        self.state["lineage_documents"] = lineage_documents
        return lineage_documents

    @listen(get_lineage_documents)
    def retrieve_lineage_context(self):
        """Retrieve planning context restricted to the lineage documents.

        Stores ``state["planning_retrieved_context"]`` and
        ``state["planning_retrieved_csv_sources_context"]``.

        Returns:
            The joined page contents of the project and retrieved documents.
        """
        lineage_documents = self.state.get("lineage_documents")
        embedding_function = self.state["embedding_function"]
        request = self.state["request"]
        interpretation = self.state.get("interpretation")
        evaluation = self.state.get("evaluation")

        retriever_documents = lineage_documents["retriever_documents"]
        csv_sources_documents = lineage_documents["csv_sources_documents"]
        yml_project_documents = lineage_documents["yml_project_documents"]

        # Create a new vectorstore with the filtered documents
        new_vectorstore = Chroma.from_documents(retriever_documents, embedding_function)
        
        # Adjusted retriever
        new_retriever = EnhancedRetriever(vectorstore = new_vectorstore, embedding_function = embedding_function)
        # f-string so the request, interpretation and evaluation are actually interpolated.
        retriever_input = f"""
            USER REQUEST: {request}
            REQUEST FINALITY: {interpretation}
            DBT EXPERT DEEP EVALUATION: {evaluation}
        """
        _, retrieved_documents = new_retriever.retrieve(retriever_input)
        combined_documents =  yml_project_documents + retrieved_documents

        retrieved_context = "\n".join([doc.page_content for doc in combined_documents if hasattr(doc, 'page_content')])
        retrieved_csv_sources_context = "\n".join([doc.page_content for doc in csv_sources_documents if hasattr(doc, 'page_content')])

        self.state["planning_retrieved_context"] = retrieved_context
        self.state["planning_retrieved_csv_sources_context"] = retrieved_csv_sources_context
        return retrieved_context
    
    @listen(retrieve_lineage_context)
    def plan_changes(self):
        """Run the planning crew on everything gathered so far; stores ``state["plan"]``."""
        request = self.state["request"]
        evaluation = self.state.get("evaluation")
        lineage_analysis = self.state.get("lineage_analysis")
        planning_retrieved_context = self.state.get("planning_retrieved_context")
        planning_retrieved_csv_sources_context = self.state.get("planning_retrieved_csv_sources_context")

        plan = plan_crew.kickoff(inputs = {'request': request, "evaluation": str(evaluation), "lineage_analysis": str(lineage_analysis), "retrieved_context": planning_retrieved_context,  "retrieved_csv_sources_context":planning_retrieved_csv_sources_context})
        self.state["plan"] = plan
        return plan

flow = dbtChatFlow()
flow.plot()

In [None]:
user_input = 'Give me all the information about the models related with customers'

# Hand the prompt and the shared resources to the flow through its inputs.
result = flow.kickoff(
    inputs={
        "request": user_input,
        "dbt_repo_knowledge_df": dbt_repo_knowledge_df,
        "vectorstore": loaded_vectorstore,
        "embedding_function": langchain_openai_embeddings,
    }
)

In [None]:
# Render the raw text of the flow result as Markdown in the cell output.
Markdown(result.raw)

### DEVELOP CHAIN

In [8]:
from langchain_openai import ChatOpenAI

# Open the persisted Chroma collection with the shared embedding function.
loaded_vectorstore = Chroma(
    embedding_function=langchain_openai_embeddings,
    persist_directory=CHROMADB_DIRECTORY,
    collection_name=COLLECTION_NAME,
)

retriever = EnhancedRetriever(
    vectorstore=loaded_vectorstore,
    embedding_function=langchain_openai_embeddings,
)

# Only the repository name is needed to locate the exported CSV files.
_, repo_name = generate_knowledge.extract_owner_and_repo('https://github.com/dbt-labs/jaffle-shop')

dbt_models_df = pd.read_csv(f'../data/dbt_models_{repo_name}.csv')
dbt_project_df = pd.read_csv(f'../data/dbt_project_{repo_name}.csv')
dbt_repo_knowledge_df = create_rag_db.merge_dbt_models_and_project_dfs(
    dbt_models_df,
    dbt_project_df,
)

  loaded_vectorstore = Chroma(


#### Agents, tasks and crews

In [28]:
# Re-read both YAML files and rebind agents_config / tasks_config before the
# agents and tasks below are built from them.
agents_config, tasks_config = update_tasks_and_agents_config(files)

{'interpretation_agent': {'role': 'Request Interpreter\n', 'goal': 'Interpret user requests related to dbt projects and translate them into actionable decisions. Use expertise in dbt, data modeling, and analytics engineering to determine the type of action required.\n', 'backstory': "You specialize in analyzing requests to identify whether the action involves adding a field, modifying an existing model, or retrieving specific information. Your goal is to provide concise and actionable outputs tailored to the user's needs.\n", 'verbose': True, 'allow_delegation': False}, 'evaluation_agent': {'role': 'Evaluation Specialist\n', 'goal': 'Evaluate user requests related to dbt projects and provide concise, actionable insights and steps required to address the request. Leverage expertise in data modeling, dbt project structure, and dependency analysis to ensure accurate evaluations.\n', 'backstory': 'You specialize in analyzing interpreted requests and breaking them down into specific, action

In [29]:
# Instantiate one agent per entry of the agents configuration.
check_model_agent = Agent(config=agents_config['check_model_agent'])
search_model_agent = Agent(config=agents_config['search_model_agent'])
interpretation_agent = Agent(config=agents_config['interpretation_agent'])
generate_info_report_agent = Agent(config=agents_config['generate_info_report_agent'])
search_involved_models_agent = Agent(config=agents_config['search_involved_models_agent'])
solution_design_agent = Agent(config=agents_config['solution_design_agent'])
concilation_and_testing_agent = Agent(config=agents_config['concilation_and_testing_agent'])

In [30]:
# Creating Tasks: each task definition is bound to the agent that executes it.
check_model_task = Task(
  config=tasks_config['check_model_task'],
  agent=check_model_agent
)

search_model_task = Task(
  config=tasks_config['search_model_task'],
  agent=search_model_agent
)

interpretation_task = Task(
  config=tasks_config['interpretation_task'],
  agent=interpretation_agent
)

generate_info_report_task = Task(
  config=tasks_config['generate_info_report_task'],
  agent=generate_info_report_agent
)

# The two model-search tasks belong to search_involved_models_agent, the agent
# their crews are built with. They were previously bound to
# generate_info_report_agent, which left the search agent unused.
search_models_impacted_task = Task(
  config=tasks_config['search_models_impacted_task'],
  agent=search_involved_models_agent
)

search_models_needed_task = Task(
  config=tasks_config['search_models_needed_task'],
  agent=search_involved_models_agent
)

solution_design_task = Task(
  config=tasks_config['solution_design_task'],
  agent=solution_design_agent
)

solution_design_models_impacted_task = Task(
  config=tasks_config['solution_design_models_impacted_task'],
  agent=solution_design_agent
)

concilation_and_testing_task = Task(
  config=tasks_config['concilation_and_testing_task'],
  agent=concilation_and_testing_agent
)

In [31]:
# One single-task crew per flow step, so each step can be kicked off on its own.
check_model_crew = Crew(agents=[check_model_agent], tasks=[check_model_task], verbose=True)
search_model_crew = Crew(agents=[search_model_agent], tasks=[search_model_task], verbose=True)
interpretation_crew = Crew(agents=[interpretation_agent], tasks=[interpretation_task], verbose=True)

# Information-report branch.
generate_info_report_crew = Crew(
    agents=[generate_info_report_agent],
    tasks=[generate_info_report_task],
    verbose=True,
)

# Code-change branch: model search.
search_models_impacted_task_crew = Crew(
    agents=[search_involved_models_agent],
    tasks=[search_models_impacted_task],
    verbose=True,
)
search_models_needed_task_crew = Crew(
    agents=[search_involved_models_agent],
    tasks=[search_models_needed_task],
    verbose=True,
)

# Code-change branch: solution design.
solution_design_crew = Crew(
    agents=[solution_design_agent],
    tasks=[solution_design_task],
    verbose=True,
)
solution_design_models_impacted_crew = Crew(
    agents=[solution_design_agent],
    tasks=[solution_design_models_impacted_task],
    verbose=True,
)

# Code-change branch: final step.
concilation_and_testing_crew = Crew(
    agents=[concilation_and_testing_agent],
    tasks=[concilation_and_testing_task],
    verbose=True,
)

Overriding of current TracerProvider is not allowed
Overriding of current TracerProvider is not allowed
Overriding of current TracerProvider is not allowed
Overriding of current TracerProvider is not allowed
Overriding of current TracerProvider is not allowed
Overriding of current TracerProvider is not allowed
Overriding of current TracerProvider is not allowed
Overriding of current TracerProvider is not allowed
Overriding of current TracerProvider is not allowed


#### Flow

In [41]:
import nest_asyncio
# Patch asyncio so event loops can be nested; presumably required because the
# notebook kernel already runs a loop when the flow is kicked off — TODO confirm.
nest_asyncio.apply()

from crewai import Flow
from crewai.flow.flow import listen, start, and_, or_, router

import importlib
import src.enhanced_retriever
# Reload the module and re-import the class so this cell picks up the current
# source of EnhancedRetriever without restarting the kernel.
importlib.reload(src.enhanced_retriever)
from src.enhanced_retriever import EnhancedRetriever 


class dbtChatFlow(Flow):
    """Answer an information request or design a dbt code change.

    The flow reads ``request``, ``dbt_repo_knowledge_df``, ``vectorstore`` and
    ``embedding_function`` from its state (supplied through
    ``kickoff(inputs=...)``). After the model check, model search and prompt
    interpretation, a router selects the ``'info'`` route (report only) or the
    ``'code'`` route (search, context retrieval, solution design and the final
    conciliation/testing step).

    Steps share data through the state rather than through listener
    arguments. A listener argument only carries the output of whichever
    method triggered it, which is ambiguous after a router or an ``and_``.
    """

    @staticmethod
    def _parse_llm_json(raw_output):
        """Turn an agent's fenced JSON answer into a Python object.

        Markdown code fences are stripped first. The text is parsed as JSON
        and, if that fails, as a Python literal. Unlike ``eval`` it never
        executes the model output, and JSON ``true``/``false``/``null`` work.

        Raises:
            ValueError or SyntaxError: If the text is neither valid JSON nor a
                valid Python literal.
        """
        import ast
        import json

        cleaned = raw_output.replace("```json", "").replace("```", "").strip()
        try:
            return json.loads(cleaned)
        except json.JSONDecodeError:
            # Fall back for Python-style answers (single quotes, True/None).
            return ast.literal_eval(cleaned)

    @start()
    def check_model(self):
        """Ask the check-model crew which model the request refers to.

        Returns:
            The parsed agent answer, also stored as ``state["check_model_ouput"]``.
        """
        request = self.state["request"]
        dbt_repo_knowledge_df = self.state["dbt_repo_knowledge_df"]

        lineage_df = create_rag_db.calculate_dbt_lineage(dbt_repo_knowledge_df)
        check_model_ouput = check_model_crew.kickoff(inputs = {"request": request, "lineage": str(lineage_df)})
        check_model_ouput_json = self._parse_llm_json(check_model_ouput.raw)
        
        self.state["check_model_ouput"] = check_model_ouput_json
        return check_model_ouput_json

    @listen(check_model)
    def retrieve_search_models(self, check_model_ouput_json):
        """Collect names, lineage and stored documents of the identified model(s).

        Args:
            check_model_ouput_json: Parsed output of ``check_model``; its
                ``identified_model`` entry may be a single name or a list.

        Returns:
            ``(identified_model_names, identified_model_lineage,
            identified_model_documents)``. The documents are also stored as
            ``state["identified_model_documents"]``.
        """
        dbt_repo_knowledge_df = self.state["dbt_repo_knowledge_df"]
        vectorstore = self.state["vectorstore"]

        documents = llm_chain_tools.extract_documents_from_vectorstore(vectorstore)

        # Normalise to a list. Previously a list answer left
        # ``identified_models`` unbound and the step crashed.
        identified_models = check_model_ouput_json['identified_model']
        if not isinstance(identified_models, list):
            identified_models = [identified_models]
        identified_model_names = list(set(f"{model}.sql" for model in identified_models))
        identified_model_documents = [
            doc for doc in documents
            if hasattr(doc, 'metadata') and doc.metadata.get("name") in identified_model_names
        ]

        lineage_df = create_rag_db.calculate_dbt_lineage(dbt_repo_knowledge_df)
        identified_model_lineage = llm_chain_tools.get_affected_models(lineage_df, check_model_ouput_json['identified_model'])

        self.state["identified_model_documents"] = identified_model_documents
        return identified_model_names, identified_model_lineage, identified_model_documents

    @listen(retrieve_search_models)
    def search_model(self, retrieved_search_models):
        """Run the search-model crew on the identified model(s).

        Args:
            retrieved_search_models: The tuple returned by ``retrieve_search_models``.

        Returns:
            The crew output, also stored as ``state["search_impacted_models_ouput"]``.
        """
        identified_model_names, identified_model_lineage, identified_model_documents = retrieved_search_models
        request = self.state["request"]
        
        search_impacted_models_ouput = search_model_crew.kickoff(
            inputs={
                "request": request,
                "lineage": str(identified_model_lineage),
                "impacted_models": identified_model_names,
                "impacted_models_documents": str(identified_model_documents)
            }
        )
        
        self.state["search_impacted_models_ouput"] = search_impacted_models_ouput
        return search_impacted_models_ouput
    
    @listen(search_model)
    def interpret_prompt(self):
        """Run the interpretation crew; stores ``state["interpretation"]``."""
        request = self.state["request"]

        interpretation = interpretation_crew.kickoff(inputs = {'request': request})
        self.state["interpretation"] = interpretation
        return interpretation

    @router(interpret_prompt)
    def select_required_ouput(self, interpretation):
        """Choose the route: ``'info'`` for ``RETRIEVE_INFO``, otherwise ``'code'``."""
        # Strip surrounding whitespace so a padded answer still matches.
        if interpretation.raw.strip() == 'RETRIEVE_INFO':
            return 'info'
        else:
            return 'code'

    @listen('info')
    def generate_info_report(self, search_impacted_models_ouput):
        """Produce the information report for the ``'info'`` route.

        Args:
            search_impacted_models_ouput: Payload passed by the flow for the
                route trigger. It is not the ``search_model`` result, so it is
                ignored and the value is read from the state.

        Returns:
            The crew output, also stored as ``state["generate_info_report_ouput"]``.
        """
        request = self.state["request"]
        identified_model_documents = self.state["identified_model_documents"]
        search_impacted_models_ouput = self.state["search_impacted_models_ouput"]
        
        generate_info_report_ouput = generate_info_report_crew.kickoff(
            inputs={
                "request": request,
                "search_impacted_models_ouput": str(search_impacted_models_ouput),
                "impacted_models_documents": str(identified_model_documents)
            }
        )

        self.state["generate_info_report_ouput"] = generate_info_report_ouput
        return generate_info_report_ouput

    @listen('code')
    def search_needed_models_for_change(self, search_impacted_models_ouput):
        """Ask which models are needed to implement the change.

        Args:
            search_impacted_models_ouput: Payload passed by the flow for the
                route trigger; ignored in favour of the value stored in the state.

        Returns:
            The parsed agent answer, also stored as
            ``state["search_needed_models_for_change_ouput"]``.
        """
        request = self.state["request"]
        dbt_repo_knowledge_df = self.state["dbt_repo_knowledge_df"]
        check_model_ouput_json = self.state["check_model_ouput"]
        search_impacted_models_ouput = self.state["search_impacted_models_ouput"]

        lineage_df = create_rag_db.calculate_dbt_lineage(dbt_repo_knowledge_df)

        search_needed_models_for_change_ouput = search_models_needed_task_crew.kickoff(
            inputs={
                "request": request,
                "identified_model": str(check_model_ouput_json['identified_model']),
                "search_impacted_models_ouput": str(search_impacted_models_ouput),
                "lineage_df": str(lineage_df)
            }
        )
        search_needed_models_for_change_ouput_json = self._parse_llm_json(search_needed_models_for_change_ouput.raw)
        self.state["search_needed_models_for_change_ouput"] = search_needed_models_for_change_ouput_json
        return search_needed_models_for_change_ouput_json
    
    @listen('code')
    def search_models_impacted_by_change(self, search_impacted_models_ouput):
        """Ask which models are impacted by the change.

        Args:
            search_impacted_models_ouput: Payload passed by the flow for the
                route trigger; ignored in favour of the value stored in the state.

        Returns:
            The parsed agent answer, also stored as
            ``state["search_models_impacted_by_change_ouput"]``.
        """
        request = self.state["request"]
        dbt_repo_knowledge_df = self.state["dbt_repo_knowledge_df"]
        check_model_ouput_json = self.state["check_model_ouput"]
        search_impacted_models_ouput = self.state["search_impacted_models_ouput"]

        lineage_df = create_rag_db.calculate_dbt_lineage(dbt_repo_knowledge_df)

        search_models_impacted_by_change_ouput = search_models_impacted_task_crew.kickoff(
            inputs={
                "request": request,
                "identified_model": str(check_model_ouput_json['identified_model']),
                "search_impacted_models_ouput": str(search_impacted_models_ouput),
                "lineage_df": str(lineage_df)
            }
        )
        search_models_impacted_by_change_ouput_json = self._parse_llm_json(search_models_impacted_by_change_ouput.raw)
        self.state["search_models_impacted_by_change_ouput"] = search_models_impacted_by_change_ouput_json
        return search_models_impacted_by_change_ouput_json

    @listen(search_needed_models_for_change)
    def retrieve_context_for_solution_main_model(self):
        """Retrieve context for every upstream model needed by the change.

        Returns:
            For each model, the retriever query followed by the retrieved page
            contents, all concatenated. Also stored as
            ``state["retrieve_context_for_solution_main_model"]``.
        """
        search_needed_models_for_change_ouput = self.state["search_needed_models_for_change_ouput"]

        vectorstore = self.state["vectorstore"]
        embedding_function = self.state["embedding_function"]
        retriever = EnhancedRetriever(vectorstore = vectorstore, embedding_function= embedding_function)

        retrieve_context_for_solution_main_model = ""
        # ``.get`` with an empty default matches the impacted-models step: an
        # answer without upstream models yields an empty context, not a KeyError.
        for model in search_needed_models_for_change_ouput.get('upstream_models', []):
            retriever_input = f"""\n
                RELATION: parent model
                MODEL NAME: {model['model_name']}
                CONTEXT NEEDED FOR: {model['requirement']}
            \n"""
            _, retrieved_documents = retriever.retrieve(retriever_input)
            retrieved_context = "\n".join([doc.page_content for doc in retrieved_documents if hasattr(doc, 'page_content')])
            retrieve_context_for_solution_main_model += retriever_input + retrieved_context

        self.state["retrieve_context_for_solution_main_model"] = retrieve_context_for_solution_main_model
        return retrieve_context_for_solution_main_model    

    @listen(search_models_impacted_by_change)
    def retrieve_context_for_solution_impacted_models(self):
        """Retrieve context for every impacted upstream and downstream model.

        Returns:
            For each model, the retriever query followed by the retrieved page
            contents, all concatenated. Also stored as
            ``state["retrieve_context_for_solution_impacted_models"]`` so the
            ``and_`` listener can read it whichever trigger fires last.
        """
        search_models_impacted_by_change_ouput = self.state["search_models_impacted_by_change_ouput"]

        vectorstore = self.state["vectorstore"]
        embedding_function = self.state["embedding_function"]
        retriever = EnhancedRetriever(vectorstore = vectorstore, embedding_function= embedding_function)

        retrieve_context_for_solution_impacted_models = ""
        for model_group in ['upstream_models', 'downstream_models']:
            for model in search_models_impacted_by_change_ouput.get(model_group, []):
                retriever_input = f"""
                    RELATION: {model_group}
                    MODEL NAME: {model['model_name']}
                    CONTEXT NEEDED FOR: {model['requirement']}
                """
                _, retrieved_documents = retriever.retrieve(retriever_input)
                retrieved_context = "\n".join([doc.page_content for doc in retrieved_documents if hasattr(doc, 'page_content')])
                retrieve_context_for_solution_impacted_models += retriever_input + retrieved_context

        self.state["retrieve_context_for_solution_impacted_models"] = retrieve_context_for_solution_impacted_models
        return retrieve_context_for_solution_impacted_models

    @listen(retrieve_context_for_solution_main_model)
    def design_solution_main_model(self, retrieve_context_for_solution_main_model):
        """Design the change for the main model.

        Args:
            retrieve_context_for_solution_main_model: Context string returned
                by the step of the same name (the only trigger of this listener).

        Returns:
            The crew output, also stored as ``state["design_solution_main_model_ouput"]``.
        """
        request = self.state["request"]
        search_impacted_models_ouput = self.state["search_impacted_models_ouput"] #info about the model in markdown format
        identified_model_documents = self.state["identified_model_documents"]
        dbt_repo_knowledge_df = self.state["dbt_repo_knowledge_df"]

        lineage_df = create_rag_db.calculate_dbt_lineage(dbt_repo_knowledge_df)
        design_solution_main_model_output = solution_design_crew.kickoff(
            inputs={
                "request": request,
                "identified_model_documents": str(identified_model_documents),
                "search_impacted_models_ouput": str(search_impacted_models_ouput),
                "retrieved_context_complete": str(retrieve_context_for_solution_main_model),
                "lineage_df": str(lineage_df)
            }
        )
        
        self.state["design_solution_main_model_ouput"] = design_solution_main_model_output
        return design_solution_main_model_output

    @listen(and_(design_solution_main_model, retrieve_context_for_solution_impacted_models))
    def design_solution_impacted_models(self, retrieve_context_for_solution_impacted_models):
        """Design the changes for the impacted models.

        Args:
            retrieve_context_for_solution_impacted_models: Payload of whichever
                trigger completed last. That may be the main-model design
                rather than the retrieved context, so it is ignored and the
                context is read from the state.

        Returns:
            The crew output, also stored as
            ``state["design_solution_impacted_models_output"]``.
        """
        request = self.state["request"]
        design_solution_main_model_ouput = self.state["design_solution_main_model_ouput"]
        search_models_impacted_by_change_ouput = self.state["search_models_impacted_by_change_ouput"]
        dbt_repo_knowledge_df = self.state["dbt_repo_knowledge_df"]
        retrieve_context_for_solution_impacted_models = self.state["retrieve_context_for_solution_impacted_models"]

        lineage_df = create_rag_db.calculate_dbt_lineage(dbt_repo_knowledge_df)
        design_solution_impacted_models_output = solution_design_models_impacted_crew.kickoff(
            inputs={
                "request": request,
                "design_solution_main_model_ouput": str(design_solution_main_model_ouput),
                "search_models_impacted_by_change_ouput": str(search_models_impacted_by_change_ouput),
                "retrieve_context_for_solution_impacted_models": str(retrieve_context_for_solution_impacted_models),
                "lineage_df": str(lineage_df)
            }
        )
        self.state["design_solution_impacted_models_output"] = design_solution_impacted_models_output
        return design_solution_impacted_models_output

    @listen(and_(design_solution_main_model, design_solution_impacted_models))
    def concilation_and_testing(self):
        """Run the final conciliation/testing crew on both designs.

        Returns:
            The crew output, also stored as ``state["concilation_and_testing_output"]``.
        """
        request = self.state["request"]
        design_solution_main_model_ouput = self.state["design_solution_main_model_ouput"]
        design_solution_impacted_models_output = self.state["design_solution_impacted_models_output"]
        dbt_repo_knowledge_df = self.state["dbt_repo_knowledge_df"]
    
        lineage_df = create_rag_db.calculate_dbt_lineage(dbt_repo_knowledge_df)
        concilation_and_testing_output = concilation_and_testing_crew.kickoff(
            inputs={
                "request": request,
                "design_solution_main_model_ouput": str(design_solution_main_model_ouput),
                "design_solution_impacted_models_output": str(design_solution_impacted_models_output),
                "lineage_df": str(lineage_df)
            }
        )
        self.state["concilation_and_testing_output"] = concilation_and_testing_output
        return concilation_and_testing_output

# Build the flow and render its graph.
flow = dbtChatFlow()
flow.plot()

user_input = "I want to add a new column 'overdue' to the model orders that come from raw_orders source, and have it available in customers. the overdue column is directly available in raw_orders, is not necessairy to calcylate it "

# Hand the prompt and the shared resources to the flow through its inputs.
result = flow.kickoff(
    inputs={
        "request": user_input,
        "dbt_repo_knowledge_df": dbt_repo_knowledge_df,
        "vectorstore": loaded_vectorstore,
        "embedding_function": langchain_openai_embeddings,
    }
)
result

Plot saved as crewai_flow.html
[1m[95m# Agent:[00m [1m[92mIdentify if the user's request explicitly mentions a specific model for retrieving information or implementing changes.[00m
[95m## Task:[00m [92mVerify if the request explicitly mentions a model that requires information retrieval or changes. Request: I want to add a new column 'overdue' to the model orders that come from raw_orders source, and have it available in customers. the overdue column is directly available in raw_orders, is not necessairy to calcylate it  Current dbt lineage of the dbt project:                model_name                source  \
0           stg_customers  [ecom.raw_customers]   
1           stg_locations     [ecom.raw_stores]   
2         stg_order_items      [ecom.raw_items]   
3              stg_orders     [ecom.raw_orders]   
4            stg_products   [ecom.raw_products]   
5            stg_supplies   [ecom.raw_supplies]   
6               customers                    []   
7              

CrewOutput(raw='### Changes to the Main Model: `orders.sql`\n\nThe `orders.sql` model has been updated to include the `overdue` column from the `stg_orders` staging model. The following changes were made:\n\n1. **Modified CTEs**: Included the `overdue` column in the CTE where the data is selected from the `stg_orders` model.\n\nUpdated SQL code for `orders.sql`:\n\n```sql\nWITH orders AS (\n  SELECT *,\n         overdue  -- Include the overdue column directly from the orders CTE\n  FROM {{ ref(\'stg_orders\') }}  \n),\norder_items AS (\n  SELECT *\n  FROM {{ ref(\'order_items\') }}\n),\norder_items_summary AS (\n  SELECT order_id,\n         SUM(supply_cost) AS order_cost,\n         SUM(product_price) AS order_items_subtotal,\n         COUNT(order_item_id) AS count_order_items,\n         SUM(CASE\n                 WHEN is_food_item THEN 1\n                 ELSE 0\n             END) AS count_food_items,\n         SUM(CASE\n                 WHEN is_drink_item THEN 1\n                 ELSE

In [42]:
# Echo the original request as a styled HTML header so the rendered answer
# below can be read against exactly what was asked.
user_input_banner = f"<div style='font-size: 18px;'><b>User input:</b> <i>{user_input}</i></div><hr>"
display(Markdown(user_input_banner))

# `result.raw` holds the crew's final answer as a Markdown string; render it
# rather than printing so headings and fenced SQL/YAML blocks are formatted.
display(Markdown(result.raw))

<div style='font-size: 18px;'><b>User input:</b> <i>I want to add a new column 'overdue' to the model orders that come from raw_orders source, and have it available in customers. the overdue column is directly available in raw_orders, is not necessairy to calcylate it </i></div><hr>

### Changes to the Main Model: `orders.sql`

The `orders.sql` model has been updated to include the `overdue` column from the `stg_orders` staging model. The following changes were made:

1. **Modified CTEs**: Included the `overdue` column in the CTE where the data is selected from the `stg_orders` model.

Updated SQL code for `orders.sql`:

```sql
WITH orders AS (
  SELECT *,
         overdue  -- Include the overdue column directly from the orders CTE
  FROM {{ ref('stg_orders') }}  
),
order_items AS (
  SELECT *
  FROM {{ ref('order_items') }}
),
order_items_summary AS (
  SELECT order_id,
         SUM(supply_cost) AS order_cost,
         SUM(product_price) AS order_items_subtotal,
         COUNT(order_item_id) AS count_order_items,
         SUM(CASE
                 WHEN is_food_item THEN 1
                 ELSE 0
             END) AS count_food_items,
         SUM(CASE
                 WHEN is_drink_item THEN 1
                 ELSE 0
             END) AS count_drink_items
  FROM order_items
  GROUP BY order_id
),
compute_booleans AS (
  SELECT orders.*,
         order_items_summary.order_cost,
         order_items_summary.order_items_subtotal,
         order_items_summary.count_food_items,
         order_items_summary.count_drink_items,
         order_items_summary.count_order_items,
         order_items_summary.count_food_items > 0 AS is_food_order,
         order_items_summary.count_drink_items > 0 AS is_drink_order
  FROM orders
  LEFT JOIN order_items_summary ON orders.order_id = order_items_summary.order_id
),
customer_order_count AS (
  SELECT *,
         ROW_NUMBER() OVER (PARTITION BY customer_id ORDER BY ordered_at ASC) AS customer_order_number
  FROM compute_booleans
)

SELECT *
FROM customer_order_count
```

### Updates to the YAML Documentation for `orders`

The updated `.yml` file for the `orders` model has been modified to document the `overdue` column:

```yaml
models:
  - name: orders
    description: "Order overview data mart, offering key details for each order including if it's a customer's first order and a food vs. drink item breakdown. One row per order."
    columns:
      - name: order_id
        description: "The unique key of the orders mart."
        data_tests: ['not_null', 'unique']
      - name: customer_id
        description: "The foreign key relating to the customer who placed the order."
        data_tests:
          - relationships:
              to: "ref('stg_customers')"
              field: customer_id
      - name: ordered_at
        description: "The timestamp the order was placed at."
      - name: order_cost
        description: "The sum of supply expenses to fulfill the order."
      - name: overdue
        description: "Indicates whether the order is past its due date."
      - name: is_food_order
        description: "A boolean indicating if this order included any food items."
      - name: is_drink_order
        description: "A boolean indicating if this order included any drink items."
```

### Changes to the Affected Models: `customers.sql`

The `customers.sql` model has been updated to utilize the `overdue` column from the `orders` model:

Updated SQL code for `customers.sql`:

```sql
WITH customer_orders AS (
  SELECT customer_id,
         COUNT(order_id) AS total_orders,
         SUM(order_cost) AS total_order_value,
         MAX(ordered_at) AS last_order_date,
         SUM(CASE WHEN overdue THEN 1 ELSE 0 END) AS total_overdue_orders  -- Count overdue orders
  FROM {{ ref('orders') }}  
  GROUP BY customer_id
)

SELECT c.customer_id,
       c.customer_name,
       customer_orders.total_orders,
       customer_orders.total_order_value,
       customer_orders.last_order_date,
       customer_orders.total_overdue_orders,  -- Include the total overdue orders
       CASE 
         WHEN customer_orders.total_overdue_orders > 0 THEN TRUE 
         ELSE FALSE 
       END AS has_overdue_orders  -- Flag to indicate if customer has overdue orders
FROM {{ ref('stg_customers') }} AS c
LEFT JOIN customer_orders ON c.customer_id = customer_orders.customer_id
```

### Update the Documentation for `customers`

The updated `.yml` file for the `customers` model includes the new fields:

```yaml
models:
  - name: customers
    description: "Customer data mart providing insights on customer orders including any overdue orders."
    columns:
      - name: customer_id
        description: "The unique identifier for the customer."
        data_tests: ['not_null', 'unique']
      - name: customer_name
        description: "The name of the customer."
      - name: total_orders
        description: "The total number of orders placed by the customer."
      - name: total_order_value
        description: "The total monetary value of orders placed by the customer."
      - name: last_order_date
        description: "The date the last order was placed by the customer."
      - name: total_overdue_orders
        description: "The total number of overdue orders associated with this customer."
      - name: has_overdue_orders
        description: "Indicates if the customer has any overdue orders (boolean)."
```

### Tests Required for Verification

1. **Test for Duplicates**:
   - **Description**: Ensure that the primary keys in the `orders` model remain unique and that no duplicates were inadvertently introduced.
   - **SQL Query**:
   ```sql
   SELECT order_id, COUNT(*)
   FROM {{ ref('orders') }}
   GROUP BY order_id
   HAVING COUNT(*) > 1
   ```

2. **Test for Null Values in New Column**:
   - **Description**: Verify that the `overdue` column is not completely null: the null count returned by the query below must be lower than the total row count of `orders`.
   - **SQL Query**:
   ```sql
   SELECT COUNT(*) AS null_overdue_count
   FROM {{ ref('orders') }}
   WHERE overdue IS NULL
   ```

3. **Test for Consistent Row Count**:
   - **Description**: Check that the number of rows in the `orders` model remains consistent with the upstream staging model (`stg_orders`).
   - **SQL Query**:
   ```sql
   SELECT (SELECT COUNT(*) FROM {{ ref('stg_orders') }}) AS stg_orders_count,
          (SELECT COUNT(*) FROM {{ ref('orders') }}) AS orders_count
   ```

4. **Test for Granularity Consistency**:
   - **Description**: Confirm that the granularity of the `orders` model has not changed.
   - **SQL Query**:
   ```sql
   SELECT COUNT(DISTINCT order_id) AS distinct_order_count
   FROM {{ ref('orders') }}
   ```

### Conclusion

The changes made to the `orders` and `customers` models effectively satisfy the user request to add the `overdue` column, while maintaining the integrity of the dbt project. The specified tests should be executed to ensure the correctness and reliability of the implemented changes.