In [1]:
import os

# Work from the repository root, i.e. the parent of the directory the kernel started in.
# The start directory is remembered on the first run, so re-executing this cell does not
# keep moving one more level up (a plain os.chdir("..") is not idempotent).
NOTEBOOK_START_DIR = globals().get("NOTEBOOK_START_DIR", os.getcwd())
os.chdir(os.path.dirname(NOTEBOOK_START_DIR))
os.getcwd()

'/root/code/Protenix'

In [2]:
# Inputs/outputs and tuning knobs for packing MSA directories into one LMDB file.
# Paths and worker count can be overridden via environment variables so the notebook
# also runs where /vepfs is not mounted; the defaults are the original locations.
INPUT_DIR = os.environ.get(
    "MSA_INPUT_DIR", "/vepfs/fs_projects/yangsw/af3-dev/release_data/mmcif_msa"
)
OUTPUT_LMDB_PATH = os.environ.get(
    "MSA_OUTPUT_LMDB_PATH",
    "/vepfs/fs_projects/yangsw/af3-dev/release_data/lmdb/mmcif_msa.lmdb",
)
N_WORKER = int(os.environ.get("MSA_N_WORKER", 32))  # processes per multiprocessing.Pool
N_COMMIT = 1000  # directories written per LMDB write transaction before committing
ENCODING = "utf-8"  # encoding of LMDB keys and of the a3m text before gzip

In [3]:
import lmdb

# Upper bound the database may grow to; py-lmdb sizes the memory map from it.
_LMDB_MAP_SIZE = 200 * 1024 * 1024 * 1024  # 200 GiB

# With subdir=False the database is a single file at OUTPUT_LMDB_PATH; lmdb.open does
# not create missing parent directories in that mode, so make sure they exist.
_lmdb_parent_dir = os.path.dirname(OUTPUT_LMDB_PATH)
if _lmdb_parent_dir:
    os.makedirs(_lmdb_parent_dir, exist_ok=True)

# lock=False turns off LMDB's own locking: any concurrent access to this file must be
# coordinated by the caller (here, a single writer in this notebook's main process).
env_new = lmdb.open(
    OUTPUT_LMDB_PATH,
    subdir=False,
    readonly=False,
    lock=False,
    readahead=False,
    meminit=False,
    max_readers=126,
    map_size=_LMDB_MAP_SIZE,
)

In [4]:
import gzip
import tqdm
from typing import Tuple, List
from multiprocessing import Pool


def list_file_keys(_dir_name: str) -> Tuple[str, List[bytes]]:
    """List the LMDB keys under which the files of one MSA directory are stored.

    Each file ``<INPUT_DIR>/<dir>/<file>`` is stored under ``"<dir>-<file>"`` encoded
    with ENCODING. This runs in Pool worker processes and only touches the filesystem:
    LMDB environments, transactions and cursors must not be used in a process forked
    after they were opened, so the key lookups are done in the parent instead.

    Returns:
        ``(dir_name, keys)`` for every entry listed in the directory.
    """
    _dir_path = os.path.join(INPUT_DIR, _dir_name)
    _keys = [bytes(f"{_dir_name}-{_file_name}", ENCODING) for _file_name in os.listdir(_dir_path)]
    return _dir_name, _keys


# Only sub-directories hold MSA files; a stray regular file in INPUT_DIR would otherwise
# make os.listdir raise NotADirectoryError inside a worker. scandir avoids an extra stat
# per entry on most filesystems.
with os.scandir(INPUT_DIR) as _entries:
    dir_names = [_entry.name for _entry in _entries if _entry.is_dir()]

todo_dir_names, finished_dir_names = [], []
# A directory counts as finished only if every one of its files already has a key.
with Pool(N_WORKER) as pool:
    with env_new.begin() as txn:
        with txn.cursor() as cursor:
            results = pool.imap_unordered(list_file_keys, dir_names)
            for cif_name, file_keys in tqdm.tqdm(results, total=len(dir_names), ncols=60):
                # set_key only positions the cursor; it does not copy the stored value.
                if all(cursor.set_key(_key) for _key in file_keys):
                    finished_dir_names.append(cif_name)
                else:
                    todo_dir_names.append(cif_name)
print(f"\tTODO {len(todo_dir_names)}; FINISHED {len(finished_dir_names)}")

100%|████████████| 157202/157202 [00:06<00:00, 24339.25it/s]

	TODO 0; FINISHED 157202





In [5]:
def process_one(_dir_name: str) -> Tuple[List[Tuple[bytes, bytes]], bool]:
    """Read and gzip every file of one MSA directory for insertion into LMDB.

    Each file ``<INPUT_DIR>/<_dir_name>/<file>`` becomes the pair
    ``(b"<_dir_name>-<file>", gzip.compress(text))``, with both the key and the text
    encoded with ENCODING. Only the filesystem is touched, so this is safe to run in
    Pool worker processes.

    Returns:
        ``(pairs, True)`` on success, or ``([], False)`` if anything in the directory
        could not be read; the error is printed and the whole directory is skipped.
    """
    try:
        _dir_path = os.path.join(INPUT_DIR, _dir_name)
        _tuples = []
        for _file_name in os.listdir(_dir_path):
            _file_path = os.path.join(_dir_path, _file_name)
            _file_key = f"{_dir_name}-{_file_name}"
            # Decode with the same ENCODING used to re-encode below rather than the
            # locale-dependent default, so non-ASCII text round-trips on any host.
            with open(_file_path, encoding=ENCODING) as _fp:
                a3m_lines = _fp.read()
            _tuples.append((bytes(_file_key, ENCODING), gzip.compress(a3m_lines.encode(ENCODING))))
        return _tuples, True
    except Exception as e:
        print(f"{_dir_name} encounters {e.__class__.__name__}({e})\n")
        return [], False

In [6]:
# Pack the TODO directories into LMDB: workers read and gzip files (process_one), and only
# this main process writes. The write transaction is committed every N_COMMIT directories,
# so an interruption loses at most the current, uncommitted batch.
n_failed = 0
try:
    # Create (fork) the workers before beginning the write transaction, so no child
    # process is started while it is open.
    with Pool(N_WORKER) as pool:
        txn_write = env_new.begin(write=True)
        try:
            results = pool.imap_unordered(process_one, todo_dir_names)
            for i, (tuples, succeed) in enumerate(
                tqdm.tqdm(results, total=len(todo_dir_names), ncols=60), start=1
            ):
                if succeed:
                    for key_bytes, value_bytes in tuples:
                        txn_write.put(key_bytes, value_bytes)
                else:
                    n_failed += 1  # process_one already printed the reason
                if i % N_COMMIT == 0:
                    txn_write.commit()
                    txn_write = env_new.begin(write=True)
            txn_write.commit()
        except BaseException:
            # Drop the partial batch (earlier batches are already committed). Aborting a
            # transaction that was just committed has no effect in py-lmdb.
            txn_write.abort()
            raise
finally:
    env_new.close()
print(f"\tFAILED {n_failed}")

0it [00:00, ?it/s]


In [7]:
# Re-open the packed database read-only and decode the first record the cursor yields as
# a spot check. `key` and `value` are left holding the decoded strings (or key=None if the
# database is empty) for the comparison against the source file that follows.
env_new = lmdb.open(
    OUTPUT_LMDB_PATH,
    subdir=False,
    readonly=True,
    lock=False,
    readahead=False,
    meminit=False,
    max_readers=126,
    map_size=100 * 1024 * 1024 * 1024,
)

key = None
try:
    with env_new.begin() as txn:
        cursor = txn.cursor()
        for key, value in cursor:
            # Decode with the same ENCODING the writer used for keys and a3m text.
            key = key.decode(ENCODING)
            value = gzip.decompress(value).decode(ENCODING)
            print(key)
            print(len(value))
            print(value[:1000])
            break
finally:
    # Release the read-only handle; the decoded key/value above are plain str objects.
    env_new.close()

0-mmseqs_other_hits.a3m
11
>query
AAA



In [8]:
# Cross-check the record decoded from LMDB against the original a3m file on disk.
assert key is not None, "the LMDB database is empty"
# Keys are built as f"{dir_name}-{file_name}". Split on the first '-' only, so file names
# that contain '-' still parse. NOTE(review): assumes directory names contain no '-'.
dir_name, file_name = key.split("-", 1)
with open(os.path.join(INPUT_DIR, dir_name, file_name), encoding=ENCODING) as fp:
    lines = fp.read()
print(len(lines))
print(lines[:1000])
assert lines == value, f"LMDB record {key} differs from the file on disk"

11
>query
AAA



In [9]:
# Disabled sanity check of INPUT_DIR's layout: prints every top-level entry that is not a
# directory, and every file whose name is neither mmseqs_other_hits.a3m nor
# uniref100_hits.a3m.
# NOTE(review): if re-enabled, add a `continue` after printing a non-directory entry;
# otherwise the following os.listdir on that entry raises NotADirectoryError.
# for msa_dir_name in os.listdir(INPUT_DIR):
#     msa_dir_path = os.path.join(INPUT_DIR, msa_dir_name)
#     if not os.path.isdir(msa_dir_path):
#         print(msa_dir_path)
#     for file_name in os.listdir(msa_dir_path):
#         file_path = os.path.join(msa_dir_path, file_name)
#         if file_name != "mmseqs_other_hits.a3m" and file_name != "uniref100_hits.a3m":
#             print(file_path)