In [1]:
from datasets import load_dataset
from concurrent.futures import ThreadPoolExecutor, as_completed

# Forwarded to load_dataset(streaming=...); with True the corpus is iterated
# lazily rather than materialized up front (see Hugging Face datasets docs).
streaming = True

# Name of the corpus to categorize; must be one of the keys in the table below.
database_name = "mc4"

# Per-corpus (positional args, keyword args) handed to load_dataset.
# Every corpus is read from its "train" split.
_dataset_specs = {
    "mc4": (("mc4", "ja"), {}),
    "oscar": (("oscar", "unshuffled_deduplicated_ja"), {}),
    "cc100": (("cc100",), {"lang": "ja"}),
    "shisa": (("augmxnt/shisa-pretrain-en-ja-v1",), {}),
}

# Reject unsupported names before attempting any load.
if database_name not in _dataset_specs:
    raise ValueError(f"unknown database name: {database_name}")

_spec_args, _spec_kwargs = _dataset_specs[database_name]
dataset = load_dataset(*_spec_args, split="train", streaming=streaming, **_spec_kwargs)

  from .autonotebook import tqdm as notebook_tqdm


In [4]:
from bertopic import BERTopic
# Load the saved BERTopic model; proc() uses the first element returned by its
# transform() call as the per-document category.
# NOTE(review): depending on how the model was saved, BERTopic.load may
# unpickle this file — presumably it is a trusted local artifact; confirm
# before loading a model obtained from elsewhere.
model_path="data/topic_model.bin"
topic_model=BERTopic.load(model_path)

In [5]:
import os
# Root output directory; proc() writes into "<base_dir>/<database_name>/".
base_dir="data/categorized"

def make_dir(path):
    """Create the directory ``path`` if it does not already exist.

    Missing parent directories are created as well, and an already existing
    directory is not an error, so the call can be repeated freely and can be
    made from several threads at once (``proc`` calls it from pool workers).

    Args:
        path: Directory path to create.

    Raises:
        OSError: If the directory cannot be created (for example, ``path``
            exists but is not a directory, or permissions are insufficient).
    """
    # exist_ok=True replaces a separate exists() check, which could race with
    # another thread creating the same directory between the check and mkdir.
    os.makedirs(path, exist_ok=True)

# Make sure the output root exists before any batch is written.
make_dir(base_dir)

In [6]:
import json

In [7]:
import threading

# Number of documents collected before a batch is handed to proc().
batch_size=10000

# proc() is run by several pool threads that append to the same per-category
# files; this lock keeps one batch's lines from interleaving with another's.
_write_lock = threading.Lock()

def proc(docs):
    """Categorize a batch of texts and append each one to its category file.

    Each text is written as one JSON line ``{"db": ..., "text": ...}`` to
    ``<base_dir>/<database_name>/<category>.jsonl``, where the category is the
    value the topic model assigns to that text.

    Args:
        docs: List of document texts (strings).

    Returns:
        The number of documents in ``docs``.
    """
    print(f"Processing {len(docs)} documents...")
    if not docs:
        # Nothing to categorize; avoid calling the model on an empty batch.
        return 0

    # transform() returns a tuple; its first element holds one category per doc.
    categories = topic_model.transform(docs)[0]

    # The target directory is the same for the whole batch, so create it once.
    save_dir = f"{base_dir}/{database_name}"
    make_dir(save_dir)

    # Group serialized lines by category so each file is opened once per batch
    # while keeping the documents' original order within each file.
    lines_by_category = {}
    for text, category in zip(docs, categories):
        data = json.dumps({"db": database_name, "text": text}, ensure_ascii=False)
        lines_by_category.setdefault(category, []).append(data + "\n")

    with _write_lock:
        for category, lines in lines_by_category.items():
            # ensure_ascii=False emits raw non-ASCII characters, so the file
            # encoding must be fixed rather than left to the platform default.
            with open(f"{save_dir}/{category}.jsonl", "a", encoding="utf-8") as f:
                f.writelines(lines)

    return len(docs)

def main(max_workers=5, max_pending=None):
    """Stream the dataset, batch its texts, and categorize batches in a thread pool.

    Reads the ``"text"`` field of every record in ``dataset``, groups the texts
    into lists of ``batch_size``, and submits each list to ``proc`` on a
    ``ThreadPoolExecutor``. A line is printed for every finished batch.

    The number of submitted-but-unfinished batches is capped so that a large
    streamed corpus cannot pile up in memory when reading is faster than
    processing, and so that an exception raised inside ``proc`` surfaces soon
    after it happens instead of only after the whole dataset has been read.

    Args:
        max_workers: Number of worker threads in the pool.
        max_pending: Maximum number of batches allowed in flight at once.
            Defaults to ``2 * max_workers``.

    Raises:
        Exception: Whatever ``proc`` raised for a batch is re-raised here.
    """
    from concurrent.futures import FIRST_COMPLETED, wait

    if max_pending is None:
        max_pending = 2 * max_workers

    docs = []
    pending = set()
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        for doc in dataset:
            docs.append(doc["text"])
            if len(docs) == batch_size:
                # Hand the full batch to proc asynchronously. `docs` is rebound
                # to a new list right after, so no defensive copy is needed.
                pending.add(executor.submit(proc, docs))
                docs = []

                if len(pending) >= max_pending:
                    # Back-pressure: block until at least one batch finishes.
                    done, pending = wait(pending, return_when=FIRST_COMPLETED)
                    for future in done:
                        # result() re-raises any exception raised inside proc.
                        result = future.result()
                        print(f"Batch processed with {result} documents.")

        # Process whatever is left over in a final, smaller batch.
        if docs:
            pending.add(executor.submit(proc, docs))

        # Wait for all remaining batches to complete.
        for future in as_completed(pending):
            result = future.result()
            print(f"Batch processed with {result} documents.")


In [8]:
# Run the categorization pipeline; progress is printed per batch.
main()

Processing 10000 documents...


Batches:  94%|█████████▍| 294/313 [00:11<00:00, 34.67it/s]

Processing 10000 documents...


Batches: 100%|██████████| 313/313 [00:13<00:00, 23.18it/s]
Batches: 100%|██████████| 313/313 [00:19<00:00, 15.94it/s]


Processing 10000 documents...


Batches: 100%|██████████| 313/313 [00:11<00:00, 27.34it/s]


Processing 10000 documents...


Batches: 100%|██████████| 313/313 [00:11<00:00, 26.90it/s]


Processing 10000 documents...


Batches: 100%|██████████| 313/313 [00:11<00:00, 26.95it/s]


Processing 10000 documents...


Batches: 100%|██████████| 313/313 [00:11<00:00, 26.42it/s]
