Here I'll make an example of making the pmt channel and timing instructions a seperate dtype

In [1]:
import strax
import wfsim
import cutax
import numpy as np

*** Detector definition message ***
You are currently using the default XENON10 template detector.



In [2]:
st = cutax.contexts.xenonnt_sim_SR0v0_cmt_v5()
st.register(wfsim.strax_interface.RawRecordsFromFaxOpticalNT)

  plugins = self._get_plugins((target,), run_id)
  plugins = self._get_plugins((target,), run_id)
  plugins = self._get_plugins((target,), run_id)
  plugins = self._get_plugins((target,), run_id)
  plugins = self._get_plugins((target,), run_id)


wfsim.strax_interface.RawRecordsFromFaxOpticalNT

In [3]:
@strax.takes_config(
strax.Option('instructions',track=False),
strax.Option('timings',track=False),
strax.Option('channels',track=False),
strax.Option('max_length',track=False),
strax.Option('instructions_per_chunk',default=False,track=False))
class PulseInstructions(strax.Plugin):
    depends_on=()
    data_kind='wfsim_instructions'
    is_ready=True
    max_chunks=1
    _source_finished=False
    
    def setup(self,):
        if self.config['instructions_per_chunk']:
            self.max_chunks= np.ceil(len(self.config['instructions'])/self.config['instructions_per_chunk'])
        else:
            self.config['instructions_per_chunk']=len(self.config['instructions'])
    
    def infer_dtype(self,):
        max_num_photons = self.config['max_length']
        dtype = wfsim.strax_interface.instruction_dtype+ \
                wfsim.strax_interface.optical_extra_dtype+ \
                [('timings',max_num_photons),('channels',max_num_photons),('endtime',np.int64)]
        return dtype
    
    def compute(self,chunk_i):
        result=np.zeros(self.config['instructions_per_chunk'],dtype=self.dtype)
        
        start = chunk_i*self.config['instructions_per_chunk']
        stop = (chunk_i+1)*self.config['instructions_per_chunk']
        
        if chunk_i==self.max_chunks:
            stop=len(self.config['instructions'])+1
        
        for d in self.config['instructions'].dtype.names:
            result[d]=self.config['instructions'][start:stop][d]
        result['timings']=self.config['timings'][start:stop]
        result['channels']=self.config['channels'][start:stop]
        result['endtime']= np.max(result['timings'])
        return self.chunk(start=result['time'][0],end=result['endtime'][-1],data=result)
    
    def is_ready(self,chunk_i):
        if chunk_i==self.max_chunks:
            self._source_finished=True
        return not chunk_i==self.max_chunks
    
    def source_finished(self,):
        return self._source_finished

In [4]:
def get_s1_photons(instruction,config,resource):
    
    t = instruction['time']
    x = instruction['x']
    y = instruction['y']
    z = instruction['z']
    n_photons = instruction['amp']
    recoil_type = instruction['recoil']
    positions = np.array([x, y, z]).T  # For map interpolation
    n_photon_hits = wfsim.S1.get_n_photons(n_photons=n_photons,
                                       positions=positions,
                                       s1_lce_correction_map=resource.s1_lce_correction_map,
                                       config=config)

    # The new way interpolation is written always require a list
    
    _photon_channels = wfsim.S1.photon_channels(positions=positions,
                                                 n_photon_hits=n_photon_hits,
                                                 config=config, 
                                                 s1_pattern_map=resource.s1_pattern_map)

    _photon_timings = wfsim.S1.photon_timings(t=t,
                                               n_photon_hits=n_photon_hits, 
                                               recoil_type=recoil_type,
                                               config=config,
                                               phase='liquid',
                                               channels=_photon_channels,
                                               positions=positions,
                                               resource=resource)

    # Sorting times according to the channel, as non-explicit sorting
    # is performed later and this breaks timing of individual channels/arrays
    sortind = np.argsort(_photon_channels)

    _photon_channels = _photon_channels[sortind]
    _photon_timings = _photon_timings[sortind]
    return _photon_channels,_photon_timings

In [5]:
def get_s2_photons(instructions,config,resource):
    if len(instructions.shape) < 1:
        # shape of recarr is a bit strange
        instruction = np.array([instructions])

    _, _, t, x, y, z, n_electron, recoil_type, *rest = [
        np.array(v).reshape(-1) for v in zip(*instructions)]

    # Reverse engineerring FDC
    if config['field_distortion_on']:
        z_obs, positions = inverse_field_distortion(x, y, z, resource=resource)
    else:
        z_obs, positions = z, np.array([x, y]).T

    sc_gain = wfsim.S2.get_s2_light_yield(positions=positions,
                                      config=config,
                                      resource=resource)

    n_electron = wfsim.S2.get_electron_yield(n_electron=n_electron,
                                         positions=positions,
                                         z_obs=z_obs,
                                         config=config,
                                         resource=resource)

    # Second generate photon timing and channel
    _electron_timings, _photon_timings, _instruction = wfsim.S2.photon_timings(t, n_electron, z_obs,
                                                                                          positions, sc_gain,
                                                                                          config=config,
                                                                                          resource=resource,
                                                                                          phase='gas')

    _photon_channels, _photon_timings = wfsim.S2.photon_channels(n_electron=n_electron,
                                                                       z_obs=z_obs,
                                                                       positions=positions,
                                                                       _photon_timings=_photon_timings,
                                                                       _instruction=_instruction,
                                                                       config=config,
                                                                       resource=resource)
    return _photon_channels,_photon_timings

In [10]:
def instructions_function(c):
    '''
    So we we'll need to dump everything for the generation of the instruction. 
    This we will pass to the strax class for saving in the end
    Because we need to specify the dtype in advance for strax we need to calculate everything here,
    and pass to strax the final result. Note that the array we'll make be the size of the largest timing and channel array
    '''
    run_id='MakeSureToSetThisToSomething_01'
    # st = cutax.contexts.xenonnt_sim_SR0v0_cmt_v5()
    st.register(PulseInstructions)
    import nestpy
    half_life = 156.94e-9 #Kr intermediate state half-life in ns
    decay_energies = [32.2,9.4] # Decay energies in kev
    
    n = c['nevents'] = c['event_rate'] * c['chunk_size'] * c['nchunk']
    c['total_time'] = c['chunk_size'] * c['nchunk']

    instructions = np.zeros(4 * n, dtype=wfsim.instruction_dtype+wfsim.optical_extra_dtype)
    instructions['event_number'] = np.digitize(instructions['time'],
         1e9 * np.arange(c['nchunk']) * c['chunk_size']) - 1
    
    instructions['type'] = np.tile([1, 2], 2 * n)
    instructions['recoil'] = [7 for i in range(4 * n)]
    
    r = np.sqrt(np.random.uniform(0, 2500, n))
    t = np.random.uniform(-np.pi, np.pi, n)
    instructions['x'] = np.repeat(r * np.cos(t), 4)
    instructions['y'] = np.repeat(r * np.sin(t), 4)
    instructions['z'] = np.repeat(np.random.uniform(-80, 0, n), 4)
    
    #To get the correct times we'll need to include the 156.94 ns half life of the intermediate state.

    uniform_times = c['total_time'] * (np.arange(n) + 0.5) / n
    delayed_times = uniform_times + np.random.exponential(half_life/np.log(2),len(uniform_times))
    instructions['time'] = np.repeat(list(zip(uniform_times,delayed_times)),2) * 1e9


    # Here we'll define our XENON-like detector
    nc = nestpy.NESTcalc(nestpy.VDetector())
    A = 131.293
    Z = 54.
    density = 2.862  # g/cm^3   #SR1 Value
    drift_field = 82  # V/cm    #SR1 Value
    interaction = nestpy.INTERACTION_TYPE(7)
    
    energy = np.tile(decay_energies,n)
    quanta = []
    for en in energy:
        y = nc.GetYields(interaction,
                         en,
                         density,
                         drift_field,
                         A,
                         Z,
                         (1, 1))
        q = nc.GetQuanta(y, density)
        quanta.append(q.photons)
        quanta.append(q.electrons)
    instructions['amp'] = quanta
    
    c['turned_off_pmts'] = []
    resource = wfsim.load_resource.load_config(c)
    s1_channels, s1_timings = get_s1_photons(instructions[instructions['type']==1],c,resource)
    s2_channels, s2_timings = get_s2_photons(instructions[instructions['type']==2],c,resource)
    
    #hacky way to make the simulator work, with s2s it will crash. but we don't use that part anyway
    instructions['type']=1
    
    timings=np.concatenate((s1_timings,s2_timings))
    channels=np.concatenate((s1_channels,s2_channels))
    
    sortind = np.argsort(timings)
    
    timings=timings[sortind]
    channels=channels[sortind]
    
    max_photons=np.max(timings)
    wfsim.optical_adjustment(instructions,timings,channels)
    
    st.set_config(dict(instructions=instructions,
                       timings=timings,
                       channels=channels,
                       max_length=max_photons))
    st.make(run_id,'pulse_instructions')
    return instructions,channels,timings

In [11]:
wfsim.strax_interface.read_optical=instructions_function
st.set_config(dict(nevents=5,
                   nchunk=1,
                   chunk_size=1,
                   fax_file='bla.root',
                   fax_config_override=dict(s1_model_type='simple',
                                            enable_pmt_afterpulses=False,
                                            enable_electron_afterpulses=False)))
st.make('bla','raw_records')

Removing old incomplete data in ./strax_data/bla-raw_records_he-4w2cexf5p7
Removing old incomplete data in ./strax_data/bla-truth-4w2cexf5p7
Removing old incomplete data in ./strax_data/bla-raw_records-4w2cexf5p7
Removing old incomplete data in ./strax_data/bla-raw_records_aqmon-4w2cexf5p7




Simulating Raw Records:   0%|          | 0/1000 [00:00<?, ?it/s]

Source finished!
