generate_answer_csv_node.py
"""
Module for generating the answer node
"""
from typing import List, Optional

from langchain.prompts import PromptTemplate
from langchain_core.output_parsers import JsonOutputParser
from langchain_core.runnables import RunnableParallel
from langchain_mistralai import ChatMistralAI
from langchain_openai import ChatOpenAI
from tqdm import tqdm

from ..prompts import TEMPLATE_CHUKS_CSV, TEMPLATE_MERGE_CSV, TEMPLATE_NO_CHUKS_CSV
from ..utils.output_parser import (
    get_pydantic_output_parser,
    get_structured_output_parser,
)
from .base_node import BaseNode


class GenerateAnswerCSVNode(BaseNode):
    """
    A node that generates an answer using a language model (LLM) based on the user's input
    and the content extracted from a CSV document. It constructs a prompt from the user's
    input and the parsed content, feeds it to the LLM, and parses the LLM's response to
    produce an answer.

    Attributes:
        llm_model: An instance of a language model client, configured for generating answers.
        node_name (str): The unique identifier name for the node, defaulting
            to "GenerateAnswerCSV".
        node_type (str): The type of the node, set to "node", indicating a
            standard operational node.

    Args:
        llm_model: An instance of the language model client (e.g., ChatOpenAI) used
            for generating answers.
        node_name (str, optional): The unique identifier name for the node.
            Defaults to "GenerateAnswerCSV".

    Methods:
        execute(state): Processes the input and document from the state to generate an answer,
            updating the state with the generated answer under the 'answer' key.
    """
    def __init__(
        self,
        input: str,
        output: List[str],
        node_config: Optional[dict] = None,
        node_name: str = "GenerateAnswerCSV",
    ):
        """
        Initializes the GenerateAnswerCSVNode with a language model client and a node name.

        Args:
            input (str): Expression defining the input keys to fetch from the state.
            output (List[str]): List of output keys to update in the state.
            node_config (dict): Configuration for the node; must include an 'llm_model'
                instance and may include 'verbose', 'schema', and 'additional_info'.
            node_name (str): Name of the node.
        """
        super().__init__(node_name, "node", input, output, 2, node_config)

        self.llm_model = node_config["llm_model"]
        self.verbose = (
            False if node_config is None else node_config.get("verbose", False)
        )
        self.additional_info = node_config.get("additional_info")
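
    # A typical node_config for this node might look like the following
    # (illustrative sketch, not a prescribed configuration; "schema" and
    # "additional_info" are optional, and the model class depends on your setup):
    #
    #     node_config = {
    #         "llm_model": ChatOpenAI(model="gpt-4o-mini"),  # any supported chat model
    #         "verbose": True,                               # show per-chunk progress
    #         "schema": None,                                # optional Pydantic schema
    #         "additional_info": None,                       # optional prompt prefix
    #     }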

    def execute(self, state):
        """
        Generates an answer by constructing a prompt from the user's input and the scraped
        content, querying the language model, and parsing its response.

        The method updates the state with the generated answer under the 'answer' key.

        Args:
            state (dict): The current state of the graph, expected to contain 'user_input'
                and optionally 'parsed_document' or 'relevant_chunks'.

        Returns:
            dict: The updated state with the 'answer' key containing the generated answer.

        Raises:
            KeyError: If 'user_input' or 'document' is not found in the state, indicating
                that the necessary information for generating an answer is missing.
        """
        self.logger.info(f"--- Executing {self.node_name} Node ---")

        input_keys = self.get_input_keys(state)
        input_data = [state[key] for key in input_keys]

        user_prompt = input_data[0]
        doc = input_data[1]
        if self.node_config.get("schema", None) is not None:
            if isinstance(self.llm_model, (ChatOpenAI, ChatMistralAI)):
                # Structured output via JSON schema works only on specific models.
                self.llm_model = self.llm_model.with_structured_output(
                    schema=self.node_config["schema"]
                )
                output_parser = get_structured_output_parser(self.node_config["schema"])
                format_instructions = "NA"
            else:
                output_parser = get_pydantic_output_parser(self.node_config["schema"])
                format_instructions = output_parser.get_format_instructions()
        else:
            output_parser = JsonOutputParser()
            format_instructions = output_parser.get_format_instructions()
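
        # When structured output is bound directly on the model, explicit format
        # instructions are unnecessary, so the placeholder "NA" is substituted
        # into the prompt templates instead.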

        TEMPLATE_NO_CHUKS_CSV_PROMPT = TEMPLATE_NO_CHUKS_CSV
        TEMPLATE_CHUKS_CSV_PROMPT = TEMPLATE_CHUKS_CSV
        TEMPLATE_MERGE_CSV_PROMPT = TEMPLATE_MERGE_CSV

        if self.additional_info is not None:
            TEMPLATE_NO_CHUKS_CSV_PROMPT = self.additional_info + TEMPLATE_NO_CHUKS_CSV
            TEMPLATE_CHUKS_CSV_PROMPT = self.additional_info + TEMPLATE_CHUKS_CSV
            TEMPLATE_MERGE_CSV_PROMPT = self.additional_info + TEMPLATE_MERGE_CSV

        chains_dict = {}

        if len(doc) == 1:
            # Single chunk: answer directly, no merge step needed.
            prompt = PromptTemplate(
                template=TEMPLATE_NO_CHUKS_CSV_PROMPT,
                input_variables=["question"],
                partial_variables={
                    "context": doc,
                    "format_instructions": format_instructions,
                },
            )

            chain = prompt | self.llm_model | output_parser
            answer = chain.invoke({"question": user_prompt})

            state.update({self.output[0]: answer})
            return state

        for i, chunk in enumerate(
            tqdm(doc, desc="Processing chunks", disable=not self.verbose)
        ):
            prompt = PromptTemplate(
                template=TEMPLATE_CHUKS_CSV_PROMPT,
                input_variables=["question"],
                partial_variables={
                    "context": chunk,
                    "chunk_id": i + 1,
                    "format_instructions": format_instructions,
                },
            )

            chain_name = f"chunk{i+1}"
            chains_dict[chain_name] = prompt | self.llm_model | output_parser

        # Run all per-chunk chains in parallel with a single invocation.
        async_runner = RunnableParallel(**chains_dict)
        batch_results = async_runner.invoke({"question": user_prompt})
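
        # batch_results is a dict keyed by chain name (e.g. {"chunk1": ..., "chunk2": ...}),
        # where each value is the parsed per-chunk answer; it is passed as the 'context'
        # of the merge prompt below.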
        merge_prompt = PromptTemplate(
            template=TEMPLATE_MERGE_CSV_PROMPT,
            input_variables=["context", "question"],
            partial_variables={"format_instructions": format_instructions},
        )

        merge_chain = merge_prompt | self.llm_model | output_parser
        answer = merge_chain.invoke({"context": batch_results, "question": user_prompt})

        state.update({self.output[0]: answer})
        return state
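

# --- Hedged usage sketch (not part of the library) ---------------------------
# A minimal illustration of how this node might be wired up, assuming the
# scrapegraphai package layout above, an OpenAI API key in the environment,
# and hypothetical state keys "user_prompt" and "doc". All values here are
# illustrative, not a prescribed configuration.
#
#     from langchain_openai import ChatOpenAI
#
#     node = GenerateAnswerCSVNode(
#         input="user_prompt & doc",
#         output=["answer"],
#         node_config={
#             "llm_model": ChatOpenAI(model="gpt-4o-mini"),
#             "verbose": True,
#         },
#     )
#
#     state = {
#         "user_prompt": "List every product and its price from the CSV.",
#         "doc": ["name,price\nwidget,9.99\ngadget,19.99"],  # single chunk
#     }
#     state = node.execute(state)
#     print(state["answer"])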