In [1]:
import pickle
import json
import nbformat
from PIL import Image
from PIL.PngImagePlugin import PngInfo
from pyperiscope import Scope

class Pilot():
    def __init__(self, workbook):
        self.current_notebook = workbook
        self.steps = []
        self.current_step = -1
        self.steps_total = -1
        self.current_doc = ""
        self.last_run_code = ""
        self.last_error = None
        self.scope_cell_id = -1
        
        self.get_steps()

    # load the steps form automation workbook for fruthert usage
    def get_steps(self):
        with open(self.current_notebook, 'r', encoding='utf-8') as f:
            notebook = json.load(f)
        
        # Find all cells with the marker comment
        marker_cells = []
        for i, cell in enumerate(notebook['cells']):
            if cell['cell_type'] == 'code' and cell['source']:
                if cell['source'][0].find("# comment: Automated step generated with pyPeriscope")>-1:
                    marker_cells.append(i)
        
        if not marker_cells:
            self.steps_total = -1
            print("No marker cells found, are you sure this is the correct jupyter lab notebook?")
            return
        
        # Create step ranges
        self.steps = []
        # First step: from start to first marker
        self.steps.append((0, marker_cells[0]-1))
        
        # Middle steps: between markers
        for i in range(len(marker_cells)-1):
            self.steps.append((marker_cells[i]-1, marker_cells[i+1]-1))
        
        # Last step: from last marker to end
        if marker_cells:
            self.steps.append((marker_cells[-1]-1, len(notebook['cells'])))
    
        # Make the 1st two into one as there are some imports in front
        self.steps = self.steps[1:]
        self.steps[0] = (0,self.steps[0][1])
        
        self.steps_total = len(self.steps)

    # read one cell worth of data
    def get_cell(self, cell_number):
        with open(self.current_notebook, 'r', encoding='utf-8') as f:
            notebook = json.load(f)
        
        code_cells = [cell for cell in notebook['cells']]
        
        if cell_number < 0 or cell_number >= len(code_cells):
            raise ValueError(f"Cell number must be between 0 and {len(code_cells) - 1}")
        
        return code_cells[cell_number]

    # simplified view cell (remove hs object defintions and outputs, use for debug)
    def view_cell(self, cell_number):
        code = self.get_cell(cell_number)
        if 'source' in code:
            if code['source'][0].find("# comment: Automated step") > -1:
                code['source'][1] = 'payload = \'*** removed for view ***\''
        if 'outputs' in code:
            if len(code['outputs']) > 0:
                code['outputs'] = ['*** removed for view ***']
        
        return(code)
    
    # run the steps in global namespace so it would work like running the worksheet on the worksheet
    def run_code(self, code_str):
        self.last_run_code = code_str
        current_globals = globals()  # Get fresh globals each time
        exec(code_str, current_globals)

    # save screenshot with step and data
    def save_screenshot_with_data(self, step):
        data_dict = {}
        # save the step dict
        data_dict['step'] = step.save_dict()
        data_dict['last_found'] = step.found_locations
        # save current step
        data_dict['current_step'] = self.current_step
        # save last error
        data_dict['last_error'] = self.last_error
        # last step scope cell id
        data_dict['scope_cell_id'] = self.scope_cell_id
        # docstring into saved png
        data_dict['current_doc'] = self.current_doc
        
        metadata = PngInfo()
        # Convert dict to bytes using pickle
        serialized_data = pickle.dumps(data_dict)
        # Store in PNG metadata with custom key
        metadata.add_text("custom_data", serialized_data.hex())
        pyautogui.screenshot().save("_"+self.current_notebook[:-5]+"png", "PNG", pnginfo=metadata)


    # load screenshot with data
    def load_screenshot_with_data(self, filepath):
        global step
        image = Image.open(filepath)
        
        try:
            # Extract serialized data from metadata
            serialized_data = bytes.fromhex(image.info.get("custom_data", ""))
            data_dict = pickle.loads(serialized_data)
        except (KeyError, pickle.UnpicklingError):
            data_dict = {}  # Return empty dict if no valid data found

        if not data_dict == {}:
            step = Scope(saved_dict=data_dict['step'])
            step.found_locations = data_dict['last_found']
            self.current_step = data_dict['current_step']
            self.last_error = data_dict['last_error']
            self.scope_cell_id = data_dict['scope_cell_id']
            self.current_doc = data_dict['current_doc']

    # replace the scope object in the current step
    def replace_scope(self, scope_in):
        with open(self.current_notebook, 'r', encoding='utf-8') as f:
            nb = nbformat.read(f, as_version=4)
        
        # Find and edit the cell with matching ID
        for cell in nb.cells:
            if cell.get('id') == self.scope_cell_id:
                cell.source = scope_in.get_string()
                break
        
        with open(self.current_notebook, 'w', encoding='utf-8') as f:
            nbformat.write(nb, f)
    
    # run single cell from notebook
    def run_cell(self, cell_number):
        self.last_error = None
        # get cell content
        current_cell = self.get_cell(cell_number)
        # if docstring cell update the variable
        if not current_cell['cell_type'] == 'code':
                if current_cell['cell_type'] == 'markdown':
                    content = "".join(current_cell['source'])
                    self.current_doc = self.current_doc + content
                else:
                    print("cell in not excecutable in 'code' mode, current mode: "+str(current_cell['cell_type']))
                return
        # join code into single string from line list
        current_code = "".join(current_cell['source'])
        # if the cell contains scope deffintion
        if current_code.find("# comment: Automated step") > -1:
            self.scope_cell_id = current_cell['id']
        # remove render preview & sepprate the step.find() into exception
        code = current_code.replace("step.render_preview()\n","")
        if code.find("step.find()") > -1:
            code = code.replace("step.find()","")      
            try:
                self.run_code("step.find()")
            except Exception as e:
                self.last_error = e
        # after all run the code
        try:
            self.run_code(code)
        except Exception as e:
            self.last_error = e
        # stop if there was an error
        if self.last_error:
            return

    # run one step end to end
    def run_step(self, step_no):
        self.current_doc = ""
        for active_cell in range(self.steps[step_no][0], self.steps[step_no][1]):
            self.current_step = step_no
            self.run_cell(active_cell)
            if self.last_error:
                print(self.last_error)
                break
        self.save_screenshot_with_data(step)

    # select and run a workbook
    def run_workbook(self, firstStep = -1):
        # run all the steps in the workbook
        for step in range(0, len(self.steps)):
            # if the workflow broke donw the continue from the last step
            if firstStep > 0:
                if firstStep == step:
                    firstStep = -1
                else:
                    continue
            # run the indivdual step
            self.run_step(step)
            # if error then stop, error is printed in the step process
            if self.last_error:
                return

In [8]:
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler
import os
import time

class FileChangeHandler(FileSystemEventHandler):
    def __init__(self, binoculars, filename):
        self.binoculars = binoculars
        self.filename = filename
        self.last_modified = 0
        # Buffer time to ensure file is completely written (in seconds)
        self.write_buffer = 0.5

    def on_modified(self, event):
        if not event.is_directory and event.src_path.endswith(self.filename):
            current_time = time.time()
            
            # Check if enough time has passed since last modification
            if current_time - self.last_modified > self.write_buffer:
                try:
                    # Additional check to ensure file is not locked
                    with open(event.src_path, 'rb') as f:
                        pass
                    self.binoculars.update_from_screenshot(event.src_path)
                    self.last_modified = current_time
                except (IOError, PermissionError):
                    # File is still being written
                    pass

In [9]:
from ipywidgets import widgets
from IPython.display import display as ipython_display
from pyperiscope import Scope
from PIL import Image
import io
import markdown
import pickle
import dill
import pyautogui
import time
import pyperclip

class Binoculars:
    def __init__(self, width=400):
        self.width = width
        self.image1 = None
        self.image2 = None
        self.box = None
        self._create_widgets()

    def load_screenshot_with_data(self, filepath):
        global step
        image = Image.open(filepath)
        
        try:
            # Extract serialized data from metadata
            serialized_data = bytes.fromhex(image.info.get("custom_data", ""))
            data_dict = pickle.loads(serialized_data)
        except (KeyError, pickle.UnpicklingError):
            data_dict = {}  # Return empty dict if no valid data found

        if not data_dict == {}:
            step = Scope(saved_dict=data_dict['step'])
            self.current_step = data_dict['current_step']
            self.last_error = data_dict['last_error']
            self.scope_cell_id = data_dict['scope_cell_id']
            self.current_doc = data_dict['current_doc']
        
    def _create_widgets(self):
        """Initialize all widgets including text areas"""
        # Create description text widget (Markdown)
        self.description = widgets.HTML(
            value='',
            placeholder='Description',
            description='',
        )
        
        # Create image widgets
        self.widget1 = widgets.Image(format='png', width=self.width)
        self.widget2 = widgets.Image(format='png', width=self.width)
        self.image_box = widgets.HBox([self.widget1, self.widget2])
        
        # Create error text widget (Markdown)
        self.error_text = widgets.HTML(
            value='',
            placeholder='Error messages',
            description='',
        )
        
        # Stack all widgets vertically
        self.box = widgets.VBox([
            self.description,
            self.image_box,
            self.error_text
        ])
        
        ipython_display(self.box)
        
    def _convert_to_bytes(self, pil_image):
        """Convert PIL image to bytes"""
        buf = io.BytesIO()
        pil_image.save(buf, format='PNG')
        return buf.getvalue()
    
    def update_image(self, image, position=1):
        """Update either the first or second image"""
        if not isinstance(image, Image.Image):
            raise TypeError("Input must be a PIL Image")
            
        image_bytes = self._convert_to_bytes(image)
        if position == 1:
            self.image1 = image
            self.widget1.value = image_bytes
        elif position == 2:
            self.image2 = image
            self.widget2.value = image_bytes
        else:
            raise ValueError("Position must be 1 or 2")
    
    def update_both(self, image1, image2):
        """Update both images at once"""
        self.update_image(image1, 1)
        self.update_image(image2, 2)
    
    def set_description(self, text):
        """Update description text with markdown formatting"""
        # Convert markdown to HTML
        md = markdown.Markdown()
        self.description.value = md.convert(text)
    
    def set_error(self, text):
        """Update error text with markdown formatting"""
        # Convert markdown to HTML with red color for errors
        md = markdown.Markdown()
        self.error_text.value = md.convert(text)
    
    def clear_texts(self):
        """Clear both description and error texts"""
        self.description.value = ''
        self.error_text.value = ''
    
    def get_current_images(self):
        """Return current PIL images"""
        return self.image1, self.image2

    def update_from_screenshot(self, filepath):
        global step, current_doc, image
        image = Image.open(filepath)
        
        try:
            # Extract serialized data from metadata
            serialized_data = bytes.fromhex(image.info.get("custom_data", ""))
            data_dict = pickle.loads(serialized_data)
        except (KeyError, pickle.UnpicklingError):
            data_dict = {}  # Return empty dict if no valid data found
    
        if not data_dict == {}:
            step = Scope(saved_dict=data_dict['step'])
            step.found_locations = data_dict['last_found']
            #self.current_step = data_dict['current_step']
            last_error = data_dict['last_error']
            #self.scope_cell_id = data_dict['scope_cell_id']
            current_doc = data_dict['current_doc']
            current = Scope(mouse_offset=step.found_locations[0]['mouse_offset'], area_offset=step.area_offset, area_size=step.area_size, saved_image=image)
    
            # update UI
            self.update_both(step.render_preview(), current.render_preview())
            if current_doc:
                self.set_description(current_doc)
            if last_error:
                self.set_error(last_error)

    def start_monitoring(self, filepath):
        self.observer = Observer()
        self.event_handler = FileChangeHandler(self, os.path.basename(filepath))
        self.observer.schedule(
            self.event_handler, 
            os.path.dirname(filepath) or '.',
            recursive=False
        )
        self.observer.start()

    def stop_monitoring(self):
        if hasattr(self, 'observer'):
            self.observer.stop()
            self.observer.join()

In [11]:
autorunner_pilot = Pilot("odoo-onshape.ipynb")

In [46]:
for i in range(5):
    autorunner_pilot.run_step(i)

found: 1
found: 1
found: 1
found: 1
found: 1


In [None]:
autorunner_pilot.run_workbook()

In [45]:
b = Binoculars(width = 600)
b.start_monitoring("_odoo-onshape.png")

VBox(children=(HTML(value='', placeholder='Description'), HBox(children=(Image(value=b'', width='600'), Image(…