In [None]:
# Nhập thư viện
import numpy as np
import pandas as pd

In [None]:
# Tạo hàm lấy dữ liệu
def loadCsv(filename) -> pd.DataFrame:
    data = pd.read_csv(filename)
    return data

filename = '/content/drug200.csv'
data = loadCsv(filename)
data.head()

Unnamed: 0,Age,Sex,BP,Cholesterol,Na_to_K,Drug
0,23,F,HIGH,HIGH,25.355,DrugY
1,47,M,LOW,HIGH,13.093,drugC
2,47,M,LOW,HIGH,10.114,drugC
3,28,F,NORMAL,HIGH,7.798,drugX
4,61,F,LOW,HIGH,18.043,DrugY


In [112]:
# Tạo hàm chia dữ liệu thành train và test, tạo X_train/y_train và X_test/y_test.
def splitTrainTest(data, ratio_test):
    np.random.seed(28)  # Không thay đổi mỗi lần chạy
    index_permu = np.random.permutation(len(data))  # Xáo trộn index
    data_permu = data.iloc[index_permu]  # Lấy lại dữ liệu tương ứng với index xáo trộn
    len_test = int(len(data_permu) * ratio_test)  # Kích cỡ tập test
    test_set = data_permu.iloc[:len_test, :]  # Tập test lấy phần đầu
    train_set = data_permu.iloc[len_test:, :]  # Tập train lấy phần còn lại
    # Chia tập dữ liệu thành (X_train, y_train: Lấy tất cả cột trừ dòng cuối), (X_test, y_test: Chỉ lấy cột cuối)
    X_train = train_set.iloc[:, :-1]  # Tất cả cột trừ cột cuối
    y_train = train_set.iloc[:, -1]  # Cột cuối
    X_test = test_set.iloc[:, :-1]  # Tất cả cột trừ cột cuối
    y_test = test_set.iloc[:, -1]  # Cột cuối
    return X_train, y_train, X_test, y_test

In [113]:
# Hàm lấy tần số của từ: 1. lấy túi từ (bag words), 2. lấy tần số từ
def get_words_frequency(data_X):
    bag_words = np.concatenate([i[0].split(' ') for i in data_X.values], axis=None)
    bag_words = np.unique(bag_words)  # Loại bỏ các giá trị trùng và lấy giá trị duy nhất trong mảng bag_words
    matrix_freq = np.zeros((len(data_X), len(bag_words)), dtype=int)  # Tạo ma trận 0 có kích cỡ [số dòng data_X(dòng) x số từ trong túi từ(cột)]

    word_freq = pd.DataFrame(matrix_freq, columns=bag_words)  # Tạo frame với matrix_freq, cột là các từ trong túi từ
    for id, text in enumerate(X_train.values.reshape(-1)):
        for j in bag_words:  # Đối với mỗi id (dòng), ta lặp qua các từ trong túi (cột)
            word_freq.at[id, j] = text.split(' ').count(j)  # Đếm từ đó có trong biến text và gán tại vị trí [id, j]
    return word_freq, bag_words  # Trả lại biến tần số từ, --> DataFrame(cột là các từ trong túi từ)

def transform(data_test, bags):  # bags là bag_words được return từ hàm get_words_frequency, data_test dạng frame
    matrix_0 = np.zeros((len(data_test), len(bags)), dtype=int)
    frame_0 = pd.DataFrame(matrix_0, columns=bags)
    for id, text in enumerate(data_test.values.reshape(-1)):
        for j in bags:
            frame_0.at[id, j] = text.split(' ').count(j)
    return frame_0

In [114]:
# Trước khi tính khoảng cách thì đổi tập test sang số.
def encode_features(X):
    # Mã hóa các cột phân loại thành số
    X_encoded = pd.DataFrame()

    for column in X.columns:
        if X[column].dtype == 'object':
            X_encoded[column] = X[column].astype('category').cat.codes
        else:
            X_encoded[column] = X[column]

    return X_encoded

In [115]:
# Tạo hàm tính khoảng cách: Euclid, Manhattan, Cosine
def cosine_distance(train_X_number_arr, test_X_number_arr):
    dict_kq = dict()  # Tạo dictionary trống
    for id, arr_test in enumerate(test_X_number_arr, start=1):
        q_i = np.sqrt(sum(arr_test**2))  # Căn của tổng ([q_i]^2), dùng để tính mẫu
        for j in train_X_number_arr:
            _tu = sum(j * arr_test)  # Tính tử: tổng q[i]*dj[i]
            d_j = np.sqrt(sum(j**2))  # Tính mẫu
            _mau = d_j * q_i  # Tính mẫu
            kq = _tu / _mau  # Kết quả: khoảng cách của mỗi dòng trong test_X với các dòng trong train_X

            # Nếu index có trong dict_kq rồi thì ta thêm giá trị kq vào, nếu chưa thì ta tạo khoá id với giá trị kq.
            if id in dict_kq:
                dict_kq[id].append(kq)
            else:
                dict_kq[id] = [kq]

    return dict_kq  # Trả lại Dictionary với key: dòng trong tập test, value: các giá trị đã được tính khoảng cách với các dòng trong tập train

In [116]:
# Lớp KNN
class KNNText:
    # Hàm tạo
    def __init__(self, k):  # k là số điểm dữ liệu gần nhất
        self.k = k  # Số lượng hàng xóm gần nhất
        self.X_train = None
        self.y_train = None

    # Hàm fit
    def fit(self, X_train, y_train):
        self.X_train = X_train
        self.y_train = y_train

    # Hàm predict
    def predict(self, X_test):
        self.X_test = X_test

        # Trước khi tính khoảng cách thì đổi tập test sang số.
        X_test_encoded = encode_features(self.X_test)  # Mã hóa X_test
        X_train_encoded = encode_features(self.X_train)  # Mã hóa X_train

        _distance = cosine_distance(X_train_encoded.values, X_test_encoded.values)  # Tính khoảng cách tất cả các dòng trong tập test với tập train --> dict

        # Reset index y_train bắt đầu từ 0
        self.y_train.index = range(len(self.y_train))

        _distance_frame = pd.concat([pd.DataFrame(_distance), pd.DataFrame(self.y_train).reset_index(drop=True)], axis=1)

        target_predict = dict()  # Tạo dict trống
        for i in range(1, len(self.X_test) + 1):  # Lặp qua các dòng trong X_test thông qua index, bắt đầu từ 1
            # Lấy frame con chỉ hai cột i và target rồi sắp xếp cột i tăng dần, sau đó lấy k hàng đầu
            k_nearest = _distance_frame[i - 1].nsmallest(self.k)  # Lấy k giá trị nhỏ nhất

            # Đếm tần số giá trị trong cột target rồi lấy phần tử có tần số cao nhất gán cho phần tử của X_test
            most_common = k_nearest.value_counts().idxmax()  # Giá trị có tần số cao nhất

            # Thêm key và giá trị gán tương ứng vào target_predict
            target_predict[i] = [most_common]

        return target_predict  # Trả lại dict đã dự đoán

    def score(self, y_test, y_pred):
        correct = sum(y_test.values == y_pred.values())
        return correct / len(y_test)