
Commit

Try to prevent deadlocks
Thomas committed Jul 30, 2021
1 parent 42aeef3 commit 25c9090
Showing 1 changed file with 17 additions and 24 deletions.
41 changes: 17 additions & 24 deletions tools/preprocess_data_fast.py
@@ -113,25 +113,14 @@ def process_samples(simple_queue, process_index, args, level, writer: Connection
 
     json_lines = simple_queue.get()
     while json_lines is not None:
-        try:
-            process_json_lines(json_lines, encoder, builders, writer)
-        except:
-            # Debugging code in order to understand why the encoder can fail
-            for json_line in json_lines:
-                try:
-                    if json_line.strip() == "":
-                        continue
-                    encoder.encode(json_line)
-                except:
-                    print(repr(json_line))
-                    print(json_line.strip() == "")
-                    raise
+        process_json_lines(json_lines, encoder, builders, writer)
 
         json_lines = simple_queue.get()
 
-    # in case finished, we still need to add None to signal to everyone else
+    # In case finished, we still need to add None to signal to everyone else
     simple_queue.put(None)
-    # we need to send EOFError
+    # Send None as end of sequence signal
+    writer.send(None)
     writer.close()
 
     for key in args.json_keys:
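The hunk above strips out debugging scaffolding and, more importantly, changes how a worker announces completion: instead of letting the logger detect a closed pipe via EOFError, the worker now sends an explicit None. A minimal sketch, not the repository's exact code, of the resulting worker-side protocol: drain the shared queue until the None sentinel appears, re-queue the sentinel for sibling workers, then signal the logger and close the pipe.

import multiprocessing
from multiprocessing.connection import Connection

def worker(simple_queue, writer: Connection):
    json_lines = simple_queue.get()
    while json_lines is not None:
        # ... encode the chunk and append it to the index builders ...
        writer.send((len(json_lines), sum(len(l) for l in json_lines)))
        json_lines = simple_queue.get()
    # Re-queue the sentinel so every sibling worker also terminates.
    simple_queue.put(None)
    # Explicit end-of-stream marker: the reader checks for None instead of
    # waiting for the pipe to raise EOFError.
    writer.send(None)
    writer.close()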
@@ -235,13 +224,17 @@ def log(readers, log_interval):
 
     while len(readers) != 0:
         for r in multiprocessing.connection.wait(readers):
-            try:
-                nb_of_docs, bytes_processed = r.recv()
-                total_bytes_processed += bytes_processed
-                doc_processed += nb_of_docs
-            except EOFError:
+            data = r.recv()
+            if data is None:
+                # This means that a worker has finished.
                 r.close()
                 readers.remove(r)
+                print(f"Remaining workers: {len(readers)}")
+                continue
+
+            nb_of_docs, bytes_processed = data
+            total_bytes_processed += bytes_processed
+            doc_processed += nb_of_docs
 
             if (doc_processed - logged_docs) >= log_interval:
                 logged_docs = doc_processed
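Why the old except EOFError pattern can hang: Connection.recv() raises EOFError only once every copy of the pipe's write end is closed, and with several forked workers each child can inherit write ends belonging to the other pipes, so the logger may block in recv() forever even after a worker has exited. A self-contained illustration of that pitfall (not code from this repository):

import multiprocessing

def worker(conn):
    conn.send((1, 100))  # one document, 100 bytes, then exit
    conn.close()         # closes only this process's copy of the write end

if __name__ == "__main__":
    reader, writer = multiprocessing.Pipe(duplex=False)
    p = multiprocessing.Process(target=worker, args=(writer,))
    p.start()
    # Without this close(), the parent keeps its own copy of the write end
    # open, and the second reader.recv() below blocks forever instead of
    # raising EOFError -- exactly the hang a None sentinel sidesteps.
    writer.close()
    print(reader.recv())  # (1, 100)
    try:
        reader.recv()
    except EOFError:
        print("worker finished")
    p.join()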
@@ -292,15 +285,15 @@ def main():
     for process in processes:
         process.join()
         process.close()
-    log_thread.join()
+    log_thread.join()  # TODO: figure out why there seems to be a possible deadlock situation.
 
     # TODO: this may be done after.
-    print("Merging files together")
+    print("Merging files together", flush=True)
 
     tokenizer = build_tokenizer(args)
 
-    print(f"Vocab size: {tokenizer.vocab_size}")
-    print(f"Output prefix: {args.output_prefix}")
+    print(f"Vocab size: {tokenizer.vocab_size}", flush=True)
+    print(f"Output prefix: {args.output_prefix}", flush=True)
     output_bin_files = {}
     output_idx_files = {}
     builders = {}
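The flush=True additions make each progress line appear immediately instead of sitting in the stdout buffer, which helps pinpoint where a run stalls. For the join the new TODO flags, one defensive option, purely hypothetical and not part of this commit, is a bounded wait so a stuck logging thread is reported rather than hanging main() silently:

# Hypothetical variant of the flagged join (not in this commit); assumes
# log_thread is the threading.Thread started in main().
log_thread.join(timeout=60)
if log_thread.is_alive():
    print("Log thread still alive after 60s; a reader may be blocked.", flush=True)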
