In [419]:
import h5py
from matplotlib import pyplot as plt 
import numpy 
import numpy as np 
import scipy 
import ast
import threading
from multiprocessing import  Process 
import time
import random
from multiprocessing import Queue
import os 

In [420]:
# The default length unit is always um 
minimum_step=0.001
minimum_stream_step=0.001

## Basic Parameters

In [421]:
# Open the parameter text file 
with open("parameter.txt","r") as f : 
    parameter=ast.literal_eval(f.read())

parameter['minimum_step']= "{}[um]".format(minimum_step)
parameter['minimum_stream_step']= "{}[um]".format(minimum_stream_step)

In [422]:
# Geometry Scale
print("Length Unit: {} m".format(parameter['length_u']) )
scale=parameter['length_u']*1e6
# Default length unit is um 

Length Unit: 1 m


## Electric field 

In [423]:
# Get the electric field from the h5 file 
h5f= h5py.File("singlecell4.h5","r")
info=np.array(h5f["info"])
h5f.close()

### Re-Scale the coordinate;

In [424]:
info[0]=info[0]*scale
info[1]=info[1]*scale

## Streamline

In [425]:
# Get the electric streamline from the processed h5 file .
h5f= h5py.File("singlecell_stream.h5","r")
stream=np.array(h5f["stream"])
h5f.close()

### Re-Scale the coordinate

In [426]:
stream[0]=stream[0]*scale
stream[1]=stream[1]*scale

In [427]:
int(parameter['number'])

8

In [428]:
info[0].max()

22500.0

# Electron Movement

In [429]:
class single_electron():
    def __init__ (self,info,parameter,stream,):  
        # the basic huge information of the particle 
        self.info=info
        self.parameter=parameter
        self.stream=stream 
        
        self.position_init()
        # sx is to log the history of the particle 
        self.sx=np.array([])
        self.sy=np.array([])
        self.sm=np.array([])
        self.se=np.array([])
        self.sEx=np.array([])
        self.sEy=np.array([])

        # the basic information about the stream 
        self.stream_N=0
        self.arg=0
        self.local_stream= self.stream[0:2,self.stream[2]==self.stream_N]
        self.step_length =1 
        self.status=""
        self.stream_within=1
    
    def scale_convert(self,length_u):
        if length_u=="µm":
            result=1e-6
        elif length_u=="um":
            result=1e-6
        elif length_u=="nm":
            result=1e-9
        elif length_u=='mm':
            result=1e-3
        elif length_u=='m':
            result=1
        else :
            print("Warning!! Unrecognized unit of length_u")
        return result
    
    def parameter_get(self,tar_para):
        # Our basic length unit is um 
        length_u= self.parameter[tar_para].split("[")[1].split("]")[0]
        length_u_n = self.scale_convert(length_u)
        result= float(self.parameter[tar_para].split("[")[0]) * length_u_n * 1e6
        return result        
    
    def position_init(self) :
        simul_n=2
        self.x = 0.5*(info[0].max()+info[0].min()) \
                + simul_n* self.parameter_get("vertical_distance")* (random.random()-0.5)
        self.y= 0.5*info[1].max()
        self.multiply = 0  # we have no photon gain at first. 
        self.electron_N =1 # we have one electron at first .
        self.step_count =0 
        self.vy= -1 
        self.vx= 0
        self.Ex =0 
        self.Ey =0 
    
    # check the stream line that we will use according to the initial position .
    def get_stream(self) : 
        stream_err = (stream[0]-self.x)**2 + (stream[1]-self.y)**2
        self.stream_N = stream[2][stream_err.argmin()]    
    def stream_check(self) : 
        # It is typically that the stream will not end in the multiplier , but that is OK .. 
        # But if the end of the stream line is about 10 times larger than the multiple height , there should be something wrong with it 
        while float(self.stream[1][self.stream[2]==self.stream_N].min()) > 10*self.parameter_get("anode_radius") :
            print("Stream line too short , rechoose it \n")
            self.position_init()
            self.get_stream() 
        #print("Electric Stream : {}".format(self.stream_N))
        self.local_stream= self.stream[0:2,self.stream[2]==self.stream_N]
        self.local_stream[0]= np.flip(self.local_stream[0])
        self.local_stream[1]= np.flip(self.local_stream[1])
    
    
    def check_stop(self):
        wire_num=int(self.parameter["number"])
        vertical_distance= self.parameter_get("vertical_distance")
        anode_radius= self.parameter_get("anode_radius")
        for i in range(wire_num):
            if (self.x- vertical_distance*i)**2 + (self.y-0)**2 < anode_radius**2:
                return 1 
        if self.y<- self.parameter_get("down_distance") : 
            return 2 
        return 0 
        
        
    def get_direction_length(self): 
        if self.arg < len(self.local_stream[1]):
            if self.y>0 :
                # Select nearest point from the downwards points
                stream_now= self.local_stream[:,np.where(self.local_stream[1]<self.y)[0]]
                err= (stream_now[0]-self.x)**2  + (stream_now[1]-self.y)**2 
                tar_arg= err.argmin()
                
                # generate the arg 
                self.arg=np.where(self.local_stream[1]<self.y)[0].min()
                
                try :
                    base_len = np.sqrt((self.local_stream[1][self.arg]- self.local_stream[1][self.arg-1])**2+ \
                                       (self.local_stream[0][self.arg]- self.local_stream[0][self.arg-1])**2)
                except:
                    base_len = np.sqrt((self.local_stream[1][self.arg]- self.local_stream[1][self.arg+1])**2+ \
                                       (self.local_stream[0][self.arg]- self.local_stream[0][self.arg+1])**2)
                
                # Generate the velocity and the step length
                self.vy= (stream_now[1][tar_arg]- self.y) / np.sqrt((stream_now[1][tar_arg]- self.y)**2 +(stream_now[0][tar_arg]- self.x)**2 )
                self.vx= (stream_now[0][tar_arg]- self.x) / np.sqrt((stream_now[1][tar_arg]- self.y)**2 +(stream_now[0][tar_arg]- self.x)**2 )
                #self.step_length = 0.2*(np.sqrt((stream_now[1][tar_arg]- self.y)**2+(stream_now[0][tar_arg]- self.x) **2)) + minimum_stream_step*(1+random.random())/2 + np.abs(self.y)/2000
                self.step_length = 0.2*base_len  \
                            +0.2*(np.sqrt((stream_now[1][tar_arg]- self.y)**2+(stream_now[0][tar_arg]- self.x) **2)) \
                            + minimum_stream_step*(1+random.random())/2 \
                            + np.abs(self.y)/2000
            else : # if it has gone below the 0 (anode position threshold), it will move exactly along the streamline
                # it will move along the trajectory
                try : 
                    self.vy= (self.local_stream[1][self.arg]-self.y)/np.sqrt((self.local_stream[1][self.arg]-self.y)**2 + (self.local_stream[0][self.arg]-self.x)**2 )
                    self.vx= (self.local_stream[0][self.arg]-self.x)/np.sqrt((self.local_stream[1][self.arg]-self.y)**2 + (self.local_stream[0][self.arg]-self.x)**2 )
                    self.step_length= np.sqrt((self.local_stream[1][self.arg]-self.y)**2 + (self.local_stream[0][self.arg]-self.x)**2 )
                    # move to the following point
                    self.arg= self.arg+1
                except :
                    self.vx = -self.Ex / np.sqrt(self.Ex**2 + self.Ey**2)
                    self.vy = -self.Ey / np.sqrt(self.Ex**2 + self.Ey**2)
                    self.step_length= np.abs(self.y)/2000 + minimum_step
        else : 
            self.stream_within=0
            if self.y>4*self.parameter_get("anode_radius"):
                if self.Ex==0  :
                    self.vy= -np.sign(self.y)
                    self.vx= 0 
                elif self.Ey==0 : 
                    self.vy= 0
                    self.vx= -np.sign(self.x- self.info[0].max()/2)
                else : 
                    self.vx = -self.Ex / np.sqrt(self.Ex**2 + self.Ey**2)
                    self.vy = -self.Ey / np.sqrt(self.Ex**2 + self.Ey**2)
                self.step_length= np.abs(self.y)/2000 + minimum_step
            else : 
                self.vx = -self.Ex / np.sqrt(self.Ex**2 + self.Ey**2)
                self.vy = -self.Ey / np.sqrt(self.Ex**2 + self.Ey**2)
                self.step_length= np.abs(self.y)/2000 + minimum_step      
    
    def get_electric(self):
        err = (self.info[0]-self.x)**2 + (self.info[1]-self.y)**2 
        near_arg = err.argmin()
        # Get the electric field
        self.Ey= numpy.nan_to_num(self.info[3])[near_arg]
        self.Ex= numpy.nan_to_num(self.info[2])[near_arg]
        
    
    def renew_status(self):
        self.status="X:{} Y:{} Ex:{} Ey:{} vx:{} vy:{} Multiply:{} Electron:{} Stream:{} Step:{} Within:{}" \
        .format(self.x,self.y,self.Ex,self.Ey,self.vx,self.vy, \
                self.multiply,self.electron_N,self.stream_N,self.step_count,self.stream_within)
    
    def report_status(self):
        self.renew_status()
        print(self.status)
    
    
    def eff_E(self):
        return np.sqrt((self.vx*self.Ex)**2 + (self.vy*self.Ey)**2)
        #if self.stream_within==0 : 
        #    return np.sqrt((self.vx*self.Ex)**2 + (self.vy*self.Ey)**2)
        #else : 
        #    return np.sqrt(self.Ex**2 + self.Ey**2)
    
    def photon_gain(self):
        # According to paper shared about the S2 gain. 
        # The gain rate is three times higher than the original gain rate 
        # \theta_3 2.26 \times 10^{-2} ph/e/(kV/cm*um) = 2.26e-7 ph/e-/(V/m *um )
        # \theta_4 416 kV/cm 
        ph_threshold=4.16e7
        ph_gain_factor=2.26e-7
        eff_E = self.eff_E()
        if eff_E > ph_threshold : 
            return ph_gain_factor*(eff_E-ph_threshold )
        else : 
            return 0 
        
    def electron_gain(self):
        # According to paper shared about the S2 gain
        # \theta_0 charge gain factor 1.46/ (um*e)
        # \theta_1 charge gain slope 2.98e7 V/m
        # \theta_2 charge gain threshold 7.5e7 V/m 
        charge_gain_factor = 1.46
        charge_gain_slope = 2.98e7
        charge_gain_threshold = 7.5e7
        eff_E = self.eff_E()
        if eff_E > 1.01*charge_gain_threshold : 
            return charge_gain_factor*np.exp(- charge_gain_slope/( eff_E -charge_gain_threshold) )
        else : 
            return 0
    
    def single_move(self):
        # Get the basic parameters
        self.get_electric()
        self.get_direction_length()
        photon_rate = self.photon_gain()
        electron_rate = self.electron_gain()
        # Single move
        self.x = self.x + self.vx * self.step_length 
        self.y = self.y + self.vy * self.step_length 
        self.multiply = self.multiply +  np.random.poisson(self.electron_N*photon_rate * self.step_length )
        self.electron_N = self.electron_N + np.random.poisson(self.electron_N*electron_rate * self.step_length)
        # Log the process 
        self.sx = np.append(self.sx,self.x)
        self.sy = np.append(self.sy,self.y)
        self.sm = np.append(self.sm,self.multiply)
        self.se = np.append(self.se,self.electron_N)
        self.sEx= np.append(self.sEx,self.Ex)
        self.sEy= np.append(self.sEy,self.Ey)
        
    def move(self):  
        self.get_stream()
        self.stream_check()
        self.single_move()
        while True:
            self.step_count = self.step_count +1 
            if self.check_stop()!=0:
                break
            elif self.step_count > 80000+ 5*len(self.local_stream[0]):
                print("What happened??")
                break 
            else : 
                self.single_move()
        self.renew_status()
        return self.multiply                                 
                 

# Simulation 

In [430]:
parameter

{'up_radius': '5[um]',
 'down_radius': '5[um]',
 'anode_radius': '9.5[um]',
 'up_distance': '3[mm]',
 'down_distance': '3[mm]',
 'vertical_distance': '3[mm]',
 'up_stagger_distance': '0[mm]',
 'down_stagger_distance': '0[mm]',
 'drift_length': '40[mm]',
 'number': '8',
 'up_voltage': '0[V]',
 'anode_voltage': '9400[V]',
 'down_voltage': '0[V]',
 'cathode_voltage': '100[V]',
 'drift_ratio': '1',
 'cathode_distance': '10[mm]',
 'cathode_height': '1[mm]',
 'length_u': 1,
 'electric_x_u': '(V/m)',
 'electric_y_u': '(V/m)',
 'dimension': 2,
 'stream_number': 801,
 'minimum_step': '0.001[um]',
 'minimum_stream_step': '0.001[um]'}

In [431]:
#Multi Process Module
class simul_process(Process):
    def __init__(self,info,parameter,stream,q,number):
        super(simul_process,self).__init__()
        self.info=info
        self.parameter=parameter
        self.stream=stream
        self.number=number
    def run(self):
        pid= os.getpid()
        for i in range(self.number):
            self.electron=single_electron(self.info,self.parameter,self.stream)
            self.electron.move()
            q.put("{}:{}".format(pid,self.electron.status))

In [432]:
q=Queue()
process_number=100
single_simul_number=30

process_list=[]
for i in range(process_number):
    time.sleep(0.05)
    p= simul_process(info,parameter,stream,q,single_simul_number)
    p.start()
    process_list.append(p)

In [433]:
# Back up Field files
name=time.time()
tar_dir="anode_voltage"

import os 
import shutil
os.mkdir("{}/backup/{}".format(tar_dir,name))
dep_list=["parameter.txt","singlecell4.h5","singlecell_stream.h5"]
for i in dep_list:
    shutil.copy("{}".format(i),"{}/backup/{}/{}".format(tar_dir,name,i))

In [None]:
from tqdm import trange
from tqdm import tqdm 
information_list=[]
for i in trange(process_number*single_simul_number):
    information_list.append(q.get())

 33%|████████████████████████▏                                                 | 983/3000 [12:06<35:09,  1.05s/it]

In [None]:
len(information_list)

In [None]:
import numpy as np
np.where(np.arange(8)<3)[0].min()

## Save simulation result

In [None]:
with open("{}/information{}.txt".format(tar_dir,time.time()),"w") as f:
    f.write("{}\n".format(str(parameter)))
    for i in information_list:
        f.write("{}\n".format(i))

# Simulate a single Electron

In [None]:
single=single_electron(info,parameter,stream)
single.get_stream()
single.stream_check()
single.get_electric()
single.get_direction_length()
single.single_move()
single.move()
single.report_status()
print(single.arg)

In [None]:
plt.plot(single.local_stream[0],single.local_stream[1])
plt.show()

### Plot example electron

In [None]:
plt.figure(dpi=300,figsize=(12,12))
plt.suptitle("Example electron")
plt.subplot(2,2,1)
plt.title("Electron Trajectory")
plt.xlabel("x/um")
plt.ylabel("y/um")
plt.plot(single.sx,single.sy)

plt.subplot(2,2,2)
plt.title("Photon Gain along trajectory")
plt.xlabel("y/um")
plt.ylabel("Photon Gain/ph")
plt.plot(single.sy,single.sm)

plt.subplot(2,2,3)
plt.title("Electron Gain along trajectory")
plt.xlabel("y/um")
plt.ylabel("Electron Gain/e")
plt.plot(single.sy,single.se)

plt.subplot(2,2,4)
plt.title("Electric Field Along trajectory")
plt.xlabel("y/um")
plt.ylabel("Electric Field Ey")
plt.plot(single.sy,single.sEy)

plt.show()