In [1]:
import datetime
import logging
import os
import shutil
import time
from unittest.mock import Mock
import sys

import numpy as np
from io import StringIO
from pathlib import PurePath, Path

from ophyd import (set_cl, EpicsMotor, Signal, EpicsSignal, EpicsSignalRO,
                   Component as Cpt)
from ophyd.utils.epics_pvs import (AlarmSeverity, AlarmStatus)                   

from ophyd.utils.paths import make_dir_tree
from ophyd import (SimDetector, SingleTrigger, Component, Device,
                   DynamicDeviceComponent, Kind, wait)
from ophyd.areadetector.plugins import (ImagePlugin, StatsPlugin,
                                        ColorConvPlugin, ProcessPlugin,
                                        OverlayPlugin, ROIPlugin,
                                        TransformPlugin, NetCDFPlugin,
                                        TIFFPlugin, JPEGPlugin, HDF5Plugin,
                                        register_plugin
                                        # FilePlugin
                                        )
from ophyd.areadetector.base import NDDerivedSignal
from ophyd.areadetector.filestore_mixins import (
    FileStoreTIFF, FileStoreIterativeWrite,
    FileStoreHDF5)

# we do not have nexus installed on our test IOC
# from ophyd.areadetector.plugins import NexusPlugin
from ophyd.areadetector.plugins import PluginBase
from ophyd.areadetector.util import stub_templates
#from ophyd.device import (Component as Cpt, )
from ophyd.signal import Signal
import uuid

from epics import caget, caput, cainfo

In [2]:
@register_plugin
class ZMQPlugin(PluginBase): #, version_type='ADZMQ'):
    '''An areadetector plugin class that publishes an NDArray to a ZMQ socket'''
    _default_suffix = 'ZMQ1:'
    _suffix_re = r'ZMQ\d:'
    _plugin_type = 'NDPluginZMQ'
    

In [None]:
cainfo('13SIM1:cam1:Acquire')
cainfo('13SIM1:ZMQ1:EnableCallbacks')
cainfo('13SIM1:ZMQ1:PluginType_RBV')

In [4]:
try:
    class TestAD(SingleTrigger, SimDetector):
        zmq1 = Cpt(ZMQPlugin, 'ZMQ1:')

    #caput('13SIM1:ZMQ1:EnableCallbacks', 1)
    
    det = TestAD("13SIM1:", name='test')
    print(det.zmq1.plugin_type)
    
    print(det.cam)

    det.cam.acquire_time.put(.1)
    det.cam.num_images.put(1)
    time.sleep(1)

    #print(caget('13SIM1:ZMQ1:EnableCallbacks'))
        
    det.stage()
    print('staged')
    
    #print(caget('13SIM1:ZMQ1:EnableCallbacks'))

    #time.sleep(1)

    st = det.trigger()
    print('triggered')

    count = 0
    while not st.done:
        time.sleep(.1)

        count += 1

    print(f'count = {count}')
    
    #reading = det.read()
    #print(reading)
    #print(det.describe())
    
    #print(caget('13SIM1:ZMQ1:EnableCallbacks'))
    det.unstage()
    print('unstaged')
    #print(caget('13SIM1:ZMQ1:EnableCallbacks'))

except:
    print(f'Exception: {sys.exc_info()}')

EpicsSignalRO(read_pv='13SIM1:ZMQ1:PluginType_RBV', name='test_zmq1_plugin_type', parent='test_zmq1', value='NDPluginZMQ', timestamp=1631243890.053128, auto_monitor=False, string=False)
SimDetectorCam(prefix='13SIM1:cam1:', name='test_cam', parent='test', read_attrs=[], configuration_attrs=['acquire_period', 'acquire_time', 'image_mode', 'manufacturer', 'model', 'num_exposures', 'num_images', 'trigger_mode'])
staged
triggered
count = 6
unstaged


In [4]:
caput('13SIM1:cam1:Acquire', 1)
caput('13SIM1:cam1:ArrayCallbacks', 1)
caput('13SIM1:ZMQ1:EnableCallbacks', 1)

1

In [5]:
caput('13SIM1:cam1:Acquire', 0)
caput('13SIM1:ZMQ1:EnableCallbacks', 0)

1

In [25]:
caget('13SIM1:cam1:SizeX')

1024

In [29]:
caput('13SIM1:cam1:SizeX', 1024)
caput('13SIM1:cam1:SizeY', 1024)

1