In [1]:
import threading
from dobot_api import DobotApiDashboard, DobotApi, DobotApiMove, MyType
from time import sleep
import numpy as np
current_actual = None

In [3]:
# PC 로봇 연결함수 
def connect_robot(ip):
    try:# 예외 안생기면 진행.
        dashboard_p = 29999 
        move_p = 30003
        feed_p = 30004
        print("연결 설정 중")
        # DobotApi의 class 상속으로 DobotApi~~ class 작동함. 
        dashboard = DobotApiDashboard(ip,dashboard_p)
        move = DobotApiMove(ip, move_p)
        feed = DobotApi(ip,feed_p)
        print("연결 성공!!")
        
        return dashboard, move, feed
    except Exception as e: # Exception message를 e로 치환 raise 예외 ('에러메시지')
        print("연결 실패")
        raise e # Exception을 지정할 수 있음. # return 역할도 함. 

In [4]:
#로봇 에러 메시지 초기화 함수
def robot_clear(dashboard : DobotApiDashboard):
    dashboard.ClearError() # DobotApiDashboard에 있는 에러 새로 정의 

In [None]:
# 로봇 속도 조절 함수 1~100
def robot_speed(dashboard : DobotApiDashboard, speed_value):
    dashboard.SpeedFactor(speed_value)
    

In [None]:
# 그리퍼 구동함수
def gripper_DO(dashboard : DobotApiDashboard, index, status):
    dashboard.ToolDO(index, status)
    # Status 0 -> off, 1-> on

In [None]:
# 현재 로봇위치 받아오기 (로봇 Base 좌표계 기준)
def get_Pose(dashboard : DobotApiDashboard):
    dashboard.GetPose()

In [None]:
# 로봇 구동 함수 
def run_point(move: DobotApiMove, point_list : list):
    move.MovL(point_list[0],point_list[1],point_list[2],point_list[3])
    # 현재 위치 -> 목표위치 (point_list -> (x,y,z,yaw)

In [None]:
# 데이터 지속 수신, 조건 만족시 변수 업데이트
def get_feed(feed: DobotApi):
    global current_actual
    hasRead = 0
    
    while True:
        data = bytes()
        while hasRead < 1440: # data 길이 설정되있는값
            temp = feed.socket_dobot.recv(1440 - hasRead)# 읽었던 부분 check
            if len(temp) > 0:
                hasRead += len(temp)
                data += temp
        hasRead = 0
        a = np.frombuffer(data,dtype=MyType)
        
        if hex((a['test_value'][0])) == '0X123456789abcdef':
            current_actual = a["tool_vector_actual"][0] # Refresh Properties
        sleep(0.001)

In [None]:
# 로봇이 목표위치로 도달할때 까지 기다리는 함수
def wait_arrive(point_list):
    global current_actual
    while True:
        is_arrive = True
        if current_actual is not None:
            for index in range(4):
                if (abs(current_actual[index]-point_list[index]) > 1):
                    is_arrive = False
            if is_arrive:
                return
        sleep(0.001)

In [None]:
# 입력 파라미터 
ip = "192.168.1.6" # robot의 Ip 주소 (IP주소)
gripper_port = 1   # 그리퍼 포트번호 마그네틱 그리퍼 =1(그리퍼의 index 번호를 나타내며, 여러개의 그리퍼가 존재할 경우 선택해서 제어가 가능)
speed_value = 50   # 로봇 속도 (1~100 사이의 값 입력)

# 로봇이 이동하고자 하는 좌표 (x,y,z,yaw) unit: mm, degree
# point_home = [245,5,50,115]
# point_grip = [304,19,-56.5,16]
# point_parse = [255,-54,-5,115]

In [None]:
# 로봇 연결 
dashboard, move, feed = connect_robot(ip)
dashboard.EnableRobot()
print("이제 로봇을 사용할 수 있습니다!")

In [None]:
# 쓰레드 설정 
feed_thread = threading.Thread(target=get_feed,args=(feed,))
feed_thread.setDaemon(True)
feed_thread.start()

In [None]:
# 로봇 상태 초기화 1 : 로봇 에러 메시지 초기화
robot_clear(dashboard)

# 로봇 상태 초기화 2 : 로봇 속도조절
robot_speed(dashboard, speed_value)

# 로봇 현재 위치 받아오기 (x,y,z,yaw) - 로봇 베이스 좌표계
get_Pose(dashboard)

In [None]:
# 아루코 마커 탐지
import numpy as np
import cv2
import sys
from utils import ARUCO_DICT
import time

def pose_estimation(frame, aruco_dict_type, matrix_coefficients, distortion_coefficients):
    '''
    frame - Frame from the video stream
    matrix_coefficients - Intrinsic matrix of the calibrated camera
    distortion_coefficients - Distortion coefficients associated with your camera

    return:-
    frame - The frame with the axis drawn on it
    centers - List of tuples containing center coordinates of each detected ArUco marker
    ids - List of IDs of detected ArUco markers
    '''

    gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
    aruco_dict = cv2.aruco.Dictionary_get(aruco_dict_type)
    parameters = cv2.aruco.DetectorParameters_create()

    corners, ids, rejected_img_points = cv2.aruco.detectMarkers(gray, aruco_dict, parameters=parameters)
    
    centers = []  # List to store center coordinates of detected markers

    # If markers are detected
    if ids is not None and len(corners) > 0:
        for i in range(0, len(ids)):
            # Estimate pose of each marker and return the values rvec and tvec---(different from those of camera coefficients)
            rvec, tvec, markerPoints = cv2.aruco.estimatePoseSingleMarkers(corners[i], 0.02, matrix_coefficients, distortion_coefficients)

            # Draw a square around the markers
            cv2.aruco.drawDetectedMarkers(frame, corners)

            # Calculate center coordinates of the marker
            x_sum = corners[i][0][0][0] + corners[i][0][1][0] + corners[i][0][2][0] + corners[i][0][3][0]
            y_sum = corners[i][0][0][1] + corners[i][0][1][1] + corners[i][0][2][1] + corners[i][0][3][1]

            x_center_pixel = x_sum * 0.25
            y_center_pixel = y_sum * 0.25

            centers.append((x_center_pixel, y_center_pixel))

            # Draw a red dot at the center of the marker
            cv2.circle(frame, (int(x_center_pixel), int(y_center_pixel)), 5, (0, 0, 255), -1)

            # Write the ID of the marker near the center
            cv2.putText(frame, f"ID: {ids[i][0]}", (int(x_center_pixel), int(y_center_pixel)),
                        cv2.FONT_HERSHEY_SIMPLEX, 0.5, (255, 0, 0), 2, cv2.LINE_AA)
        
        # Sort centers and ids by marker IDs
        # ids = 1 (red), ids = 2 (green), ids = 3 (yellow)
        centers_ids = sorted(zip(ids.flatten(), centers))
        ids, centers = zip(*centers_ids)
        ids = list(ids)
        centers = list(centers)

        print(centers)
        print(ids)
        
    return frame, centers, ids

# 직접 값을 설정합니다.
calibration_matrix_path = "calibration_matrix.npy"
distortion_coefficients_path = "distortion_coefficients.npy"
aruco_dict_type = "DICT_4X4_100"

# ArUco 사전 타입을 확인합니다.
if ARUCO_DICT.get(aruco_dict_type, None) is None:
    print(f"ArUCo tag type '{aruco_dict_type}' is not supported")
    sys.exit(0)

aruco_dict_type = ARUCO_DICT[aruco_dict_type]

# 캘리브레이션 매트릭스와 왜곡 계수를 로드합니다.
k = np.load(calibration_matrix_path)
d = np.load(distortion_coefficients_path)

# 비디오 캡처를 시작합니다.
video = cv2.VideoCapture(1)
time.sleep(2.0)

strat_time = time.time() # 시작 시간 기록

while True:
    ret, frame = video.read()

    if not ret:
        break

    output, centers, ids = pose_estimation(frame, aruco_dict_type, k, d)

    cv2.imshow('Estimated Pose', output)

   # 종료 조건은 ids의 길이가 3이 되면 종료
    if ids is not None and len(ids) == 3:
        print("마커 3개 감지 완료.")
        break

    current_time = time.time()
    if current_time - strat_time > 5.0:
        print("5초 동안 마커 3개 감지 실패.")
        break

    key = cv2.waitKey(1) & 0xFF
    if key == ord('q'):
        break

video.release()
cv2.destroyAllWindows()


In [None]:
# 좌표변환
import numpy as np

# 마커의 픽셀 좌표와 물리 좌표
pixel_coords = np.array([
    (313.0, 134.75),  # yellow_pixel
    (79.0, 319.75),   # green_pixel
    (541.5, 315.75)   # red_pixel
], dtype=np.float32)

world_coords = np.array([
    [330, -90],  # yellow_point
    [240, 10],   # green_point
    [320, 120]   # red_point
], dtype=np.float32)

def calculate_transformation_matrix(pixel_coords, world_coords):
    """
    주어진 픽셀 좌표와 물리 좌표를 사용하여 변환 행렬을 계산합니다.

    Args:
    pixel_coords (np.ndarray): 2D 픽셀 좌표 배열
    world_coords (np.ndarray): 2D 물리 좌표 배열

    Returns:
    np.ndarray: 변환 행렬
    """
    A = []
    B = []
    for i in range(len(pixel_coords)):
        px, py = pixel_coords[i]
        wx, wy = world_coords[i]
        A.append([px, py, 1, 0, 0, 0])
        A.append([0, 0, 0, px, py, 1])
        B.append(wx)
        B.append(wy)

    A = np.array(A)
    B = np.array(B)
    transform_vector = np.linalg.solve(A, B)
    transformation_matrix = transform_vector.reshape(2, 3)
    return transformation_matrix

def pixel_to_world(pixel, transformation_matrix):
    """
    픽셀 좌표를 물리 좌표로 변환합니다.

    Args:
    pixel (tuple): (x, y) 픽셀 좌표
    transformation_matrix (np.ndarray): 픽셀에서 물리 좌표로 변환하는 행렬

    Returns:
    np.ndarray: 물리 좌표
    """
    pixel_homogeneous = np.array([pixel[0], pixel[1], 1])
    world_coords = np.dot(transformation_matrix, pixel_homogeneous)
    return world_coords


In [None]:
# 변환 행렬 계산 및 z, yaw(고정값)추가
def calculate_new_world_coordinates(pixel_coords, transformation_matrix, additional_coords):
    new_world_coords = []
    for pixel_coord in pixel_coords:
        new_world_coord = pixel_to_world(pixel_coord, transformation_matrix)
        combined_coord = np.concatenate((new_world_coord, np.array(additional_coords)))
        new_world_coords.append(combined_coord.tolist())
    return new_world_coords

# 변환 행렬 계산
transformation_matrix = calculate_transformation_matrix(pixel_coords, world_coords)

# 새로운 픽셀 좌표 예시
centers = [centers[0], centers[1], centers[2]]

# 추가할 좌표
additional_coordinates_home = [50, 115]
additional_coordinates_grip = [-54, 115]

# 새로운 픽셀 좌표에 대한 월드 좌표 계산
new_world_coords_home = calculate_new_world_coordinates(centers, transformation_matrix, additional_coordinates_home)
new_world_coords_grip = calculate_new_world_coordinates(centers, transformation_matrix, additional_coordinates_grip)

# 결과 출력
for idx, (new_world_coord_home, new_world_coord_grip) in enumerate(zip(new_world_coords_home, new_world_coords_grip)):
    color = ['red', 'green', 'yellow'][idx]
    print(f"Home coordinates for pixel_{color}: {new_world_coord_home}")
    print(f"Grip coordinates for pixel_{color}: {new_world_coord_grip}")

# 리스트로 변환
home_points = new_world_coords_home
grip_points = new_world_coords_grip

print(home_points)
print(grip_points)

In [None]:
# 로봇 초기 위치 = 블럭 초기 위치 (home_point_int)
point_home = [239,58,50,115]

# 로봇 grip 위치 # 블럭은 위에서부터 노랑(ids=2), 빨강(ids=0), 초록(ids=1) 순으로 쌓여있다고 가정
point_home_first_grip = [239,58,-21,115]
point_home_second_grip = [239,58,-38,115]
point_home_third_grip = [239,58,-54,115]

In [None]:
# 블럭 이동 작업 수행(복구 작업 포함)
def move_and_wait(move, target_point):
    run_point(move, target_point)
    wait_arrive(target_point)
    sleep(1)

def operate_gripper(dashboard, gripper_port, state):
    gripper_DO(dashboard, gripper_port, state)
    sleep(1) # 슬립 추가를 위해 함수 재정의

def pick_and_place_block(move, dashboard, gripper_port, grip_point, home_point, target_point):
    # 블록 피킹
    move_and_wait(move, grip_point)
    
    # 마커 위치로 이동
    move_and_wait(move, home_point)
    
    # 마커에 놓기
    move_and_wait(move, target_point)
    
    # 그리퍼 끄기
    operate_gripper(dashboard, gripper_port, 0)

def pick_and_return_block(move, dashboard, gripper_port, target_point, home_point, grip_point):
    # 블록 다시 집기
    move_and_wait(move, target_point)
    operate_gripper(dashboard, gripper_port, 1)
    
    # 원래 위치로 이동
    move_and_wait(move, home_point)
    
    # 원래 위치에 놓기
    move_and_wait(move, grip_point)
    
    # 그리퍼 끄기
    operate_gripper(dashboard, gripper_port, 0)

def process_blocks(move, dashboard, gripper_port, grip_points, home_points, target_points):
    for i in range(len(grip_points)):
        pick_and_place_block(move, dashboard, gripper_port, grip_points[i], home_points[i], target_points[i])
        move_and_wait(move, point_home)
        operate_gripper(dashboard, gripper_port, 1)

def process_blocks_reverse(move, dashboard, gripper_port, grip_points, home_points, target_points):
    for i in reversed(range(len(grip_points))):
        pick_and_return_block(move, dashboard, gripper_port, target_points[i], home_points[i], grip_points[i])
        move_and_wait(move, point_home)
        operate_gripper(dashboard, gripper_port, 1)

# 초기 위치로 이동 및 그리퍼 작동
move_and_wait(move, point_home)
operate_gripper(dashboard, gripper_port, 1)

# 블록 피킹 및 놓기 작업 수행
grip_points = [point_home_first_grip, point_home_second_grip, point_home_third_grip]
home_points = [home_points[2], home_points[0], home_points[1]]
target_points = [grip_points[2], grip_points[0], grip_points[1]]

process_blocks(move, dashboard, gripper_port, grip_points, home_points, target_points)

# 블록 피킹 위치로 이동 (블럭을 원래 위치로 옮기기 위한 그리퍼 작동)
move_and_wait(move, point_home)
operate_gripper(dashboard, gripper_port, 1)

# 블록 원래 위치로 복귀 작업 수행
process_blocks_reverse(move, dashboard, gripper_port, grip_points, home_points, target_points)

# 최종 초기 위치로 이동(그리퍼 작동 종료)
move_and_wait(move, point_home)
operate_gripper(dashboard, gripper_port, 0)


In [None]:
# 로봇 끄기
dashboard.DisableRobot()