In [28]:
"""
index_finder.py – Python 3.8‑friendly, type‑hint‑free, fast index library for both strings and numbers, with optional **Numba** support

Public API
----------
    build_index(a, engine='auto') -> Index
    find_indices(a, b, engine='auto') -> list[int] | numpy.ndarray[int]

Notes
-----
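* `engine='auto'` : picks a backend from the input size and whether Numba is installed
* The Numba backend handles **numeric (int/float)** as well as **string (str)** input (Numba 0.56+)
  * numeric → `typed.Dict[int64,int64]` (values are cast to int64, so floats lose their fractional part)
  * string → `typed.Dict[unicode,int64]`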
"""

import numpy as np

# ---------------------------------------------------------------------------
#  Optional Numba backend ----------------------------------------------------
# ---------------------------------------------------------------------------
try:
    import numba  # type: ignore
    from numba import njit, types  # type: ignore
    from numba.typed import Dict as NumbaDict  # type: ignore

    _HAS_NUMBA = True
except ModuleNotFoundError:  # pragma: no cover
    _HAS_NUMBA = False

__all__ = ["build_index", "find_indices", "Index"]

# ---------------------------------------------------------------------------
#  Core container
# ---------------------------------------------------------------------------

class Index:
    """Lookup object that maps keys to the position recorded at build time.

    Call an instance (or use ``lookup``) with a sequence of keys to get one
    position per key; keys that are not in the mapping yield ``-1``.

    Parameters
    ----------
    mapping : dict-like
        Key -> position mapping. Must provide ``get`` and ``__len__``.
    engine : str
        'python' | 'numpy' | 'numba_num' | 'numba_str'. Selects the lookup
        path used by ``lookup``.
    """

    __slots__ = ("_map", "_engine")

    def __init__(self, mapping, engine):
        self._map = mapping
        self._engine = engine  # 'python' | 'numpy' | 'numba_num' | 'numba_str'

    # ---------------- public API ---------------- #
    def lookup(self, b):
        """Return the stored position of every key in *b* (``-1`` if absent).

        Returns
        -------
        numpy.ndarray of int64 for the Numba engines, otherwise a list of int.

        Notes
        -----
        * 'numba_num': queries are converted with ``dtype=np.int64``.
        * 'numba_str': queries are converted with ``str()``, mirroring how the
          keys are stored when the index is built.
        """
        if self._engine == "numba_num":
            return _lookup_num(d=self._map, b_np=np.asarray(b, dtype=np.int64))
        if self._engine == "numba_str":
            keys = [str(x) for x in b]
            if not keys:
                # Numba cannot infer the element type of an empty Python list,
                # so answer the trivial case without entering compiled code.
                return np.empty(0, dtype=np.int64)
            return _lookup_str(d=self._map, b_list=keys)
        return [self._map.get(x, -1) for x in b]

    __call__ = lookup  # alias

    def __getitem__(self, item):
        """Return the stored position of a single key, or ``-1`` if absent."""
        # String indexes store ``str`` keys, so coerce the query the same way.
        key = str(item) if self._engine == "numba_str" else item
        return self._map.get(key, -1)

    def __repr__(self):  # pragma: no cover
        return f"<Index engine={self._engine} size={len(self._map)}>"

# ---------------------------------------------------------------------------
#  Builder -------------------------------------------------------------------
# ---------------------------------------------------------------------------

def build_index(a, engine="auto"):
    """Pre-compute an index structure for *a* (first occurrence per key).

    Parameters
    ----------
    a : sequence-like (list/ndarray)
        Values to index. With ``engine="auto"`` it must support ``len``.
    engine : {"auto", "python", "numpy", "numba"}
        Backend used to build the mapping. "auto" delegates the choice to
        ``_pick_engine``.

    Returns
    -------
    Index
        Lookup object mapping each distinct value of *a* to the position of
        its first occurrence.

    Raises
    ------
    ValueError
        If *engine* is not one of the supported names.
    RuntimeError
        If ``engine="numba"`` is requested but Numba is not installed.

    Notes
    -----
    With the Numba backend, numeric input (dtype kind 'i', 'u' or 'f') is cast
    to int64 before indexing; any other input is indexed by ``str(x)``.
    """
    if engine == "auto":
        engine = _pick_engine(a)

    # Reject unknown engines before doing any conversion work on *a*.
    if engine not in ("python", "numpy", "numba"):
        raise ValueError(
            f"Unknown engine {engine!r} – choose 'auto', 'python', 'numpy', or 'numba'.")

    # ---------- pure-python ---------- #
    if engine == "python":
        # setdefault keeps the FIRST position of a repeated key; a plain dict
        # comprehension would silently keep the last one instead.
        mapping = {}
        for pos, key in enumerate(a):
            mapping.setdefault(key, pos)
        return Index(mapping, "python")

    # ---------- numpy backend ---------- #
    arr = np.asarray(a)
    if engine == "numpy":
        uniq, first_pos = _first_pos_numpy(arr)
        return Index(dict(zip(uniq.tolist(), first_pos.tolist())), "numpy")

    # ---------- numba backend ---------- #
    if not _HAS_NUMBA:
        raise RuntimeError("Numba not installed; choose another engine.")

    if arr.dtype.kind in {"i", "u", "f"}:  # numeric
        mapping = _build_num(arr.astype(np.int64))
        return Index(mapping, "numba_num")

    # treat as string (unicode/object)
    py_list = [str(x) for x in a]  # ensure Python str
    mapping = _build_str(py_list)
    return Index(mapping, "numba_str")


def find_indices(a, b, engine="auto"):
    """One-shot helper: index *a* with the chosen engine, then look up *b*.

    Returns whatever ``Index.lookup`` returns for *b*.
    """
    index = build_index(a, engine)
    return index.lookup(b)

# ---------------------------------------------------------------------------
#  Numpy helper --------------------------------------------------------------
# ---------------------------------------------------------------------------

def _first_pos_numpy(arr):
    order = arr.argsort(kind="mergesort")  # stable sort to keep first position
    sorted_arr = arr[order]
    mask = np.empty(len(arr), dtype=bool)
    mask[0] = True
    mask[1:] = sorted_arr[1:] != sorted_arr[:-1]
    return sorted_arr[mask], order[mask]

# ---------------------------------------------------------------------------
#  Numba helpers -------------------------------------------------------------
# ---------------------------------------------------------------------------
# Compiled build/lookup kernels when Numba imported successfully; otherwise
# same-named stubs that raise, so the module still imports without Numba.
if _HAS_NUMBA:

    # ---- numeric ---- #
    @njit(cache=True)
    def _build_num(a_np):
        """Map each distinct value of *a_np* to the index of its first occurrence.

        Returns a Numba typed dict with int64 keys and int64 values.
        """
        d = NumbaDict.empty(key_type=types.int64, value_type=types.int64)
        for i in range(a_np.size):
            v = int(a_np[i])
            # Only the first sighting of a value is recorded.
            if v not in d:
                d[v] = i
        return d

    @njit(cache=True)
    def _lookup_num(d, b_np):
        """Return an int64 array with ``d``'s value for each element of *b_np* (-1 if absent)."""
        out = np.empty(b_np.size, dtype=np.int64)
        for i in range(b_np.size):
            k = int(b_np[i])
            out[i] = d.get(k, -1)
        return out

    # ---- string ---- #
    @njit(cache=True)
    def _build_str(a_list):
        """Map each distinct string of *a_list* to the index of its first occurrence.

        Returns a Numba typed dict with unicode keys and int64 values.
        """
        d = NumbaDict.empty(key_type=types.unicode_type, value_type=types.int64)
        for i in range(len(a_list)):
            v = a_list[i]
            # Only the first sighting of a string is recorded.
            if v not in d:
                d[v] = i
        return d

    @njit(cache=True)
    def _lookup_str(d, b_list):
        """Return an int64 array with ``d``'s value for each element of *b_list* (-1 if absent)."""
        out = np.empty(len(b_list), dtype=np.int64)
        for i in range(len(b_list)):
            k = b_list[i]
            out[i] = d.get(k, -1)
        return out
else:  # No numba

    def _build_num(*_a, **_k):
        """Stub used when Numba is missing: always raises RuntimeError."""
        raise RuntimeError("Numba not available – cannot use engine='numba'.")

    # All four helper names resolve to the same raising stub.
    _lookup_num = _build_str = _lookup_str = _build_num  # alias to raise

# ---------------------------------------------------------------------------
#  Engine heuristic ----------------------------------------------------------
# ---------------------------------------------------------------------------

def _pick_engine(a):
    n = len(a)
    if n < 1_000_000:
        return "python"
    if _HAS_NUMBA:
        # try numba even for strings (0.56+)
        return "numba"
    return "numpy"


In [29]:
import time

import numpy as np

a = np.random.randint(0, 1000, size=10_000_000, dtype="int64")
b = [123, 456, 789]

# The first call also pays for Numba compilation of the builder.
idx = build_index(a, engine="numba")

# Time the lookup only: the clock is stopped before anything is printed.
# NOTE: the first lookup may still include JIT compilation (or a cache load)
# of the lookup kernel, so repeat the call for a steady-state number.
t0 = time.perf_counter()
positions = idx.lookup(b)
elapsed = time.perf_counter() - t0

print(positions)  # int64 array: first index of each query in `a`, -1 if absent
print("lookup time:", elapsed, "sec")

[2873   59  348]
lookup time: 0.0457160420000946 sec


In [30]:
# Sanity check: the values stored at the returned positions should equal the
# queried values in `b`. Index with `positions` rather than hard-coded numbers:
# `a` is random, so the positions differ on every run.
tuple(a[positions].tolist())

(123, 456, 789)

In [37]:
import sys
sys.path.append('../')
from asopipe.utils.basic import loadBlatOutput
from jklib.genome import locus
from itertools import chain

# NOTE(review): absolute, machine-specific path – adjust before running elsewhere.
refFlat_path = "/Users/dowonkim/Dropbox/data/UCSC/hg38/refFlat/refFlat_200817.txt"
blat = loadBlatOutput(refFlat_path, by="transID")

# Transcript to tile; swap in the commented line to switch genes.
transid = "NM_133379"  # TTN
# transid = "NM_002415"  # MIF


def _get_transInfo(transid):
    return [t for t in blat[transid]
            if len(t['chrom'].split('_')) == 1][0]

def get_data(transid):
    """Return the transcript record that ``_get_transInfo`` selects for *transid*.

    The record is returned unchanged; callers read fields such as 'strand',
    'chrom', 'txnSta' and 'txnEnd' from it themselves.
    """
    return _get_transInfo(transid=transid)

def _tile_region(transid, tile_length=17):
    
    transInfo = get_data(transid=transid)
    anti = '+' if transInfo['strand'] == '-' else '-'
    chrom = transInfo['chrom']
    start = transInfo['txnSta']
    end   = transInfo['txnEnd']
    exonL = transInfo['exnList']
    L     = tile_length
    print(start, end)
    print(exonL)
    concatL = [range(s+1,e-L+1) for s,e in exonL if s+1 <= e-L+1]
    iterrange = chain(*concatL)
    mature_loc_list = []
    for i in iterrange:
        mature_loc_list.append(f"{chrom}:{i}-{i+L-1}{anti}")
    
    return [f"{chrom}:{i}-{i+L-1}{anti}"   # inclusive end = i+L-1
        for i in range(start, end-L+1)], mature_loc_list
    
# Tile the selected transcript, then preview the first three loci of each list.
tile_loc_list, mature_loc_list = _tile_region(transid, tile_length=17)
(tile_loc_list[:3], mature_loc_list[:3])

178744408 178807423
[(178744408, 178752039), (178753123, 178753180), (178758983, 178759172), (178764176, 178764302), (178764526, 178764811), (178766380, 178766612), (178767758, 178767924), (178768013, 178768155), (178768672, 178768933), (178769678, 178769939), (178770059, 178770320), (178770411, 178770675), (178771210, 178771471), (178773108, 178773369), (178773461, 178773725), (178773837, 178774110), (178774206, 178774473), (178774920, 178775202), (178775355, 178777049), (178777148, 178777317), (178777419, 178777584), (178777703, 178777975), (178778873, 178779118), (178779228, 178779462), (178779999, 178780205), (178781120, 178781263), (178782211, 178782427), (178782538, 178782602), (178782805, 178783064), (178783719, 178783785), (178784069, 178784351), (178785619, 178785742), (178785847, 178786141), (178789359, 178789497), (178789977, 178790115), (178790707, 178790845), (178792071, 178792197), (178793403, 178793541), (178794398, 178794551), (178794921, 178795252), (178799486, 1787997

(['chr2:178744408-178744424+',
  'chr2:178744409-178744425+',
  'chr2:178744410-178744426+'],
 ['chr2:178744409-178744425+',
  'chr2:178744410-178744426+',
  'chr2:178744411-178744427+'])

In [38]:
import time

import numpy as np

# The first call also pays for Numba compilation of the string builder.
idx = build_index(tile_loc_list, engine="numba")

# Time the lookup only: the clock is stopped before anything is printed.
# NOTE: the first lookup may still include JIT compilation (or a cache load)
# of the lookup kernel, so repeat the call for a steady-state number.
t0 = time.perf_counter()
positions = idx.lookup(mature_loc_list)
elapsed = time.perf_counter() - t0

print(positions)  # int64 array: index in tile_loc_list per exon tile, -1 if absent
print("lookup time:", elapsed, "sec")

[    1     2     3 ... 62996 62997 62998]
lookup time: 0.009857957999884093 sec


In [39]:
# Number of lookup results; `lookup` yields one entry per queried key.
len(positions)

17434

In [41]:
# Array views of the tile strings and of the looked-up positions.
a_arr = np.asarray(tile_loc_list, dtype=object)
idx_arr = np.asarray(positions, dtype=int)

# Drop misses (encoded as -1) and fetch the tile string at every hit.
hit_positions = idx_arr[idx_arr >= 0]
mapped = a_arr[hit_positions].tolist()
len(mapped)

17434