In [None]:
# default_exp process_bson

# Process BSON

> Script to convert BSON product data into JPG images. Ideas from [this Kaggle notebook](https://www.kaggle.com/inversion/processing-bson-files/notebook).

In [None]:
#hide
from nbdev.showdoc import *

In [None]:
#export
import pandas as pd
from fastcore.all import *
import io
import bson
from PIL import Image
import multiprocessing as mp

In [None]:
#export
# CPU count reported by `multiprocessing`; `bson_to_jpeg` uses it as the worker-pool
# size and as the number of `None` sentinels pushed onto the queue.
NCORE = mp.cpu_count()

In [None]:
#export
def get_process_func(product2category, save_dir, is_test):
    """Build a queue-draining worker bound to a mapping and an output directory.

    `product2category` is a dict-like object that receives `_id -> category_id`
    entries (skipped entirely when `is_test` is true). `save_dir` is the directory
    the decoded images are written into; it is joined with `/`, so it is expected
    to behave like a `Path`.

    Returns the worker function `process_product(q, iolock)`.
    """
    record_category = not is_test

    def process_product(q, iolock):
        """Saves every image of each product taken from `q`; stops at a `None` sentinel.

        `iolock` is accepted so the signature matches the arguments passed by the
        caller, but it is not used here. Nothing is returned: results are recorded
        in `product2category` and as `<_id>_<index>.jpg` files under `save_dir`.
        """
        # `iter(callable, sentinel)` keeps calling `q.get()` until it yields `None`.
        for product in iter(q.get, None):
            if record_category:
                product2category[product["_id"]] = product["category_id"]
            for i, entry in enumerate(product["imgs"]):
                stream = io.BytesIO(entry["picture"])
                decoded = Image.open(stream)
                decoded.save(save_dir/f"{product['_id']}_{i}.jpg")

    return process_product

In [None]:
#export
@call_parse
def bson_to_jpeg(
    path: Param("Path to BSON", Path),
):
    """Converts BSON to JPGs and saves product id to category mapping as CSV.

    Images are written to an `images` directory next to `path`, and the mapping
    to `<stem>.csv` in the same directory. A file whose stem is `test` is treated
    as test data, for which no categories are recorded.

    Returns the mapping as a DataFrame indexed by `_id`.
    """
    path = Path(path)
    save_dir = path.parent/"images"
    save_dir.mkdir(exist_ok=True)
    csv_save_path = path.parent/f"{path.stem}.csv"
    is_test = path.stem == "test"
    print(f"Converting {path} to JPGs in {save_dir}. Mapping saved in {csv_save_path}")

    with mp.Manager() as manager:
        # Manager-backed dict so entries written by worker processes are visible here.
        product2category = manager.dict()
        # NOTE(review): the queue is unbounded, so the reader below can run ahead of
        # the workers and buffer many decoded products in memory; consider a maxsize
        # for very large files.
        q = mp.Queue()
        iolock = mp.Lock()
        # The consumer loop is installed as the pool *initializer*, so every worker
        # process starts draining `q` as soon as it is created; no tasks are submitted.
        pool = mp.Pool(NCORE, initializer=get_process_func(product2category, save_dir, is_test), initargs=(q, iolock))
        # Close the BSON file deterministically once every document has been queued.
        with path.open("rb") as bson_file:
            for product in bson.decode_file_iter(bson_file): q.put(product)
        # One sentinel per worker tells each consumer loop to stop.
        for _ in range(NCORE): q.put(None)
        pool.close()
        pool.join()
        # Copy into a plain dict before the manager shuts down.
        product2category = dict(product2category)

    # NOTE(review): in test mode the workers record nothing, so this frame has no
    # rows and the CSV carries no product ids; confirm that this is intended.
    df = pd.DataFrame.from_dict(product2category, orient="index")
    df.index.name = "_id"
    if not is_test: df = df.rename(columns={0: 'category_id'})
    df.to_csv(csv_save_path)
    print("Completed successfully.")
    return df

In [None]:
# Run the converter on the example BSON file and report how long it takes.
%time bson_to_jpeg("./data/train_example.bson")

Converting data/train_example.bson to JPGs in data/images. Mapping saved in data/train_example.csv
CPU times: user 38.8 ms, sys: 110 ms, total: 149 ms
Wall time: 187 ms


Unnamed: 0_level_0,category_id
_id,Unnamed: 1_level_1
0,1000010653
1,1000010653
2,1000004079
3,1000004141
4,1000015539
...,...
84,1000010641
101,1000004085
98,1000010667
81,1000010683


In [None]:
#hide
# Run nbdev's notebook-to-script export.
from nbdev.export import notebook2script; notebook2script()

Converted 00_core.ipynb.
Converted index.ipynb.
