# Under-sampling methods

In [None]:
!pip install imblearn

## Load modules

In [None]:
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd

from sklearn.datasets import make_classification
from sklearn.model_selection import train_test_split
from sklearn import tree
from sklearn.metrics import confusion_matrix

In [None]:
from imblearn.under_sampling import RandomUnderSampler
from imblearn.under_sampling import TomekLinks
from imblearn.under_sampling import CondensedNearestNeighbour
from imblearn.under_sampling import OneSidedSelection

## Define plot_representation function

In [None]:
def _draw_class_scatter(ax, X, y, title, columns, axis_limits):
    """Draw one scatter panel on *ax*: class 0 in blue, class 1 in red."""
    x_col, y_col = columns
    for class_value, colour, legend_label in ((0, 'blue', 'Normal'),
                                              (1, 'red', 'Anomaly')):
        # Select the rows of this class by index label. For a DataFrame `y`
        # the mask turns non-matching rows into NaN, which dropna() removes.
        class_index = y[y == class_value].dropna().index
        subset = X.loc[list(class_index), :]
        ax.scatter(subset[x_col], subset[y_col], alpha=0.3, c=colour,
                   label=legend_label)

    # Keep only the left and bottom axis lines.
    ax.spines['top'].set_visible(False)
    ax.spines['right'].set_visible(False)
    ax.get_xaxis().tick_bottom()
    ax.get_yaxis().tick_left()

    ax.set_xlim(axis_limits)
    ax.set_ylim(axis_limits)
    ax.legend()
    ax.set_title(title, fontsize=20)


def plot_representation(X, y, X_syn=None, y_syn=None, method='real',
                        columns=('X1', 'X2'), axis_limits=(-5, 5)):
    """Scatter-plot a two-feature binary dataset, optionally beside a second one.

    The first panel is always titled 'Real' and shows ``X``/``y``. When
    ``X_syn`` is given, a second panel titled ``method`` shows
    ``X_syn``/``y_syn`` so a sampling result can be compared with the
    original data. Real data with more than two features should be reduced
    to two dimensions before calling this function.

    Args:
        X: DataFrame holding the two feature columns named in ``columns``.
        y: Labels sharing the index of ``X``; 0 is plotted as 'Normal' and
            1 as 'Anomaly'.
        X_syn: Optional second feature DataFrame (e.g. a resampled set).
        y_syn: Labels for ``X_syn``; required whenever ``X_syn`` is given.
        method: Title of the second panel.
        columns: Names of the two columns used for the x and y axes.
        axis_limits: ``(low, high)`` limits applied to both axes.

    Raises:
        ValueError: If ``X_syn`` is given without ``y_syn``.
    """
    if X_syn is not None and y_syn is None:
        raise ValueError("y_syn must be provided together with X_syn")

    # The layout follows the data actually supplied, so the figure never ends
    # up with an empty half or with a second axes drawn over the first one.
    compare = X_syn is not None
    n_panels = 2 if compare else 1
    fig = plt.figure(figsize=(8 * n_panels, 8))

    ax_real = fig.add_subplot(1, n_panels, 1)
    _draw_class_scatter(ax_real, X, y, 'Real', columns, axis_limits)

    if compare:
        ax_compare = fig.add_subplot(1, n_panels, 2)
        _draw_class_scatter(ax_compare, X_syn, y_syn, method, columns,
                            axis_limits)

    # Run the layout pass after the titles exist so they are accounted for.
    fig.tight_layout()
    plt.show()

## create_dataset function: generate an example dataset
### https://scikit-learn.org/stable/modules/generated/sklearn.datasets.make_classification.html

In [None]:
def create_dataset(n_samples=2000,
                weights=(0.8, 0.2),
                n_classes=2,
                class_sep=1,
                n_cluster=1,
                random_state=0):
    """Generate a synthetic two-feature classification dataset as DataFrames.

    All arguments are forwarded to ``sklearn.datasets.make_classification``
    (``n_cluster`` as ``n_clusters_per_class``, ``weights`` converted to a
    list). The number of features is fixed at two, both informative, with no
    redundant or repeated features.

    Returns:
        A tuple ``(X, y)``: ``X`` is a DataFrame with columns 'X1' and 'X2',
        ``y`` is a single-column DataFrame named 'y'.
    """
    generator_options = dict(
        n_samples=n_samples,
        n_features=2,
        n_informative=2,
        n_redundant=0,
        n_repeated=0,
        n_classes=n_classes,
        n_clusters_per_class=n_cluster,
        weights=list(weights),
        class_sep=class_sep,
        random_state=random_state,
    )
    features, labels = make_classification(**generator_options)

    X = pd.DataFrame(features, columns=['X1', 'X2'])
    y = pd.DataFrame(labels, columns=['y'])
    return X, y

In [None]:
# Build the example dataset: 5000 points with a 0.9 / 0.1 class weighting.
X_syn, y_syn = create_dataset(
    n_samples=5000,
    weights=(0.9, 0.1),
    n_cluster=1,
    random_state=711,
)

In [None]:
# Display the shapes of the generated feature and label frames.
X_syn.shape, y_syn.shape

In [None]:
# Stratify on the label so the minority-class share is the same in the
# train and test splits; with a plain random split it can drift.
X_train, X_test, y_train, y_test = train_test_split(
    X_syn, y_syn, stratify=y_syn, random_state=2022
)

In [None]:
# Show the training split (left) next to the test split (right).
plot_representation(
    X_train,
    y_train,
    X_syn=X_test,
    y_syn=y_test,
    method='Test',
)

In [None]:
# Baseline: decision tree trained on the original training split, without resampling.
dt_tree = tree.DecisionTreeClassifier(random_state=2022).fit(X_train, y_train)

# Evaluate on the test split.
y_pred_test = dt_tree.predict(X_test)
cm_baseline = confusion_matrix(y_true=y_test, y_pred=y_pred_test)

In [None]:
# Display the baseline confusion matrix (rows: true class, columns: predicted class).
cm_baseline

In [None]:
# Recall: correctly predicted anomalies divided by all true anomalies (row 1).
print(
    "Recall for anomaly observations in test dataset: %.4f"
    % (cm_baseline[1, 1] / cm_baseline[1].sum())
)
# Precision: correctly predicted anomalies divided by all predicted anomalies (column 1).
print(
    "Precision for anomaly observations in test dataset: %.4f"
    % (cm_baseline[1, 1] / cm_baseline.sum(axis=0)[1])
)

## 'Imblearn' package
### https://imbalanced-learn.org/stable/

### Random Under-Sampling (RUS)

In [None]:
# Random under-sampling of the training split. The sampler is seeded like
# the rest of the notebook so reruns select the same rows.
rus = RandomUnderSampler(random_state=2022)
X_resampled, y_resampled = rus.fit_resample(X_train, y_train)

# Compare the original training split with the under-sampled one.
plot_representation(X_train, y_train, X_resampled, y_resampled, 'RUS')

In [None]:
# Train a decision tree on the randomly under-sampled training data.
dt_tree = tree.DecisionTreeClassifier(random_state=2022)
dt_tree.fit(X_resampled, y_resampled)
rus_tree = dt_tree  # fit() returns the estimator itself, so both names refer to one model

# Evaluate on the untouched test split.
y_pred_test = rus_tree.predict(X_test)
cm_rus = confusion_matrix(y_true=y_test, y_pred=y_pred_test)

In [None]:
# Display the confusion matrix of the RUS-trained tree (rows: true class, columns: predicted class).
cm_rus

In [None]:
# Recall: correctly predicted anomalies divided by all true anomalies (row 1).
print(
    "Recall for anomaly observations in test dataset: %.4f"
    % (cm_rus[1, 1] / cm_rus[1].sum())
)
# Precision: correctly predicted anomalies divided by all predicted anomalies (column 1).
print(
    "Precision for anomaly observations in test dataset: %.4f"
    % (cm_rus[1, 1] / cm_rus.sum(axis=0)[1])
)

### Tomek Links

In [None]:
# Resample the training split with the TomekLinks under-sampler.
tl = TomekLinks()
X_resampled, y_resampled = tl.fit_resample(X=X_train, y=y_train)

# Compare the original training split with the resampled one.
plot_representation(
    X_train,
    y_train,
    X_syn=X_resampled,
    y_syn=y_resampled,
    method='TomekLinks',
)

In [None]:
# Train a decision tree on the TomekLinks-resampled training data.
tl_tree = tree.DecisionTreeClassifier(random_state=2022).fit(X_resampled, y_resampled)

# Evaluate on the untouched test split.
y_pred_test = tl_tree.predict(X_test)
cm_tl = confusion_matrix(y_true=y_test, y_pred=y_pred_test)

In [None]:
# Display the confusion matrix of the TomekLinks-trained tree (rows: true class, columns: predicted class).
cm_tl

In [None]:
# Recall: correctly predicted anomalies divided by all true anomalies (row 1).
print(
    "Recall for anomaly observations in test dataset: %.4f"
    % (cm_tl[1, 1] / cm_tl[1].sum())
)
# Precision: correctly predicted anomalies divided by all predicted anomalies (column 1).
print(
    "Precision for anomaly observations in test dataset: %.4f"
    % (cm_tl[1, 1] / cm_tl.sum(axis=0)[1])
)

### Condensed Nearest Neighbor

In [None]:
# Condensed Nearest Neighbour under-sampling of the training split. The
# sampler is seeded like the rest of the notebook so reruns are reproducible.
cnn = CondensedNearestNeighbour(random_state=2022, n_jobs=4)
X_resampled, y_resampled = cnn.fit_resample(X_train, y_train)

# Compare the original training split with the resampled one.
plot_representation(X_train, y_train, X_resampled, y_resampled, 'Condensed Nearest Neighbour')

In [None]:
# Train a decision tree on the CNN-resampled training data.
cnn_tree = tree.DecisionTreeClassifier(random_state=2022).fit(X_resampled, y_resampled)

# Evaluate on the untouched test split.
y_pred_test = cnn_tree.predict(X_test)
cm_cnn = confusion_matrix(y_true=y_test, y_pred=y_pred_test)

In [None]:
# Display the confusion matrix of the CNN-trained tree (rows: true class, columns: predicted class).
cm_cnn

In [None]:
# Recall: correctly predicted anomalies divided by all true anomalies (row 1).
print(
    "Recall for anomaly observations in test dataset: %.4f"
    % (cm_cnn[1, 1] / cm_cnn[1].sum())
)
# Precision: correctly predicted anomalies divided by all predicted anomalies (column 1).
print(
    "Precision for anomaly observations in test dataset: %.4f"
    % (cm_cnn[1, 1] / cm_cnn.sum(axis=0)[1])
)

### One-sided selection

In [None]:
# One-sided selection under-sampling of the training split. The sampler is
# seeded like the rest of the notebook so reruns are reproducible.
oss = OneSidedSelection(random_state=2022)
X_resampled, y_resampled = oss.fit_resample(X_train, y_train)

# Compare the original training split with the resampled one.
plot_representation(X_train, y_train, X_resampled, y_resampled, 'One-sided selection')

In [None]:
# Train a decision tree on the one-sided-selection training data.
dt_tree = tree.DecisionTreeClassifier(random_state=2022)
dt_tree.fit(X_resampled, y_resampled)
oss_tree = dt_tree  # fit() returns the estimator itself, so both names refer to one model

# Evaluate on the untouched test split.
y_pred_test = oss_tree.predict(X_test)
cm_oss = confusion_matrix(y_true=y_test, y_pred=y_pred_test)

In [None]:
# Display the confusion matrix of the OSS-trained tree (rows: true class, columns: predicted class).
cm_oss

In [None]:
# Recall: correctly predicted anomalies divided by all true anomalies (row 1).
print(
    "Recall for anomaly observations in test dataset: %.4f"
    % (cm_oss[1, 1] / cm_oss[1].sum())
)
# Precision: correctly predicted anomalies divided by all predicted anomalies (column 1).
print(
    "Precision for anomaly observations in test dataset: %.4f"
    % (cm_oss[1, 1] / cm_oss.sum(axis=0)[1])
)