## Experimental Workflows


In [None]:
%load_ext autoreload
%autoreload 2

from fibsem import utils

microscope, settings = utils.setup_session()

### Automated Quality Control

In [None]:

# do contrast test polishing
protocol = {
    "milling": {
        "polish": {
            "stages": [{
        "application_file": "autolamella",
        "cross_section": "CleaningCrossSection",
        "hfw": 40e-6,
        "height": 6.0e-07,
        "width": 6.0e-06,
        "depth": 4.0e-07,
        "milling_current": 6.0e-11,
        "milling_voltage": 3.0e3,
        "type": "Rectangle",
        "name": "AdaptivePolishing",
            }
        ],
        "point": {
            "x": 0.0,
            "y": 5e-6,}
        }
    },
    "options": {
        "experimental": {
            "adaptive_polishing": {
                "threshold": 100,
                "step_size": 5e-6,
                "step_limit": 10,
                "image_resolution": [3072, 2188],
                "image_line_integration": 20,
                "image_dwell_time": 100e-9,
            }

    }
}
}

from autolamella.workflows.experimental import adaptive_mill_polishing


adaptive_mill_polishing(microscope, settings, protocol)

## State Management Refactor

In [None]:
%load_ext autoreload
%autoreload 2

from autolamella.structures import AutoLamellaStage, Experiment, AutoLamellaProtocol
from pprint import pprint

EXP_PATH = "/home/patrick/github/autolamella/autolamella/log/AutoLamella-2025-01-10-14-24/experiment.yaml"
PROTOCOL_PATH = "/home/patrick/github/autolamella/autolamella/log/AutoLamella-2025-01-10-14-24/protocol.yaml"
exp = Experiment.load(EXP_PATH)
pos = exp.positions[0]
protocol = AutoLamellaProtocol.load(PROTOCOL_PATH)

print(protocol.method.workflow)

print(f"Last Completed: {pos.last_completed}")
print(f"Next: ", protocol.method.get_next(pos.workflow))
print(f"Previous: ", protocol.method.get_previous(pos.workflow))
print(f"Workflow: ", protocol.method.workflow)

## AutoLamellaProtocol Class

protocol:
- name
- method:
- configuration:
- options
- supervision
- milling

In [None]:
%load_ext autoreload
%autoreload 2

from autolamella.structures import AutoLamellaStage, Experiment, AutoLamellaProtocol
import os
import glob
from pprint import pprint


BASE_PATH = "/home/patrick/github/autolamella/autolamella/protocol/protocol-*-new.yaml"

# TODO: TRENCH MILLING METHOD

filenames = glob.glob(BASE_PATH)
pprint(filenames)
 

PROTOCOL_PATH =  "/home/patrick/github/autolamella/autolamella/protocol/protocol-waffle-new.yaml"

protocol = AutoLamellaProtocol.load(PROTOCOL_PATH)

# pprint(protocol.to_dict()["name"])

# pprint()
# from fibsem.milling import 
# protocol.milling

pprint(protocol.tmp)
# pprint(protocol.options)
# print(protocol.supervision)
# print(protocol.method.workflow)

# from fibsem.structures import FibsemImage
# from fibsem.milling.patterning.plotting import draw_milling_patterns

# stages = protocol.milling["undercut"]

# image = FibsemImage.generate_blank_image(hfw=stages[0].milling.hfw)
# draw_milling_patterns(image, stages)


pprint(protocol.to_dict())

In [None]:
from autolamella.protocol.validation import validate_protocol
from fibsem import utils
protocol = validate_protocol(utils.load_protocol(protocol_path=PROTOCOL_PATH))

protocol2 = AutoLamellaProtocol.from_dict(protocol)
pprint(protocol2.to_dict())

In [None]:
from autolamella.structures import AutoLamellaMethod

[m.name for m in AutoLamellaMethod]

## Experiment Review Tools

- Time estimatation for remaining
- Display Images for each workflow stage
- Generate a Report (pdf)

In [None]:
%load_ext autoreload
%autoreload 2

from autolamella.structures import Lamella, AutoLamellaStage, Experiment, AutoLamellaProtocol, AutoLamellaMethod, get_completed_stages
from pprint import pprint
from typing import List

import numpy as np
from fibsem.structures import FibsemImage
import glob
import os
import PIL.Image
import matplotlib.pyplot as plt


EXP_PATH = "/home/patrick/github/autolamella/autolamella/log/AutoLamella-2025-01-10-14-24/experiment.yaml"
PROTOCOL_PATH = "/home/patrick/github/autolamella/autolamella/log/AutoLamella-2025-01-10-14-24/protocol.yaml"

PATH = "/home/patrick/data/monash-cryo-em/AutoLamella-Exports" 
filenames = glob.glob(os.path.join(PATH, "**/experiment.yaml"), recursive=True)
filenames.insert(0, EXP_PATH)


# pprint(filenames)
# EXP_PATH = filenames[0]


exp = Experiment.load(EXP_PATH)
pos = exp.positions[0]
protocol = AutoLamellaProtocol.load(PROTOCOL_PATH)

# print(protocol.method.workflow)



In [None]:

def get_workflow_snapshot(pos: Lamella, wf: AutoLamellaStage, target_size: int = 256) -> np.ndarray:
    """Get a snapshot of a workflow stage for a given position"""

    if not isinstance(target_size, int):
        target_size = int(target_size)

    # get the final high res images
    filenames = glob.glob(os.path.join(pos.path, f"*{wf.name}*final_high_res*.tif*"))
    
    if len(filenames) == 0:
        print(f"No images found for {pos.name} - {wf.name}")
        return None

    # resize and stack the images for display
    sarr = None
    for fname in sorted(filenames):
        img = FibsemImage.load(fname)
        shape = img.data.shape
        resize_shape = (int(shape[0] * (target_size / shape[1])), target_size)
        arr = np.asarray(PIL.Image.fromarray(img.data).resize(resize_shape[::-1]))
        
        # stack
        if sarr is None:
            sarr = arr
        else:
            sarr = np.append(sarr, arr, axis=1)
    return sarr

def convert_figure_to_np_array(figure: plt.Figure) -> np.ndarray:
    """Convert a matplotlib figure to a numpy array"""
    # Draw the figure
    figure.canvas.draw()
    # Get the RGBA buffer
    buf = figure.canvas.buffer_rgba()
    # Convert to numpy array
    arr = np.asarray(buf)
    plt.close()
    return arr

target_size = 1536 / 4

for filename in filenames[1:2]:

    print(f"Experiment: {os.path.basename(filename)}")
    exp = Experiment.load(filename)

    for i, pos in enumerate(exp.positions):
        print("-"*80)
        print(pos.name, pos.path)

        stages_completed = get_completed_stages(pos, method=protocol.method)

        for wf in stages_completed:

            # if not on same computer
            if not os.path.exists(pos.path):
                print(f"Path does not exist: {pos.path}, remapping to experiment path")
                pos.path = os.path.join(exp.path, pos.name)

            snapshot = get_workflow_snapshot(pos, wf, target_size=target_size)

            if snapshot is None:
                print(f"Skipping {pos.name} - {wf.name}, no images found.")
                continue

            cx1 = snapshot.shape[1] // 4
            cx2 = snapshot.shape[1] // 2 + cx1
            cy = snapshot.shape[0] // 2

            fig = plt.figure()
            plt.title(f"Lamella {pos.name} - {pos.states[wf].completed}")
            plt.imshow(snapshot, cmap="gray")
            plt.plot([cx1, cx2], [cy, cy], "y+", ms=20)
            plt.axis("off")
            plt.show()

            array = convert_figure_to_np_array(fig)
            print(array.shape)

            # save array using PIL
            # img = PIL.Image.fromarray(array)
            # img.save(f"lamella_{pos.name}_{wf.name}.png")

            # break

            # TODO: scalebar


### Review - Report Gen


In [None]:
%load_ext autoreload
%autoreload 2

from autolamella.structures import Lamella, AutoLamellaStage, Experiment, AutoLamellaProtocol, AutoLamellaMethod, get_completed_stages
from pprint import pprint
from autolamella.tools.data import calculate_statistics_dataframe

import numpy as np
from fibsem.structures import FibsemImage
import glob
import os
import PIL.Image
import matplotlib.pyplot as plt


EXP_PATH = "/home/patrick/github/autolamella/autolamella/log/AutoLamella-2025-01-10-14-24/experiment.yaml"  # v.0.4.0
# PROTOCOL_PATH = "/home/patrick/github/autolamella/autolamella/log/AutoLamella-2025-01-10-14-24/protocol.yaml"

PATH = "/home/patrick/data/monash-cryo-em/AutoLamella-Exports" # 0.3.4+
filenames = glob.glob(os.path.join(PATH, "**/experiment.yaml"), recursive=True)
filenames.insert(0, EXP_PATH)

pprint(filenames)

In [None]:
dfs = calculate_statistics_dataframe(os.path.dirname(filenames[2]), encoding="cp1252")


df_experiment, df_history, df_beam_shift, df_steps, df_stage, df_det, df_click, df_milling =  dfs


display(df_history)
display(df_steps)


In [2]:
from reportlab.lib import colors
from reportlab.lib.pagesizes import letter, A4
from reportlab.platypus import SimpleDocTemplate, Paragraph, Spacer, Image, Table, TableStyle, PageBreak
from reportlab.lib.styles import getSampleStyleSheet, ParagraphStyle
from reportlab.lib.units import inch
import pandas as pd
import matplotlib.pyplot as plt
import io
import plotly.graph_objects as go
import plotly.express as px
from plotly.subplots import make_subplots
import numpy as np
from datetime import datetime
import io

class PDFReportGenerator:
    def __init__(self, output_filename: str):
        self.output_filename = output_filename
        self.doc = SimpleDocTemplate(
            output_filename,
            pagesize=A4,
            rightMargin=72,
            leftMargin=72,
            topMargin=72,
            bottomMargin=72
        )
        self.styles = getSampleStyleSheet()
        self.story = []

        # Create custom styles
        self.styles.add(ParagraphStyle(
            'CustomTitle',
            parent=self.styles['Heading1'],
            fontSize=24,
            spaceAfter=30,
            alignment=1  # Center alignment
        ))
        
        self.styles.add(ParagraphStyle(
            'Subtitle',
            parent=self.styles['Normal'],
            fontSize=14,
            textColor=colors.grey,
            alignment=1,
            spaceAfter=20
        ))

    def add_title(self, title, subtitle=None):
        """Add a title and optional subtitle to the document"""
        self.story.append(Paragraph(title, self.styles['CustomTitle']))
        if subtitle:
            self.story.append(Paragraph(subtitle, self.styles['Subtitle']))
        self.story.append(Spacer(1, 20))

    def add_heading(self, text, level=2):
        """Add a heading with specified level"""
        style = self.styles[f'Heading{level}']
        self.story.append(Paragraph(text, style))
        self.story.append(Spacer(1, 12))

    def add_paragraph(self, text):
        """Add a paragraph of text"""
        self.story.append(Paragraph(text, self.styles['Normal']))
        self.story.append(Spacer(1, 12))

    def add_page_break(self):
        """Add a page break"""
        self.story.append(PageBreak())

    def add_image(self, path: str, width=6*inch, height=4*inch):
        """Add an image to the PDF"""
        img = Image(path, width=width, height=height)
        self.story.append(img)
        self.story.append(Spacer(1, 20))

    def add_dataframe(self, df, title=None, includes_totals=False):
        """Add a pandas DataFrame as a table"""
        if title:
            self.add_heading(title, 3)
        
        # Convert DataFrame to list of lists
        data = [df.columns.tolist()] + df.values.tolist()
        
        # Create table style
        style = TableStyle([
            ('BACKGROUND', (0, 0), (-1, 0), colors.HexColor('#2F4F4F')),
            ('TEXTCOLOR', (0, 0), (-1, 0), colors.whitesmoke),
            ('ALIGN', (0, 0), (-1, -1), 'CENTER'),
            ('FONTNAME', (0, 0), (-1, 0), 'Helvetica-Bold'),
            ('FONTSIZE', (0, 0), (-1, 0), 12),
            ('BOTTOMPADDING', (0, 0), (-1, 0), 12),
            ('BACKGROUND', (0, 1), (-1, -1), colors.white),
            ('TEXTCOLOR', (0, 1), (-1, -1), colors.black),
            ('FONTNAME', (0, 1), (-1, -1), 'Helvetica'),
            ('FONTSIZE', (0, 1), (-1, -1), 10),
            ('GRID', (0, 0), (-1, -1), 1, colors.black),
            ('ROWBACKGROUNDS', (0, 1), (-1, -1), [colors.whitesmoke, colors.white])
        ])
        
        if includes_totals:
            style.add('BACKGROUND', (0, -1), (-1, -1), colors.HexColor('#E8E8E8'))
            style.add('FONTNAME', (0, -1), (-1, -1), 'Helvetica-Bold')
        
        table = Table(data)
        table.setStyle(style)
        self.story.append(table)
        self.story.append(Spacer(1, 20))

    def add_plot(self, plot_function, title=None, *args, **kwargs):
        """Add a matplotlib plot
        plot_function should be a function that creates and returns a matplotlib figure
        """
        if title:
            self.add_heading(title, 3)
        
        # Create plot and save to bytes buffer
        fig = plot_function(*args, **kwargs)
        img_buffer = io.BytesIO()
        fig.savefig(img_buffer, format='png', bbox_inches='tight', dpi=300)
        img_buffer.seek(0)
        
        # Add plot to story
        img = Image(img_buffer, width=6*inch, height=4*inch)
        self.story.append(img)
        self.story.append(Spacer(1, 20))
        plt.close(fig)

    def add_mpl_figure(self, fig):
        fig.savefig('temp.png', format='png', bbox_inches='tight', dpi=300)
        self.story.append(Image('temp.png'))

    def add_plotly_figure(self, fig, title=None, width=6.5*inch, height=4*inch):
        """Add a Plotly figure to the PDF"""
        if title:
            self.add_heading(title, 3)
        
        # Convert Plotly figure to static image
        img_bytes = fig.to_image(format="png", width=900, height=500, scale=2)
        
        # Create BytesIO object
        img_buffer = io.BytesIO(img_bytes)
        
        # Add image to story
        img = Image(img_buffer, width=width, height=height)
        self.story.append(img)
        self.story.append(Spacer(1, 20))

    def generate(self):
        """Generate the PDF document"""
        self.doc.build(self.story)



In [None]:
from datetime import datetime
from typing import Tuple, Dict
import numpy as np
from autolamella.structures import AutoLamellaStage
from fibsem.milling.patterning.plotting import draw_milling_patterns
from fibsem.milling import get_milling_stages

import glob
import os
import pandas as pd
from fibsem.structures import FibsemImage
from autolamella.protocol.validation import convert_old_milling_protocol_to_new_protocol, MILL_POLISHING_KEY, MILL_ROUGH_KEY, MICROEXPANSION_KEY, FIDUCIAL_KEY

from copy import deepcopy


def get_lamella_figures(p: Lamella, exp_path: str) -> dict:

    # convert old lamella protocol to new style
    if "lamella" in p.protocol or "MillRoughCut" in p.protocol:
        print(f"Converting protocol for {p.name}")
        nprotocol = convert_old_milling_protocol_to_new_protocol(p.protocol)
        if "MillRoughCut" in nprotocol:
            nprotocol[MILL_ROUGH_KEY] = nprotocol.pop("MillRoughCut")

        if "MillPolishingCut" in nprotocol:
            nprotocol[MILL_POLISHING_KEY] = nprotocol.pop("MillPolishingCut")

        if "lamella" in nprotocol:
            del nprotocol["lamella"]

        p.protocol = deepcopy(nprotocol)

    p.path = os.path.join(exp_path, p.name)

    def get_lamella_milling_plots(p: Lamella) -> plt.Figure:

        # DRAW MILLING PATTERNS
        milling_workflows = [MILL_ROUGH_KEY, MILL_POLISHING_KEY, MICROEXPANSION_KEY, FIDUCIAL_KEY]
        milling_stages = []
        for mw in milling_workflows:
            milling_stages.extend(get_milling_stages(key=mw, protocol=p.protocol))

        filenames = sorted(glob.glob(os.path.join(p.path, "ref_MillPolishing*_final_high_res_ib.tif*")))

        if len(filenames) == 0:
            print(f"No images found for {p.name}")
            # continue
            return None

        # sem_image = FibsemImage.load(filenames[0])
        fib_image = FibsemImage.load(filenames[0])

        fig, ax = draw_milling_patterns(fib_image, milling_stages, title=f"{p.name}")

        return fig

    fig_milling = get_lamella_milling_plots(p)


    filenames = sorted(glob.glob(os.path.join(p.path, "ref_MillPolishing*_final_high_res*.tif*")))
    sem_image = FibsemImage.load(filenames[0])
    fib_image = FibsemImage.load(filenames[1])

    fig_images, ax = plt.subplots(1, 2, figsize=(10, 5))
    ax[0].imshow(sem_image.data, cmap="gray")
    ax[1].imshow(fib_image.data, cmap="gray")
    # plt.show()

    FIGURES = {"milling": deepcopy(fig_milling), "images": deepcopy(fig_images)}
    return FIGURES










def plot_multi_gantt(df: pd.DataFrame, color_by='piece_id', barmode='group') -> go.Figure:
    """
    Create a Gantt chart for multiple pieces/processes
    
    Parameters:
    - df: DataFrame with columns [piece_id, step, timestamp, end_time]
    - color_by: Column to use for color coding ('piece_id' or 'step')
    - barmode: 'group' or 'overlay' for how bars should be displayed
    """
    fig = px.timeline(
        df, 
        x_start='start_time',
        x_end='end_time',
        y='step',
        color=color_by,
        # title='Multi-Process Timeline',
        # hover_data=['duration']  # Uncomment to show duration in hover
    )

    # Update layout
    fig.update_layout(
        title_x=0.5,
        xaxis_title='Time',
        yaxis_title='Workflow Step',
        height=400,
        barmode=barmode,  # 'group' or 'overlay'
        yaxis={'categoryorder': 'array', 
               'categoryarray': df['step'].unique()},
        showlegend=True,
        # legend_title_text='Piece ID'
    )

    # Reverse y-axis so first step is at top
    fig.update_yaxes(autorange="reversed")
    
    return fig

def generate_workflow_steps_timeline(df: pd.DataFrame) -> Dict[str, go.Figure]:

    timezone = datetime.now().astimezone().tzinfo

    df["start_time"] = pd.to_datetime(df["timestamp"], unit="s").dt.tz_localize("UTC").dt.tz_convert(timezone)
    df['end_time'] = df['start_time'] + pd.to_timedelta(df['duration'], unit='s')

    # # drop step in Created, Finished
    df = df[~df["stage"].isin(["Created", "PreSetupLamella", "SetupLamella", "PositionReady", "Finished"])]
    # drop step in [STARTED, FINISHED, NULL_END]
    df = df[~df["step"].isin(["STARTED", "FINISHED", "NULL_END"])]

    WORKFLOW_STEPS_FIGURES = {}

    for stage_name in df["stage"].unique():
        df1 = df[df["stage"] == stage_name]
        fig = plot_multi_gantt(df1, color_by='step', barmode='overlay')
        
        WORKFLOW_STEPS_FIGURES[stage_name] = fig

    return WORKFLOW_STEPS_FIGURES    


def generate_workflow_timeline(df: pd.DataFrame) -> go.Figure:

    # drop rows with duration over 1 day
    df = df[df["duration"] < 86400]

    timezone = datetime.now().astimezone().tzinfo
    df["start_time"] = pd.to_datetime(df["start"], unit="s").dt.tz_localize("UTC").dt.tz_convert(timezone)
    df["end_time"] = pd.to_datetime(df["end"], unit="s").dt.tz_localize("UTC").dt.tz_convert(timezone)

    df.rename({"stage": "step"}, axis=1, inplace=True)

    # drop step in Created, Finished
    df = df[~df["step"].isin(["Created", "Finished"])]

    fig = plot_multi_gantt(df, color_by='step', barmode='overlay')
    
    return fig

def generate_report_timeline(df: pd.DataFrame):
    # plot time series with x= step_n and y = timestamp with step  as hover text
    df.dropna(inplace=True)
    df.duration = df.duration.astype(int)

    # convert timestamp to datetime, aus timezone 
    df.timestamp = pd.to_datetime(df.timestamp, unit="s")

    # convert timestamp to current timezone
     # get current timezone?
    timezone = datetime.now().astimezone().tzinfo
    df.timestamp = df.timestamp.dt.tz_localize("UTC").dt.tz_convert(timezone)

    df.rename(columns={"stage": "Workflow"}, inplace=True)

    fig_timeline = px.scatter(df, x="step_n", y="timestamp", color="Workflow", symbol="lamella",
        # title="AutoLamella Timeline", 
        hover_name="Workflow", hover_data=df.columns)
        # size = "duration", size_max=20)
    return fig_timeline

def generate_interaction_timeline(df: pd.DataFrame) -> go.Figure:

    if len(df) == 0:
        return None
    
    df.dropna(inplace=True)

    # convert timestamp to datetime, aus timezone 
    df.timestamp = pd.to_datetime(df.timestamp, unit="s")

    # convert timestamp to australian timezone
    timezone = datetime.now().astimezone().tzinfo
    df.timestamp = df.timestamp.dt.tz_localize("UTC").dt.tz_convert(timezone)

    df["magnitude"] = np.sqrt(df["dm_x"]**2 + df["dm_y"]**2)

    fig_timeline = px.scatter(df, x="timestamp", y="magnitude", color="stage", symbol="type",
        # title="AutoLamella Interaction Timeline", 
        hover_name="stage", hover_data=df.columns,)
        # size = "duration", size_max=20)

    return fig_timeline

def generate_duration_data(df: pd.DataFrame) -> Tuple[pd.DataFrame, go.Figure]:
    df = df.copy()
    df.rename(columns={"petname": "Name", "stage": "Workflow"}, inplace=True)

    # convert duration to hr;min;sec
    df["duration"] = pd.to_timedelta(df["duration"], unit='s')
    df["Duration"] = df["duration"].apply(lambda x: f"{x.components.hours:02d}:{x.components.minutes:02d}:{x.components.seconds:02d}")

    # drop Workflow in ["Created", "SetupLamella", "Finished"]
    # TODO: better handling of SetupLamella
    columns_to_drop = ["Created", "PositionReady","Finished"]
    # if "ReadyLamella" in df["Workflow"].unique():
        # print("DROPPING OLD STAGES")
        # columns_to_drop = ["PreSetupLamella", "SetupLamella", "ReadyTrench", "Finished"]
    df = df[~df["Workflow"].isin(columns_to_drop)]


    fig_duration = px.bar(df, x="Name", y="duration", 
                        color="Workflow", barmode="group")
    
    return df[["Name", "Workflow", "Duration"]], fig_duration

    # # display df_experiment dataframe
    # st.subheader("Experiment Data")
    # df_lamella = df_experiment[["petname", "current_stage", "failure", "failure_note", "failure_timestamp"]].copy()
    # # rename petname to lamella
    # df_lamella.rename(columns={"petname": "lamella"}, inplace=True)
    # # convert timestamp to datetime, aus timezone
    # df_lamella.failure_timestamp = pd.to_datetime(df_lamella.failure_timestamp, unit="s")
    # st.dataframe(df_lamella)


def generate_experiment_summary(df: pd.DataFrame) -> go.Figure:
    pass


def generate_report_data(filename: str, encoding: str = "cp1252") -> dict:

    REPORT_DATA = {}

    # Load experiment data
    exp = Experiment.load(filename)
    dfs = calculate_statistics_dataframe(exp.path, encoding=encoding)
    df_experiment, df_history, df_beam_shift, df_steps, df_stage, df_det, df_click, df_milling = dfs

    df, fig_duration = generate_duration_data(df_history)

    REPORT_DATA["experiment_name"] = exp.name
    REPORT_DATA["experiment_summary_dataframe"] = exp.to_summary_dataframe()

    # timeline
    REPORT_DATA["workflow_timeline_plot"] = generate_workflow_timeline(df_history)
    REPORT_DATA["step_timeline_plots"] = generate_workflow_steps_timeline(df_steps)
    # REPORT_DATA["step_timeline_plot"] = generate_report_timeline(df_steps)
    # REPORT_DATA["interactions_timeline_plot"] = generate_interaction_timeline(df_click)

    # duration
    REPORT_DATA["duration_dataframe"] = df
    REPORT_DATA["duration_plot"] = fig_duration

    # lamella figures
    REPORT_DATA["lamella_data"] = {}
    for p in exp.positions:
        # figs = get_lamella_figures(p, exp.path)
        # REPORT_DATA["lamella_data"][p.name] = figs
        REPORT_DATA["lamella_data"][p.name] = "TODO"

    return REPORT_DATA

# report generation
def generate_report(filename: str, encoding="utf-8"):

    report_data = generate_report_data(filename, encoding=encoding)

    # Create PDF generator
    pdf = PDFReportGenerator('autolamella.pdf')
    
    # Add content
    pdf.add_title(f"AutoLamella Report: {report_data['experiment_name']}",
                  f'Generated on {datetime.now().strftime("%B %d, %Y")}')
    pdf.add_paragraph('This report summarises the results of the AutoLamella experiment.')
    pdf.add_dataframe(report_data["experiment_summary_dataframe"], 'Experiment Summary')

    # timeline
    pdf.add_page_break()
    pdf.add_plotly_figure(report_data["workflow_timeline_plot"], "Workflow Timeline")
    for stage_name, fig in report_data["step_timeline_plots"].items():
        pdf.add_plotly_figure(fig, f"{stage_name} Timeline")

    # pdf.add_plotly_figure(report_data["interactions_timeline_plot"], "Interaction Timeline")

    # duration
    # pdf.add_dataframe(report_data["duration_dataframe"], 'Workflow Duration')
    pdf.add_plotly_figure(report_data["duration_plot"], "Workflow Duration by Lamella")

    # TODO: 
    # show overall summary
    # show overview image with positions
    # show individual lamella data
    # show final images for each lamella
    # show milling patterns for each lamella
    # show milling data

    for p, figs in report_data["lamella_data"].items():
        pdf.add_page_break()
        pdf.add_heading(f"Lamella: {p}")
        # pdf.add_mpl_figure(figs["milling"])
        # pdf.add_mpl_figure(figs["images"])
        plt.close()

    # Generate PDF
    pdf.generate()

generate_report(filename=filenames[2], encoding="cp1252")

In [90]:
from reportlab.lib import colors
from reportlab.lib.pagesizes import letter
from reportlab.platypus import SimpleDocTemplate, Paragraph, Spacer, Image
from reportlab.lib.styles import getSampleStyleSheet
from reportlab.lib.units import inch
import numpy as np
import matplotlib.pyplot as plt
from PIL import Image as PILImage
import io

class PDFReportGenerator:
    def __init__(self, output_filename):
        self.doc = SimpleDocTemplate(
            output_filename,
            pagesize=letter,
            rightMargin=72,
            leftMargin=72,
            topMargin=72,
            bottomMargin=72
        )
        self.styles = getSampleStyleSheet()
        self.story = []

    def add_array_as_heatmap(self, array, title=None, width=6, height=4, 
                            cmap='viridis', colorbar=True):
        """Add a numpy array as a heatmap"""
        # Create matplotlib figure
        fig, ax = plt.subplots(figsize=(width, height))
        im = ax.imshow(array, cmap=cmap)
        if colorbar:
            plt.colorbar(im)
        if title:
            plt.title(title)
        
        # Save to bytes buffer
        img_buffer = io.BytesIO()
        plt.savefig(img_buffer, format='png', bbox_inches='tight', dpi=300)
        img_buffer.seek(0)
        plt.close()
        
        # Add to PDF
        img = Image(img_buffer, width=width*inch, height=height*inch)
        self.story.append(img)
        self.story.append(Spacer(1, 12))

    def add_array_as_image(self, array, title=None, width=6, height=4):
        """Add a numpy array as an image"""
        # Convert array to PIL Image
        if array.dtype != np.uint8:
            # Normalize array to 0-255 range
            array = ((array - array.min()) * (255.0 / (array.max() - array.min()))).astype(np.uint8)
        
        img_pil = PILImage.fromarray(array)
        
        # Save to bytes buffer
        img_buffer = io.BytesIO()
        img_pil.save(img_buffer, format='PNG')
        img_buffer.seek(0)
        
        # Add to PDF
        img = Image(img_buffer, width=width*inch, height=height*inch)
        if title:
            self.story.append(Paragraph(title, self.styles['Heading2']))
        self.story.append(img)
        self.story.append(Spacer(1, 12))

    def generate(self):
        """Generate the PDF document"""
        self.doc.build(self.story)

# Example usage
def example_with_arrays():
    # Create sample arrays
    # 1. Numeric array for heatmap
    data_heatmap = np.random.rand(10, 10)
    
    # 2. Image-like array
    x = np.linspace(0, 10, 500)
    y = np.linspace(0, 10, 500)
    X, Y = np.meshgrid(x, y)
    image_array = np.sin(X) * np.cos(Y)
    
    # Create PDF
    pdf = PDFReportGenerator('array_visualization.pdf')
    
    # Add heatmap
    pdf.add_array_as_heatmap(
        data_heatmap, 
        title='Random Data Heatmap',
        width=6,
        height=4,
        cmap='viridis'
    )
    
    # Add image array
    pdf.add_array_as_image(
        image_array,
        title='Sine-Cosine Pattern',
        width=6,
        height=4
    )
    
    pdf.generate()

if __name__ == '__main__':
    example_with_arrays()

In [None]:
import pandas as pd
import plotly.figure_factory as ff
import plotly.express as px
import matplotlib.pyplot as plt
import numpy as np
from datetime import datetime, timedelta

# Create sample data
data = {
    'step': ['Data Loading', 'Preprocessing', 'Analysis', 'Validation', 'Report'],
    'timestamp': [
        '2024-01-01 10:00:00',
        '2024-01-01 10:30:00',
        '2024-01-01 11:15:00',
        '2024-01-01 13:45:00',
        '2024-01-01 14:30:00'
    ],
    'duration': [1800, 2700, 9000, 2700, 3600]  # duration in seconds
}

df = pd.DataFrame(data)

# Convert timestamp to datetime
df['timestamp'] = pd.to_datetime(df['timestamp'])

# Calculate end time by adding duration
df['end_time'] = df['timestamp'] + pd.to_timedelta(df['duration'], unit='s')

def plot_gantt_plotly(df):
    """
    Create a Gantt chart using Plotly
    """
    # Create the figure
    fig = px.timeline(
        df, 
        x_start='timestamp',
        x_end='end_time',
        y='step',
        title='Process Timeline',
        color='step'  # Color bars by step
    )

    # Update layout
    fig.update_layout(
        title_x=0.5,  # Center title
        xaxis_title='Time',
        yaxis_title='Process Step',
        height=400,
        showlegend=False,
        yaxis={'categoryorder': 'total ascending'}
    )

    # Update yaxis to show all steps
    fig.update_yaxes(autorange="reversed")  # Optional: reverse order of steps
    
    return fig

def plot_gantt_matplotlib(df):
    """
    Create a Gantt chart using Matplotlib
    """
    fig, ax = plt.subplots(figsize=(12, 6))
    
    # Calculate duration in minutes for scaling
    durations = (df['end_time'] - df['timestamp']).dt.total_seconds() / 60
    
    # Create positions for each task
    y_positions = range(len(df))
    
    # Calculate start times relative to first timestamp
    start_times = (df['timestamp'] - df['timestamp'].min()).dt.total_seconds() / 60
    
    # Create bars
    ax.barh(y_positions, durations, left=start_times, height=0.3)
    
    # Customize chart
    ax.set_yticks(y_positions)
    ax.set_yticklabels(df['step'])
    ax.invert_yaxis()  # Invert to match typical Gantt chart format
    
    # Add grid
    ax.grid(True, axis='x', alpha=0.1)
    
    # Set title and labels
    ax.set_title('Process Timeline')
    ax.set_xlabel('Time (minutes from start)')
    ax.set_ylabel('Process Step')
    
    plt.tight_layout()
    return fig

# Example usage:
# Plotly version
fig_plotly = plot_gantt_plotly(df)
# fig_plotly.show()  # Use this to display in Jupyter notebook

# Matplotlib version
fig_matplotlib = plot_gantt_matplotlib(df)
# plt.show()  # Use this to display in Jupyter notebook

# If you want to save the plots:
# fig_plotly.write_html("gantt_plotly.html")
# fig_matplotlib.savefig("gantt_matplotlib.png")

In [None]:
import pandas as pd
import plotly.express as px
from datetime import datetime

# Create sample data with multiple pieces
data = {
    'piece_id': ['A1', 'A1', 'A1', 'A1', 
                 'B2', 'B2', 'B2', 'B2',
                 'C3', 'C3', 'C3', 'C3'],
    'step': ['Data Loading', 'Preprocessing', 'Analysis', 'Validation'] * 3,
    'timestamp': [
        # Piece A1
        '2024-01-01 10:00:00', '2024-01-01 10:30:00', 
        '2024-01-01 11:15:00', '2024-01-01 13:45:00',
        # Piece B2
        '2024-01-01 11:00:00', '2024-01-01 11:45:00',
        '2024-01-01 12:30:00', '2024-01-01 14:15:00',
        # Piece C3
        '2024-01-01 12:00:00', '2024-01-01 12:45:00',
        '2024-01-01 13:30:00', '2024-01-01 15:15:00'
    ],
    'duration': [1800, 2700, 9000, 2700] * 3  # duration in seconds
}

df = pd.DataFrame(data)

# Convert timestamp to datetime and calculate end time
df['start_time'] = pd.to_datetime(df['timestamp'])
df['end_time'] = df['start_time'] + pd.to_timedelta(df['duration'], unit='s')

# Create two different views
fig1 = plot_multi_gantt(df, color_by='piece_id', barmode='group')
fig2 = plot_multi_gantt(df, color_by='piece_id', barmode='overlay')

# Optional: Add custom hover template
fig1.update_traces(
    hovertemplate="<br>".join([
        "Piece: %{customdata[0]}",
        "Step: %{y}",
        "Start: %{x[0]}",
        "End: %{x[1]}",
        "<extra></extra>"
    ]),
    customdata=df[['piece_id']]
)

fig1.show()
fig2.show()

# If you want to save the plots:
fig1.write_html("gantt_grouped.html")
fig2.write_html("gantt_overlay.html")

In [None]:
CSV_PATH = "/home/patrick/data/monash-cryo-em/AutoLamella-Exports/autolamella/20241022_Sai/AutoLamella-2024-10-22-09-37/history.csv"
# CSV_PATH = "/home/patrick/github/autolamella/autolamella/log/AutoLamella-2025-01-10-14-24/history.csv"

import pandas as pd

df = pd.read_csv(CSV_PATH)

display(df)

# fig = generate_workflow_timeline(df)
# fig.show()

In [None]:
import pandas as pd
CSV_PATH = "/home/patrick/data/monash-cryo-em/AutoLamella-Exports/autolamella/20241022_Sai/AutoLamella-2024-10-22-09-37steps.csv"
# CSV_PATH = "/home/patrick/github/autolamella/autolamella/log/AutoLamella-2025-01-10-14-24/history.csv"

df = pd.read_csv(CSV_PATH)

workflow_steps_figures = generate_workflow_steps_timeline(df)

for k, v in workflow_steps_figures.items():
    print(f"WORKFLOW: {k}")
    v.show()

In [None]:
import pandas as pd
PATH = "/home/patrick/data/monash-cryo-em/AutoLamella-Exports/autolamella/20241022_Sai/AutoLamella-2024-10-22-09-37"
# PATH = "/home/patrick/github/autolamella/autolamella/log/AutoLamella-2025-01-10-14-24"

# from autolamella.tools.data import calculate_statistics_dataframe
from autolamella.structures import Experiment

dfs = calculate_statistics_dataframe(PATH, encoding="utf-8")
exp = Experiment.load(f"{PATH}/experiment.yaml")
protocol = AutoLamellaProtocol.load(f"{PATH}/protocol.yaml")

# display(exp.to_dataframe_v2())
# df = exp.to_summary_dataframe()
# display(df)


In [None]:
from autolamella.structures import AutoLamellaStage
from fibsem.milling.patterning.plotting import draw_milling_patterns
from fibsem.milling import get_milling_stages

import glob
import os
import pandas as pd
from fibsem.structures import FibsemImage
from autolamella.protocol.validation import convert_old_milling_protocol_to_new_protocol, MILL_POLISHING_KEY, MILL_ROUGH_KEY, MICROEXPANSION_KEY, FIDUCIAL_KEY
PATH = "/home/patrick/data/monash-cryo-em/AutoLamella-Exports/autolamella/20241022_Sai/AutoLamella-2024-10-22-09-37"
# PATH = "/home/patrick/github/autolamella/autolamella/log/AutoLamella-2025-01-10-14-24"

exp = Experiment.load(f"{PATH}/experiment.yaml")

print(f"Total Lamella: {len(exp.positions)}, Finished Lamella: {len(exp.at_stage(AutoLamellaStage.Finished))})")
failed_lamella = exp.at_failure()
print(f"Failed Lamella: {[l.name for l in failed_lamella]}")


for p in exp.positions:
    print(p.name)

    # replace keys in protocol
    # MillRoughCut -> MILL_ROUGH_KEY
    # MillPolishingCut -> MILL_POLISHING_KEY
    # remove lamella

    figures = get_lamella_figures(p)
    plt.show()
    
    # if figures is not None:
        # figures["milling"].show()
        # figures["images"].show()


# # drop spacing, rate, preset, spot_size #Tescan only
# TESCAN_PARAMS = ["spacing", "rate", "preset", "spot_size"]
# df = df.drop(columns=TESCAN_PARAMS)

# # filter to WorkflowStage == "MillRough", "MillPolishing", fiducial
# milling_workflows = ["MillRoughCut", "MillPolishingCUt", "microexpansion", "fiducial"]
# df = df[df["WorkflowStage"].isin(milling_workflows)]

# # filter to only milling_current, voltage, depth
# df = df[["Experiment", "Lamella", "WorkflowStage", "MillingStage", 
#             "type", "milling_current", "milling_voltage", "depth", "lamella_height", "lamella_width", "height", "width"]]
# display(df)

# # save to csv at exp.path "milling.csv"
# df.to_csv(os.path.join(exp.path, "protocol.csv"), index=False)

# # continue
# for pos in exp.positions:
    
#     if not pos.is_failure:
#         continue
#     print(f"{pos.name}: {pos.failure_note}")
    
#     # load milling stages
#     protocol = pos.protocol
#     milling_stages = []
#     for mw in milling_workflows:
#         stages = get_milling_stages(key=mw, protocol=protocol, point=Point.from_dict(protocol[mw]["point"]))
#         milling_stages.extend(stages)

#     # TODO: lamella path is not correct when re-loaded on another machine
#     fib_image = FibsemImage.load(os.path.join(exp.path, pos.name, "ref_MillPolishingCut_final_high_res_ib.tif"))
#     sem_image = FibsemImage.load(os.path.join(exp.path, pos.name, "ref_MillPolishingCut_final_high_res_eb.tif"))

#     fig, ax1 = draw_milling_patterns(fib_image, milling_stages)
#     plt.title(pos.name)
#     plt.show()

#     fig, ax = plt.subplots(1, 2, figsize=(10, 5))
#     ax[0].imshow(sem_image.data, cmap="gray")
#     ax[1].imshow(fib_image.data, cmap="gray")

#     plt.show()


In [None]:
from autolamella.structures import AutoLamellaStage
from fibsem.milling.patterning.plotting import draw_milling_patterns
from fibsem.milling import get_milling_stages

for filename in filenames:
    experiment_path = os.path.dirname(filename)
    print(f"Experiment: {experiment_path}")

    try:

            print("-"*80)
            continue
    except Exception as e:
        print(e)
        continue