In [13]:
import numpy as np
import pandas as pd
from numpy.random import default_rng

encode_num = [7, 7, 7, 7, 7, 7, 7]

#二進位轉十進位
def bin2dec(bin):
    result = 0
    #here
    for i in range(len(bin)-1, -1, -1):
        result += pow(2, len(bin)-i-1) * bin[i]
    return result

def generate_rule(Qbits):
    rule = []
    index = 0
    for i in range(len(encode_num)):
        temp = []
        if Qbits[index] == 1:
            temp.append(">")
        else:
            temp.append("<=")
        temp.append(bin2dec(Qbits[index+1:index+encode_num[i]]))
        rule.append(temp)
        index += encode_num[i]
    return rule

#fitness的計算根據問題會不一樣，這裡使用ABS當範例
def fitness(QTS, data):
    for i in range(QTS.population_size):
        QTS.fitness[i] = 0
        rule = generate_rule(QTS.Population[i])
        for d in data:
            token = True
            for j in range(len(encode_num)):
                if rule[j][0] == ">":
                    if d[j+1] <= rule[j][1]:
                        token = False
                        break
                elif d[j+1] > rule[j][1]:
                    token = False
                    break
            if (token == True and d[-1] == 1) or (token == False and d[-1] == 0):
                QTS.fitness[i] += 1
        QTS.fitness[i] /= len(data)
                       
class QTS:
    #初始化
    def __init__(self, population_size = 10, max_cycle = 100, Qbit_num = 10, theta = 0.01):
        self.population_size = population_size
        self.max_cycle = max_cycle
        self.Qbit_num = Qbit_num
        self.theta = theta
        self.Population = np.zeros((population_size, Qbit_num))
        self.Q_matrix = [0.5 for i in range(Qbit_num)]
        self.fitness = [None for i in range(population_size)]
        self.best_index = -1
        self.worst_index = -1
        self.best = [None for i in range(Qbit_num)]
        self.dice = default_rng(500)
    
    #設定旋轉角度
    def set_theta(self, new_theta):
        self.theta = new_theta
    
    #生成新的Population
    def generate_new_population(self):
        for i in range(self.population_size):
            self.Population[i] = self.dice.random(self.Qbit_num)
            #here
            self.Population[i] = np.where(self.Population[i] > self.Q_matrix, 1, 0)
    
    #找出最佳及最差的粒子(min為最佳)
    def selection_find_min(self):
        self.best_index = self.fitness.index(min(self.fitness))
        self.worst_index = self.fitness.index(max(self.fitness))
        #here
        if self.best[-1] == None:
            self.best = self.Population[self.best_index]
            self.best = np.append(self.best, min(self.fitness))
        elif min(self.fitness) < self.best[-1]:
            self.best = self.Population[self.best_index]
            self.best = np.append(self.best, min(self.fitness))
    
    #找出最佳及最差的粒子(max為最佳)
    def selection_find_max(self):
        self.best_index = self.fitness.index(max(self.fitness))
        self.worst_index =self.fitness.index(min(self.fitness))
        #here
        if self.best[-1] == None:
            self.best = self.Population[self.best_index]
            self.best = np.append(self.best, max(self.fitness))
        elif max(self.fitness) > self.best[-1]:
            self.best = self.Population[self.best_index]
            self.best = np.append(self.best, max(self.fitness))
    
    #更新Qmatrix
    def update(self):
        for i in range(self.Qbit_num):
            if self.Population[self.best_index][i] != self.Population[self.worst_index][i]:
                if self.Population[self.best_index][i] == 1:
                    self.Q_matrix[i] -= self.theta
                else:
                    self.Q_matrix[i] += self.theta
                    
    #更新Qmatrix, Gbest
    #here
    def update_G(self):
        for i in range(self.Qbit_num):
            if self.best[i] != self.Population[self.worst_index][i]:
                if self.best[i] == 1:
                    self.Q_matrix[i] -= self.theta
                else:
                    self.Q_matrix[i] += self.theta
    
    #簡易測試
    def run(self):
        for i in range(self.max_cycle):
            self.generate_new_population()
            fitness(self)
            self.selection_find_min()
            self.update()
            print(self.fitness)
            print(self.best)

In [14]:
#範例

Trained_data = pd.read_csv("0209_bin_data_DOS.csv" , sep = "," , encoding = 'utf-8')
data = np.array(Trained_data)

QTS_1 = QTS(10, 2000, sum(encode_num), 0.0004)
for i in range(QTS_1.max_cycle):
    QTS_1.generate_new_population()
    fitness(QTS_1, data)
    QTS_1.selection_find_max()
    QTS_1.update()
print(QTS_1.best)

[0.         0.         0.         0.         0.         0.
 0.         0.         0.         0.         0.         1.
 0.         0.         0.         1.         1.         1.
 1.         1.         0.         0.         1.         1.
 0.         0.         0.         1.         0.         1.
 0.         1.         1.         0.         0.         0.
 0.         1.         0.         0.         1.         1.
 0.         1.         0.         0.         0.         1.
 1.         0.92839234]


In [15]:
#範例

Trained_data = pd.read_csv("0209_bin_data_DOS.csv" , sep = "," , encoding = 'utf-8')
data = np.array(Trained_data)

QTS_G = QTS(10, 2000, sum(encode_num), 0.0004)
for i in range(QTS_G.max_cycle):
    QTS_G.generate_new_population()
    fitness(QTS_G, data)
    QTS_G.selection_find_max()
    QTS_G.update_G()
print(QTS_G.best)

In [13]:

data = np.array(Trained_data)
QTS_test = QTS(10, 10000, sum(encode_num), 0.004)
QTS_test.generate_new_population()
rules = []
bin2dec(QTS_test.Population[0][1:17])

45217.0

In [None]:
basic_tested_data.describe()

Unnamed: 0,duration,src_bytes,dst_bytes,wrong_fragment,urgent,attack_state
count,22544.0,22544.0,22544.0,22544.0,22544.0,22544.0
mean,218.859076,10395.45,2056.019,0.008428,0.00071,0.569242
std,1407.176612,472786.4,21219.3,0.142599,0.036473,0.495193
min,0.0,0.0,0.0,0.0,0.0,0.0
25%,0.0,0.0,0.0,0.0,0.0,0.0
50%,0.0,54.0,46.0,0.0,0.0,1.0
75%,0.0,287.0,601.0,0.0,0.0,1.0
max,57715.0,62825650.0,1345927.0,3.0,3.0,1.0


In [11]:
Trained_data = pd.read_csv("0209_bin_data_DOS.csv" , sep = "," , encoding = 'utf-8')
Trained_data = Trained_data.iloc[:,1:]
data = np.array(Trained_data)

In [12]:
Trained_data.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 113270 entries, 0 to 113269
Data columns (total 8 columns):
 #   Column                    Non-Null Count   Dtype  
---  ------                    --------------   -----  
 0   src_bytes                 113270 non-null  float64
 1   diff_srv_rate             113270 non-null  float64
 2   same_srv_rate             113270 non-null  float64
 3   dst_bytes                 113270 non-null  float64
 4   count                     113270 non-null  float64
 5   dst_host_srv_serror_rate  113270 non-null  float64
 6   dst_host_serror_rate      113270 non-null  float64
 7   abnormal                  113270 non-null  int64  
dtypes: float64(7), int64(1)
memory usage: 6.9 MB


In [46]:
Trained_data = pd.read_csv("basic_trained_data.csv" , sep = "," , encoding = 'utf-8')
Test_data = pd.read_csv("basic_tested_data.csv" , sep = "," , encoding = 'utf-8')
data = np.array(Trained_data)

array([1, 0, 0, 0, 0, 0, 1], dtype=int64)

In [78]:
f.close()

In [34]:
encode_num

[17, 32, 32, 3, 5]