# Module 1: Mouse Pointer Detection

In [1]:
import cv2
import os
from sklearn.cluster import KMeans
from sklearn.cluster import DBSCAN
from collections import Counter
import numpy as np

## Step 1: Frame Differential Screening

In [None]:
# Video file path
video_path = 'testvideo2.mkv'

# Folder that receives every frame in which a pointer-sized moving object was found
moving_frame_folder = 'moving_frames'
os.makedirs(moving_frame_folder, exist_ok=True)

# Maps a frame index to the (x, y) top-left corners of the moving objects found in that frame
locations = {}

# The structuring element never changes, so build it once instead of once per frame
kernel = cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (5, 5))

cap = cv2.VideoCapture(video_path)
try:
    ret, frame1 = cap.read()
    if not ret:
        # Fail with a readable message instead of a cvtColor error on a None frame
        raise RuntimeError(f"Could not read the first frame of {video_path!r}")
    gray_frame1 = cv2.cvtColor(frame1, cv2.COLOR_BGR2GRAY)

    # The first frame is already consumed, so frame_count is the 0-based index
    # of the frame that the next cap.read() returns.
    frame_count = 1

    while cap.isOpened():
        ret, frame2 = cap.read()
        if not ret:
            break

        # Convert to grayscale
        gray_frame2 = cv2.cvtColor(frame2, cv2.COLOR_BGR2GRAY)

        # Calculate the difference between two consecutive frames
        frame_diff = cv2.absdiff(gray_frame1, gray_frame2)

        # Apply a threshold to the difference image, in order to mark the changed area
        _, thresh = cv2.threshold(frame_diff, 25, 255, cv2.THRESH_BINARY)

        # Morphological closing on the thresholded mask
        thresh = cv2.morphologyEx(thresh, cv2.MORPH_CLOSE, kernel)

        # Find contours in the threshold image
        contours, _ = cv2.findContours(thresh, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)

        # Draw bounding boxes around the contours whose area is within [110, 400]
        for contour in contours:
            area = cv2.contourArea(contour)
            if area < 110 or area > 400:
                continue
            (x, y, w, h) = cv2.boundingRect(contour)
            cv2.rectangle(frame2, (x, y), (x+w, y+h), (0, 0, 255), 2)

            locations.setdefault(frame_count, []).append((x, y))

            position_text = f"({x}, {y})"
            cv2.putText(frame2, position_text, (x, y-10), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0, 0, 255), 2)

        # Save the annotated frame once, after all boxes are drawn, and only
        # when at least one contour passed the area filter.
        if frame_count in locations:
            output_frame_path = os.path.join(moving_frame_folder, f"frame_{frame_count}.jpg")
            cv2.imwrite(output_frame_path, frame2)

        frame_count += 1
        gray_frame1 = gray_frame2
finally:
    # Release the capture even if processing raised
    cap.release()

print(f"Processed {frame_count} frames")
print(locations)


## Step 2: Intra-frame Clustering by KMeans

In [None]:
# Maps a frame index to a single representative (x, y) point for that frame
cluster_centers = {}

# Reduce the detections of every frame to one point
for frame, points in locations.items():
    if len(points) == 1:
        # A lone detection is its own centre; no clustering needed
        cluster_centers[frame] = points[0]
        continue

    # Several detections: fit a single-cluster KMeans and take its centroid,
    # rounded to integer pixel coordinates
    model = KMeans(n_clusters=1, random_state=0)
    model.fit(np.array(points))
    centroid = np.round(model.cluster_centers_[0])
    cluster_centers[frame] = tuple(int(value) for value in centroid)

print(cluster_centers)

## Step 3: Inter-frame Clustering by DBSCAN

In [None]:
# DBSCAN parameters
eps = 20  # neighborhood distance
min_samples = 2  # minimum number of samples required to form a dense region

# Maps a frame index to its temporally smoothed (x, y) centre
new_cluster_centers = {}

keys = list(cluster_centers.keys())

# Iterate through each frame that has a cluster centre
for i, frame in enumerate(keys):

    # Window of up to 3 entries before and 3 after the current one. The window is
    # taken over the list of detected frames, not over raw video frame numbers.
    window_frames = keys[max(i - 3, 0):min(i + 4, len(keys))]
    print(f"Window frames for frame {frame}: {window_frames}")

    # Get the cluster center points for the window frames
    window_points = [cluster_centers[f] for f in window_frames]
    print(f"Window points for frame {frame}: {window_points}")

    # Convert the points to a numpy array
    points_array = np.array(window_points)

    # Use DBSCAN to cluster the points; label -1 marks noise
    labels = DBSCAN(eps=eps, min_samples=min_samples).fit(points_array).labels_
    clustered_labels = labels[labels != -1]

    if clustered_labels.size == 0:
        # Every point in the window is noise: keep the original cluster center
        new_cluster_centers[frame] = cluster_centers[frame]
        continue

    # Pick the cluster with the most members and use the mean of its points,
    # rounded to integer pixel coordinates, as the new centre
    most_common_label = Counter(clustered_labels.tolist()).most_common(1)[0][0]
    most_common_points = points_array[labels == most_common_label]
    new_center = np.mean(most_common_points, axis=0)
    new_cluster_centers[frame] = tuple(map(int, np.round(new_center)))

for frame, center in new_cluster_centers.items():
    print(f"Frame {frame}: New Cluster Center {center}")


In [18]:
# Draw the bounding box around the moving object in the frame, save the original image at the same time
moving_cluster_frame_folder = 'moving_cluster_frames'
os.makedirs(moving_cluster_frame_folder, exist_ok=True)
moving_cluster_original_frames_folder = 'moving_cluster_original_frames'
os.makedirs(moving_cluster_original_frames_folder, exist_ok=True)

cap = cv2.VideoCapture(video_path)

# 0-based index of the frame currently being processed. The detection cell keys
# its results by the 0-based index of the frame in which motion was seen, so the
# counter must be incremented AFTER the lookup; incrementing before it would
# save the frame preceding each detection under that detection's number.
frame_count = 0

try:
    while cap.isOpened():
        ret, frame = cap.read()
        if not ret:
            break

        if frame_count in new_cluster_centers:
            # Save the untouched frame first; the rectangle below is drawn in place
            output_original_frame_path = os.path.join(moving_cluster_original_frames_folder, f"frame_{frame_count}.jpg")
            cv2.imwrite(output_original_frame_path, frame)

            # 20x20 box centred on the smoothed cluster centre
            center = new_cluster_centers[frame_count]
            cv2.rectangle(frame, (center[0]-10, center[1]-10), (center[0]+10, center[1]+10), (0, 255, 0), 2)

            output_frame_path = os.path.join(moving_cluster_frame_folder, f"frame_{frame_count}.jpg")
            cv2.imwrite(output_frame_path, frame)

        frame_count += 1
finally:
    # Release the capture even if processing raised
    cap.release()
        

# Module 2: Global Frame Differential

In [10]:
import cv2
import numpy as np
import os

In [None]:
# Mean absolute difference between consecutive frames above which a frame
# is treated as a key-frame candidate
DIFF_THRESHOLD = 0.2

# Folder that receives the detected key frames; exist_ok avoids the
# check-then-create race of a separate os.path.exists() test
key_frame_folder = 'key_frames'
os.makedirs(key_frame_folder, exist_ok=True)

def calculate_frame_difference(frame1, frame2):
    """Return the mean of the element-wise absolute difference of two frames.

    Args:
        frame1: First frame, as accepted by ``cv2.absdiff``.
        frame2: Second frame, as accepted by ``cv2.absdiff``.

    Returns:
        The mean over all elements of ``cv2.absdiff(frame1, frame2)``.
    """
    return np.mean(cv2.absdiff(frame1, frame2))

def detect_key_frames(video_path, threshold=DIFF_THRESHOLD, min_gap=11, output_folder=None):
    """Detect key frames by thresholding the difference of consecutive frames.

    A frame is recorded as a key frame when its mean absolute difference to the
    previous frame exceeds ``threshold`` and more than ``min_gap`` frames have
    passed since the last above-threshold frame. Every above-threshold frame
    resets that gap, whether or not it was recorded. Each recorded frame is
    also written to ``output_folder`` as ``frame_<index>.jpg``.

    Args:
        video_path: Path of the video to scan.
        threshold: Difference value a frame must exceed to be a candidate.
        min_gap: A candidate is recorded only if its index minus the index of
            the previous above-threshold frame is strictly greater than this.
        output_folder: Destination folder for the key-frame images. Defaults to
            the module-level ``key_frame_folder``.

    Returns:
        List of 0-based frame indices of the recorded key frames, in order.
    """
    if output_folder is None:
        output_folder = key_frame_folder
    os.makedirs(output_folder, exist_ok=True)

    key_frames = []
    cap = cv2.VideoCapture(video_path)
    try:
        ret, prev_frame = cap.read()

        # The first frame is consumed; frame_count is the 0-based index of the
        # frame returned by the next read.
        frame_count = 1
        # Index of the most recent above-threshold frame (0 before any is seen)
        detect_count = 0

        while ret:
            ret, current_frame = cap.read()
            if not ret:
                break

            # Calculate the difference between two consecutive frames
            diff_value = calculate_frame_difference(prev_frame, current_frame)

            if diff_value > threshold:
                # Record only the first frame of a burst of changes: require a
                # gap of more than min_gap frames since the last change.
                if frame_count - detect_count > min_gap:
                    key_frames.append(frame_count)
                    frame_filename = os.path.join(output_folder, f"frame_{frame_count}.jpg")
                    cv2.imwrite(frame_filename, current_frame)
                    print(f"Key frame detected at frame {frame_count}, diff: {diff_value}")

                detect_count = frame_count

            prev_frame = current_frame
            frame_count += 1
    finally:
        # Release the capture even if frame processing raised
        cap.release()

    return key_frames

# Run the global key-frame detector on the test recording
video_path = 'testvideo2.mkv'
key_frames = detect_key_frames(video_path=video_path)

# Report how many key frames were found and at which frame indices
print(f"Detected {len(key_frames)} key frames: {key_frames}")


# Module 3: Local Frame Differential

In [12]:
import numpy as np
import cv2
import os

In [None]:
# Pairs (earlier_frame, later_frame) of neighbouring detections whose smoothed
# centres are less than 3 pixels apart
moving_pairs = []

keys = list(new_cluster_centers.keys())

for i, frame in enumerate(keys):
    # Window holds the previous detection (if any) and the current one
    window_start = i - 1 if i > 0 else 0
    window_frames = keys[window_start:i+1]
    print(f"Window frames for frame {frame}: {window_frames}")

    window_points = [new_cluster_centers[f] for f in window_frames]
    print(f"Window points for frame {frame}: {window_points}")

    # Compare every unordered pair of points inside the window
    point_total = len(window_points)
    for first in range(point_total - 1):
        anchor = np.array(window_points[first])
        for second in range(first + 1, point_total):
            separation = np.linalg.norm(anchor - np.array(window_points[second]))
            print(separation)
            if separation < 3:
                moving_pairs.append((window_frames[first], window_frames[second]))

print(moving_pairs)

In [None]:
# Folder holding the unannotated frames saved for each clustered detection
moving_frame_original_folder = 'moving_cluster_original_frames'
# Minimum distance between the average colours of two patches for a key frame
color_threshold = 30
# Minimum channel spread for a pixel to count as coloured
color_pixel_threshold = 20
# Frame indices flagged as key frames by the local colour comparison
local_key_frames = []

# Folder that receives the extracted patches; exist_ok avoids the
# check-then-create race of a separate os.path.exists() test
patch_folder = 'patch_frames'
os.makedirs(patch_folder, exist_ok=True)

# Determine if a pixel is a color pixel
def is_color_pixel(pixel, threshold=15):
    """Return True when the pixel's three channels are not (nearly) equal.

    Args:
        pixel: Sequence of exactly three channel values, unpacked as (b, g, r).
        threshold: A pixel is coloured when any two channels differ by more
            than this amount.

    Returns:
        True if the largest channel exceeds the smallest by more than
        ``threshold``, which is the same as any pairwise difference doing so.
    """
    b, g, r = pixel
    # Convert to Python ints first so unsigned 8-bit values cannot wrap around
    channels = (int(b), int(g), int(r))
    return max(channels) - min(channels) > threshold

# For each pair of frames, compare the average colour of the coloured pixels
# in a patch around the pointer position of the later frame.
for pair in moving_pairs:
    frame1_path = os.path.join(moving_frame_original_folder, f"frame_{pair[0]}.jpg")
    frame1 = cv2.imread(frame1_path)

    frame2_path = os.path.join(moving_frame_original_folder, f"frame_{pair[1]}.jpg")
    frame2 = cv2.imread(frame2_path)

    # cv2.imread returns None instead of raising when a file cannot be read
    if frame1 is None or frame2 is None:
        print(f"Skipping pair {pair}: could not read {frame1_path} or {frame2_path}")
        continue

    # Extract the patch (up to 40x40) around the center. The lower bounds are
    # clamped to 0 because a negative slice start would wrap around and yield
    # an empty patch; upper bounds past the image edge are clipped by slicing.
    center = new_cluster_centers[pair[1]]
    x, y = center
    x1, y1 = max(x - 20, 0), max(y - 20, 0)
    x2, y2 = x + 20, y + 20

    patch1 = frame1[y1:y2, x1:x2]
    patch2 = frame2[y1:y2, x1:x2]

    # An empty patch cannot be written or compared
    if patch1.size == 0 or patch2.size == 0:
        print(f"Skipping pair {pair}: empty patch around {center}")
        continue

    patch1_path = os.path.join(patch_folder, f"patch1_{pair[0]}.jpg")
    cv2.imwrite(patch1_path, patch1)
    patch2_path = os.path.join(patch_folder, f"patch2_{pair[1]}.jpg")
    cv2.imwrite(patch2_path, patch2)

    # Collect the coloured pixels of each patch
    color_pixel1 = [pixel for pixel in patch1.reshape(-1, 3) if is_color_pixel(pixel, color_pixel_threshold)]
    color_pixel2 = [pixel for pixel in patch2.reshape(-1, 3) if is_color_pixel(pixel, color_pixel_threshold)]

    # Without coloured pixels in both patches the mean would be NaN and the
    # comparison below could never succeed, so skip explicitly.
    if not color_pixel1 or not color_pixel2:
        continue

    avg_color1 = np.mean(color_pixel1, axis=0)
    avg_color2 = np.mean(color_pixel2, axis=0)
    color_diff = np.linalg.norm(avg_color1 - avg_color2)

    # Compare the color difference with the threshold
    if color_diff > color_threshold:
        print(f"Key frame detected at frame {pair[1]}, diff:{color_diff}")
        local_key_frames.append(pair[1])

print(local_key_frames)
        

# Module 4: Keyframe Aggregation

In [15]:
import cv2
import numpy as np
import os

In [None]:
merged_keyframes = key_frames.copy()
# Aggregate the key frames detected by local color difference and global difference
# If there is no global keyframe within 5 frames of the local keyframe, add it to the final list
for local_key in local_key_frames:
    if all(abs(local_key - key) >= 5 for key in key_frames):
        merged_keyframes.append(local_key)

merged_keyframes = sorted(merged_keyframes)
print(merged_keyframes)

# Create a dictionary to store the final cluster center of each key frame
final_pointer = {}
for key_frame in merged_keyframes:
    if key_frame in new_cluster_centers:
        source_frame = key_frame
    elif new_cluster_centers:
        # If the key frame has no cluster center of its own, borrow the one of the closest frame
        source_frame = min(new_cluster_centers, key=lambda x: abs(x - key_frame))
    else:
        # No pointer position exists at all; min() over an empty dict would raise
        print(f"Key frame {key_frame}: no pointer position available")
        continue
    final_pointer[key_frame] = new_cluster_centers[source_frame]
    print(f"Key frame {key_frame}: {new_cluster_centers[source_frame]}")

final_frame_folder = 'final_frames'
os.makedirs(final_frame_folder, exist_ok=True)

# Set for O(1) membership tests while scanning the video
keyframes_to_save = set(merged_keyframes)

cap = cv2.VideoCapture(video_path)

# 0-based index of the frame currently being processed
frame_count = 0

try:
    while cap.isOpened():
        ret, frame = cap.read()
        if not ret:
            break
        if frame_count in keyframes_to_save:
            if frame_count in final_pointer:
                x, y = final_pointer[frame_count]
                x1, y1 = x, y
                # The pointer box is 12x20 pixels
                x2, y2 = x + 12, y + 20

                cv2.rectangle(frame, (x1, y1), (x2, y2), (0, 0, 255), 2)

            # A key frame without a known pointer position is still saved
            # (without a box) so that every merged key frame has an image.
            output_frame_path = os.path.join(final_frame_folder, f"frame_{frame_count}.jpg")
            cv2.imwrite(output_frame_path, frame)

        frame_count += 1
finally:
    # Release the capture even if processing raised
    cap.release()

# Module 5: OCR

In [21]:
import cv2
import easyocr
import os
import PIL
# Make PIL.Image.ANTIALIAS refer to PIL.Image.LANCZOS.
# NOTE(review): presumably a compatibility shim for a dependency that still
# references Image.ANTIALIAS on a Pillow release that no longer provides it —
# confirm which package needs it. Relies on the PIL.Image submodule already
# being loaded by an earlier import.
PIL.Image.ANTIALIAS = PIL.Image.LANCZOS

In [None]:
input_folder = 'final_frames'
output_folder = 'text_square_frames'

os.makedirs(output_folder, exist_ok=True)

# Initialize the EasyOCR reader
reader = easyocr.Reader(['en'])

# Maps a key-frame index to a list of (text, top_left_corner) tuples
text_location = {}

# merged_keyframes holds frame indices; the image of each one is looked up by name
for filename in merged_keyframes:
    image_path = os.path.join(input_folder, f"frame_{filename}.jpg")
    image = cv2.imread(image_path)

    # cv2.imread returns None instead of raising when the file cannot be read
    if image is None:
        print(f"Skipping frame {filename}: could not read {image_path}")
        continue

    results = reader.readtext(image)

    # Frames without any recognised text get no entry and no output image
    if not results:
        continue

    frame_texts = text_location.setdefault(filename, [])
    for (bbox, text, prob) in results:
        (top_left, top_right, bottom_right, bottom_left) = bbox
        top_left = tuple(map(int, top_left))
        bottom_right = tuple(map(int, bottom_right))

        # Outline the recognised text region
        cv2.rectangle(image, top_left, bottom_right, (0, 255, 0), 2)

        frame_texts.append((text, top_left))

    # Write the annotated image once, after all boxes are drawn
    output_path = os.path.join(output_folder, f"frame_{filename}.jpg")
    cv2.imwrite(output_path, image)

print(text_location)

In [None]:
# Show the pointer positions and the OCR results, one per line
print(final_pointer, text_location, sep="\n")

# Module 6: Prompt Construction

In [None]:
# Prompt1: sets the context and announces the data that the later prompts supply.
Prompt1 = "This is a video about a laboratory workflows on a software interface, and I need you to understand each step in the workflow. I have done some preliminary analysis and will provide some information to you later, including key frames you need to focus, the coordinates of mouse pointer, text and the coordinates of text ." 

# Prompt2: task 1. The instruction text is followed by the string form of
# final_pointer (key frame -> pointer coordinates).
Prompt2 = "Firstly, I have captured the keyframes and the corresponding coordinates of mouse pointer in this video. Data form is {frame_number: (x, y)}. Your first task is to use this information to generate 'pyautogui' code to simulate the click process."
Prompt2 = f"{Prompt2} {final_pointer}"
print(Prompt2)

# Prompt3: task 2. The instruction text is followed by the string form of text_location.
# NOTE(review): text_location stores a list of (text, (x, y)) tuples per frame,
# which differs slightly from the data form described in the prompt — confirm the wording.
Prompt3 = "Secondly, I have extracted the text in the images and their coordinates. Data form is {frame_number: [text,(x, y)]}. Your second task is to match the text information to the corresponding step in the workflow."
Prompt3 = f"{Prompt3} {text_location}"
print(Prompt3)

# Prompt4: task 3. Lists the steps to insert before clicking 'Apply'.
Prompt4 = "Thirdly, there are several steps missing before clicking 'Apply' in the workflow, they are: 1. add discrete amounts of B in ul to Plate 1; 2. add discrete amounts of C in ul to Plate 1; 3. add discrete amounts of D in ul to Plate 1. Your third task is to only find the key frame that need to be revised and add the missing steps before clicking 'Apply'."

# Prompt5: final task. Asks for the revised code of the whole process.
Prompt5 = "Finally, Your final task is to generate the revised 'pyautogui' code of the whole process, noted that if you want to check these missing steps, click the check box about 390 pixels to the right of the matching text."


# Module 7: Generate Object Code Using an LLM

In [27]:
from openai import OpenAI
from IPython.display import display, Image, Audio

In [None]:
# NOTE: replace the placeholder key before running, and do not commit a real key
client = OpenAI(
    base_url="https://oneapi.xty.app/v1",
    api_key="your_api_key " # your api key 
)

# One user message per prompt, sent in order as a multi-turn conversation
PROMPT_MESSAGES = [
    {
        "role": "user",
        "content": [{
            "type": "text",
            "text": prompt,
        }],
    }
    for prompt in (Prompt1, Prompt2, Prompt3, Prompt4, Prompt5)
]
temp = 0.0
# NOTE(review): penalty is not used below; the request hard-codes presence_penalty=1.0 — confirm which is intended
penalty =0.0

# Conversation history: every user prompt followed by the assistant's reply
messages = []
for message in PROMPT_MESSAGES:
    messages.append(message)
    params = {
        "model": "gpt-4o",
        "messages": messages,
        "max_tokens": 2048,
        "temperature": temp,
        "presence_penalty": 1.0,
    }
    # Send the request BEFORE printing: the reply must exist before it is shown,
    # otherwise the first iteration fails on a fresh kernel and later ones print
    # the previous step's answer.
    result = client.chat.completions.create(**params)
    reply = result.choices[0].message.content
    print(reply)
    print("**************Next Step**************")
    # Feed the reply back so the next prompt is answered in context
    messages.append({"role":"assistant","content":reply})
