description_node.py
"""
DescriptionNode Module
"""
from typing import List, Optional
from langchain.prompts import PromptTemplate
from langchain_core.runnables import RunnableParallel
from tqdm import tqdm
from ..prompts.description_node_prompts import DESCRIPTION_NODE_PROMPT
from .base_node import BaseNode


class DescriptionNode(BaseNode):
    """
    A node responsible for generating a short description (summary) of each
    document chunk in the state, so that the relevant chunks can later be
    selected without re-reading the full document. The descriptions are
    generated in parallel with the configured language model.

    Attributes:
        llm_model: An instance of a language model client, configured for generating answers.
        verbose (bool): A flag indicating whether to show print statements during execution.

    Args:
        input (str): Boolean expression defining the input keys needed from the state.
        output (List[str]): List of output keys to be updated in the state.
        node_config (dict): Additional configuration for the node.
        node_name (str): The unique identifier name for the node, defaulting to "DESCRIPTION".
    """

    def __init__(
        self,
        input: str,
        output: List[str],
        node_config: Optional[dict] = None,
        node_name: str = "DESCRIPTION",
    ):
        super().__init__(node_name, "node", input, output, 2, node_config)

        # The language model client is required and must be provided in node_config.
        self.llm_model = node_config["llm_model"]
        self.verbose = (
            False if node_config is None else node_config.get("verbose", False)
        )
        self.cache_path = node_config.get("cache_path", False)

    def execute(self, state: dict) -> dict:
        """
        Generates a description for each document chunk in the state and
        stores it under the chunk's "summary" key.
        """
        self.logger.info(f"--- Executing {self.node_name} Node ---")

        docs = list(state.get("docs"))

        # Build one prompt | llm chain per chunk so the descriptions can be
        # produced in a single parallel run.
        chains_dict = {}
        for i, chunk in enumerate(
            tqdm(docs, desc="Processing chunks", disable=not self.verbose)
        ):
            prompt = PromptTemplate(
                template=DESCRIPTION_NODE_PROMPT,
                partial_variables={"content": chunk.get("document")},
            )
            chain_name = f"chunk{i+1}"
            chains_dict[chain_name] = prompt | self.llm_model

        # Run all chains in parallel and attach each result to its chunk.
        async_runner = RunnableParallel(**chains_dict)
        batch_results = async_runner.invoke({})

        for i in range(1, len(docs) + 1):
            docs[i - 1]["summary"] = batch_results.get(f"chunk{i}").content

        state.update({self.output[0]: docs})
        return state
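

# --- Hypothetical usage sketch (illustrative only, not part of the module) ---
# Assumes the node is imported from the installed package and that the state
# carries chunks as {"document": <text>} dicts, which is what execute() reads
# above. FakeListChatModel is a LangChain test helper used here only so the
# example runs without an API key; any chat model client would work the same way.
#
#   from langchain_core.language_models import FakeListChatModel
#   from scrapegraphai.nodes.description_node import DescriptionNode
#
#   llm = FakeListChatModel(responses=["summary of chunk 1", "summary of chunk 2"])
#   node = DescriptionNode(
#       input="docs",
#       output=["docs"],
#       node_config={"llm_model": llm, "verbose": False},
#   )
#   state = {"docs": [{"document": "first chunk"}, {"document": "second chunk"}]}
#   state = node.execute(state)
#   print([doc["summary"] for doc in state["docs"]])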