In [1]:
import os
import re
import pickle
import traceback
import concurrent.futures
from glob import glob

import numpy as np
from tqdm import trange, tqdm

from KeywordSearch import loader, indexing, utils, kwsearch
from KeywordSearch.cloud_index import prepare_tokendict_for_upload, upload_firestore

Using `is` instead of `=` for comparison in performance-critical code is acceptable
Downloading stopwords...


[nltk_data] Downloading package stopwords to
[nltk_data]     C:\Users\10022\AppData\Roaming\nltk_data...
[nltk_data]   Package stopwords is already up-to-date!


In [2]:
# Firestore handle for the cloud copy of the keyword index.
# `index_api` is the "index" collection reference that the upload loop hands to upload_firestore.
from google.cloud import firestore

FIRESTORE_PROJECT = "moonlit-oven-412316"
db = firestore.Client(project=FIRESTORE_PROJECT)
index_api = db.collection("index")

In [3]:
# Locate the pickled index segments, named "<segment_id>_merged.pkl", under loader.index_dir.
# glob() returns names in filesystem-dependent order, so sort them by numeric segment id.
# Uploads then proceed 0, 1, 2, ... and the "Segment i/N" progress label is reproducible.
regex_segment_id = re.compile(r"([0-9]+)_merged\.pkl")  # raw string; "\." matches only a literal dot

merged_files = glob("*_merged.pkl", root_dir=loader.index_dir)
unparseable = [name for name in merged_files if regex_segment_id.fullmatch(name) is None]
if unparseable:
    # Fail here, before any upload starts, rather than partway through the very long upload loop.
    raise ValueError(f"Segment files without a numeric id in {loader.index_dir}: {unparseable}")
pickled_segments = sorted(
    merged_files,
    key=lambda name: int(regex_segment_id.fullmatch(name).group(1)),
)

In [4]:
# Upload every token's index entries to Firestore, one pickled segment at a time.
#
# Token ids are global: segment `s` starts at s * TOKENS_PER_SEGMENT, and each entry's id
# is that offset plus its position inside the segment.
# prepare_tokendict_for_upload runs in worker processes. The resulting docs are uploaded
# here in the notebook process, one at a time.
#
# Results (keyed by segment id):
#   finished[segment_id] -> every token id that was processed, whether it succeeded or failed
#   failed[segment_id]   -> token ids whose preparation or upload raised;
#                           their tracebacks are appended to loader.LOG_PATH
TOKENS_PER_SEGMENT = 5000

# Parse every segment id before starting, so a malformed file name fails immediately
# instead of hours into the run.
segments_to_upload: list[tuple[int, str]] = []
for fname in pickled_segments:
    match = regex_segment_id.fullmatch(fname)
    if match is None:
        raise ValueError(f"Cannot parse a segment id from {fname!r}")
    segments_to_upload.append((int(match.group(1)), fname))
# glob() order is filesystem-dependent; process segments in id order so runs are reproducible.
segments_to_upload.sort()

finished: dict[int, list] = dict()
failed: dict[int, list] = dict()
num_segments = len(segments_to_upload)
with concurrent.futures.ProcessPoolExecutor() as pool:
    with tqdm(total=len(loader.all_tokens), desc="Initializing", ncols=120, ascii=True) as pbar:
        for counter, (segment_id, fname) in enumerate(segments_to_upload, 1):
            offset = TOKENS_PER_SEGMENT * segment_id
            finished[segment_id] = []
            failed[segment_id] = []
            # NOTE: pickle.load can execute arbitrary code; only load segment files this project produced.
            with open(os.path.join(loader.index_dir, fname), "rb") as f:
                current_slice = pickle.load(f)
            if len(current_slice) > TOKENS_PER_SEGMENT:
                # Extra entries would get ids that collide with the next segment's tokens.
                raise ValueError(
                    f"Segment {segment_id} ({fname}) has {len(current_slice)} entries; "
                    f"expected at most {TOKENS_PER_SEGMENT}"
                )
            pbar.set_description(f"Segment {counter:4d}/{num_segments:4d}")
            jobs = {pool.submit(prepare_tokendict_for_upload, token_dict, i): i
                    for i, token_dict in enumerate(current_slice, start=offset)}
            for job in concurrent.futures.as_completed(jobs):
                # pop() drops our reference to the completed future so its prepared docs can be freed.
                # This is safe because as_completed() iterates over its own copy of the futures.
                token_id = jobs.pop(job)
                try:
                    for doc in job.result():
                        upload_firestore(doc, index_api)
                except Exception:
                    failed[segment_id].append(token_id)
                    with open(loader.LOG_PATH, 'a', encoding="UTF-8") as f:
                        f.write(f"Upload failure at {token_id}:\n{traceback.format_exc()}\n")
                # Recorded whether or not the upload succeeded: `finished` means "processed".
                finished[segment_id].append(token_id)
                pbar.update()

Segment   35/2987:   1%|5                                               | 174499/14930836 [1:28:51<117:13:29, 34.97it/s]