<a href="https://colab.research.google.com/github/ghoshatanu857/Instrument_Automation/blob/main/Experimental_Applications/LifeTimeMeasurement_modified.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Measuring Lifetime by Instrument Automation
> Prof. Siddharth Dhomkar and Mr. Atanu Ghosh

#### Installing Modules

In [None]:
!pip install pulsestreamer
!pip install nidaqmx
import numpy as np
import inspect,dis
import sys
import time
from tqdm import trange
from plotly.subplots import make_subplots
from plotly import graph_objs as go
import scipy
from scipy.optimize import curve_fit
import pulsestreamer
import nidaqmx
import nidaqmx.stream_readers
import pprint
from pulsestreamer import PulseStreamer,findPulseStreamers,OutputState,TriggerStart,Sequence,TriggerRearm
import os
import scipy.stats as stats
# from tkinter import *
# from tkinter.messagebox import askyesno
# permission='' #creating a gloabl variable

program_path = os.getcwd()

#### loading Instruments

In [None]:
# # loading the local Pulse Streamer system
# IPaddress = findPulseStreamers(search_serial='')[0][0]
# pulser = PulseStreamer(IPaddress)

# # loading the local NIDAQmx system
# system = nidaqmx.system.System.local()
# DAQ_device = system.devices['Dev1']

#### Functions

In [None]:
# Laser_Initialization Sequence
def seqInit(*args):
  laser_block = [(delay2,0),(laser_on,1),(delay2,0)]
  seq_init = pulser.createSequence()
  seq_init.setDigital(laser_port,laser_block)
  condition_check(seq_init)
  return seq_init

# Laser_Read Sequence
def seqRead(*args):
    laser_block = [(delay2, 0), (laser_on, 1), (delay2, 0)]
    trigger_block = [(delay2 + delay3, 0),(read_on, 1),(laser_on - read_on - delay3, 0),(delay2, 0)]
    timing_block = [(delay2 + delay3, 0),(read_on - triggerTimingDelay, 1),(laser_on - read_on - delay3 + triggerTimingDelay, 0),(delay2, 0),]
    seq_read = pulser.createSequence()
    seq_read.setDigital(laser_port, laser_block)
    seq_read.setDigital(trigger_port, trigger_block)
    seq_read.setDigital(timing_port, timing_block)
    condition_check(seq_read)
    return seq_read

# Free Evolution Sequence
def seqLifetime(*args):
  timing_read_on = read_on-triggerTimingDelay
  for t in range(steps-1): # neglecting the last step
    trigger_block = [(read_on,1),(delay1+timeRange[t],0),(read_on,1),(timeRange[-1]-timeRange[t]-delay1-2*read_on,0)]
    timing_block = [(timing_read_on,1),(delay1+timeRange[t]+triggerTimingDelay,0),
                     (timing_read_on,1),((timeRange[-1]-timeRange[t]-delay1-triggerTimingDelay-2*timing_read_on),0)]

    seq_evolution = pulser.createSequence()
    seq_evolution.setDigital(trigger_port, trigger_block)
    seq_evolution.setDigital(timing_port, timing_block)
    condition_check(seq_evolution)
    seq_lifetime = seqInit(*args) + seq_evolution
    yield seq_lifetime

# Function to check the conditions
def condition_check(sequence):
  if sequence.isEmpty()!=0:
    raise Exception(f"{list(locals().keys())} is empty!")
  if sequence.getDuration()%8!=0:
    raise Exception(f"{list(locals().keys())} duration is not multiple of 8ns!")


# Function for doing the Lifetime measurement
def lifetime(*args):

     numberofpoints=samples*2
     buffersamplecount=numberofpoints
     count_per_average = buffersamplecount*(steps-1) # as we are ignoring the very last steps
     DAQ_device.reset_device()

     # Counter
     counter = nidaqmx.Task()
     ciChannel = counter.ci_channels.add_ci_count_edges_chan('/Dev1/ctr1',edge=nidaqmx.constants.Edge.RISING, initial_count=0,
                                                             count_direction=nidaqmx.constants.CountDirection.COUNT_UP)
     # print(task.ci_channels[0].ci_count_edges_term)

     # Trigger
     counter.triggers.pause_trigger.dig_lvl_src='/Dev1/PFI4'
     counter.triggers.pause_trigger.trig_type=nidaqmx.constants.TriggerType.DIGITAL_LEVEL
     counter.triggers.pause_trigger.dig_lvl_when=nidaqmx.constants.Level.LOW

     # Timing
     counter.timing.cfg_samp_clk_timing(rate=1e8,source='/Dev1/PFI5',active_edge=nidaqmx.constants.Edge.FALLING,
                                        sample_mode = nidaqmx.constants.AcquisitionType.FINITE, samps_per_chan=count_per_average)

     # Pulse streamer Gating
     gate_task = nidaqmx.Task()
     gate_task.do_channels.add_do_chan(lines = 'Dev1/port0/line7')

     # Counter read task
     reader = nidaqmx.stream_readers.CounterReader(counter.in_stream)
     highCount = np.zeros(buffersamplecount, dtype = np.uint32)

     cps = []
     callback=[]

     # Callback function
     def readBuffer(task_handle, every_n_samples_event_type, number_of_samples, callback_data):
         counter.in_stream.read_all_avail_samp = True
         reader.read_many_sample_uint32(highCount, number_of_samples_per_channel= -1, timeout=10.0)  #10s
         cps.extend(highCount)
         callback.extend([1])
         return 0
     counter.register_every_n_samples_acquired_into_buffer_event(buffersamplecount,readBuffer)

     time.sleep(0.5)
     t=0
     run=0
     data=[]
     print(f"callback number in beginning: {len(callback)}\n")

     for i in trange(averages):

         time.sleep(0.5)
         counter.control(nidaqmx.constants.TaskMode.TASK_RESERVE)
         gate_task.control(nidaqmx.constants.TaskMode.TASK_RESERVE)
         time.sleep(0.5)
         counter.start()

         reSet = pulser.reset()
         sequence = seqLifetime(*args)
         pulser.setTrigger(start=TriggerStart.HARDWARE_RISING,rearm=TriggerRearm.AUTO)
         if pulser.hasSequence()!=0:
            raise Exception('Pulse Streamer has no Sequence uploaded!')

         start1=time.time_ns()
         for s in sequence:
             t1=len(callback)

             # performing the streaming samples_number times
             pulser.stream(s,n_runs=samples,final=([],0,0))

             gate_task.write(True)
             while len(callback)==t1:
                 time.sleep(0.05)
             gate_task.write(False)

         end1=time.time_ns()
        #  print('Time(s) for single average: ', (end1-start1)/1e9)
         print(f"callback number at {i+1}-th average end: {len(callback)}\n")
         run=run+1
         counter.control(nidaqmx.constants.TaskMode.TASK_UNRESERVE)
        #  gate_task.control(nidaqmx.constants.TaskMode.TASK_UNRESERVE)

     data=signal_counts(cps,count_per_average)
     counter.close()
     gate_task.close()
     return data


# Function to Modify the Data
def signal_counts(all_counts,counts_in_one_average,*args):
     all_counts=np.array(all_counts)
     no_of_averages=int(len(all_counts)/counts_in_one_average)
     print("Crosscheck number of averges=",no_of_averages)

     # Changing the cumulative counts to actual counts
     cumulative_counts = np.reshape(all_counts,(no_of_averages,counts_in_one_average))
     modified_matrix = np.delete(cumulative_counts, -1, 1)
     zero_array = np.zeros(no_of_averages, dtype=int)
     new_matrix = np.hstack((zero_array[:, np.newaxis], modified_matrix))
     actual_counts = np.subtract(cumulative_counts,new_matrix)
     averaged_actual_counts = np.mean(actual_counts,axis=0)

     # Separating Reference and Signal and averaging over Samples
     reference_samples = np.mean(np.reshape(averaged_actual_counts[::2],(steps-1,samples)),axis=1)
     signal_samples = np.mean(np.reshape(averaged_actual_counts[1::2],(steps-1,samples)),axis=1)

     signal_photon = signal_samples/reference_samples
     return signal_photon

# Curve_fitting Function
def curveFit(x,y0,y_max,tau):
    return y0+y_max*np.exp(-x/tau)

# Function to calculate the Lifetime
def lifetime_fit(*args):
    indices = np.where(y_old!=0)
    yOld = y_old[indices]; xOld = x_old[indices]
    if type(fit_range)==np.ndarray:
        range_indicies = np.where(np.logical_and(xOld>=fit_range[0],xOld<=fit_range[1]))
        x_old_ranged = xOld[range_indicies]; y_old_ranged = yOld[range_indicies]
        coefficient, covariance_matrix = curve_fit(curveFit,x_old_ranged,y_old_ranged,p0=guess_params,absolute_sigma=False)
        error_bars = np.sqrt(np.diag(covariance_matrix))
        condition_number =  np.format_float_scientific(np.linalg.cond(covariance_matrix),precision=2)
        x_curve_fit = x_old_ranged; y_curve_fit = y_old_ranged
    else:
        coefficient, covariance_matrix = curve_fit(curveFit,xOld,yOld,p0=guess_params,absolute_sigma=False)
        error_bars = np.sqrt(np.diag(covariance_matrix))
        condition_number =  np.format_float_scientific(np.linalg.cond(covariance_matrix),precision=2)
        x_curve_fit = xOld; y_curve_fit = yOld

    x_new = x_curve_fit
    y_new = curveFit(x_new,*coefficient)

    # Different ways of 'Goodness of Fit' Test
    chi_square_test, p_value = stats.chisquare(y_curve_fit, y_new)
    ss_res = np.sum(np.square(y_curve_fit-y_new )); ss_total = np.sum(np.square(y_curve_fit-np.mean(y_curve_fit)))
    r_squared = 1-(ss_res/ss_total)
    mean_squared_error = np.square(np.subtract(y_new,y_curve_fit)).mean()

    print(f'Lifetime in nano_second is : {coefficient[2]}.\n')
    print(f'Chi_square, p-value, R_squared,MeanSquaredError and Condition Number are : {np.round(chi_square_test,3)}\t{np.round(p_value,3)}\
    \t{np.round(r_squared,3)}\t{np.round(mean_squared_error,5)}\t{condition_number}.\n')
    if p_value<=0.05:
        print('The p_value of fitting is low. Please check the fitting!')
    return xOld,yOld,x_new,y_new,coefficient,error_bars

# Replacing mistakes in file naming
def replace_space(name):
    name = name.replace(' ', '_').replace('.','_').replace('__','_').replace('___','_')
    if name[-1]=='_':  name=name[:-1]
    return name

# # Tkinter Messagebox
# def tkinter_permission():
#     root = tk.Tk()
#     root.title('Permission accesss')
#     root.geometry('150x150')
#     root.eval('tk::PlaceWindow . center')
#     def confirmation():
#         globals()['permission'] = askyesno(title = 'Confirmation of overwriting',message='Do you want to overwrite it?')
#         if globals()['permission']:
#             root.destroy()
#         else:
#             print('Data file has not been saved')
#             root.destroy()
#     root_button = Button(root,text='The file name already exists!',command=confirmation)
#     root_button.pack(side='top')
#     root.mainloop()


# Saving file in given directory
def file_save(directory_name,file_name,contents):
    file_name = file_name+'.txt'
    if not os.path.exists(directory_name):
        os.makedirs(directory_name)
    total_path = os.path.join(directory_name, file_name)
    if os.path.exists(total_path)==True:
        # tkinter_permission()
        # if globals()['permission']==0:
        #     return 0
        print('The same file name already exist. Do you want to overwrite it?\n')
        overwrite_permission = input('Type 0 or 1 : ')
        if int(overwrite_permission)==0:
            raise Exception('New data file has not been saved')
        else:
            print('File is going to be overwritten.')
    np.savetxt(total_path,np.transpose(contents),newline='\n') # saving in column mode
    if os.path.exists(total_path)==False:
        raise Exception('Saved file does not exist!\n')
    elif os.stat(total_path).st_size == False:
        raise Exception('Saved file is empty!\n')
    else:
        print(f'Saving data_file {file_name} is successful!\n')

# Saving Image in given directory
def image_save(directory_name,file_name,fig_to_save,extension):
    file_name = file_name+'.'+extension
    if not os.path.exists(directory_name):
        os.makedirs(directory_name)
    total_path = os.path.join(directory_name, file_name)
    if os.path.exists(total_path)==True:
        print('The same file name already exist. Do you want to overwrite it?\n')
        permission = input('Type 0 or 1 : ')
        if int(permission)==0:
            raise Exception('New image has not been saved.')
        else:
            print('Image is going to be overwritten.')
    if str(extension)=='html':
        fig.write_html(total_path)                          # saving image in 'html' format
    else:
        fig.write_image(total_path)                         # saving image in mentioned static format
    if os.path.exists(total_path)==False:
        raise Exception('Saved image does not exist!\n')
    elif os.stat(total_path).st_size == False:
        raise Exception('Saved image is empty!\n')
    else:
        print(f'Saving Image {file_name} is successful!\n')

#### Fig Template

In [None]:
fig_template = go.layout.Template()
fig_template.layout = {
    'template': 'simple_white+presentation',
    'autosize': False,
    'width': 800,
    'height': 600,
    # 'opacity': 0.2,
    'xaxis': {
        'ticks': 'inside',
        'mirror': 'ticks',
        'linewidth': 1.5+0.5,
        'tickwidth': 1.5+0.5,
        'ticklen': 6,
        'showline': True,
        'showgrid': False,
        'zerolinecolor': 'white',
        },
    'yaxis': {
        'ticks': 'inside',
        'mirror': 'ticks',
        'linewidth': 1.5+0.5,
        'tickwidth': 1.5+0.5,
        'ticklen': 6,
        'showline': True,
        'showgrid': False,
        'zerolinecolor': 'white'
        },
    'font':{'family':'mathjax',
            'size': 22,
            }
}

#### Setting Parameters and getting Data

In [None]:
# Parameters Used (All times are in nanosecond range)
# Please choose accordingly so that it is multiple of 8ns.
# We will not get the signal for lifetime measurement up to (delay1+delay2+read_on).

delay1=16               # delay between reference and signal
delay2=16               # delay around laser_on time
delay3=16               # delay between laser_start and reference for seqRead
triggerTimingDelay=8    # delay between
laser_on=200*1000; read_on=20*100; Tmax=200*1000
steps=10; samples=400; averages=4
laser_port=0; trigger_port=1; timing_port=2
timeRange=np.linspace(0,Tmax,num=steps)

allowed_steps = timeRange[-1]/(2*read_on+delay1)
if timeRange.shape[0] > allowed_steps:
  raise Exception(f'Please reduce the steps or read_on time. \nMaximum allowed steps: {allowed_steps}')

pulse_args = [delay1,delay2,delay3,laser_on,timeRange,triggerTimingDelay,read_on,averages,samples,steps,laser_port,trigger_port,timing_port]

In [None]:
# Data Collection
signal_data  = lifetime(*pulse_args)

#### Saving files and Plotting

In [None]:
y_old = signal_data
x_old = delay1+delay2+read_on+timeRange[:-1]    # as we are neglecting the last step
guess_params = np.array([0.3,1,50*1e3])         # (y0,y_max,tau)
# fit_range = np.array([0,400])*1e3             # provide (x_min,x_max) or 'False'(for all x_range)
fit_range = False

# fitting curve
xOld,yOld,xNew,yNew,coefficient,error_bars = lifetime_fit(x_old,y_old,guess_params,pulse_args,fit_range)

In [None]:
# Saving Data and Images in the mentined folder
# Keep 'r' before Directory name or use "C:\\Users\...."
directory_name = r"C:\Users\Administrator\Desktop\Lifetime_Measurement\Exp_Data\year_2024\9march\with rodamine die"      # Experimental Data Directory
image_directory_name = r"C:\Users\Administrator\Desktop\Lifetime_Measurement\Images\year_2024\9march\with rodamine die"  # Image Directory
file_name ='photonic nanojet_focus on sphere 7'

file_name = replace_space(file_name)
directory_name = replace_space(directory_name)
contents = np.array([xOld,yOld])
file_save(directory_name,file_name,contents)                                                                             # saving the data file

In [None]:
# Plotting the Fitted Curve
fig = go.Figure()

fig.add_scatter(x=xNew,y=curveFit(xNew,*(coefficient+error_bars)),mode='lines',line=dict(width=0.01),name='plus_one_std')
fig.add_scatter(x=xNew,y=curveFit(xNew,*(coefficient-error_bars)),mode='lines',line=dict(width=0.01),name='minus_one_std',fill='tonexty',fillcolor='rgb(211, 211, 211)')
fig.add_scatter(x=xNew,y=yNew,mode='lines',line=dict(color="royalblue"),name='Fitted Curve')
fig.add_scatter(x=x_old,y=y_old,mode='markers',marker=dict(color="royalblue"),name='Experimental Data' )

fig.update_layout(template = fig_template,width=800,height=600)
fig.update_xaxes(title_text = "Time (ns)"); fig.update_yaxes(title_text = "Signal_counts")
fig.add_annotation(
    xref="x domain", yref="y domain",align = "left", x=0.95, y=0.95,
    text=f"<b>y<sub>0</sub></b> : {np.round(coefficient[0],3)} &plusmn; {np.round(error_bars[0],3)}\
    <br><b>y<sub>max</sub></b> : {np.round(coefficient[1],3)} &plusmn; {np.round(error_bars[1],3)}\
    <br><b>\u03C4</b> : {np.round(coefficient[2]/1e3,3)} &plusmn; {np.round(error_bars[2]/1e3,3)} &mu;s",
    showarrow=False, font_family="Times New Roman",font_size=20
)

# allowed saving formats : 'html','svg','pdf','png','jpeg','webp'
image_save(image_directory_name,file_name,fig_to_save=fig,extension='html')
fig.show()