In [None]:
from abc import ABC, abstractmethod
from enum import Enum
import cv2, numpy
from cv2.mat_wrapper import Mat as MatLike

from app.steps.enums import OnFalseEnums
from app.adb import ADB

class WorkSpaceArea():
    """Callable that renders a four-value workspace area as a single string."""

    def __call__(self, workspace_area):
        """Format ``workspace_area`` as ``"{x}x{y}+{w}+{h}"``.

        Args:
            workspace_area: Iterable of four numbers, unpacked as x, y, w, h.

        Returns:
            The formatted string when the values sum to more than zero,
            otherwise an empty list (the area is treated as unset).
        """
        # Guard clause: an area whose values do not add up to a positive
        # number yields the empty-list placeholder instead of a string.
        if not sum(workspace_area) > 0:
            return []

        x, y, w, h = workspace_area
        return f"{x}x{y}+{w}+{h}"
    


class ColorTypes(Enum):
    """Image formats reported by ``BitwiseScreenshotProcessor.check_image_format``."""
    # 3-D array whose last axis has exactly three entries.
    BGR = "BGR"
    # 2-D array whose values are all either 0 or 255.
    BINARY = "BINARY"
    # Any other 2-D array.
    GRAYSCALE = "GRAYSCALE"
    # Any shape that is neither 2-D nor three-channel 3-D.
    UNKNOWN = "UNKNOWN"


class BitwiseScreenshotProcessor():
    """Helpers that work on images already loaded as arrays.

    Covers format detection, loading with optional grayscale/binary
    conversion, template matching and cropping.
    """

    def check_image_format(self, image):
        """Classify an image array by its shape and pixel values.

        Args:
            image: Array exposing ``.shape`` (for example the result of
                ``cv2.imread``).

        Returns:
            ``ColorTypes.BGR`` for a 3-D array with three channels,
            ``ColorTypes.BINARY`` for a 2-D array containing only 0 and 255,
            ``ColorTypes.GRAYSCALE`` for any other 2-D array, and
            ``ColorTypes.UNKNOWN`` for every other shape.
        """
        dimensions = len(image.shape)

        if dimensions == 3 and image.shape[2] == 3:
            return ColorTypes.BGR

        if dimensions == 2:
            unique_values = numpy.unique(image)
            if set(unique_values).issubset({0, 255}):
                return ColorTypes.BINARY
            return ColorTypes.GRAYSCALE

        return ColorTypes.UNKNOWN

    def screenshot_to_bitwise(self, screenshot_path: str | numpy.ndarray, convert_to_grayscale: bool = False, convert_to_binary: bool = False):
        """Load an image file and optionally convert it to grayscale and/or binary.

        Args:
            screenshot_path: Path of the image file to read. Only ``str`` is
                accepted at runtime, despite the wider annotation.
            convert_to_grayscale: Convert the loaded BGR image with
                ``cv2.COLOR_BGR2GRAY``.
            convert_to_binary: Apply a fixed ``cv2.THRESH_BINARY`` threshold
                (127, max value 255). When ``convert_to_grayscale`` is False
                the threshold is applied to the colour image as loaded.

        Returns:
            The loaded (and possibly converted) image array.

        Raises:
            ValueError: If ``screenshot_path`` is not a string.
            FileNotFoundError: If OpenCV could not read the file.
        """
        if not isinstance(screenshot_path, str):
            # Raise (not return) the error so callers never receive an
            # exception object where they expect an image.
            raise ValueError("Template path is not defined")

        bitwise_screenshot = cv2.imread(screenshot_path, cv2.IMREAD_COLOR)
        if bitwise_screenshot is None:
            # cv2.imread signals an unreadable file by returning None; fail
            # here rather than inside a later conversion or match call.
            raise FileNotFoundError(f"Could not load image at path: {screenshot_path}")

        if convert_to_grayscale:
            bitwise_screenshot = cv2.cvtColor(bitwise_screenshot, cv2.COLOR_BGR2GRAY)

        if convert_to_binary:
            _, bitwise_screenshot = cv2.threshold(bitwise_screenshot, 127, 255, cv2.THRESH_BINARY)

        return bitwise_screenshot

    def bitwise_find_image_in_screen(self, screenshot: MatLike | str, template: MatLike , threshold=0.8):
        """Report whether ``template`` occurs in ``screenshot``.

        Args:
            screenshot: Image array, or a file path. A path is loaded and
                converted to the same format as ``template`` before matching.
            template: Image array to search for.
            threshold: Minimum ``cv2.TM_CCOEFF_NORMED`` score that counts as
                a match.

        Returns:
            True if at least one position scores ``>= threshold``, else False.

        Raises:
            ValueError: If ``screenshot`` is a path and the format of
                ``template`` cannot be determined.
        """
        if isinstance(screenshot, str):
            template_type = self.check_image_format(template)

            if template_type == ColorTypes.BGR:
                screenshot = self.screenshot_to_bitwise(screenshot)
            elif template_type == ColorTypes.BINARY:
                screenshot = self.screenshot_to_bitwise(screenshot, convert_to_binary=True, convert_to_grayscale=True)
            elif template_type == ColorTypes.GRAYSCALE:
                screenshot = self.screenshot_to_bitwise(screenshot, convert_to_grayscale=True)
            else:
                # Without a known format the path would reach matchTemplate
                # unloaded; report the real problem instead.
                raise ValueError("Template image format is not supported")

        result = cv2.matchTemplate(screenshot, template, cv2.TM_CCOEFF_NORMED)

        # Only the existence of a match matters, not its coordinates.
        return bool((result >= threshold).any())

    def bitwise_crop(self, screenshot: MatLike, coordinates: list[int]):
        """Return the ``[x, y, w, h]`` region of ``screenshot`` as an array slice.

        Args:
            screenshot: Image array indexed as ``[row, column]``.
            coordinates: Four integers unpacked as x, y, w, h.

        Returns:
            ``screenshot[y:y + h, x:x + w]``; no bounds checking is performed.
        """
        x, y, w, h = coordinates
        return screenshot[y:y + h, x:x + w]
    
    

class ScreenshotProcessor(BitwiseScreenshotProcessor):
    """File-based image helpers: crop an image file and match one file in another."""

    def crop(self, screenshot_path: str, coordinates: list[int], output_path: str):
        """Crop a region out of an image file and write it to ``output_path``.

        Args:
            screenshot_path: Path of the image to read.
            coordinates: ``[x, y, width, height]`` of the region to keep.
            output_path: Path the cropped image is written to.

        Returns:
            True when the cropped image was written.

        Raises:
            FileNotFoundError: If the source image could not be loaded.
            ValueError: If ``coordinates`` does not hold four values, or the
                region is empty or not fully inside the image.
            IOError: If OpenCV reports that writing the output failed.
        """
        screenshot = cv2.imread(screenshot_path, cv2.IMREAD_COLOR)

        # cv2.imread returns None instead of raising when the file is unreadable.
        if screenshot is None:
            raise FileNotFoundError(f"Could not load image at path: {screenshot_path}")

        if len(coordinates) != 4:
            raise ValueError("Coordinates should be a list of four integers: [x, y, width, height].")

        x, y, w, h = coordinates
        height, width = screenshot.shape[:2]

        # Reject regions that start outside the image, extend past it, or
        # have no area (an empty slice cannot be written as an image).
        if x < 0 or y < 0 or w <= 0 or h <= 0 or x + w > width or y + h > height:
            raise ValueError("Crop coordinates are out of bounds.")

        cropped_image = screenshot[y:y + h, x:x + w]

        success = cv2.imwrite(output_path, cropped_image)

        if not success:
            raise IOError(f"Failed to write image to path: {output_path}")

        return success

    @staticmethod
    def find_image_in_screenshot(screenshot_path: str, template_path: str, threshold=0.8):
        """Report whether the template image file occurs in the screenshot file.

        Declared as a static method so that it can be called on an instance
        or on the class with the same three arguments.

        Args:
            screenshot_path: Path of the image to search in.
            template_path: Path of the image to search for.
            threshold: Minimum ``cv2.TM_CCOEFF_NORMED`` score that counts as
                a match.

        Returns:
            True if at least one position scores ``>= threshold``, else False.

        Raises:
            FileNotFoundError: If either image could not be loaded.
        """
        # Load the screenshot and the template image.
        screenshot = cv2.imread(screenshot_path)
        if screenshot is None:
            raise FileNotFoundError(f"Could not load image at path: {screenshot_path}")

        template = cv2.imread(template_path)
        if template is None:
            raise FileNotFoundError(f"Could not load image at path: {template_path}")

        # Match on the colour images as loaded (no grayscale conversion).
        result = cv2.matchTemplate(screenshot, template, cv2.TM_CCOEFF_NORMED)

        # Only the existence of a match matters, not its coordinates.
        return bool((result >= threshold).any())

In [28]:
from app.adb import ADB

# Capture the current emulator screen. The code below reads it back from
# ./screenshots/screenshot.png — presumably where ScreenCapture writes it; TODO confirm.
adb = ADB("emulator-5554")
adb.ScreenCapture(path="./screenshots/", name="screenshot")

bit_proc = BitwiseScreenshotProcessor()
# Load the template and the fresh capture with the same thresholding so the
# two images are in the same format when they are matched.
template = bit_proc.screenshot_to_bitwise("./screenshots/youtube_toolbar.png", convert_to_binary=True)
# template = "./screenshots/template.png"
screenshot = bit_proc.screenshot_to_bitwise("./screenshots/screenshot.png", convert_to_binary=True)
# NOTE(review): the cropped region is not assigned to anything, so the match
# below still runs against the full screenshot — confirm whether the crop
# result was meant to be used.
bit_proc.bitwise_crop(screenshot, [2, 1208, 715, 66])

# Last expression of the cell: its boolean result is the cell output.
bit_proc.bitwise_find_image_in_screen(screenshot, template, 0.8)

True

In [None]:
import yaml
import logging
from dataclasses import dataclass, field
from enum import Enum
import time

from app.adb import ADB
from app.adb import LDConnector
from app.steps.enums import OnFalseEnums, StepTypesEnums
from app.steps import ClickStep, ValidatorStep, InputStep, ScreenGetterStep, Step, BitwiseScreenshotProcessor, ScreenshotProcessor
from config import load_config

@dataclass(init=False)
class StepData:
    """Common fields of one configured step plus any additional arguments.

    The generated ``__init__`` is disabled so that the constructor can
    accept arbitrary extra positional and keyword arguments; those are kept
    exactly as received in ``extra_args`` (the ``*args`` tuple) and
    ``extra_kwargs`` (the ``**kwargs`` dict).
    """
    step_name: str
    screenshot_name: str
    type: str
    extra_args: list = field(default_factory=list)
    extra_kwargs: dict = field(default_factory=dict)

    def __init__(self, step_name: str, screenshot_name: str, type: str, *args, **kwargs):
        # The three named fields first, then everything passed beyond them.
        self.step_name, self.screenshot_name, self.type = step_name, screenshot_name, type
        self.extra_args, self.extra_kwargs = args, kwargs
        
class StepManager():
    """Builds step objects from a config file and validates their on-screen state."""

    def __init__(self, config: str):
        """Load the config and create one step object per recognised step entry.

        Args:
            config: Value passed to ``load_config``. The result is read for
                the keys ``screenshots_path``, ``work_dir`` and ``steps``.
        """
        # Single source for the emulator name used by ADB and capture paths.
        self.emulator_name = "emulator-5554"
        self.adb = ADB(emulator_name=self.emulator_name)
        self.ldplayer = LDConnector()
        self.steps = []
        self.config = load_config(config)
        self.screenshots_path = self.config.get("screenshots_path")
        self.work_dir = self.config.get("work_dir")
        self.screen_processor = ScreenshotProcessor()
        self.bitwise_screen_processor = BitwiseScreenshotProcessor()

        # A config without a "steps" entry simply yields no steps.
        for step in self.config.get("steps") or []:
            # StepData is used only to validate the required keys and read the type.
            data = StepData(**step)

            if data.type == StepTypesEnums.button.value:
                self.steps.append(ClickStep(**step))
            elif data.type == StepTypesEnums.input_filed.value:
                self.steps.append(InputStep(**step))
            elif data.type == StepTypesEnums.validator.value:
                self.steps.append(ValidatorStep(**step))
            else:
                # Other step types are not instantiated here.
                logging.debug(f"Step {data.step_name} has unhandled type {data.type}")

    def initialize_steps_binary(self, screenshots_work_dir):
        """Ask every step to prepare its in-memory screenshot.

        Initialisation is best-effort: a step that fails is reported through
        ``logging.warning`` (with traceback) and the remaining steps are
        still processed.

        Args:
            screenshots_work_dir: Passed through to each step's
                ``bitwise_screenshot_with_workspace_area``.
        """
        for step in self.steps:
            try:
                step.bitwise_screenshot_with_workspace_area(screenshots_work_dir)
            except Exception:
                # Report the failure instead of silently discarding it.
                logging.warning(f"Step {step.step_name} not initialized", exc_info=True)

    def validate_screenshot(self, step: Step, attemp_delay = 0.25):
        """Check, with retries, that the step's reference image is on screen.

        Each attempt captures the emulator screen and searches it for the
        step's reference image.

        Args:
            step: Step providing ``step_name``, ``screenshot_name``,
                ``threshold``, ``retries_limit``, ``on_false`` and,
                optionally, ``bitwise_screenshot``.
            attemp_delay: Seconds to wait between failed attempts.

        Returns:
            True when a match is found; ``OnFalseEnums.skip`` when an attempt
            fails and the step's ``on_false`` equals ``OnFalseEnums.skip``;
            False when all ``step.retries_limit`` attempts fail.
        """
        logging.debug(f"Validating {step.step_name}")
        logging.debug(step.on_false)

        template_path = f"{self.screenshots_path}{step.screenshot_name}"
        # Presumably the file ScreenCapture writes for this emulator — TODO confirm.
        capture_path = f"{self.work_dir}{self.emulator_name}.png"

        attempt = 1
        while attempt <= step.retries_limit:
            self.adb.ScreenCapture(self.work_dir, self.emulator_name)

            if hasattr(step, "bitwise_screenshot"):
                # Search the fresh capture (first argument) for the step's
                # preloaded template (second argument).
                # Assumes step.bitwise_screenshot is an image array — TODO confirm.
                result = self.bitwise_screen_processor.bitwise_find_image_in_screen(
                    capture_path,
                    step.bitwise_screenshot,
                    step.threshold)
            else:
                logging.debug(f"screenshot: {template_path}")
                logging.debug(capture_path)

                # Same order as above: the capture is searched for the template file.
                result = self.screen_processor.find_image_in_screenshot(capture_path, template_path, step.threshold)

            if result:
                logging.debug(f"Valide state of {step.step_name} after {attempt} retries")
                return True

            if step.on_false == OnFalseEnums.skip:
                logging.debug(f"Skipping {step.step_name}")
                return OnFalseEnums.skip

            logging.debug(f"State of {step.step_name} screenshot not valid, trying again. Retries {attempt}")
            attempt += 1

            # Give the screen time to change before the next capture; no
            # wait is needed once the retries are exhausted.
            if attempt <= step.retries_limit:
                time.sleep(attemp_delay)

        return False

In [1]:
from app.steps import ScreenGetterStep

# Build a screen-getter step for the YouTube toolbar reference image.
# workspace_area uses the same four values as the crop in the earlier cell —
# presumably [x, y, width, height]; TODO confirm against ScreenGetterStep.
first = ScreenGetterStep(
    step_name = "toolbar",
    screenshot_name="youtube_toolbar.png",
    type = "screen_getter",
    required_screen_check=True,
    workspace_area=[2, 1208, 715, 66]
)

# Presumably loads the step's screenshot from work_dir into memory — verify in app.steps.
first.init_bitwise(work_dir = "./screenshots")

# manager = StepManager("racer-pointx.yml")

# manager.validate_screenshot(first)