In [37]:
import pandas as pd
import numpy as np
import os, sys
import random
import math
import matplotlib
matplotlib.use("Agg")
import matplotlib.pyplot as plt
from scipy import stats
import itertools
import json
from scipy.optimize import minimize
from PIL import Image
import time

In [9]:
# Set to True to run with the hard-coded defaults below when no
# command-line arguments are supplied (e.g. from an interactive session).
testing = False

# Expected invocation: tuning.py <automation_dir> <num_sims> <iteration>
# Anything else is an error unless testing mode is enabled.
if len(sys.argv) != 4 and not testing:
    print("Usage: python tuning.py <automation_dir> <num_sims> <iteration>")
    raise ValueError("Incorrect number of arguments passed to tuning.py")

if len(sys.argv) == 4:
    # working directory, simulations per batch, and current iteration number
    automation_dir = sys.argv[1]
    num_sims = int(sys.argv[2])
    iteration = int(sys.argv[3])
else:
    # only reachable with testing enabled (see guard above)
    print("TESTING MODE")
    automation_dir = '/mnt/analysis/e17023/Adam/GADGET2/'
    num_sims = 4
    iteration = 0
    print(f"Using default directory, {num_sims} sims, iteration {iteration}")

TESTING MODE
Using default directory, 4 sims, iteration 0


In [15]:
def initialize_files():
    """Write a fresh `tuning_log.json` and an empty `parameters.csv`.

    Both files are created inside the module-level `automation_dir`
    (joined by plain string concatenation, so it must end with a path
    separator). `parameters.csv` contains only a header row: the
    bookkeeping columns followed by one column per variable parameter and
    one per fixed parameter.
    """
    # starting values of the tuned parameters; later iterations overwrite
    # these with the best-fit values
    var_parameters = {
        'Threshold' : 86,
        'EIonize' : 22.3,
        'Fano' : 0.24,
        'CoefL' : 0.000114,
        'CoefT' : 0.00284,
        'Gain' : 8000,
        'GETGain' : 120,
        'PeakingTime' : 1014
    }
    # parameters held constant for every simulation
    fixed_parameters = {
        'Xb' : 99,
        'Yb' : 2,
        'Seed' : 2,
        'CD' : 1,
        'CDH' : 1,
        'GasPressure' : 800
    }
    tuning_log = {
        'VarParameters' : var_parameters,
        'FixedParameters' : fixed_parameters,
        'N': 100, # events per simulation
        'BestN' : 5, # how many top-scoring simulations feed the next iteration
        'VarRange' : 0.1, # largest fractional change applied in a single step
        'IntParams' : ['Threshold', 'Gain', 'GETGain', 'PeakingTime'], # parameters rounded to integers
        'Weights' : {}, # per-attribute weights for the score function
        'TuningParticles' : [],
        'MaxIterations' : 1000
    }

    # persist the tuning configuration for later iterations
    with open(automation_dir + 'tuning_log.json', 'w') as log_file:
        json.dump(tuning_log, log_file, indent=4)

    # header-only parameter table: bookkeeping columns, then every parameter
    bookkeeping = ['Sim', 'Status', 'P0', 'E0', 'P1', 'E1', 'N', 'Score']
    all_columns = bookkeeping + list(var_parameters.keys()) + list(fixed_parameters.keys())
    pd.DataFrame(columns=all_columns).to_csv(automation_dir + 'parameters.csv', index=False)

    print('Tuning Files Initialized')

In [16]:
if iteration == 0:
    # first pass: create the tuning log / parameter table and wait for the
    # operator to confirm the list of target particles
    initialize_files()
    print('Confirm all particles to tune against are listed in Analysis/real_dirs.csv')
    input()

else:
    # later passes: reload the state written by previous iterations
    # (context manager so the JSON file handle is closed immediately)
    with open(automation_dir + 'tuning_log.json') as f:
        tuning_log = json.load(f)
    param_df = pd.read_csv(automation_dir + 'parameters.csv')

Tuning Files Initialized


In [36]:
# IMAGE PROCESSING FUNCTIONS
def get_energy(image, scale=3000):
    """Read the event energy from the energy bar drawn in the image.

    Args:
        image: image array indexed as [row, column, channel].
        scale: energy corresponding to a completely filled bar.

    Returns:
        The filled fraction of the bar times `scale`, plus a fixed 27.766
        offset (offset to match data).
    """
    # the energy bar occupies rows 5..144 and columns 8..16
    (top, left), (bottom, right) = (5, 8), (145, 17)
    ebar = image[top:bottom, left:right, :]

    # channel-averaged brightness of each row, sampled at the bar's second column
    n_rows = ebar.shape[0]
    ebar_slice = np.array([np.mean(ebar[row, 1, :]) for row in range(n_rows)])

    # first row from the top that is not pure white; falls back to the last
    # row when every row is white
    first_filled = next(
        (row for row in range(n_rows) if ebar_slice[row] != 255),
        n_rows - 1,
    )

    proportion_filled = 1 - (first_filled - 1) / n_rows
    return proportion_filled * scale + 27.766
def get_track(image):
    """Cut the particle track out of the pad-plane region of the image.

    The pad plane is taken from rows 3..147 and columns 40..184. Only the
    red channel is used; pure-white pixels are treated as background (0).
    The result is cropped to the bounding box of the non-zero pixels and
    then sampled every 4th pixel in both directions.

    Raises:
        ValueError: if the pad plane contains no track pixels.
    """
    padplane = image[3:148, 40:185, :]

    # red channel only, with the white background zeroed out
    track = padplane[:, :, 0].copy()
    track[track == 255] = 0

    # bounding box of everything that is left
    rows, cols = np.nonzero(track)
    row_lo, row_hi = rows.min(), rows.max() + 1
    col_lo, col_hi = cols.min(), cols.max() + 1
    cropped = track[row_lo:row_hi, col_lo:col_hi]

    # keep every 4th pixel to remove the grid effect
    return cropped[::4, ::4]
def get_trace(image):
    """Recover the 1-D trace drawn in the lower part of the image.

    Rows from 150 downward of the red channel are used. For every pixel
    column the inverted brightness (255 - value) is summed, giving the
    height of the trace at that column. The trace is cropped to 5 pixels
    inside the outermost large jumps (> 100) between neighbouring columns,
    baseline-subtracted using the mean of its first and last 10 samples,
    and clipped at zero.
    """
    trace_img = image[150:, :, 0]

    # total inverted brightness per column = height of the trace there
    trace = np.sum(255 - trace_img, axis=0).astype(float)

    # columns where the height jumps sharply mark the edges of the trace
    jumps = np.abs(np.diff(trace))
    edges = np.flatnonzero(jumps > 100)
    trace = trace[edges[0] + 5:edges[-1] - 5]

    # baseline = mean of the first and last 10 samples
    head, tail = trace[:10], trace[-10:]
    baseline = np.mean(np.concatenate((head, tail)))

    trace = trace - baseline
    trace[trace < 0] = 0  # no negative heights

    return trace
def analyze_trace(trace):
    """Summarise a baseline-subtracted trace.

    Returns:
        (trace_width, trace_max, trace_avg, num_peaks) where width, max and
        average refer to the part of the trace between the detected left
        and right edges, and num_peaks counts rising-to-falling changes of
        the smoothed slope inside that part.
    """
    running_left = np.cumsum(trace)               # cumulative sum from the left
    running_right = np.cumsum(trace[::-1])[::-1]  # cumulative sum from the right

    # edges: first / last sample where the cumulative sum reaches the mean height
    cutoff = np.mean(trace)
    ledge = np.flatnonzero(running_left >= cutoff)[0]
    redge = np.flatnonzero(running_right >= cutoff)[-1]
    trace_width = redge - ledge

    peak_region = trace[ledge:redge]
    trace_max = np.max(peak_region)   # peak height
    trace_avg = np.mean(peak_region)  # mean height inside the edges

    # slope smoothed with a 5-sample box window, reduced to its sign
    slope = np.convolve(np.diff(peak_region), np.ones(5), mode='same')
    slope[slope <= 0] = -1
    slope[slope > 0] = 1

    # a peak is a switch of the slope sign from +1 to -1
    turning_points = -1 * np.diff(slope)
    num_peaks = np.sum(turning_points > 0)

    return trace_width, trace_max, trace_avg, num_peaks
def analyze_track(track):
    """Compute shape and pad-energy statistics of a cropped 2-D track.

    Returns:
        (length, width, num_pads, num_peaks, max_pad, min_pad, avg_pad,
        num_noise):
        length    - diagonal of the track array,
        width     - num_pads divided by length,
        num_pads  - number of entries greater than zero,
        num_peaks - interior entries larger than all 8 neighbours, minus
                    num_noise,
        max/min/avg_pad - statistics over the entries greater than zero,
        num_noise - entries greater than zero whose up/down and left/right
                    neighbours add nothing to them (diagonals ignored).
    """
    n_rows, n_cols = track.shape[0], track.shape[1]
    length = (n_rows**2 + n_cols**2)**0.5
    num_pads = track[track > 0].size
    width = num_pads / length

    # interior entries strictly larger than every neighbour in their 3x3 window
    neighbours = np.stack([
        track[:-2, :-2], track[:-2, 1:-1], track[:-2, 2:],
        track[1:-1, :-2],                  track[1:-1, 2:],
        track[2:, :-2],  track[2:, 1:-1],  track[2:, 2:],
    ])
    num_peaks = np.sum(track[1:-1, 1:-1] > neighbours.max(axis=0))

    # free-standing pads: zero-pad the border so edge entries can be tested
    # with the same 3-element column / row sums
    padded = np.pad(track, ((1, 1), (1, 1)), mode='constant', constant_values=0)
    num_noise = 0
    for i, j in np.argwhere(padded > 0):
        value = padded[i, j]
        column_sum = np.sum(padded[i-1:i+2, j])
        row_sum = np.sum(padded[i, j-1:j+2])
        if column_sum == value and row_sum == value:
            num_noise += 1
    num_peaks -= num_noise  # free-standing pads do not count as peaks

    # pad energy statistics over the non-zero entries
    pads = padded[padded > 0]
    max_pad = np.max(pads)
    min_pad = np.min(pads)
    avg_pad = np.mean(pads)
    return length, width, num_pads, num_peaks, max_pad, min_pad, avg_pad, num_noise
def get_event_length(length, trace_width):
    """Combine the in-plane track length and the trace width into one length.

    The two are added in quadrature, with the squared trace width weighted
    by 0.59176 (obtained by minimizing the standard deviation of the
    length calculation for events of the same energy), then scaled from
    pads to mm.
    """
    trace_weight = 0.59176  # weight of the trace term in the quadrature sum
    pads_to_mm = 1/2.2      # scale factor (pads to mm)
    overshoot = 0           # overshoot of the length calculation (mm)

    quadrature_sum = length**2 + trace_weight*trace_width**2
    return pads_to_mm*quadrature_sum**0.5 - overshoot

def Analyze_Image(file_dir):
    """Extract all scoring attributes from a single event image.

    Args:
        file_dir: path of the image file to analyse.

    Returns:
        dict mapping attribute name to value: the event energy, the combined
        event length, track statistics (width, pad counts, pad energies,
        noise pads, peaks) and trace statistics (width, max, average, peaks).
    """
    # Open inside a context manager so the file handle is released as soon as
    # the pixels are read, and convert to RGB so the array always has three
    # colour channels (RGBA loses its alpha channel; grayscale / palette
    # images no longer fail on the channel indexing used downstream).
    with Image.open(file_dir) as img:
        img_array = np.array(img.convert('RGB'))

    event_energy = get_energy(img_array)
    track = get_track(img_array)
    trace = get_trace(img_array)

    # normalize so that track and trace each sum to the event energy
    track = event_energy * track / np.sum(track) # assumes all energy is represented in track pixels, bad with high threshold
    trace = event_energy * trace / np.sum(trace)

    trace_width, trace_max, trace_avg, trace_peaks = analyze_trace(trace)
    track_length, track_width, num_pads, track_peaks, max_pad, min_pad, avg_pad, num_noise = analyze_track(track)

    event_length = get_event_length(track_length, trace_width)
    # overall peak count is the larger of the trace and track estimates
    num_peaks = np.max((trace_peaks, track_peaks))

    attributes = {
        'Energy' : event_energy,
        'Length' : event_length,
        'Width' : track_width,
        'NumPads' : num_pads,
        'NumPeaks' : num_peaks,
        'MaxPad' : max_pad,
        'MinPad' : min_pad,
        'AvgPad' : avg_pad,
        'NumNoise' : num_noise,
        'TraceWidth' : trace_width,
        'TraceMax' : trace_max,
        'TraceAvg' : trace_avg,
        'TracePeaks' : trace_peaks,
        'TrackLength' : track_length,
        'TrackPeaks' : track_peaks
    }

    return attributes

In [23]:
def filter_events(event_list):
    """Drop messy events (those with NumNoise > 0) from `event_list`.

    The list is filtered in place and the same list object is returned, so
    callers may use either the return value or the list they passed in.

    Args:
        event_list: list of attribute dicts, each with a 'NumNoise' entry.

    Returns:
        `event_list`, now containing only events whose NumNoise is not > 0.
    """
    # Build the filtered list first and assign it back through a slice:
    # calling list.remove() while iterating over the same list skips the
    # element that follows every removal, which left noisy events behind.
    event_list[:] = [event for event in event_list if not event['NumNoise'] > 0]
    return event_list

In [34]:
if iteration == 0: # analyze target images for scoring reference
    # particle name -> directory holding the real images for that particle
    tuning_dirs = pd.read_csv(f"{automation_dir}Analysis/real_dirs.csv", index_col=0).to_dict()['dir']
    with open(automation_dir + 'tuning_log.json', 'r') as f:
        tuning_log = json.load(f)
    param_df = pd.read_csv(automation_dir + 'parameters.csv')
    
    tuning_log['TuningParticles'] = list(tuning_dirs.keys()) # list of particles to tune against
    
    # validate that all tuning particles have matches and images exist
    for particle in tuning_log['TuningParticles']:
        if not os.path.isdir(tuning_dirs[particle]):
            raise ValueError(f"Directory {tuning_dirs[particle]} not found")
        if len(os.listdir(tuning_dirs[particle])) == 0:
            raise ValueError(f"No images found in {tuning_dirs[particle]}")
    
    tuning_log['TargetAttributes'] = {} # initialize target attributes
    
    for particle in tuning_log['TuningParticles']:
        print(f"Analyzing target {particle} images")
        tuning_log['TargetAttributes'][particle] = {} # attributes for each particle type
        event_list = []
        for file in os.listdir(tuning_dirs[particle]):
            if file.endswith('.png'):
                # os.path.join works whether or not the directory ends with a separator
                event_list.append(Analyze_Image(os.path.join(tuning_dirs[particle], file)))
        
        # filter out events with bad attributes; repeat until a pass removes
        # nothing so the result does not depend on filter_events clearing
        # every noisy event in a single call
        while True:
            n_before = len(event_list)
            event_list = filter_events(event_list)
            if len(event_list) == n_before:
                break
        
        # without any usable event there is nothing to average
        if len(event_list) == 0:
            raise ValueError(f"No usable events found for {particle} in {tuning_dirs[particle]}")
        
        # average attributes of all images
        for attribute in event_list[0].keys():
            tuning_log['TargetAttributes'][particle][attribute] = np.mean([event_list[i][attribute] for i in range(len(event_list))])
            tuning_log['Weights'][attribute] = 1 # initialize weights to 1 for all attributes
        
        # save target attributes to json file
        with open(automation_dir + 'tuning_log.json', 'w') as f:
            json.dump(tuning_log, f, indent=4)
        
        # save target attributes to csv file
        attribute_df = pd.DataFrame(tuning_log['TargetAttributes']).T
        attribute_df.index.name = 'Sim'
        attribute_df.to_csv(automation_dir + 'AttributesLog.csv', index=True)
    
    # set default weights to 0 for noise, peaks, and energy attributes
    for attribute in tuning_log['Weights'].keys():
        if 'noise' in attribute.lower():
            tuning_log['Weights'][attribute] = 0
        if 'peaks' in attribute.lower():
            tuning_log['Weights'][attribute] = 0
        if 'energy' in attribute.lower():
            tuning_log['Weights'][attribute] = 0
    
    with open(automation_dir + 'tuning_log.json', 'w') as f:
        json.dump(tuning_log, f, indent=4)
    
    # prompt user to modify tuning_log.json as needed
    print('Tuning Log Initialized')
    print('Please modify tuning_log.json as needed, then press enter to continue')
    input()

{'806p': '/mnt/analysis/e21072/h5test/run_0277/len90_ic600000_pads21_eps5_samps5_poly2/673876CUT_Date_12_20_2023/', '1682p': '/mnt/analysis/e21072/h5test/run_0277/len90_ic600000_pads21_eps5_samps5_poly2/241372CUT_Date_12_20_2023/'}
Analyzing target 806p images
Analyzing target 1682p images


In [35]:
def Scoring_Function(attributes, particle, tuning_log):
    """Score a set of simulated attributes against a particle's targets.

    For every attribute the weighted squared deviation from its target is
    computed and, when the target is non-zero, divided by that target. The
    score is the sum of these terms (lower is better).

    Args:
        attributes: dict mapping attribute name to the simulated value.
        particle: key into tuning_log['TargetAttributes'].
        tuning_log: dict providing 'Weights' and 'TargetAttributes'.

    Returns:
        The total score; 0 when `attributes` is empty.

    Raises:
        KeyError: if the particle, or an attribute's weight or target, is
            missing from `tuning_log`.
    """
    weights = tuning_log['Weights']
    targets = tuning_log['TargetAttributes'][particle]

    score = 0
    for attribute, value in attributes.items():
        target = targets[attribute]
        # square deviation of attribute from target value (lower is better)
        term = weights[attribute] * (value - target)**2
        if target != 0:
            # normalize this attribute's term only; rescaling the running
            # total here would make the score depend on attribute order
            term = term / target
        score += term

    return score

In [74]:
def Score_Simulations(param_df=None, image_dir=None, tuning_log=None, automation_dir=None):
    """Score every completed, not-yet-scored simulation in `param_df`.

    A simulation is scored when its 'Score' is -1 and its 'Status' is 4.
    Its images (files named '<Sim>_image_*.png' in `image_dir`) are analysed,
    filtered, averaged, appended to AttributesLog.csv and scored against
    the matching target particle.

    Args:
        param_df: parameter table; defaults to the module-level `param_df`
            as it is at call time.
        image_dir: directory holding the simulation images; defaults to
            `automation_dir + 'Output/images/'`.
        tuning_log: tuning configuration; defaults to the module-level
            `tuning_log` as it is at call time.
        automation_dir: working directory; defaults to the module-level
            `automation_dir`.

    Returns:
        `param_df` with the 'Score' column filled in for every simulation
        that could be scored. Simulations without usable events are reported
        and left unscored.
    """
    # Resolve defaults when called rather than when defined, so a bare call
    # sees the objects most recently loaded into the notebook namespace.
    module_globals = globals()
    if automation_dir is None:
        automation_dir = module_globals['automation_dir']
    if param_df is None:
        param_df = module_globals['param_df']
    if tuning_log is None:
        tuning_log = module_globals['tuning_log']
    if image_dir is None:
        image_dir = automation_dir + 'Output/images/'

    full_image_list = [i for i in os.listdir(image_dir) if i.endswith('.png')]

    Attribute_df = pd.read_csv(automation_dir + 'AttributesLog.csv')

    for index, row in param_df.iterrows():
        # only completed simulations that have not been scored yet
        if not (row['Score'] == -1 and row['Status'] == 4):
            continue

        sim_name = row['Sim']
        sim_image_list = [i for i in full_image_list if i.split('_image_')[0] == sim_name]

        # rebuild the particle key (e.g. '<E0><P0>') used in TargetAttributes
        ptype = ''
        if row['E0'] > 1:
            ptype += f"{int(row['E0'])}{row['P0']}"
        if row['E1'] > 1:
            ptype += f"{int(row['E1'])}{row['P1']}"

        sim_events = [Analyze_Image(image_dir + image) for image in sim_image_list]
        sim_events = filter_events(sim_events)

        # nothing to average: report and move on instead of failing on sim_events[0]
        if len(sim_events) == 0:
            print(f"No usable events for simulation {sim_name}; leaving it unscored")
            continue

        # average every attribute over the remaining events
        sim_attributes = {}
        for attribute in sim_events[0].keys():
            sim_attributes[attribute] = np.mean([event[attribute] for event in sim_events])

        # add row for simulation to attribute log
        new_row = len(Attribute_df)
        Attribute_df.loc[new_row] = np.nan
        Attribute_df.loc[new_row, 'Sim'] = sim_name
        for attribute in sim_attributes.keys():
            Attribute_df.loc[new_row, attribute] = sim_attributes[attribute]

        # score the simulation and save the score to the parameter table
        param_df.loc[index, 'Score'] = Scoring_Function(sim_attributes, ptype, tuning_log)

    # write the attribute log once, after all simulations have been processed
    Attribute_df.to_csv(automation_dir + 'AttributesLog.csv', index=False)

    return param_df

In [None]:
def initialize_sim(sim_name, ptype, tuning_log=tuning_log, param_df=param_df):
    """Append a new, unscored simulation row to `param_df`.

    The row gets Status 0 and Score -1, the fixed parameters from
    `tuning_log`, and the variable parameters each multiplied by a random
    factor within +/- tuning_log['VarRange']; parameters listed in
    tuning_log['IntParams'] are rounded afterwards. Nothing is added when
    `sim_name` is already present in the 'Sim' column.

    Args:
        sim_name: name for the new simulation.
        ptype: particle string; the number before 'p' is used as E0 and
            the number before 'a' as E1.
        tuning_log: tuning configuration dict.
        param_df: parameter table to extend.

    Returns:
        `param_df` (modified in place when a row was added).
    """
    if sim_name in param_df['Sim'].values:
        print(f"Simulation {sim_name} already initialized")
        return param_df

    # NEED TO UPDATE THIS IN FUTURE TO SUPPORT MORE THAN P,A,PA EVENTS
    # proton energy: text before 'p'; alpha energy: text between 'p' and 'a'
    E0 = float(ptype.split('p')[0]) if 'p' in ptype else 0
    E1 = float(ptype.split('a')[0].split('p')[-1]) if 'a' in ptype else 0

    # create the new row as all-NaN, then fill it cell by cell
    row = len(param_df)
    param_df.loc[row] = np.nan

    bookkeeping = (
        ('Sim', sim_name),
        ('Status', 0),
        ('P0', 'p'),
        ('E0', E0),
        ('P1', 'a'),
        ('E1', E1),
        ('N', tuning_log['N']),
        ('Score', -1),
    )
    for column, value in bookkeeping:
        param_df.loc[row, column] = value
    for param, value in tuning_log['FixedParameters'].items():
        param_df.loc[row, param] = value
    for param, value in tuning_log['VarParameters'].items():
        param_df.loc[row, param] = value

    # randomly vary each tuned parameter around its current best value
    spread = tuning_log['VarRange']
    for param in tuning_log['VarParameters'].keys():
        param_df.loc[row, param] *= random.uniform(1-spread, 1+spread)

    # round integer parameters
    for param in tuning_log['IntParams']:
        if param in param_df.columns:
            param_df.loc[row, param] = int(round(param_df.loc[row, param]))

    return param_df

In [None]:
# SCORE SIMULATIONS
# reload the tuning state from disk (the log may have been edited by hand)
with open(automation_dir + 'tuning_log.json', 'r') as f:
    tuning_log = json.load(f)
param_df = pd.read_csv(automation_dir + 'parameters.csv')

# Score the simulations completed in previous iterations
if iteration > 0:
    # pass the freshly loaded objects explicitly so scoring does not fall
    # back on whatever the function's defaults were bound to earlier
    param_df = Score_Simulations(param_df=param_df, tuning_log=tuning_log)
    param_df.to_csv(automation_dir + 'parameters.csv', index=False) # save scores to parameter file

In [None]:
# NAMING CONVENTION:
# T{ptype}{iter}

# update best fit parameters once more than BestN scored simulations per
# tuning particle are available
if len(param_df[param_df['Score'] > -1]) > tuning_log['BestN'] * len(tuning_log['TuningParticles']):
    # per-parameter list of per-particle averages; a separate dict so that
    # tuning_log['VarParameters'] keeps its scalar values while accumulating
    params = {param: [] for param in tuning_log['VarParameters'].keys()}
    
    for ptype in tuning_log['TuningParticles']:
        # scored simulations for this particle, best (lowest) score first
        ptype_df = param_df[param_df['Sim'].str.startswith(f"T{ptype}")]
        ptype_df = ptype_df[ptype_df['Score'] > -1]
        ptype_df = ptype_df.sort_values(by='Score', ascending=True)
        ptype_df = ptype_df.head(tuning_log['BestN'])
        
        # a particle without scored simulations would contribute NaN and
        # wipe out the best-fit values, so skip it
        if ptype_df.empty:
            continue
        
        for param in params.keys(): # average best fit parameters
            params[param].append(np.mean(ptype_df[param]))
    
    for param, particle_means in params.items(): # update tuning_log with best fit parameters
        if len(particle_means) > 0:
            tuning_log['VarParameters'][param] = float(np.mean(particle_means))
    
    # save tuning_log to json file
    with open(automation_dir + 'tuning_log.json', 'w') as f:
        json.dump(tuning_log, f, indent=4)

In [None]:
# QUEUE NEW SIMULATIONS IF NEEDED
# keep adding one simulation per tuning particle until at least num_sims are
# waiting (Status 0) or the table has reached MaxIterations rows
while len(param_df[param_df['Status'] == 0]) < num_sims and len(param_df) < tuning_log['MaxIterations']:
    # with no particles nothing is ever added, so the loop would never end
    if len(tuning_log['TuningParticles']) == 0:
        raise ValueError("No tuning particles listed in tuning_log.json; cannot queue simulations")
    time.sleep(2) # to avoid duplicate sim names
    for ptype in tuning_log['TuningParticles']:
        # sim names follow T{ptype}{unix time in seconds}
        nowtime = int(time.time())
        sim_name = f"T{ptype}{nowtime}"
        param_df = initialize_sim(sim_name, ptype, tuning_log, param_df)
    # save parameter file
    param_df.to_csv(automation_dir + 'parameters.csv', index=False)