In [None]:
# Mount Google Drive at /content/drive (Colab-only); the dataset path used
# later in this notebook lives under that mount point.
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
import os
import random

import numpy as np
import tensorflow as tf
from sklearn.metrics import classification_report, confusion_matrix
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from sklearn.utils.class_weight import compute_class_weight
from tensorflow import keras
from tensorflow.keras import layers, Model
from tensorflow.keras.callbacks import ReduceLROnPlateau, EarlyStopping, ModelCheckpoint

In [None]:
# Seed every random number generator the notebook can touch (Python, NumPy,
# TensorFlow) from one constant so repeated runs start from the same state.
SEED = 42
random.seed(SEED)
np.random.seed(SEED)
tf.random.set_seed(SEED)

In [None]:
# UCI HAR Dataset loader
def load_uci_har_dataset(data_path='UCI HAR Dataset'):
    """Load the train and test splits of the UCI HAR dataset from text files.

    Reads ``<data_path>/<split>/X_<split>.txt`` (whitespace-separated floats,
    one sample per line) and ``<data_path>/<split>/y_<split>.txt`` (one integer
    label per line) for ``split`` in ``train`` and ``test``.

    Args:
        data_path: Directory that contains the ``train`` and ``test`` folders.

    Returns:
        Tuple ``(X_train, y_train, X_test, y_test)``. Feature arrays are float
        arrays with one row per sample; label arrays are shifted by -1 so that
        a label written as 1 in the file becomes 0.

    Raises:
        FileNotFoundError: If one of the four files is missing.
        ValueError: If a value cannot be parsed, rows have differing lengths,
            or a split has a different number of feature rows and labels.
    """

    def read_rows(file_path, parse_line):
        # Blank lines (e.g. a trailing empty line) carry no sample; skipping
        # them avoids int('') errors and ragged feature rows.
        with open(file_path, 'r') as fh:
            return [parse_line(line) for line in fh if line.strip()]

    def load_X(file_path):
        rows = read_rows(file_path, lambda line: [float(tok) for tok in line.split()])
        return np.array(rows, dtype=float)

    def load_y(file_path):
        labels = read_rows(file_path, lambda line: int(line.strip()))
        return np.array(labels, dtype=int) - 1  # 0-based indexing

    def load_split(split):
        X = load_X(os.path.join(data_path, split, f'X_{split}.txt'))
        y = load_y(os.path.join(data_path, split, f'y_{split}.txt'))
        if len(X) != len(y):
            raise ValueError(
                f"{split} split has {len(X)} feature rows but {len(y)} labels"
            )
        return X, y

    X_train, y_train = load_split('train')
    X_test, y_test = load_split('test')

    return X_train, y_train, X_test, y_test

In [None]:
# Squeeze-and-Excitation Block
class SEBlock(layers.Layer):
    """Squeeze-and-Excitation channel gating for 3-D sequence inputs.

    The input is averaged over its time axis, passed through a two-layer
    bottleneck (ReLU then sigmoid) to get one gate in (0, 1) per channel, and
    the input is multiplied by those gates at every timestep.

    Args:
        ratio: Channel reduction factor of the bottleneck. The bottleneck has
            ``max(1, channels // ratio)`` units. Must be >= 1.

    Raises:
        ValueError: If ``ratio`` is smaller than 1.
    """

    def __init__(self, ratio=8, **kwargs):
        super().__init__(**kwargs)
        if ratio < 1:
            raise ValueError(f"ratio must be >= 1, got {ratio}")
        self.ratio = ratio

    def build(self, input_shape):
        channels = input_shape[-1]
        # Keep at least one bottleneck unit so inputs with fewer channels
        # than `ratio` do not produce a zero-width Dense layer.
        bottleneck_units = max(1, channels // self.ratio)
        self.squeeze = layers.GlobalAveragePooling1D()
        self.excitation = keras.Sequential([
            layers.Dense(bottleneck_units, activation='relu'),
            layers.Dense(channels, activation='sigmoid')
        ])
        super().build(input_shape)

    def call(self, inputs):
        # (batch, channels) gates computed from the time-averaged input.
        gates = self.excitation(self.squeeze(inputs))
        # Insert a length-1 time axis so the gates broadcast over timesteps;
        # plain tensor ops avoid creating new Keras layers on every call.
        return inputs * tf.expand_dims(gates, axis=1)

    def get_config(self):
        config = super().get_config()
        config.update({'ratio': self.ratio})
        return config

In [None]:
# Attention Layer (additive attention pooling over time)
class SelfAttention(layers.Layer):
    """Additive attention that pools a sequence into a single vector.

    For an input ``x`` of shape (batch, timesteps, features) the layer computes
    ``score = tanh(x @ W1)``, turns ``score @ V`` into weights with a softmax
    over the time axis, and returns the weighted sum of ``x`` over time, i.e.
    a tensor of shape (batch, features).

    Args:
        units: Width of the hidden projection ``W1`` used to score timesteps.

    Note:
        Earlier revisions also allocated an ``attention_W2`` weight that was
        never used in ``call``; it received no gradient and only inflated the
        parameter count, so it is no longer created. Checkpoints saved with
        that extra weight will not match this layer's weight list.
    """

    def __init__(self, units=128, **kwargs):
        super().__init__(**kwargs)
        self.units = units

    def build(self, input_shape):
        feature_dim = input_shape[-1]
        # Projects each timestep's features into the scoring space.
        self.W1 = self.add_weight(
            shape=(feature_dim, self.units),
            initializer='glorot_uniform',
            trainable=True,
            name='attention_W1'
        )
        # Collapses the projected features to one scalar score per timestep.
        self.V = self.add_weight(
            shape=(self.units, 1),
            initializer='glorot_uniform',
            trainable=True,
            name='attention_V'
        )
        super().build(input_shape)

    def call(self, x):
        # x shape: (batch, timesteps, features)
        score = tf.nn.tanh(tf.tensordot(x, self.W1, axes=1))
        # Softmax over axis 1 normalises the scores across timesteps.
        attention_weights = tf.nn.softmax(tf.tensordot(score, self.V, axes=1), axis=1)
        # (batch, timesteps, 1) weights broadcast over the feature axis.
        context_vector = tf.reduce_sum(attention_weights * x, axis=1)
        return context_vector

    def get_config(self):
        config = super().get_config()
        config.update({'units': self.units})
        return config

In [None]:
# Inception-style Multi-Scale Block
def inception_block(x, filters=32):
    """Apply four parallel Conv1D branches to `x` and concatenate the results.

    Branches: a kernel-1 conv; kernel-1 then kernel-3 convs; kernel-1 then
    kernel-5 convs; and a stride-1 max-pool (window 3) followed by a kernel-1
    conv. Every conv uses 'same' padding and ReLU, and each branch ends with
    BatchNormalization. Each branch outputs `filters` channels, so the
    concatenated tensor has ``4 * filters`` channels.

    Args:
        x: Input tensor for the Conv1D layers.
        filters: Number of filters used by every convolution.

    Returns:
        The concatenation of the four branch outputs.
    """

    def conv_stack(tensor, kernel_sizes):
        # One ReLU Conv1D per kernel size, then a single BatchNormalization.
        for size in kernel_sizes:
            tensor = layers.Conv1D(filters, size, padding='same', activation='relu')(tensor)
        return layers.BatchNormalization()(tensor)

    # Branches are built in the same order as they are concatenated.
    branch_k1 = conv_stack(x, (1,))
    branch_k3 = conv_stack(x, (1, 3))
    branch_k5 = conv_stack(x, (1, 5))

    pooled = layers.MaxPooling1D(3, strides=1, padding='same')(x)
    branch_pool = conv_stack(pooled, (1,))

    return layers.Concatenate()([branch_k1, branch_k3, branch_k5, branch_pool])

In [None]:
# Enhanced Residual Block with SE
def enhanced_residual_block(x, filters=64, kernel_size=3):
    """Two-convolution residual block whose main path is gated by an SEBlock.

    Main path: Conv1D -> BatchNorm -> ReLU -> Dropout(0.2) -> Conv1D ->
    BatchNorm -> SEBlock(ratio=8). The input is added back to the main path
    (after a kernel-1 Conv1D + BatchNorm projection when its channel count
    differs from `filters`), and a final ReLU is applied to the sum.

    Args:
        x: Input tensor.
        filters: Number of output channels of the block.
        kernel_size: Kernel size of the two main-path convolutions.

    Returns:
        The activated sum of the main path and the (possibly projected) input.
    """
    skip = x

    # First conv stage
    main = layers.Conv1D(filters, kernel_size, padding='same')(x)
    main = layers.BatchNormalization()(main)
    main = layers.Activation('relu')(main)
    main = layers.Dropout(0.2)(main)

    # Second conv stage, then channel re-weighting
    main = layers.Conv1D(filters, kernel_size, padding='same')(main)
    main = layers.BatchNormalization()(main)
    main = SEBlock(ratio=8)(main)

    # Project the skip connection only when the channel counts disagree.
    channels_match = skip.shape[-1] == filters
    if not channels_match:
        skip = layers.Conv1D(filters, 1, padding='same')(skip)
        skip = layers.BatchNormalization()(skip)

    merged = layers.Add()([main, skip])
    return layers.Activation('relu')(merged)


In [None]:
# Main model builder
def build_optimized_har_model(input_shape=(51, 11), num_classes=6):
    """
    Assemble the HAR classifier as a Keras functional model.

    Pipeline: Inception block -> three SE-residual blocks (64, 96, 128
    filters) -> two bidirectional LSTMs (96, 64 units) -> additive attention
    pooling concatenated with global average and max pooling -> two
    L2-regularised dense layers -> softmax over `num_classes`.
    The original notes state a target of 96%+ accuracy.

    Args:
        input_shape: Shape of one sample, (timesteps, features).
        num_classes: Number of output classes.

    Returns:
        An uncompiled `Model` mapping the input to class probabilities.
    """
    inputs = layers.Input(shape=input_shape)

    # Multi-scale feature extraction: 4 branches x 24 filters = 96 channels
    features = inception_block(inputs, filters=24)
    features = layers.Dropout(0.25)(features)

    # SE-ResNet stages, each followed by its own dropout rate
    for block_filters, drop_rate in ((64, 0.25), (96, 0.25), (128, 0.3)):
        features = enhanced_residual_block(features, filters=block_filters, kernel_size=3)
        features = layers.Dropout(drop_rate)(features)

    # Stacked bidirectional LSTMs that keep the full sequence
    for lstm_units in (96, 64):
        features = layers.Bidirectional(
            layers.LSTM(lstm_units, return_sequences=True, dropout=0.3, recurrent_dropout=0.2)
        )(features)

    # Three summaries of the sequence: attention, mean and max over time
    attended = SelfAttention(units=128)(features)
    mean_pooled = layers.GlobalAveragePooling1D()(features)
    max_pooled = layers.GlobalMaxPooling1D()(features)
    head = layers.Concatenate()([attended, mean_pooled, max_pooled])

    # Dense head: Dense(L2) -> BatchNorm -> Dropout, twice
    for width, drop_rate in ((256, 0.5), (128, 0.4)):
        head = layers.Dense(width, activation='relu', kernel_regularizer=keras.regularizers.l2(5e-4))(head)
        head = layers.BatchNormalization()(head)
        head = layers.Dropout(drop_rate)(head)

    outputs = layers.Dense(num_classes, activation='softmax')(head)

    return Model(inputs=inputs, outputs=outputs)

In [None]:
# Cosine Annealing with Warm Restarts
class CosineAnnealingScheduler(keras.callbacks.Callback):
    """Per-epoch cosine learning-rate schedule with warm restarts.

    Within a cycle of ``T_i`` epochs the learning rate follows
    ``eta_min + 0.5 * (eta_max - eta_min) * (1 + cos(pi * T_cur / T_i))``.
    When a cycle finishes, the rate jumps back to ``eta_max`` and the next
    cycle is ``T_mult`` times longer.

    The callback assigns the optimizer's learning rate at the start of every
    epoch, so it overrides changes made by other callbacks between epochs.

    Args:
        T_0: Length of the first cycle in epochs (>= 1).
        T_mult: Factor applied to the cycle length after each restart (>= 1).
        eta_max: Learning rate at the start of each cycle.
        eta_min: Lower bound approached at the end of each cycle.

    Raises:
        ValueError: If ``T_0`` or ``T_mult`` is smaller than 1.
    """

    def __init__(self, T_0=10, T_mult=2, eta_max=0.001, eta_min=1e-6):
        super().__init__()
        if T_0 < 1:
            raise ValueError(f"T_0 must be >= 1, got {T_0}")
        if T_mult < 1:
            raise ValueError(f"T_mult must be >= 1, got {T_mult}")
        self.T_0 = T_0
        self.T_mult = T_mult
        self.eta_max = eta_max
        self.eta_min = eta_min
        self.T_cur = 0
        self.T_i = T_0
        self._current_lr = None

    def on_train_begin(self, logs=None):
        # Restart the schedule for every fit() call so a reused callback
        # instance does not resume in the middle of an old cycle.
        self.T_cur = 0
        self.T_i = self.T_0
        self._current_lr = None

    def on_epoch_begin(self, epoch, logs=None):
        # `>=` (rather than `==`) still restarts when T_i is not an integer,
        # e.g. with a fractional T_mult.
        if self.T_cur >= self.T_i:
            self.T_cur = 0
            self.T_i *= self.T_mult

        lr = self.eta_min + 0.5 * (self.eta_max - self.eta_min) * \
             (1 + np.cos(np.pi * self.T_cur / self.T_i))

        self.model.optimizer.learning_rate.assign(lr)
        self._current_lr = float(lr)
        self.T_cur += 1

        if epoch % 10 == 0:
            print(f"\nEpoch {epoch}: Learning rate = {lr:.6f}")

    def on_epoch_end(self, epoch, logs=None):
        # Record the rate used for this epoch so it shows up in the progress
        # output and in History even without another LR callback.
        if logs is not None and self._current_lr is not None:
            logs['learning_rate'] = self._current_lr

In [None]:
# Model training function
def train_model(X_train, y_train, X_test, y_test, val_fraction=0.1, random_state=42):
    """Train the HAR model and report metrics on the held-out test set.

    A stratified share of the training rows is held out as a validation set
    that drives early stopping and checkpoint selection; the test set is only
    used for the final evaluation, so the reported test metrics are not
    influenced by model selection. The feature scaler is fit on the training
    portion only.

    Side effects: prints progress and metrics, and writes the best checkpoint
    to ``best_har_model_optimized.keras`` in the working directory.

    Args:
        X_train: 2-D array of training features; the first 561 columns are used.
        y_train: 1-D array of zero-based integer class labels (0..5).
        X_test: 2-D array of test features with the same column layout.
        y_test: 1-D array of zero-based integer class labels (0..5).
        val_fraction: Share of the training rows held out for validation.
        random_state: Seed for the train/validation split.

    Returns:
        Tuple ``(model, history)``: the trained Keras model and the History
        object returned by ``model.fit``.
    """
    class_names = ['WALKING', 'WALKING_UPSTAIRS', 'WALKING_DOWNSTAIRS',
                   'SITTING', 'STANDING', 'LAYING']
    num_classes = len(class_names)
    label_ids = np.arange(num_classes)

    print("데이터 전처리 중...")

    # Reshape: 561 features -> 51 timesteps × 11 features
    # NOTE(review): the 561 columns look like per-window engineered features,
    # so this (51, 11) layout is a reshaping convention rather than a true
    # time axis — confirm this is intended.
    n_timesteps = 51
    n_features = 11

    def to_sequences(flat):
        return flat[:, :n_timesteps * n_features].reshape(-1, n_timesteps, n_features)

    # Hold out validation rows from the training data. Validating on the test
    # set would let early stopping / checkpointing pick the model that happens
    # to score best on the test data and inflate the reported accuracy.
    X_fit, X_val, y_fit, y_val = train_test_split(
        X_train, y_train,
        test_size=val_fraction,
        stratify=y_train,
        random_state=random_state,
    )

    X_fit_seq = to_sequences(X_fit)
    X_val_seq = to_sequences(X_val)
    X_test_seq = to_sequences(X_test)

    # Normalisation: statistics come from the training portion only.
    scaler = StandardScaler()
    scaler.fit(X_fit_seq.reshape(-1, n_features))

    def scale(sequences):
        flat = scaler.transform(sequences.reshape(-1, n_features))
        return flat.reshape(-1, n_timesteps, n_features)

    X_fit_seq = scale(X_fit_seq)
    X_val_seq = scale(X_val_seq)
    X_test_seq = scale(X_test_seq)

    # One-hot encoding
    y_fit_cat = keras.utils.to_categorical(y_fit, num_classes=num_classes)
    y_val_cat = keras.utils.to_categorical(y_val, num_classes=num_classes)
    y_test_cat = keras.utils.to_categorical(y_test, num_classes=num_classes)

    # Class weights (to counter class imbalance). Key by the actual label so
    # the mapping stays correct even if some class is absent from y_fit.
    present_classes = np.unique(y_fit)
    balanced_weights = compute_class_weight('balanced', classes=present_classes, y=y_fit)
    class_weight_dict = {
        int(label): float(weight)
        for label, weight in zip(present_classes, balanced_weights)
    }

    print(f"Train shape: {X_fit_seq.shape}")
    print(f"Val shape: {X_val_seq.shape}")
    print(f"Test shape: {X_test_seq.shape}")
    print(f"Class weights: {class_weight_dict}")

    # Build the model
    print("\n모델 구축 중...")
    model = build_optimized_har_model(input_shape=(n_timesteps, n_features), num_classes=num_classes)

    # Model summary
    model.summary()

    # Check the parameter budget
    total_params = model.count_params()
    print(f"\n총 파라미터 수: {total_params:,} (제한: 1,000,000)")

    if total_params > 1000000:
        print("⚠️ 경고: 파라미터 수가 100만을 초과했습니다!")

    # Optimizer
    optimizer = keras.optimizers.AdamW(
        learning_rate=0.001,
        weight_decay=1e-4
    )

    # Compile with label smoothing
    model.compile(
        optimizer=optimizer,
        loss=keras.losses.CategoricalCrossentropy(label_smoothing=0.1),
        metrics=['accuracy']
    )

    # Callbacks
    # The cosine scheduler assigns the learning rate at the start of every
    # epoch, which would immediately overwrite any reduction made by
    # ReduceLROnPlateau, so that callback is not used here.
    cosine_scheduler = CosineAnnealingScheduler(
        T_0=10,
        T_mult=2,
        eta_max=0.001,
        eta_min=1e-6
    )

    early_stop = EarlyStopping(
        monitor='val_accuracy',
        patience=30,
        restore_best_weights=True,
        verbose=1
    )

    checkpoint = ModelCheckpoint(
        'best_har_model_optimized.keras',
        monitor='val_accuracy',
        save_best_only=True,
        verbose=1
    )

    # Training
    print("\n모델 학습 시작...")
    history = model.fit(
        X_fit_seq, y_fit_cat,
        batch_size=128,
        epochs=200,
        validation_data=(X_val_seq, y_val_cat),
        class_weight=class_weight_dict,
        callbacks=[cosine_scheduler, early_stop, checkpoint],
        verbose=1
    )

    # Final evaluation on the untouched test set
    print("\n최종 평가:")
    test_loss, test_accuracy = model.evaluate(X_test_seq, y_test_cat, verbose=0)
    print(f"Test Accuracy: {test_accuracy*100:.2f}%")
    print(f"Test Loss: {test_loss:.4f}")

    # Per-class performance analysis
    y_pred = model.predict(X_test_seq)
    y_pred_classes = np.argmax(y_pred, axis=1)

    print("\n=== Classification Report ===")
    # Passing `labels` keeps the report and matrix at num_classes rows even if
    # a class never occurs in y_test or in the predictions.
    print(classification_report(
        y_test, y_pred_classes,
        labels=label_ids,
        target_names=class_names,
        zero_division=0,
    ))

    print("\n=== Confusion Matrix ===")
    cm = confusion_matrix(y_test, y_pred_classes, labels=label_ids)
    print(cm)

    # Per-class accuracy (recall); NaN when a class has no test samples
    print("\n=== 클래스별 정확도 ===")
    for i, name in enumerate(class_names):
        support = cm[i, :].sum()
        class_acc = cm[i, i] / support * 100 if support else float('nan')
        print(f"{name}: {class_acc:.2f}%")

    return model, history


In [None]:
# Main entry point: load the dataset from the mounted Drive folder, train the
# model, and report where the best checkpoint was written.
if __name__ == "__main__":
    # Load the data
    print("UCI HAR Dataset 로드 중...")
    data_path = '/content/drive/MyDrive/Colab Notebooks/AI-study/data/UCI HAR Dataset'
    X_train, y_train, X_test, y_test = load_uci_har_dataset(data_path)

    print(f"원본 데이터 크기:")
    print(f"X_train: {X_train.shape}, y_train: {y_train.shape}")
    print(f"X_test: {X_test.shape}, y_test: {y_test.shape}")

    # Train the model
    model, history = train_model(X_train, y_train, X_test, y_test)

    print("\n학습 완료! 최고 모델이 'best_har_model_optimized.keras'로 저장되었습니다.")

UCI HAR Dataset 로드 중...
원본 데이터 크기:
X_train: (7352, 561), y_train: (7352,)
X_test: (2947, 561), y_test: (2947,)
데이터 전처리 중...
Train shape: (7352, 51, 11)
Test shape: (2947, 51, 11)
Class weights: {0: np.float64(0.9994562262098967), 1: np.float64(1.1419695557626592), 2: np.float64(1.2427315753887762), 3: np.float64(0.9528252980819077), 4: np.float64(0.8918000970402717), 5: np.float64(0.8708836768538261)}

모델 구축 중...



총 파라미터 수: 676,698 (제한: 1,000,000)

모델 학습 시작...

Epoch 0: Learning rate = 0.001000
Epoch 1/200




[1m58/58[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 1s/step - accuracy: 0.2514 - loss: 2.5073
Epoch 1: val_accuracy improved from -inf to 0.26977, saving model to best_har_model_optimized.keras
[1m58/58[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m104s[0m 1s/step - accuracy: 0.2524 - loss: 2.5020 - val_accuracy: 0.2698 - val_loss: 1.7514 - learning_rate: 0.0010
Epoch 2/200
[1m58/58[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 1s/step - accuracy: 0.4233 - loss: 1.7445
Epoch 2: val_accuracy improved from 0.26977 to 0.49610, saving model to best_har_model_optimized.keras
[1m58/58[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m66s[0m 1s/step - accuracy: 0.4245 - loss: 1.7421 - val_accuracy: 0.4961 - val_loss: 1.3784 - learning_rate: 9.7555e-04
Epoch 3/200
[1m58/58[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 1s/step - accuracy: 0.6255 - loss: 1.3677
Epoch 3: val_accuracy did not improve from 0.49610
[1m58/58[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m 