forked from tvergho/ai-card-cutting
-
Notifications
You must be signed in to change notification settings - Fork 0
/
utils.py
267 lines (226 loc) · 7.84 KB
/
utils.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
import openai_async
import tiktoken
import json
from datetime import datetime
import asyncio
from utils_highlight import highlight_substrings
from constants import MAX_PROMPT_LENGTH
import re
import yaml
from dotenv import load_dotenv
import os
import openai
# Tokenizer for the babbage-family completion model; used for all token counts below.
encoding = tiktoken.encoding_for_model("text-babbage-001")
# Pull environment variables from a local .env file (if present) before reading the key.
load_dotenv()
# API key passed explicitly to openai_async.complete; None if the variable is unset.
openai_api_key = os.getenv("OPENAI_API_KEY")
def num_tokens_from_string(string):
    """Return the number of tokens in a text string.

    Uses the module-level ``encoding`` (text-babbage-001 tokenizer).

    Args:
        string: Text to tokenize.

    Returns:
        int: Token count.
    """
    # len() already returns an int -- the original's int() cast was redundant.
    return len(encoding.encode(string))
def format_prompt_for_openai_completion(tag, bodyText, underlines=None):
    """Build completion prompts for the highlighting model.

    The input (raw body text, or a JSON list of underlined spans) is packed
    greedily into chunks that stay under ``MAX_PROMPT_LENGTH`` tokens (with
    ~100 tokens of headroom) so each prompt fits the model's context window.

    Args:
        tag: Card tag prepended to every prompt.
        bodyText: Full card body text (used when ``underlines`` is None).
        underlines: Optional JSON-encoded list of underlined span strings.

    Returns:
        (prompts, chunks): parallel lists of prompt strings and the text
        chunks they were built from. On a JSON parse failure returns
        (None, None) so callers that unpack the pair do not crash.
    """
    if underlines is None:
        # Split the body text on single spaces and pack words into chunks.
        bodyTextArr = bodyText.split(" ")
        chunk = ""
        bodyTextChunks = []
        chunk_len = 0
        for word in bodyTextArr:
            tokens = num_tokens_from_string(word)
            if chunk_len + tokens > MAX_PROMPT_LENGTH - 100:
                # Current chunk is full; start a new one.
                bodyTextChunks.append(chunk)
                chunk = ""
                chunk_len = 0
            chunk += word + " "
            chunk_len += tokens
        bodyTextChunks.append(chunk)
        return [f"Tag: {tag}\n\nInput: {text}\n\n###\n\nHighlighted Text:" for text in bodyTextChunks], bodyTextChunks
    else:
        try:
            underlines_arr = json.loads(underlines.strip())
            chunk = []
            bodyTextChunks = []
            chunk_len = 0
            for underline in underlines_arr:
                tokens = num_tokens_from_string(underline)
                # +5 tokens of padding per span to cover the JSON punctuation
                # that json.dumps adds when the chunk is serialized below.
                if chunk_len + tokens + 5 > MAX_PROMPT_LENGTH - 100:
                    bodyTextChunks.append(chunk)
                    chunk = []
                    chunk_len = 0
                chunk.append(underline)
                chunk_len += tokens + 5
            bodyTextChunks.append(chunk)
            return [f"Tag: {tag}\n\nInput: {json.dumps(chunk)}\n\n###\n\nHighlighted Text:" for chunk in bodyTextChunks], bodyTextChunks
        except Exception as e:
            print(e)
            # Bug fix: the original returned a bare None here, but callers
            # unpack the result (`prompts, _ = ...`), which raised TypeError
            # instead of reaching their own `prompts is None` check.
            return None, None
def fix_escaped_unicode(s):
    """Replace literal ``\\uXXXX`` escape sequences in *s* with real characters.

    Args:
        s: String that may contain backslash-u escapes left over from a
           double-encoded JSON payload.

    Returns:
        The string with each 4-hex-digit escape decoded via ``chr``.
    """
    escape_pattern = re.compile(r'\\u([0-9a-fA-F]{4})')
    return escape_pattern.sub(lambda m: chr(int(m.group(1), 16)), s)
def fix_truncated_json(json_string):
# Add the missing closing characters
candidates = [
json_string,
json_string + ']',
json_string + '"]',
json_string.rstrip(',') + '"]',
]
# Try to parse each candidate as JSON and return the first valid one
for candidate in candidates:
try:
parsed_json = json.loads(candidate)
return candidate
except json.JSONDecodeError:
pass
# If none of the candidates are valid JSON, return the original string
return json_string
async def get_completion(prompt, model, debug=False):
    """Send one completion request to OpenAI and parse the output into a list.

    Args:
        prompt: Fully formatted prompt string.
        model: OpenAI completion model name.
        debug: If True, print the remaining token budget and raw model output.

    Returns:
        The parsed output (expected to be a list of highlighted strings), or
        None on any failure: prompt over budget, request error, or
        unparseable output.
    """
    try:
        num_tokens_in_prompt = num_tokens_from_string(prompt)
        # Refuse prompts that already exceed the model's budget.
        if num_tokens_in_prompt > MAX_PROMPT_LENGTH:
            print("Prompt too long")
            return None
        if debug:
            print("Max tokens: " + str(2048-num_tokens_in_prompt))
        # response = openai.Completion.create(
        #   model=model,
        #   prompt=prompt,
        #   max_tokens=2048-num_tokens_in_prompt-10,
        #   temperature=0,
        #   stop=["\n", "END"]
        # )
        # Async HTTP call; 10-second timeout. max_tokens leaves a 10-token
        # safety margin under the 2048-token context (prompt + completion).
        response = await openai_async.complete(
            openai_api_key,
            timeout=10,
            payload={
                "model": model,
                "prompt": prompt,
                "max_tokens": 2048-num_tokens_in_prompt-10,
                "temperature": 0,
                "stop": ["\n", "END"]
            }
        )
        choices = response.json()['choices']
        output = choices[0]['text'].strip()
        if debug:
            print(output)
        # The model is expected to emit a JSON-ish array; repair truncation
        # and decode any literal \uXXXX escapes before parsing.
        output = fix_truncated_json(output.strip())
        output = fix_escaped_unicode(output)
        # NOTE(review): this drops the 3-digit prefix "\u201" of smart-quote
        # escapes (\u201c / \u201d) -- presumably a workaround for malformed
        # escapes the model emits; confirm against real outputs.
        output = output.replace("\\u201", "")
        # output = output.replace("“", "\"")
        # output = output.replace("”", "\"")
        # yaml.safe_load is more forgiving than json.loads for slightly
        # malformed output; assumes the text is a YAML/JSON list.
        output_arr = yaml.safe_load(output)
        return output_arr
    except Exception as e:
        # Best-effort: any failure (network, parse) is logged and mapped to None.
        print(e)
        return None
async def get_completions_from_input(tag, bodyText, model, underlines=None, debug=False, paragraphs=None):
    """Run the highlighting model over a card and merge the highlighted spans.

    Args:
        tag: Card tag used as context in each prompt.
        bodyText: Full card body text.
        model: OpenAI completion model name.
        underlines: Optional JSON string of pre-underlined spans; when given,
            prompts are built from those spans instead of the raw body text.
        debug: If True, print intermediate model output.
        paragraphs: Optional paragraph metadata forwarded to
            ``highlight_substrings``. Defaults to an empty list.

    Returns:
        (output_str, loc) from ``highlight_substrings``, or None on failure.
    """
    # Bug fix: the original used a mutable default argument (paragraphs=[]),
    # which is shared across calls. Normalize to a fresh list per call.
    if paragraphs is None:
        paragraphs = []
    if underlines is not None:
        prompts, _ = format_prompt_for_openai_completion(tag, bodyText, underlines)
        chunks = None
    else:
        prompts, chunks = format_prompt_for_openai_completion(tag, bodyText, None)
    if prompts is None:
        print("Invalid input")
        return None
    # One completion per prompt chunk, issued concurrently.
    results = await asyncio.gather(*[get_completion(prompt, model, debug=debug) for prompt in prompts])
    # gather always returns a list; a None entry means one chunk failed.
    if any(result is None for result in results):
        print("Invalid output")
        return None
    # Normalize each highlighted span: strip whitespace and drop newlines.
    parsed_results = [[s.strip().replace("\n", "") for s in sublist] for sublist in results]
    output_str = ""
    loc = []
    if chunks:
        # Body-text mode: highlight each chunk and concatenate the outputs.
        # (Renamed the loop variable from `os` -- it shadowed the os module.)
        for i, result in enumerate(parsed_results):
            chunk_output, chunk_loc = highlight_substrings(chunks[i], result, debug=debug, paragraphs=paragraphs)
            output_str += chunk_output
            loc.extend(chunk_loc)
    else:
        # Underline mode: flatten per-prompt results and highlight once.
        parsed_results = [item for sublist in parsed_results for item in sublist]
        output_str, loc = highlight_substrings(bodyText, parsed_results, debug=debug, paragraphs=paragraphs)
    return output_str, loc
## OpenAI API
def create_openai_file(model_name, file_path):
    """Upload a training file to OpenAI for fine-tuning.

    Args:
        model_name: Name recorded as the provided filename on OpenAI's side.
        file_path: Local path of the .jsonl training file.

    Prints the new file ID on success; prints the exception on failure.
    """
    try:
        # Bug fix: open the file in a context manager so the handle is closed
        # even if the upload raises (the original leaked the open handle).
        with open(file_path, "rb") as training_file:
            response = openai.File.create(
                file=training_file,
                purpose='fine-tune',
                user_provided_filename=model_name
            )
        file_id = response.id
        print(f"Successfully created file from {file_path}")
        print(f"File ID: {file_id}")
    except Exception as e:
        print(e)
def list_openai_files():
    """Print the ID, creation time, and filename of every fine-tune file."""
    try:
        listing = openai.File.list()
        # Only files uploaded for fine-tuning are of interest here.
        fine_tune_files = (entry for entry in listing["data"] if entry["purpose"] == "fine-tune")
        for entry in fine_tune_files:
            created = str(datetime.fromtimestamp(int(entry["created_at"])))
            print(f"File ID: {entry['id']}, created at: {created}, filename: {entry['filename']}")
    except Exception as e:
        print(e)
def create_finetune(file_id, open_ai_model, model_name):
    """Start a fine-tune job (3 epochs) for an already-uploaded training file."""
    try:
        job = openai.FineTune.create(
            training_file=file_id,
            model=open_ai_model,
            n_epochs=3,
            suffix=model_name,
        )
        print(f"Created fine tune for file ID {file_id} with model {open_ai_model}")
        print(f"Fine tune ID: {job.id}")
    except Exception as e:
        print(e)
def list_finetunes():
    """Print ID, creation time, model, and status for every fine-tune job."""
    try:
        jobs = openai.FineTune.list()["data"]
        for job in jobs:
            when = str(datetime.fromtimestamp(int(job["created_at"])))
            print(f"Fine tune ID: {job['id']}, created at: {when}, model: {job['model']}, status: {job['status']}")
    except Exception as e:
        print(e)
def get_finetune(finetune_id):
    """Print a summary line and the event log for a single fine-tune job."""
    try:
        job = openai.FineTune.retrieve(finetune_id)
        when = str(datetime.fromtimestamp(int(job.created_at)))
        print(f"Fine tune ID: {job.id}, created at: {when}, model: {job.model}, status: {job.status}")
        # Event log entries carry progress/status messages for the job.
        for event in job.events:
            print(event["message"])
    except Exception as e:
        print(e)
def list_models():
    """Print the ID of every model visible to this API key."""
    try:
        for entry in openai.Model.list()["data"]:
            print(f"Model: {entry['id']}")
    except Exception as e:
        print(e)
## Cost calculation
def count_tokens(file_path):
    """Sum prompt+completion tokens across every record in a .jsonl file.

    Args:
        file_path: Path to a JSON-lines file with 'prompt'/'completion' keys.

    Returns:
        int: Total token count over all records.
    """
    with open(file_path, 'r') as handle:
        # One JSON object per line; missing keys default to the empty string.
        return sum(
            num_tokens_from_string(record.get('prompt', '') + record.get('completion', ''))
            for record in map(json.loads, handle)
        )
def calculate_fine_tuning_cost(file_path):
    """Print (and return) the per-epoch fine-tuning cost for each base model.

    Args:
        file_path: Path to the .jsonl training file.

    Returns:
        dict: Model name -> estimated USD cost per epoch. (New return value;
        the original returned None, so existing callers are unaffected.)
    """
    # Training cost per 1K tokens (USD) for the legacy fine-tunable models.
    models = {
        'Ada': 0.0004,
        'Babbage': 0.0006,
        'Curie': 0.0030,
        'Davinci': 0.0300
    }
    # Count the total tokens in the .jsonl file
    total_tokens = count_tokens(file_path)
    # Cost scales linearly with token count: (tokens / 1000) * rate.
    costs = {model: (total_tokens / 1000) * rate for model, rate in models.items()}
    print("Cost per epoch (default 3)")
    for model, cost in costs.items():
        print(f'{model}: ${cost:.2f}')
    return costs