In [None]:
import pyarrow as pa
import pyarrow.dataset as ds
import os
import math

# Width in bytes of one value for each fixed-width type, keyed by type name.
BYTES_PER_TYPE = dict(
    int8=1,
    int16=2,
    int32=4,
    int64=8,
    uint8=1,
    uint16=2,
    uint32=4,
    uint64=8,
    float=4,
    float16=2,
    float32=4,
    float64=8,
    bool=1,
)

# Per-value size assumed for a string column whose sample holds no non-null values.
STRING_DEFAULT_BYTES = 16

def patch_schema(schema):
    """Return a copy of ``schema`` in which every decimal field is typed as string.

    Non-decimal fields are reused unchanged; decimal fields are replaced by a new
    string field carrying the same name.
    """
    return pa.schema([
        pa.field(f.name, pa.string()) if pa.types.is_decimal(f.type) else f
        for f in schema
    ])


def estimate_array_size(arr, col_type):
    """Estimate how many bytes the values of ``arr`` occupy.

    Args:
        arr: A pyarrow array (one column of one record batch).
        col_type: The pyarrow data type of the column.

    Returns:
        int: Estimated size in bytes.
          * integer / floating types: ``len(arr)`` times the type's own width
          * boolean: 1 byte per value
          * decimal: 16 bytes per value
          * string / large_string: ``len(arr)`` times the average UTF-8 length of
            the non-null values among the first 5000 rows
            (``STRING_DEFAULT_BYTES`` per value when that sample has none)
          * anything else: 16 bytes per value
    """
    n = len(arr)

    if pa.types.is_integer(col_type) or pa.types.is_floating(col_type):
        # The previous lookup used col_type.to_pandas_dtype() as the key into
        # BYTES_PER_TYPE. That call returns a NumPy type object, not a string, so
        # the lookup always missed and every numeric column was counted at 8 bytes.
        # The Arrow type already knows its own width.
        return n * (col_type.bit_width // 8)

    if pa.types.is_boolean(col_type):
        return n * 1

    if pa.types.is_decimal(col_type):
        return n * 16

    if pa.types.is_string(col_type) or pa.types.is_large_string(col_type):
        # Sample 5000
        sample = arr.slice(0, min(5000, n)).to_pylist()
        # Measure encoded bytes rather than characters so that non-ASCII text
        # is not under-estimated.
        byte_lengths = [len(x.encode("utf-8")) for x in sample if x is not None]
        if byte_lengths:
            avg_len = sum(byte_lengths) / len(byte_lengths)
        else:
            avg_len = STRING_DEFAULT_BYTES
        return int(n * avg_len)

    return n * 16


# INPUT_DIR = "D:\\zman\\graph\\data"
# Collects one (dataset name, recommended map_size in GB) tuple per dataset.
results = []
# Dataset folder names; each is joined onto the data directory inside the loop.
files = [
    'edge_pinj_credit',
    'edge_pinj_debit',
    'edge_simp_credit',
    'edge_simp_debit',
]

# for file in os.listdir(INPUT_DIR)[:5]:
for file in files:

    # Skip CSV entries; presumably a leftover from the os.listdir variant above,
    # since none of the names in `files` end with ".csv".
    if file.endswith(".csv"):
        continue

    path = os.path.join("/Users/ymnzaman/Documents/Project/Graph/data", file)
    # path = os.path.join(INPUT_DIR, file)


    # Open the dataset once to read its schema, then reopen it with the decimal
    # columns declared as strings (see patch_schema).
    base = ds.dataset(path, format="parquet", partitioning="hive")
    patched_schema = patch_schema(base.schema)
    dataset = ds.dataset(path, format="parquet", partitioning="hive", schema=patched_schema)

    print(f"\n=== Estimasi LMDB untuk: {file} ===")

    num_rows = dataset.count_rows()
    print("Rows:", num_rows)

    # Running sum of the per-column estimates for this dataset.
    total_bytes = 0

    for col in dataset.schema.names:

        col_type = dataset.schema.field(col).type
        col_bytes = 0

        # Scanner restricted to this single column
        scanner = dataset.scanner(columns=[col], batch_size=75_000)

        # Estimate batch by batch; each batch holds exactly one column (index 0).
        for batch in scanner.to_batches():
            arr = batch.column(0)
            col_bytes += estimate_array_size(arr, col_type)

        total_bytes += col_bytes

        print(f"{col:30} | {str(col_type):25} | {col_bytes/1024/1024:10.2f} MB")

    # Add 20% on top of the raw estimate, then round up to whole GiB.
    lmdb_bytes = int(total_bytes * 1.2)
    gb = math.ceil(lmdb_bytes / (1024**3))

    print(f"\nTOTAL ESTIMATED SIZE: {total_bytes/1024/1024/1024:.2f} GB")
    print(f"RECOMMENDED LMDB MAP_SIZE: {gb} GB\n")

    results.append((file, gb))

# Final summary: one line per dataset with its recommended map_size.
print("\n===================== RINGKASAN MAP_SIZE =====================")
for file, gb in results:
    print(f"{file:40} : {gb} GB")
print("==============================================================")



In [None]:
import os
import lmdb
import pyarrow as pa
import pyarrow.dataset as ds


# When True, index_edges_dataset prints every row whose src/dst lookup misses.
DEBUG = False


# =======================================================
# MAP SIZE
# =======================================================
# LMDB map_size in bytes per edge type (1 << 30 bytes == 1 GiB).
# Entries that are currently disabled:
#   "edge_nasabah_is_pekerja":    1024 * 1024 * 1024 * 1
#   "edge_nasabah_memiliki_pinj": 1024 * 1024 * 1024 * 1
#   "edge_nasabah_memiliki_simp": 1024 * 1024 * 1024 * 15
#   "edge_rek_credit":            1024 * 1024 * 1024 * 2
#   "edge_rek_debit":             1024 * 1024 * 1024 * 2
MAP_SIZE = dict.fromkeys(
    (
        'edge_pinj_credit',
        'edge_pinj_debit',
        'edge_simp_credit',
        'edge_simp_debit',
    ),
    1 << 30,
)


# =======================================================
# PATCH SCHEMA DECIMAL → STRING
# =======================================================
def patch_schema(schema):
    """Return a copy of ``schema`` in which every decimal field is typed as string.

    Non-decimal fields are reused unchanged; decimal fields are replaced by a new
    string field carrying the same name.
    """
    return pa.schema([
        pa.field(f.name, pa.string()) if pa.types.is_decimal(f.type) else f
        for f in schema
    ])


# =======================================================
# NORMALIZE KEY
# =======================================================
def normalize_key(v):
    """Normalize a raw identifier into the string form used as an LMDB key.

    Steps: convert to ``str`` and strip surrounding whitespace, drop a trailing
    ``".0"``, then remove leading zeros.

    Args:
        v: Raw key value of any type, or ``None``.

    Returns:
        str | None: The normalized key, or ``None`` when ``v`` is ``None`` or the
        normalized text is empty. A key consisting only of zeros becomes ``"0"``.
    """
    if v is None:
        return None
    v = str(v).strip()

    if v.endswith(".0"):
        v = v[:-2]

    if v.startswith("0") and len(v) > 1:
        # lstrip would reduce an all-zero key ("000") to "", so keep one digit.
        v = v.lstrip("0") or "0"

    # LMDB does not accept zero-length keys; report an empty key as "no key"
    # so the lookup helpers treat it as a miss instead of raising.
    return v or None


# =======================================================
# LMDB LOOKUP
# =======================================================
def lmdb_lookup(env, key):
    """Look up ``key`` in a single LMDB environment.

    The key is normalized with ``normalize_key`` and encoded before the lookup.
    Returns the stored value parsed as ``int``, or ``None`` when the normalized
    key is ``None`` or is not present in the environment.
    """
    normalized = normalize_key(key)
    if normalized is None:
        return None

    with env.begin() as txn:
        raw = txn.get(normalized.encode())
        if raw is None:
            return None
        return int(raw.decode())


def multi_lmdb_lookup(env_list, key):
    """Look up ``key`` in several LMDB environments, first hit wins.

    The key is normalized with ``normalize_key`` once, then each environment in
    ``env_list`` is tried in order. Returns the first stored value parsed as
    ``int``, or ``None`` when the normalized key is ``None`` or no environment
    contains it.
    """
    normalized = normalize_key(key)
    if normalized is None:
        return None

    encoded = normalized.encode()
    for candidate_env in env_list:
        with candidate_env.begin() as txn:
            raw = txn.get(encoded)
            if raw is None:
                continue
            return int(raw.decode())
    return None


# =======================================================
# MAIN INDEX FUNCTION
# =======================================================
def index_edges_dataset(edge_type, edge_folder, map_src, map_dst, out_lmdb, map_size):
    """Resolve (src, dst) edge rows to node ids and store the pairs in an LMDB.

    The "src" and "dst" columns of the hive-partitioned parquet dataset in
    ``edge_folder`` are read batch by batch. Each value is looked up in the
    node-mapping LMDB(s); rows where either lookup misses are skipped. Every
    resolved edge is written as ``str(edge_id) -> "sid,tid"``, with ``edge_id``
    counting up from 0, inside a single write transaction.

    Args:
        edge_type: Label used in the progress messages.
        edge_folder: Path of the parquet dataset holding the edges.
        map_src: Path, or list of paths, of the LMDB(s) mapping source keys to ids.
        map_dst: Path, or list of paths, of the LMDB(s) mapping destination keys to ids.
        out_lmdb: Path of the output LMDB environment.
        map_size: LMDB map_size in bytes for the output environment.

    Note:
        Entries already present in ``out_lmdb`` are not removed; keys are simply
        (over)written starting again from "0".
    """
    # Every environment opened here is recorded so it can be closed again, even
    # when indexing fails. Leaving them open leaks handles and can make a
    # re-run of this cell in the same kernel fail.
    opened = []

    def _open_readonly(paths):
        # Accept one path or a list of paths; always return a list of environments.
        envs = []
        for p in (paths if isinstance(paths, list) else [paths]):
            env = lmdb.open(p, readonly=True, lock=False)
            opened.append(env)
            envs.append(env)
        return envs

    try:
        # Source / target mapping LMDBs (a single path is treated as a one-item list)
        src_envs = _open_readonly(map_src)
        dst_envs = _open_readonly(map_dst)

        # Output LMDB
        env_out = lmdb.open(out_lmdb, map_size=map_size)
        opened.append(env_out)

        # Dataset: reopen with decimal columns declared as strings
        base = ds.dataset(edge_folder, format="parquet", partitioning="hive")
        patched_schema = patch_schema(base.schema)
        dataset = ds.dataset(edge_folder, format="parquet", partitioning="hive", schema=patched_schema)

        scanner = dataset.scanner(columns=["src", "dst"])

        print(f"\n=== Start indexing {edge_type} ===")
        edge_id = 0

        with env_out.begin(write=True) as txn:
            for batch in scanner.to_batches():
                d = batch.to_pydict()

                for s, t in zip(d["src"], d["dst"]):

                    sid = multi_lmdb_lookup(src_envs, s)
                    tid = multi_lmdb_lookup(dst_envs, t)

                    if sid is None or tid is None:
                        if DEBUG:
                            print(f"[MISS] src={s} sid={sid} | dst={t} tid={tid}")
                        continue

                    txn.put(str(edge_id).encode(), f"{sid},{tid}".encode())
                    edge_id += 1

                # One progress line per batch, rewritten in place via "\r".
                print(f"{edge_type}: {edge_id:,} edges...", end="\r")
    finally:
        for env in reversed(opened):
            env.close()

    print(f"\n✓ Done {edge_type}: {edge_id:,} edges.")
    print(f"Saved → {out_lmdb}")


# =======================================================
# CORRECT CONFIG (src/dst may be a list or a string)
# =======================================================
# Per edge type: the node-mapping LMDB file name(s) used to resolve the "src"
# and "dst" columns. Each value may be one file name or a list of file names.
edges_config = {
    # "edge_rek_credit": {
    #     "src": "transaksi.lmdb",
    #     "dst": ["simpanan.lmdb", "pinjaman.lmdb"]
    # },
    # "edge_rek_debit": {
    #     "src": ["simpanan.lmdb", "pinjaman.lmdb"],
    #     "dst": "transaksi.lmdb"
    # }
    "edge_pinj_credit": {
        "src": "transaksi.lmdb",
        "dst": "pinjaman.lmdb"
    },
    "edge_pinj_debit": {
        "src": "pinjaman.lmdb",
        "dst": "transaksi.lmdb"
    },
    "edge_simp_credit": {
        "src": "transaksi.lmdb",
        "dst": "simpanan.lmdb"
    },
    "edge_simp_debit": {
        "src": "simpanan.lmdb",
        "dst": "transaksi.lmdb"
    }
}


# Base directories: edge parquet datasets, node-mapping LMDBs, and output edge LMDBs.
root_edges = "/Users/ymnzaman/Documents/Project/Graph/data"
root_maps  = "/Users/ymnzaman/Documents/Project/Graph/lmdb_node_mapping"
out_root   = "/Users/ymnzaman/Documents/Project/Graph/lmdb_edge_indexing"
# Create the output directory if it does not exist yet.
os.makedirs(out_root, exist_ok=True)


# =======================================================
# MAIN LOOP (FULLY FIXED)
# =======================================================
# Build one output LMDB per configured edge type.
for edge_type, cfg in edges_config.items():

    # The edge dataset lives in a folder named after the edge type.
    folder = os.path.join(root_edges, edge_type)

    # src may be a list of mapping files
    if isinstance(cfg["src"], list):
        src_map = [os.path.join(root_maps, x) for x in cfg["src"]]
    else:
        src_map = os.path.join(root_maps, cfg["src"])

    # dst may be a list of mapping files
    if isinstance(cfg["dst"], list):
        dst_map = [os.path.join(root_maps, x) for x in cfg["dst"]]
    else:
        dst_map = os.path.join(root_maps, cfg["dst"])

    out_lmdb_path = os.path.join(out_root, f"{edge_type}.lmdb")

    # MAP_SIZE must contain an entry for every key of edges_config,
    # otherwise the lookup below raises KeyError.
    index_edges_dataset(
        edge_type=edge_type,
        edge_folder=folder,
        map_src=src_map,
        map_dst=dst_map,
        out_lmdb=out_lmdb_path,
        map_size=MAP_SIZE[edge_type]
    )





In [None]:
import os
import lmdb
import pyarrow.dataset as ds
import pyarrow as pa

# ---------- config ----------
# Raw strings keep backslashes literally, so each separator is written once.
# Writing them doubled inside an r"..." literal embeds two backslashes per
# separator in the resulting path.
ROOT_EDGES = r"D:\zman\graph\data"
ROOT_MAPS  = r"D:\zman\graph\notebook\lmdb_node_mapping"
OUT_ROOT   = r"D:\zman\graph\notebook\lmdb_edge_indexing"
# Create the output directory for the edge LMDBs if it does not exist yet.
os.makedirs(OUT_ROOT, exist_ok=True)

# Default LMDB map_size for the output environments: 2 GiB, expressed in bytes.
MAP_SIZE = 1024 * 1024 * 1024 * 2
DEBUG = True   # set False to silence per-row MISS prints
# Maximum number of example misses index_edge keeps for its summary.
SAMPLE_MISS_LIMIT = 10

# ---------- helpers ----------
def patch_schema(schema):
    """Return a copy of ``schema`` in which every decimal field is typed as string.

    Non-decimal fields are reused unchanged; decimal fields are replaced by a new
    string field carrying the same name.
    """
    return pa.schema([
        pa.field(f.name, pa.string()) if pa.types.is_decimal(f.type) else f
        for f in schema
    ])

def normalize_key(v):
    """Turn a raw identifier into its LMDB key text.

    ``None`` is passed through. Any other value is converted to ``str``,
    stripped of surrounding whitespace, and has one trailing ``".0"`` removed.
    """
    if v is None:
        return None
    text = str(v).strip()
    # don't aggressively strip leading zeros here unless you know LMDB keys don't have them
    return text[:-2] if text.endswith(".0") else text

def lmdb_open(path):
    """Open the LMDB environment at ``path`` read-only, with locking disabled."""
    env = lmdb.open(path, readonly=True, lock=False)
    return env

def lmdb_get_single(env, key):
    """Fetch the value stored for ``key`` in one LMDB environment.

    The key is passed through ``normalize_key`` and encoded before the lookup.
    Returns the stored value decoded to ``str``, or ``None`` when ``key`` is
    ``None`` or is not present.
    """
    if key is None:
        return None

    encoded = normalize_key(key).encode()
    with env.begin() as txn:
        raw = txn.get(encoded)
        if raw is None:
            return None
        return raw.decode()

def lmdb_get_multi(envs, key):
    """Fetch the value stored for ``key`` from the first environment that has it.

    The key is passed through ``normalize_key`` and encoded once, then every
    environment in ``envs`` is tried in order. Returns the stored value decoded
    to ``str``, or ``None`` when ``key`` is ``None`` or no environment holds it.
    """
    if key is None:
        return None

    encoded = normalize_key(key).encode()
    for candidate_env in envs:
        with candidate_env.begin() as txn:
            raw = txn.get(encoded)
            if raw is None:
                continue
            return raw.decode()
    return None

# ---------- indexer generic ----------
def index_edge(
    edge_type,
    edge_folder,
    src_map,    # either path or list of paths
    dst_map,    # either path or list of paths
    out_lmdb_path,
    map_size=MAP_SIZE,
    src_col_name="src",
    dst_col_name="dst"
):
    """Resolve edge rows to node ids, store them in an LMDB and report misses.

    The two key columns of the hive-partitioned parquet dataset in
    ``edge_folder`` are read batch by batch. Each key is normalized and looked up
    in the node-mapping LMDB(s). Rows where a lookup misses are counted (source
    only, destination only, both) and skipped; up to ``SAMPLE_MISS_LIMIT`` of them
    are kept as examples. Every resolved edge is written as
    ``str(edge_id) -> "sid,did"`` inside a single write transaction, and a
    summary is printed at the end.

    Args:
        edge_type: Label used in the printed messages.
        edge_folder: Path of the parquet dataset holding the edges.
        src_map: Path, or list of paths, of the LMDB(s) mapping source keys.
        dst_map: Path, or list of paths, of the LMDB(s) mapping destination keys.
        out_lmdb_path: Path of the output LMDB environment.
        map_size: LMDB map_size in bytes for the output environment.
        src_col_name: Name of the source key column.
        dst_col_name: Name of the destination key column.
    """
    print(f"\n=== INDEX {edge_type} ===")

    progress_every = 100000

    # Every environment opened here is recorded so it can be closed again, even
    # when indexing fails. Leaving them open leaks handles and can make a
    # re-run of this cell in the same kernel fail.
    opened_envs = []

    def _open_maps(paths):
        # Accept one path or a list of paths; always return a list of environments.
        envs = []
        for p in (paths if isinstance(paths, list) else [paths]):
            env = lmdb_open(p)
            opened_envs.append(env)
            envs.append(env)
        return envs

    # stats
    total = 0
    matched = 0
    miss_src = 0
    miss_dst = 0
    miss_both = 0
    miss_samples = []
    edge_id = 0
    next_report = progress_every

    try:
        # open maps (a single path is treated as a one-item list)
        src_envs = _open_maps(src_map)
        dst_envs = _open_maps(dst_map)

        # prepare dataset: reopen with decimal columns declared as strings
        base = ds.dataset(edge_folder, format="parquet", partitioning="hive")
        patched_schema = patch_schema(base.schema)
        dataset = ds.dataset(edge_folder, format="parquet", partitioning="hive", schema=patched_schema)
        scanner = dataset.scanner(columns=[src_col_name, dst_col_name])

        # open output LMDB writer
        env_out = lmdb.open(out_lmdb_path, map_size=map_size)
        opened_envs.append(env_out)

        with env_out.begin(write=True) as txn:
            for batch in scanner.to_batches():
                cols = batch.to_pydict()
                srcs = cols.get(src_col_name, [])
                dsts = cols.get(dst_col_name, [])

                for s_raw, d_raw in zip(srcs, dsts):
                    total += 1
                    s_key = normalize_key(s_raw)
                    d_key = normalize_key(d_raw)

                    sid = lmdb_get_multi(src_envs, s_key)
                    did = lmdb_get_multi(dst_envs, d_key)

                    # sid / did are strings read from LMDB; treat None as missing
                    if sid is None and did is None:
                        miss_both += 1
                        if len(miss_samples) < SAMPLE_MISS_LIMIT:
                            miss_samples.append(("both", s_raw, d_raw, s_key, d_key))
                        if DEBUG:
                            print(f"[MISS BOTH] src={s_raw} -> {s_key} | dst={d_raw} -> {d_key}")
                        continue
                    if sid is None:
                        miss_src += 1
                        if len(miss_samples) < SAMPLE_MISS_LIMIT:
                            miss_samples.append(("src", s_raw, d_raw, s_key, d_key))
                        if DEBUG:
                            print(f"[MISS SRC] src={s_raw} -> {s_key} | dst={d_raw} -> {d_key} (did ok)")
                        continue
                    if did is None:
                        miss_dst += 1
                        if len(miss_samples) < SAMPLE_MISS_LIMIT:
                            miss_samples.append(("dst", s_raw, d_raw, s_key, d_key))
                        if DEBUG:
                            print(f"[MISS DST] src={s_raw} -> {s_key} (sid ok) | dst={d_raw} -> {d_key}")
                        continue

                    # write edge as "sid,did" (we keep them as strings; you can convert to int if needed)
                    txn.put(str(edge_id).encode(), f"{sid},{did}".encode())
                    edge_id += 1
                    matched += 1

                # progress: checked once per batch. Testing `total % 100000 == 0`
                # here only fires when a batch boundary lands exactly on a multiple,
                # so report whenever the next threshold has been crossed instead.
                if total >= next_report:
                    print(f"{edge_type}: processed {total:,} rows, matched {matched:,}")
                    next_report = (total // progress_every + 1) * progress_every
    finally:
        for env in reversed(opened_envs):
            env.close()

    print("\n--- summary ---")
    print(f"total rows scanned   : {total:,}")
    print(f"matched edges written : {matched:,}")
    print(f"miss src only         : {miss_src:,}")
    print(f"miss dst only         : {miss_dst:,}")
    print(f"miss both             : {miss_both:,}")
    print(f"out LMDB path         : {out_lmdb_path}")
    if miss_samples:
        print("\nEXAMPLE MISS SAMPLES (type, raw_src, raw_dst, norm_src, norm_dst):")
        for ex in miss_samples:
            print(" ", ex)
    print("=== done ===\n")


# ---------- run for credit and debit with correct src/dst ----------
# credit: src = transaksi, dst = simpanan|pinjaman
# Both calls rely on index_edge's default map_size (the module-level MAP_SIZE).
index_edge(
    edge_type="edge_rek_credit",
    edge_folder=os.path.join(ROOT_EDGES, "edge_rek_credit"),
    src_map=os.path.join(ROOT_MAPS, "transaksi.lmdb"),
    # destination keys are looked up in simpanan first, then pinjaman
    dst_map=[os.path.join(ROOT_MAPS, "simpanan.lmdb"), os.path.join(ROOT_MAPS, "pinjaman.lmdb")],
    out_lmdb_path=os.path.join(OUT_ROOT, "edge_rek_credit.lmdb"),
    src_col_name="src",
    dst_col_name="dst"
)

# debit: src = simpanan|pinjaman, dst = transaksi
index_edge(
    edge_type="edge_rek_debit",
    edge_folder=os.path.join(ROOT_EDGES, "edge_rek_debit"),
    # source keys are looked up in simpanan first, then pinjaman
    src_map=[os.path.join(ROOT_MAPS, "simpanan.lmdb"), os.path.join(ROOT_MAPS, "pinjaman.lmdb")],
    dst_map=os.path.join(ROOT_MAPS, "transaksi.lmdb"),
    out_lmdb_path=os.path.join(OUT_ROOT, "edge_rek_debit.lmdb"),
    src_col_name="src",
    dst_col_name="dst"
)





In [None]:
import lmdb

def validate_edge_lmdb(lmdb_path, sample_size=5):
    """Scan an edge LMDB and print a short validation report.

    Every entry is visited once. Values that are empty after stripping are
    counted as empty, values without a comma are counted as badly formatted
    (the expected form is "sid,did"), and the first ``sample_size`` well-formed
    entries are kept as examples.

    Args:
        lmdb_path: Path of the LMDB environment to inspect.
        sample_size: Maximum number of example key/value pairs to print.

    Returns:
        None. The report is printed to stdout.
    """
    print("="*50)
    print(f"LMDB Path             : {lmdb_path}")
    print("-"*50)

    total_keys = 0
    none_values = 0
    bad_format = 0
    samples = []

    env = lmdb.open(lmdb_path, readonly=True, lock=False)
    try:
        with env.begin() as txn:
            for k, v in txn.cursor():
                total_keys += 1
                val = v.decode()

                # decode() always returns a str, so only emptiness needs checking.
                if val.strip() == "":
                    none_values += 1
                    continue

                # The value must have the form "sid,did"
                if "," not in val:
                    bad_format += 1
                    continue

                if len(samples) < sample_size:
                    samples.append((k.decode(), val))
    finally:
        # Close the environment so validating many paths in a row (or re-running
        # the cell) does not accumulate open handles.
        env.close()

    print(f"Total Unique Keys     : {total_keys:,}")
    print(f"Nilai 'None' / Empty  : {none_values:,}")
    print(f"Bad Format (no comma) : {bad_format:,}")
    print("-"*50)
    print("Contoh Key-Value:")
    for item in samples:
        print("  ", item)

    print("="*50)


# File names of the edge LMDBs to validate, relative to the output directory
# used in the loop below.
paths = [
    "edge_rek_credit.lmdb",
    "edge_rek_debit.lmdb",
    # "edge_nasabah_is_pekerja.lmdb",
    # "edge_nasabah_memiliki_simp.lmdb",
    # "edge_nasabah_memiliki_pinj.lmdb"
    'edge_pinj_credit.lmdb',
    'edge_pinj_debit.lmdb',
    'edge_simp_credit.lmdb',
    'edge_simp_debit.lmdb',
]

# Print one validation report per LMDB.
for p in paths:
    validate_edge_lmdb(f"/Users/ymnzaman/Documents/Project/Graph/lmdb_edge_indexing/{p}")





In [None]:
import pyarrow.parquet as pq
import pyarrow.dataset as ds

# Edge
# Raw strings are used for the Windows paths: "\z", "\g" and "\d" are not valid
# escape sequences, and non-raw literals containing them trigger a SyntaxWarning
# on recent Python versions. The resulting path text is unchanged.
# `edge`, `edge_` and `trx` are reused by the preview cells further down.
edge = ds.dataset(r"D:\zman\graph\data/edge_rek_credit")
print("edge_rek_credit")
print(edge.schema, end="\n\n")

edge_ = ds.dataset(r"D:\zman\graph\data/edge_rek_debit")
print("edge_rek_debit")
print(edge_.schema, end="\n\n")

# Transaction nodes
trx = ds.dataset(r"D:\zman\graph\data/node_transaksi")
print("node_transaksi")
print(trx.schema)





In [None]:
# Preview the first 5 rows of the credit edge dataset.
# Requires `edge` from the schema-inspection cell to be defined in the kernel.
scanner = edge.scanner()
batch = scanner.head(5)
batch.to_pandas()

In [None]:
# Preview the first 5 rows of the debit edge dataset.
# Requires `edge_` from the schema-inspection cell to be defined in the kernel.
scanner_ = edge_.scanner()
batch_ = scanner_.head(5)
batch_.to_pandas()

In [None]:
# Preview the first 5 rows of the transaction node dataset.
# Requires `trx` from the schema-inspection cell to be defined in the kernel.
scanner_trx = trx.scanner()
batch_trx = scanner_trx.head(5)
batch_trx.to_pandas()