-
Notifications
You must be signed in to change notification settings - Fork 0
/
qa.py
181 lines (154 loc) · 6.24 KB
/
qa.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
import time
from typing import Any
from typing import Dict
from typing import Optional

from src.cache.exact import add
from src.cache.exact import match
from src.cache.semantic import doc_match
from src.cache.semantic import query_match
from src.cache.semantic import stream_update
from src.config.logging import logger
from src.generate.expand import QuestionVariantGenerator
# Module-level variant generator, used by the __main__ QA harness to produce
# semantically similar rephrasings of a question (see TEST 2 below).
generator = QuestionVariantGenerator()
def meets_threshold(confidence: float, threshold: float = 0.9) -> bool:
    """
    Determine if the confidence level meets the required threshold.

    Parameters:
        confidence (float): The confidence level of a match.
        threshold (float): Minimum confidence required (exclusive).
            Defaults to 0.9, preserving the original behavior.

    Returns:
        bool: True if confidence is strictly greater than threshold,
        False otherwise.
    """
    # Strict comparison: a confidence exactly equal to the threshold fails,
    # matching the original `confidence > 0.9` semantics.
    return confidence > threshold
def handle_semantic_match(question: str, closest_match: Dict[str, Any], start_time: float) -> Dict[str, Optional[str]]:
    """
    Handle the case where there is a semantic match for the question.

    Parameters:
        question (str): User's question.
        closest_match (dict): Closest matching question and its metadata;
            expected to contain 'confidence' and 'query' keys.
        start_time (float): Timestamp when the query processing started.

    Returns:
        dict: Details about the processed question and the match. Falls back
        to native search when the match is low-confidence or the matched
        query has no entry in the exact cache.
    """
    confidence = closest_match['confidence']
    if not meets_threshold(confidence):
        # Low-confidence semantic match: treat as a miss and do a native search.
        return handle_native_search(question, start_time)
    query = closest_match['query']
    answer = match(query)
    if not answer:
        # BUG FIX: the original logged this cache inconsistency and then fell
        # off the end of the function, implicitly returning None. Fall back to
        # native search so callers always receive a result dict.
        logger.error(f'Closest matching query: {query} does not have a mapping key value pair in the standard cache!')
        return handle_native_search(question, start_time)
    # Record the new phrasing in both the semantic and exact caches.
    stream_update(question)
    add(question, answer)
    return {
        "question": question,
        "closest_question": query,
        "match_type": "SEMANTIC",
        "confidence": confidence,
        "answer": answer,
        "execution_time": (time.time() - start_time) * 1000
    }
def handle_native_search(question: str, start_time: float) -> Dict[str, Optional[str]]:
    """
    Handle native search for a question when no semantic or exact matches are found.

    Parameters:
        question (str): User's question.
        start_time (float): Timestamp when the query processing started.

    Returns:
        dict: Details about the processed question.

    Raises:
        Exception: Re-raised after logging if the native search fails, so
        pipeline() can produce its structured error response.
    """
    try:
        answer = doc_match(question)
        # Cache the freshly answered question in both semantic and exact caches.
        stream_update(question)
        add(question, answer)
        return {
            "question": question,
            "closest_question": "NA",
            "match_type": "NATIVE",
            "confidence": "NA",
            "answer": answer,
            "execution_time": (time.time() - start_time) * 1000
        }
    except Exception as e:
        logger.error(f"Error during native search: {str(e)}")
        # BUG FIX: the original swallowed the exception and implicitly
        # returned None. Re-raise so the caller (pipeline) reports the error.
        raise
def handle_exact_match(question: str, answer: str, start_time: float) -> Dict[str, Optional[str]]:
    """
    Handle the response for an exact match found for the question.

    Parameters:
        question (str): User's question.
        answer (str): The answer associated with the exact match.
        start_time (float): Timestamp when the query processing started.

    Returns:
        dict: Details about the question and the exact match found.
    """
    # BUG FIX: the original wrapped this dict literal in try/except and could
    # implicitly return None. Constructing a dict cannot realistically fail,
    # so the handler now always returns a result.
    return {
        "question": question,
        "closest_question": "NA",
        "match_type": "EXACT",
        "confidence": "NA",
        "answer": answer,
        "execution_time": (time.time() - start_time) * 1000
    }
def pipeline(question: str) -> Dict[str, Optional[str]]:
    """
    Processing pipeline for handling a question, searching for exact, semantic, and native matches.

    Parameters:
        question (str): User's question.

    Returns:
        dict: Results of processing the question through the pipeline. On
        failure, a dict with 'question', 'error', and 'execution_time' keys.
    """
    start_time = time.time()
    try:
        answer = match(question)
        if answer:
            # Exact match found in the standard cache.
            return handle_exact_match(question, answer, start_time)
        # No exact match: try a semantic match against previously seen queries.
        closest_match = query_match(question)
        if closest_match:
            return handle_semantic_match(question, closest_match, start_time)
        return handle_native_search(question, start_time)
    except Exception as e:
        logger.error(f"Error processing the question: {str(e)}")
        return {
            "question": question,
            "error": str(e),
            # BUG FIX: this key was "execution_data" while every success
            # response uses "execution_time"; renamed for consistency.
            "execution_time": (time.time() - start_time) * 1000
        }
if __name__ == '__main__':
    # QA harness: exercises each cache path (exact, semantic, native) in turn.
    separator = '-' * 100
    google_income_q = "What was Google's operating income (in billions) at the end of March 2021, and how did it compare to the same period of the previous year?"

    # TEST 1: Test exact match - user question and relevant correct answer is already available in Redis Memorystore.
    # Warmup query - to compensate for LLM load times.
    print('WARM UP')
    print(pipeline(google_income_q))
    print(separator)

    # Test run 1
    print('TEST 1')
    print(pipeline(google_income_q))
    print(separator)

    # TEST 2: Test semantic query match - user question is semantically similar to an already asked question stored in Vertex AI Vector Store.
    # Note: The match meets confidence threshold.
    print('TEST 2')
    variant = generator.generate_variant(google_income_q)
    logger.info(f'Query variant => {variant}')
    print(pipeline(variant))
    print(separator)

    # TEST 3: Test exact match after upsert
    print('TEST 3')
    print(pipeline(variant))
    print(separator)

    # TEST 4: Test semantic query match that failed to meet the confidence threshold
    print('TEST 4')
    amazon_dsp_q = "What is the total amount of investment announced by Amazon, for the Delivery Service Partner (DSP) program in Q3 of 2023?"
    print(pipeline(amazon_dsp_q))
    print(separator)