<a href="https://colab.research.google.com/github/TheCaveOfAdullam/study3/blob/main/1025Test8.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
# 필요한 라이브러리 설치 및 Google Drive 마운트
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [2]:
!pip install torch_pruning

Collecting torch_pruning
  Downloading torch_pruning-1.4.3-py3-none-any.whl.metadata (29 kB)
Downloading torch_pruning-1.4.3-py3-none-any.whl (62 kB)
[?25l   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.0/62.9 kB[0m [31m?[0m eta [36m-:--:--[0m[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m62.9/62.9 kB[0m [31m2.4 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: torch_pruning
Successfully installed torch_pruning-1.4.3


In [3]:
import os
import numpy as np
import pandas as pd
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.utils as utils
from torch.utils.data import DataLoader, Dataset
from sklearn.preprocessing import LabelEncoder
import torch.nn.utils.prune as prune
import torch_pruning as tp

In [4]:
# 기본 경로 설정
base_dir = '/content/drive/MyDrive/ship_motor10'
categories = ['normal', 'fault_BB', 'fault_RI', 'fault_SM']
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# 데이터 로드 및 전처리 함수 정의
class VibrationDataset(Dataset):
    def __init__(self, base_dir, split, categories, transform=None):
        self.X = []
        self.y = []
        self.transform = transform
        split_dir = os.path.join(base_dir, split)
        for category in categories:
            category_dir = os.path.join(split_dir, category)
            files = os.listdir(category_dir)
            for file in files:
                file_path = os.path.join(category_dir, file)
                data = pd.read_csv(file_path, header=None).values
                data = pd.to_numeric(data.flatten(), errors='coerce').reshape(-1, data.shape[1])
                data = np.nan_to_num(data).astype('float32')
                self.X.append(data)
                self.y.append(category)

    def __len__(self):
        return len(self.X)

    def __getitem__(self, idx):
        X = self.X[idx]
        y = self.y[idx]
        X = X.reshape(-1)
        return torch.tensor(X, dtype=torch.float32), y

# 레이블 인코딩
label_encoder = LabelEncoder()

# 데이터셋 준비
train_dataset = VibrationDataset(base_dir, 'train', categories)
val_dataset = VibrationDataset(base_dir, 'validation', categories)
test_dataset = VibrationDataset(base_dir, 'test', categories)

# 레이블 인코딩 및 원-핫 인코딩
y_train_encoded = label_encoder.fit_transform([y for _, y in train_dataset])
y_val_encoded = label_encoder.transform([y for _, y in val_dataset])
y_test_encoded = label_encoder.transform([y for _, y in test_dataset])

# 데이터셋에 레이블 추가
train_dataset.y = y_train_encoded
val_dataset.y = y_val_encoded
test_dataset.y = y_test_encoded

# 데이터 로더
batch_size = 32
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=batch_size, shuffle=False)
test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)

In [5]:
# CNN 모델 정의
class CNNModel(nn.Module):
    def __init__(self):
        super(CNNModel, self).__init__()
        self.conv1 = nn.Conv1d(in_channels=1, out_channels=64, kernel_size=16, stride=16)
        self.pool1 = nn.MaxPool1d(kernel_size=2, stride=2)
        self.conv2 = nn.Conv1d(in_channels=64, out_channels=32, kernel_size=3, stride=1)
        self.conv3 = nn.Conv1d(in_channels=32, out_channels=64, kernel_size=5, stride=1)
        self.conv4 = nn.Conv1d(in_channels=64, out_channels=128, kernel_size=5, stride=1)
        self.pool2 = nn.MaxPool1d(kernel_size=2, stride=2)

        conv1_output_size = (24002 - 16) // 16 + 1
        pool1_output_size = conv1_output_size // 2
        conv2_output_size = (pool1_output_size - 3) // 1 + 1
        conv3_output_size = (conv2_output_size - 5) // 1 + 1
        conv4_output_size = (conv3_output_size - 5) // 1 + 1
        pool2_output_size = conv4_output_size // 2

        self.fc1 = nn.Linear(128 * pool2_output_size, 5000)
        self.fc2 = nn.Linear(5000, 1000)
        self.fc3 = nn.Linear(1000, len(categories))

    def forward(self, x):
        x = self.conv1(x)
        x = torch.relu(x)
        x = self.pool1(x)
        x = self.conv2(x)
        x = torch.relu(x)
        x = self.conv3(x)
        x = torch.relu(x)
        x = self.conv4(x)
        x = torch.relu(x)
        x = self.pool2(x)
        x = x.view(x.size(0), -1)
        x = torch.relu(self.fc1(x))
        x = torch.relu(self.fc2(x))
        x = self.fc3(x)
        return x

In [6]:
# 1차 테일러 전개 기반 비구조적 프루닝 (마스크 적용)
def prune_by_taylor_with_mask(model, threshold=0.01):
    for name, module in model.named_modules():
        if isinstance(module, nn.Conv1d) or isinstance(module, nn.Linear):
            if module.weight.grad is None:
                raise ValueError(f"Gradients not found for {name}. Run backward pass before pruning.")

            importance = torch.abs(module.weight * module.weight.grad)
            mask = importance > threshold
            prune.custom_from_mask(module, name='weight', mask=mask)
    print(f"Taylor expansion-based pruning with threshold: {threshold} applied with mask.")

# 0 비율 기반 필터 감지 및 구조적 프루닝 적용 함수
def detect_and_apply_structural_pruning_with_zero_ratio(model, prune_threshold=0.7, example_inputs=None):
    if example_inputs is None:
        example_inputs = torch.randn(1, 1, 24002).to(next(model.parameters()).device)

    DG = tp.DependencyGraph().build_dependency(model, example_inputs=example_inputs)

    total_pruned = 0
    for name, module in model.named_modules():
        if isinstance(module, nn.Conv1d) or isinstance(module, nn.Linear):
            # **출력 레이어는 프루닝 대상에서 제외**
            if name == 'fc3':
                print(f"Skipping pruning for {name} (output layer).")
                continue

            weight_data = module.weight.detach().cpu().numpy()
            # 0 비율 계산
            if isinstance(module, nn.Conv1d):
                filter_zero_percentage = np.mean(weight_data == 0, axis=(1, 2))
            else:
                filter_zero_percentage = np.mean(weight_data == 0, axis=1)
            prune_indices = np.where(filter_zero_percentage >= prune_threshold)[0]

            if len(prune_indices) > 0 and len(prune_indices) < module.weight.shape[0]:
                pruning_group = None
                if isinstance(module, nn.Conv1d):
                    pruning_group = DG.get_pruning_group(module, tp.prune_conv_out_channels, idxs=prune_indices)
                elif isinstance(module, nn.Linear):
                    pruning_group = DG.get_pruning_group(module, tp.prune_linear_out_channels, idxs=prune_indices)

                if pruning_group is not None:
                    pruning_group.prune()
                    total_pruned += len(prune_indices)
                    print(f"Pruned {len(prune_indices)} filters/neuron(s) from {name}.")
            else:
                print(f"Skipping pruning for {name} as it would remove all filters/neuron(s).")

    print(f"Structural pruning based on zero ratio applied. {total_pruned} filters/neuron(s) pruned in total.")
    return model

In [7]:
# 0인 가중치에 마스크 재적용하여 업데이트 방지
def apply_mask_to_zero_weights(model):
    for name, module in model.named_modules():
        if isinstance(module, nn.Conv1d) or isinstance(module, nn.Linear):
            # 현재 가중치가 0인 위치에 대한 마스크 생성
            weight_mask = (module.weight.data != 0).float()
            # 마스크를 레이어에 적용
            prune.CustomFromMask.apply(module, 'weight', mask=weight_mask)

In [8]:
# 모델 학습 함수
def train_model(model, train_loader, val_loader, criterion, optimizer, num_epochs=5, max_norm=1.0):
    model.train()
    for epoch in range(num_epochs):
        running_loss = 0.0
        correct = 0
        total = 0
        for inputs, labels in train_loader:
            inputs, labels = inputs.unsqueeze(1).to(device), labels.to(device)
            optimizer.zero_grad()
            outputs = model(inputs)
            loss = criterion(outputs, labels)
            loss.backward()

            # 0인 가중치의 기울기를 0으로 설정하여 업데이트 방지
            for name, param in model.named_parameters():
                if 'weight' in name:
                    if param.grad is not None:
                        param.grad.data[param.data == 0] = 0

            utils.clip_grad_norm_(model.parameters(), max_norm)
            optimizer.step()

            running_loss += loss.item()
            _, predicted = torch.max(outputs, 1)
            total += labels.size(0)
            correct += (predicted == labels).sum().item()

        val_loss, val_accuracy = evaluate_model(model, val_loader, criterion)
        print(f'Epoch [{epoch+1}/{num_epochs}], Loss: {running_loss/total:.4f}, Accuracy: {100 * correct/total:.2f}%, Val Loss: {val_loss:.4f}, Val Accuracy: {val_accuracy:.2f}%')

    return model

# 모델 평가 함수
def evaluate_model(model, loader, criterion):
    model.eval()
    val_loss = 0.0
    correct = 0
    total = 0
    with torch.no_grad():
        for inputs, labels in loader:
            inputs, labels = inputs.unsqueeze(1).to(device), labels.to(device)
            outputs = model(inputs)
            loss = criterion(outputs, labels)
            val_loss += loss.item()
            _, predicted = torch.max(outputs, 1)
            total += labels.size(0)
            correct += (predicted == labels).sum().item()

    return val_loss / total, 100 * correct / total

In [9]:
# 프루닝 및 재학습 과정
def prune_and_retrain_with_mask(model, train_loader, val_loader, test_loader, criterion, device, optimizer_params, threshold_taylor=0.01, prune_threshold=0.7):
    optimizer = optim.Adam(model.parameters(), **optimizer_params)

    print("Initial training before pruning")
    model = train_model(model, train_loader, val_loader, criterion, optimizer, num_epochs=5)

    print("Step 1: Taylor expansion-based pruning with mask")
    # 프루닝을 위한 한 번의 forward 및 backward 패스 수행
    for inputs, labels in train_loader:
        inputs, labels = inputs.unsqueeze(1).to(device), labels.to(device)
        optimizer.zero_grad()
        outputs = model(inputs)
        loss = criterion(outputs, labels)
        loss.backward()
        break  # 프루닝을 위해 한 배치만 필요

    prune_by_taylor_with_mask(model, threshold_taylor)

    # 비구조적 프루닝 후 모델 재학습
    optimizer = optim.Adam(model.parameters(), **optimizer_params)
    model = train_model(model, train_loader, val_loader, criterion, optimizer, num_epochs=5)

    # 구조적 프루닝 전에 프루닝 마스크 제거
    print("Removing pruning masks before structural pruning")
    for name, module in model.named_modules():
        if isinstance(module, nn.Conv1d) or isinstance(module, nn.Linear):
            try:
                prune.remove(module, 'weight')
            except ValueError:
                pass  # 프루닝이 적용되지 않은 레이어는 건너뜁니다

    print("Step 2: Structural pruning based on zero ratio")
    model = detect_and_apply_structural_pruning_with_zero_ratio(model, prune_threshold=prune_threshold)

    # 0인 가중치에 마스크 재적용하여 업데이트 방지
    print("Reapplying mask to zero weights to prevent them from updating")
    apply_mask_to_zero_weights(model)

    # 구조적 프루닝 후 모델 재학습
    optimizer = optim.Adam(model.parameters(), **optimizer_params)
    model = train_model(model, train_loader, val_loader, criterion, optimizer, num_epochs=10)

    # 재학습 후 프루닝 마스크 제거
    print("Removing pruning masks after retraining")
    for name, module in model.named_modules():
        if isinstance(module, nn.Conv1d) or isinstance(module, nn.Linear):
            try:
                prune.remove(module, 'weight')
            except ValueError:
                pass  # 프루닝이 적용되지 않은 레이어는 건너뜁니다

    print("Final evaluation on the test set...")
    test_loss, test_accuracy = evaluate_model(model, test_loader, criterion)
    print(f'Final Test Loss: {test_loss:.4f}, Final Test Accuracy: {test_accuracy:.2f}%')

    return model

In [10]:
# 파라미터 설정 및 프루닝 실행
model = CNNModel().to(device)
criterion = nn.CrossEntropyLoss()
optimizer_params = {'lr': 0.0001, 'weight_decay': 1e-5}

In [11]:
# 비제로 가중치 계산 함수
def count_nonzero_weights(model):
    nonzero_count = 0
    total_count = 0
    for name, param in model.named_parameters():
        if param.requires_grad:
            nonzero_count += torch.sum(param != 0).item()  # 0이 아닌 가중치 수 계산
            total_count += param.numel()  # 전체 가중치 수 계산
    return nonzero_count, total_count

# 비제로 가중치 수 계산
nonzero_weights, total_weights = count_nonzero_weights(model)
print(f"Number of non-zero weights: {nonzero_weights}")
print(f"Total number of weights: {total_weights}")
print(f"Percentage of non-zero weights: {100 * nonzero_weights / total_weights:.2f}%")

Number of non-zero weights: 241868647
Total number of weights: 241868660
Percentage of non-zero weights: 100.00%


In [12]:
model = prune_and_retrain_with_mask(model, train_loader, val_loader, test_loader, criterion, device, optimizer_params, threshold_taylor=0.00001, prune_threshold=0.9)

Initial training before pruning
Epoch [1/5], Loss: 0.0391, Accuracy: 49.94%, Val Loss: 0.0393, Val Accuracy: 50.00%
Epoch [2/5], Loss: 0.0390, Accuracy: 50.00%, Val Loss: 0.0392, Val Accuracy: 50.00%
Epoch [3/5], Loss: 0.0353, Accuracy: 52.95%, Val Loss: 0.0185, Val Accuracy: 66.67%
Epoch [4/5], Loss: 0.0184, Accuracy: 72.16%, Val Loss: 0.0143, Val Accuracy: 98.59%
Epoch [5/5], Loss: 0.0117, Accuracy: 87.29%, Val Loss: 0.0102, Val Accuracy: 98.89%
Step 1: Taylor expansion-based pruning with mask
Taylor expansion-based pruning with threshold: 1e-05 applied with mask.
Epoch [1/5], Loss: 0.0483, Accuracy: 49.95%, Val Loss: 0.0299, Val Accuracy: 66.67%
Epoch [2/5], Loss: 0.0262, Accuracy: 67.64%, Val Loss: 0.0210, Val Accuracy: 66.78%
Epoch [3/5], Loss: 0.0183, Accuracy: 78.47%, Val Loss: 0.0144, Val Accuracy: 83.33%
Epoch [4/5], Loss: 0.0140, Accuracy: 82.07%, Val Loss: 0.0093, Val Accuracy: 86.52%
Epoch [5/5], Loss: 0.0105, Accuracy: 88.44%, Val Loss: 0.0286, Val Accuracy: 62.11%
Removin

In [13]:
# 최종 테스트 평가
print("Final evaluation on the test set...")
test_loss, test_accuracy = evaluate_model(model, test_loader, criterion)
print(f'Final Test Loss: {test_loss:.4f}, Final Test Accuracy: {test_accuracy:.2f}%')

Final evaluation on the test set...
Final Test Loss: 0.0002, Final Test Accuracy: 99.74%


In [14]:
# 비제로 가중치 계산 함수
def count_nonzero_weights(model):
    nonzero_count = 0
    total_count = 0
    for name, param in model.named_parameters():
        if param.requires_grad:
            nonzero_count += torch.sum(param != 0).item()  # 0이 아닌 가중치 수 계산
            total_count += param.numel()  # 전체 가중치 수 계산
    return nonzero_count, total_count

# 비제로 가중치 수 계산
nonzero_weights, total_weights = count_nonzero_weights(model)
print(f"Number of non-zero weights: {nonzero_weights}")
print(f"Total number of weights: {total_weights}")
print(f"Percentage of non-zero weights: {100 * nonzero_weights / total_weights:.2f}%")

Number of non-zero weights: 82522
Total number of weights: 340308
Percentage of non-zero weights: 24.25%
