In [None]:
# Import Required Packages
import time, cv2, sys, os, torch, re
from threading import Thread
from djitellopy import Tello
import openai
import urllib.request
import numpy as np
import random as r
from transformers import ViltProcessor, ViltForQuestionAnswering

# Load environment variables
from dotenv import load_dotenv
load_dotenv()

# Configure Packages
openai.api_key = os.getenv("OPENAI_API_KEY")

# Make the sibling `prompts/` and `Scripts/` directories importable.
# The membership test is made on the exact entry being appended, so re-running
# this cell never adds duplicates, and a parent directory that is already on
# sys.path cannot suppress the additions.
module_path = os.path.abspath(os.path.join('..'))
for local_dir in ("/prompts/", "/Scripts/"):
    local_path = module_path + local_dir
    if local_path not in sys.path:
        sys.path.append(local_path)
    
# Import Local Packages
from vqa_question.v1 import prompt_template as vqa_prompt_template
from drone_explore_prompt import prompt_template as explore_template
from drone_next_action.v1 import prompt_template as drone_prompt_template

from data_collection_utils import take_individual_picture, stream_video, stream_frames, record_streamed_frames

print("Imported all modules")

In [None]:
# Initialize YOLOv8
# Builds the detector from the `yolov8n.pt` weights file. `get_coords` reads it
# through the module-level name `model`.
# NOTE(review): the VQA cell further down rebinds `model` to a ViLT model, so
# this cell has to be re-run before `get_coords` is used again after that cell.
from ultralytics import YOLO

model = YOLO('yolov8n.pt')


In [None]:
def get_coords(obj_name, img_url, verbose=False, detector=None):
    """
    Obtain the bounding box coordinates of an object in an image.

    Only the first detection whose class name equals ``obj_name`` is reported.
    The elapsed time is always printed, whether or not the object was found.

    Args:
        obj_name (Any Valid YOLO Class): The object of interest.
        img_url (str): Image (or image location) to analyze; passed to the detector unchanged.
        verbose (bool, optional): Whether or not to print the raw prediction and the names of all objects detected within the image. Defaults to False.
        detector (callable, optional): Detector to run on the image. Defaults to the
            module-level ``model``. Passing it explicitly keeps this function usable
            when ``model`` has been rebound to a different kind of model.

    Returns:
        result_tuple (tuple(int), tuple(int)): ``((x1, y1), (x2, y2))`` taken from the
        detection's ``xyxy`` box, or ``(None, None)`` when ``obj_name`` was not detected.
    """
    if detector is None:
        detector = model  # looked up at call time, as before

    start_time = time.time()
    pred = detector(img_url)

    if verbose:
        print(pred)

    # Translate the detected class indices into class names
    detections = pred[0]
    idx_to_name = detections.names
    names = [idx_to_name[int(i)] for i in detections.boxes.cls]
    if verbose:
        print(names)

    if obj_name in names:
        # Bounding box of the first detection with the requested name
        coords = detections.boxes.xyxy[names.index(obj_name)]
        result_tuple = (
            (int(coords[0].item()), int(coords[1].item())),
            (int(coords[2].item()), int(coords[3].item())),
        )
    else:
        result_tuple = (None, None)

    print(time.time() - start_time, 'seconds elapsed')
    return result_tuple

In [None]:
def plot_box(img, top_left, bottom_right):
    """
    Outline a bounding box on an image and show the result in the "Video Feed" window.

    Args:
        img (any valid image object): Image to draw on.
        top_left (tuple): Top left corner of the bounding box.
        bottom_right (tuple): Bottom right corner of the bounding box.
    """
    box_color = (0, 255, 0)  # green
    box_thickness = 2
    annotated = cv2.rectangle(img, top_left, bottom_right, box_color, box_thickness)
    cv2.imshow("Video Feed", annotated)
    # Short wait so the window gets a chance to refresh
    cv2.waitKey(1)

In [None]:
# Set up Drone
drone = Tello()
drone.connect()

# Check Battery Levels
# A bare expression in the middle of a cell is discarded by the notebook, so
# the reading is printed explicitly.
print("Battery level:", drone.get_battery())

# Initialize variables for bounding box coordinates
top_left, bottom_right = None, None

### Prompt Experiment

In [None]:
def prompt(prompt, stop_tokens = None):
    """
    Send a prompt to the OpenAI completion endpoint and return the generated text.

    Args:
        prompt (str): Prompt text for the LLM.
        stop_tokens (list, optional): Stop sequences forwarded as ``stop``. Defaults to None.

    Returns:
        str: Text of the first choice in the response.
    """
    request = {
        "engine": "text-davinci-003",
        "prompt": prompt,
        "temperature": 0.5,
        "max_tokens": 1000,
        "top_p": 1,
        "frequency_penalty": 0,
        "presence_penalty": 0,
        "best_of": 1,
        "stop": stop_tokens,
    }
    completion = openai.Completion.create(**request)
    first_choice = completion.choices[0]
    return first_choice.text

In [None]:
# Initial Prompt Template for Simple Drone Flight
# `$objective` is a placeholder for the task text (the sibling `prompt_image`
# template is filled in with `str.replace`). This template advertises move
# distances of 20-500 and rotations of 1-360, and offers no detection command.
prompt_text="""You are writing code to control a drone.
Here is a list of commands:
    drone.move_left(X)  - move the drone left by X centimeters, where X is between 20 and 500
    drone.move_right(X) - move the drone right by X centimeters, where X is between 20 and 500
    drone.move_forward(X) - move the drone forward by X centimeters, where X is between 20 and 500
    drone.move_back(X) - move the drone back by X centimeters, where X is between 20 and 500
    drone.takeoff() - lift off the drone
    drone.land() - land the drone
    drone.rotate_clockwise(X) - rotate the drone clockwise by X degrees, where X is between 1 and 360
    drone.rotate_counter_clockwise(X) - rotate the drone counter-clockwise by X degrees, where X is between 1 and 360

Write the code needed for an algorithm to $objective
You need to start by taking off with the drone.takeoff() command and end by landing with the drone.land() command.
Please insert helpful print statements to document the progress towards the objective.

Code:
"""

In [None]:
# Updated Prompt Template which incorporates object detection results to modify trajectory
# Compared with `prompt_text`, this template caps move distances at 50 instead
# of 500 and exposes `object_detect(X)` to the generated code.
# `$objective` is replaced with the task text via `str.replace` before prompting.
prompt_image="""You are writing code to control a drone.
Here is a list of commands:
    drone.move_left(X)  - move the drone left by X centimeters, where X is between 20 and 50
    drone.move_right(X) - move the drone right by X centimeters, where X is between 20 and 50
    drone.move_forward(X) - move the drone forward by X centimeters, where X is between 20 and 50
    drone.move_back(X) - move the drone back by X centimeters, where X is between 20 and 50
    drone.takeoff() - lift off the drone
    drone.land() - land the drone
    drone.rotate_clockwise(X) - rotate the drone clockwise by X degrees, where X is between 1 and 360
    drone.rotate_counter_clockwise(X) - rotate the drone counter-clockwise by X degrees, where X is between 1 and 360
    object_detect(X) - takes in the name of an object X, and returns True or False depending on whether the object is in the frame. valid values for X are [bottle, person, chair]

Write the code needed for an algorithm to $objective
You need to start by taking off with the drone.takeoff() command and end by landing with the drone.land() command.
Please insert helpful print statements to document the progress towards the objective.

Code:
"""

In [None]:
def object_detect(obj: str) -> bool:
    """
    Detect an object in the current drone frame.

    Turns the video stream on, grabs one frame, resizes it to 360x240 and runs
    `get_coords` on it. The frame is shown in the "image" window; when the object
    is found, its bounding box is drawn as well.

    Args:
        obj (str): The name of the object of interest.

    Returns:
        bool: Whether or not the object of interest is within the image frame.
    """
    # Initialize drone stream (the pause presumably lets the stream warm up)
    drone.streamon()
    time.sleep(10)

    # Obtain live image from drone
    img = drone.get_frame_read().frame
    img = cv2.resize(img, (360, 240))

    # Obtain bounding box coordinates for object and display the image
    top_left, bottom_right = get_coords(obj, img)
    print('coords', top_left, bottom_right)
    cv2.imshow("image", img)
    cv2.waitKey(1)

    if top_left is None:
        return False

    # Report the object that was actually requested ("Bottle Found!" for "bottle")
    print(f"{obj.capitalize()} Found!")
    plot_box(img, top_left=top_left, bottom_right=bottom_right)
    time.sleep(10)
    return True

In [None]:
# Test object detection on a bottle
# `object_detect` turns the drone stream on and grabs a live frame, so this
# cell talks to the drone and blocks for the sleeps inside the function.
object_detect("bottle")

In [None]:
# Prompt the LLM with an objective and print out the resulting LLM-generated action code
# The `$objective` placeholder of `prompt_image` is filled in with the task text
# before the request is sent; the reply is kept in `code_str` for the next cell.
code_str = prompt(prompt_image.replace("$objective", "find the bottle"))
print(code_str)

In [None]:
# Execute the LLM-generated action code
# WARNING: `exec` runs whatever text the LLM returned, unreviewed, in this
# notebook's namespace, and that code can command the physical drone.
# Inspect the printed `code_str` before running this cell.
exec(code_str)

### Flight Tests

In [None]:
# Launch the drone, then read the battery level. The battery call is the last
# expression of the cell, so the notebook displays its value.
drone.takeoff() # launch drone

drone.get_battery() # obtain battery levels

In [None]:
# Helper Functions
def find_center_coord_for_frame_size(size=(360, 240)):
    """
    Compute the center point of an image frame.

    Args:
        size (tuple, optional): Frame size; the first two entries are used. Defaults to (360, 240).

    Returns:
        tuple(int): Each of the first two entries of `size`, floor-divided by two.
    """
    half_first = size[0] // 2
    half_second = size[1] // 2
    return (half_first, half_second)


def find_center(top_l: tuple, bottom_r: tuple) -> tuple:
    """
    Locate the center of a bounding box from two opposite corners.

    Args:
        top_l (tuple): Top left corner of the bounding box.
        bottom_r (tuple): Bottom right corner of the bounding box.

    Returns:
        tuple: Center of the bounding box, computed with floor division.
    """
    (left, top), (right, bottom) = top_l, bottom_r

    # Half-extent along each axis, then offset from the matching edge
    half_span_x = (right - left) // 2
    half_span_y = (top - bottom) // 2

    return (half_span_x + left, half_span_y + bottom)


def get_bbox_width_height(top_l: tuple, bottom_r: tuple) -> tuple:
    """
    Find the width and height of a bounding box given the top left and bottom right coordinates.

    Coordinates follow the image convention used by the detector boxes: y grows
    downward, so the bottom edge has the larger y value and the height is
    ``bottom - top``.

    Args:
        top_l (tuple): Top left coordinates of the bounding box.
        bottom_r (tuple): Bottom right coordinates of the bounding box.

    Returns:
        tuple: ``(width, height)`` of the bounding box.
    """
    top_w, top_h = top_l
    bottom_w, bottom_h = bottom_r
    return (bottom_w - top_w, bottom_h - top_h)


def reorient(frame_center: tuple, bbox_center: tuple) -> None:
    """
    Move the drone so that the bounding box center lines up with the frame center.

    The pixel offset on each axis is used directly as the move distance. The
    Tello move commands only accept distances between 20 and 500, so offsets
    below 20 are skipped (the target is treated as close enough on that axis)
    and larger offsets are capped at 500.

    Args:
        frame_center (tuple): Center of the drone's image frame.
        bbox_center (tuple): Center of the bounding box of the object of interest.

    Returns:
        None
    """
    min_move, max_move = 20, 500

    frame_x, frame_y = frame_center
    bbox_x, bbox_y = bbox_center

    # (signed offset, action when positive, action when negative).
    # Image y grows downward, so a positive vertical offset means "move down".
    # The horizontal axis is handled first, then the vertical one.
    axes = (
        (bbox_x - frame_x, ("Move Right", drone.move_right), ("Move Left", drone.move_left)),
        (bbox_y - frame_y, ("Move Down", drone.move_down), ("Move Up", drone.move_up)),
    )

    for offset, positive, negative in axes:
        if offset == 0:
            continue
        label, command = positive if offset > 0 else negative
        move = abs(offset)
        print(label)
        print(move)
        if move < min_move:
            # Sending an out-of-range distance would be rejected by the drone
            print("Offset below", min_move, "- skipping move")
            continue
        command(min(int(move), max_move))

In [None]:
def exploit(top_left, bottom_right, obj_name="bottle", max_steps=10):
    """
    Once a drone has locked on to an object of interest, align with it and approach it.

    The drone is first re-centered on the bounding box. The approach loop then
    re-detects the object on a fresh frame every iteration, so the measured box
    size can actually change, and stops when the box covers 80% of the frame on
    either axis, when the object is lost, or after `max_steps` iterations.

    Args:
        top_left (tuple): Top left coordinates of the bounding box of the object of interest.
        bottom_right (tuple): Bottom right coordinates of the bounding box of the object of interest.
        obj_name (str, optional): Object to re-detect while approaching. Defaults to "bottle".
        max_steps (int, optional): Upper bound on approach iterations. Defaults to 10.

    Returns:
        None
    """
    frame_size = (360, 240)

    # Align drone with object
    frame_center = find_center_coord_for_frame_size(frame_size)
    print('frame center', frame_center)
    bbox_center = find_center(top_left, bottom_right)
    print('box center', bbox_center)
    reorient(frame_center, bbox_center)

    # Go to object
    for _ in range(max_steps):
        width, height = get_bbox_width_height(top_left, bottom_right)
        # abs(): tolerate either sign convention for the reported height
        if width >= frame_size[0] * 0.8 or abs(height) >= frame_size[1] * 0.8:
            break

        print('move forward')
        # NOTE: no forward command is sent here (dry run). To really approach the
        # target, issue drone.move_forward(<20..500>) at this point.
        time.sleep(1)

        # Re-measure the target on a fresh frame
        frame = cv2.resize(drone.get_frame_read().frame, frame_size)
        top_left, bottom_right = get_coords(obj_name, frame)
        if top_left is None:
            print('lost', obj_name)
            break

In [None]:
def track():
    """
    Exploration-Exploitation Function: watch the drone stream for a bottle and, whenever one is detected, fly towards it.

    Runs until interrupted. Each iteration grabs a frame, resizes it to 360x240,
    looks for a "bottle" and shows the frame. On a detection the box is drawn,
    the drone takes off if it is not already flying, and `exploit` is called.
    """
    drone.streamon()
    time.sleep(10)
    while True:
        img = drone.get_frame_read().frame
        img = cv2.resize(img, (360, 240))
        top_left, bottom_right = get_coords("bottle", img)
        cv2.imshow("image", img)
        cv2.waitKey(1)
        if top_left is not None:
            print("Bottle Found!")
            plot_box(img, top_left=top_left, bottom_right=bottom_right)
            time.sleep(1)
            # takeoff must only be sent from the ground; detections repeat on
            # every loop iteration, and the drone may already be airborne.
            if not getattr(drone, "is_flying", False):
                drone.takeoff()
            exploit(top_left=top_left, bottom_right=bottom_right)
        time.sleep(1)

In [None]:
# Test exploration-exploitation
# Two manual toggles: the first (disabled) would run `track` in a background
# thread; the second (enabled) runs it in the foreground. `track` loops
# forever, so this cell does not return until the kernel is interrupted.
if 0: # Tracking thread
    trk = Thread(target=track)
    trk.start()
if 1:
    track() # track objects in observation space

In [None]:
# Legacy Functions (1st Implementation)
def find_corners(upper_left: tuple, length: float, width: float) -> list:
    """
    Expand a compact bounding box description into its 4 corner points.

    Args:
        upper_left (tuple): Upper left corner of the bounding box.
        length (float): Extent of the box along the first coordinate.
        width (float): Extent of the box along the second coordinate.

    Returns:
        corners (list): ``[upper_left, upper_right, lower_left, lower_right]``.
    """
    x0, y0 = upper_left[0], upper_left[1]
    return [
        upper_left,
        (x0 + length, y0),
        (x0, y0 + width),
        (x0 + length, y0 + width),
    ]
    

def find_center(corners: list, bottom_r: tuple = None) -> tuple:
    """
    Find the center coordinates of a bounding box.

    Two call forms are accepted, because this name is also used with a
    two-point signature elsewhere in the notebook and a later definition
    replaces an earlier one:

    * ``find_center(corners)`` - the 4 corners of the box.
    * ``find_center(top_left, bottom_right)`` - two opposite corners.

    Args:
        corners (list): The 4 corners ``[upper_left, upper_right, lower_left, lower_right]``,
            or the top left corner when `bottom_r` is given.
        bottom_r (tuple, optional): Bottom right corner for the two-point form. Defaults to None.

    Returns:
        tuple: Coordinates of the center of the bounding box (true division for the
        4-corner form, floor division for the two-point form).
    """
    if bottom_r is not None:
        # Two-point form: same arithmetic as the two-argument helper
        top_w, top_h = corners
        bottom_w, bottom_h = bottom_r
        center_x = ((bottom_w - top_w) // 2) + top_w
        center_y = ((top_h - bottom_h) // 2) + bottom_h
        return (center_x, center_y)

    upper_left, upper_right, lower_left, lower_right = corners

    # x from the top edge, y from the upper-left / lower-right diagonal
    center_x = (upper_left[0] + upper_right[0]) / 2
    center_y = (upper_left[1] + lower_right[1]) / 2

    return (center_x, center_y)


def compact_exploit(drone, bbox): # prompt_template = explore_template):
    """
    Legacy compact implementation of the Exploit Function: given the bounding box of a target of interest, center the drone on it and take one step towards it.

    The box cannot be re-measured inside this function, so at most one forward
    step is taken per call; call again with a fresh bounding box to keep
    approaching. Pixel offsets are used directly as move distances; offsets
    below the Tello minimum of 20 are skipped and larger ones are capped at 500.

    Args:
        drone: Tello (djiteleopy) object that specifies actions to be taken on a DJI Tello drone over WiFi and offers data streaming capabilities.
        bbox (list): The 4 corners ``[upper_left, upper_right, lower_left, lower_right]`` of a bounding box.

    Returns:
        None
    """
    min_move, max_move = 20, 500

    # Center of the box, computed inline so it does not depend on which
    # `find_center` definition is currently bound in the notebook
    upper_left, upper_right, lower_left, lower_right = bbox
    bbox_center_x = (upper_left[0] + upper_right[0]) / 2
    bbox_center_y = (upper_left[1] + lower_right[1]) / 2

    # Translate the object to the center
    frame_height, frame_width = drone.get_frame_read().frame.shape[:2]
    delta_x = bbox_center_x - frame_width / 2   # > 0: object is right of center
    delta_y = bbox_center_y - frame_height / 2  # > 0: object is below center (image y grows downward)

    # Vertical correction first, then horizontal
    for offset, positive, negative in (
        (delta_y, drone.move_down, drone.move_up),
        (delta_x, drone.move_right, drone.move_left),
    ):
        distance = int(abs(offset))
        if distance < min_move:
            continue  # already close enough on this axis
        command = positive if offset > 0 else negative
        command(min(distance, max_move))

    # Zoom In (move towards the object)
    bbox_length = upper_right[0] - upper_left[0]
    bbox_width = lower_left[1] - upper_left[1]

    if bbox_width <= 100 or bbox_length <= 100:
        drone.move_forward(min_move)

    # Terminate
    return

In [None]:
# Initialize Pre-Trained VQA Model
# `processor` and `model` are read as globals by `query_the_environment`.
# NOTE(review): this rebinds `model`, the same name the YOLOv8 cell assigns and
# `get_coords` calls. After this cell runs, `get_coords` would call the ViLT
# model unless the YOLO cell is re-run or a detector is passed explicitly.
processor = ViltProcessor.from_pretrained("dandelin/vilt-b32-finetuned-vqa")
model = ViltForQuestionAnswering.from_pretrained("dandelin/vilt-b32-finetuned-vqa")

In [None]:
# Set up the Drone
# NOTE(review): this cell connects again (an earlier cell already calls
# `drone.connect()`), and it takes a picture and starts the frame-streaming
# thread before `drone.streamon()` is called. Confirm the helpers from
# `data_collection_utils` do not need the stream to be on first.
objective = "Find the person who is wearing a blue cap" # initialize LLM objective prompt

drone.connect() # Establish network connection

take_individual_picture(drone) # take an individual picture to test imaging capabilities

# Begin a thread and stream video frame by frame
stream = Thread(target=stream_frames, args=(drone, False))
stream.start()

# Enable drone streaming and launch
drone.streamon()
drone.takeoff()

In [None]:
def query_the_environment(drone, question):
    """
    Answer a question about the drone's current camera frame with the VQA model.

    Reads the module-level `processor` and `model` set up by the VQA cell.

    Args:
        drone: Tello (djiteleopy) object that specifies actions to be taken on a DJI Tello drone over WiFi and offers data streaming capabilities.
        question (str): Question to ask about the current frame.

    Returns:
        str: Label of the highest-scoring answer.
    """
    image = drone.get_frame_read().frame
    encoding = processor(image, question, return_tensors="pt")

    # Inference only: skip autograd bookkeeping
    with torch.no_grad():
        outputs = model(**encoding)

    # Sigmoid is monotonic, so the argmax over raw logits picks the same answer
    idx = outputs.logits.argmax(-1).item()
    return str(model.config.id2label[idx])

In [None]:
def next_action_prompt(prompt_template, objective, environment_context, previous_commands, previous_context, vqa_questions):
    """
    Fill in the placeholders of an action prompt template.

    Args:
        prompt_template (str): LLM prompt template containing `$`-prefixed placeholders.
        objective (str): Replaces `$objective`.
        environment_context (str): Replaces `$context`.
        previous_commands (str): Replaces `$previous_commands`.
        previous_context (str): Replaces `$previous_context`.
        vqa_questions (str): Replaces `$vqa_questions`.

    Returns:
        tuple(str, list): The filled-in prompt and the stop-token list ``["\\n"]``.
    """
    # Applied in this order; each replacement runs on the output of the previous one
    substitutions = (
        ("$objective", objective),
        ("$context", environment_context),
        ("$previous_commands", previous_commands),
        ("$previous_context", previous_context),
        ("$vqa_questions", vqa_questions),
    )

    filled = prompt_template
    for placeholder, value in substitutions:
        filled = filled.replace(placeholder, value)

    stop_tokens = ["\n"]
    return filled, stop_tokens

In [None]:
def prompt(prompt, stop_tokens=None):
    """
    Pass a prompt through an LLM model and return the output.

    `stop_tokens` is optional so that the one-argument calls made elsewhere in
    this notebook keep working once this definition replaces the earlier `prompt`.

    NOTE(review): `openai.Completion` and the "text-davinci-003" engine belong to
    the legacy OpenAI completions interface - confirm they are still available
    with the installed `openai` package.

    Args:
        prompt (str): Prompt to be passed through the LLM model.
        stop_tokens (list, optional): List of stop sequences for the LLM. Defaults to None.

    Returns:
        str: LLM Prompt Response (text of the first returned choice).
    """
    response = openai.Completion.create(
        engine="text-davinci-003",
        prompt=prompt,
        temperature=0.5,
        max_tokens=1000,
        top_p=1,
        frequency_penalty=0,
        presence_penalty=0,
        best_of=10,  # ten candidates are generated per request
        stop=stop_tokens,
    )
    return response.choices[0].text

In [None]:
def get_vqa_questions(vqa_prompt_template, objective):
    """
    Ask the LLM for VQA questions that help with a user-specified objective.

    Args:
        vqa_prompt_template (str): Prompt template containing the `$objective` placeholder.
        objective (str): User-specified objective that the drone is tasked to complete.

    Returns:
        matches (list): Every piece of text the LLM wrapped between two `@` characters.
    """
    filled_template = vqa_prompt_template.replace("$objective", objective)
    llm_output = prompt(filled_template, ["&&&&&&"])
    # Questions are delimited as @question@ in the LLM output
    return re.findall(r'@(.+?)@', llm_output)

In [None]:
# Generate VQA Questions: test
# Uses the `objective` string defined in the drone set-up cell; the resulting
# list is the cell's last expression, so the notebook displays it.
get_vqa_questions(vqa_prompt_template, objective)

In [None]:
def explore(drone, use_gpt=False, prompt_template=explore_template):
    """
    Explore Function: Survey the room via a random flight path to find objects of interest.

    Without GPT, three distinct actions are drawn at random from the move/rotate
    action space and executed with a short pause after each. Move distances are
    drawn from 20-50, the range the drone commands accept (see the command list
    in `prompt_image`); rotations are drawn from 45-180 degrees.

    Args:
        drone: Tello (djiteleopy) object that specifies actions to be taken on a DJI Tello drone over WiFi and offers data streaming capabilities.
        use_gpt (bool, optional): Whether or not to use a GPT-type LLM to parse through the prompt. Defaults to False.
        prompt_template (str, optional): Prompt template for the specified objective. Defaults to explore_template.

    Returns:
        None
    """
    if use_gpt:
        # Stop tokens are passed explicitly so the call works with either
        # `prompt` definition in this notebook.
        # TODO: the LLM response is not acted upon yet.
        prompt(prompt_template, None)
        return

    # One random angle and one random distance are shared by all actions
    random_angle = r.randint(45, 180)
    random_distance = r.randint(20, 50)

    action_space = [
        (drone.move_left, random_distance),
        (drone.move_right, random_distance),
        (drone.rotate_clockwise, random_angle),
        (drone.rotate_counter_clockwise, random_angle),
        (drone.move_forward, random_distance),
        (drone.move_back, random_distance),
    ]

    # Sample 3 actions at random (without replacement) and run them in turn
    for command, amount in r.sample(action_space, 3):
        command(amount)
        time.sleep(0.5)

In [None]:
# Get the 4 points of a bounding box
def find_corners(upper_left: tuple, length: float, width: float) -> list:
    """
    Compute the 4 corners of a bounding box from its upper left corner and extents.

    Args:
        upper_left (tuple): Upper left corner of the bounding box.
        length (float): Extent of the box along the first coordinate.
        width (float): Extent of the box along the second coordinate.

    Returns:
        corners (list): The corners ordered upper left, upper right, lower left, lower right.
    """
    left, top = upper_left[0], upper_left[1]
    right = left + length
    bottom = top + width

    corners = [upper_left]
    corners.extend([(right, top), (left, bottom), (right, bottom)])
    return corners

    
# Find the center of a bounding box
def find_center(corners: list, bottom_r: tuple = None) -> tuple:
    """
    Find the center coordinates of a bounding box.

    This name is defined more than once in the notebook with different
    signatures, and the definition run last wins. To keep every caller working,
    both call forms are supported here:

    * ``find_center(corners)`` - the 4 corners of the box.
    * ``find_center(top_left, bottom_right)`` - two opposite corners.

    Args:
        corners (list): The 4 corners ``[upper_left, upper_right, lower_left, lower_right]``,
            or the top left corner when `bottom_r` is given.
        bottom_r (tuple, optional): Bottom right corner for the two-point form. Defaults to None.

    Returns:
        tuple: Coordinates of the center of the bounding box (true division for the
        4-corner form, floor division for the two-point form).
    """
    if bottom_r is None:
        upper_left, upper_right, lower_left, lower_right = corners
        # x from the top edge, y from the upper-left / lower-right diagonal
        return (
            (upper_left[0] + upper_right[0]) / 2,
            (upper_left[1] + lower_right[1]) / 2,
        )

    # Two-point form, using floor division like the two-argument helper
    top_w, top_h = corners
    bottom_w, bottom_h = bottom_r
    return (
        ((bottom_w - top_w) // 2) + top_w,
        ((top_h - bottom_h) // 2) + bottom_h,
    )


# Exploit Function: Input we have a bounding box with 4 corners (or 1 corner + length and width for a more compact representation)
def exploit(drone, bbox, use_gpt = False, prompt_template = explore_template):
    """
    Legacy implementation of the Exploit Function: given the bounding box of a target of interest, center the drone on it and take one step towards it.

    No detection is run inside this function, so it performs a single bounded
    step per call; call it again with a fresh bounding box to continue:

    * ``bbox is None`` (target not in view): rotate clockwise by 30 degrees and return.
    * otherwise: center on the box, then take at most one forward step.

    Pixel offsets are used directly as move distances; offsets below the Tello
    minimum of 20 are skipped and larger ones are capped at 500.

    NOTE(review): this definition replaces the earlier
    ``exploit(top_left, bottom_right)`` once its cell is run, and the two
    signatures are not interchangeable.

    Args:
        drone: Tello (djiteleopy) object that specifies actions to be taken on a DJI Tello drone over WiFi and offers data streaming capabilities.
        bbox (list): The 4 corners ``[upper_left, upper_right, lower_left, lower_right]`` of a bounding box, or None.
        use_gpt (bool, optional): Whether or not to use GPT prompt for control. Defaults to False.
        prompt_template (str, optional): LLM prompt template. Defaults to explore_template.

    Returns:
        None
    """
    min_move, max_move = 20, 500

    if use_gpt:
        # Stop tokens are passed explicitly so the call works with either
        # `prompt` definition. TODO: the response is not acted upon yet.
        prompt(prompt_template, None)

    # Step 1: Find out whether or not the drone is facing the object
    if bbox is None:
        drone.rotate_clockwise(30)
        return

    # Step 2: Translate the object to the center
    upper_left, upper_right, lower_left, lower_right = bbox
    bbox_center_x = (upper_left[0] + upper_right[0]) / 2
    bbox_center_y = (upper_left[1] + lower_right[1]) / 2

    frame_height, frame_width = drone.get_frame_read().frame.shape[:2]
    delta_x = bbox_center_x - frame_width / 2   # > 0: object is right of center
    delta_y = bbox_center_y - frame_height / 2  # > 0: object is below center (image y grows downward)

    # Vertical correction first, then horizontal
    for offset, positive, negative in (
        (delta_y, drone.move_down, drone.move_up),
        (delta_x, drone.move_right, drone.move_left),
    ):
        distance = int(abs(offset))
        if distance < min_move:
            continue  # already close enough on this axis
        command = positive if offset > 0 else negative
        command(min(distance, max_move))

    # Step 3: Zoom In (move towards the object)
    bbox_length = upper_right[0] - upper_left[0]
    bbox_width = lower_left[1] - upper_left[1]

    if bbox_width <= 100 or bbox_length <= 100:
        drone.move_forward(min_move)

    # Step 4: Terminate
    return