In [1]:
!pip install dtaidistance



In [2]:
!pip install fastdtw



In [3]:
import numpy as np
import pandas as pd
from dtaidistance.dtw import distance as dtw_distance
import random
from tslearn.metrics import dtw
from tqdm import tqdm
from sklearn.preprocessing import MinMaxScaler
from sklearn.linear_model import LogisticRegression
from sklearn.tree import DecisionTreeClassifier
from sklearn.neighbors import KNeighborsClassifier
from xgboost import XGBClassifier
from sklearn.metrics import accuracy_score, f1_score, recall_score, confusion_matrix
from imblearn.metrics import specificity_score
import seaborn as sns
import matplotlib.pyplot as plt
from keras.models import Sequential
from keras.layers import LSTM, Dense, Conv1D, Flatten
from google.colab import drive
from sklearn.preprocessing import LabelEncoder

In [4]:
from google.colab import drive

# Google Drive 마운트
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [5]:
df_X_train = pd.read_csv('/content/drive/My Drive/PhalangesOutlinesCorrect/X_train_Worms.csv')
df_y_train = pd.read_csv('/content/drive/My Drive/PhalangesOutlinesCorrect/y_train_Worms.csv')
df_X_test = pd.read_csv('/content/drive/MyDrive/PhalangesOutlinesCorrect/X_test_Worms.csv')
df_y_test = pd.read_csv('/content/drive/MyDrive/PhalangesOutlinesCorrect/y_test_Worms.csv')

In [6]:
X_train = df_X_train.values
y_train = df_y_train.values.reshape(-1)  # 1차원 배열로 변환
X_test = df_X_test.values
y_test = df_y_test.values.reshape(-1)

In [7]:
# 1~5. 클래스별 증강이 필요한 개수 계산 및 목표 수 설정
from collections import defaultdict

class_counts = pd.Series(y_train).value_counts()
total_samples = len(X_train)
target_total = total_samples * 2
avg_target = target_total / len(class_counts)

non_aug_classes = {}
aug_classes = {}
for cls, count in class_counts.items():
    if count >= avg_target:
        non_aug_classes[cls] = count
    else:
        aug_classes[cls] = count

remaining_target = target_total - sum(non_aug_classes.values())
class_targets = {cls: int(remaining_target / len(aug_classes)) for cls in aug_classes}


In [8]:
print("==== 증강 제외 클래스 ====")
for cls, count in non_aug_classes.items():
    print(f"Class {cls}: {count}개")

print("\n==== 증강 대상 클래스 및 목표 샘플 수 ====")
for cls in aug_classes:
    print(f"Class {cls}: 기존 {aug_classes[cls]}개 → 목표 {class_targets[cls]}개 → 생성 {class_targets[cls] - aug_classes[cls]}개")


==== 증강 제외 클래스 ====
Class 1: 76개

==== 증강 대상 클래스 및 목표 샘플 수 ====
Class 4: 기존 32개 → 목표 71개 → 생성 39개
Class 2: 기존 31개 → 목표 71개 → 생성 40개
Class 3: 기존 25개 → 목표 71개 → 생성 46개
Class 5: 기존 17개 → 목표 71개 → 생성 54개


In [9]:
class_targets

{4: 71, 2: 71, 3: 71, 5: 71}

In [10]:
# 3. 모든 클래스 샘플들끼리의 dtw 거리 계산

def compute_dtw_matrix_fast(X_train):
    n_samples = len(X_train)
    dtw_matrix = np.zeros((n_samples, n_samples))

    for i in tqdm(range(n_samples), desc="Computing DTW matrix"):
        for j in range(i + 1, n_samples):
            dist = dtw(X_train[i], X_train[j])
            dtw_matrix[i, j] = dist
            dtw_matrix[j, i] = dist
    return dtw_matrix

In [11]:
# 2. dtw 기반 이웃 탐색(k=3으로 가정): dtw 거리가 가까운 이웃 3개의 샘플 찾기
def find_dtw_neighbors(dtw_matrix, k=3):
    n_samples = dtw_matrix.shape[0]
    neighbors_dict = {}

    for i in range(n_samples):
        # 자기 자신 제외하고 정렬 (argsort는 오름차순)
        neighbors = np.argsort(dtw_matrix[i])
        nearest_neighbors = [idx for idx in neighbors if idx != i][:k]
        neighbors_dict[i] = nearest_neighbors

    return neighbors_dict

In [12]:
# 이웃 중 2개 이상이 다른 클래스라면 → 안전샘플 (True)
# 그렇지 않으면 → 안전하지 않음 (False)
def is_safe_sample(sample_idx, neighbors_dict, y_train):
    own_class = y_train[sample_idx]
    neighbor_classes = [y_train[neighbor] for neighbor in neighbors_dict[sample_idx]]
    num_different = sum([1 for cls in neighbor_classes if cls != own_class])
    return num_different >= 2

# 클래스별 안전샘플 목록 구축
def get_safe_samples_by_class(neighbors_dict, y_train):
    safe_samples = defaultdict(list)
    for idx in range(len(y_train)):
        if is_safe_sample(idx, neighbors_dict, y_train):
            cls = y_train[idx]
            safe_samples[cls].append(idx)
    return dict(safe_samples)

In [13]:
#5. 구한 안전샘플과 자신들의 이웃 중 본인 클래스 제외하고 다른 클래스 샘플 중 랜덤으로 하나의 시계열 선택
#6. 그 사이에 시계열 선형 보간(각 클래스 안전샘플과 다른 클래스의 시계열 사이에서 증강 데이터 생성)

def select_random_different_class_neighbor(sample_idx, neighbors_dict, y_train):
    own_class = y_train[sample_idx]
    different_class_neighbors = [
        neighbor for neighbor in neighbors_dict[sample_idx]
        if y_train[neighbor] != own_class
    ]
    if different_class_neighbors:
        return random.choice(different_class_neighbors)
    else:
        return None

def interpolate_timeseries(x1, x2, alpha=0.5):
    return (1 - alpha) * x1 + alpha * x2

def generate_augmented_sample(X_train, idx1, idx2, alpha=0.5):
    x1 = X_train[idx1]
    x2 = X_train[idx2]
    return interpolate_timeseries(x1, x2, alpha)

In [14]:
def dtw_tsmote_augmentation(X_train, y_train, class_targets, k=3, alpha=0.5, random_seed=42):
    # 1) DTW 거리 행렬 계산
    print("DTW 거리 행렬 계산 중...")
    dtw_matrix = compute_dtw_matrix_fast(X_train)

    # 2) 이웃 탐색
    neighbors_dict = find_dtw_neighbors(dtw_matrix, k=k)

    # 3) 안전샘플 찾기
    safe_samples_by_class = get_safe_samples_by_class(neighbors_dict, y_train)

    # 증강 결과 저장용 리스트
    X_aug_list = []
    y_aug_list = []

    # 4) 클래스별 증강 수행
    print("클래스별 증강 수행 중...")
    unique_classes = np.unique(y_train)
    for cls in unique_classes:
        current_count = sum(y_train == cls)
        target_count = class_targets.get(cls, current_count)
        n_to_generate = target_count - current_count
        if n_to_generate <= 0:
            print(f"Class {cls}: 증강 불필요 (기존 {current_count}개, 목표 {target_count}개)")
            continue

        print(f"Class {cls}: 증강 {n_to_generate}개 생성 (안전샘플 수: {len(safe_samples_by_class.get(cls, []))})")

        safe_samples = safe_samples_by_class.get(cls, [])
        if len(safe_samples) == 0:
            print(f"  주의: 안전샘플 없음으로 증강 불가")
            continue

        for _ in range(n_to_generate):
            # 1) 안전샘플 랜덤 선택
            safe_idx = random.choice(safe_samples)
            # 2) 다른 클래스 이웃 랜덤 선택
            neighbor_idx = select_random_different_class_neighbor(safe_idx, neighbors_dict, y_train)
            if neighbor_idx is None:
                # 이웃 중 다른 클래스가 없으면 그냥 안전샘플 복제 (또는 건너뜀)
                # 여기서는 복제 처리
                augmented_ts = X_train[safe_idx].copy()
            else:
                # 3) 보간 증강
                augmented_ts = generate_augmented_sample(X_train, safe_idx, neighbor_idx, alpha=alpha)

            X_aug_list.append(augmented_ts)
            y_aug_list.append(cls)

    # 5) 원본 데이터와 증강 데이터 합치기
    X_augmented = np.vstack([X_train] + X_aug_list)
    y_augmented = np.hstack([y_train] + y_aug_list)

    print(f"증강 완료: 원본 {len(X_train)} → 증강 후 {len(X_augmented)}")
    return X_augmented, y_augmented

In [15]:
# X_train, y_train은 numpy ndarray 또는 비슷한 형태여야 함
X_train = np.array(X_train)  # (n_samples, seq_len)
y_train = np.array(y_train)  # (n_samples, )

# 증강 실행
X_synthetic, y_synthetic = dtw_tsmote_augmentation(X_train, y_train, class_targets, k=3, alpha=0.5)


DTW 거리 행렬 계산 중...


Computing DTW matrix: 100%|██████████| 181/181 [02:19<00:00,  1.30it/s]

클래스별 증강 수행 중...
Class 1: 증강 불필요 (기존 76개, 목표 76개)
Class 2: 증강 40개 생성 (안전샘플 수: 23)
Class 3: 증강 46개 생성 (안전샘플 수: 11)
Class 4: 증강 39개 생성 (안전샘플 수: 18)
Class 5: 증강 54개 생성 (안전샘플 수: 13)
증강 완료: 원본 181 → 증강 후 360





In [16]:
from tensorflow.keras.utils import to_categorical
from sklearn.preprocessing import LabelEncoder

# Label 인코딩
le = LabelEncoder()
y_synthetic_encoded = le.fit_transform(y_synthetic)
y_test_encoded = le.transform(y_test)
num_classes = len(np.unique(y_synthetic_encoded))

# One-hot encoding
y_synthetic_cat = to_categorical(y_synthetic_encoded, num_classes=num_classes)
y_test_cat = to_categorical(y_test_encoded, num_classes=num_classes)


In [17]:
def predict_with_logistic_regression(X_train, y_train, X_test):
    model = LogisticRegression()
    model.fit(X_train, y_train)
    return model.predict(X_test)

def predict_with_cart(X_train, y_train, X_test):
    model = DecisionTreeClassifier()
    model.fit(X_train, y_train)
    return model.predict(X_test)

def predict_with_knn(X_train, y_train, X_test, k=3):
    model = KNeighborsClassifier(n_neighbors=k)
    model.fit(X_train, y_train)
    return model.predict(X_test)

def predict_with_xgboost(X_train, y_train, X_test, label_encoder=None):
    from xgboost import XGBClassifier

    model = XGBClassifier(use_label_encoder=False, eval_metric='mlogloss')
    model.fit(X_train, y_train)
    pred_y = model.predict(X_test)

    if label_encoder is not None:
        pred_y = label_encoder.inverse_transform(pred_y)

    return pred_y

def predict_with_lstm(X_train, y_train, X_test, num_classes):
    model = Sequential()
    model.add(LSTM(50, activation='relu', input_shape=(X_train.shape[1], 1)))
    model.add(Dense(num_classes, activation='softmax'))
    model.compile(optimizer='adam', loss='categorical_crossentropy', metrics=['accuracy'])

    X_train_reshaped = X_train.reshape((X_train.shape[0], X_train.shape[1], 1))
    X_test_reshaped = X_test.reshape((X_test.shape[0], X_test.shape[1], 1))

    model.fit(X_train_reshaped, y_train, epochs=50, batch_size=32, verbose=0)
    pred_prob = model.predict(X_test_reshaped)
    return np.argmax(pred_prob, axis=1)


def predict_with_cnn(X_train, y_train, X_test, num_classes):
    model = Sequential()
    model.add(Conv1D(64, kernel_size=3, activation='relu', input_shape=(X_train.shape[1], 1)))
    model.add(Flatten())
    model.add(Dense(num_classes, activation='softmax'))
    model.compile(optimizer='adam', loss='categorical_crossentropy', metrics=['accuracy'])

    X_train_reshaped = X_train.reshape((X_train.shape[0], X_train.shape[1], 1))
    X_test_reshaped = X_test.reshape((X_test.shape[0], X_test.shape[1], 1))

    model.fit(X_train_reshaped, y_train, epochs=50, batch_size=32, verbose=0)
    pred_prob = model.predict(X_test_reshaped)
    return np.argmax(pred_prob, axis=1)


In [18]:
predictions = {
    "Logistic Regression": predict_with_logistic_regression(X_synthetic, y_synthetic_encoded, X_test),
    "CART": predict_with_cart(X_synthetic, y_synthetic_encoded, X_test),
    "KNN": predict_with_knn(X_synthetic, y_synthetic_encoded, X_test),
    "XGBoost": predict_with_xgboost(X_synthetic, y_synthetic_encoded, X_test),
    "LSTM": predict_with_lstm(X_synthetic, y_synthetic_cat, X_test, num_classes),
    "CNN": predict_with_cnn(X_synthetic, y_synthetic_cat, X_test, num_classes)
}


# 성능 지표 저장을 위한 리스트
results = []
for model_name, pred_y in predictions.items():
    accuracy = accuracy_score(y_test_encoded, pred_y)
    recall = recall_score(y_test_encoded, pred_y, average='macro')
    f1 = f1_score(y_test_encoded, pred_y, average='macro')
    specificity = specificity_score(y_test_encoded, pred_y, average='macro')
    conf_matrix = confusion_matrix(y_test_encoded, pred_y)
    results.append([accuracy, f1, recall, specificity, conf_matrix])

STOP: TOTAL NO. OF ITERATIONS REACHED LIMIT.

Increase the number of iterations (max_iter) or scale the data as shown in:
    https://scikit-learn.org/stable/modules/preprocessing.html
Please also refer to the documentation for alternative solver options:
    https://scikit-learn.org/stable/modules/linear_model.html#logistic-regression
  n_iter_i = _check_optimize_result(
Parameters: { "use_label_encoder" } are not used.

  super().__init__(**kwargs)


[1m3/3[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 250ms/step


  super().__init__(activity_regularizer=activity_regularizer, **kwargs)


[1m3/3[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 32ms/step


In [19]:
from collections import Counter

# 증강 전 클래스별 샘플 수
original_class_distribution = Counter(y_train)
print("증강 전 클래스별 샘플 수:")
for cls, count in original_class_distribution.items():
    print(f"클래스 {cls}: {count}개")

# 증강 후 클래스별 샘플 수
augmented_class_distribution = Counter(y_synthetic)
print("\n증강 후 클래스별 샘플 수:")
for cls, count in augmented_class_distribution.items():
    print(f"클래스 {cls}: {count}개")

증강 전 클래스별 샘플 수:
클래스 1: 76개
클래스 2: 31개
클래스 3: 25개
클래스 4: 32개
클래스 5: 17개

증강 후 클래스별 샘플 수:
클래스 1: 76개
클래스 2: 71개
클래스 3: 71개
클래스 4: 71개
클래스 5: 71개


In [20]:
# 성능 지표를 DataFrame으로 변환
results_df = pd.DataFrame(results, columns=["Accuracy", "F1", "Recall", "Specificity", "Confusion Matrix"], index=predictions.keys())
model_results = results_df.T
# 결과를 출력
print("\n모델 성능 비교 결과:")
print(model_results)

# 결과를 CSV 파일로 저장
model_results.to_csv("/content/drive/My Drive/PhalangesOutlinesCorrect/results/(다중)bsmote_soda_models_worms_result.csv")


모델 성능 비교 결과:
                                                Logistic Regression  \
Accuracy                                                   0.324675   
F1                                                         0.314598   
Recall                                                     0.362914   
Specificity                                                0.824118   
Confusion Matrix  [[10, 4, 5, 10, 4], [3, 2, 3, 3, 2], [3, 2, 3,...   

                                                               CART  \
Accuracy                                                    0.38961   
F1                                                          0.35617   
Recall                                                     0.397249   
Specificity                                                0.850539   
Confusion Matrix  [[13, 0, 4, 6, 10], [3, 2, 1, 5, 2], [0, 0, 4,...   

                                                                KNN  \
Accuracy                                                   0.