In [6]:
import cv2
import numpy as np
from sklearn.cluster import DBSCAN
from pupil_apriltags import Detector
import time

def apply_threshold_hsv(roi, hsv_color, threshold_h=10, threshold_s=35, threshold_v=80):
    """
    Builds a binary mask of the pixels in an HSV image that lie close to a target colour.

    Parameters:
    - roi (array): The HSV image that is passed to cv2.inRange.
    - hsv_color (tuple): The central (Hue, Saturation, Value) colour. Only the first three
      entries are used; they may be Python ints or NumPy integer scalars.
    - threshold_h (int, optional): Half-width of the accepted Hue range. Defaults to 10.
    - threshold_s (int, optional): Half-width of the accepted Saturation range. Defaults to 35.
    - threshold_v (int, optional): Half-width of the accepted Value range. Defaults to 80.

    Returns:
    - array: The mask returned by cv2.inRange for the clamped lower/upper bounds.

    Note:
    - The Hue range is clamped to [0, 180] and does not wrap around, so a target hue close
      to either end of the scale only matches on one side of the boundary.
    """

    # Convert to plain Python ints before doing arithmetic. The colour usually
    # arrives as NumPy uint8 scalars (see BGR_to_HSV); uint8 arithmetic can wrap
    # around (e.g. 5 - 10 -> 251, 201 + 80 -> 25) instead of leaving the valid
    # range, which would defeat the max()/min() clamping below.
    hue, saturation, value = (int(channel) for channel in hsv_color[:3])

    lower_bound = np.array([max(0, hue - threshold_h), max(0, saturation - threshold_s), max(0, value - threshold_v)])
    upper_bound = np.array([min(180, hue + threshold_h), min(255, saturation + threshold_s), min(255, value + threshold_v)])
    mask = cv2.inRange(roi, lower_bound, upper_bound)
    return mask

def find_clusters(mask):
    """
    Groups the non-zero pixels of a mask into spatial clusters with DBSCAN
    (eps=5, min_samples=10).

    Parameters:
    - mask (array): A 2D numpy array; every element different from zero is a pixel of interest.

    Returns:
    - dict: Maps each DBSCAN label to the list of [y, x] coordinates assigned to that label.
      Points labelled -1 (noise) are left out. An empty dictionary is returned when the mask
      has no non-zero pixel.
    """

    rows, cols = np.nonzero(mask != 0)
    if rows.size == 0:
        return {}

    # One (y, x) row per pixel, ordered by the x coordinate before clustering.
    pixels = np.column_stack((rows, cols))
    ordered = pixels[np.argsort(pixels[:, 1])]

    labels = DBSCAN(eps=5, min_samples=10).fit_predict(ordered)

    grouped = {}
    for label, pixel in zip(labels, ordered):
        if label == -1:
            # DBSCAN marks noise points with the label -1.
            continue
        if label not in grouped:
            grouped[label] = []
        grouped[label].append(pixel.tolist())

    return grouped

def filter_noise_clusters(cluster_dict, size_threshold=(50, 300)):
    """
    Keeps only the clusters whose point count lies strictly between two limits.

    Parameters:
    - cluster_dict (dict): Maps a cluster index to the list of points in that cluster.
    - size_threshold (tuple): (lower, upper) limits on the number of points. Both limits are
      exclusive: a cluster is kept only when lower < len(points) < upper.

    Returns:
    - dict: A new dictionary with the clusters that passed the size check; the point lists
      themselves are the same objects as in cluster_dict.
    """

    min_size, max_size = size_threshold
    return {
        label: members
        for label, members in cluster_dict.items()
        if min_size < len(members) < max_size
    }

def generate_error_bounds_for_clusters(cluster_dict_black, cluster_dict_white, initial_threshold=5):
    """
    Creates initial per-cluster error bounds for the black and the white clusters.

    Parameters:
    - cluster_dict_black (dict): The black clusters dictionary; only its size is used.
    - cluster_dict_white (dict): The white clusters dictionary; only its size is used.
    - initial_threshold (int): Starting value of every lower bound.

    Returns:
    - tuple: (black_error_bounds, white_error_bounds). Each element is a
      (lower_bounds, upper_bounds) pair of lists with one entry per cluster. Lower bounds equal
      initial_threshold; upper bounds equal initial_threshold + 15 for black and
      initial_threshold + 12 for white.
    """
    black_error_bounds = (
        [initial_threshold for _ in cluster_dict_black],
        [initial_threshold + 15 for _ in cluster_dict_black],
    )
    white_error_bounds = (
        [initial_threshold for _ in cluster_dict_white],
        [initial_threshold + 12 for _ in cluster_dict_white],
    )

    return black_error_bounds, white_error_bounds

def calibrate_error_bounds(error_keys, error_bounds):
    """
    Raises the error bounds of the listed clusters by one, or reports that calibration is done.

    Parameters:
    - error_keys (list): Indices into the bound lists whose bounds should be incremented.
      An index that appears several times is incremented once per occurrence.
    - error_bounds (tuple): (error_lower_bound, error_upper_bound), two lists that are
      modified in place.

    Returns:
    - tuple: (error_lower_bound, error_upper_bound) after the adjustment. When error_keys is
      empty the bounds are printed and the error_bounds argument itself is returned unchanged.
    """
    lower, upper = error_bounds

    if not error_keys:
        # Nothing left to adjust: report the final bounds and hand them back untouched.
        print("calibration_done:", lower, upper)
        return error_bounds

    for key_index in error_keys:
        lower[key_index] += 1
        upper[key_index] += 1

    return (lower, upper)

def find_pressed_keys(ref_distance, inf_distance, displacement_threshold):
    """
    Identifies keys that are considered pressed based on how their distance changed.

    Parameters:
    - ref_distance (list or array): Reference distance of each key.
    - inf_distance (list or array): Current distance of each key.
    - displacement_threshold (list): One threshold per key. A key is considered pressed when
      its displacement (ref_distance - inf_distance) is less than its threshold. Keys beyond
      the end of this list have no threshold and are never reported.

    Returns:
    - list: Indices of the keys that are considered pressed. The list is empty when the two
      distance sequences differ in length, because their entries cannot be paired up then.
    """
    if len(ref_distance) != len(inf_distance):
        return []

    print(ref_distance)
    print(inf_distance)
    # Coerce to arrays so that plain lists (as the parameters are documented)
    # can be subtracted element-wise; list - list would raise TypeError.
    displacement = np.asarray(ref_distance, dtype=float) - np.asarray(inf_distance, dtype=float)
    print("displacement", displacement)

    return [
        index
        for index, (delta, limit) in enumerate(zip(displacement, displacement_threshold))
        if delta < limit
    ]

def reference_frame(frame, mask_bound, hsv_color_1, hsv_color_2, right_corner_coord, threshold=40):
    """
    Measures, for a reference frame, the distance of every detected colour cluster
    from a corner point.

    The frame is cropped to mask_bound, converted from BGR to HSV and thresholded once per
    colour. Each mask is clustered, clusters outside the (50, 300) size window are dropped,
    and the distance from right_corner_coord to every cluster centroid is computed.

    Parameters:
    - frame (array): BGR image.
    - mask_bound (tuple): (x1, y1, x2, y2) crop rectangle.
    - hsv_color_1 (tuple): HSV colour of the first mask (the "black" keys).
    - hsv_color_2 (tuple): HSV colour of the second mask (the "white" keys).
    - right_corner_coord (tuple): (x, y) point the distances are measured from.
    - threshold (int, optional): Not used by this function.

    Returns:
    - tuple: (roi, distances_1, distances_2) with the cropped BGR region and one distance
      array per colour.

    NOTE(review): centroids are in ROI coordinates while right_corner_coord appears to come
    from the full frame; the two only agree when mask_bound starts at (0, 0) - confirm.
    """
    left, top, right, bottom = mask_bound
    roi = frame[top:bottom, left:right]
    hsv_roi = cv2.cvtColor(roi, cv2.COLOR_BGR2HSV)

    # Threshold and cluster each colour: first "black", then "white".
    black_clusters = find_clusters(apply_threshold_hsv(hsv_roi, hsv_color_1))
    white_clusters = find_clusters(apply_threshold_hsv(hsv_roi, hsv_color_2))

    black_clusters = filter_noise_clusters(black_clusters, size_threshold=(50, 300))
    white_clusters = filter_noise_clusters(white_clusters, size_threshold=(50, 300))

    black_x, black_y, _ = process_clusters(black_clusters)
    white_x, white_y, _ = process_clusters(white_clusters)

    black_distances = find_distance_corner(right_corner_coord, black_x, black_y)
    white_distances = find_distance_corner(right_corner_coord, white_x, white_y)

    return roi, black_distances, white_distances

def inference_frame(inf_frame, mask_bound, hsv_color_1, hsv_color_2, ref_distance_1, ref_distance_2, right_corner_coord, roi, threshold=40, error_bound_1=(), error_bound_2=()):
    """
    Runs the colour-cluster pipeline on a live frame and reports which keys look pressed.

    The frame is cropped to mask_bound, converted from BGR to HSV and thresholded once per
    colour; both masks are shown in the windows "mask 1" and "mask 2". Clusters outside the
    (50, 300) size window are dropped, the distance from right_corner_coord to every cluster
    centroid is computed, and those distances are compared with the reference distances by
    find_pressed_keys.

    Parameters:
    - inf_frame (array): BGR image.
    - mask_bound (tuple): (x1, y1, x2, y2) crop rectangle.
    - hsv_color_1 (tuple): HSV colour of the first mask (the "black" keys).
    - hsv_color_2 (tuple): HSV colour of the second mask (the "white" keys).
    - ref_distance_1: Reference distances for the first colour.
    - ref_distance_2: Reference distances for the second colour.
    - right_corner_coord (tuple): (x, y) point the distances are measured from.
    - roi: Not used by this function.
    - threshold (int, optional): Not used by this function.
    - error_bound_1: Per-key displacement thresholds for the first colour.
    - error_bound_2: Per-key displacement thresholds for the second colour.

    Returns:
    - tuple: (inf_roi, black_pressed_keys, white_pressed_keys, cluster_dict_1, cluster_dict_2)
      with the cropped BGR region, the pressed-key index lists and the size-filtered cluster
      dictionaries of both colours.
    """
    left, top, right, bottom = mask_bound
    inf_roi = inf_frame[top:bottom, left:right]
    hsv_roi = cv2.cvtColor(inf_roi, cv2.COLOR_BGR2HSV)

    # "black" keys
    black_mask = apply_threshold_hsv(hsv_roi, hsv_color_1)
    black_clusters = find_clusters(black_mask)
    cv2.imshow("mask 1", black_mask)

    # "white" keys
    white_mask = apply_threshold_hsv(hsv_roi, hsv_color_2)
    white_clusters = find_clusters(white_mask)
    cv2.imshow("mask 2", white_mask)

    black_clusters = filter_noise_clusters(black_clusters, size_threshold=(50, 300))
    white_clusters = filter_noise_clusters(white_clusters, size_threshold=(50, 300))

    black_x, black_y, _ = process_clusters(black_clusters)
    white_x, white_y, _ = process_clusters(white_clusters)

    black_distances = find_distance_corner(right_corner_coord, black_x, black_y)
    white_distances = find_distance_corner(right_corner_coord, white_x, white_y)

    black_pressed_keys = find_pressed_keys(ref_distance_1, black_distances, error_bound_1)
    white_pressed_keys = find_pressed_keys(ref_distance_2, white_distances, error_bound_2)

    return inf_roi, black_pressed_keys, white_pressed_keys, black_clusters, white_clusters

def frame_correction(frame):
    """
    Detects AprilTags in a BGR frame and outlines every detected tag in green on the frame.

    Uses the module-level at_detector with fixed camera parameters and tag size.

    Parameters:
    - frame (array): BGR image; the tag outlines are drawn onto it in place.

    Returns:
    - tuple: (corners, euler_angles, at_detected). corners holds the corners of the last tag
      in the detector's result list (an empty list when there is none), euler_angles is
      always an empty list, and at_detected is True once at least one outline segment
      has been drawn.
    """
    grayscale = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)

    detections = at_detector.detect(
        grayscale,
        estimate_tag_pose=True,
        camera_params=[640, 480, 640, 480],  # fx, fy, cx, cy
        tag_size=0.05,  # Tag size in meters
    )

    corners, euler_angles = [], []
    at_detected = False

    for detection in detections:
        corners = detection.corners
        # Connect every corner with its predecessor; index -1 closes the outline.
        for idx, end_point in enumerate(corners):
            start_point = corners[idx - 1]
            at_detected = True
            cv2.line(frame, tuple(start_point.astype(int)), tuple(end_point.astype(int)), (0, 255, 0), 2)

    return corners, euler_angles, at_detected

def get_sampling_coord(coords):
    """
    Generates two columns of sampling points to the right and to the left of a rectangle.

    The axis-aligned bounding box of the first four points in coords is computed. Starting at
    int(y_min) and stepping by 10 while below int(y_max), one point is produced per step in
    each column.

    Parameters:
        coords (list of lists): Four corner points [[x1, y1], [x2, y2], [x3, y3], [x4, y4]].

    Returns:
        list of lists: [pink, yellow], where
            - 'pink' holds the points [x_max + 10, y], i.e. 10 units to the right of the box;
            - 'yellow' holds the points [x_min - 20, y], i.e. 20 units to the left of the box.
    """

    first, second, third, fourth = coords[0], coords[1], coords[2], coords[3]
    xs = (first[0], second[0], third[0], fourth[0])
    ys = (first[1], second[1], third[1], fourth[1])

    x_min, x_max = min(xs), max(xs)
    sample_rows = range(int(min(ys)), int(max(ys)), 10)

    pink = [[x_max + 10, row] for row in sample_rows]
    yellow = [[x_min - 20, row] for row in sample_rows]

    return [pink, yellow]

def _average_color(img, points):
    """Return the truncated channel-wise mean of img at the in-bounds points, or None if there are none."""
    height, width = img.shape[:2]
    samples = []
    for point in points:
        col, row = int(point[0]), int(point[1])
        # Skip points outside the image: a negative index would silently wrap
        # around to the opposite border, and a too-large one would raise IndexError.
        if 0 <= row < height and 0 <= col < width:
            samples.append(img[row, col])

    if not samples:
        return None

    mean = np.mean(samples, axis=0)
    return [int(mean[0]), int(mean[1]), int(mean[2])]


def get_tag_color(img, sample_coords):
    """
    Calculates the average colour at the 'pink' and 'yellow' sample points of an image.

    Parameters:
        img (numpy.ndarray): Image with at least three colour channels, indexed as img[y, x].

        sample_coords (tuple of lists): (pink, yellow), two lists of (x, y) sample points.
        Points that fall outside the image are ignored.

    Returns:
        tuple: (average_yellow, average_pink) - note that the yellow average comes first.
        Each average is a list of three integers in the channel order of img (the values
        are truncated, not rounded), or None when no point of that set could be sampled.
        A message is printed for every set without samples.
    """

    pink, yellow = sample_coords

    average_pink = _average_color(img, pink)
    if average_pink is None:
        print("No pink colors sampled.")

    average_yellow = _average_color(img, yellow)
    if average_yellow is None:
        print("No yellow colors sampled.")

    # Each average is reported independently of the other one.
    return average_yellow, average_pink

def BGR_to_HSV(bgr_color_1, bgr_color_2):
    """
    Convert two BGR colors to HSV in a single OpenCV call.

    Parameters:
        bgr_color_1 (tuple): The BGR color of black as a tuple of three integers.
        bgr_color_2 (tuple): The BGR color of white as a tuple of three integers.

    Returns:
        tuple: (hsv_color_1, hsv_color_2), one tuple of three HSV channel values per input
        color. The values are taken directly from the converted uint8 array.
    """

    # cv2.cvtColor works on images, so the two colors are packed into a 1x2 image.
    pixels = np.uint8([[bgr_color_1, bgr_color_2]])
    converted = cv2.cvtColor(pixels, cv2.COLOR_BGR2HSV)

    first_hsv, second_hsv = converted[0]
    return tuple(first_hsv), tuple(second_hsv)

def distance_matching(ref_distances, inf_distances):
    """
    Returns the indices at which the reference and inference distances are exactly equal.

    Parameters:
    - ref_distances (sequence): Reference distances.
    - inf_distances (sequence): Distances to compare against, paired by position.

    Returns:
    - list: Indices i for which ref_distances[i] == inf_distances[i]. Only positions present
      in both sequences are compared.
    """
    return [
        index
        for index, (ref, inf) in enumerate(zip(ref_distances, inf_distances))
        if ref == inf
    ]

def find_distance_corner(at_right_corner, x_centroids, y_centroids):
    """
    Computes the Euclidean distance from a corner point to every centroid.

    Parameters:
    - at_right_corner (tuple): (x, y) coordinates of the corner point.
    - x_centroids (array): x coordinates of the centroids.
    - y_centroids (array): y coordinates of the centroids.

    Returns:
    - array: One distance per centroid.
    """
    corner_x, corner_y = at_right_corner
    offset_x = x_centroids - corner_x
    offset_y = y_centroids - corner_y

    return np.sqrt(offset_x**2 + offset_y**2)

def process_clusters(cluster_dict):
    """
    Computes the centroid and the bounding-box corners of every cluster.

    Parameters:
    - cluster_dict (dict): Maps a cluster index to a list of [y, x] points.

    Returns:
    - tuple: (centroids_x, centroids_y, corners) as numpy arrays with one entry per cluster,
      in the iteration order of cluster_dict. Each corners entry lists the (x, y) corners
      (min_x, min_y), (max_x, min_y), (max_x, max_y), (min_x, max_y).
    """
    xs, ys, boxes = [], [], []

    for members in cluster_dict.values():
        pts = np.array(members)

        # Column 0 holds y, column 1 holds x.
        center = np.mean(pts, axis=0, dtype=np.float64)
        xs.append(center[1])
        ys.append(center[0])

        cols = pts[:, 1]
        rows = pts[:, 0]
        left, right = cols.min(), cols.max()
        top, bottom = rows.min(), rows.max()

        boxes.append([(left, top), (right, top), (right, bottom), (left, bottom)])

    return np.array(xs), np.array(ys), np.array(boxes)

def get_lower_corners(coords):
    """
    Picks the two corners with the largest y values and orders them from left to right.

    Parameters:
    - coords (sequence): Corner points indexed as point[0] = x, point[1] = y.

    Returns:
    - tuple: (left_corner, right_corner) - the two points with the largest y, sorted by x.
    """
    # Largest y first; in image coordinates these are the lowest points.
    bottom_pair = sorted(coords, key=lambda point: point[1], reverse=True)[:2]

    left_corner, right_corner = sorted(bottom_pair, key=lambda point: point[0])
    return left_corner, right_corner

# Note names used as the `scale` argument of encode_to_scale: pressed-key
# indices are mapped onto these lists (wrapping around at the end of a list).
white_keys = ['C', 'D', 'E', 'F', 'G', 'A', 'B']
black_keys = ['C#', 'D#', 'F#', 'G#', 'A#']

def encode_to_scale(values, scale):
    """
    Maps integer values onto the entries of a scale, wrapping around at its end.

    Parameters:
    - values (iterable): Integer indices.
    - scale (list): The entries to choose from.

    Returns:
    - list: scale[value % len(scale)] for every value, in input order.
    """
    return [scale[value % len(scale)] for value in values]

In [12]:
# Live key-press detection: show the camera stream until 's' is pressed, use
# that frame as the reference, then compare every later frame against it.
cap = cv2.VideoCapture(0, cv2.CAP_DSHOW)
ref_img = False  # becomes True once a reference frame has been captured
# white = (196, 140, 233)
# black = (161, 222, 216)
# HSV_color_2 = (174, 131, 201)
# HSV_color_1 = (29, 58, 201)

# Initialize the AprilTag detector
at_detector = Detector(families='tag36h11',
                   nthreads=1,
                   quad_decimate=1.0,
                   quad_sigma=0.0,
                   refine_edges=1,
                   decode_sharpening=0.25,
                   debug=0)

try:
    while cap.isOpened():

        success, frame_img = cap.read()

        # Check the read result before touching the frame: a failed read does
        # not deliver a usable image, so rotating it first would raise.
        if not success:
            print("Ignoring empty camera frame.")
            break

        frame_img = cv2.rotate(frame_img, cv2.ROTATE_180)

        if not ref_img:
            cv2.imshow('Pressed Key Frame', frame_img)

            if cv2.waitKey(1) & 0xFF == ord('s'):
                # Color Detector
                ref_at_coord, ref_angle, ref_at_detected = frame_correction(frame_img)

                if not ref_at_detected:
                    # Without a tag there are no corners to derive the sample
                    # points from; keep streaming and let the user try again.
                    print("No AprilTag detected in the reference frame; press 's' again.")
                else:
                    color_sample_coords = get_sampling_coord(ref_at_coord)
                    try:
                        BGR_color_1, BGR_color_2 = get_tag_color(frame_img, color_sample_coords)
                    except IndexError:
                        # Sample points next to the tag fell outside the image.
                        BGR_color_1 = BGR_color_2 = None

                    if BGR_color_1 is None or BGR_color_2 is None:
                        print("Could not sample the key colors next to the tag; press 's' again.")
                    else:
                        HSV_color_1, HSV_color_2 = BGR_to_HSV(BGR_color_1, BGR_color_2)
                        ref_left_corner, ref_right_corner = get_lower_corners(ref_at_coord)

                        # Tag Detection
                        mask_bound = (0, 0, 640, 480)
                        roi, ref_distance_1, ref_distance_2 = reference_frame(frame_img, mask_bound, HSV_color_1, HSV_color_2, ref_right_corner)
                        ref_img = True
                        # need to change this
                        # black_error_bounds, white_error_bounds = generate_error_bounds_for_clusters(cluster_dict_1, cluster_dict_2, initial_threshold=7)
                        black_error_bounds = [-1.0, -1.0, -0.8, -0.84, -0.7, -0.6, -0.5, -0.4, -1.0, -1.0, -0.65, -0.7, -0.6, -0.5, -0.5, -0.4]
                        white_error_bounds = [-1.0, -1.0, -0.8, -0.84, -0.7, -0.6, -0.5, -0.4, -1.0, -1.0, -0.65, -0.7, -0.6, -0.5, -0.5, -0.4]

        else:
            # April Tag Correction
            inf_at_coord, inf_angle, inf_at_detected = frame_correction(frame_img)
            if inf_at_detected:
                inf_left_corner, inf_right_corner = get_lower_corners(inf_at_coord)

                # Tag Detection
                frame_roi, black_error_keys, white_error_keys, cluster_dict_1, cluster_dict_2 = inference_frame(
                    frame_img, mask_bound, HSV_color_1, HSV_color_2,
                    ref_distance_1, ref_distance_2, inf_right_corner, roi, threshold=40,
                    error_bound_1=black_error_bounds, error_bound_2=white_error_bounds)

                encoded_notes_black = encode_to_scale(black_error_keys, black_keys)
                encoded_notes_white = encode_to_scale(white_error_keys, white_keys)
                all_notes = encoded_notes_white + encoded_notes_black

                if all_notes:
                    print(all_notes)

                # NOTE(review): black_error_keys / white_error_keys hold positional
                # indices, while cluster_dict_1 / cluster_dict_2 are keyed by DBSCAN
                # label (with filtered-out labels missing). Using a pressed-key index
                # directly as a cluster_dict key, e.g. to highlight the pressed
                # clusters in frame_roi, can therefore raise KeyError.

                cv2.imshow('Pressed Key Frame', frame_img)

        cv2.waitKey(1)
        if cv2.getWindowProperty('Pressed Key Frame', cv2.WND_PROP_VISIBLE) < 1:
            break
finally:
    # Release the camera and close the windows even if a frame raised.
    cap.release()
    cv2.destroyAllWindows()

[120.22380875 140.16948261 176.58727101 205.64765829 235.02665609
 281.99825837 314.93318917 362.65406184 393.54706592]
[120.27163513 140.18710053 176.60835517 205.48554671 234.94234706
 281.73369928 314.70695141 362.67340336 393.46795641]
displacement [-0.04782637 -0.01761792 -0.02108415  0.16211158  0.08430903  0.26455909
  0.22623776 -0.01934152  0.07910951]
[113.34006901  92.0909604  121.3609023  137.63835254 159.16973603
 154.39478913 195.87634315 224.12185597 252.61859831 257.85452005
 299.18789756 330.46661012 337.98962868 378.34167928 408.82628466]
[113.32805221  92.0820644  121.39229026 137.61592317 159.0367405
 154.2357587  195.81561838 224.03378377 252.5614033  257.67804856
 299.09252047 330.35103072 337.93145552 378.22435227 408.7110233 ]
displacement [ 0.0120168   0.008896   -0.03138796  0.02242938  0.13299552  0.15903044
  0.06072477  0.0880722   0.05719501  0.17647149  0.09537709  0.1155794
  0.05817316  0.11732701  0.11526136]
[120.22380875 140.16948261 176.58727101 205

KeyError: 1