In [1]:
import numpy as np
from copy import deepcopy
import time
from pycromanager import Acquisition, Bridge, multi_d_acquisition_events

In [2]:
# Open the Python <-> Micro-Manager bridge, then grab the two handles used by
# the rest of the notebook: the core (hardware control) and the Studio object
# (used below to read the position list).
bridge = Bridge()
mmc, mmStudio = bridge.get_core(), bridge.get_studio()

Java ZMQ server version: 4.0.0
Python client expected version: 4.1.0
 To fix, update to BOTH latest pycromanager and latest micro-manager nightly build


In [3]:
# Read the stage positions currently stored in Micro-Manager's position list.
mm_pos_list = mmStudio.get_position_list_manager().get_position_list()
number_of_positions = mm_pos_list.get_number_of_positions()

# Fetch every position object exactly once; each `get_position` call goes
# through the bridge, so asking for the same index three times (once per
# coordinate) triples the round trips for no benefit.
stage_positions = [mm_pos_list.get_position(i) for i in range(number_of_positions)]

# One [x, y, z] row per stored position, rounded to 3 decimals.
xyz_position_list = np.around(
    [[pos.get_x(), pos.get_y(), pos.get_z()] for pos in stage_positions],
    decimals=3,
)

# The first three stored positions are imaged in epi only; every remaining
# position is imaged in both LF and epi.
epi_positions = xyz_position_list[:3]
lf_epi_positions = xyz_position_list[3:]

xyz_position_list

array([], dtype=float64)

In [4]:
# --- Channel presets: each entry names a config group and a preset in it ---
epi_channel = dict(group='Master Channel', config='Epi')
epi_exposure = 50

lf_channels = [
    dict(group='Master Channel', config=f'LF-State{i}')
    for i in range(5)
]
lf_exposure = 10

# --- Z-stack, expressed relative to the reference focus position ---
z_start = -5
z_end = 5
z_step = 0.25
# Extending the stop value by one step makes z_end itself part of the range.
z_range = np.arange(start=z_start, stop=z_end + z_step, step=z_step)

# --- Time lapse ---
num_time_points = 1
time_interval_s = 120  # seconds between consecutive time points

In [5]:
def _make_event(time_idx, position_idx, xyz, channel, exposure):
    """Build one acquisition event dict for a single position/channel.

    Only the first slice (axes 'z' == 0) is described here; per the notes in
    this notebook, the remaining z-stack events are created later by the
    autofocus function.
    """
    return {
        'axes': {'time': time_idx, 'position': position_idx, 'z': 0},
        'min_start_time': time_idx * time_interval_s,
        'x': xyz[0],
        'y': xyz[1],
        'z': xyz[2],
        'channel': channel,
        'exposure': exposure,
        'keep_shutter_open': False,
    }


events = []
num_epi_positions = np.shape(epi_positions)[0]

for time_idx in range(num_time_points):

    # Positions imaged in epi only.
    events.extend(
        _make_event(time_idx, pos_idx, xyz, epi_channel, epi_exposure)
        for pos_idx, xyz in enumerate(epi_positions)
    )

    # Positions imaged in both epi and LF. Their position index continues
    # after the epi-only ones; the epi event is queued first, then the first
    # LF channel.
    for pos_idx, xyz in enumerate(lf_epi_positions, start=num_epi_positions):
        for channel, exposure in (
            (epi_channel, epi_exposure),
            (lf_channels[0], lf_exposure),
        ):
            events.append(_make_event(time_idx, pos_idx, xyz, channel, exposure))

In [6]:
# Display the event list built in the previous cell for inspection.
events

[]

In [7]:
def autofocus_fn(event, bridge, event_queue):
    """Hook that runs hardware autofocus whenever the position index changes.

    The position index of the previously seen event is remembered on the
    function object itself (``autofocus_fn.pos_index``). Autofocus is skipped
    for the very first event and for consecutive events at the same position.

    Parameters
    ----------
    event : dict
        Acquisition event. Events without an ``'axes'`` entry (or without a
        ``'position'`` axis) are returned untouched.
    bridge
        Object exposing ``get_core()``, used to reach the Micro-Manager core.
    event_queue
        Unused here; present because the hook is called with three arguments.

    Returns
    -------
    dict
        The same ``event`` object that was passed in.

    Notes
    -----
    Reads the module-level ``z_start`` when autofocus actually runs.
    """
    # Some events only modify hardware and carry no axes; indexing them used
    # to raise KeyError, so pass them straight through instead.
    if 'axes' not in event or 'position' not in event['axes']:
        return event
    position = event['axes']['position']

    if not hasattr(autofocus_fn, 'pos_index'):
        autofocus_fn.pos_index = position

    # Only run autofocus at new positions (except the first one)
    if position != autofocus_fn.pos_index:
        # get the Micro-Manager core
        mmc = bridge.get_core()

        # Shift the focus drive by -z_start, autofocus there, then shift back
        # by z_start. Presumably this undoes the z-stack start offset so that
        # autofocus runs at the reference plane -- TODO confirm.
        print(mmc.get_position())
        mmc.set_relative_position(-z_start)
        print(mmc.get_position())
        mmc.full_focus()
        print(mmc.get_position())
        mmc.set_relative_position(z_start)
        print(mmc.get_position())

    # Keep track of the last position index seen
    autofocus_fn.pos_index = position

    return event

In [8]:
def demo_autofocus_fn(event, bridge, event_queue):
    """Demo hook: autofocus on position change, then simulate a focus shift.

    Same trigger logic as the real autofocus hook, but after ``full_focus()``
    the focus drive is moved to ``current_z + del_z`` with a random ``del_z``
    drawn from ``np.random.uniform()`` (rounded to 2 decimals) to mimic the
    stage ending up somewhere new.

    The last seen position index is stored on this function's own attribute
    (``demo_autofocus_fn.pos_index``). It was previously stored on
    ``autofocus_fn``, which coupled the demo hook to a different function
    that this notebook later redefines.

    Parameters
    ----------
    event : dict
        Acquisition event. Events without an ``'axes'`` entry (or without a
        ``'position'`` axis) are returned untouched.
    bridge
        Object exposing ``get_core()``.
    event_queue
        Unused here.

    Returns
    -------
    dict
        The same ``event`` object that was passed in.
    """
    # some events only modify hardware
    if 'axes' not in event or 'position' not in event['axes']:
        return event
    position = event['axes']['position']

    if not hasattr(demo_autofocus_fn, 'pos_index'):
        demo_autofocus_fn.pos_index = position

    # Only run autofocus at new positions (except the first one)
    if position != demo_autofocus_fn.pos_index:
        # get the Micro-Manager core
        mmc = bridge.get_core()

        # apply autofocus
        current_z = mmc.get_position()
        print(f'Position before autofocus: {current_z}')
        mmc.full_focus()
        del_z = np.around(np.random.uniform(), decimals=2)
        mmc.set_position(current_z + del_z)  # simulate movement after autofocus
        print(f'Position after autofocus: {mmc.get_position()}')

        # NOTE(review): the original carried a disabled line that would also
        # write the new focus into the event (event['z'] = current_z + del_z).
        # It is left disabled here; confirm whether the event should be updated.

    # Keep track of the last position index seen
    demo_autofocus_fn.pos_index = position

    return event

In [9]:
# TODO: autofocus_fn errors out when event doesn't have 'axes' key

In [5]:
# v1: two-position demo with a short z-stack defined relative to focus.
z_start = -2
z_end = 2
z_step = 1

# One [x, y, z] triple per stage position.
demo_xyz_positions = [[100, 100, 50], [110, 110, 60]]
events = multi_d_acquisition_events(xyz_positions=demo_xyz_positions)

# Derived from the inputs above rather than hard-coded, so the
# "last position / last slice" check in hook_fn stays in sync if the
# position list or the z range is edited.
num_positions = len(demo_xyz_positions)
num_slices = len(np.arange(z_start, z_end + z_step, z_step))
num_channels = 1  # not referenced by the cells visible in this notebook

def autofocus_fn(event, bridge, event_queue):
    """Autofocus, then expand ``event`` into a z-stack around the new focus.

    Runs ``full_focus()`` on the core, nudges the focus drive by a random
    amount (``np.random.uniform()`` rounded to 2 decimals), and reads back the
    resulting position. The stack planes are that position plus
    ``np.arange(z_start, z_end + z_step, z_step)`` (module-level settings).

    ``event`` itself becomes the first plane (its ``'z'`` is overwritten in
    place). For every further plane a deep copy of ``event`` is made, given
    the plane's z value and slice index in ``axes['z']``, and put on
    ``event_queue``.

    Returns the (mutated) ``event``.
    """
    core = bridge.get_core()
    core.full_focus()

    # Random offset applied after autofocus.
    core.set_relative_position(np.around(np.random.uniform(), decimals=2))

    focus_z = core.get_position()
    stack_z = focus_z + np.arange(z_start, z_end + z_step, z_step)

    # First plane is carried by the incoming event ...
    event['z'] = stack_z[0]

    # ... and each remaining plane gets its own queued copy.
    for slice_idx, plane_z in enumerate(stack_z[1:], start=1):
        plane_event = deepcopy(event)
        plane_event['axes']['z'] = slice_idx
        plane_event['z'] = plane_z
        event_queue.put(plane_event)

    return event

def hook_fn(event, bridge, event_queue):
    """Hook that autofocuses on position changes and ends the acquisition.

    Events without an ``'axes'`` entry are returned untouched. Otherwise:

    * ``autofocus_fn`` is called for the very first event seen and whenever
      the event's position index differs from the one remembered in
      ``hook_fn.pos_index``.
    * When the event is the last slice (``num_slices - 1``) of the last
      position (``num_positions - 1``), ``None`` is put on ``event_queue``.

    Returns the event (as returned by ``autofocus_fn`` when that ran).
    """
    if 'axes' not in event.keys():
        return event

    # First event ever: remember its position and autofocus right away.
    if not hasattr(hook_fn, 'pos_index'):
        hook_fn.pos_index = event['axes']['position']
        event = autofocus_fn(event, bridge, event_queue)

    # Position changed since the previous event: autofocus again.
    if hook_fn.pos_index != event['axes']['position']:
        event = autofocus_fn(event, bridge, event_queue)

    axes = event['axes']
    hook_fn.pos_index = axes['position']

    # Last slice of the last position: put None on the queue.
    on_last_position = axes['position'] == num_positions - 1
    if on_last_position and axes['z'] == num_slices - 1:
        event_queue.put(None)

    return event

# Display the events returned by multi_d_acquisition_events above.
events

[{'axes': {'position': 0, 'z': 0}, 'x': 100, 'y': 100, 'z': 50},
 {'axes': {'position': 1, 'z': 0}, 'x': 110, 'y': 110, 'z': 60}]

In [6]:
# Create the acquisition with hook_fn registered as its post-hardware hook,
# then submit the initial events. hook_fn adds the remaining z-stack events
# and the terminating None to the queue itself.
acq = Acquisition(
    directory=r'D:',
    name='demo_pm_test',
    post_hardware_hook_fn=hook_fn,
)

acq.acquire(events)