In [1]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


### 1. 공통: 데이터 전처리 & 기본 세팅 (Keras 기준)

In [13]:
import pandas as pd
import numpy as np
from sklearn.model_selection import train_test_split
import torch
from torch.utils.data import Dataset, DataLoader

# 1) 데이터 불러오기
df = pd.read_csv("/content/drive/MyDrive/Colab Notebooks/WISDM.csv", header=None)
df.columns = ["idx", "user", "activity", "timestamp", "x", "y", "z"]

# 2) 필요없는 열 정리 (idx는 버려도 됨)
df = df[["user", "activity", "timestamp", "x", "y", "z"]]

# 3) 결측치 제거 (있다면)
df = df.dropna()

# 4) activity를 숫자 레이블로 변환
activities, y_all = np.unique(df["activity"], return_inverse=True)
activity_to_idx = {a: i for i, a in enumerate(activities)}
print("활동 레이블 매핑:", activity_to_idx)

df["activity_id"] = y_all

# 5) user, timestamp 기준으로 정렬 (안전하게)
df = df.sort_values(["user", "timestamp"]).reset_index(drop=True)

# 6) 시퀀스 생성 함수 (슬라이딩 윈도우)
def create_sequences(df, window_size=128, step=64):
    X_list = []
    y_list = []

    for user_id, user_df in df.groupby("user"):
        user_df = user_df.reset_index(drop=True)
        signals = user_df[["x", "y", "z"]].values
        labels = user_df["activity_id"].values

        for start in range(0, len(user_df) - window_size + 1, step):
            end = start + window_size
            window_x = signals[start:end]            # (window_size, 3)
            window_y = labels[start:end]
            # 윈도우 안에서 가장 많이 나온 활동을 레이블로 사용
            label = np.bincount(window_y).argmax()

            X_list.append(window_x)
            y_list.append(label)

    X = np.stack(X_list)    # (N, window_size, 3)
    y = np.array(y_list)    # (N,)
    return X, y

X, y = create_sequences(df, window_size=128, step=64)
print("X shape:", X.shape, "y shape:", y.shape)

# 7) train/val/test 분할
X_train, X_temp, y_train, y_temp = train_test_split(
    X, y, test_size=0.3, random_state=42, stratify=y
)
X_val, X_test, y_val, y_test = train_test_split(
    X_temp, y_temp, test_size=0.5, random_state=42, stratify=y_temp
)

print("train:", X_train.shape, "val:", X_val.shape, "test:", X_test.shape)

# 8) PyTorch Dataset 정의
class WisdmDataset(Dataset):
    def __init__(self, X, y):
        self.X = torch.from_numpy(X).float()      # (N, seq_len, 3)
        self.y = torch.from_numpy(y).long()

    def __len__(self):
        return len(self.X)

    def __getitem__(self, idx):
        # 모델에 넣기 편하게 (batch, channels, seq_len)로 바꾸기 위해 여기선 (seq_len, 3) 그대로 넘기고,
        # 모델 안에서 permute 해도 되고, 여기서 미리 permute 해도 됨.
        return self.X[idx], self.y[idx]

train_dataset = WisdmDataset(X_train, y_train)
val_dataset   = WisdmDataset(X_val, y_val)
test_dataset  = WisdmDataset(X_test, y_test)

batch_size = 64
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
val_loader   = DataLoader(val_dataset,   batch_size=batch_size, shuffle=False)
test_loader  = DataLoader(test_dataset,  batch_size=batch_size, shuffle=False)

num_classes = len(activities)
seq_len = X.shape[1]
input_dim = 3


활동 레이블 매핑: {'Downstairs': 0, 'Jogging': 1, 'Sitting': 2, 'Standing': 3, 'Upstairs': 4, 'Walking': 5}
X shape: (16331, 128, 3) y shape: (16331,)
train: (11431, 128, 3) val: (2450, 128, 3) test: (2450, 128, 3)


### 2. 공통 학습 루프 (하나 만들어두고 모델만 바꿔 끼우기)

In [14]:
import torch.nn as nn
import torch.optim as optim

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

def train_model(model, train_loader, val_loader, num_epochs=10):
    model.to(device)
    criterion = nn.CrossEntropyLoss()
    optimizer = optim.Adam(model.parameters(), lr=1e-3)

    for epoch in range(num_epochs):
        # ---- train ----
        model.train()
        running_loss = 0.0
        correct = 0
        total = 0

        for X_batch, y_batch in train_loader:
            X_batch = X_batch.to(device)   # (B, seq_len, 3)
            y_batch = y_batch.to(device)

            optimizer.zero_grad()
            outputs = model(X_batch)       # (B, num_classes)
            loss = criterion(outputs, y_batch)
            loss.backward()
            optimizer.step()

            running_loss += loss.item() * X_batch.size(0)
            _, predicted = outputs.max(1)
            total += y_batch.size(0)
            correct += predicted.eq(y_batch).sum().item()

        train_loss = running_loss / total
        train_acc  = correct / total

        # ---- validation ----
        model.eval()
        val_loss = 0.0
        val_correct = 0
        val_total = 0

        with torch.no_grad():
            for X_batch, y_batch in val_loader:
                X_batch = X_batch.to(device)
                y_batch = y_batch.to(device)

                outputs = model(X_batch)
                loss = criterion(outputs, y_batch)

                val_loss += loss.item() * X_batch.size(0)
                _, predicted = outputs.max(1)
                val_total += y_batch.size(0)
                val_correct += predicted.eq(y_batch).sum().item()

        val_loss /= val_total
        val_acc  = val_correct / val_total

        print(f"[Epoch {epoch+1}] "
              f"train_loss={train_loss:.4f}, train_acc={train_acc:.4f} | "
              f"val_loss={val_loss:.4f}, val_acc={val_acc:.4f}")

    return model

###3. 모델 1 – 1D CNN

In [15]:
class CNN1D(nn.Module):
    def __init__(self, input_dim=3, num_classes=6, seq_len=128):
        super().__init__()
        # 입력: (B, seq_len, 3) → (B, 3, seq_len)로 바꿔서 Conv1d에 넣을 것
        self.conv1 = nn.Conv1d(in_channels=input_dim, out_channels=32, kernel_size=5, padding=2)
        self.bn1   = nn.BatchNorm1d(32)
        self.conv2 = nn.Conv1d(32, 64, kernel_size=5, padding=2)
        self.bn2   = nn.BatchNorm1d(64)
        self.pool  = nn.MaxPool1d(kernel_size=2)
        self.relu  = nn.ReLU()

        # seq_len이 절반으로 줄어듦 (pool 1번)
        self.fc = nn.Linear(64 * (seq_len // 2), num_classes)

    def forward(self, x):
        # x: (B, seq_len, 3) → (B, 3, seq_len)
        x = x.permute(0, 2, 1)
        x = self.relu(self.bn1(self.conv1(x)))
        x = self.pool(self.relu(self.bn2(self.conv2(x))))
        x = x.flatten(start_dim=1)
        out = self.fc(x)
        return out

cnn_model = CNN1D(input_dim=input_dim, num_classes=num_classes, seq_len=seq_len)
# cnn_model = train_model(cnn_model, train_loader, val_loader, num_epochs=10)

In [29]:
cnn_model = train_model(cnn_model, train_loader, val_loader, num_epochs=10)

[Epoch 1] train_loss=0.5407, train_acc=0.8030 | val_loss=0.2956, val_acc=0.9012
[Epoch 2] train_loss=0.2570, train_acc=0.9105 | val_loss=0.3032, val_acc=0.8935
[Epoch 3] train_loss=0.1871, train_acc=0.9355 | val_loss=0.2212, val_acc=0.9151
[Epoch 4] train_loss=0.1309, train_acc=0.9553 | val_loss=0.1417, val_acc=0.9518
[Epoch 5] train_loss=0.1065, train_acc=0.9656 | val_loss=0.2237, val_acc=0.9110
[Epoch 6] train_loss=0.0812, train_acc=0.9731 | val_loss=0.1370, val_acc=0.9535
[Epoch 7] train_loss=0.0637, train_acc=0.9815 | val_loss=0.1304, val_acc=0.9531
[Epoch 8] train_loss=0.0506, train_acc=0.9856 | val_loss=0.1447, val_acc=0.9514
[Epoch 9] train_loss=0.0390, train_acc=0.9898 | val_loss=0.1444, val_acc=0.9494
[Epoch 10] train_loss=0.0365, train_acc=0.9904 | val_loss=0.2155, val_acc=0.9298


###4. 모델 2 – LSTM

In [16]:
class LSTMModel(nn.Module):
    def __init__(self, input_dim=3, hidden_dim=64, num_layers=1, num_classes=6):
        super().__init__()
        self.lstm = nn.LSTM(
            input_size=input_dim,
            hidden_size=hidden_dim,
            num_layers=num_layers,
            batch_first=True,
            bidirectional=True
        )
        self.fc = nn.Linear(hidden_dim * 2, num_classes)

    def forward(self, x):
        # x: (B, seq_len, 3)
        out, (hn, cn) = self.lstm(x)   # out: (B, seq_len, 2*hidden)
        # 마지막 타임스텝 상태 사용
        last_hidden = out[:, -1, :]   # (B, 2*hidden)
        logits = self.fc(last_hidden)
        return logits

lstm_model = LSTMModel(input_dim=input_dim, hidden_dim=64, num_layers=1, num_classes=num_classes)
# lstm_model = train_model(lstm_model, train_loader, val_loader)

In [30]:
lstm_model = train_model(lstm_model, train_loader, val_loader)

[Epoch 1] train_loss=1.1051, train_acc=0.6099 | val_loss=0.9452, val_acc=0.6996
[Epoch 2] train_loss=0.7784, train_acc=0.7289 | val_loss=0.6639, val_acc=0.7588
[Epoch 3] train_loss=0.6292, train_acc=0.7781 | val_loss=0.5616, val_acc=0.7951
[Epoch 4] train_loss=0.5656, train_acc=0.7932 | val_loss=0.5118, val_acc=0.8102
[Epoch 5] train_loss=0.5301, train_acc=0.8074 | val_loss=0.6509, val_acc=0.7518
[Epoch 6] train_loss=0.4686, train_acc=0.8262 | val_loss=0.4127, val_acc=0.8420
[Epoch 7] train_loss=0.4096, train_acc=0.8445 | val_loss=0.4062, val_acc=0.8388
[Epoch 8] train_loss=0.3732, train_acc=0.8549 | val_loss=0.3520, val_acc=0.8661
[Epoch 9] train_loss=0.3428, train_acc=0.8658 | val_loss=0.3314, val_acc=0.8665
[Epoch 10] train_loss=0.3106, train_acc=0.8774 | val_loss=0.3065, val_acc=0.8812


###5. 모델 3 – GRU

In [17]:
class GRUModel(nn.Module):
    def __init__(self, input_dim=3, hidden_dim=64, num_layers=1, num_classes=6):
        super().__init__()
        self.gru = nn.GRU(
            input_size=input_dim,
            hidden_size=hidden_dim,
            num_layers=num_layers,
            batch_first=True,
            bidirectional=True
        )
        self.fc = nn.Linear(hidden_dim * 2, num_classes)

    def forward(self, x):
        out, hn = self.gru(x)          # out: (B, seq_len, 2*hidden)
        last_hidden = out[:, -1, :]    # (B, 2*hidden)
        logits = self.fc(last_hidden)
        return logits

gru_model = GRUModel(input_dim=input_dim, hidden_dim=64, num_layers=1, num_classes=num_classes)
# gru_model = train_model(gru_model, train_loader, val_loader)

In [31]:
gru_model = train_model(gru_model, train_loader, val_loader)

[Epoch 1] train_loss=0.9648, train_acc=0.6694 | val_loss=0.6469, val_acc=0.7747
[Epoch 2] train_loss=0.6098, train_acc=0.7859 | val_loss=0.5542, val_acc=0.8069
[Epoch 3] train_loss=0.5291, train_acc=0.8035 | val_loss=0.4671, val_acc=0.8216
[Epoch 4] train_loss=0.4527, train_acc=0.8207 | val_loss=0.4025, val_acc=0.8371
[Epoch 5] train_loss=0.3837, train_acc=0.8464 | val_loss=0.3487, val_acc=0.8551
[Epoch 6] train_loss=0.3544, train_acc=0.8590 | val_loss=0.3795, val_acc=0.8469
[Epoch 7] train_loss=0.3053, train_acc=0.8753 | val_loss=0.3061, val_acc=0.8792
[Epoch 8] train_loss=0.2830, train_acc=0.8850 | val_loss=0.2785, val_acc=0.8808
[Epoch 9] train_loss=0.2609, train_acc=0.8983 | val_loss=0.2667, val_acc=0.8947
[Epoch 10] train_loss=0.2394, train_acc=0.9065 | val_loss=0.2673, val_acc=0.9004


###6. 모델 4 – CNN + LSTM (하이브리드)

In [18]:
class CNN_LSTM(nn.Module):
    def __init__(self, input_dim=3, num_classes=6, seq_len=128, conv_channels=32, hidden_dim=64):
        super().__init__()
        # CNN part
        self.conv1 = nn.Conv1d(input_dim, conv_channels, kernel_size=5, padding=2)
        self.bn1   = nn.BatchNorm1d(conv_channels)
        self.pool  = nn.MaxPool1d(2)
        self.relu  = nn.ReLU()

        # LSTM part (CNN 출력 채널 수를 입력 차원으로 사용)
        self.lstm = nn.LSTM(
            input_size=conv_channels,
            hidden_size=hidden_dim,
            num_layers=1,
            batch_first=True,
            bidirectional=True
        )
        self.fc = nn.Linear(hidden_dim * 2, num_classes)

    def forward(self, x):
        # x: (B, seq_len, 3) → (B, 3, seq_len)
        x = x.permute(0, 2, 1)
        x = self.relu(self.bn1(self.conv1(x)))  # (B, C, T)
        x = self.pool(x)                        # (B, C, T/2)

        # (B, C, T') → (B, T', C) for LSTM
        x = x.permute(0, 2, 1)
        out, (hn, cn) = self.lstm(x)
        last_hidden = out[:, -1, :]
        logits = self.fc(last_hidden)
        return logits

cnn_lstm_model = CNN_LSTM(input_dim=input_dim, num_classes=num_classes, seq_len=seq_len)
# cnn_lstm_model = train_model(cnn_lstm_model, train_loader, val_loader)

In [32]:
cnn_lstm_model = train_model(cnn_lstm_model, train_loader, val_loader)

[Epoch 1] train_loss=0.9491, train_acc=0.6731 | val_loss=0.6370, val_acc=0.7939
[Epoch 2] train_loss=0.6025, train_acc=0.7927 | val_loss=0.5060, val_acc=0.8200
[Epoch 3] train_loss=0.5279, train_acc=0.8052 | val_loss=0.7284, val_acc=0.7257
[Epoch 4] train_loss=0.4815, train_acc=0.8232 | val_loss=0.3918, val_acc=0.8584
[Epoch 5] train_loss=0.3773, train_acc=0.8592 | val_loss=0.3083, val_acc=0.8808
[Epoch 6] train_loss=0.3502, train_acc=0.8717 | val_loss=0.3043, val_acc=0.8906
[Epoch 7] train_loss=0.3018, train_acc=0.8878 | val_loss=0.2535, val_acc=0.8971
[Epoch 8] train_loss=0.2754, train_acc=0.8951 | val_loss=0.2851, val_acc=0.8922
[Epoch 9] train_loss=0.2567, train_acc=0.9039 | val_loss=0.2225, val_acc=0.9167
[Epoch 10] train_loss=0.2318, train_acc=0.9137 | val_loss=0.2073, val_acc=0.9233


###7. 모델 5 – Transformer (Encoder)

In [19]:
import math

class PositionalEncoding(nn.Module):
    def __init__(self, d_model, max_len=500):
        super().__init__()
        pe = torch.zeros(max_len, d_model)  # (max_len, d_model)
        position = torch.arange(0, max_len, dtype=torch.float32).unsqueeze(1)
        div_term = torch.exp(torch.arange(0, d_model, 2).float() * (-math.log(10000.0) / d_model))
        pe[:, 0::2] = torch.sin(position * div_term)
        pe[:, 1::2] = torch.cos(position * div_term)
        pe = pe.unsqueeze(0)   # (1, max_len, d_model)
        self.register_buffer('pe', pe)

    def forward(self, x):
        # x: (B, seq_len, d_model)
        seq_len = x.size(1)
        x = x + self.pe[:, :seq_len, :]
        return x


class TransformerModel(nn.Module):
    def __init__(self, input_dim=3, num_classes=6, seq_len=128, d_model=64, nhead=8, num_layers=2, dim_feedforward=128):
        super().__init__()
        # 입력 (3차원)을 d_model 차원으로 projection
        self.input_fc = nn.Linear(input_dim, d_model)
        self.pos_encoder = PositionalEncoding(d_model, max_len=seq_len)

        encoder_layer = nn.TransformerEncoderLayer(
            d_model=d_model,
            nhead=nhead,
            dim_feedforward=dim_feedforward,
            batch_first=True
        )
        self.transformer_encoder = nn.TransformerEncoder(encoder_layer, num_layers=num_layers)

        self.fc_out = nn.Linear(d_model, num_classes)

    def forward(self, x):
        # x: (B, seq_len, 3)
        x = self.input_fc(x)          # (B, seq_len, d_model)
        x = self.pos_encoder(x)       # (B, seq_len, d_model)
        x = self.transformer_encoder(x)  # (B, seq_len, d_model)

        # [CLS] 토큰이 없으니 간단히 평균 pooling 사용
        x = x.mean(dim=1)             # (B, d_model)
        logits = self.fc_out(x)       # (B, num_classes)
        return logits

transformer_model = TransformerModel(input_dim=input_dim, num_classes=num_classes, seq_len=seq_len)
# transformer_model = train_model(transformer_model, train_loader, val_loader)

In [None]:
transformer_model = train_model(transformer_model, train_loader, val_loader)

###8. 테스트 세트 평가

In [22]:
def evaluate_model(model, test_loader):
    model.to(device)
    model.eval()
    correct = 0
    total = 0

    with torch.no_grad():
        for X_batch, y_batch in test_loader:
            X_batch = X_batch.to(device)
            y_batch = y_batch.to(device)
            outputs = model(X_batch)
            _, predicted = outputs.max(1)
            total += y_batch.size(0)
            correct += predicted.eq(y_batch).sum().item()

    print("Test Accuracy:", correct / total)
# evaluate_model(cnn_model, test_loader)

In [26]:
evaluate_model(cnn_model, test_loader)

Test Accuracy: 0.3183673469387755
