In [42]:
import numpy as np
import pandas as pd
from sklearn.model_selection import train_test_split, cross_validate
from sklearn.preprocessing import MinMaxScaler, StandardScaler
from sklearn.feature_selection import SelectKBest, SelectPercentile, f_classif
from nltk.corpus import stopwords
from sklearn.metrics import accuracy_score, f1_score, precision_score, recall_score, roc_auc_score
from sklearn.neighbors import KNeighborsClassifier
from sklearn.linear_model import LogisticRegression
from sklearn.naive_bayes import MultinomialNB
from sklearn.ensemble import RandomForestClassifier
from sklearn.cluster import KMeans, DBSCAN, AgglomerativeClustering

In [43]:
class EmailPreprocess:
    def __init__(self, filename):
        self.filename = filename
        self.data = None
        self.X = None
        self.y = None
        self.X_train = None
        self.X_test = None
        self.y_train = None
        self.y_test = None
        self.scaler = None
        self.feature_selector = None
        self.feature_selection_methods = ['KBest', 'Percentile']
        
    def load_data(self):
        self.data = pd.read_csv(self.filename)

    def check_missing_values(self):
        cols = self.data.columns
        cols_with_missing_values = [col for col in cols if self.data[col].isna().sum()]
        if cols_with_missing_values:
            for col in cols_with_missing_values:
                self.data[col] = self.data[col].fillna(0)   
        
    def split_dataset(self):
        self.X = self.data.iloc[:, 1:-1] # 提取标志
        self.y = self.data.iloc[:, -1] # 提取标签

    def feature_scaling(self, method='MinMax'):
        """
        Метод для масштабирования признаков.
        method: 'MinMax', 'Standard'
        """
        if (method == 'Standard'):
            self.scaler = StandardScaler().fit(self.X)
        elif (method == 'MinMax'):
            self.scaler = MinMaxScaler().fit(self.X)
        else:
            return NotImplemented
        self.X = pd.DataFrame(self.scaler.transform(self.X), columns=self.X.columns) # 缩放功能

    def train_test_split(self, test_size=0.2, random_state=42):
        self.X_train, self.X_test, self.y_train, self.y_test = train_test_split(self.X, self.y, test_size=test_size, random_state=random_state)

    def feature_selection(self, method='KBest', k=10):
        """
        Метод для отбора k лучших признаков.
        method: 'Kbest', 'Percentile'.
        Записывает лучше признаки в атрибут X_best_features
        """
        if (method == 'Percentile'):
            percentile = k/self.X.shape[1]*100
            self.feature_selector = SelectPercentile(f_classif, percentile=percentile).fit(self.X, self.y)
        elif (method == 'KBest'):
            self.feature_selector = SelectKBest(f_classif, k=k).fit(self.X, self.y)
        else:
            return NotImplemented
        self.X_best_feats = pd.DataFrame(self.feature_selector.transform(self.X), columns=self.feature_selector.get_feature_names_out()) # 最佳功能

    def text_cleaning(self):
        """
        Метод для очистки стоп слов 
        """
        STOPWORDS = stopwords.words('english')
        for word in STOPWORDS:
            if word in self.X.columns:
                self.X = self.X.drop(word, axis=1)

    def preprocess(self):
        self.load_data()
        self.check_missing_values()
        self.split_dataset()
        self.text_cleaning()
        self.feature_scaling()
        self.train_test_split()



In [44]:
class EmailClassifier(EmailPreprocess):
    def __init__(self, filename):
        super().__init__(filename)
        self.preprocess()
        self.best_fitted_model = None
        self.best_roc_auc_score = None  # 我们将根据roc auc选择最佳型号
        self.clusterer = None

    def find_best_model_and_params(self):
        """
        Метод для поиска лучшей модели и ее параметров.
        Записывает лучшую модель в self.best_fitted_model.
        """
        models = [KNeighborsClassifier(), LogisticRegression(), MultinomialNB(), RandomForestClassifier()]  
        self.best_roc_auc_score = 0

        for model in models:
            scores = cross_validate(model, self.X, self.y, cv=3, scoring=('accuracy', 'f1', 'precision', 'recall', 'roc_auc'))
            roc_auc_score = max(scores['test_roc_auc'])
            if roc_auc_score > self.best_roc_auc_score:
                self.best_roc_auc_score = roc_auc_score
                self.best_fitted_model = model

        self.best_fitted_model.fit(self.X_train, self.y_train)

    def evaluate_test_metrics(self):
        """
        Метод для оценки метрик лучшей модели на тестовом наборе данных.
        Возвращает словарь с метриками.
        """
        if self.best_fitted_model is None:
            raise ValueError("Лучшая модель не найдена. Сначала запустите find_best_model_and_params()")
        
        y_pred = self.best_fitted_model.predict(self.X_test)
        accuracy = accuracy_score(self.y_test, y_pred)
        f1 = f1_score(self.y_test, y_pred)
        precision = precision_score(self.y_test, y_pred)
        recall = recall_score(self.y_test, y_pred)
        return {
        'accuracy': accuracy,
        'f1': f1,
        'precision': precision,
        'recall': recall
        }
    
    def cluster_emails(self, method='kmeans'):
        """
        Метод для кластеризации электронных писем на два кластера.
        method: 'kmeans', 'hierarchical' или 'dbscan'.
        Возвращает два значения: процент данных, соответствующих спаму в каждом
        кластере.
        """
        if method == 'kmeans':
            self.clusterer = KMeans(n_clusters=2).fit(self.X)
        elif method == 'dbscan':
            self.clusterer = DBSCAN(eps=4).fit(self.X)
        elif method == 'hierarchal':
            self.clusterer = AgglomerativeClustering(n_clusters=2).fit(self.X)
        else:
            return NotImplemented

        y_and_clusters = pd.concat([self.y, pd.Series(self.clusterer.labels_, name='Cluster')], axis=1)
        cond0 = (y_and_clusters['Cluster'] == 0) & (y_and_clusters['Prediction'] == 1)
        cond1 = (y_and_clusters['Cluster'] == 1) & (y_and_clusters['Prediction'] == 1)

        return (round(y_and_clusters[cond0].shape[0]/y_and_clusters[y_and_clusters['Cluster'] == 0].shape[0] * 100, 2), 
                round(y_and_clusters[cond1].shape[0]/y_and_clusters[y_and_clusters['Cluster'] == 1].shape[0] * 100, 2))



In [45]:
clf = EmailClassifier('emails.csv')
clf.find_best_model_and_params()


In [46]:
print(clf.evaluate_test_metrics())

{'accuracy': 0.9729468599033816, 'f1': 0.9533333333333334, 'precision': 0.9407894736842105, 'recall': 0.9662162162162162}


In [47]:
clf.cluster_emails()

(28.46, 95.24)