In [None]:
import numpy as np
import pandas as pd
import re
import nltk
import string
nltk.download('stopwords')
nltk.download('punkt_tab')
from nltk.corpus import stopwords
from nltk.stem.porter import PorterStemmer
from nltk.tokenize import word_tokenize
from sklearn.feature_extraction.text import TfidfVectorizer, HashingVectorizer
from sklearn.model_selection import train_test_split
from bs4 import BeautifulSoup
from sklearn.metrics import confusion_matrix,accuracy_score, f1_score, precision_score,recall_score,classification_report
import time
from sklearn.svm import SVC
from google.colab import drive
import pickle

[nltk_data] Downloading package stopwords to /root/nltk_data...
[nltk_data]   Package stopwords is already up-to-date!
[nltk_data] Downloading package punkt_tab to /root/nltk_data...
[nltk_data]   Package punkt_tab is already up-to-date!


In [None]:
class HardMarginSVM:
    """
    Hard-margin linear SVM trained by projected gradient ascent on the dual.

    The Lagrange multipliers ``alpha`` are updated with the gradient
    ``1 - H @ alpha`` (``H[i, j] = y_i * y_j * <x_i, x_j>``) and clipped to be
    non-negative after every step. The primal parameters ``w`` and ``b`` are
    recovered from the multipliers once the epochs are finished.

    Attributes
    -------------
    eta : float
        Learning rate
    epoch : int
        Number of epochs
    random_state : int
        Random seed
    is_trained : bool
        Training completion flag
    num_samples : int
        Number of training samples
    num_features : int
        Number of features
    w : NDArray[float]
        Parameter vector: (num_features, ) ndarray
    b : float
        Bias parameter
    alpha : NDArray[float]
        Lagrange multipliers: (num_samples, ) ndarray
    support_vectors : NDArray[int] or None
        Row indices of the training samples whose multiplier is non-zero
        (None until ``fit`` has been called)
    """
    def __init__(self, eta=0.001, epoch=1000, random_state=42):
        self.eta = eta
        self.epoch = epoch
        self.random_state = random_state
        self.is_trained = False
        self.support_vectors = None

    @staticmethod
    def _to_dense(X):
        """Return X as a dense float ndarray (sparse matrices are densified)."""
        if hasattr(X, "toarray"):
            X = X.toarray()
        return np.asarray(X, dtype=float)

    @staticmethod
    def _dual_hessian(X, y):
        """Return H with H[i, j] = y_i * y_j * <x_i, x_j>."""
        y = np.asarray(y).reshape(-1)
        return np.outer(y, y) * (X @ X.T)

    def fit(self, X, y):
        """
        Fit parameter vector to training data

        Parameters
        --------------
        X : NDArray[NDArray[float]]
            Training data: (num_samples, num_features) matrix (dense or sparse)
        y : array-like
            Training labels: (num_samples) values taken from exactly two
            classes. The smaller label (in ``np.unique`` order) is treated as
            the negative class, so {0, 1} and {-1, 1} both work.

        Returns
        ----------
        self : HardMarginSVM
            The fitted estimator

        Raises
        ----------
        ValueError
            If ``y`` does not contain exactly two distinct labels
        """
        X = self._to_dense(X)
        # Work on a positional ndarray: a pandas Series has no ``reshape`` and
        # is indexed by label, and a plain list breaks element-wise comparison.
        y = np.asarray(y).reshape(-1)

        self.num_samples = X.shape[0]
        self.num_features = X.shape[1]

        y_unique = np.unique(y)
        if len(y_unique) != 2:
            raise ValueError("Binary classification requires exactly 2 classes")

        # Encode the two classes as -1 / +1 for the dual formulation.
        y = np.where(y == y_unique[0], -1.0, 1.0)

        self.w = np.zeros(self.num_features)
        self.b = 0

        rgen = np.random.RandomState(self.random_state)
        self.alpha = rgen.uniform(low=0.0, high=0.01, size=self.num_samples)

        # H depends only on the data, so build it once instead of every epoch.
        H = self._dual_hessian(X, y)
        for _ in range(self.epoch):
            self._cycle(X, y, H)

        sv_indices = np.where(self.alpha != 0)[0]
        self.support_vectors = sv_indices

        # w = sum_i alpha_i * y_i * x_i ; zero multipliers contribute nothing.
        self.w = X.T @ (self.alpha * y)

        if len(sv_indices) > 0:
            self.b = float(np.mean(y[sv_indices] - X[sv_indices] @ self.w))
        else:
            # No support vector survived the clipping: avoid dividing by zero.
            self.b = 0.0

        self.is_trained = True
        return self

    def predict(self, X):
        """
        Return predictions

        Parameters
        --------------
        X : NDArray[NDArray[float]]
            Data to classify: (any, num_features) matrix (dense or sparse)

        Returns
        ----------
        result : NDArray[int]
            Classification results 0 or 1: (any, ) ndarray

        Raises
        ----------
        Exception
            If the model has not been fitted yet
        """
        if not self.is_trained:
            raise Exception('Model not trained yet')

        X = self._to_dense(X)

        decision_values = X @ self.w + self.b

        return np.where(decision_values >= 0, 1, 0)

    def _cycle(self, X, y, H=None):
        """
        One gradient ascent cycle on the dual objective

        Parameters
        --------------
        X : NDArray[NDArray[float]]
            Training data: (num_samples, num_features) matrix
        y : NDArray[float]
            Training labels encoded as -1 / +1: (num_samples) ndarray
        H : NDArray[NDArray[float]], optional
            Precomputed matrix ``y_i * y_j * <x_i, x_j>``; computed from
            ``X`` and ``y`` when omitted
        """
        if H is None:
            H = self._dual_hessian(X, y)

        grad = np.ones(self.num_samples) - H @ self.alpha

        self.alpha += self.eta * grad

        # Project back onto the feasible set alpha >= 0.
        self.alpha = np.clip(self.alpha, 0, None)


class LinearSVM:
    """
    Linear SVM trained on the dual with box-constrained multipliers.

    Every iteration uses the full training set: the multipliers whose dual
    gradient ``1 - K @ alpha`` is positive are increased by ``lr`` times that
    gradient and then clipped to ``[0, C]``. ``w`` and ``b`` are derived from
    the final multipliers.

    Attributes
    -------------
    C : float
        Upper bound applied to every multiplier
    max_iter : int
        Maximum number of update iterations
    lr : float
        Learning rate
    tolerance : float
        Training stops once the largest multiplier change is below this value
    w : NDArray[float] or None
        Weight vector: (n_features, ) ndarray, None before ``fit``
    b : float
        Bias term
    """
    def __init__(self, C=1.0, max_iter=1000, lr=0.001, tolerance=1e-5):
        self.C = C
        self.max_iter = max_iter
        self.lr = lr
        self.tolerance = tolerance
        self.w = None
        self.b = 0

    @staticmethod
    def _to_dense(X):
        """Return X as a dense float ndarray (sparse matrices are densified)."""
        if hasattr(X, "toarray"):
            X = X.toarray()
        return np.asarray(X, dtype=float)

    def fit(self, X, y):
        """
        Fit the classifier.

        Parameters
        --------------
        X : array-like or sparse matrix
            Training data: (n_samples, n_features)
        y : array-like
            Training labels: values <= 0 are the negative class, all other
            values the positive class

        Returns
        ----------
        self : LinearSVM
            The fitted estimator
        """
        # np.dot / element-wise '*' do not behave like dense algebra on scipy
        # sparse matrices, so densify first (same approach as HardMarginSVM).
        X = self._to_dense(X)

        # Convert labels to -1, 1 if they're 0, 1
        y_binary = np.where(np.asarray(y).reshape(-1) <= 0, -1, 1)

        n_samples, n_features = X.shape

        self.w = np.zeros(n_features)
        # Reset so a refit never inherits the bias of a previous fit.
        self.b = 0

        alpha = np.zeros(n_samples)

        # Pre-compute the label-signed Gram matrix once:
        # K[i,j] = y_i * y_j * (x_i . x_j)
        K = np.dot(X, X.T) * np.outer(y_binary, y_binary)

        # Full-batch updates of the dual variables
        for iteration in range(self.max_iter):
            alpha_prev = alpha.copy()

            # Dual gradient for every sample
            margins = 1 - K.dot(alpha)

            # Only multipliers with a positive gradient are increased
            mask = margins > 0
            alpha[mask] += self.lr * margins[mask]

            # Apply box constraint
            alpha = np.clip(alpha, 0, self.C)

            # Check convergence
            if np.max(np.abs(alpha - alpha_prev)) < self.tolerance:
                break

        # Calculate weights
        self.w = np.dot(X.T, alpha * y_binary)

        # Calculate bias using support vectors
        sv_indices = alpha > 1e-5
        if np.any(sv_indices):
            self.b = np.mean(y_binary[sv_indices] - np.dot(X[sv_indices], self.w))

        return self

    def predict(self, X):
        """Predict class labels (0 or 1) for samples in X."""
        return np.where(self.decision_function(X) >= 0, 1, 0)

    def get_parameters(self):
        """Print the learned weight vector and bias."""
        print(f'w: {self.w}')
        print(f'b: {self.b}')

    def decision_function(self, X):
        """Return the signed score ``X @ w + b`` of every sample in X."""
        return np.dot(self._to_dense(X), self.w) + self.b

In [None]:
# Dataset sizes to benchmark: 1000, 1500, ..., 9500 (step 500, 10000 excluded).
a = list(range(1000, 10000, 500))
print(a)

[1000, 1500, 2000, 2500, 3000, 3500, 4000, 4500, 5000, 5500, 6000, 6500, 7000, 7500, 8000, 8500, 9000, 9500]


In [None]:
def _clean_text(text):
    """Turn one raw message into lowercase ASCII text without markup, URLs,
    e-mail addresses or punctuation. Non-string input yields ''."""
    if not isinstance(text, str):
        # Missing values (NaN) would otherwise crash in the string operations.
        return ''

    # Lowercase
    text = text.lower()

    # Drop non-ASCII characters
    text = re.sub(r'[^\x00-\x7F]+', '', text)

    # Trim surrounding whitespace
    text = text.strip()

    # Strip HTML / XML markup (best effort: keep the text if parsing fails)
    try:
        text = BeautifulSoup(text, 'html.parser').get_text()
    except Exception:
        pass

    # Remove URLs and e-mail addresses BEFORE punctuation: both patterns rely
    # on '.', '@' and similar characters that the punctuation step deletes.
    text = re.sub(r'http\S+|www\S+|\S+\.(com|net|org|edu|gov|mil|int|info|biz|co)\S+', '', text)
    text = re.sub(r'\S+@\S+', '', text)

    # Remove punctuation characters
    return text.translate(str.maketrans('', '', string.punctuation))


def _stem_tokens(text, stop_words, stemmer):
    """Tokenize text, drop stop words, stem the rest and re-join with spaces."""
    return ' '.join(
        stemmer.stem(word)
        for word in word_tokenize(text)
        if word not in stop_words
    )


def calc_time_svm(sizes=None):
    """
    Benchmark sklearn's linear SVC on the sampled spam datasets.

    For every dataset size the CSV is downloaded, the ``text`` column is
    cleaned, tokenized, stop-word filtered and stemmed, hashed into 1000
    features, and an ``SVC(kernel='linear')`` is fitted on an 80/20 split.
    Only the ``fit`` call is timed.

    Side effects per size ``i``: writes ``base_svm_tfidf_{i}.csv`` (the name
    is kept for compatibility although hashed features are used) and
    overwrites ``linear_svm.pkl`` / ``hashing_vectorizer.pkl``, so only the
    artefacts of the last size remain on disk.

    Parameters
    --------------
    sizes : iterable of int, optional
        Dataset sizes to run; defaults to the notebook-level list ``a``.

    Returns
    ----------
    results : list of dict
        One record per size with ``dataset_size``, ``class_name``, ``time``
        (seconds spent in ``fit``), ``accuracy_score`` and ``f1_score``.
    """
    if sizes is None:
        sizes = a

    # Built once: neither depends on the dataset being processed.
    stop_words = set(stopwords.words('english'))
    stemmer = PorterStemmer()

    results = []
    for i in sizes:
        df = pd.read_csv(f"https://media.githubusercontent.com/media/PTIT-Projects/ttcs-svm-spam-email/refs/heads/main/dataset/sampled_dataset{i}.csv")

        texts = (
            df['text']
            .map(_clean_text)
            .map(lambda text: _stem_tokens(text, stop_words, stemmer))
        )

        # Train / test split
        X_train, X_test, y_train, y_test = train_test_split(
            texts, df['label'], test_size=0.2, random_state=1
        )

        # Hash the documents into a fixed-size feature space
        hashing_vectorizer = HashingVectorizer(n_features=1000)
        X_train_hashed = hashing_vectorizer.fit_transform(X_train)
        X_test_hashed = hashing_vectorizer.transform(X_test)

        svm_base = SVC(kernel='linear')
        # perf_counter is monotonic and meant for measuring short durations.
        start_time = time.perf_counter()
        svm_base.fit(X_train_hashed, y_train)
        elapsed = time.perf_counter() - start_time

        y_pred = svm_base.predict(X_test_hashed)
        record = {
            'class_name': svm_base.__class__.__name__,
            'time': elapsed,
            'accuracy_score': accuracy_score(y_test, y_pred),
            'f1_score': f1_score(y_test, y_pred),
        }
        # Previously nothing was appended, so the function always returned [].
        results.append({'dataset_size': i, **record})

        pd.DataFrame([record]).to_csv(f'base_svm_tfidf_{i}.csv', index=False)

        # Save the model
        with open('linear_svm.pkl', 'wb') as model_file:
            pickle.dump(svm_base, model_file)

        # Save the vectorizer
        with open('hashing_vectorizer.pkl', 'wb') as vectorizer_file:
            pickle.dump(hashing_vectorizer, vectorizer_file)
    return results

In [None]:
# Run the SVC timing benchmark; it downloads one CSV per dataset size in `a`.
calc_time_svm()

KeyboardInterrupt: 