In [2]:
import pandas as pd
from pydantic import BaseModel
import random

In [3]:
# got from https://www.tensorflow.org/datasets/catalog/kddcup99
feature_names = ["duration", "protocol_type", "service", "flag", 'src_bytes', 'dst_bytes', 'land', 'wrong_fragment', 'urgent', 'hot', 'num_failed_logins', 'logged_in', 'num_compromised', 'root_shell', 'su_attempted', 'num_root', 'num_file_creations', 'num_shells', 'num_access_files', 'num_outbound_cmds', 'is_hot_login', 'is_guest_login', 'count', 'srv_count', 'serror_rate', 'srv_serror_rate', 'rerror_rate', 'srv_rerror_rate', 'same_srv_rate', 'diff_srv_rate', 'srv_diff_host_rate', 'dst_host_count', 'dst_host_srv_count', 'dst_host_same_srv_rate', 'dst_host_diff_srv_rate', 'dst_host_same_src_port_rate', 'dst_host_srv_diff_host_rate', 'dst_host_serror_rate', 'dst_host_srv_serror_rate', 'dst_host_rerror_rate', 'dst_host_srv_rerror_rate', 'label']

train_df = pd.read_csv("kddcup.data.gz", names=feature_names, compression='gzip') # http://kdd.ics.uci.edu/databases/kddcup99/kddcup.data.gz
test_df = pd.read_csv("corrected.gz", names=feature_names, compression='gzip') # http://kdd.ics.uci.edu/databases/kddcup99/corrected.gz

In [4]:
# fix label names
train_df["label"] = train_df["label"].apply(lambda x: x[:-1])
test_df["label"] = test_df["label"].apply(lambda x: x[:-1])

In [5]:
train_df.head()

Unnamed: 0,duration,protocol_type,service,flag,src_bytes,dst_bytes,land,wrong_fragment,urgent,hot,...,dst_host_srv_count,dst_host_same_srv_rate,dst_host_diff_srv_rate,dst_host_same_src_port_rate,dst_host_srv_diff_host_rate,dst_host_serror_rate,dst_host_srv_serror_rate,dst_host_rerror_rate,dst_host_srv_rerror_rate,label
0,0,tcp,http,SF,215,45076,0,0,0,0,...,0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,normal
1,0,tcp,http,SF,162,4528,0,0,0,0,...,1,1.0,0.0,1.0,0.0,0.0,0.0,0.0,0.0,normal
2,0,tcp,http,SF,236,1228,0,0,0,0,...,2,1.0,0.0,0.5,0.0,0.0,0.0,0.0,0.0,normal
3,0,tcp,http,SF,233,2032,0,0,0,0,...,3,1.0,0.0,0.33,0.0,0.0,0.0,0.0,0.0,normal
4,0,tcp,http,SF,239,486,0,0,0,0,...,4,1.0,0.0,0.25,0.0,0.0,0.0,0.0,0.0,normal


In [6]:
discrete = ["protocol_type", "service", "flag"]
boolean = ["logged_in", "root_shell", "su_attempted", "is_hot_login", "is_guest_login"]
continuous = ["duration", "src_bytes", "dst_bytes", "wrong_fragment", "urgent", "hot", "num_compromised", 
              "num_root", "num_file_creations", "num_shells", "num_access_files", "num_outbound_cmds",
              "count", "serror_rate", "rerror_rate", "same_srv_rate", "diff_srv_rate", "srv_count", "srv_serror_rate", "srv_rerror_rate", "srv_diff_host_rate"]
labels = list(set(train_df["label"]))

In [7]:
discrete_values = {key: list(set(train_df[key])) for key in discrete}
continuous_min_max = {key: (train_df[key].min(), train_df[key].max()) for key in continuous}

In [8]:
class Chromosome(BaseModel):
    discrete: dict
    boolean: dict
    continuous: dict
    label: str

    @classmethod
    def generate_random(cls, label: str) -> "Chromosome":
        discrete_dict = {}
        for feature, values in discrete_values.items():
            discrete_dict[feature] = random.choice(values)

        boolean_dict = {}
        for feature in boolean:
            boolean_dict[feature] = random.choice([0, 1])

        continuous_dict = {}
        for feature, values in continuous_min_max.items():
            min_val = values[0]
            max_val = values[1]
            val_0 = random.uniform(min_val, max_val)
            val_1 = random.uniform(min_val, max_val)
            if val_0 < val_1:
                new_values = (val_0, val_1)
            else:
                new_values = (val_1, val_0)
            continuous_dict[feature] = new_values

        return cls(discrete=discrete_dict, boolean=boolean_dict, continuous=continuous_dict, label=label)
    
    def get_expression(self):
        expr_discrete = " & ".join([f"{key} == '{value}'" for key, value in self.discrete.items()])
        expr_boolean = " & ".join([f"{key} == {value}" for key, value in self.boolean.items()])
        expr_continuous = " & ".join([f"{key} >= {value[0]} & {key} <= {value[1]}" for key, value in self.continuous.items()])
        expression = " & ".join(
            [
                expr_discrete, 
                expr_boolean, 
                expr_continuous
            ]
        )
        return expression     


In [9]:
def check_chromosome(chromosome: Chromosome, df: pd.DataFrame):
    same_label_df: pd.DataFrame = df[df["label"] == chromosome.label]
    other_label_df: pd.DataFrame = df[df["label"] != chromosome.label]

    expression = chromosome.get_expression()
    
    true_positives = same_label_df.query(expression).shape[0]
    false_positives = same_label_df.shape[0] - true_positives

    true_negatives = other_label_df.query(expression).shape[0]
    false_negatives = other_label_df.shape[0] - true_negatives

    return {
        "TRUE_POSITIVE": true_positives,
        "FALSE_POSITIVE": false_positives,
        "FALSE_NEGATIVE": false_negatives,
        "TRUE_NEGATIVE": true_negatives
    }

In [10]:
c = Chromosome.generate_random(label="normal")
c.get_expression()

"protocol_type == 'icmp' & service == 'pop_3' & flag == 'S0' & logged_in == 0 & root_shell == 0 & su_attempted == 0 & is_hot_login == 0 & is_guest_login == 0 & duration >= 24918.487469770924 & duration <= 32625.815268880608 & src_bytes >= 644533628.4080403 & src_bytes <= 933169127.381886 & dst_bytes >= 440262973.7900939 & dst_bytes <= 692304440.3522764 & wrong_fragment >= 0.7701327180849414 & wrong_fragment <= 1.8327023644854508 & urgent >= 12.095239913790403 & urgent <= 13.309022719850951 & hot >= 5.261035035551147 & hot <= 17.246957229951654 & num_compromised >= 641.0655631665069 & num_compromised <= 5319.323352758933 & num_root >= 204.69961755276285 & num_root <= 1988.3505274788586 & num_file_creations >= 0.3702282971520816 & num_file_creations <= 28.429284139079822 & num_shells >= 0.30584327068305295 & num_shells <= 1.8091613575648842 & num_access_files >= 2.6108463899000647 & num_access_files <= 8.747037405399258 & num_outbound_cmds >= 0.0 & num_outbound_cmds <= 0.0 & count >= 190

In [11]:
d = Chromosome(
    discrete={
        "protocol_type": "tcp",
        "service": "http",
        "flag": "SF"   
    },
    boolean={
        "logged_in": 1,
        "root_shell": 0,
        "su_attempted": 0
    },
    continuous={
        "duration": (0, 300) 
    },
    label="normal"
)
d.get_expression()

"protocol_type == 'tcp' & service == 'http' & flag == 'SF' & logged_in == 1 & root_shell == 0 & su_attempted == 0 & duration >= 0 & duration <= 300"

In [12]:
check_chromosome(d, train_df)

{'TRUE_POSITIVE': 564103,
 'FALSE_POSITIVE': 408678,
 'FALSE_NEGATIVE': 3923543,
 'TRUE_NEGATIVE': 2107}