In [1]:
# tracking_id:int, xyxy: [], 'ocr':object,'ocr_conf':float , row: int, status: str, 
from enum import Enum
class ItemStatus(Enum):
    NEW = "new"    
    NORMAL = "normal"
    MISPLACED = "misplaced"
    PENDING = "pending"


청구기호 파싱

In [2]:
from decimal import Decimal, getcontext
import unicodedata, re, time
from bisect import bisect_left

getcontext().prec = 50  # DDC 소수부 비교 정밀도
JAMO_ORDER = "ㄱㄲㄴㄷㄸㄹㅁㅂㅃㅅㅆㅇㅈㅉㅊㅋㅌㅍㅎ"
JAMO_IDX = {ch: i for i, ch in enumerate(JAMO_ORDER)}

def _to_initial_jamo(ch):
    code = ord(ch)
    if 0xAC00 <= code <= 0xD7A3:  # 완성형
        return (code - 0xAC00) // 588
    return JAMO_IDX.get(ch, -1)

def _letter_code(ch):
    # 영문(A..Z) < 한글(초성순) < 기타
    if 'A' <= ch <= 'Z':
        return (0, ord(ch))
    j = _to_initial_jamo(ch)
    if j != -1:
        return (1, j)
    return (2, ord(ch))

def _normalize(s):
    s = unicodedata.normalize('NFKC', (s or "").upper())
    s = re.sub(r'\s+', ' ', s).strip()
    s = s.replace('..', '.')
    s = re.sub(r'(?<=\d)O(?=\d)', '0', s)     # O↔0
    s = re.sub(r'(?<=\d)[IL](?=\d)', '1', s)  # I,l↔1
    s = s.replace(',', '.')                   # 004,3 -> 004.3
    return s

_TOK = re.compile(r'([A-Z]+|[ㄱ-ㅎ가-힣]+|\d+|[.])')
def _tokens(s):
    return [t for t in _TOK.findall(s or "") if t]

YEAR = re.compile(r'(?<!\d)((?:19|20)\d{2})(?!\d)')
VOL  = re.compile(r'(?:V|VOL\.?|권)\s*\.?\s*(\d+)', re.I)
PART = re.compile(r'(?:PT|NO)\s*\.?\s*(\d+)', re.I)
COPY = re.compile(r'(?:C|COPY|C\.)\s*\.?\s*(\d+)', re.I)

class DdcKey:
    # 청구기호 클래스
    def __init__(self, cls, dec, cutters, year, vol, part, copy_):
        self.cls = cls
        self.dec = dec
        self.cutters = cutters    # 리스트/튜플 [(grp, code, num), ...]
        self.year = year
        self.vol = vol
        self.part = part
        self.copy = copy_

    def tup(self):
        # 파이썬 튜플 사전식 비교 활용
        return (self.cls, self.dec, tuple(self.cutters), self.year, self.vol, self.part, self.copy)

def parse_ddc_key(raw):
    """DDC 문자열을 DdcKey로 파싱. 실패 시 None."""
    s = _normalize(raw)
    m = re.match(r'^(\d{3})(?:\.(\d+))?', s)  # 004 / 004.3 / 005.133
    if not m:
        return None
    cls = int(m.group(1))
    dec = Decimal('0.' + m.group(2)) if m.group(2) else Decimal(0)
    rest = s[m.end():].strip()

    year = next((int(y) for y in YEAR.findall(rest)), -1)
    vol  = next((int(v) for v in VOL.findall(rest)), -1)
    part = next((int(p) for p in PART.findall(rest)), -1)
    copy_ = next((int(c) for c in COPY.findall(rest)), -1)

    toks = _tokens(rest)
    cutters = []
    i = 0
    while i < len(toks):
        t = toks[i]
        if t == '.':
            i += 1
            continue
        if re.fullmatch(r'[A-Z]+|[ㄱ-ㅎ가-힣]+', t):
            ch = t[0]
            grp, code = _letter_code(ch)
            num = -1
            if i + 1 < len(toks) and toks[i + 1].isdigit():
                num = int(toks[i + 1]); i += 1
            cutters.append((grp, code, num))
        i += 1

    return DdcKey(cls, dec, cutters, year, vol, part, copy_)

LIS (최장 증가 부분수열) 인덱스 복원

In [3]:
def _lis_indices(keys):
    """O(n log n) LIS: 증가 부분수열의 '원본 인덱스 집합'을 반환."""
    tails = []               # 길이 k 수열의 '최소 꼬리' 원소 인덱스
    prev = [-1] * len(keys)  # 재구성 포인터

    def less(i, j):
        return keys[i].tup() < keys[j].tup()

    for i in range(len(keys)):
        lo, hi = 0, len(tails)
        while lo < hi:
            mid = (lo + hi) // 2
            if less(tails[mid], i):
                lo = mid + 1
            else:
                hi = mid
        if lo == len(tails):
            tails.append(i)
        else:
            tails[lo] = i
        if lo > 0:
            prev[i] = tails[lo - 1]

    res = []
    cur = tails[-1] if tails else -1
    while cur != -1:
        res.append(cur)
        cur = prev[cur]
    return set(reversed(res))

def _to_float_or_none(x):
    try:
        return float(x) if x is not None else None
    except Exception:
        return None

행(row) 단위 오배치 탐지

In [4]:
def detect_row(records, conf_threshold_hold=0.70, none_conf_as_hold=False):
    """
    같은 행만 담긴 records를 받아 오배치 탐지.
    반환:
      {
        "ordered": [tracker_id...],     # 좌->우 정렬된 ID
        "lis_indices": set([...]),      # 정상(LIS) 인덱스(좌->우 기준)
        "misplaced": [ {i, tracker_id, ocr, reason, move{to_index,delta,hint}} ... ],
        "hold":      [ {i, tracker_id, reason} ... ]
      }
    """
    # 1) 좌->우 정렬(x1 기준)
    seq = sorted(records, key=lambda r: float(r['xyxy'][0]))
    ordered_ids = [r['tracker_id'] for r in seq]

    # 2) 파싱 + 신뢰도 필터
    keys = []
    hold = []
    valid_idx = []
    for i, r in enumerate(seq):
        conf = _to_float_or_none(r.get('ocr_conf', None))
        key = parse_ddc_key(r.get('ocr', '') or '')
        keys.append(key)
        if key is None:
            hold.append({"i": i, "tracker_id": r['tracker_id'], "reason": "DDC 파싱 실패"})
        elif (conf is None) and none_conf_as_hold:
            hold.append({"i": i, "tracker_id": r['tracker_id'], "reason": "OCR 신뢰도 없음(None)"})
        elif (conf is not None) and (conf < conf_threshold_hold):
            hold.append({"i": i, "tracker_id": r['tracker_id'], "reason": "OCR 신뢰도 낮음(%.2f)" % conf})
        else:
            valid_idx.append(i)

    # 3) 유효 항목으로 LIS → 오배치 확정
    lis_set = set()
    misplaced = []
    if valid_idx:
        sub = [keys[i] for i in valid_idx]  # 유효 DdcKey만
        lis_sub = _lis_indices(sub)         # sub 내부 인덱스 집합
        lis_set = {valid_idx[i] for i in lis_sub}  # 원래 좌->우 인덱스로 역매핑

        for i in valid_idx:
            if i not in lis_set:
                misplaced.append({
                    "i": i,
                    "tracker_id": seq[i]['tracker_id'],
                    "ocr": seq[i].get('ocr', '') or '',
                    "reason": "행 전체 기준 정렬 역행",
                    "move": _suggest_move(seq, keys, i)
                })

    return {
        "ordered": ordered_ids,
        "lis_indices": lis_set,
        "misplaced": misplaced,
        "hold": hold
    }

def _suggest_move(seq, keys, i):
    """
    같은 행의 '이상적 정렬'에서 대상 키의 삽입 위치를 추정해 이동 힌트를 제공.
    """
    pairs = [(idx, keys[idx].tup()) for idx in range(len(keys)) if keys[idx] is not None]
    pairs.sort(key=lambda t: t[1])
    if not pairs or keys[i] is None:
        return {"to_index": None, "delta": None, "hint": "제안 불가(파싱 실패)"}

    sorted_keys = [kv for _, kv in pairs]
    pos = bisect_left(sorted_keys, keys[i].tup())

    if pos <= 0:
        to_index = pairs[0][0]
    elif pos >= len(pairs):
        to_index = pairs[-1][0]
    else:
        left_i = pairs[pos - 1][0]
        right_i = pairs[pos][0]
        # 기본 전략: 오른쪽으로 붙임
        to_index = right_i

    delta = to_index - i
    hint = "%d권 %s" % (abs(delta), "오른쪽" if delta > 0 else ("왼쪽" if delta < 0 else "이동 불필요"))
    return {"to_index": int(to_index), "delta": int(delta), "hint": hint}

상태/변경 관리 클래스

In [None]:
class Item:
    def __init__(self, tracker_id, xyxy, ocr, ocr_conf, row):
        self.tracker_id = tracker_id    
        self.xyxy = xyxy
        self.ocr = ocr # 없을경우는 기본적 입력할때 None임 
        self.ocr_conf = ocr_conf  # None 또는 float
        self.row = int(row)
        self.status = ItemStatus.NEW if ocr_conf > 0.7 else ItemStatus.PENDING  #ocr 신뢰도가 0.7미만인 경우 보류로 설정 
        self.parsed =parse_ddc_key(ocr) if ocr is not None else None # 없을경우는 None으로 
        self.updated_at = time.time()

    
    def x1(self):
        return float(self.xyxy[0])
    
    def id(self):
        return self.tracker_id
    
class ManageItem:
    def __init__(self): 
        self.items ={} # Key: Item.row value: [Item class]  
    
     def insert_update(self, item:Item):
        """tracker_id 기준으로 우선 탐색해야 함.
            먼저 모든 row에 있는 아이템들 중 tracker_id 같은 게 있는지 확인
            있으면: 해당 아이템을 꺼내서 row가 바뀌었으면 새 row로 이동 및 OCR 신뢰도 비교후 높은 신뢰도로 ocr 교체 
            없으면: 그냥 새로 삽입"""
          # 1. tracker_id 탐색
        found_row, found_idx = None, None
        for row, items_in_row in self.items.items():
            for idx, it in enumerate(items_in_row):
                if it.tracker_id == item.tracker_id:
                    found_row, found_idx = row, idx
                    break
            if found_row is not None:
                break

        if found_row is not None:  
            # 2. 기존 item 존재
            old_item = self.items[found_row][found_idx]
            s = old_item.status  # 기존 status 유지

            # OCR 신뢰도 비교
            if old_item.ocr_conf is None and item.ocr_conf is None:
                # 둘 다 OCR 없음 → 좌표만 갱신
                old_item.xyxy = item.xyxy

            elif old_item.ocr_conf is None:
                # 기존 OCR 없음 → 새 item으로 교체
                self.items[found_row][found_idx] = item

            elif item.ocr_conf is None:
                # 새 item에 OCR 없음 → 좌표만 갱신
                old_item.xyxy = item.xyxy

            elif item.ocr_conf > old_item.ocr_conf:
                # 새 item이 OCR 더 정확 → 교체
                self.items[found_row][found_idx] = item
            else:
                # 기존 OCR 더 정확 → 좌표만 갱신
                old_item.xyxy = item.xyxy

            # status는 항상 유지
            self.items[found_row][found_idx].status = s

            # 3. row 변경되었으면 새 row로 이동
            if item.row != found_row:
                # 기존 row에서 제거
                moved_item = self.items[found_row].pop(found_idx)
                if not self.items[found_row]:
                    del self.items[found_row]  # 비면 삭제
                # 새 row에 삽입
                if item.row not in self.items:
                    self.items[item.row] = []
                self.items[item.row].append(moved_item)

        else:
            # 4. tracker_id 못 찾으면 새로 삽입
            if item.row not in self.items:
                self.items[item.row] = []
            self.items[item.row].append(item)    
    
    def input_predict(self, results): ## 예측 결과를 items 변수에 삽입
        for i in results:
            self.insert_update(Item(tracker_id=i.get('tracker_id'),
                xyxy=i.get('xyxy'),
                ocr=i.get('ocr'),
                ocr_conf=i.get('ocr_conf'),
                row=i.get('row')
            ))
    
        
    def _lis_indices(self, keys):
        """O(n log n) LIS: 증가 부분수열의 인덱스 집합을 반환."""
        tails = []
        prev = [-1] * len(keys)

        def less(i: int, j: int) -> bool:
            return keys[i].tup() < keys[j].tup()

        for i in range(len(keys)):
            lo, hi = 0, len(tails) # 초기화 low : 0 index high : 입력된 len(list) 
            while lo < hi:
                mid = (lo + hi) // 2
                if less(tails[mid], i):
                    lo = mid + 1
                else:
                    hi = mid
            if lo == len(tails):
                tails.append(i)
            else:
                tails[lo] = i
            if lo > 0:
                prev[i] = tails[lo - 1]

        res = []
        cur = tails[-1] if tails else -1
        while cur != -1:
            res.append(cur)
            cur = prev[cur]
        return set(reversed(res))         
                
    def detect_bookLabel(self):        
        for k, v in self.items.items():
            if not v: # 데이터가 없을 시 판정 X (혹시모를 예외처리)
                continue
            if len(v) < 3: # 2개이하일때는 판정을 안함(보류 판정)
                for j in self.items[k]:
                    j.status = ItemStatus.PENDING  # 보류 판정
                continue
            
            new_items = [it for it in v if it.status == ItemStatus.NEW or it.status == ItemStatus.MISPLACED]
            if not new_items: #없을경우 해당 행 종료 
                continue

            # DDC 키 기준 정렬 (x좌표 순서)
            new_items = sorted(new_items, key=lambda x: float(x.xyxy[0]))

            # parsed (DdcKey) 추출
            parsed_keys = [it.parsed for it in new_items if it.parsed is not None]

            # LIS 인덱스 구하기
            lis_idx = self._lis_indices(parsed_keys)

            # LIS 안에 포함 → 정상, LIS 밖 → 잘못 배치
            for idx, it in enumerate(new_items):
                if it.parsed is None:
                    it.status = ItemStatus.PENDING
                elif idx in lis_idx:
                    it.status = ItemStatus.NORMAL
                else:
                    it.status = ItemStatus.MISPLACED        
    
    def get_misplaced(self):
        wrong = []  
        for i in self.items.values():
            for v in i:
                if v.status == ItemStatus.MISPLACED:
                    wrong.append(v)       
        return wrong
    
    def get_pending(self) :
        pending = [] 
         
        for i in self.items.values():
            for v in i:
                if v.status == ItemStatus.PENDING:
                    pending.append(v)       
        return pending   
    
    def get_normal(self):
        normal = []
        for i in self.items.values():
            for v in i:
                if v.status == ItemStatus.NORMAL:
                    normal.append(v)       
        return normal         
            
    
    def start(self,results):
        self.input_predict(results)
        self.detect_bookLabel()
        n = self.get_normal()
        m = self.get_misplaced()
        p = self.get_pending()
        return {"normal": n, "misplaced":m,"pending":p }
            
            
                    
                  
                
        
            
               
    
        
             
    
         


In [6]:
# if __name__ == "__main__":
#     # 상태 매니저 생성(DDC 파서 주입)
#     state = RowState(parser=parse_ddc_key)

#     # 스트림 입력(동일 row=0)
#     state.upsert({'tracker_id': 1, 'xyxy':[215.86,448.50,279.39,489.61], 'ocr':'004',       'ocr_conf': None, 'row':0})
#     state.upsert({'tracker_id': 2, 'xyxy':[300.00,448.50,360.00,489.61], 'ocr':'004.3 P1', 'ocr_conf': 0.95, 'row':0})
#     state.upsert({'tracker_id': 3, 'xyxy':[380.00,448.50,440.00,489.61], 'ocr':'003.9',    'ocr_conf': 0.99, 'row':0})

#     # 같은 tracker_id=1, conf 생기고 더 자세해짐 → 교체
#     state.upsert({'tracker_id': 1, 'xyxy':[215.86,448.50,279.39,489.61], 'ocr':'004.301',  'ocr_conf': 0.91, 'row':0})

#     # 행 판정
#     row0_input = state.to_detect_row_input(0)
#     out = detect_row(row0_input, conf_threshold_hold=0.70, none_conf_as_hold=False)
#     state.apply_detect_result(0, out)

#     # 결과 출력(좌->우)
#     print("ordered:", out["ordered"])
#     print("lis_indices:", sorted(out["lis_indices"]))
#     print("misplaced:", [(m["i"], m["tracker_id"], m["ocr"], m["move"]["hint"]) for m in out["misplaced"]])
#     print("hold:", [(h["i"], h["tracker_id"], h["reason"]) for h in out["hold"]])

#     for it in state.list_row(0):
#         print("row0 item:", it.tracker_id, it.ocr, it.ocr_conf, it.status)

In [7]:
from tracking_method.processing import row_ocr_clustering
import supervision as sv
import cv2
from ultralytics import YOLO
# 1) 일괄(행 단위) 판정
# row = [
#     RowItem(id=1, x=10,  text="004.3 P12 2015", conf=0.98),
#     RowItem(id=2, x=50,  text="005.1 P9 2018",  conf=0.96),
#     RowItem(id=3, x=90,  text="003.9 A1 2012",  conf=0.99),  # 오배치가 되어야 정상
#     RowItem(id=4, x=130, text="005.2 A37 2019", conf=0.92),
# ]



img = r"E:\다운로드\KakaoTalk_20250814_141911336.jpg"
img_orignal = cv2.imread(img)
imgs = cv2.resize(img_orignal, (640, 640))


model = YOLO("weights.pt")

results = model(imgs, verbose=False)[0]
# 2) 탐지 → Detections 변환
detections = sv.Detections.from_ultralytics(results)
# (필요 시 감도 조정) conf 필터 예: detections = detections[detections.confidence > 0.25]

# 3) 트래킹 업데이트 (ByteTrack)
#   supervision 0.19+ 버전은 인자 없이도 동작.
#   특정 해상도/프레임 속도 기반 튜닝이 필요하면 ByteTrackArgs로 세부설정 가능.
tracker = sv.ByteTrack()  
tracked = tracker.update_with_detections(detections)
boxes_id = row_ocr_clustering(imgs,tracked,img_orignal)
m = ManageItem()



  from .autonotebook import tqdm as notebook_tqdm
Neither CUDA nor MPS are available - defaulting to CPU. Note: This module is much faster with a GPU.


FileNotFoundError: [Errno 2] No such file or directory: 'weights.pt'

In [None]:
detections

Detections(xyxy=array([[     215.87,       448.5,       279.4,      489.61],
       [     434.38,      446.14,      484.19,       484.6],
       [      518.1,      457.41,       576.1,      499.04],
       [     397.89,      447.42,      432.51,      486.54],
       [     285.84,      453.07,      314.02,      494.22],
       [     114.37,      448.79,       146.1,      489.25],
       [     152.33,      447.59,      185.73,      486.49],
       [     66.202,      448.53,      108.39,      487.95],
       [      362.3,      444.69,      395.02,      483.81],
       [     316.49,      453.85,      359.28,      495.86],
       [     189.83,      447.62,      212.62,      486.94],
       [     25.871,      448.39,      59.072,      487.65],
       [     610.77,      460.88,      639.84,      502.81],
       [     486.91,      452.26,      513.04,      490.93]], dtype=float32), mask=None, confidence=array([    0.92785,     0.92298,     0.91487,     0.90789,     0.90373,       0.899,      0

In [None]:
r_o_c = m.start(boxes_id)

In [None]:
r_o_c

{'normal': [<__main__.Item at 0x245314e2dc0>,
  <__main__.Item at 0x24531603370>,
  <__main__.Item at 0x2453154b430>],
 'misplaced': [<__main__.Item at 0x245314e27f0>,
  <__main__.Item at 0x245314e2670>,
  <__main__.Item at 0x245314e26a0>,
  <__main__.Item at 0x2453154b0d0>,
  <__main__.Item at 0x2453154bd30>,
  <__main__.Item at 0x2453154ba00>],
 'pending': [<__main__.Item at 0x245314e2940>,
  <__main__.Item at 0x245314e24f0>,
  <__main__.Item at 0x245314e2d00>,
  <__main__.Item at 0x245314e2040>,
  <__main__.Item at 0x245316034f0>]}

In [None]:
def draw_bounding_box(imgs,r_o_c):
    color = ()
    for k,v in r_o_c.items():
        if k == ItemStatus.NORMAL.value:
            color = (0,255,0)
        elif k == ItemStatus.PENDING.value:
            color = (0,255,255)
        elif k == ItemStatus.MISPLACED.value:
            color = (0,0,255)
        else:
            color = (255, 255, 255)    
        print(color)    
        for j in v:
            x1, y1, x2, y2 = map(int, j.xyxy)
            label = f"{j.tracker_id}"
            # 1. 내부 반투명 박스 (alpha blending 직접 적용)
            alpha = 0.25
            sub_img = imgs[y1:y2, x1:x2]                                # ROI
            overlay = sub_img.copy()   # 새로운 배열(메모리) 생성
            cv2.rectangle(overlay, (0,0), (x2-x1, y2-y1), color, -1)
            cv2.addWeighted(overlay, alpha, sub_img, 1 - alpha, 0, sub_img)

            # 2. 테두리 박스
            cv2.rectangle(imgs, (x1, y1), (x2, y2), color, 2, lineType=cv2.LINE_AA)

            # 3. 텍스트 배경 + 텍스트
            font = cv2.FONT_HERSHEY_SIMPLEX
            font_scale = 0.4
            (tw, th), baseline = cv2.getTextSize(label, font, font_scale, 1)

            cv2.rectangle(imgs,
                        (x1, y1 - th - baseline),
                        (x1 + tw, y1),
                        color,
                        -1,
                        lineType=cv2.LINE_AA)

            cv2.putText(imgs,
                        label,
                        (x1, y1 - baseline),
                        font,
                        font_scale,
                        (255, 255, 255),
                        1,
                        cv2.LINE_AA)


In [None]:
draw_bounding_box(imgs,r_o_c)

cv2.imshow("Result", imgs)     # 윈도우에 이미지 출력
cv2.waitKey(0)                 # 키 입력 대기 (0이면 무한 대기)
cv2.destroyAllWindows()  

(0, 255, 0)
(0, 0, 255)
(0, 255, 255)
