In [1]:
import os
import json
import pymongo
import pandas as pd
import dask.dataframe as dd
from pymongo import InsertOne, DeleteMany
from pymongo.errors import BulkWriteError
from pymongo.collection import Collection

In [2]:
# Connection settings for the MongoDB instance that receives the revisions.
# Host and port keep their original defaults but can be overridden with the
# MONGO_HOST / MONGO_PORT environment variables, so the notebook can run
# against another server without editing this cell.
mongo_port = int(os.environ.get("MONGO_PORT", 27017))
mongo_host = os.environ.get("MONGO_HOST", "192.168.1.14")
database_name = "enwiki"
collection_name = "revisions"

# Handles used by the loading cells below.
client = pymongo.MongoClient(host=mongo_host, port=mongo_port)
db = client.get_database(name=database_name)
revisions_collection = db.get_collection(name=collection_name)

In [3]:
def df_to_mongo(df: pd.DataFrame, collection: Collection, pk=None, batch_size: int = 50000):
    """Insert every row of ``df`` into ``collection`` in batches.

    Each row becomes one document whose field names are the DataFrame column
    names. The column named by ``pk`` (if any) is stored under ``_id``
    instead of its own name. Progress is printed as ``inserted/total`` after
    each batch is handed to ``write_bulk``.

    Args:
        df: Rows to insert.
        collection: Target collection, passed through to ``write_bulk``.
        pk: Name of the column to store as ``_id``; ``None`` renames nothing.
        batch_size: Number of documents sent per ``write_bulk`` call.

    Raises:
        ValueError: If ``batch_size`` is not a positive integer.
    """
    if batch_size <= 0:
        raise ValueError("batch_size must be a positive integer")

    df_size = len(df)

    # The column -> field-name mapping is the same for every row, so it is
    # computed once instead of once per row.
    field_names = ['_id' if column == pk else column for column in df.columns]

    total_size = 0
    requests = []
    # itertuples keeps each column's own dtype; iterrows builds a Series per
    # row, which can upcast values (e.g. ints to floats) and is slower.
    for values in df.itertuples(index=False, name=None):
        total_size += 1
        requests.append(InsertOne(dict(zip(field_names, values))))

        if total_size % batch_size == 0:
            print(f"{total_size}/{df_size}")
            write_bulk(requests, collection)
            requests = []

    # Flush the final, partially filled batch.
    if requests:
        write_bulk(requests, collection)
        print(f"{total_size}/{df_size}")

def write_bulk(requests, collection):
    """Send ``requests`` to ``collection.bulk_write`` on a best-effort basis.

    The batch is written unordered so that one failing operation (for example
    a duplicate ``_id`` when a file is re-loaded) does not stop the remaining
    operations in the batch from being attempted. Errors are reported on
    stdout and never re-raised, so a long-running load keeps going.

    Args:
        requests: List of pymongo write operations.
        collection: Object exposing ``bulk_write(requests, ordered=...)``.

    Returns:
        Whatever ``collection.bulk_write`` returns, or ``None`` when there was
        nothing to write or the write raised.
    """
    if not requests:
        return None

    try:
        return collection.bulk_write(requests, ordered=False)
    except BulkWriteError as bwe:
        # An unordered batch can report one error per failed operation, so
        # print a summary and a few samples rather than the whole details dict.
        details = bwe.details or {}
        write_errors = details.get('writeErrors', [])
        print(f"Bulk write: {details.get('nInserted', 0)}/{len(requests)} inserted, "
              f"{len(write_errors)} write error(s)")
        for error in write_errors[:5]:
            print(f"  index {error.get('index')}: {error.get('errmsg')}")
    except Exception as err:
        print(err)
    return None

In [4]:
# Paths of the dump files that were already loaded, read from the
# ';'-separated progress file that the loading loop appends to.
completed_files = []
try:
    with open(r'../../../data/enwiki/latest/mongo_rev_files.txt', mode='r') as mrf:
        # Every entry is written with a trailing ';', so drop the empty
        # fragment (and any stray whitespace/newline) left by the split.
        completed_files = [entry.strip() for entry in mrf.read().split(';') if entry.strip()]
except FileNotFoundError:
    # First run: the progress file is only created once a file has been
    # loaded, so nothing is completed yet.
    pass

In [6]:
# chunksize = 50000
# tmp_path = r'../../../data/enwiki/latest/enwiki-latest-stub-meta-history10.h5'
# for chunk in pd.read_hdf(path_or_buf=tmp_path, key='enwiki', chunksize=chunksize):
#     print(len(chunk))

In [5]:
# Load every not-yet-processed HDF5 dump in the data directory into MongoDB,
# recording each finished file in the progress file so a re-run skips it.
data_dir = '../../../data/enwiki/latest'
progress_file = os.path.join(data_dir, 'mongo_rev_files.txt')

# os.listdir replaces the `!ls` shell escape so the cell is plain Python;
# sorting keeps the processing order deterministic.
files = sorted(os.listdir(data_dir))
for file in files:
    # Only the .h5 dumps can be read below; this also skips the progress file.
    if not file.endswith('.h5'):
        continue

    # Same path format as the entries stored in the progress file.
    file = os.path.join(data_dir, file)

    if file in completed_files:
        continue

    print(f'Processing {file}')

    df = pd.read_hdf(file, key='enwiki')
    df_to_mongo(df, revisions_collection, 'rev_id')

    # Drop the reference before the next file is read.
    del df

    with open(progress_file, mode='a') as mrf:
        mrf.write(file + ';')

In [6]:
!cat ../../../data/enwiki/latest/mo

../../../data/enwiki/latest/enwiki-latest-stub-meta-history9.h5;../../../data/enwiki/latest/enwiki-latest-stub-meta-history10.h5;../../../data/enwiki/latest/enwiki-latest-stub-meta-history11.h5;../../../data/enwiki/latest/enwiki-latest-stub-meta-history12.h5;../../../data/enwiki/latest/enwiki-latest-stub-meta-history13.h5;../../../data/enwiki/latest/enwiki-latest-stub-meta-history14.h5;../../../data/enwiki/latest/enwiki-latest-stub-meta-history15.h5;../../../data/enwiki/latest/enwiki-latest-stub-meta-history16.h5;../../../data/enwiki/latest/enwiki-latest-stub-meta-history17.h5;../../../data/enwiki/latest/enwiki-latest-stub-meta-history18.h5;../../../data/enwiki/latest/enwiki-latest-stub-meta-history19.h5;../../../data/enwiki/latest/enwiki-latest-stub-meta-history1.h5;../../../data/enwiki/latest/enwiki-latest-stub-meta-history20.h5;../../../data/enwiki/latest/enwiki-latest-stub-meta-history21.h5;../../../data/enwiki/latest/enwiki-latest-stub-meta-history22.h5;../../../data/enwiki/latest