## <center> Full Project </center>

### <center> Polytechnic University of Leiria </center>

#### <center> Patrícia Isabel Santos Martinho </center>

Libraries

In [None]:

#from benfordslaw import benfordslaw # type: ignore
import numpy as np # type: ignore
import matplotlib.pyplot as plt # type: ignore
from scipy.stats import chisquare
from sklearn.metrics import confusion_matrix, roc_curve, auc
from scipy.stats import chi2, entropy
import pandas as pd

Variable Initialization

In [None]:
m=1000 # features
n=2000 # instances
tB=0.3 # proportion of anomalous rows
tm=0.1 # anomalies in an anomalous row
intensity_uniforme=1 # intensity (uniform noise)
intensity_gauss=10 # intensity (gaussian noise)
prop_outliers=0.05
outliers_band=(100000, 500000) # band of outliers
np.random.seed(1666)
alpha=0.05




In [None]:
qty_tB=int(n*tB) # absolute amount of anomalous rows
print(qty_tB)
qt_BL=n-qty_tB # absolute amount of BL conform rows
print(qt_BL)
A_count=int(m*tm) # absolute amount of anomalies in an anomalous row 
BL_count=int(m*(1-tm)) # absolute amount of BL conform numbers in an anomalous row 

Functions

In [None]:
# Funcion to generator BL conform numbers
def ben_numbers_generation (n):
    uniforme=np.random.uniform(low=0, high=1, size=n)
    bfl_numb=10**uniforme
    return bfl_numb

In [None]:
# Function to get the first digit of a number
def get_first_digit(number):
    num = abs(number)  # Work only with positive values
    if num == 0:
        return 0
    while num < 1:  # If it is a small decimal number, multiply until it has a digit in the whole part
        num *= 10
    while num >= 10:  # If it is a large number, divide until on only one digit
        num //= 10
    return int(num)  # Returns the first digit as an integer


In [None]:
# Function to save the first digits
def first_digit_array (numbers):
    firts_digits=[]
    for number in numbers:
        firts_digits.append(get_first_digit(number))
    firts_digits = [x for x in firts_digits if x != 0]
    return firts_digits

In [None]:
# Function to calculate the frequencies of the first digits
def first_digit_frequency(numbers):
    frequencies = np.zeros(9)  # To store the frequencies of digits 1 through 9
    for number in numbers:
        first_digit = get_first_digit(number)
        frequencies[first_digit - 1] += 1  # Increases the digit count
    return frequencies


In [None]:
# Function to calculate the distribution expected by Benford’s Law
def BL_distribution(n):
    distribution = np.log10(1 + 1 / np.arange(1, 10))  # Benford’s law for digits 1 to 9
    distribution[-1] = 1 - sum(distribution[:-1])  # Adjusts the last probability
    return distribution*n

In [None]:
# Calculate accumulated frequencies from absolute frequencies

def Fr (fr):
    Frs=[]
    Frs.append(fr[0])
    for i in range(1,len(fr)):
        Frs.append(fr[i]+Frs[i-1])
    return Frs


In [None]:
#  Gaussian noise

def sum_gaussian_noise(d, intensity):
    """Adds Gaussian noise to the data"""
    noise = np.random.normal(0, intensity, len(d))
    return d + noise

In [None]:
# Uniform noise

def sum_uniforme_noise(d, intensity):
    """Adds uniform noise to the data"""
    noise = np.random.uniform(-intensity, intensity, len(d))
    return d + noise

In [None]:
# Mixed noise (gaussin noise + uniform noise)

def sum_mix_noise(d, intensity_gauss, intensity_uniforme):
    """Adds a mixture of Gaussian and uniform noise to the data"""
    noise_gauss = np.random.normal(0, intensity_gauss, len(d))
    noise_uniforme = np.random.uniform(-intensity_uniforme, intensity_uniforme, len(d))
    return d + noise_gauss + noise_uniforme

In [None]:
# Outliers

def sum_outliers(d, proportion, outliers_band):
    """Adds moderate or extreme outliers to the data set"""
    n_outliers = int(len(d) * proportion)
    indices = np.random.choice(range(len(d)), n_outliers, replace=False) # replace=falss means without repeating
    for i in indices:
        d[i] = np.random.uniform(*outliers_band)
    return d

In [None]:
# Absolute mean deviation

def calculate_mad(observ, expected):
    return np.mean(np.abs(observ - expected))

In [None]:
# Kolmogorov-smirnov

def calculate_ks (observ, expected):

    # Calculate accumulated frequencies
    obs_acum=np.array(Fr(observ))
    esp_acum=np.array(Fr(expected))
    
    # Calculate distance
    return np.max(np.abs(obs_acum - esp_acum))

In [None]:
# Euclidean distance

def calculate_euclidian (observ, expected):
    return np.sqrt(np.sum((observ - expected) ** 2))

In [None]:
# Hellinger distance

def calculate_hellinger(observ, expected):
    return np.sqrt(0.5 * np.sum((np.sqrt(observ) - np.sqrt(expected))**2))

In [None]:
# Kullback-Leiber divergence

def calculate_kl(observ, expected):
    # Calculate KL Divergence
    kl_value = entropy(observ, expected)  # scipy.stats.entropy calculates KL when we pass two distributions
    
    return kl_value

In [None]:
# Hypothesis tests

def t_hip (test, observations, number_simulations):
        # Expected frequencies for Benford’s Law
    expected = BL_distribution(number_simulations)
    expected_norm=expected/sum(expected)

    # Generate samples that follow Benford’s Law
    simulated_values = []

    for _ in range(number_simulations):
        simulated = np.random.choice(np.arange(1, 10), p=np.array(expected_norm), size=int(sum(observations)))
        simulated_freq = [np.sum(simulated == d) for d in range(1, 10)]
        match test:
            case "mad":
                simulated_mad = calculate_mad(simulated_freq, expected)
                simulated_values.append(simulated_mad)
            case "ks":
                simulated_ks= calculate_ks(simulated_freq,expected)
                simulated_values.append(simulated_ks)
            case "euc":
                simulated_eucl= calculate_euclidian(simulated_freq,expected)
                simulated_values.append(simulated_eucl)
            case "hel":
                simulated_hell= calculate_hellinger(simulated_freq,expected)
                simulated_values.append(simulated_hell)
            case "kl":
                simulated_kl= calculate_kl(simulated_freq,expected)
                simulated_values.append(simulated_kl)

        

     # calculate observed value:
    match test:
        case "mad":
            obs_value = calculate_mad(observations, expected)
        case "ks":
            obs_value= calculate_ks(observations,expected)
        case "euc":
            obs_value = calculate_euclidian(observations, expected)
        case "hel":
            obs_value = calculate_hellinger(observations, expected)
        case "kl":
            obs_value = calculate_kl(observations, expected)
        

    # Calculate p-value
    p_value_test = np.mean(np.array(simulated_values) >= obs_value) # proportion of values higher than expected.

    return(obs_value,p_value_test)

In [None]:

def Fisher(p_values):

    fisher_stat = -2 * sum(np.log(p_values))
    if fisher_stat == float('-inf'):
        print(p_values)
    combined_p_value = 1 - chi2.cdf(fisher_stat, 2 * len(p_values)) # chi2.cdf -> cumulative chi2 distribution
    return combined_p_value

In [None]:
# I did this function to solve situations of incomplete confusion matrices and avoid errors

def conf_matrix(actual_class, predicted):
    # Generate the confusion matrix
    labels = [0, 1]  # Expected classes
    conf_matrix = confusion_matrix(actual_class, predicted, labels=labels)

    tn, fp, fn, tp = 0, 0, 0, 0

    # Direct extraction of values if both classes exist
    if conf_matrix.shape == (2, 2):
        tn, fp, fn, tp = conf_matrix.ravel()
    else:  # If there is only one class in the data
        if 0 in actual_class:
            tn = conf_matrix[0, 0] if conf_matrix.shape[0] > 0 else 0
            fp = conf_matrix[0, 1] if conf_matrix.shape[1] > 1 else 0
        if 1 in actual_class:
            fn = conf_matrix[1, 0] if conf_matrix.shape[0] > 1 else 0
            tp = conf_matrix[1, 1] if conf_matrix.shape[1] > 1 else 0
    
    return tn, fp, fn, tp, conf_matrix

In [None]:
def generate_roc_curve(actual_class, p_values, alpha_values):
    fprs = []
    tprs = []

    p_values = p_values.flatten()  # Ensure that p_values is a one-dimensional vector

    # Calculate overall ROC and AUC before loop (to avoid unnecessary calculations)
    fpr_full, tpr_full, _ = roc_curve(actual_class, p_values)
    auc_value = auc(fpr_full, tpr_full)
    print(f"AUC total: {auc_value}")

    for alpha in alpha_values:  
        predicted = (p_values < alpha).astype(int) # positive values - with anomalies

        # Calculate confusion matrix
        tn, fp, fn, tp = confusion_matrix(actual_class, predicted).ravel()

        # Calculate the rate of false positives and true positives
        fpr = fp / (fp + tn) if (fp + tn) > 0 else 0  
        tpr = tp / (tp + fn) if (tp + fn) > 0 else 0  

        fprs.append(fpr)
        tprs.append(tpr)

    # Generate the ROC curve
    plt.figure(figsize=(6, 6))
    plt.plot(fprs, tprs, marker="o", label="ROC curve")
    plt.plot([0, 1], [0, 1], linestyle="--", color="gray")  # Reference line (random model)
    plt.xlabel("1-Specificity (FPR)")
    plt.ylabel("Sensibility (TPR)")
    plt.legend()
    plt.grid()
    plt.show()

    return np.array(fprs), np.array(tprs), np.array(alpha_values)




In [None]:
# Draw chart of the proportions of the digits

def digit_chart (d,nl):
    # Creation of the figure and axes
    fig, ax = plt.subplots(figsize=(12, 6))

    # Data for the Benford’s Law chart
    prop_ds = np.array(d[nl,:-1])
         
    # Calculate the frequency of the first digits
    xv = range(1, 10)
    yv = BL_distribution(m)
    yv = yv/m

    # Plot the proportions according to Benford’s Law as a line
    ax.plot(xv, yv, marker='o', label="Benford's Law Distribution", linestyle='-', color='blue', alpha=0.6)

    # Notes for the values of Benford’s Law
    for i, value in enumerate(yv):
        ax.annotate(f'{value:.3f}',  # Round the annotation to 3 decimal places
                    xy=(xv[i], value),
                    xytext=(0, 5),  # Offset at the annotation position
                    textcoords='offset points',
                    ha='center',
                    va='bottom')

    # Data for the dataset chart
 
    x = list(range(1, 10)) 
    y = first_digit_frequency(prop_ds)
    y = y/len(prop_ds)

    # Plot the proportions of the dataset as a line
    ax.plot(x, y, marker='o', label=f"Digits frequency in line nr. {nl}", linestyle='-', color='orange', alpha=0.6)

    # Annotations for the dataset values
    for i, value in enumerate(y):
        ax.annotate(f'{value}',
                    xy=(x[i], value),
                    xytext=(0, 3),
                    textcoords='offset points',
                    ha='center',
                    va='bottom')

    # Axis and subtitle settings
    ax.set_xlabel("First Digit", fontsize=14)
    ax.set_ylabel("Frequency", fontsize=14)
    ax.legend(fontsize=14)
    ax.set_xticks(list(range(1, 10)))  # Defining x axis ticks
    ax.set_xticklabels(list(range(1, 10)))  # Labels for the ticks

    # display the graph
    plt.show()

Generate data

In [None]:
uniforme=np.random.uniform(low=0, high=1, size=m)
bfl_numb=10**uniforme 
result=np.append(bfl_numb,0) # follows the law of Benford, label 0
print(len(result))
print("----------------------")

i=1
i2=qt_BL+1
while i<qt_BL:
    bfl_numb=ben_numbers_generation (m)
    bfl_numb=np.append(bfl_numb,0) # 0 --> no anomalies
    bfl_numb=abs(bfl_numb)
    result = np.vstack([result, bfl_numb])
    i=i+1

while i2 in range(qt_BL+1,n+1):
    ben_aux=ben_numbers_generation (A_count)
    #anom = sum_gaussian_noise(ben_aux, intensity_gauss)
    #anom = sum_uniforme_noise(ben_aux, intensity_uniforme)
    #anom = sum_mix_noise(ben_aux, intensity_gauss, intensity_uniforme)
    anom = sum_outliers(ben_aux, prop_outliers, outliers_band)
    anom=np.random.uniform(low=0, high=100000, size=A_count)
    anom=abs(anom)
    ben=np.random.uniform(low=0, high=1, size=BL_count)
    bfl_numb=10**ben
    bfl_numb=abs(bfl_numb)
    bfl_numb=np.append(bfl_numb,anom)
    bfl_numb=np.append(bfl_numb,1) # 1 --> with anomalies
    result = np.vstack([result, bfl_numb])
    i2=i2+1
df_result=pd.DataFrame(result)
df_final = df_result.sample(frac=1, random_state=42).reset_index(drop=True)
numpy_df=array = df_final.to_numpy()




In [None]:
numpy_df

In [None]:
actual_class = numpy_df[:, -1]
actual_class=actual_class.astype(int)
print(actual_class)

In [None]:
counts = np.bincount(actual_class)
print("Negatives", counts[0])
print("Positives", counts[1])

In [None]:
digit_chart (numpy_df,20)

In [None]:
np.savetxt('Dataset.txt', numpy_df, delimiter=',')

Verify compliance with the Benford's Law

In [None]:
expected_distribution = BL_distribution(m)

predicted_chi2=[]
p_values_chi2=[]

predicted_f = []
predicted_ks_1samp=[]
predicted_ks=[]
predicted_MAD=[]
predicted_hel=[]
predicted_euc=[]
predicted_kl=[]
p_values_ks_1samp=[]
p_values_ks=[]
p_values_MAD=[]
p_values_euc=[]
p_values_hel=[]
p_values_kl=[]


for n_row in range(n):
    dfbl = np.array(numpy_df[n_row,:-1])
    p_values=[]

    first_digits = first_digit_array (dfbl)
        
    first_digit_frequencies = first_digit_frequency(dfbl)
    
    # Apply the chi-square test
    chi2_stat, p_value_chi2 = chisquare(first_digit_frequencies, expected_distribution)
    p_value_chi2=1e-15 if p_value_chi2 <(1e-15) else p_value_chi2
    predicted_label_chi2 = 1 if p_value_chi2 < alpha else 0  
    p_values.append(p_value_chi2)
    p_values_chi2.append(p_value_chi2)
    predicted_chi2.append(predicted_label_chi2)
  
    # Apply the absolute mean deviation
    MAD_stat, p_value_MAD=t_hip("mad",first_digit_frequencies,m)
    p_value_MAD=1e-15 if p_value_MAD <(1e-15) else p_value_MAD
    predicted_label_MAD = 1 if p_value_MAD < alpha else 0
    p_values.append(p_value_MAD)
    p_values_MAD.append(p_value_MAD)
    predicted_MAD.append(predicted_label_MAD)
    

    # Apply to the distance from Kolmogorov-smirnov 
    ks_stat, p_value_ks=t_hip("ks",first_digit_frequencies,m)
    p_value_ks=1e-15 if p_value_ks <(1e-15) else p_value_ks
    predicted_label_ks = 1 if p_value_ks < alpha else 0
    p_values.append(p_value_ks)
    p_values_ks.append(p_value_ks)
    predicted_ks.append(predicted_label_ks)

    # Apply the euclidean distance
    euc_stat, p_value_euc=t_hip("euc",first_digit_frequencies,m)
    p_value_euc=1e-15 if p_value_euc <(1e-15) else p_value_euc
    predicted_label_euc = 1 if p_value_euc < alpha else 0
    p_values.append(p_value_euc)
    p_values_euc.append(p_value_euc)
    predicted_euc.append(predicted_label_euc)

     # Apply the distance of Hellinger
    hel_stat, p_value_hel=t_hip("hel",first_digit_frequencies,m)
    p_value_hel=1e-15 if p_value_hel <(1e-15) else p_value_hel
    predicted_label_hel = 1 if p_value_hel < alpha else 0
    p_values.append(p_value_hel)
    p_values_hel.append(p_value_hel)
    predicted_hel.append(predicted_label_hel)

    # Apply the divergence of Kulback-Leibler
    kl_stat, p_value_kl=t_hip("kl",first_digit_frequencies,m)
    p_value_kl=1e-15 if p_value_kl <(1e-15) else p_value_kl
    predicted_label_kl = 1 if p_value_kl < alpha else 0
    p_values.append(p_value_kl)
    p_values_kl.append(p_value_kl)
    predicted_kl.append(predicted_label_kl)


    # Fisher’s combination
    p_value_fisher = Fisher(p_values) 
    predicted_label=1 if p_value_fisher<alpha else 0
    predicted_f.append(predicted_label)
    


In [None]:
# List of metrics
metricas = ["Chi-square", "mean absolute deviation","Kolmogorov-Smirnov", "Euclidean", "Hellinger", "Kullback-Leibler", "Fisher"] 
tns, fps, fns, tps = [], [], [], []


predicteds = [predicted_chi2, predicted_MAD, predicted_ks, predicted_euc,predicted_hel, predicted_kl, predicted_f]

for forecast in predicteds:
    tn, fp, fn, tp, mc = conf_matrix(actual_class, forecast) 
    tns.append(tn)
    fps.append(fp)
    fns.append(fn)
    tps.append(tp)

# Create DataFrame Pandas
df = pd.DataFrame({
    "Metric": metricas,
    "TN": tns,
    "FP": fps,
    "FN": fns,
    "TP": tps
})

# Add evaluation metrics
df["Precision"] = df["TP"] / (df["TP"] + df["FP"])
df["Recall"] = df["TP"] / (df["TP"] + df["FN"])
df["F1-score"] = 2 * (df["Precision"] * df["Recall"]) / (df["Precision"] + df["Recall"])

# Display the table
print("Dataset:")
print(f"Number of features:{m}")
print(f"Number of instances:{n}")
print("  ")
print("Actual class:")
print(f"Positives --> with anomalies: {counts[1]}")
print(f"Negatives --> no anomalies: {counts[0]}")
print("  ")
print(f"Proportion of anomalies in an anomalous row: {tm}")
print("-----------")
print(f"|Alpha={alpha}|")
print("-----------")
print(df)

# Save results in Excel format
df.to_excel(f"Performance.xlsx", index=False)

In [None]:

p_values = p_values_chi2
alpha_values = np.arange(0, 1, 0.001) 
p_values = np.array([p_values])

# statistics
print("Statistics of p_values:")
print("Average:", np.mean(p_values))
print("Standard Deviation:", np.std(p_values))
print("Min:", np.min(p_values))
print("Max:", np.max(p_values))

fprs, tprs, alpha_values = generate_roc_curve(actual_class, p_values, alpha_values)

In [None]:
fprs = np.array(fprs)
tprs = np.array(tprs)
alpha_values = np.array(alpha_values)

# Criterion of Youden
youden_index = np.argmax(tprs - fprs)  
best_alpha_youden = alpha_values[youden_index]

# Point closest to (0,1)
distances = np.sqrt((1 - tprs)**2 + fprs**2)
best_alpha_distance = alpha_values[np.argmin(distances)]

print(f"Best cut-off point by Youden’s criterion: {best_alpha_youden}")
print(f"Best cut-off point by the criterion of least distance: {best_alpha_distance}")

