-
Notifications
You must be signed in to change notification settings - Fork 2
/
openai_account_manager.py
368 lines (293 loc) · 14.8 KB
/
openai_account_manager.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
import logging
from typing import Union
import openai
import fcntl
import threading
import tqdm
from multi_thread_openai_api_call import MyThread
# Module-wide logger for the account managers' status messages.
# getLogger(name=None) resolves to the root logger.
logger = logging.getLogger(name=None)
class OpenAI_Account_Manager:
    """Single-threaded manager that rotates through OpenAI API accounts.

    Both account files hold one account per line with fields separated by
    ``----``; the last field is installed as ``openai.api_key``. Accounts whose
    last field already appears in the used-accounts file are skipped.

    The class is a singleton (see ``__new__``), but ``__init__`` still re-runs
    on every construction and re-reads both files.
    """

    _instance = None

    def __new__(cls, *args, **kwargs):
        # Singleton: every construction returns the same object.
        if cls._instance is None:
            cls._instance = object.__new__(cls)
        return cls._instance

    def __init__(self, used_account_fp, all_account_fp):
        """Load the account files and activate the first unused account.

        Args:
            used_account_fp: path of the file listing exhausted accounts;
                ``use_next_account`` appends to it.
            all_account_fp: path of the file listing all accounts.

        Raises:
            IndexError: if no unused account remains.
        """
        self.used_account_fp = used_account_fp
        self.all_account_fp = all_account_fp
        used_account = self._read_accounts(used_account_fp)
        # A set keeps the filter below linear instead of O(len(all) * len(used)).
        used_account_key = {account[-1] for account in used_account}
        self.used_account = used_account
        self.all_account = [account for account in self._read_accounts(all_account_fp)
                            if account[-1] not in used_account_key]
        self._activate_first_account()
        # NOTE(review): this log line includes the active API key.
        logger.info(
            'successfully build OpenAI_Account_Manager, now the number of available accounts is {} and now api_key is {}'.format(
                len(self.all_account), self.all_account[0][-1]))

    @staticmethod
    def _read_accounts(fp):
        """Return the '----'-separated fields of every non-blank line in ``fp``."""
        with open(fp, 'r') as account_f:
            return [line.strip().split('----') for line in account_f if line.strip()]

    def _activate_first_account(self):
        """Install the last field of the first remaining account as ``openai.api_key``."""
        if not self.all_account:
            raise IndexError('no unused OpenAI account left')
        openai.api_key = self.all_account[0][-1]

    def use_next_account(self):
        """Mark the current account as used, persist it, and switch to the next one.

        Raises:
            IndexError: if there is no current account or none left after it.
        """
        if not self.all_account:
            raise IndexError('no unused OpenAI account left')
        exhausted = self.all_account.pop(0)
        self.used_account.append(exhausted)
        with open(self.used_account_fp, 'a') as tmp_used_account_f:
            # fcntl.flock takes the exclusive advisory lock; the previous
            # fcntl.fcntl(fd, LOCK_EX) call was F_SETFD and locked nothing.
            # The lock is dropped when the file is closed.
            fcntl.flock(tmp_used_account_f.fileno(), fcntl.LOCK_EX)
            print('----'.join(exhausted), file=tmp_used_account_f)
        logger.info(
            'account:[{}, {}, {}] runs out. so use next.'.format(exhausted[0], exhausted[1],
                                                               exhausted[2]))
        self._activate_first_account()
class OpenAI_Account_Manager_MultiThread:
    """Thread-safe manager that hands each worker thread its own OpenAI account.

    ``get_next_account`` removes the returned account from ``all_account``, so no
    two threads receive the same one. Exhausted accounts are appended to the
    used-accounts file. Account lines use ``----`` as the field separator, and
    accounts whose last field already appears in the used file are skipped.
    """

    _instance = None

    def __new__(cls, *args, **kwargs):
        # Singleton: every construction returns the same object.
        if cls._instance is None:
            cls._instance = object.__new__(cls)
        return cls._instance

    def __init__(self, used_account_fp, all_account_fp):
        """Load the account files.

        Args:
            used_account_fp: path of the file listing exhausted accounts
                (appended to by ``record_empty_account``).
            all_account_fp: path of the file listing all accounts.

        Raises:
            IndexError: if no unused account remains.
        """
        self.now_account_idx = 0
        self.used_account_fp = used_account_fp
        self.all_account_fp = all_account_fp
        used_account = self._read_accounts(used_account_fp)
        used_account_key = {account[-1] for account in used_account}
        self.used_account = used_account
        self.all_account = [account for account in self._read_accounts(all_account_fp)
                            if account[-1] not in used_account_key]
        self.using_account = []
        if not self.all_account:
            raise IndexError('no unused OpenAI account left')
        # NOTE(review): this log line includes an API key.
        logger.info(
            'successfully build OpenAI_Account_Manager, now the number of available accounts is {} and now api_key is {}'.format(
                len(self.all_account), self.all_account[0][-1]))
        self.next_account_lock = threading.Lock()
        self.empty_account_lock = threading.Lock()

    @staticmethod
    def _read_accounts(fp):
        """Return the '----'-separated fields of every non-blank line in ``fp``."""
        with open(fp, 'r') as account_f:
            return [line.strip().split('----') for line in account_f if line.strip()]

    def get_next_account(self, thread_id, last_empty_account=None):
        """Give ``thread_id`` a fresh account, first recording ``last_empty_account`` if any.

        Raises:
            IndexError: if no unused account remains (the exhausted account, if
                given, is still recorded first).
        """
        with self.next_account_lock:
            # Record first so the exhausted account is persisted even when the pool is empty.
            if last_empty_account is not None:
                self.record_empty_account(last_empty_account)
            if not self.all_account:
                raise IndexError('no unused OpenAI account left')
            result = self.all_account.pop(0)
            self.using_account.append(result)
            # NOTE(review): these logs print the first three account fields,
            # which may include credentials.
            if last_empty_account is not None:
                logger.info('Thread {} account: [{}, {}, {}] '
                            'runs out'.format(thread_id,
                                              last_empty_account[0],
                                              last_empty_account[1],
                                              last_empty_account[2]))
                logger.info('Thread {} use next account: [{}, {}, {}] '
                            .format(thread_id, result[0],
                                    result[1],
                                    result[2]))
            else:
                logger.info('Thread {} first account: [{}, {}, {}] '
                            .format(thread_id, result[0],
                                    result[1],
                                    result[2]))
            return result

    def record_empty_account(self, empty_account):
        """Add ``empty_account`` to ``used_account`` and append it to the used-accounts file."""
        with self.empty_account_lock:
            self.used_account.append(empty_account)
            with open(self.used_account_fp, 'a') as tmp_used_account_f:
                # flock (not fcntl.fcntl) is what takes an exclusive advisory lock.
                fcntl.flock(tmp_used_account_f.fileno(), fcntl.LOCK_EX)
                print('----'.join(empty_account), file=tmp_used_account_f)
class OpenAI_Account_Manager_MultiThread_One_Acount_Many_Used:
    '''
    Thread-safe account manager that lets several threads share one account.

    OpenAI_Account_Manager_MultiThread gives every thread its own account, so the
    number of threads is limited by the number of accounts. This manager instead
    hands accounts out round-robin, so many threads can use the same account; an
    account is skipped once any thread reports it as exhausted.

    Account lines use '----' as the field separator and must have 3 or 4 fields.
    The identifying key is the last field of a 3-field row and the second-to-last
    field of a 4-field row (the meaning of the 4th field is not visible here).
    '''

    _instance = None

    def __new__(cls, *args, **kwargs):
        # Singleton: every construction returns the same object.
        if cls._instance is None:
            cls._instance = object.__new__(cls)
        return cls._instance

    def __init__(self, used_account_fp: str, all_account_fp: str, limit_account_num: int=-1) -> None:
        """Class init (runs once; later constructions of the singleton are no-ops)

        Args
        ----
        used_account_fp: str
            Path to file containing used OpenAI accounts.
        all_account_fp: str
            Path to file containing all OpenAI accounts.
        limit_account_num: int=-1
            If positive, keep at most this many usable accounts.

        Raises
        ------
        ValueError
            If a line of the all-accounts file does not have 3 or 4 fields.
        """
        if hasattr(self, 'inited'):
            return
        self.now_account_idx = 0
        self.used_account_fp = used_account_fp
        self.all_account_fp = all_account_fp
        used_account = self._read_accounts(used_account_fp)
        all_account = self._read_accounts(all_account_fp)
        used_account_key = {self._account_key(account) for account in used_account}
        for account in all_account:
            if len(account) not in (3, 4):
                raise ValueError('each account line must have 3 or 4 "----"-separated fields, '
                                 'got {}'.format(len(account)))
        # Keep only usable accounts. Previously an already-used 4-field account
        # raised instead of being filtered out.
        all_account = [account for account in all_account
                       if self._account_key(account) not in used_account_key]
        if limit_account_num > 0:
            all_account = all_account[:limit_account_num]
        self.used_account = used_account
        self.used_account_key = used_account_key
        self.all_account = all_account
        self.using_account = []
        self.thread_to_account = {}
        logger.info('successfully build OpenAI_Account_Manager, now the number of available accounts is {}'.format(len(self.all_account)))
        self.next_account_lock = threading.Lock()
        self.empty_account_lock = threading.Lock()
        # Mark as initialised only after loading succeeded, so a failed load
        # can be retried instead of leaving a half-built singleton.
        self.inited = 1

    @staticmethod
    def _read_accounts(fp):
        """Return the '----'-separated fields of every non-blank line in ``fp``."""
        with open(fp, 'r') as account_f:
            return [line.strip().split('----') for line in account_f if line.strip()]

    @staticmethod
    def _account_key(account):
        """Return the identifying key of a parsed account row (4 fields: second to last; else last)."""
        return account[-2] if len(account) == 4 else account[-1]

    def get_next_account(self, thread_id, last_empty_account=None):
        """Return the next non-exhausted account in round-robin order.

        ``last_empty_account``, if given, is recorded as exhausted *before* the
        selection, so it cannot be handed straight back.

        Raises
        ------
        IndexError
            If every account has been exhausted. Previously this case spun
            forever while holding the lock.
        """
        with self.next_account_lock:
            if last_empty_account is not None:
                self.record_empty_account(last_empty_account)
            available_num = self.check_available_account_num()
            if available_num == 0:
                logger.info('all accounts used, so..')
                raise IndexError('no available OpenAI account left')
            logger.info('now available accounts : {}'.format(available_num))
            # Terminates: at least one account is unused and the index cycles through all of them.
            while self._account_key(self.all_account[self.now_account_idx]) in self.used_account_key:
                self.now_account_idx = (self.now_account_idx + 1) % len(self.all_account)
            result = self.all_account[self.now_account_idx]
            self.now_account_idx = (self.now_account_idx + 1) % len(self.all_account)
            # NOTE(review): these logs print the first three account fields,
            # which may include credentials.
            if last_empty_account is not None:
                logger.info('Thread {} account: [{}, {}, {}] '
                            'runs out'.format(thread_id,
                                              last_empty_account[0],
                                              last_empty_account[1],
                                              last_empty_account[2]))
                logger.info('Thread {} use next account: [{}, {}, {}] '
                            .format(thread_id, result[0],
                                    result[1],
                                    result[2]))
            else:
                logger.info('Thread {} first account: [{}, {}, {}] '
                            .format(thread_id, result[0],
                                    result[1],
                                    result[2]))
            return result

    def record_empty_account(self, empty_account):
        """Mark ``empty_account`` as exhausted and append it to the used-accounts file.

        Several threads may share an account and each report it; only the first
        report is recorded, so the file does not accumulate duplicates.
        """
        with self.empty_account_lock:
            key = self._account_key(empty_account)
            if key in self.used_account_key:
                return
            self.used_account.append(empty_account)
            self.used_account_key.add(key)
            with open(self.used_account_fp, 'a') as tmp_used_account_f:
                # flock (not fcntl.fcntl) is what takes an exclusive advisory lock.
                fcntl.flock(tmp_used_account_f.fileno(), fcntl.LOCK_EX)
                print('----'.join(empty_account), file=tmp_used_account_f)

    def check_available_account_num(self):
        """Return how many accounts in ``all_account`` have not been reported exhausted."""
        return sum(1 for account in self.all_account
                   if self._account_key(account) not in self.used_account_key)
def get_account_manager(
        account_file: str,
        used_file: str,
        multi_thread: bool=False,
        limit_account_num: int=-1
) -> Union[OpenAI_Account_Manager_MultiThread_One_Acount_Many_Used, OpenAI_Account_Manager]:
    """Get an instance of managing openai accounts.

    NOTE: the parameter names do not match how the arguments are used.
    ``account_file`` is forwarded as the manager's ``used_account_fp`` (the file
    of exhausted accounts, which the manager appends to), and ``used_file`` is
    forwarded as ``all_account_fp`` (the file of all accounts). Pass the
    used-accounts file first.

    Args
    ----
    account_file: str
        Path forwarded as ``used_account_fp``: exhausted accounts, one per line.
    used_file: str
        Path forwarded as ``all_account_fp``: all accounts, one per line.
    multi_thread: bool=False
        If True, return the shared-account multi-thread manager; otherwise the
        single-threaded manager.
    limit_account_num: int=-1
        Maximum number of usable accounts (multi-thread manager only; ignored
        when ``multi_thread`` is False).

    Returns
    -------
    result: Union[OpenAI_Account_Manager_MultiThread_One_Acount_Many_Used, OpenAI_Account_Manager]
        An instance of class OpenAI_Account_Manager_MultiThread_One_Acount_Many_Used or OpenAI_Account_Manager
    """
    # Keywords make the (historically swapped) argument mapping explicit.
    if multi_thread:
        result = OpenAI_Account_Manager_MultiThread_One_Acount_Many_Used(
            used_account_fp=account_file,
            all_account_fp=used_file,
            limit_account_num=limit_account_num)
    else:
        result = OpenAI_Account_Manager(used_account_fp=account_file, all_account_fp=used_file)
    return result
class OpenAI_API_inp_Manager_MultiThread:
    """Thread-safe dispenser that hands out inputs and their hyper-parameters one at a time."""

    def __init__(self, idx_x_list_to_decode, inference_hyper_parameter):
        """Validate and store the work items.

        Args:
            idx_x_list_to_decode: list of inputs, handed out in order
                (``openai_llm_generate_multi_thread`` passes ``(index, prompt)`` tuples).
            inference_hyper_parameter: list of dicts, one per input, each containing
                ``'max_tokens'``. A single-element list is broadcast to every input
                (all inputs then share that one dict object).

        Raises:
            AssertionError: if the hyper-parameters are malformed or their count
                does not match the inputs.
        """
        self.idx_x_list_to_decode = idx_x_list_to_decode
        self.inp_lock = threading.Lock()
        self.progress_index = 0
        assert isinstance(inference_hyper_parameter, list)
        # An empty list is allowed (nothing to decode); otherwise entries must be dicts.
        assert not inference_hyper_parameter or isinstance(inference_hyper_parameter[0], dict)
        if len(inference_hyper_parameter) == 1:
            inference_hyper_parameter = inference_hyper_parameter * len(self.idx_x_list_to_decode)
        assert len(self.idx_x_list_to_decode) == len(inference_hyper_parameter), \
            'idx_x_list_to_decode:{}, inference_hyper_parameter:{}' \
            .format(len(idx_x_list_to_decode), len(inference_hyper_parameter))
        self.inference_hyper_parameter = inference_hyper_parameter
        for i, hyper_parameter in enumerate(inference_hyper_parameter):
            # Previously the message was never formatted and named the wrong key.
            assert 'max_tokens' in hyper_parameter, \
                '{} th inference_hyper_parameter has no max_tokens'.format(i)

    def get_next_gpt_idx_inp(self):
        """Return the next ``{'inp': ..., 'hyper_parameter': ...}`` item, or None when exhausted."""
        with self.inp_lock:
            if self.progress_index >= len(self.idx_x_list_to_decode):
                return None
            position = self.progress_index
            self.progress_index += 1
            return {'inp': self.idx_x_list_to_decode[position],
                    'hyper_parameter': self.inference_hyper_parameter[position]}
def openai_llm_generate_multi_thread(eval_data_openai_queries, llm, num_threads, use_tqdm, turbo_system_message=None,
                                     used_account_fp='openai_account_files/used.txt',
                                     all_account_fp='openai_account_files/accounts.txt'):
    """Generate responses for every query using ``num_threads`` ``MyThread`` workers.

    Args:
        eval_data_openai_queries: sequence of dicts, each with an ``'input'`` and a
            ``'max_tokens'`` key.
        llm: forwarded unchanged to every ``MyThread`` (presumably the model
            name - confirm in multi_thread_openai_api_call).
        num_threads: number of worker threads to start.
        use_tqdm: if true, show a tqdm progress bar shared by the workers.
        turbo_system_message: forwarded unchanged to every ``MyThread``.
        used_account_fp: file of exhausted accounts (default: the previously
            hard-coded path).
        all_account_fp: file of all accounts (default: the previously
            hard-coded path).

    Returns:
        The workers' responses, ordered by the query's position in
        ``eval_data_openai_queries``.
    """
    x_list_to_decode = [query['input'] for query in eval_data_openai_queries]
    hyper_parameter = [{'max_tokens': query['max_tokens']} for query in eval_data_openai_queries]
    # Pair each input with its position so results can be re-ordered after the threads finish.
    idx_x_list_to_decode = list(enumerate(x_list_to_decode))
    inp_manager = OpenAI_API_inp_Manager_MultiThread(idx_x_list_to_decode, hyper_parameter)
    # get_account_manager forwards its first argument as the used-accounts file.
    account_manager = get_account_manager(used_account_fp, all_account_fp, multi_thread=True)
    pbar = tqdm.tqdm(total=len(idx_x_list_to_decode)) if use_tqdm else None
    # The literal 1 is passed through to MyThread unchanged (its meaning is defined there).
    thread_list = [MyThread(i, llm, account_manager, inp_manager, 1, pbar, turbo_system_message)
                   for i in range(num_threads)]
    for t in thread_list:
        t.start()
    for t in thread_list:
        t.join()
    # Previously the progress bar was never closed.
    if pbar is not None:
        pbar.close()
    responses_with_idx = []
    for t in thread_list:
        responses_with_idx.extend(t.responses_with_idx)
    responses_with_idx.sort(key=lambda x: x[0])
    return [item[1] for item in responses_with_idx]