In [1]:
import sys
sys.path.append('../src')

from wikidata_dumpreader import WikidataDumpReader
from wikidataLabelsDB import WikidataLabels
from multiprocessing import Manager
import asyncio
import gc
import os
import json
from datasets import Dataset

# Run configuration. Every value except SKIPLINES can be overridden through an
# environment variable of the same name.
FILEPATH = os.environ.get("FILEPATH", '../data/Wikidata/latest-all.json.bz2')  # dump file handed to WikidataDumpReader
BATCH_SIZE = int(os.environ.get("BATCH_SIZE", "5000"))
PUSH_SIZE = int(os.environ.get("PUSH_SIZE", "10000"))  # buffered records that trigger a bulk write
QUEUE_SIZE = int(os.environ.get("QUEUE_SIZE", "10000"))  # forwarded as queue_size to the reader
NUM_PROCESSES = int(os.environ.get("NUM_PROCESSES", "4"))  # forwarded as num_processes to the reader
SKIPLINES = 0  # forwarded as skiplines to the reader

In [2]:
def save_labels_to_sqlite(item, data_batch, sqlitDBlock):
    """Buffer one entity's labels and bulk-write the buffer once it is large enough.

    Args:
        item: Entity dict with 'id' and 'labels' keys, or None (ignored).
        data_batch: Shared list-like buffer of pending {'id', 'labels'} records.
        sqlitDBlock: Context-manager lock held while checking and flushing the buffer.

    The labels are passed through ``WikidataLabels.clean_labels`` and stored as
    a compact JSON string. When the buffer holds more than ``PUSH_SIZE``
    records, the first ``PUSH_SIZE`` are handed to
    ``WikidataLabels.add_bulk_labels`` and removed only if that call reports
    success (truthy return).
    """
    if item is None:
        return

    cleaned = WikidataLabels.clean_labels(item['labels'])
    encoded = json.dumps(cleaned, separators=(',', ':'))
    record = {
        'id': item['id'],
        'labels': encoded,
    }
    data_batch.append(record)

    with sqlitDBlock:
        if len(data_batch) <= PUSH_SIZE:
            return

        pending = list(data_batch[:PUSH_SIZE])
        if WikidataLabels.add_bulk_labels(pending):
            # Drop only what was written; a failed write stays buffered for a later attempt.
            del data_batch[:PUSH_SIZE]
            gc.collect()

async def run_processor(wikidata, bulk_ids, sqlitDBlock):
    """Await ``wikidata.run`` with a callback that sends every item to ``save_labels_to_sqlite``.

    Args:
        wikidata: Reader object exposing an awaitable ``run(callback, max_iterations, verbose)``.
        bulk_ids: Shared buffer passed to the callback as its ``data_batch``.
        sqlitDBlock: Lock passed to the callback.

    NOTE(review): later cells in this notebook call ``wikidata.run`` without
    ``await`` — confirm that ``run`` is still a coroutine.
    """
    def handle(item):
        return save_labels_to_sqlite(item, bulk_ids, sqlitDBlock)

    await wikidata.run(handle, max_iterations=None, verbose=True)

multiprocess_manager = Manager()
sqlitDBlock = multiprocess_manager.Lock()
data_batch = multiprocess_manager.list()

wikidata = WikidataDumpReader(FILEPATH, num_processes=NUM_PROCESSES, batch_size=BATCH_SIZE, queue_size=QUEUE_SIZE, skiplines=SKIPLINES)

await run_processor(wikidata, data_batch, sqlitDBlock)

while len(data_batch) > 0:
    worked = WikidataLabels.add_bulk_labels(list(data_batch))
    if worked:
        data_batch[:] = []
    else:
        asyncio.sleep(1)

TypeError: WikidataDumpReader.__init__() got an unexpected keyword argument 'batch_size'

In [None]:
# Chunk-file bookkeeping read and updated by save_to_huggingface. These names
# were referenced as globals but never assigned, so the first flush raised
# NameError.
NUM_ITEMS = 0          # records written so far to the current chunk file
FILE_ID = 0            # index used in the current chunk file name
FILE_SIZE = 1_000_000  # records per chunk file (same size writer_loop uses in a later cell)


def save_to_huggingface(item, data_batch, sqlitDBlock):
    """Buffer one cleaned entity and periodically spill the buffer to a JSON chunk file.

    Args:
        item: Raw entity dict, or None (ignored).
        data_batch: Shared list-like buffer of pending records.
        sqlitDBlock: Context-manager lock held while flushing.

    The entity is passed through ``WikidataLabels.clean_entity``; every field
    except 'id' is stored as a compact JSON string. Once the buffer holds more
    than ``PUSH_SIZE`` records, the first ``PUSH_SIZE`` are appended to
    ``chunk_{FILE_ID}.json``. When that file has received at least
    ``FILE_SIZE`` records it is closed as a JSON array, loaded with
    ``Dataset.from_json``, pushed to the "wikidata" hub repo, deleted locally,
    and the counters advance to the next chunk.

    NOTE(review): the counters are plain module globals; if the reader invokes
    this callback in several processes each one keeps its own copy — confirm.
    """
    global NUM_ITEMS, FILE_ID

    if item is None:
        return

    entity = WikidataLabels.clean_entity(item)
    record = {'id': entity['id']}
    for field in ('labels', 'descriptions', 'aliases', 'sitelinks', 'claims'):
        record[field] = json.dumps(entity[field], separators=(',', ':'))
    data_batch.append(record)

    with sqlitDBlock:
        if len(data_batch) <= PUSH_SIZE:
            return

        chunk_path = f'chunk_{FILE_ID}.json'

        if NUM_ITEMS == 0:
            # First flush into this chunk: start the JSON array.
            with open(chunk_path, 'w+') as f:
                f.write('[\n')

        with open(chunk_path, 'a+') as f:
            # Serialise the batch as a list, then drop the enclosing brackets so
            # the records can be appended to the array already opened in the file.
            text = json.dumps(list(data_batch[:PUSH_SIZE]), separators=(',', ':')).strip("[] ,")
            f.write(text)
            NUM_ITEMS += PUSH_SIZE

            del data_batch[:PUSH_SIZE]
            gc.collect()

            chunk_full = NUM_ITEMS >= FILE_SIZE
            f.write(']' if chunk_full else ",\n")

        if chunk_full:
            data = Dataset.from_json(chunk_path, split=f'chunk_{FILE_ID}')
            data.push_to_hub("wikidata")

            os.remove(chunk_path)
            FILE_ID += 1
            NUM_ITEMS = 0

async def run_processor(wikidata, bulk_ids, sqlitDBlock):
    """Await ``wikidata.run`` with a callback that sends every item to ``save_to_huggingface``.

    Args:
        wikidata: Reader object exposing an awaitable ``run(callback, max_iterations, verbose)``.
        bulk_ids: Shared buffer passed to the callback as its ``data_batch``.
        sqlitDBlock: Lock passed to the callback.

    NOTE(review): later cells in this notebook call ``wikidata.run`` without
    ``await`` — confirm that ``run`` is still a coroutine.
    """
    def handle(item):
        return save_to_huggingface(item, bulk_ids, sqlitDBlock)

    await wikidata.run(handle, max_iterations=None, verbose=True)

multiprocess_manager = Manager()
sqlitDBlock = multiprocess_manager.Lock()
data_batch = multiprocess_manager.list()

wikidata = WikidataDumpReader(FILEPATH, num_processes=NUM_PROCESSES, batch_size=BATCH_SIZE, queue_size=QUEUE_SIZE, skiplines=SKIPLINES)

await run_processor(wikidata, data_batch, sqlitDBlock)

In [None]:
import gzip
import json
import sys
sys.path.append('../src')

from wikidata_dumpreader import WikidataDumpReader
from multiprocessing import Process, Value, Queue
import os

# Run configuration for the dump-to-chunks cell; FILEPATH, QUEUE_SIZE and
# NUM_PROCESSES can be overridden through same-named environment variables.
FILEPATH = os.environ.get("FILEPATH", '../data/Wikidata/latest-all.json.bz2')  # dump file handed to WikidataDumpReader
QUEUE_SIZE = int(os.environ.get("QUEUE_SIZE", "2000"))  # reader queue_size and writer-queue capacity
NUM_PROCESSES = int(os.environ.get("NUM_PROCESSES", "4"))  # forwarded as num_processes to the reader
SKIPLINES = 0  # forwarded as skiplines to the reader

def save_to_queue(item, data_batch):
    """Strip bookkeeping keys from an entity and put the slimmed record on the queue.

    Args:
        item: Raw entity dict, or None (ignored). 'claims' and 'sitelinks' are
            optional; 'id', 'labels', 'descriptions' and 'aliases' are required.
        data_batch: Queue-like object; the record is delivered with ``put``.
    """
    if item is None:
        return

    claim_noise_keys = ['hash', 'snaktype', 'type', 'entity-type', 'numeric-id', 'qualifiers-order', 'snaks-order']
    claims = WikidataLabels._remove_keys(item.get('claims', {}), claim_noise_keys)
    claims = WikidataLabels._clean_datavalue(claims)
    # 'id' is removed only after _clean_datavalue has run.
    claims = WikidataLabels._remove_keys(claims, ['id'])
    # clean_claims = WikidataLabels._add_labels_to_claims(clean_claims)

    slim_sitelinks = WikidataLabels._remove_keys(item.get('sitelinks', {}), ['badges'])

    record = {
        'id': item['id'],
        'labels': item['labels'],
        'descriptions': item['descriptions'],
        'aliases': item['aliases'],
        'sitelinks': slim_sitelinks,
        'claims': claims,
    }
    data_batch.put(record)

def writer_loop(data_batch, finished, file_size=1_000_000):
    """Drain the queue into gzip-compressed JSON array files in the working directory.

    Files are named ``chunk_{n}.json.gz`` (n starting at 0); a new file is
    started after ``file_size`` records. A file is only created once there is a
    record to put in it.

    Args:
        data_batch: Queue-like object providing ``get(timeout=...)`` and ``empty()``.
        finished: Object whose ``value`` attribute becomes 1 when the producer
            is done; the loop exits once that is set and the queue is empty.
        file_size: Maximum number of records per chunk file.

    The record separator is written *before* every record except the first of
    a file. Writing it after each record left a trailing comma in front of the
    closing bracket of the last chunk, producing invalid JSON.
    """
    # Local import keeps the cell self-contained; Queue.get(timeout=...) raises
    # queue.Empty on timeout for both queue.Queue and multiprocessing.Queue.
    from queue import Empty

    file_handle = None
    num_items = 0
    file_id = 0

    try:
        while True:
            if finished.value == 1 and data_batch.empty():
                if file_handle is not None:
                    file_handle.write('\n]')
                break

            try:
                next_item = data_batch.get(timeout=1)
            except Empty:
                # Nothing arrived within the timeout; re-check the finished flag.
                continue

            if not next_item:
                continue

            if file_handle is None:
                file_handle = gzip.open(f'chunk_{file_id}.json.gz', mode='wt')
                file_handle.write('[\n')
                num_items = 0

            if num_items > 0:
                file_handle.write(",\n")
            file_handle.write(json.dumps(next_item, separators=(',', ':')))
            num_items += 1

            if num_items >= file_size:
                file_handle.write('\n]')
                file_handle.close()
                file_handle = None
                file_id += 1
                num_items = 0
    finally:
        # Release the file even when the loop is aborted by an error or interrupt.
        if file_handle is not None:
            file_handle.close()

# Bounded queue between the reader callback and the writer process.
data_batch = Queue(maxsize=QUEUE_SIZE)
# Shared flag polled by writer_loop: 0 while reading, 1 once the reader is done.
finished = Value('i', 0)

wikidata = WikidataDumpReader(FILEPATH, num_processes=NUM_PROCESSES, queue_size=QUEUE_SIZE, skiplines=SKIPLINES)

writer_process = Process(
    target=writer_loop,
    args=(data_batch, finished)
)
writer_process.start()

try:
    wikidata.run(lambda item: save_to_queue(item, data_batch), max_iterations=None, verbose=True)
finally:
    # Always signal completion, even if the reader raises or the cell is
    # interrupted; otherwise writer_loop never sees the flag and polls forever.
    with finished.get_lock():
        finished.value = 1

    writer_process.join()

Skipping lines: 0it [00:00, ?it/s]




Items Processed: 2797 | Processing Rate: 932 items/sec | Memory Usage: 139.02 MB
Items Processed: 4748 | Processing Rate: 791 items/sec | Memory Usage: 141.83 MB
Items Processed: 6900 | Processing Rate: 766 items/sec | Memory Usage: 141.83 MB
Items Processed: 8933 | Processing Rate: 744 items/sec | Memory Usage: 141.83 MB
Items Processed: 11352 | Processing Rate: 756 items/sec | Memory Usage: 141.83 MB
Items Processed: 14479 | Processing Rate: 804 items/sec | Memory Usage: 141.83 MB
Items Processed: 17517 | Processing Rate: 834 items/sec | Memory Usage: 141.83 MB
Items Processed: 19807 | Processing Rate: 825 items/sec | Memory Usage: 141.83 MB
Items Processed: 21979 | Processing Rate: 814 items/sec | Memory Usage: 141.83 MB
Items Processed: 24263 | Processing Rate: 808 items/sec | Memory Usage: 141.83 MB
Items Processed: 26417 | Processing Rate: 800 items/sec | Memory Usage: 141.83 MB
Items Processed: 28877 | Processing Rate: 802 items/sec | Memory Usage: 141.83 MB
Items Processed: 315

In [None]:
import gzip
import json
import sys
sys.path.append('../src')

from wikidata_dumpreader import WikidataDumpReader
from wikidataLabelsDB import WikidataLabels
from multiprocessing import Process, Value, Queue
import os

# Run configuration for the chunk-cleaning cell; QUEUE_SIZE and NUM_PROCESSES
# can be overridden through same-named environment variables.
QUEUE_SIZE = int(os.environ.get("QUEUE_SIZE", "5000"))  # reader queue_size and writer-queue capacity
NUM_PROCESSES = int(os.environ.get("NUM_PROCESSES", "2"))  # forwarded as num_processes to the reader
SKIPLINES = 0  # forwarded as skiplines to the reader

def save_to_queue(item, data_batch):
    """Put an entity on the queue with its claims replaced by the labelled version.

    Args:
        item: Entity dict with 'id', 'labels', 'descriptions', 'aliases',
            'sitelinks' and 'claims' keys, or None (ignored).
        data_batch: Queue-like object; the record is delivered with ``put``.

    The claims are passed through ``WikidataLabels.add_labels_batched`` with
    ``query_batch=100``; all other fields are forwarded unchanged.
    """
    if item is None:
        return

    labelled_claims = WikidataLabels.add_labels_batched(item['claims'], query_batch=100)

    passthrough = ('id', 'labels', 'descriptions', 'aliases', 'sitelinks')
    record = {key: item[key] for key in passthrough}
    record['claims'] = labelled_claims
    data_batch.put(record)

def writer_loop(data_batch, finished, filename):
    """Drain the queue into a single gzip-compressed JSON array file.

    Args:
        data_batch: Queue-like object providing ``get(timeout=...)`` and ``empty()``.
        finished: Object whose ``value`` attribute becomes 1 when the producer
            is done; the loop exits once that is set and the queue is empty.
        filename: Path of the ``.json.gz`` file to create.

    The record separator is written *before* every record except the first.
    Writing it after each record left a trailing comma in front of the closing
    bracket, producing invalid JSON.
    """
    # Local import keeps the cell self-contained; Queue.get(timeout=...) raises
    # queue.Empty on timeout for both queue.Queue and multiprocessing.Queue.
    from queue import Empty

    # The context manager closes the file even if the loop is aborted.
    with gzip.open(filename, mode='wt') as file_handle:
        file_handle.write('[\n')
        wrote_any = False

        while True:
            if finished.value == 1 and data_batch.empty():
                break

            try:
                next_item = data_batch.get(timeout=1)
            except Empty:
                # Nothing arrived within the timeout; re-check the finished flag.
                continue

            if not next_item:
                continue

            if wrote_any:
                file_handle.write(",\n")
            file_handle.write(json.dumps(next_item, separators=(',', ':')))
            wrote_any = True

        file_handle.write('\n]')

NUM_CHUNKS = 112  # number of chunk_{i}.json.gz files to clean

for i in range(NUM_CHUNKS):
    FILEPATH = f"../data/Wikidata/latest-all-chunks/chunk_{i}.json.gz"
    filename = f"../data/Wikidata/latest-all-chunks/clean_chunk_{i}.json.gz"

    # Fresh queue and completion flag per chunk; writer_loop polls the flag
    # (0 while reading, 1 once the reader is done).
    data_batch = Queue(maxsize=QUEUE_SIZE)
    finished = Value('i', 0)

    wikidata = WikidataDumpReader(FILEPATH, num_processes=NUM_PROCESSES, queue_size=QUEUE_SIZE, skiplines=SKIPLINES)

    writer_process = Process(
        target=writer_loop,
        args=(data_batch, finished, filename)
    )
    writer_process.start()

    try:
        wikidata.run(lambda item: save_to_queue(item, data_batch), max_iterations=None, verbose=True)
    finally:
        # Always signal completion, even if the reader raises or the cell is
        # interrupted; otherwise writer_loop never sees the flag and polls forever.
        with finished.get_lock():
            finished.value = 1

        writer_process.join()

    # data = Dataset.from_json(filename, split=f'chunk_{i}')
    # data.push_to_hub("wikidata")
    # os.remove(filename)

Skipping lines: 0it [00:00, ?it/s]


Items Processed: 325 | Processing Rate: 108 items/sec
Items Processed: 1084 | Processing Rate: 181 items/sec
Items Processed: 1730 | Processing Rate: 192 items/sec
Items Processed: 2563 | Processing Rate: 213 items/sec
Items Processed: 3405 | Processing Rate: 227 items/sec
Items Processed: 4531 | Processing Rate: 252 items/sec
Items Processed: 5025 | Processing Rate: 239 items/sec
Items Processed: 5029 | Processing Rate: 209 items/sec
Items Processed: 5032 | Processing Rate: 186 items/sec
Items Processed: 5032 | Processing Rate: 168 items/sec
Items Processed: 5032 | Processing Rate: 152 items/sec
Items Processed: 5032 | Processing Rate: 140 items/sec
Items Processed: 5032 | Processing Rate: 129 items/sec
Items Processed: 5037 | Processing Rate: 120 items/sec
Items Processed: 5042 | Processing Rate: 112 items/sec
Items Processed: 5045 | Processing Rate: 105 items/sec
Items Processed: 5050 | Processing Rate: 99 items/sec
Items Processed: 5058 | Processing Rate: 94 items/sec
Items Process

KeyboardInterrupt: 

Items Processed: 5579 | Processing Rate: 29 items/sec
Items Processed: 5593 | Processing Rate: 28 items/sec
Items Processed: 5613 | Processing Rate: 28 items/sec
Items Processed: 5627 | Processing Rate: 28 items/sec
Items Processed: 5652 | Processing Rate: 27 items/sec
Items Processed: 5662 | Processing Rate: 27 items/sec
Items Processed: 5676 | Processing Rate: 27 items/sec
Items Processed: 5695 | Processing Rate: 26 items/sec
Items Processed: 5714 | Processing Rate: 26 items/sec
Items Processed: 5725 | Processing Rate: 26 items/sec
Items Processed: 5743 | Processing Rate: 26 items/sec
Items Processed: 5759 | Processing Rate: 25 items/sec
Items Processed: 5766 | Processing Rate: 25 items/sec
Items Processed: 5780 | Processing Rate: 25 items/sec
Items Processed: 5800 | Processing Rate: 24 items/sec
Items Processed: 5816 | Processing Rate: 24 items/sec
Items Processed: 5832 | Processing Rate: 24 items/sec
Items Processed: 5847 | Processing Rate: 24 items/sec
Items Processed: 5864 | Proc

In [None]:
import os
import json
import sys
from multiprocessing import Process, Value, Queue
sys.path.append('../src')

from wikidata_dumpreader import WikidataDumpReader
from wikidataLabelsDB import WikidataLabels

from datasets import Dataset

# Constants — QUEUE_SIZE and NUM_PROCESSES can be overridden through
# same-named environment variables.
QUEUE_SIZE = int(os.environ.get("QUEUE_SIZE", "5000"))  # passed to chunk_generator as queue_size
NUM_PROCESSES = int(os.environ.get("NUM_PROCESSES", "2"))  # passed to chunk_generator as num_processes
SKIPLINES = 0  # passed to chunk_generator as skip_lines

def save_to_queue(item, data_queue):
    """Serialise an entity's fields to compact JSON strings and put the record on the queue.

    Args:
        item: Entity dict with 'id', 'labels', 'descriptions', 'aliases',
            'sitelinks' and 'claims' keys, or None (ignored).
        data_queue: Queue-like object; the record is delivered with ``put``.

    The claims are first passed through ``WikidataLabels.add_labels_batched``
    with ``query_batch=100``. Every field except 'id' is stored as a JSON
    string rather than a nested structure.
    """
    if item is None:
        return

    def compact(value):
        return json.dumps(value, separators=(',', ':'))

    labelled_claims = WikidataLabels.add_labels_batched(item['claims'], query_batch=100)

    record = {'id': item['id']}
    for field in ('labels', 'descriptions', 'aliases', 'sitelinks'):
        record[field] = compact(item[field])
    record['claims'] = compact(labelled_claims)

    data_queue.put(record)

def chunk_generator(filepath, num_processes=2, queue_size=5000, skip_lines=0):
    """
    Yield processed records from one chunk file.

    A WikidataDumpReader is run in a separate process; its callback
    (``save_to_queue``) puts records on a bounded multiprocessing queue, and
    this generator yields them as they arrive, so the whole chunk is never
    held in memory at once.

    Args:
        filepath: Path of the chunk file handed to WikidataDumpReader.
        num_processes: Forwarded to WikidataDumpReader.
        queue_size: Forwarded to WikidataDumpReader and used as queue capacity.
        skip_lines: Forwarded to WikidataDumpReader as ``skiplines``.

    Yields:
        The dict records produced by ``save_to_queue``.

    Raises:
        RuntimeError: If the reader process exits without signalling
            completion (previously this case looped forever).

    NOTE(review): ``run_reader`` is a nested function, which the ``spawn``
    start method cannot pickle — this presumably relies on ``fork``; confirm.
    """
    # Local import keeps the cell self-contained; Queue.get(timeout=...) raises
    # queue.Empty on timeout.
    from queue import Empty

    data_queue = Queue(maxsize=queue_size)
    finished = Value('i', 0)

    # Initialize the dump reader
    wikidata = WikidataDumpReader(
        filepath,
        num_processes=num_processes,
        queue_size=queue_size,
        skiplines=skip_lines
    )

    # Define a function to feed items into the queue
    def run_reader():
        wikidata.run(lambda item: save_to_queue(item, data_queue),
                     max_iterations=None, verbose=True)
        with finished.get_lock():
            finished.value = 1

    # Start reader in a separate process
    reader_proc = Process(target=run_reader)
    reader_proc.start()

    drained = False
    try:
        # Continuously yield items from the queue to the Dataset generator
        while True:
            # Stop when the reader has signalled completion — or has died
            # without doing so — AND the queue is empty.
            reader_done = finished.value == 1 or not reader_proc.is_alive()
            if reader_done and data_queue.empty():
                break
            try:
                item = data_queue.get(timeout=1)
            except Empty:
                continue
            if item:
                yield item
        drained = True
    finally:
        # If the consumer stopped early (generator closed or an error was
        # raised into it), stop the reader instead of leaving it blocked on a
        # full queue; then reap the process in every case.
        if not drained and reader_proc.is_alive():
            reader_proc.terminate()
        reader_proc.join()

    if finished.value != 1:
        raise RuntimeError(
            f"Reader process for {filepath} exited (exit code {reader_proc.exitcode}) "
            "before signalling completion"
        )

# Now process each chunk file and push to the same Hugging Face repo
HF_REPO_ID = "wikidata"  # Change to your actual repo on Hugging Face
NUM_CHUNKS = 112  # number of chunk_{i}.json.gz files to process

for i in range(NUM_CHUNKS):
    filepath = f"../data/Wikidata/latest-all-chunks/chunk_{i}.json.gz"
    split_name = f"chunk_{i}"

    print(f"Processing {filepath} -> split={split_name}")

    # Create a Dataset from the generator. The arguments go through gen_kwargs
    # instead of a lambda closing over the loop variables, so each chunk's
    # parameters are bound explicitly at call time.
    ds_chunk = Dataset.from_generator(
        chunk_generator,
        gen_kwargs={
            "filepath": filepath,
            "num_processes": NUM_PROCESSES,
            "queue_size": QUEUE_SIZE,
            "skip_lines": SKIPLINES,
        },
        # Optionally specify features= if you know the schema
        # features=Features({...})
    )

    # Push each chunk as a separate "split" under the same dataset repo
    ds_chunk.push_to_hub(HF_REPO_ID, split=split_name)

    print(f"Chunk {i} pushed to {HF_REPO_ID} as {split_name}.")


Processing ../data/Wikidata/latest-all-chunks/chunk_0.json.gz -> split=chunk_0


Generating train split: 0 examples [00:00, ? examples/s]

Skipping lines: 0it [00:00, ?it/s]


Items Processed: 358 | Processing Rate: 119 items/sec
Items Processed: 1133 | Processing Rate: 189 items/sec


DatasetGenerationError: An error occurred while generating the dataset

Items Processed: 1963 | Processing Rate: 218 items/sec
Items Processed: 3087 | Processing Rate: 257 items/sec
Items Processed: 4382 | Processing Rate: 292 items/sec
Items Processed: 5284 | Processing Rate: 293 items/sec
