In [3]:
import TimeTagger
import numpy as np
%matplotlib ipympl
import ipywidgets as widgets

In [4]:
dump_file = r"Z:\PriyaM\swabian_data\20230712_tissue\test_tissue_2023-07-12_154752.1.ttbin"

In [8]:
tagger = TimeTagger.createTimeTaggerVirtual()

## channel/channel example (photon mean per channel)

In [5]:
arr = []

photon = 4
frame = 3

In [82]:
class PhotonMean(TimeTagger.CustomMeasurement):
    """
    CustomMeasurement that gets channel_1/channel_2.
    """

    def __init__(self, tagger, channels):
        TimeTagger.CustomMeasurement.__init__(self, tagger)
        self.channels = list(channels)
        self.counts = np.zeros((len(channels),), dtype=np.int32)

        # Each used channel must be registered
        for channel in channels:
            self.register_channel(channel)

        self.clear_impl()

        # At the end of a Measurement construction we must indicate that we
        # have finished
        self.finalize_init()

    def __del__(self):
        # The measurement must be stopped before deconstruction to avoid
        # concurrent measure() calls
        self.stop()

    def getData(self):
        # lock this instance to avoid conflicting results while measure is
        # running apart.
        with self.mutex:
            counts = np.array(self.counts)
        if counts[1]!=0:
            return counts[0]/counts[1]
        else:
            return "no frames yet"

    def getIndex(self):
        # this method does not depend on the internal state, so there is no
        # need for a lock
        return list(self.channels)

    def clear_impl(self):
        # the lock is already acquired
        self.counts *= 0

    def on_start(self):
        # the lock is already acquired
        pass

    def on_stop(self):
        # the lock is already acquired
        pass

    def process(self, incoming_tags, begin_time, end_time):
        # the lock is already acquired
        # self.data is provided as reference, so it must not be accessed
        # anywhere else without locking the mutex.
        # incoming_tags is provided as a read-only reference. The storage will
        # be deallocated after this call, so you must not store a reference to
        # this object. Make a copy instead.
        channel_numbers, counts = np.unique(incoming_tags["channel"], return_counts=True)
        for channel_number, count in zip(channel_numbers, counts):
            self.counts[self.channels.index(channel_number)] += count
            
            if count%1E5:
                arr.append(count)

In [58]:
tagger = TimeTagger.createTimeTaggerVirtual()

In [80]:
slider_duration = widgets.IntSlider(
    orientation='horizontal',
    description='Duration (ps):',
    step=.5E12,
    min=0,
    max=45E12
)

slider_begin = widgets.IntSlider(
    orientation='horizontal',
    description='Start at (ps):',
    step=.5E12,
    min=0,
    max=45E12
)

slider_duration.layout.width = '60%'
slider_begin.layout.width = '60%'


In [106]:
arr3= [1,2,1,3,4,5,6,7,7,8,5,3,2,4,5,6,3,2,2,1,4,5,7,8,4,5,6]
np.unique(arr3, return_counts = True)

(array([1, 2, 3, 4, 5, 6, 7, 8]), array([3, 4, 3, 4, 5, 3, 3, 2], dtype=int64))

In [81]:

def update_duration(begin, duration):
    arr.clear()
    tagger.reset()
    photon_mean = PhotonMean(tagger, channels=[photon, frame])
    
    tagger.setReplaySpeed(-1)
    replay = tagger.replay(dump_file, begin=begin, duration=duration)
    tagger.waitForCompletion()
    
    photon_mean_data = photon_mean.getData()
    print("mean photons per frame:", photon_mean_data)
    
    print("length= ",len(arr), '\n', arr)
    
widgets.interact(update_duration, begin=slider_begin, duration=slider_duration)


interactive(children=(IntSlider(value=0, description='Start at (ps):', layout=Layout(width='60%'), max=4500000…

<function __main__.update_duration(begin, duration)>

# Photon median

In [18]:
class PhotonMedian(TimeTagger.CustomMeasurement):
    """
    CustomMeasurement that gets channel_1/channel_2.
    """

    def __init__(self, tagger, channels):
        TimeTagger.CustomMeasurement.__init__(self, tagger)
        self.channels = list(channels)
        self.it=[]
        
        # Each used channel must be registered
        for channel in channels:
            self.register_channel(channel)
        
        
        self.clear_impl()

        # At the end of a Measurement construction we must indicate that we
        # have finished
        self.finalize_init()

    def __del__(self):
        # The measurement must be stopped before deconstruction to avoid
        # concurrent measure() calls
        self.stop()

    def getData(self):
        # lock this instance to avoid conflicting results while measure is
        # running apart.
        with self.mutex:
            return self.data.copy

    def getIndex(self):
        # this method does not depend on the internal state, so there is no
        # need for a lock
        return list(self.channels)

    def clear_impl(self):
        # the lock is already acquired
        self.it *= 0

    def on_start(self):
        # the lock is already acquired
        pass

    def on_stop(self):
        # the lock is already acquired
        pass

    def process(self, incoming_tags, begin_time, end_time):
        # the lock is already acquired
        # self.data is provided as reference, so it must not be accessed
        # anywhere else without locking the mutex.
        # incoming_tags is provided as a read-only reference. The storage will
        # be deallocated after this call, so you must not store a reference to
        # this object. Make a copy instead.
        self.it = incoming_tags['channel']

In [19]:
tagger.reset()
photon_med = PhotonMedian(tagger, channels=[photon])

tagger.setReplaySpeed(-1)
replay = tagger.replay(dump_file)
tagger.waitForCompletion()

True

In [20]:
photon_med.getData()

array([], dtype=int32)

In [90]:
photon_med.getData()

array([],
      dtype={'names': ['type', 'missed_events', 'channel', 'time'], 'formats': ['u1', '<u2', '<i4', '<i8'], 'offsets': [0, 2, 4, 8], 'itemsize': 16, 'aligned': True})