In [None]:
import json
import numpy as np
import matplotlib.pyplot as plt
import scipy.signal as signal
from scipy.interpolate import interp1d
from scipy.signal import find_peaks, peak_widths,peak_prominences
import bisect
import time 
import pathlib

In [None]:
def low_pass_filter(wavelengths,intensities,fc=15):
    """
    Smooth a spectrum with a zero-phase 5th-order Butterworth low-pass filter.

    The number of wavelength samples is used as the sampling frequency, so
    the normalized cut-off handed to the filter design is fc / (n_samples / 2).

    Args:
        wavelengths: array of wavelengths; only its length (shape[0]) is used.
        intensities: the intensities measured at those wavelengths.
        fc: cut-off frequency, expressed relative to the number of samples.
    Returns:
        The filtered intensities (forward-backward filtered with filtfilt).
    """
    n_samples = wavelengths.shape[0]
    nyquist = n_samples / 2
    normalized_cutoff = fc / nyquist
    numerator, denominator = signal.butter(5, normalized_cutoff, btype='low')
    return signal.filtfilt(numerator, denominator, intensities)

In [None]:
def obtain_prominence(x,a=1,b=70,c=0.4,d=100,e=1,f=0.1,threshold=0.05):
    """
    Rescale a peak prominence with a curve that is more sensitive for small x.

    Below `threshold` the value is (tanh((x - threshold) * d) + e) * f.
    From `threshold` upwards it is a linear term plus a logistic term,
    divided by the same expression evaluated at x = 1, so x = 1 maps to 1.

    Args:
        x: the peak prominence to rescale (a scalar; the branch uses a plain `if`).
        a,b,c,d,e,f: constants that tune the shape of the curve.
        threshold: the prominence at which the function switches branch.
    Returns:
        The rescaled prominence, later used to calculate the scores.
    """
    if x < threshold:
        return (np.tanh((x - threshold) * d) + e) * f

    # Logistic contribution at x and at the reference point x = 1.
    logistic_at_x = 1 / (1 + np.exp(-b * x)) - 0.5
    logistic_at_one = 1 / (1 + np.exp(-b)) - 0.5
    return (a * x + c * logistic_at_x) / (a + c * logistic_at_one)

In [None]:
def peak_binary(x,b=100,threshold=0.05):
    """
    Squash a peak prominence towards 0 or 1 with a shifted tanh.

    The result is (tanh((x - threshold) * b) + 1) / 2, i.e. exactly 0.5 at
    x == threshold, approaching 0 below it and 1 above it.

    Args:
        x: the peak prominence (scalar or numpy array).
        b: steepness of the transition around the threshold.
        threshold: the prominence that maps to 0.5.
    Returns:
        The squashed prominence, in the range 0 to 1.
    """
    scaled_offset = (x - threshold) * b
    return (np.tanh(scaled_offset) + 1) / 2

In [None]:
def normalizedata(series_original):
    """
    Min-max normalize the input data so that it spans the range [0, 1].

    Args:
        series_original: numpy array of values to normalize.
    Returns:
        (series_original - min) / (max - min). A constant input has no range
        to scale by, so it is returned as an all-zero float array instead of
        the NaN array (plus RuntimeWarning) that dividing by zero would give.
    """
    lowest = np.min(series_original)
    span = np.max(series_original) - lowest
    if span == 0:
        # Flat input: avoid 0/0 and report "no signal above the baseline".
        return np.zeros_like(series_original, dtype=float)
    return (series_original - lowest) / span

In [None]:
def calcualte_smoothness(x):
    """
    Measure how uneven the point-to-point changes of a spectrum are.

    The value is the standard deviation of the first differences of `x`
    divided by the absolute value of their mean.

    Args:
        x: the UV-Vis spectrum (a 1-D sequence of intensities).
    Returns:
        std(diff(x)) / |mean(diff(x))|.
    """
    steps = np.diff(x)
    spread = np.std(steps)
    average_step = abs(np.mean(steps))
    return spread / average_step

In [None]:
def read_in_UV_Vis(base_sample,index,lower=400,upper=950,fc=15,color="red",plot_flag=True,normalize=False):
    """
    Load one sample's UV-Vis spectrum, trim it, smooth it and normalize it.

    Args:
        base_sample: path prefix of the sample folders (concatenated as-is,
            so it must end with a path separator).
        index: sample index; the spectrum is read from
            base_sample + "<index as 4 digits>/uv.json".
        lower: wavelengths must be strictly greater than this to be kept.
        upper: wavelengths must be strictly smaller than this to be kept.
        fc: cut-off frequency forwarded to low_pass_filter.
        color: colour of the unfiltered curve in the optional overlay plot.
        plot_flag, normalize: the overlay plot is drawn only when plot_flag is
            truthy and normalize == True. NOTE(review): `normalize` does not
            switch normalization on or off; the data is always normalized.
    Returns:
        UV_inter: cubic interp1d (extrapolating) of the filtered, normalized
            spectrum over the trimmed wavelengths.
        roughness: mean absolute difference between the filtered spectrum and
            the normalized, unfiltered spectrum.
    Raises:
        ValueError: if no data point lies strictly between lower and upper.
    """
    # Same 4-digit folder naming as the other per-sample files in this notebook.
    spectrum_path = base_sample + "%04d/uv.json" % index
    with open(spectrum_path) as json_data:
        d = json.load(json_data)
    wavelengths = np.array(d['wavelength'])
    series_original = np.array(d['absorbances'])

    # Build the wavelength window once and apply it to both arrays.
    in_window = (wavelengths > lower) & (wavelengths < upper)
    if not in_window.any():
        raise ValueError(
            "no wavelength in %s lies strictly between %s and %s" % (spectrum_path, lower, upper)
        )
    wavelengths = wavelengths[in_window]
    series_original = normalizedata(series_original[in_window])

    series = low_pass_filter(wavelengths, series_original, fc=fc)
    # Roughness is measured before the filtered curve is re-normalized.
    roughness = abs(series - series_original).mean()
    series = normalizedata(series)

    if plot_flag and normalize == True:
        plt.plot(wavelengths, series, c="black")
        plt.plot(wavelengths, series_original, c=color)

    UV_inter = interp1d(wavelengths, series, kind='cubic', fill_value='extrapolate')
    return UV_inter, roughness

In [None]:
def plot_1D(region,boundary,roughness,series,peaks,prominences,wavelengths):
    """
    Open a new figure showing the spectrum, its peaks and the marked wavelengths.

    Args:
        region: list of [start, end] wavelength pairs, drawn as solid blue lines.
        boundary: wavelengths drawn as dashed red vertical lines.
        roughness: not used; kept so existing callers keep working.
        series: the spectrum values sampled at `wavelengths`.
        peaks: indices into `series` of the detected peaks, drawn as crosses.
        prominences: not used; kept so existing callers keep working.
        wavelengths: the x axis of the spectrum.
    """
    plt.figure()

    plt.plot(wavelengths,series)
    plt.plot(wavelengths[peaks], series[peaks], "x")
    plt.vlines(boundary, ymin=0, ymax=1,color="r",linestyles ="dashed")
    for region_temp in region:
        # Draw this region's two edges only; passing the whole nested list
        # redrew every region on each iteration.
        plt.vlines(region_temp, ymin=0, ymax=1,color="b")

In [None]:
def _plot_score_diagnostics(region,boundary,roughness,series,peaks,prominences,wavelengths,peak_properties,results_half,results_full):
    """Draw the two diagnostic figures of obtain_scores: peak geometry, then the shaded absorption bands."""
    # Figure 1: spectrum, peaks, grid boundaries, peak bases, prominences and widths.
    plot_1D(region,boundary,roughness,series,peaks,prominences,wavelengths)
    contour_heights = series[peaks] - prominences
    if len(results_half[0])>0:
        plt.scatter(wavelengths[peak_properties['left_bases']],series[peak_properties['left_bases']],c='black')
        plt.scatter(wavelengths[peak_properties['right_bases']],series[peak_properties['right_bases']],c='black')
        plt.vlines(x=wavelengths[peaks], ymin=contour_heights, ymax=series[peaks])
        for width_result, width_color in ((results_half, "C2"), (results_full, "C3")):
            # peak_widths reports fractional sample positions; round to the nearest sample.
            left_edges = wavelengths[np.around(width_result[2]).astype("int")]
            right_edges = wavelengths[np.around(width_result[3]).astype("int")]
            plt.hlines(width_result[1], left_edges, right_edges, color=width_color)
    plt.show()
    plt.close()

    # Figure 2: whole spectrum shaded, with the near-peak regions shaded again on top.
    plt.figure()
    plt.plot(wavelengths,series,c="black")
    plt.fill_between(wavelengths,series, color='blue', alpha=0.5)
    for region_temp in region:
        in_region = (wavelengths>region_temp[0]) & (wavelengths<region_temp[1])
        plt.fill_between(wavelengths[in_region],series[in_region], color="blue", alpha=0.5)
    plt.show()
    plt.close()


def obtain_scores(base_sample,index,boundary1,boundary2,near_width=50,lower=400,upper=950,num=101,False_width=10000,plot_flag=True):
    """
    Calculate the defined UV-Vis scores of a sample from its two most prominent peaks.

    Args:
        base_sample: the directory of the sample
        index: the index of the sample
        boundary1: the boundaries to discretize the UV-Vis region for one peak system
            (accepted but not used by this function)
        boundary2: sorted boundaries used to discretize the UV-Vis spectrum for the
            two peak system; each peak is assigned bisect_left(boundary2, position)
        near_width: half-width (in wavelength units) of the region around each peak
        lower: the lower wavelength boundary of UV-Vis
        upper: the upper wavelength boundary of UV-Vis
        num: the sampling number in wavelength [lower,upper]
        False_width: accepted but not used by this function
        plot_flag: when truthy, draw the diagnostic figures
    Returns:
        [] when fewer than two peaks with prominence >= 0.02 are found.
        Otherwise a 1-D object array with eight entries:
            0: the number 2 (two-peak result)
            1: [score2, score4] - fraction of the total signal lying within
               near_width of the most / second most prominent peak
            2: [width1, width2] - half-height widths in wavelength units
            3: [width1_amp, width2_amp] - the same widths (no amplification)
            4: [background1, background2] - peak height minus that peak's prominence
            5: [class1, class2] - boundary2 bin index of each peak
            6: roughness as returned by read_in_UV_Vis
            7: the largest prominence among all detected peaks
    """
    # Read the spectrum over the same window that is sampled below.
    UV_sample,roughness=read_in_UV_Vis(base_sample,index,lower=lower,upper=upper,plot_flag=plot_flag)

    # Resample the interpolated spectrum on a regular wavelength grid.
    wavelengths=np.linspace(lower,upper,num)
    series=UV_sample(wavelengths)

    peaks, peak_properties = find_peaks(series,prominence=0.02)
    if len(peaks) == 0:
        print("None peaks are found!")
        plt.close()
        return []

    prominences, _, _ = peak_prominences(series, peaks)
    if len(prominences) < 2:
        # Only the two-peak case is scored.
        return []

    results_half = peak_widths(series, peaks, rel_height=0.5)
    results_full = peak_widths(series, peaks, rel_height=1)

    # Indices (into `peaks`) of the two most prominent peaks, most prominent first.
    peak_index = prominences.argsort()[-2:][::-1]
    first, second = peak_index[0], peak_index[1]

    peak_position1 = wavelengths[peaks[first]]
    peak_position2 = wavelengths[peaks[second]]

    # Grid cell each peak belongs to.
    sample_class_index = [bisect.bisect_left(boundary2, peak_position1),
                          bisect.bisect_left(boundary2, peak_position2)]

    # Near-peak wavelength windows, most prominent peak first.
    region = [[peak_position1-near_width,peak_position1+near_width],
              [peak_position2-near_width,peak_position2+near_width]]

    # peak_widths measures in samples; convert to wavelength units.
    step = wavelengths[1]-wavelengths[0]
    width1 = results_half[0][first].item()*step
    width2 = results_half[0][second].item()*step

    # no amplification
    width1_amp = width1*1
    width2_amp = width2*1

    # Baseline under each selected peak: use that peak's own prominence,
    # not the prominence of whichever peaks happen to come first.
    background1 = series[peaks[first]] - prominences[first]
    background2 = series[peaks[second]] - prominences[second]

    # Share of the total signal that falls inside each near-peak window.
    total_signal = series.sum()
    in_region1 = (wavelengths > region[0][0]) & (wavelengths < region[0][1])
    in_region2 = (wavelengths > region[1][0]) & (wavelengths < region[1][1])
    score2 = series[in_region1].sum()/total_signal
    score4 = series[in_region2].sum()/total_signal

    if plot_flag:
        _plot_score_diagnostics(region,boundary2,roughness,series,peaks,prominences,
                                wavelengths,peak_properties,results_half,results_full)

    # The entries have different lengths, so an object array is required.
    return np.array([2,
                     [score2,score4],
                     [width1,width2],
                     [width1_amp,width2_amp],
                     [background1,background2],
                     sample_class_index,
                     roughness,
                     prominences.max()], dtype=object)

# Process UV-Vis data

In [None]:
# Grid boundaries used to classify peak positions: 9 points over 400-600,
# then 7 more up to 950 (the duplicate 600 is dropped). Built once for all samples.
grid_boundaries = np.concatenate((np.linspace(400,600,9),np.linspace(600,950,8)[1:]))

for generation_num in range(10):
    base_sample=f"../data/custom/MAP_elite_generation_{generation_num}/"
    # Block until flag2.txt appears in this generation's folder, polling every 2 seconds.
    while not pathlib.Path(base_sample+'/flag2.txt').is_file():
        time.sleep(2)
    data_output_total=[]
    pHs = []
    for i in range(24):
        plt.plot()
        print(generation_num,i)
        data_output=obtain_scores(base_sample,
                              i,
                              boundary1=grid_boundaries,
                              boundary2=grid_boundaries,
                              near_width=50,
                              lower=400,
                              upper=950,
                              num=1101,
                              False_width=10000)
        data_output_total.append(data_output)
        # obtain_scores returns [] when the sample could not be scored.
        if len(data_output)>0:
            if data_output[0] == 2:
                width_penalty = -0.002*(data_output[3][0] + data_output[3][1])
                score_gap = abs(data_output[1][0] - data_output[1][1])
                print(width_penalty+score_gap)
                print(width_penalty-score_gap)
                print(data_output[-3])
                print(data_output)
        plt.show()
        plt.close()

        with open(base_sample+'%04d'%i+"/pH_operation.json") as json_file:
            data = json.load(json_file)
        # Take the second-to-last value of the last numbered entry.
        pHs.append(data[f"{len(data)-1}"][-2])
        with open(base_sample+'%04d'%i+"/params.json") as json_file:
            data = json.load(json_file)
            print(data)
    # Results mix empty lists with nested score arrays, so they must be stored
    # as an explicit object array; a ragged list is rejected by recent NumPy.
    np.save(base_sample+"data_total",np.array(data_output_total,dtype=object))
    np.save(base_sample+"pHs",pHs)

    # After processing the data, write the flag so that the algorithm can generate new experiments
    with open(base_sample+'/flag3.txt', 'w') as f:
        json.dump(1, f,indent=4)

# Check pools

In [None]:
# Load the pool saved for every generation, each sorted by its last column.
# NOTE(review): allow_pickle=True unpickles the file contents; only load trusted files.
pool = []
total_generations = 10
base_directory = "../data/custom/"
for generation_num in range(total_generations):
    generation_dir = base_directory+"MAP_elite_generation_%d"%generation_num
    path = generation_dir+"/pool_absorption%d.npz"%generation_num
    pool_temp = np.load(path,allow_pickle=True)["arr_0"]
    sort_order = pool_temp[:,-1].argsort()
    pool.append(pool_temp[sort_order])

# Number of pool entries per generation.
elite_counts = np.array([len(generation_pool) for generation_pool in pool])
plt.xlabel("batch number")
plt.ylabel("elite number")
plt.scatter(np.arange(len(pool)),elite_counts,c = "black")
plt.plot(elite_counts,c = "black")

# For every entry of the final pool, collect the second-to-last column of the
# rows in each generation whose last column matches it (presumably: the fitness
# history of each grid cell - confirm against the pool writer). Generations
# without a matching row are recorded as None.
final_keys = pool[-1][:,-1]
fitness_total = []
for final_key in final_keys:
    history = []
    for generation_num in range(total_generations):
        generation_pool = pool[generation_num]
        matching = generation_pool[generation_pool[:,-1] == final_key][:,-2]
        history.append(matching if len(matching) != 0 else None)
    fitness_total.append(history)
best_grids = np.unique(pool[-1],axis=0)

In [None]:
# Print and plot, for every entry of the final pool, the values collected per generation.
for grid_index in range(len(pool[-1][:,-1])):
    grid_history = fitness_total[grid_index]
    print(grid_history)
    # Entries are numpy arrays or None; `is not None` avoids the elementwise
    # (and possibly ambiguous) comparison that `!= None` performs on arrays.
    if grid_history[-2] is not None:
        # Change between the last two generations.
        print(grid_history[-1] - grid_history[-2])
    generations = np.arange(len(grid_history))
    plt.scatter(generations,grid_history)
    plt.plot(generations,grid_history)

plt.xlabel("generation")
plt.ylabel("fitness")