In [1]:
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.decomposition import PCA
import pandas as pd
import numpy as np
import math
import random
from sklearn.feature_selection import SelectKBest
from sklearn.feature_selection import mutual_info_classif
from sklearn.model_selection import train_test_split
from sklearn.metrics import f1_score
import crossovers

In [2]:
def process_data(sms_data_str):
    """
    convert `sms_data_str` into a pandas dataframe
    """
    data_arr = []

    data_records = sms_data_str.split('\n')[:-1]
    for data in data_records:
        label = None
        sample = None
        match data[:3]:
            case 'ham':
                label = 'legitimate'
                sample = data[4:] 
            case 'spa':
                label = 'spam'
                sample = data[5:] 
            case _:
                label = 'N/A'
            
        data_arr.append([label, sample])
        
    data_arr = np.array(data_arr)
    data_label = data_arr[:, 0]
    data_records = data_arr[:, 1]
    
    return data_records, data_label

def tfidf_vectorizer(records):
    vectorizer = TfidfVectorizer(
        lowercase=True,
        token_pattern=r'\b[A-Za-z]+\b', 
        norm=None
    )
    
    records_transformed = vectorizer.fit_transform(records)

    return records_transformed.toarray(), vectorizer.get_feature_names_out()

def feature_extraction(X, n_components=5):
    reduction_pca = PCA(
        n_components=n_components,
        whiten=False
    )
    data_reduced = reduction_pca.fit_transform(X)
    return data_reduced

def feature_selection(df_records, labels, n_components=5):
    feature_selection_model = SelectKBest(mutual_info_classif, k=n_components) 
    ## make a selection over the best features
    selected_record_features = feature_selection_model.fit_transform(df_records, labels)
    
    return selected_record_features, feature_selection_model.get_feature_names_out()

In [3]:
sms_data_str = None
with open('SMSSpamCollection') as file:
    sms_data_str = file.read()

In [4]:
records, labels = process_data(sms_data_str)
records_vectorized, feature_names = tfidf_vectorizer(records)

## one hot encoding labels
labels = np.array([0 if y == 'legitimate' else 1 for y in labels] )

## reducing dimension
records_dim_reduced = feature_extraction(records_vectorized)

In [5]:
records_dim_reduced[:5]

array([[-1.8563665 ,  0.28427212, -1.18541585,  0.82135553,  0.71934382],
       [-2.78400944,  0.52103475, -1.74213779,  0.50020848, -0.74615047],
       [ 0.48298531, -0.03899019,  2.01194267, -6.53525822,  1.00704096],
       [-1.83559363,  1.1388219 , -3.93114467, -0.18593274, -1.99660069],
       [ 0.27699672, -0.77845352,  0.11742281,  1.32176379, -0.75921494]])

In [6]:
records_vectorized = pd.DataFrame(records_vectorized, columns=feature_names)

records_selection, feature_name_selection = feature_selection(records_vectorized,labels=labels)

In [7]:
## for better visualization
df = pd.DataFrame(records_selection, columns=feature_name_selection)
df['labels'] = labels
df

Unnamed: 0,call,free,i,to,txt,labels
0,0.0000,0.000000,0.000000,0.000000,0.00000,0
1,0.0000,0.000000,0.000000,0.000000,0.00000,0
2,0.0000,4.187968,0.000000,6.584244,4.51406,1
3,0.0000,0.000000,0.000000,0.000000,0.00000,0
4,0.0000,0.000000,1.992194,2.194748,0.00000,0
...,...,...,...,...,...,...
5569,3.3125,0.000000,0.000000,0.000000,0.00000,1
5570,0.0000,0.000000,0.000000,2.194748,0.00000,0
5571,0.0000,0.000000,0.000000,0.000000,0.00000,0
5572,0.0000,4.187968,3.984388,2.194748,0.00000,0


In [36]:
#Divide our dataset to spam and ham parts for feeding to the model!
# X_train, X_test, y_train, y_test = train_test_split(records_selection, labels)
X_train, X_test, y_train, y_test = train_test_split(records_selection, labels,test_size=0.33, random_state=42)

spam_dataset = df[df['labels']==1]
ham_dataset = df[df['labels']==0]

spam_dataset = []
spam_labels = []
ham_dataset = []
ham_labels = []


for i in range(len(X_train)):
    if y_train[i] == 1:
        spam_dataset.append(X_train[i])
        spam_labels.append(y_train[i])
    else:
        ham_dataset.append(X_train[i])
        ham_labels.append(y_train[i])


In [39]:
class Chromosome:
    def __init__(self, m=None, s=None, cf=None):
        # rule = [[['x1','h'],['x2','l'],.......,['label','0']],[...],...]
        self.rules = []
        # determine the function of rules tri,sigmoid,...
        self.functions = []
        # determine the m,s values for the rules
        self.s = s
        self.m = m
        # determine the cf for the chromosome
        self.cf = cf
        # to do
        # add don't care as linguistic_terms
        # number of features in rules can vary -> 1 feature to 5 feature

In [148]:
def find_total_range(data):
    min_feature = np.min(data)
    max_feature = np.max(data)
    return min_feature,max_feature

def s_m_init(min_feature,max_feature):
    step = math.ceil((max_feature-min_feature) /5)
    m_list = []
    s_list = []
    # 0 23 4 , 0 , 4, 8, 12 , 16 , 20
    if int(min_feature == 0):
        min_feature = 0.1
    for i in range(int(min_feature), int(max_feature), step):
        m_list.append(random.uniform(i, i+step))

    total_dist = 0 
    # min_dist = 10000000
    max_dist = -1
    for i in range(0,len(m_list)-1):
        max_dist = max(max_dist, m_list[i+1]-m_list[i])
        # min_dist = min(min_dist, m_list[i+1]-m_list[i])
        total_dist += m_list[i+1]-m_list[i]

    avg_dist = total_dist / step
    for i in range(int(min_feature), int(max_feature), step):
        s_list.append(random.uniform(random.uniform(0.001,avg_dist),random.uniform(avg_dist,max_dist)))


    return m_list, s_list


In [149]:
def function(func, x, m , s):
    if func == 'tri':
       left = (x - m + s) / s
       right = (m + s - x) / s
       return max(min(left, right), 0)
    
    elif func == 'rect-trap':
        return max( min ((x-m+s)/s, 1),0)
    
    elif func == 'gaussian':
        return math.exp((-1/2)*((x-m)/s)**2)
    
    elif func == 'sigmoid':
        return 1 / (1 + math.exp(-((x-m)/s)))

In [150]:
## TODO: build a fuzzy rule-based model for (records, label)
initial_functions = ['tri', 'rect-trap', 'gaussian', 'sigmoid']
linguistic_terms = ['vl', 'l', 'm', 'h', 'vh', 'X']


def get_individuals(individuals, population_size, number_of_rules):
    min_feature, max_feature = find_total_range(X_train)
    for _ in range(population_size):
        chromosome = Chromosome()

        # determine s, m of chromosome
        s, m = s_m_init(min_feature, max_feature)
        chromosome.m = m
        chromosome.s = s
        
        # determine functions for linguistic_terms
        # number_of_linguistic_terms = random.randint(3,5)
        number_of_linguistic_terms = 5
        for _ in range(number_of_linguistic_terms):
            random_function_index = random.randint(0, len(initial_functions) - 1)
            chromosome.functions.append(initial_functions[random_function_index])
        
        # determine rules of chromosome
        for _ in range(number_of_rules):        
            rule = []
            for i in range(5):
                linguistic_term_selection = linguistic_terms[random.randint(0, len(linguistic_terms) - 1)]
                rule.append([f'x{i+1}', linguistic_term_selection])
            rule.append(['label', random.randint(0, 1)])
            # update our chromosome
            chromosome.rules.append(rule)
            
        # add chromosome to individuals list
        individuals.append(chromosome)
        
    return individuals

In [151]:
np.seterr(divide='ignore', invalid='ignore')
def calculate_membership(chromosome, features):
    output = [0, 0]
    for rule in chromosome.rules:
        value = 1
        x_num = 0 
        for i in range(len(features)):
            # x ['x1', 'l']
            if rule[i][1] == 'X':
                x_num += 1
                value *= 1
                continue
            index = linguistic_terms.index(rule[i][1])
            m = chromosome.m[index]
            s = chromosome.s[index]
            tmp = function(chromosome.functions[index], features[i], m, s)
            if random.randint(0, 1) == 0:
                value *= (1 - tmp)
            else : value *= tmp
        if x_num == len(features):
            value = 0
        if rule[5][1] == 0:
            output[0] += value
        else:
            output[1] += value
    return output

def calculate_cf(chromosome, gc, true_label):
    # print('calculate cf:',gc,true_label)
    if gc[0] == 0 and gc[1] == 0:
        chromosome.cf = 0
    else:
        if true_label == 0:
            f_c = gc[0]
            f_neg = gc[1]
        else:
            f_c = gc[1]
            f_neg = gc[0]
        cf = (f_c - f_neg) / (f_c + f_neg)
        chromosome.cf = cf
        
def select_random_input(mode):

    if mode == 1:
        index = random.randint(0, len(spam_dataset) - 1)
        features = spam_dataset[index]
        label = spam_labels[index]
        # features = spam_dataset.iloc[index][:-1]
        # label = spam_dataset.iloc[index][-1]

    else:
        ham_index = random.randint(0, len(ham_dataset) - 1)
        # features = ham_dataset.iloc[ham_index][:-1]
        # label = ham_dataset.iloc[ham_index][-1]
        features = ham_dataset[ham_index]
        label = ham_labels[ham_index]


    return features,label

In [152]:
def make_crossover(individuals):

    P_cro_labels = 0.5

    cf_list = []
    for chromosome in individuals:
        cf_list.append(chromosome.cf)

    weighted_cf = []
    for i in range(len(cf_list)):
        weighted_cf.append(cf_list[i] / np.sum(cf_list))
    
    # select 2 chromosomes as parents
    parent1 = random.randint(0, len(individuals) - 1)
    parent2 = random.randint(0, len(individuals) - 1)

    # make child elements
    # new rules

    final_child_rules1 = []
    final_child_rules2= []

    for rule_ind in range (len(individuals[parent1].rules)):
        child1_rules = []
        child2_rules = []
        #Extract the seocond part (e.g: [x1,l] => l)
        var_terms1 = []
        var_terms2 = []
        for each in individuals[parent1].rules[rule_ind]:
            var_terms1.append(each[1])
        for each in individuals[parent2].rules[rule_ind]:
            var_terms2.append(each[1])

        # print('var_terms:', var_terms1, var_terms2)
        rules1, rules2 = crossovers.two_point_crossover(var_terms1[:-1], var_terms2[:-1])


        if random.random() <= P_cro_labels:
            rules1.append(var_terms2[-1])
            rules2.append(var_terms1[-1])
        else:
            rules2.append(var_terms2[-1])
            rules1.append(var_terms1[-1])


        for index , each in enumerate(individuals[parent1].rules[rule_ind]):
            child1_rules.append([each[0],rules1[index]])
        for index , each in enumerate(individuals[parent2].rules[rule_ind]):
            child2_rules.append([each[0],rules2[index]])

        final_child_rules1.append(child1_rules)
        final_child_rules2.append(child2_rules)
        
    # new m
    m1, m2 = crossovers.two_point_crossover(individuals[parent1].m, individuals[parent2].m)
    # new s
    s1, s2 = crossovers.two_point_crossover(individuals[parent1].s, individuals[parent2].s)
    
    function1, function2 = crossovers.two_point_crossover(individuals[parent1].functions, individuals[parent2].functions)
    
    child1 = Chromosome(m1, s1)
    child2 = Chromosome(m2, s2)


    child1.rules = final_child_rules1
    child2.rules = final_child_rules2


    child1.functions = function1
    child2.functions = function2


    return child1, child2

In [153]:
def mutataion(chromosome):
    for rule in chromosome.rules:
        for x in rule:
            if x[1] == 'X' and random.randint(0, 1) == 1:
                random_function_index = random.randint(0, len(initial_functions) - 1)
                x[1] = initial_functions[random_function_index]
    for j in len(chromosome.functions):
        if random.randint(0, 1) == 0:
            chromosome.functions[j] = initial_functions[random.randint(0, len(initial_functions) - 1)]


    
    for i in range(len(chromosome.m)-1):
        min_dist = min(min_dist, chromosome.m[i+1]- chromosome.m[i])
    

In [111]:
def calculate_accuracy(mode, dic):
    our_labels = []
    if mode == 'test':
        for i in range(len(X_test)):
            features = X_test[i]
            # label = labels[i]
            total_gc = [0, 0]
            for j in [0,1]:
                for chromosome in dic[j]:
                    gc = calculate_membership(chromosome, features)
                    total_gc[0] += gc[0]
                    total_gc[1] += gc[1] 

            predicted_label = np.argmax(total_gc)
            our_labels.append(predicted_label)
    else:
        for i in range(len(X_train)):
            features = X_train[i]
            # label = labels[i]
            total_gc = [0, 0]
            for j in [0,1]:
                for chromosome in dic[j]:
                    gc = calculate_membership(chromosome, features)
                    total_gc[0] += gc[0]
                    total_gc[1] += gc[1] 

            predicted_label = np.argmax(total_gc)
            our_labels.append(predicted_label)
    return our_labels

In [112]:
# def pick_top_chromosomes(dic, i):
#     l = dic[i]
#     l.sort(key=lambda x: x.cf, reverse=True)
#     for j in range(len(l)):
#         if l[j].cf < 1:
#             l = l[:j]
#             break
    

def EA(generations, P_mut, P_cro, population_size, number_of_rules):

    individuals = []
    individuals = get_individuals(individuals, population_size, number_of_rules)

    # find cf of initial individuals
    # select random input
    features, label = select_random_input(random.randint(0, 1))

    # calculate cf
    for chromosome in individuals:
        gc = calculate_membership(chromosome, features)
        calculate_cf(chromosome, gc, label)
    
    dic = {}
    dic[0] = list(individuals[:population_size // 2])
    dic[1] = list(individuals[population_size // 2:])

    for i in [0, 1]:
        # evolutionary algorithm
        for _ in range(generations):
            # select random input
            features, label = select_random_input(i)
            # making childs
            for _ in range(population_size):
                if random.random() < P_cro:
                    child1, child2 = make_crossover(dic[i])
                    for chromosome in [child1, child2]:
                        gc = calculate_membership(chromosome, features)
                        calculate_cf(chromosome, gc, label)
                    for chromosome in [child1, child2]:
                        dic[i].append(chromosome)
                if random.random() < P_mut:
                    # todo
                    # implement mutation function
                    pass
            
            l = dic[i]
            random.shuffle(l)
            l.sort(key=lambda x: x.cf, reverse=True)
            l = l[:population_size // 2]
            dic[i] = l
    return dic

In [147]:
epochs = 1

total_test = 0.0
total_train = 0.0

dic = EA(15, 0.1, 0.9, 50, 1)

for _ in range(epochs):
    # dic = EA(15, 0.1, 0.9, 50, 1)

    our_labels_train = calculate_accuracy('train', dic)
    our_labels_test = calculate_accuracy('test', dic)
    
    total_test += np.mean(np.array(y_test) == np.array(our_labels_test))
    total_train += np.mean(np.array(y_train) == np.array(our_labels_train))

# for i in [0,1]:
#     for chromosome in dic[i]:
#         print(chromosome.rules[0])

# x1 = f1_score(y_train,our_labels_train)
# x2 = f1_score(y_test,our_labels_test)

print('Accuracy Train: ',(total_test / epochs) * 100)
print('Accuracy Test: ', (total_train / epochs) * 100)

Accuracy Train:  83.65760869565216
Accuracy Test:  84.26084627745047
