In [1]:
from astra_camera import Camera

In [2]:
cam = Camera()



In [3]:
# import torch, time, cv2, numpy as np

# try:
#     while True:
#         depth, rgb = cam.get_depth_and_color()   
#         frame_bgr = cv2.cvtColor(rgb, cv2.COLOR_RGB2BGR)
#         h, w = frame_bgr.shape[:2]
#         cv2.imshow('rgb', rgb)
#         if (cv2.waitKey(1) & 0xFF) == 27:
#             cv2.destroyAllWindows()
#             break

# except Exception as e:
#     print(e)
# finally:
#     try:
#         cam.unload()
#     except Exception:
#         pass
#     cv2.destroyAllWindows()

In [None]:
import time, re
import numpy as np
import cv2
import torch
from collections import deque

from control.forward import inverse_kinematics
from control.send_mess import init_serial, send_pick, send_place

# ================== MODEL/ROI CONFIG ==================
WEIGHTS = 'best.pt'
infer_interval_s = 0.10
im_size_base = 416

ROI_X0, ROI_Y0 = 31, 10
ROI_X1, ROI_Y1 = 558, 307

# ================== BEHAVIOR CONFIG ==================
PICK_DELAY_S = 3.0                       # chờ ổn định trước khi gửi PICK
VERIFY_WINDOW_S          = 2.0           # cửa sổ kiểm tra sau khi bắt đầu verify
VERIFY_MIN_CONSEC_MISS   = 2             # cần “mất” liên tiếp N khung
SEARCH_RADIUS_PICK_PX    = 30            # bán kính vòng tròn vàng

AUTO_PLACE_ON_SUCCESS    = True
PLACE_MARGIN_S           = 0.2
PLACE_MAP                = {0: 0, 1: 1}
DEFAULT_PLACE_IDX        = 0

STATE_IDLE, STATE_BUSY = 0, 1
robot_state = STATE_IDLE
cooldown_until = 0.0

# ================== UTIL ==================
def extract_angles(sol):
    try:
        arr = np.array(sol, dtype=float).ravel()
        if arr.size >= 3:
            return float(arr[0]), float(arr[1]), float(arr[2])
    except Exception:
        pass
    if isinstance(sol, dict):
        for keys in (('t1','t2','t3'), ('s1','s2','s3')):
            if all(k in sol for k in keys):
                return float(sol[keys[0]]), float(sol[keys[1]]), float(sol[keys[2]])
        try:
            vals = list(sol.values())
            if len(vals) >= 3:
                return float(vals[0]), float(vals[1]), float(vals[2])
        except Exception:
            return None
    return None

def run_detect(model, frame_rgb, size):
    with torch.inference_mode():
        results = model(frame_rgb, size=size)
    try: det = results.xyxy[0]
    except Exception: det = results.pred[0] if hasattr(results, 'pred') else None
    if det is None or len(det) == 0: return []
    return det.detach().to('cpu').numpy().tolist()

def same_class_near(dets, target_cid, cx, cy, radius_px):
    """ĐÚNG như yêu cầu: còn bbox cùng class nằm TRONG vòng tròn hay không."""
    r2 = radius_px * radius_px
    for b in dets:
        bx1,by1,bx2,by2,conf,cid = b
        if int(cid) != int(target_cid): 
            continue
        mx,my = 0.5*(bx1+bx2), 0.5*(by1+by2)
        if (mx-cx)**2 + (my-cy)**2 <= r2:
            return True
    return False

def count_in_roi(dets, x0,y0,x1,y1):
    cnt = {}
    for b in dets:
        bx1,by1,bx2,by2,conf,cid = b
        cx = 0.5*(bx1+bx2); cy = 0.5*(by1+by2)
        if x0 <= cx <= x1 and y0 <= cy <= y1:
            k = int(cid); cnt[k] = cnt.get(k, 0) + 1
    return cnt

def parse_pick_ms_from_resp(lines):
    if not lines: return None
    for ln in lines[::-1]:
        m = re.search(r'OK:PICK_MS=(\d+)', ln.replace('\r',''))
        if m:
            try: return int(m.group(1))
            except: pass
    return None

class PrePick:
    def __init__(self, cid, cx, cy, t_detect, delay_s):
        self.cid = int(cid); self.cx = int(cx); self.cy = int(cy)
        self.t_detect = float(t_detect)
        self.t_pick_start = self.t_detect + float(delay_s) - 2.0
        self.issued = False

class PickTask:
    def __init__(self, cid, cx, cy, pick_start_time, pick_ms_est):
        self.cid = int(cid)
        self.cx, self.cy = int(cx), int(cy)
        self.t_pick = float(pick_start_time)                    # lúc gửi P
        self.pick_ms = float(pick_ms_est) if pick_ms_est else 4000.0
        self.t_verify = self.t_pick + self.pick_ms/1000.0 - 2.0
        self.t_deadline = self.t_verify + VERIFY_WINDOW_S
        self.miss_streak = 0
        self.done = False
        self.success = False

# ================== MAIN ==================
if __name__ == "__main__":
    try:
        init_serial()

        torch.backends.cudnn.benchmark = True
        device = 'cuda' if torch.cuda.is_available() else 'cpu'
        model = torch.hub.load('ultralytics/yolov5', 'custom', path=WEIGHTS, force_reload=False)
        model.to(device).eval()
        if device == 'cuda':
            try: model.half()
            except Exception: pass
        model.conf, model.iou, model.max_det = 0.5, 0.65, 20
        names = model.names

        try: stride = int(getattr(model, 'stride', torch.tensor([32])).max())
        except Exception: stride = 32
        imgsz = (im_size_base // stride) * stride

        last_infer = 0.0
        last_infer_time = 0.0
        fps_smooth = None

        pending = deque()
        prepick = None

        while True:
            t0 = time.perf_counter()

            depth, rgb = cam.get_depth_and_color()
            frame_bgr = cv2.cvtColor(rgb, cv2.COLOR_RGB2BGR)
            h, w = frame_bgr.shape[:2]

            cv2.rectangle(frame_bgr, (ROI_X0, ROI_Y0), (ROI_X1, ROI_Y1), (255,255,0), 2)
            cv2.putText(frame_bgr, "ROI", (ROI_X0+5, ROI_Y0+20), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (255,255,0), 2)

            x0 = max(0, min(ROI_X0, w-1)); y0 = max(0, min(ROI_Y0, h-1))
            x1 = max(0, min(ROI_X1, w));   y1 = max(0, min(ROI_Y1, h))
            if x1 <= x0 or y1 <= y0:
                cv2.putText(frame_bgr, "ROI invalid", (10, 30), cv2.FONT_HERSHEY_SIMPLEX, 0.8, (0,0,255), 2)
                cv2.imshow("Detections", frame_bgr)
                if (cv2.waitKey(1) & 0xFF) == 27: break
                continue

            # Infer
            detections = None
            now = time.perf_counter()
            if (now - last_infer) >= infer_interval_s:
                with torch.inference_mode():
                    t_inf0 = time.perf_counter()
                    results = model(rgb, size=imgsz)
                    last_infer_time = time.perf_counter() - t_inf0
                try: det = results.xyxy[0]
                except Exception: det = results.pred[0] if hasattr(results, 'pred') else None
                if det is not None and len(det) > 0:
                    detections = det.detach().to('cpu')
                last_infer = now

            det_list = [row.tolist() for row in detections] if detections is not None else []

            state_txt = {STATE_IDLE:"IDLE", STATE_BUSY:"BUSY"}[robot_state]
            cv2.putText(frame_bgr, f"STATE: {state_txt}", (10, 40), cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0,255,255), 2)

            # ---------- PREPICK ----------
            if prepick is not None and not prepick.issued:
                cv2.circle(frame_bgr, (prepick.cx, prepick.cy), SEARCH_RADIUS_PICK_PX, (0,255,255), 2)
                remain = max(0.0, prepick.t_pick_start - time.perf_counter())
                cv2.putText(frame_bgr, f"PICK in {remain:.1f}s",
                            (prepick.cx+8, max(0,prepick.cy-8)),
                            cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0,255,255), 1)

                if time.perf_counter() >= prepick.t_pick_start and robot_state == STATE_IDLE:
                    try:
                        sol = inverse_kinematics(prepick.cx, prepick.cy)
                        angles = extract_angles(sol)
                        if angles is None:
                            print("[IK][ERROR] cannot extract (t1,t2,t3):", sol)
                            prepick = None
                        else:
                            t1,t2,t3 = angles
                            robot_state = STATE_BUSY
                            print("[ACT] PICK:", (t1,t2,t3), "cid=", prepick.cid, "at", (prepick.cx,prepick.cy))

                            resp_lines = send_pick([t1, t2, t3])
                            pick_ms = parse_pick_ms_from_resp(resp_lines) or 4000
                            print(f"[INFO] PICK_MS = {pick_ms} ms")

                            task = PickTask(prepick.cid, prepick.cx, prepick.cy,
                                            time.perf_counter(), pick_ms)
                            pending.clear()
                            pending.append(task)

                            prepick.issued = True
                            prepick = None
                            cooldown_until = time.perf_counter() + 0.3
                            cv2.imshow("Detections", frame_bgr); cv2.waitKey(1)
                    except Exception as e:
                        print("[ACT][ERROR]", e)
                        robot_state = STATE_IDLE
                        prepick = None

            # ---------- VERIFY ----------
            if pending:
                task = pending[0]
                now_t = time.perf_counter()

                cv2.circle(frame_bgr, (task.cx, task.cy), SEARCH_RADIUS_PICK_PX, (0,255,255), 2)
                rem = max(0.0, task.t_verify - now_t)
                cv2.putText(frame_bgr, f"verify in {rem:.1f}s",
                            (task.cx+8, max(0,task.cy-8)),
                            cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0,255,255), 1)

                if now_t >= task.t_verify and now_t <= task.t_deadline and not task.done:
                    still_in_circle = same_class_near(det_list, task.cid, task.cx, task.cy, SEARCH_RADIUS_PICK_PX)
                    if not still_in_circle:
                        task.miss_streak += 1
                    else:
                        task.miss_streak = 0

                    if task.miss_streak >= VERIFY_MIN_CONSEC_MISS:
                        task.done = True; task.success = True; pending.popleft()
                        print(f"[VERIFY] OK (miss={task.miss_streak}) → PLACE")
                        if AUTO_PLACE_ON_SUCCESS:
                            time.sleep(PLACE_MARGIN_S)
                            place_idx = PLACE_MAP.get(task.cid, DEFAULT_PLACE_IDX)
                            try:
                                print(f"[ACT] PLACE idx={place_idx}")
                                send_place(place_idx)
                            except Exception as e:
                                print("[PLACE][ERROR]", e)
                        robot_state = STATE_IDLE

                elif now_t > task.t_deadline and not task.done:
                    task.done = True; task.success = False; pending.popleft()
                    print("[VERIFY] FAIL: timeout → back to IDLE")
                    robot_state = STATE_IDLE

            if detections is not None and len(detections) > 0 and not pending and prepick is None:
                x0, y0 = 320, 0
                sorted_rows = []

                for row in detections:
                    bx1, by1, bx2, by2, conf, cls = row.tolist()
                    cx = int(0.5 * (bx1 + bx2))
                    cy = int(0.5 * (by1 + by2))
                    dist2 = (cx - x0) ** 2 + (cy - y0) ** 2 
                    sorted_rows.append((dist2, cx, cy, row))

                sorted_rows.sort(key=lambda t: t[0])

                fired = False
                for _, cx, cy, row in sorted_rows:
                    bx1,by1,bx2,by2,conf,cls = row.tolist()
                    cid = int(cls)
                    color = (0,255,0) if cid == 0 else (0,0,255)
                    label = names[cid] if 0 <= cid < len(names) else str(cid)

                    cv2.rectangle(frame_bgr, (int(bx1),int(by1)), (int(bx2),int(by2)), color, 2)
                    try: conf_txt = f"{float(conf):.2f}"
                    except Exception: conf_txt = "?"
                    cv2.putText(frame_bgr, f"{label} {conf_txt}",
                                (int(bx1), max(0,int(by1)-6)),
                                cv2.FONT_HERSHEY_SIMPLEX, 0.6, color, 2)
                    cv2.circle(frame_bgr, (cx, cy), 4, (255,0,0), -1)

                    if not (ROI_X0 <= cx <= ROI_X1 and ROI_Y0 <= cy <= ROI_Y1): continue
                    if robot_state != STATE_IDLE or time.perf_counter() < cooldown_until: continue
                    if label == 'hand':
                        print("[INFO] 'hand' in ROI -> sleeping 8s")
                        cv2.putText(frame_bgr, "Detected 'hand' -> sleeping 8s",
                                    (10, 70), cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0,0,255), 2)
                        cv2.imshow("Detections", frame_bgr); cv2.waitKey(1); time.sleep(8)
                        break

                    if not fired:
                        prepick = PrePick(cid, cx, cy, time.perf_counter(), PICK_DELAY_S)
                        print(f"[PREPICK] target class={cid} at ({cx},{cy}) after sort; PICK after {PICK_DELAY_S:.1f}s")
                        cooldown_until = time.perf_counter() + 0.2
                        fired = True

            # Overlay tốc độ
            dt = time.perf_counter() - t0
            inst_fps = (1.0 / dt) if dt > 0 else 0.0
            fps_smooth = inst_fps if fps_smooth is None else (fps_smooth*0.9 + inst_fps*0.1)
            cv2.putText(frame_bgr, f"FPS: {fps_smooth:.1f}", (10, h-20),
                        cv2.FONT_HERSHEY_SIMPLEX, 0.7, (255,255,255), 2)
            if last_infer_time > 0:
                cv2.putText(frame_bgr, f"Infer dt: {last_infer_time*1000:.1f} ms",
                            (10, h-50), cv2.FONT_HERSHEY_SIMPLEX, 0.7, (255,255,255), 2)

            cv2.imshow("Detections", frame_bgr)
            if (cv2.waitKey(1) & 0xFF) == 27:
                cv2.destroyAllWindows(); break

    except Exception as e:
        print(e)
    finally:
        try: cam.unload()
        except Exception: pass
        cv2.destroyAllWindows()


In [None]:
# # Từ thư mục yolov5
# python train.py \
#   --data data.yaml \
#   --weights yolov5s.pt \
#   --img 640 \
#   --batch 16 \
#   --epochs 200 \
#   --rect           # giữ 640x480, hạn chế padding
#   --project runs/train --name y5s_640x480 

In [None]:
# import os

# labels_dir = "/home/hiwe/project/arm_robot/yolov5/data/test/labels"
# images_dir = "/home/hiwe/project/arm_robot/yolov5/data/test/images"

# # Các định dạng ảnh bạn dùng (có thể thêm tùy dataset)
# image_exts = [".jpg", ".jpeg", ".png"]

# for txt_file in os.listdir(labels_dir):
#     if txt_file.endswith(".txt"):
#         txt_path = os.path.join(labels_dir, txt_file)
        
#         # Kiểm tra file rỗng
#         if os.path.getsize(txt_path) == 0:
#             # Xóa file .txt
#             os.remove(txt_path)
#             print("Đã xóa:", txt_path)

#             # Tìm ảnh cùng tên
#             base_name = os.path.splitext(txt_file)[0]
#             for ext in image_exts:
#                 img_path = os.path.join(images_dir, base_name + ext)
#                 if os.path.exists(img_path):
#                     os.remove(img_path)
#                     print("Đã xóa ảnh:", img_path)
#                     break
