# Imports

In [1]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from PIL import Image, ImageDraw
import cv2
import os
from glob import glob

import time
from tiki.mini import TikiMini
import time 
tiki = TikiMini()
tiki.set_motor_mode(tiki.MOTOR_MODE_PID)

import ipywidgets as widgets
from IPython.display import display, clear_output
import threading

# Lane Detection

In [2]:
def histogram_lane_detection(binary_warped):
    histogram = np.sum(binary_warped[binary_warped.shape[0]//2:, :], axis=0)

    midpoint = histogram.shape[0] // 2
    left_peak = np.argmax(histogram[:midpoint])
    right_peak = np.argmax(histogram[midpoint:]) + midpoint


    offset = 30
    # print(left_peak, right_peak)
    #print(left_peak, right_peak)
    if (left_peak < 150 - offset or left_peak > 210 + offset):
        left_peak = right_peak - 60
    if (right_peak > 210 - offset or right_peak < 210 + offset):
        right_peak = left_peak + 60

    return left_peak, right_peak

def hough_transform(image):
    # Grayscale 변환
    gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)

    # Canny 엣지 검출
    edges = cv2.Canny(gray, 50, 150)

    # Hough Line Transform
    lines = cv2.HoughLinesP(edges, 1, np.pi / 180, 50, minLineLength=30, maxLineGap=10)

    if lines is not None:
        for line in lines:
            x1, y1, x2, y2 = line[0]
            cv2.line(image, (x1, y1), (x2, y2), (0, 255, 0), 2)

    return image, lines

def draw_center_line(image, left_lane, right_lane):
    # 이미지 크기
    height, width, _ = image.shape

    # 왼쪽과 오른쪽 차선의 x 좌표
    left_x = left_lane
    right_x = right_lane

    # 두 차선의 가운데 x 좌표 계산
    center_x = (left_x + right_x) // 2

    # 가운데 선의 시작점과 끝점 정의
    start_point = (center_x, 4*height//5)          # 하단 중앙
    end_point = (center_x, height)       # 중간 지점

    # 가운데 선 그리기
    result_image = image.copy()
    cv2.line(result_image, start_point, end_point, (0, 255, 0), 5)  # 녹색 선

    return result_image

def find_contours_canny(image):
    image_rgb = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
    gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)
    blurred = cv2.GaussianBlur(gray, (5, 5), 0)
    edges = cv2.Canny(blurred, 50, 150)

    contours, _ = cv2.findContours(edges, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
    cv2.drawContours(image, contours, -1, (0, 255, 0), 2)
    return image, edges

def convert_to_bytes(image):
    _, buffer = cv2.imencode('.jpg', image)
    return buffer.tobytes()

def direction(frame):   #@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@
    height, width, _ = frame.shape
    roi_height = int(height / 5)
    roi = frame[4 * roi_height: height, :]

    result_frame, _ = find_contours_canny(roi)

    extended_frame = frame.copy()
    extended_frame[4 * roi_height:height, :] = result_frame


    # 히스토그램 기반 차선 위치 계산
    left_lane, right_lane = histogram_lane_detection(_)

    middle_lane = left_lane + (right_lane - left_lane)/2
    middle_tank = width / 2
    val = middle_tank - middle_lane
    #print(val)

    if left_lane < 50: #@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@
        return "Forward"
    if right_lane > 300: 
        return "Forward"
    if np.abs(val) > 100: 
        return "Forward" 

    tank_threshold = width*0.07
    if np.abs(val) < tank_threshold:
        #print("Forward")
        return "Forward"
    else:
        if val > 0: 
            #print("Left")
            return "Left"
        else: 
            #print("Right")
            return "Right"

# Object Detection

In [3]:
import torch
from torchvision import transforms
from efficientnet_pytorch import EfficientNet
from ultralytics import YOLO

yolo_model_path = "Misc Files/runs/detect/train/weights/best_v8n.pt"

# YOLO 모델 로드 (YOLOv8)
device = torch.device("cuda")
model = YOLO(yolo_model_path).to(device = device)

# YOLO 예측 및 EfficientNet 적용
def run_inference(image, region_name):
    """
    YOLO로 바운딩 박스를 예측하고 각 객체를 EfficientNet으로 분류.
    Args:
        image_path (str): 입력 이미지 경로
    Returns:
        list: 예측 결과 (클래스 ID 및 신뢰도)
    """
    # YOLO 예측
    results = model.predict(image, save=False, device = device)  #conf=0.3, 

    # 바운딩 박스 및 클래스 정보
    class_ids = results[0].boxes.cls.cpu().numpy()  # 클래스 ID

    prediction_dict = dict()
    prediction_dict["region_name"] = region_name
    prediction_dict["enemy_tank"] = 0
    prediction_dict["ally_tank"] = 0
    prediction_dict["enemy_person"] = 0
    prediction_dict["ally_person"] = 0

    for predicted_class in class_ids:
        if predicted_class == 0:
            prediction_dict["enemy_tank"] += 1
        elif predicted_class == 1:
            prediction_dict["ally_tank"] += 1
        elif predicted_class == 2:
            prediction_dict["enemy_person"] += 1
        elif predicted_class == 3:
            prediction_dict["ally_person"] += 1
    if prediction_dict["enemy_tank"] != 0:
        attack_tank()
            
    #print("enemy_tank", pred_enemy_tank)
    #print("ally_tank", pred_ally_tank)
    #print("enemy_person", pred_enemy_person)
    #print("ally_person", pred_ally_person)
    #print("---------------------------------------------------")
    return prediction_dict

def attack_tank():
    tiki.play_buzzer(500)
    time.sleep(1)
    tiki.stop_buzzer()
    tiki.fire_cannon()
    #tiki.log("fire tank!")
        

def print_log(target_list):
    for eval_target in target_list:
        log_msg = eval_target["region_name"] + " AF: " + str(eval_target["ally_person"]) + " EF: " + str(eval_target["enemy_person"])
        tiki.log(log_msg)


#-----------------------------------------------------------------------------------------------------
def region_evaluate(feed, region_name):
    """
    All image analyses
    """
    prediction_dict = run_inference(feed, region_name)

    return prediction_dict

# Hardware

In [4]:
cap = cv2.VideoCapture(
    "nvarguscamerasrc ! video/x-raw(memory:NVMM), width=640, height=480, framerate=30/1, format=NV12 ! "
    "nvvidconv flip-method=2 ! video/x-raw, format=BGRx ! videoconvert ! video/x-raw, format=BGR ! appsink max-buffers=1 drop=True")
cap.set(cv2.CAP_PROP_FRAME_WIDTH, 320)  # 가로 해상도 설정
cap.set(cv2.CAP_PROP_FRAME_HEIGHT, 240)  # 세로 해상도 설정

GST_ARGUS: Creating output stream
CONSUMER: Waiting until producer is connected...
GST_ARGUS: Available Sensor modes :
GST_ARGUS: 3264 x 2464 FR = 21.000000 fps Duration = 47619048 ; Analog Gain range min 1.000000, max 10.625000; Exposure Range min 13000, max 683709000;

GST_ARGUS: 3264 x 1848 FR = 28.000001 fps Duration = 35714284 ; Analog Gain range min 1.000000, max 10.625000; Exposure Range min 13000, max 683709000;

GST_ARGUS: 1920 x 1080 FR = 29.999999 fps Duration = 33333334 ; Analog Gain range min 1.000000, max 10.625000; Exposure Range min 13000, max 683709000;

GST_ARGUS: 1640 x 1232 FR = 29.999999 fps Duration = 33333334 ; Analog Gain range min 1.000000, max 10.625000; Exposure Range min 13000, max 683709000;

GST_ARGUS: 1280 x 720 FR = 59.999999 fps Duration = 16666667 ; Analog Gain range min 1.000000, max 10.625000; Exposure Range min 13000, max 683709000;

GST_ARGUS: 1280 x 720 FR = 120.000005 fps Duration = 8333333 ; Analog Gain range min 1.000000, max 10.625000; Exposur



False

In [5]:
ratio = 3/3  #RATIO MUST BE 2/3 FOR THE BIG CURVEs
default_rpm = 30*ratio
default_speed = 220*ratio #mm per secondS



def forwards(distance):
    """
    Distance units in mm
    """
    duration = distance/default_speed
    tiki.forward(default_rpm)
    time.sleep(duration)
    tiki.stop()

def backwards(distance):
    """
    Distance units in mm
    """
    duration = distance/default_speed
    tiki.backward(default_rpm)
    time.sleep(duration)
    tiki.stop()

def anticlockwise_turn(radius, delta_angle=90):
    """
    Radius units in mm
    """
    if radius == 0:
        tiki.counter_clockwise(default_rpm)
        time.sleep((100*3.14*2/default_speed)*(delta_angle/360))
        tiki.stop() 
    else:
        forwards(radius)
        anticlockwise_turn(0, delta_angle=90)
        forwards(radius)

def clockwise_turn(radius, delta_angle=90):
    """
    Radius units in mm
    """
    if radius == 0:
        tiki.clockwise(default_rpm)
        time.sleep((100*3.14*2/default_speed)*(delta_angle/360))   # 110 for 4/3,   96 for 2/3
        tiki.stop() 
    else:
        forwards(radius)
        anticlockwise_turn(0, delta_angle=90)
        forwards(radius)






def follow_line(distance, cap, offset_multiplier = 0.1, custom_ratio = None):
    """
    Distance units in mm
    """
    distance = distance*(60/400)*(110/100)
    
    if custom_ratio is None:
        speed = default_speed
        rpm = default_rpm
    else:
        rpm = 30*custom_ratio
        speed = 220*custom_ratio #mm per seconds
    
    total_duration = distance/speed
    subduration = 0.005
    offset = rpm*offset_multiplier
    instances = int(total_duration/subduration)

    for _ in range(instances):
        _, feed = cap.read() #@@@@@@@@@@@@@@@@@@@@@@@@@@ EXPERIMENTAL
        required_direction = direction(feed)
        if required_direction == "Forward":
            tiki.forward(rpm)
        elif required_direction == "Left": 
            tiki.set_motor_power(tiki.MOTOR_LEFT, rpm-offset)
            tiki.set_motor_power(tiki.MOTOR_RIGHT, rpm+offset)
        elif required_direction == "Right": 
            tiki.set_motor_power(tiki.MOTOR_LEFT, rpm+offset)
            tiki.set_motor_power(tiki.MOTOR_RIGHT, rpm-offset)          
        time.sleep(subduration)
    tiki.stop()




def square_detected(image):
    """
    For the sake of reproducibility, make sure this returns 'True'
    only when the center (M00 moment) of the square is within
    some fixed distance away from the tank

    Also, add a delay such double detections for a single 
    square doesn't occur
    """
    height, width, _ = image.shape
    roi = image[int(height/2):height, int(width*(0.5-0.2)): int(width*(0.5+0.2)), :]

    hsv_roi = cv2.cvtColor(roi, cv2.COLOR_BGR2HSV)
    lower_green = np.array([25, 40, 0], dtype=np.uint8)       #loose: [35, 50, 50],   tight: [40, 100, 150]
    upper_green = np.array([85, 255, 255], dtype=np.uint8)     #loose: [85, 255, 255], tight: [70, 255, 255]
    color_mask = cv2.inRange(hsv_roi, lower_green, upper_green)
    masked_roi = cv2.bitwise_and(roi, roi, mask=color_mask)
    gray_mask = cv2.cvtColor(masked_roi, cv2.COLOR_BGR2GRAY)
    _, binary_mask = cv2.threshold(gray_mask, 1, 255, cv2.THRESH_BINARY)
    contours, _ = cv2.findContours(binary_mask, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
    area_list = [cv2.contourArea(x) for x in contours]
    
    if not area_list: 
        #print("Green NOT Detected 1")
        return False
    
    max_area = max(area_list)
    if max_area > 500: 
        #print("Green Detected")
        return True #초록색만나면(최대한가깝게 인식하게 area수정)
    else:
        #print("Green NOT Detected 2")
        return False

# Testcode

In [24]:
#400/60

#clockwise_turn(0, delta_angle=90)
#follow_line(500, cap)
tiki.log_clear()
"""
a = []
while cap.isOpened():
    ret, frame = cap.read()
    a.append(region_evaluate(frame, "A")) 
    print_log(a)
    break
"""

clockwise_turn(0)


In [8]:
tiki.stop()
tiki.log_clear()
#cap.release()

# Official

In [9]:
import copy

crossroad_counter = 1
square_counter = 1

count_ally_person = []
count_enemy_person = []
target_list = []

square_counter = 1    #@@@@@@@@@@@@@@@@@@@@@@@@@@@@

tiki.log_clear()
while cap.isOpened():
    ret, feed = cap.read()
    
    if square_detected(feed):
        if square_counter == 1:
            #-----------------------------------------------------------------------------------
            backwards(250+100)
            follow_line(100, cap)

            anticlockwise_turn(0)
            follow_line(400, cap)    #   70
            anticlockwise_turn(0)

            time.sleep(5)
            _, feed1 = cap.read()
            region_A = region_evaluate(feed1, "A")                   #EVALUATION
            target_list.append(region_A)
            
            anticlockwise_turn(0)
            follow_line(460, cap)
            
            anticlockwise_turn(0)  
            follow_line(500, cap)
            #-----------------------------------------------------------------------------------
            square_counter += 1
            continue
        elif square_counter == 2:
            #-----------------------------------------------------------------------------------
            anticlockwise_turn(0, delta_angle = 5+90)   

            time.sleep(5)
            _, feed2 = cap.read()
            region_B = region_evaluate(feed2, "B")                     #EVALUATION
            target_list.append(region_B)
            
            clockwise_turn(0, delta_angle = 5+90+10)  
            follow_line(1200, cap, offset_multiplier = 0.3, custom_ratio = 2/3)
            #-----------------------------------------------------------------------------------
            square_counter += 1
            continue
        elif square_counter == 3:
            #-----------------------------------------------------------------------------------
            backwards(200+100)
            follow_line(100, cap)
            
            clockwise_turn(0)
            follow_line(333, cap)
            #-----------------------------------------------------------------------------------
            square_counter += 1
            continue
        elif square_counter == 4:
            #-----------------------------------------------------------------------------------
            anticlockwise_turn(0, delta_angle = 5+90) 

            time.sleep(5)
            _, feed4 = cap.read()
            region_C = region_evaluate(feed4, "C")                     #EVALUATION
            target_list.append(region_C)
            
            clockwise_turn(0, delta_angle = 5+90)  
            follow_line(333, cap)
            #-----------------------------------------------------------------------------------
            square_counter += 1
            continue
        elif square_counter == 5:
            #-----------------------------------------------------------------------------------
            anticlockwise_turn(0, delta_angle = 5+90)  

            time.sleep(5)
            _, feed5 = cap.read()
            region_D = region_evaluate(feed5, "D")                     #EVALUATION
            target_list.append(region_D)
            
            clockwise_turn(0, delta_angle = 5+90)  
            
            forwards(570) #may need to reduce
            anticlockwise_turn(0)
            follow_line(267, cap)
            #-----------------------------------------------------------------------------------
            square_counter += 1
            continue
#-----------------------------------------------------------------------------------
        else:
          tiki.stop()
          print_log(target_list)
          break
#-----------------------------------------------------------------------------------
    else:
        subduration = 0.005
        offset = default_rpm*0.2
        
        required_direction = direction(feed)
        if required_direction == "Forward":
            tiki.forward(default_rpm)
        elif required_direction == "Left": 
            tiki.set_motor_power(tiki.MOTOR_LEFT, default_rpm-offset)
            tiki.set_motor_power(tiki.MOTOR_RIGHT, default_rpm+offset)
        elif required_direction == "Right": 
            tiki.set_motor_power(tiki.MOTOR_LEFT, default_rpm+offset)
            tiki.set_motor_power(tiki.MOTOR_RIGHT, default_rpm-offset)          
        time.sleep(subduration)


0: 480x640 1 enemy_tank, 245.4ms
Speed: 22.9ms preprocess, 245.4ms inference, 8.5ms postprocess per image at shape (1, 3, 480, 640)

0: 480x640 1 enemy_tank, 1 enemy_person, 2 ally_persons, 240.8ms
Speed: 17.7ms preprocess, 240.8ms inference, 7.6ms postprocess per image at shape (1, 3, 480, 640)

0: 480x640 1 enemy_tank, 2 ally_persons, 247.9ms
Speed: 21.9ms preprocess, 247.9ms inference, 6.5ms postprocess per image at shape (1, 3, 480, 640)

0: 480x640 1 enemy_tank, 1 ally_tank, 2 enemy_persons, 212.9ms
Speed: 26.9ms preprocess, 212.9ms inference, 8.4ms postprocess per image at shape (1, 3, 480, 640)
