# Лабораторная 5
## Выбор признаков

### Был выбран датасет твитов времён пандемии COVID-19
Признаки:
- UserName
- ScreenName
- Location
- TweetAt
- OriginalTweet
- Sentiment

Были выбраны классификаторы:
- LogisticRegression
- RandomForestClassifier
- SVC

In [None]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt

from sklearn.base import clone
from sklearn.manifold import TSNE
from sklearn.cluster import KMeans
from sklearn.decomposition import PCA
from sklearn.svm import LinearSVC, SVC
from sklearn.preprocessing import OneHotEncoder
from sklearn.ensemble import RandomForestClassifier
from sklearn.linear_model import LogisticRegression
from sklearn.model_selection import train_test_split
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.metrics import accuracy_score, adjusted_rand_score, silhouette_score
from sklearn.feature_selection import f_classif, SelectKBest, SelectFromModel, SequentialFeatureSelector, VarianceThreshold

from filter_method import manual_chi2
from wrapper_method import manual_rfe
from build_data import get_processed_data
from inner_method import rf_embedded_selection
from dimenshion_reduce import apply_pca, apply_tsne
from lib_realizations import filter_lib, inner_lib, wrapper_lib
from plot_graps import plot_data_with_class, plot_data_with_cluster

### Загрузка и предобработка данных

In [None]:
def load_data(file_path):
    """Read the CSV at ``file_path`` and return it as a pandas DataFrame."""
    frame = pd.read_csv(file_path)
    return frame

Предобработка данных, 5 классов в данном датасете, векторизация текста через TF-IDF

In [None]:
def preprocess_data(df):
    """Turn the raw tweet table into a dense feature matrix, target and feature names.

    The feature matrix is the column-wise concatenation of:
      * TF-IDF vectors of ``OriginalTweet``,
      * a one-hot encoding of ``Location`` (missing values become 'Unknown'),
      * two date features derived from ``TweetAt``: day of week and weekend flag.

    The input frame is NOT modified; all derived values live in local objects.
    (Writing the encoded labels back into ``df`` would make a second call on
    the same frame map the already-encoded integers to NaN.)

    Args:
        df: DataFrame with the columns ``Sentiment``, ``OriginalTweet``,
            ``Location`` and ``TweetAt``.

    Returns:
        Tuple ``(X, y, all_feature_names)`` where ``X`` is a dense 2-D ndarray,
        ``y`` is a pandas Series of integer-encoded sentiment labels and
        ``all_feature_names`` is an object ndarray with one name per column
        of ``X``.
    """
    custom_mapping = {'Neutral': 0, 'Positive': 1, 'Negative': 2,
                      'Extremely Positive': 3, 'Extremely Negative': 4}
    # Map into a new Series rather than overwriting df['Sentiment'].
    y = df['Sentiment'].map(custom_mapping)

    X_text = df['OriginalTweet']
    X_location = df['Location'].fillna('Unknown')
    # errors='coerce' turns unparseable dates into NaT instead of raising.
    # NOTE(review): NaT rows presumably yield NaN in the day-of-week column - confirm
    # the dataset has no invalid dates before feeding X to estimators.
    X_date = pd.to_datetime(df['TweetAt'], errors='coerce', dayfirst=True)

    # Keep the date features local instead of adding columns to the caller's frame.
    day_of_week = X_date.dt.dayofweek
    is_weekend = (day_of_week >= 5).astype(int)

    vectorizer = TfidfVectorizer()
    X_text_vectorized = vectorizer.fit_transform(X_text).toarray()

    encoder = OneHotEncoder(handle_unknown='ignore')
    X_location_encoded = encoder.fit_transform(X_location.values.reshape(-1, 1)).toarray()

    X_time = np.column_stack((day_of_week.values, is_weekend.values))

    X = np.hstack((X_text_vectorized, X_location_encoded, X_time))
    print(f"X shape: {X.shape}")

    text_features = vectorizer.get_feature_names_out()
    location_features = encoder.get_feature_names_out(['Location'])
    time_features = ['DayOfWeek', 'IsWeekend']

    # Name order mirrors the column order used in np.hstack above.
    all_feature_names = np.array(list(text_features) + list(location_features) + time_features, dtype=object)

    return X, y, all_feature_names

Разделение данных на тренировочную и валидационную выборки

In [None]:
def split_data(X, y, test_size=0.2, random_state=42):
    """Split ``X``/``y`` into train and validation parts, stratified by ``y``.

    Returns whatever ``train_test_split`` returns for two input arrays:
    ``(X_train, X_val, y_train, y_val)``.
    """
    split_options = {
        "test_size": test_size,
        "random_state": random_state,
        "stratify": y,
    }
    return train_test_split(X, y, **split_options)

In [None]:
def get_processed_data(file_path='Corona_NLP_test.csv'):
    """Load the CSV, preprocess it and return the train/validation split.

    Returns:
        ``(X_train, y_train, X_val, y_val, all_feature_names)``; the two
        target vectors are returned as plain arrays (``.values``).
    """
    features, target, all_feature_names = preprocess_data(load_data(file_path))

    X_train, X_val, train_target, val_target = split_data(features, target)

    return X_train, train_target.values, X_val, val_target.values, all_feature_names

#### 1. Загрузка данных, создание классификаторов, расчет качества без FS

In [None]:
def evaluate_classifiers(classifiers, X_train, y_train, X_val, y_val):
    """Fit every classifier on the training split and score it on validation.

    Args:
        classifiers: iterable of ``(name, estimator)`` pairs; each estimator
            is fitted in place.
        X_train, y_train: training features and labels.
        X_val, y_val: validation features and labels.

    Returns:
        List of validation accuracies, in the same order as ``classifiers``.
        Each accuracy is also printed as ``"<name>: <acc>"``.
    """
    scores = []
    for label, model in classifiers:
        model.fit(X_train, y_train)
        predictions = model.predict(X_val)
        score = accuracy_score(y_val, predictions)
        scores.append(score)
        print(f"{label}: {score:.4f}")
    return scores

In [None]:
# Load the dataset and obtain the train/validation split plus the feature names.
print("1. Load data...")
X_train, y_train, X_val, y_val, all_feature_names = get_processed_data("Corona_NLP_test.csv")

# (name, estimator) pairs; the same instances are refitted by every later
# evaluate_classifiers call in this notebook.
classifiers = [
        ("LogisticRegression", LogisticRegression(max_iter=1000, random_state=42)),
        ("RandomForest", RandomForestClassifier(n_estimators=50, random_state=42, n_jobs=-1)),
        ("SVC", SVC(random_state=42))
    ]

# Baseline: accuracy on the full feature set, before any feature selection.
print("\n=== Accuracy without FS ===")
accuracies_before_fs = evaluate_classifiers(classifiers, X_train, y_train, X_val, y_val)

# Maps a feature-selection method name to its list of accuracies
# (one entry per classifier, in the order of `classifiers`).
fs_accuracies = {"No_FS": accuracies_before_fs}

## Filter method (Custom)

In [None]:
def manual_chi2(X, y):
    """Score every feature with a chi-squared test of presence vs. class.

    A feature counts as "present" in a sample when its value is non-zero, so
    every feature is treated as a binary indicator. For each feature the
    observed per-class presence/absence counts are compared with the counts
    expected if the feature were independent of the class.

    Args:
        X: 2-D array-like of shape (n_samples, n_features).
        y: 1-D array-like of class labels, one per row of ``X``.

    Returns:
        Float ndarray of shape (n_features,) holding the chi-squared statistic
        of each feature. Features present in no sample or in every sample
        score 0.
    """
    # Convert up front so plain lists and label containers with a
    # non-positional index (e.g. a pandas Series) are handled by position.
    X = np.asarray(X)
    y = np.asarray(y)

    n_samples, n_features = X.shape
    classes = np.unique(y)
    n_classes = len(classes)

    # counts[c, f] = number of samples of class c in which feature f is non-zero.
    present = X != 0
    class_counts = np.zeros(n_classes, dtype=np.int64)
    counts = np.zeros((n_classes, n_features), dtype=np.int64)
    for class_idx, label in enumerate(classes):
        in_class = y == label
        class_counts[class_idx] = in_class.sum()
        counts[class_idx] = present[in_class].sum(axis=0)

    total_presence = counts.sum(axis=0)
    chi2_scores = np.zeros(n_features, dtype=float)

    # Features that are always or never present carry no information: score 0.
    informative = (total_presence > 0) & (total_presence < n_samples)
    if not informative.any():
        return chi2_scores

    per_class = class_counts[:, np.newaxis].astype(float)
    observed_presence = counts[:, informative].astype(float)
    observed_absence = per_class - observed_presence
    expected_presence = per_class * (total_presence[informative] / n_samples)
    expected_absence = per_class * ((n_samples - total_presence[informative]) / n_samples)

    def _chi2_terms(observed, expected):
        # (O - E)^2 / E, skipping cells whose expected count is not positive.
        squared = (observed - expected) ** 2
        return np.divide(squared, expected, out=np.zeros_like(squared), where=expected > 0)

    chi2_scores[informative] = (
        _chi2_terms(observed_presence, expected_presence)
        + _chi2_terms(observed_absence, expected_absence)
    ).sum(axis=0)

    return chi2_scores

Работа с фильтрующим методом

In [None]:
# Score every feature on the training split and order the column indices
# from the highest chi^2 score to the lowest.
print("\n2. Filter method (chi^2)...")
chi2_scores = manual_chi2(X_train, y_train)
chi2_sorted_indices = np.argsort(chi2_scores)[::-1]

# Keep only the top_k_chi2 best-scoring columns.
top_k_chi2 = 30
selected_indices_chi2 = chi2_sorted_indices[:top_k_chi2]

# Apply the same column selection to both splits.
X_train_chi2 = X_train[:, selected_indices_chi2]
X_val_chi2 = X_val[:, selected_indices_chi2]

top_30_chi2_features = all_feature_names[selected_indices_chi2]

print("\nTop 30 wia chi^2:")
for f in top_30_chi2_features:
    print(f)

# Re-evaluate the classifiers on the reduced feature set and record the result.
print("\n=== Accuracy after chi^2 FS ===")
accuracies_chi2 = evaluate_classifiers(classifiers, X_train_chi2, y_train, X_val_chi2, y_val)
fs_accuracies["Chi2"] = accuracies_chi2

## Embedded method

In [None]:
def rf_embedded_selection(X, y, num_features=30, n_estimators=100, random_state=42, max_depth=None):
    """Pick the ``num_features`` columns a random forest considers most important.

    Args:
        X: 2-D feature array.
        y: class labels.
        num_features: how many columns to keep.
        n_estimators, random_state, max_depth: forwarded to
            ``RandomForestClassifier``.

    Returns:
        ``(mask, selected_indices)``: a boolean mask over the columns of ``X``
        and the kept column indices ordered from most to least important.
    """
    forest = RandomForestClassifier(
        n_estimators=n_estimators,
        random_state=random_state,
        max_depth=max_depth,
    )
    forest.fit(X, y)

    by_importance_desc = np.argsort(forest.feature_importances_)[::-1]
    top_indices = by_importance_desc[:num_features]

    keep = np.zeros(X.shape[1], dtype=bool)
    keep[top_indices] = True
    return keep, top_indices

Работа со встроенным методом

In [None]:
# Select the 30 features with the highest random-forest importance.
print("\n3. Embedded method (Random Forest FS)...")
mask_embedded, ranking_embedded = rf_embedded_selection(
    X_train, y_train, num_features=30, n_estimators=100, random_state=42
)

# The boolean mask keeps the selected columns in their original column order.
X_train_embedded = X_train[:, mask_embedded]
X_val_embedded = X_val[:, mask_embedded]

# ranking_embedded lists the selected indices from most to least important.
top_30_embedded_indices = ranking_embedded[:30]
top_30_embedded_features = all_feature_names[top_30_embedded_indices]
print("\nTop 30 wia embedded method (RF):")
for f in top_30_embedded_features:
    print(f)
    
# Re-evaluate the classifiers on the reduced feature set and record the result.
print("\n=== Accuracy afrer embedded method (RF) ===")
accuracies_embedded = evaluate_classifiers(classifiers, X_train_embedded, y_train, X_val_embedded, y_val)
fs_accuracies["Embedded_RF"] = accuracies_embedded

## Wrapper method

In [None]:
def _estimator_feature_importances(estimator):
    """Return one non-negative importance value per feature of a fitted estimator."""
    if hasattr(estimator, 'feature_importances_'):
        return np.asarray(estimator.feature_importances_)
    if hasattr(estimator, 'coef_'):
        coef = np.abs(np.asarray(estimator.coef_))
        if coef.ndim > 1:
            # Multiclass linear models expose coef_ as (n_classes, n_features).
            # Flattening that would produce n_classes * n_features values whose
            # argmin no longer indexes a feature, so aggregate over classes.
            return coef.sum(axis=0)
        return coef
    raise ValueError("Base classifier did not support feature_importances_ or coef_.")


def manual_rfe(X, y, base_estimator, n_features_to_select=30):
    """Recursive feature elimination that drops one feature per iteration.

    A fresh clone of ``base_estimator`` is fitted on the currently selected
    columns; the column with the smallest importance is removed. This repeats
    until ``n_features_to_select`` columns remain.

    Args:
        X: 2-D ndarray of shape (n_samples, n_features).
        y: class labels.
        base_estimator: unfitted estimator exposing ``feature_importances_``
            or ``coef_`` after fitting.
        n_features_to_select: number of columns to keep.

    Returns:
        ``(mask, ranking)``: ``mask`` is a boolean array over the columns of
        ``X`` marking the survivors. ``ranking`` is an array of column indices
        ordered from most to least important: survivors first (by importance
        in a final fit), then eliminated columns from last-eliminated to
        first-eliminated.

    Raises:
        ValueError: if the fitted estimator exposes neither
            ``feature_importances_`` nor ``coef_``.
    """
    selected_features = list(range(X.shape[1]))
    eliminated = []  # column indices in elimination order (least important first)

    while len(selected_features) > n_features_to_select:
        clf = clone(base_estimator)
        clf.fit(X[:, selected_features], y)

        importances = _estimator_feature_importances(clf)
        least_important_index = int(np.argmin(importances))
        eliminated.append(selected_features.pop(least_important_index))

    mask = np.zeros(X.shape[1], dtype=bool)
    mask[selected_features] = True

    # One more fit on the survivors only, to order them among themselves.
    clf_final = clone(base_estimator)
    clf_final.fit(X[:, selected_features], y)
    final_importances = _estimator_feature_importances(clf_final)

    sorted_indices = np.argsort(final_importances)[::-1]
    sorted_features = [selected_features[i] for i in sorted_indices]

    # Features eliminated later were judged more important than those
    # eliminated earlier, so reverse the elimination order to keep the whole
    # ranking sorted from best to worst.
    ranking = np.array(sorted_features + eliminated[::-1])

    return mask, ranking

Работа с методом-обёрткой

In [None]:
# Recursive feature elimination driven by a random forest.
# manual_rfe refits a clone of base_clf for every feature it eliminates.
print("\n4. Wrapper method (Custom RFE)...")
base_clf = RandomForestClassifier(n_estimators=100, random_state=42, n_jobs=-1)
n_features_to_select = 30

mask_wr, ranking_wr = manual_rfe(
    X_train, y_train,
    base_estimator=base_clf,
    n_features_to_select=n_features_to_select
)

# Apply the same column mask to both splits.
X_train_wr = X_train[:, mask_wr]
X_val_wr = X_val[:, mask_wr]

# The first entries of ranking_wr are the surviving features, ordered by importance.
top_30_wr_features = all_feature_names[ranking_wr[:30]]
print("\nTop 30 wia wrapper method (RFE):")
for f in top_30_wr_features:
    print(f)
    
# Re-evaluate the classifiers on the reduced feature set and record the result.
print("\n=== Accuracy after custom_RFE ===")
accuracies_wr = evaluate_classifiers(classifiers, X_train_wr, y_train, X_val_wr, y_val)
fs_accuracies["Wrapper_RFE"] = accuracies_wr

## Библиотечные методы

Убираем константные признаки

In [None]:
def remove_constant_features(X, feature_names=None, selector=None):
    """Drop zero-variance columns from ``X``.

    Two modes:
      * ``selector`` given: it is assumed to be already fitted; ``X`` is only
        transformed and ``feature_names`` is returned unchanged.
      * ``selector`` omitted: a new ``VarianceThreshold(threshold=0)`` is
        fitted on ``X``; ``feature_names`` (if given) is filtered with the
        selector's support mask.

    Returns:
        ``(X_new, feature_names_or_None, selector)``.
    """
    if selector is not None:
        # Reuse the fitted selector (e.g. for the validation split).
        transformed = selector.transform(X)
        return transformed, feature_names, selector

    selector = VarianceThreshold(threshold=0)
    reduced = selector.fit_transform(X)
    if feature_names is None:
        return reduced, None, selector

    kept_names = feature_names[selector.get_support()]
    return reduced, kept_names, selector

1. Filter Lib

In [None]:
def filter_lib(X_train, y_train, X_val, y_val, all_feature_names, classifiers, evaluate_classifiers, k=30):
    """Filter-style selection with ``SelectKBest(f_classif)`` and evaluation.

    Constant columns are removed first (selector fitted on the training split
    and reused for validation), then the ``k`` best columns are kept.

    Args:
        X_train, y_train, X_val, y_val: train/validation data.
        all_feature_names: ndarray of column names aligned with ``X_train``.
        classifiers: passed through to ``evaluate_classifiers``.
        evaluate_classifiers: callable invoked as
            ``evaluate_classifiers(classifiers, X_train_new, y_train, X_val_new, y_val)``.
        k: number of features to keep.

    Returns:
        ``(accuracies, selected_indices, top_features, X_train_new, X_val_new)``;
        ``selected_indices`` index the columns left after constant removal.
    """
    train_nc, names_nc, variance_filter = remove_constant_features(X_train, all_feature_names)
    val_nc, _, _ = remove_constant_features(X_val, names_nc, variance_filter)

    k_best = SelectKBest(f_classif, k=k)
    X_train_new = k_best.fit_transform(train_nc, y_train)
    selected_indices = k_best.get_support(indices=True)
    top_features = names_nc[selected_indices]

    # The heading text is fixed; at most the first 30 names are listed.
    print("\nTop 30 features wia filter method (f_classif):")
    for name in top_features[:30]:
        print(name)

    X_val_new = val_nc[:, selected_indices]

    accuracies = evaluate_classifiers(classifiers, X_train_new, y_train, X_val_new, y_val)
    return accuracies, selected_indices, top_features, X_train_new, X_val_new

2. Embedded lib

In [None]:
def inner_lib(X_train, y_train, X_val, y_val, all_feature_names, classifiers, evaluate_classifiers, threshold='mean'):
    """Embedded selection with an L1-penalised ``LinearSVC`` and evaluation.

    Constant columns are removed first (selector fitted on the training split
    and reused for validation). ``SelectFromModel`` then keeps the columns
    whose importance in the fitted linear SVM passes ``threshold``.

    Args:
        X_train, y_train, X_val, y_val: train/validation data.
        all_feature_names: ndarray of column names aligned with ``X_train``.
        classifiers: passed through to ``evaluate_classifiers``.
        evaluate_classifiers: callable invoked as
            ``evaluate_classifiers(classifiers, X_train_new, y_train, X_val_new, y_val)``.
        threshold: forwarded to ``SelectFromModel``.

    Returns:
        ``(accuracies, selected_indices, top_features, X_train_new, X_val_new)``;
        ``selected_indices`` index the columns left after constant removal.
    """
    X_train, all_feature_names, vt_selector = remove_constant_features(X_train, all_feature_names)
    X_val, _, _ = remove_constant_features(X_val, all_feature_names, vt_selector)

    # penalty="l1" is only supported by the primal formulation. Without an
    # explicit dual=False, scikit-learn releases whose default is dual=True
    # raise ValueError at fit time.
    lsvc = LinearSVC(C=0.5, penalty="l1", dual=False, random_state=42, max_iter=1000)
    model = SelectFromModel(lsvc, threshold=threshold)
    X_train_new = model.fit_transform(X_train, y_train)
    selected_indices = model.get_support(indices=True)
    top_features = all_feature_names[selected_indices]

    # The heading text is fixed; at most the first 30 names are listed.
    print("\nTop 30 features wia embedded method (LinearSVC + L1):")
    for f in top_features[:30]:
        print(f)

    X_val_new = X_val[:, selected_indices]
    accuracies_after_inner = evaluate_classifiers(classifiers, X_train_new, y_train, X_val_new, y_val)
    return accuracies_after_inner, selected_indices, top_features, X_train_new, X_val_new

3. Wrapper lib

In [None]:
def wrapper_lib(X_train, y_train, X_val, y_val, all_feature_names, classifiers, evaluate_classifiers, n_features=30):
    """Wrapper selection with forward ``SequentialFeatureSelector`` and evaluation.

    Constant columns are removed first (selector fitted on the training split
    and reused for validation). Features are then added one at a time using a
    logistic regression as the scoring model until ``n_features`` are chosen.

    Args:
        X_train, y_train, X_val, y_val: train/validation data.
        all_feature_names: ndarray of column names aligned with ``X_train``.
        classifiers: passed through to ``evaluate_classifiers``.
        evaluate_classifiers: callable invoked as
            ``evaluate_classifiers(classifiers, X_train_new, y_train, X_val_new, y_val)``.
        n_features: number of features to select.

    Returns:
        ``(accuracies, selected_indices, top_features, X_train_new, X_val_new)``;
        ``selected_indices`` index the columns left after constant removal.
    """
    train_nc, names_nc, variance_filter = remove_constant_features(X_train, all_feature_names)
    val_nc, _, _ = remove_constant_features(X_val, names_nc, variance_filter)

    forward_selector = SequentialFeatureSelector(
        LogisticRegression(max_iter=1000, random_state=42),
        n_features_to_select=n_features,
        direction='forward',
        n_jobs=-1,
    )
    X_train_new = forward_selector.fit_transform(train_nc, y_train)
    selected_indices = forward_selector.get_support(indices=True)
    top_features = names_nc[selected_indices]

    # The heading text is fixed; at most the first 30 names are listed.
    print("\nTop 30 features wia wrapper method (SFS):")
    for name in top_features[:30]:
        print(name)

    X_val_new = val_nc[:, selected_indices]
    accuracies = evaluate_classifiers(classifiers, X_train_new, y_train, X_val_new, y_val)
    return accuracies, selected_indices, top_features, X_train_new, X_val_new

Реализация использования библиотечных методов

In [None]:
# Run the three library-based feature-selection variants. Each call returns
# the accuracies, the selected column indices and names, and the reduced
# train/validation matrices; the accuracies are recorded in fs_accuracies.
print("\n=== Lib FS methods ===")


# Filter: SelectKBest with f_classif.
accuracies_filter, filter_indices, filter_features, X_train_filter, X_val_filter = filter_lib(
    X_train, y_train, X_val, y_val, all_feature_names, classifiers, evaluate_classifiers, k=30
)
fs_accuracies["Filter_f_classif_lib"] = accuracies_filter


# Embedded: SelectFromModel over an L1-penalised LinearSVC.
accuracies_inner, inner_indices, inner_features, X_train_inner, X_val_inner = inner_lib(
    X_train, y_train, X_val, y_val, all_feature_names, classifiers, evaluate_classifiers, threshold='mean'
)
fs_accuracies["Inner_L1SVC_lib"] = accuracies_inner


# Wrapper: forward SequentialFeatureSelector.
accuracies_wrapper, wrapper_indices, wrapper_features, X_train_wrapper, X_val_wrapper = wrapper_lib(
    X_train, y_train, X_val, y_val, all_feature_names, classifiers, evaluate_classifiers, n_features=30
)
fs_accuracies["Wrapper_SFS_lib"] = accuracies_wrapper

### Реализация выбора лучшего FS

In [None]:
# Pick the feature-selection method with the highest accuracy averaged over
# all classifiers, then expose the matching training matrix as X_train_best_fs.
print("\nChoose best FS method...")
average_accuracies = {method: np.mean(acc) for method, acc in fs_accuracies.items()}
for method, avg_acc in average_accuracies.items():
    print(f"{method}: average accuracy = {avg_acc:.4f}")
best_fs_method = max(average_accuracies, key=average_accuracies.get)
print(f"\nBest FS method: {best_fs_method} with accuracy {average_accuracies[best_fs_method]:.4f}")

# The branch names must match the keys written into fs_accuracies
# ("Chi2", "Embedded_RF", "Wrapper_RFE"). The former "*_manual" spellings never
# matched, so a manual method winning ended in the ValueError below; they are
# still accepted in case the keys are renamed upstream.
if best_fs_method == "No_FS":
    X_train_best_fs = X_train
elif best_fs_method in ("Chi2", "Chi2_manual"):
    X_train_best_fs = X_train_chi2
elif best_fs_method in ("Embedded_RF", "Embedded_RF_manual"):
    X_train_best_fs = X_train_embedded
elif best_fs_method in ("Wrapper_RFE", "Wrapper_RFE_manual"):
    X_train_best_fs = X_train_wr
elif best_fs_method == "Filter_f_classif_lib":
    X_train_best_fs = X_train_filter
elif best_fs_method == "Inner_L1SVC_lib":
    X_train_best_fs = X_train_inner
elif best_fs_method == "Wrapper_SFS_lib":
    X_train_best_fs = X_train_wrapper
else:
    raise ValueError("Unexpecded FS method")

## Кластеризация

In [None]:
# Cluster the training data twice with identical KMeans settings:
# once on all features and once on the features kept by the best FS method.
print("\nClusterization...")
kmeans_before = KMeans(n_clusters=5, random_state=42)
kmeans_before.fit(X_train)
labels_before = kmeans_before.labels_

kmeans_after = KMeans(n_clusters=5, random_state=42)
kmeans_after.fit(X_train_best_fs)
labels_after = kmeans_after.labels_

Оценка качества кластеризации

In [None]:
# ARI compares the cluster assignment with the true labels (external metric);
# the silhouette score is computed on the same feature matrix that was
# clustered (internal metric).
ari_before = adjusted_rand_score(y_train, labels_before)
sil_before = silhouette_score(X_train, labels_before)
ari_after = adjusted_rand_score(y_train, labels_after)
sil_after = silhouette_score(X_train_best_fs, labels_after)

print("\nClusterization quality:")
print(f"Before FS: ARI={ari_before:.4f}, Silhouette={sil_before:.4f}")
print(f"After FS ({best_fs_method}): ARI={ari_after:.4f}, Silhouette={sil_after:.4f}")

## Уменьшение размерности 
- PCA

In [None]:
def apply_pca(X, n_components=2, random_state=42):
    """Project ``X`` onto its first ``n_components`` principal components.

    A new ``PCA`` is fitted on ``X`` for every call; the transformed data is
    returned.
    """
    reducer = PCA(n_components=n_components, random_state=random_state)
    return reducer.fit_transform(X)

- TSNE

In [None]:
def apply_tsne(X, n_components=2, random_state=42, perplexity=30):
    """Embed ``X`` into ``n_components`` dimensions with t-SNE (PCA initialisation).

    A new ``TSNE`` is fitted on ``X`` for every call; the embedding is returned.
    """
    embedder = TSNE(
        n_components=n_components,
        random_state=random_state,
        perplexity=perplexity,
        init='pca',
    )
    return embedder.fit_transform(X)

Применение уменьшения размерности на данных, визуализация

In [None]:
# 2-D PCA projections of the full and the feature-selected training data.
# Each projection is fitted independently, so the axes are not comparable.
X_train_pca = apply_pca(X_train)
X_train_best_fs_pca = apply_pca(X_train_best_fs)

# Points coloured by the true class labels.
plot_data_with_class(X_train_pca, y_train, title=f"Original classes (PCA, BEFORE FS)")
plot_data_with_class(X_train_best_fs_pca, y_train, title=f"Original classes (PCA, AFTER FS: {best_fs_method})")

# Same projections, coloured by the KMeans cluster assignment.
plot_data_with_cluster(X_train_pca, labels_before, title="Clusters (PCA, BEFORE FS)")
plot_data_with_cluster(X_train_best_fs_pca, labels_after, title=f"Clusters (PCA, AFTER FS: {best_fs_method})")

In [None]:
# 2-D t-SNE embeddings of the full and the feature-selected training data.
# Each embedding is fitted independently, so the axes are not comparable.
X_train_tsne = apply_tsne(X_train)
X_train_best_fs_tsne = apply_tsne(X_train_best_fs)

# Points coloured by the true class labels.
plot_data_with_class(X_train_tsne, y_train, title="Original classes (t-SNE, BEFORE FS)")
plot_data_with_class(X_train_best_fs_tsne, y_train, title=f"Original classes (t-SNE, AFTER FS: {best_fs_method})")

# Same embeddings, coloured by the KMeans cluster assignment.
plot_data_with_cluster(X_train_tsne, labels_before, title="Clusters (t-SNE, BEFORE FS)")
plot_data_with_cluster(X_train_best_fs_tsne, labels_after, title=f"Clusters (t-SNE, AFTER FS: {best_fs_method})")

Визуализация:

In [None]:
def plot_data_with_class(X_2d, y, title="Original classes", xlabel="Dim 1", ylabel="Dim 2"):
    """Scatter-plot 2-D points, one colour per class label.

    Args:
        X_2d: array whose first two columns are the x and y coordinates.
        y: array of class labels aligned with the rows of ``X_2d``.
        title, xlabel, ylabel: figure title and axis labels.

    The legend shows each class together with its number of points; the
    figure is displayed with ``plt.show()``.
    """
    class_values = np.unique(y)
    plt.figure(figsize=(6, 5))

    for cls in class_values:
        members = (y == cls)
        n_members = np.sum(members)
        xs, ys = X_2d[members, 0], X_2d[members, 1]
        plt.scatter(xs, ys, label=f"Class {cls} (N={n_members})", s=10)

    plt.title(title)
    plt.xlabel(xlabel)
    plt.ylabel(ylabel)
    plt.legend()
    plt.tight_layout()
    plt.show()

In [None]:
def plot_data_with_cluster(X_2d, labels, title="Clusters", xlabel="Dim 1", ylabel="Dim 2"):
    """Scatter-plot 2-D points, one colour per cluster label.

    Args:
        X_2d: array whose first two columns are the x and y coordinates.
        labels: array of cluster ids aligned with the rows of ``X_2d``.
        title, xlabel, ylabel: figure title and axis labels.

    The legend shows each cluster together with its number of points; the
    figure is displayed with ``plt.show()``.
    """
    cluster_ids = np.unique(labels)
    plt.figure(figsize=(6, 5))

    for cluster in cluster_ids:
        members = (labels == cluster)
        n_members = np.sum(members)
        xs, ys = X_2d[members, 0], X_2d[members, 1]
        plt.scatter(xs, ys, s=10, label=f"Cluster {cluster} (N={n_members})")

    plt.title(title)
    plt.xlabel(xlabel)
    plt.ylabel(ylabel)
    plt.legend()
    plt.tight_layout()
    plt.show()