# Analysis

In [1]:
import scipy
import numpy as np
from typing import Callable as function
class TextColor():
        black = '\033[30m'
        red = '\033[31m'
        green = '\033[32m'
        orange = '\033[33m'
        blue = '\033[34m'
        purple = '\033[35m'
        cyan = '\033[36m'
        lightgrey = '\033[37m'
        darkgrey = '\033[90m'
        lightred = '\033[91m'
        lightgreen = '\033[92m'
        yellow = '\033[93m'
        lightblue = '\033[94m'
        pink = '\033[95m'
        lightcyan = '\033[96m'
        reset = '\033[0m'
save_rep = "E:/DP_database/database"

## Ulity Functions

### Calc functions

In [2]:
#*
# Functions to get true value in case, in case that overflow was noted, in signed values
#*
def get_overflowed_val(val = 0, bnum = 8):
    border = 2**(bnum-1)
    if val >= border:
        val = val % (border*2)
        if val >= border:
            val = -border + (val - border)
    return val
#*
# Functions to get p_value, for current correlation vector
#*
def calc_p_val(corr: float, set_len: int, mode: int = 0) -> float:
  t_stat = corr*( ((set_len-2) / (1 - corr**2))**0.5 )
  if mode == 0:
    df = set_len - 2  # degrees of freedom
    p_value = 2 * stats.t.sf(abs(t_stat), df)  # Two-tailed p-value
    return p_value

In [3]:
#*
# Functions to Hamming weight, or otherwise number of 1 in binary representation vector of target number
#*
def hamming_weight(x, is_int = True):
    if not is_int:
        return np.count_nonzero(x == 1)
    return bin(x).count("1")
#bnum = 8
bnum = 16
secret_range = 2**(bnum)
hw = [hamming_weight(secret_value) for secret_value in range(secret_range)] 
#np.array([[1,2,3],[5,4,6]]).shape

### Visualization Functions

In [4]:
#*
# Visualize comparison between told true value graph, and all others
#*
def compTrueToAll_corrMatrix(corr_map, true_secret:int = 45, xlim:list[int]=None, ylim:list[int]=None, true_last=False, bnum = 8, saveName = None): 
    global save_rep
    secret_corr = corr_map[true_secret]
    corr_map = np.array(corr_map)
    mask = np.ones(corr_map.shape[0], dtype=bool)
    mask[true_secret] = False
    all_false_corr = corr_map[mask, :]
    indx_ColmnMax = all_false_corr.argmax(axis=0) #Return an vector of indexes of max values in each column of matrix
    allFalseVector = np.array([all_false_corr[indx_ColmnMax[i]][i] for i in range(len(indx_ColmnMax))]) #Return a vector of max values for each column in matrix

    image = plt.figure()
    # Comparison to real correlation
    if true_last:
        plt.plot(allFalseVector, color='grey', label='all_not_true', linewidth=0.5)
        plt.plot(secret_corr, color='red', label=f'secret_val_corr = {true_secret}', linewidth=0.5)
    else:
        plt.plot(secret_corr, color='red', label=f'secret_val_corr = {true_secret}', linewidth=0.5)
        plt.plot(allFalseVector, color='grey', label='all_not_true', linewidth=0.5)
    plt.legend(bbox_to_anchor=(0.75, 1.15), ncol=2)
    plt.title("False_All (Grey) - True (Red)")
    plt.xlabel("Time Sample")
    plt.ylabel("Correlation")
    if xlim is not None:
        plt.xlim(xlim)
    if ylim is not None:
        plt.ylim(ylim)
    plt.show()
    plt.close()

    if saveName is not None:
        image.savefig(f'{save_rep}/figures/{saveName}.png')

In [5]:
#*
# Plot either one or many graphs into one output figure, with choosen parameters
#*
def plot_mult(corr_list, _type = "norm", mult=True, xlim:list[int]=None, ylim:list[int]=None,
              graph_title="Multiple Correlation Traces", ylabel="Correlation", color=None, borders = None):
    global save_rep
    if not mult: #len(corr_list.shape) >  1:
        corr_list = [corr_list]
    image = plt.figure()
    plt.title(graph_title)
    plt.xlabel("Index")
    plt.ylabel(ylabel)
    if xlim is not None:
        plt.xlim(xlim)
    if ylim is not None:
        plt.ylim(ylim)
    for corr in corr_list:
        if color is None:
            plt.plot(corr)
        else:
            plt.plot(corr, color=color)
    if borders is not None:
        for i in borders:
            plt.axvline(x = i, color = 'orange')
    plt.show()
    plt.close()  # Close the figure to free memory
    os.makedirs('./figures', exist_ok=True)
    image.savefig(f'{save_rep}/figures/{_type}.png')

In [6]:
#*
# Dynamic SPA
#*
def createDiffWave(waves, name, showPlots=True):
    waves = np.array(waves)
    diff_waves = []
    square_waves = []
    avg_wave = np.mean(waves,axis=0)
    
    for i in waves:
        diff_waves.append(np.subtract(i, avg_wave))
    diff_waves = np.array(diff_waves)
    diff_avg = np.mean(diff_waves,axis=0)
    square_diff =  np.square(diff_waves)
    dim1 = len(square_diff)
    var = np.sum(square_diff, axis=0) / dim1
    if showPlots:
        plot_mult(np.array(waves[0]), _type = f"wave0_{name}", mult=False, graph_title="Waves[0]", ylabel="Power")
        plot_mult(np.array(avg_wave), _type = f"avg_wave_{name}", mult=False, graph_title="Avg wave", ylabel="Power", color="green")
        plot_mult(np.array(diff_avg), _type = f"difference_wave_{name}", mult=False, graph_title="Difference wave", ylabel="Power", color="purple")
        plot_mult(np.array(var), _type = f"variation_{name}", mult=False, graph_title="Variation wave", ylabel="Power", color="orange")
        plot_mult(np.array(var**(0.5)), _type = f"variation_{name}", mult=False, graph_title="Standard deviation wave", ylabel="Power", color="brown")
    return diff_avg, avg_wave, var, waves

In [7]:
def show_extraction_method_info(vals_locMaxVector: list[int], indx_ColmnMax: list[int], method_name: str = f'xth-level degree', color: str = "blue"):
    global save_rep
    fig = plt.figure()
    plt.plot(vals_locMaxVector, color=color)
    plt.title(f'{method_name} Local maxims graph')
    plt.xlabel("Time Sample")
    plt.ylabel("Secret Value")
    plt.show()
    fig.savefig(f'{save_rep}/figures/get_weights/methods/{method_name}.png')
    plt.close()

    
    indx_localMax = []
    for i in range(len(vals_locMaxVector)):
        val_toAppend = 0
        if vals_locMaxVector[i] > 0:
            val_toAppend = indx_ColmnMax[i]
        indx_localMax.append(val_toAppend)
    local_maxims = [int(indx_ColmnMax[i]) for i in range(len(indx_ColmnMax)) if vals_locMaxVector[i] > 0]
    uniques_set = [ x for i, x in enumerate(local_maxims) if x not in local_maxims[:i]]
    print(f"Number of uniques: {len(uniques_set)}")
    print(f"Number of local maxims: {len(local_maxims)}")
    print(f"Uniquess: {TextColor.pink}{uniques_set}{TextColor.reset}")
    print(f"Local maxims: {TextColor.orange}{local_maxims}{TextColor.reset}")
    fig = plt.figure()
    plt.plot(indx_localMax, color=color)
    plt.title(f"{method_name} indexes graph")
    plt.xlabel("Time Sample")
    plt.ylabel("Index")
    plt.show()
    plt.close()

### Intermediate Values of multiplication, functions

In [8]:
#*
# Return vector of resulting intermediate values
#*
def Abs_8bit_intermediateVal(secret_value: int, known_input: list[int]): # Abs_8-bit
    H = np.uint32(known_input *  secret_value) % 256
    return np.array(H)
def Abs_32bit_intermediateVal(secret_value: int, known_input: list[int]):
    H = np.uint32(known_input *  secret_value)
    return np.array(H)
def HW_8bit_intermediateVal(secret_value: int, known_input: list[int]):
    global hw
    H = [hw[np.uint32(known_input[i] *  secret_value) % 256] for i in range(len(known_input))]
    return np.array(H)
def HW_32bit_intermediateVal(secret_value: int, known_input: list[int]):
    global hw
    H = [hamming_weight(np.uint32(known_input[i] *  secret_value)) for i in range(len(known_input))]
    return np.array(H)

### CPA calc functions

In [9]:
#*
# Agregate Functions: For calculation of correlation above, waves dataset, with hypotetical created traces based of known_input
#*
def general_CPA(known_input : list[int], hyp_leakage_cacl: function, waves = np.array([]),
                ith_weight: int = 0, calc_p_value: bool = False, bnum:int=8): # Vanilla ANN CPA, for one time_sample a
    # Verification of parameters:
    n_traces = len(known_input)
    trace_len = len(waves[0])

    # Calculating statistics for traces in waves set
    qsum_L_list = []
    root_qsum_L_list = []
    l_diff_list = []
    for time_sample in range(trace_len):
      # Calculation preparations: target sets extraction
      L = waves[:,time_sample]
      L_mean = np.mean(L)
      # Calculation
      l_diff = L - L_mean  # Vectorized
      qsum_L = np.sum(np.square(l_diff))  # Sum of squared differences
      # Tidy up
      qsum_L_list.append(qsum_L)
      root_qsum_L_list.append(qsum_L ** 0.5)
      l_diff_list.append(l_diff)


    # Preparational calculations, for case of quantization
    corr_all_Tsamples = []
    p_val_all_Tsamples = []
    secret_range = 2**(bnum)
    for secret_value in trange(secret_range, desc='Calculating Correlations for the Secret-Key: '): # For current WeightHypothesis do
        #H = [calc_intermediate_val3(secret_value, known_input[j]) for j in range(n_traces)]
        H = hyp_leakage_cacl(secret_value=secret_value, known_input=known_input)
        H_mean = np.mean(H)
        h_diff = H - H_mean
        qsum_H = np.sum(np.square(h_diff))
        root_q_sum_H = (qsum_H ** 0.5)

        # For current WeightHypothesis, create an Correlation, this vector needs to be created for each time sample:
        corr_Tsamples = []
        for time_sample in range(trace_len):
            # Calculation preparations: target sets extraction
            l_diff = l_diff_list[time_sample]
            sum_HL = np.sum(h_diff * l_diff)  # Dot product
            # Calculation
            divider = root_q_sum_H * root_qsum_L_list[time_sample]
            corr = sum_HL / divider if divider != 0 else 0
            # Tidy up
            corr_Tsamples.append(corr)
        corr_all_Tsamples.append(corr_Tsamples)
        
        p_val_Tsamples = []
        if calc_p_value:
            lenght = n_traces-2
            for corr in corr_Tsamples:
                t_stat = corr*( (lenght / (1 - corr**2))**0.5 )
                df = n_traces - 2  # degrees of freedom
                p_value = 2 * stats.t.sf(abs(t_stat), df)  # Two-tailed p-value
                # Tidy up
                p_val_Tsamples.append(p_value)
            p_val_all_Tsamples.append(corr_Tsamples)
    return np.array(corr_all_Tsamples), np.array(p_val_all_Tsamples)

In [10]:
def Abs_8bit_ANN_CPA(known_input : list[int], waves = np.array([]), n_traces: int = None, trace_len: int = None, ith_weight: int = 0, calc_p_value: bool = False): # Vanilla ANN CPA, for one time_sample a
    corr_all_Tsamples, p_val_all_Tsamples = general_CPA(known_input=known_input, hyp_leakage_cacl=Abs_8bit_intermediateVal,
                                    waves=waves, ith_weight=ith_weight, calc_p_value=calc_p_value)
    return corr_all_Tsamples, p_val_all_Tsamples

def Abs_32bit_ANN_CPA(known_input : list[int], waves = np.array([]), n_traces: int = None, trace_len: int = None, ith_weight: int = 0, calc_p_value: bool = False): # Vanilla ANN CPA, for one time_sample a
    corr_all_Tsamples, p_val_all_Tsamples = general_CPA(known_input=known_input, hyp_leakage_cacl=Abs_32bit_intermediateVal,
                                    waves=waves, ith_weight=ith_weight, calc_p_value=calc_p_value)
    return corr_all_Tsamples, p_val_all_Tsamples

def HW_8bit_ANN_CPA(known_input : list[int], waves = np.array([]), n_traces: int = None, trace_len: int = None, ith_weight: int = 0, calc_p_value: bool = False): # Vanilla ANN CPA, for one time_sample a
    corr_all_Tsamples, p_val_all_Tsamples = general_CPA(known_input=known_input, hyp_leakage_cacl=HW_8bit_intermediateVal,
                                    waves=waves, ith_weight=ith_weight, calc_p_value=calc_p_value)
    return corr_all_Tsamples, p_val_all_Tsamples

def HW_32bit_ANN_CPA(known_input : list[int], waves = np.array([]), n_traces: int = None, trace_len: int = None, ith_weight: int = 0, calc_p_value: bool = False): # Vanilla ANN CPA, for one time_sample a
    corr_all_Tsamples, p_val_all_Tsamples = general_CPA(known_input=known_input, hyp_leakage_cacl=HW_32bit_intermediateVal,
                                    waves=waves, ith_weight=ith_weight, calc_p_value=calc_p_value)    
    return corr_all_Tsamples, p_val_all_Tsamples

In [11]:
#*
# Agregate Functions: using opensource libraries, such as scipy, or np for calculating correlation,
# as oposed to above functions using specific functions for correlation calculations
#*
def corr_2TraceMatrixes(traces: np.ndarray, inputs: list[int],
                        trace_len: int = 24000, n_traces: int = 1000,
                        xth_secret_val: int = 0, secret_range: int = 256,
                        corr_func: function = np.corrcoef): # function can be also sc.pearsonr
  #hw = [hamming_weight(secret_value) for secret_value in range(secret_range)]
  leak_model = np.array([[calc_intermediate_val(secret_val, inputs[j]) for j in range(n_traces)] for secret_val in range(secret_range)])
  corr_all = []
  for i in trange(secret_range, desc='Calculating Correlations for secret key'):
    corr_curr = []
    to_corr_array = leak_model[i, :]
    for j in range(trace_len):
      corr_curr.append( corr_func(traces[:,j], to_corr_array))
    corr_all.append(corr_curr)
  return np.array(corr_all)
from typing import Callable as function
import scipy.stats as sc
def sc_pearson_2TraceMatrixes(traces: np.ndarray, inputs: list[int],
                        trace_len: int = 24000, n_traces: int = 1000,
                        xth_secret_val: int = 0, secret_range: int = 256): # function can be also sc.pearsonr
  #hw = [hamming_weight(secret_value) for secret_value in range(secret_range)]
  leak_model = np.array([[calc_intermediate_val(secret_val, inputs[j]) for j in range(n_traces)] for secret_val in range(secret_range)])
  corr_all = []
  for i in trange(secret_range, desc='Calculating Correlations for secret key'):
    corr_curr = []
    #to_corr_array = np.array([leak_model[i,:] for n in range(trace_len)]).transpose()
    #corr_all.append(scipy.stats.pearsonr(traces[:,0:trace_len], to_corr_array, axis=0))
    for j in range(trace_len):
      corr_curr.append(sc.pearsonr(traces[:,j], leak_model[i,:]))
    #corr_all.append(sc.pearsonr(traces[:,0:trace_len], to_corr_array, axis=0))
    corr_all.append(corr_curr)
  return np.array(corr_all)

### Extract weights from correlation map

In [12]:
#*
# Agregate Functions: for extracting relevant peaks of correlation matrix, with several fitting parameters
#*
def get_weight(correlation_matrix: np.array, min_treshold = 0.0, peak_range = 0.3, show_info=False, lastIndex = 0, map_2dgrMaxs = None, mountain_half_dist = 20, level = 5):
    #FIND index of highest value for each column (correlation of time sample)
    indx_ColmnMax = correlation_matrix.argmax(axis=0) #Return an vector of indexes of max values in each column of matrix
    
    #print(f"Input Vector: {correlation_matrix[1]}")

    #CREATE an array from previous finds
    colmn_MaxVector = np.array([correlation_matrix[indx_ColmnMax[i]][i] for i in range(len(indx_ColmnMax))]) #Return a vector of max values for each column in matrix
    for i in range(len(colmn_MaxVector)):
        if colmn_MaxVector[i] < min_treshold:
            colmn_MaxVector[i] = 0
    #FIND GLOBAL MAX
    indx_globMax = np.argmax(colmn_MaxVector)
    val_globMax = colmn_MaxVector[indx_globMax]
    vec_len = len(colmn_MaxVector)
    #print(f"Max Vector: {colmn_MaxVector}")


    if map_2dgrMaxs is None:
        print(f"Peak range: {peak_range}")
        print(f"{len([i for i in colmn_MaxVector if i != 0])}")
        vals_locMaxVector0 = []
        tmp_indices = []
        for i in range(len(colmn_MaxVector)):
            tmp_indices.append(i)
            vals_locMaxVector0.append(0)
        for i in range(level):
            tmp_array = []
            for indx in tmp_indices:
                tmp_array.append(colmn_MaxVector[indx])
            indices, props = scipy.signal.find_peaks(tmp_array)
            for indx in range(len(indices)):
                indices[indx] = tmp_indices[indices[indx]]
            tmp_indices = indices
        for indx in range(len(colmn_MaxVector)):
            if val_globMax-peak_range <= colmn_MaxVector[indx]:
                vals_locMaxVector0[indx] = colmn_MaxVector[indx]
        
        vals_locMaxVector1 = []
        #FIND 1. local maxims and zero-out all values in tresshold range to global_correlation
        for indx in range(vec_len): #shoud create vector with values which are local-maxims, other values are zeroed out
            value = colmn_MaxVector[indx]
            val_toAppend = 0
            if (value + peak_range) >= val_globMax and ((indx+1) < vec_len) and (value > colmn_MaxVector[indx+1]):
                val_toAppend = value
            vals_locMaxVector1.append(val_toAppend)
        secret_corr = 0 #FINAL correlation
        secret_colmn = 0 #Column/Time sample of Final correlation
    
        #FIND 2. local maxims
        vals_locMaxVector2 = []
        last_val = vals_locMaxVector1[0]
        last_indx = 0
        zero_counter = 0
        for indx in range(vec_len): #shoud create vector with values which are local-maxims, other values are zeroed out
            value = vals_locMaxVector1[indx]
            vals_locMaxVector2.append(0)
            if value != 0:#Cross the desert, and on start of the next mountain, reminiscence about last mountain and save it to the memoar 
                if zero_counter > mountain_half_dist:
                    vals_locMaxVector2[last_indx] = last_val
                    #Reset
                    last_val = 0
                    last_indx = indx
                else:
                    if last_val < value:
                        last_val = value
                        last_indx = indx
                zero_counter = 0
            else:
                zero_counter += 1
        vals_locMaxVector2[last_indx] = last_val



        #FIND 3. local maxims
        vals_locMaxVector3 = []
        last_val = vals_locMaxVector1[0]
        last_indx = 0
        zero_counter = 0
        for indx in range(vec_len): #shoud create vector with values which are local-maxims, other values are zeroed out
            value = vals_locMaxVector1[indx]
            vals_locMaxVector3.append(0)
            if value > (last_val-(peak_range/2)):#Cross the desert, and on start of the next mountain, reminiscence about last mountain and save it to the memoar 
                if zero_counter > mountain_half_dist:
                    vals_locMaxVector3[last_indx] = last_val
                    #Reset
                    last_val = 0
                    last_indx = indx
                else:
                    if  last_val > value:
                        vals_locMaxVector3[last_indx] = last_val
                    last_val = value
                    last_indx = indx
                zero_counter = 0
            else:
                zero_counter += 1
        vals_locMaxVector3[last_indx] = last_val


        if show_info:    
            print(f" Index of global max: {indx_globMax} and global_max_val: {val_globMax}")
            show_extraction_method_info(vals_locMaxVector1, indx_ColmnMax, method_name = f'1th degree', color = "red")
            show_extraction_method_info(vals_locMaxVector0, indx_ColmnMax, method_name = f'{level}-level', color = "green")
            show_extraction_method_info(vals_locMaxVector2, indx_ColmnMax, method_name = f'2th-var degree', color = "blue")
            show_extraction_method_info(vals_locMaxVector3, indx_ColmnMax, method_name = f'3th-var degree', color = "purple")
    else:
        vals_locMaxVector2 = map_2dgrMaxs
    
    secret_corr = 0 #FINAL correlation
    secret_colmn = 0 #Column/Time sample of Final correlation
    nextIndex = lastIndex
    found = False
    print(f"In range {lastIndex} - {len(indx_ColmnMax)}")
    for indx in range(lastIndex, len(indx_ColmnMax)): #Find first local maxim in the peak_range from global maxim
        value = vals_locMaxVector2[indx]
        if value != 0:
            if not found:
                secret_corr = value
                secret_colmn = indx
                found = True
                continue
            nextIndex =  indx - round((indx - secret_colmn) / 2)
            break
    secret_val = indx_ColmnMax[secret_colmn]

    
    print(f" Found the soonest secret value is {secret_val}, at the time sample {secret_colmn}, with correlation {secret_corr = :.3f}")
    return secret_val, secret_colmn, secret_corr, vals_locMaxVector2, colmn_MaxVector, nextIndex, vals_locMaxVector2
#results_directory = {}
#secret_value, time_sample, correlation, localMax_vector = get_weight(np.array(ncorr_all), show_info=True)

## Finish

In [13]:
print("✔️ The Analyser succesfuly runned.")

✔️ The Analyser succesfuly runned.
