<a href="https://colab.research.google.com/github/TheCaveOfAdullam/study1/blob/main/pruningApple1.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [2]:
pip install tensorflow_model_optimization

Collecting tensorflow_model_optimization
  Downloading tensorflow_model_optimization-0.8.0-py2.py3-none-any.whl (242 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m242.5/242.5 kB[0m [31m1.8 MB/s[0m eta [36m0:00:00[0m
Installing collected packages: tensorflow_model_optimization
Successfully installed tensorflow_model_optimization-0.8.0


In [3]:
import pandas as pd
import numpy as np
import os
import time
import psutil
from sklearn.preprocessing import LabelEncoder
from sklearn.metrics import confusion_matrix, classification_report
from keras.models import Sequential
from keras.layers import Conv1D, MaxPooling1D, Flatten, Dense, Dropout
from keras.utils import to_categorical
import tensorflow as tf
import tensorflow_model_optimization as tfmot

In [4]:
# 기본 경로 설정
base_dir = '/content/drive/MyDrive/ship_data'
categories = ['normal', 'fault_BB', 'fault_RI', 'fault_SM']

# 데이터 로드 및 전처리 함수 정의
def load_data(base_dir, split):
    X = []
    y = []
    split_dir = os.path.join(base_dir, split)
    for category in categories:
        category_dir = os.path.join(split_dir, category)
        for file in os.listdir(category_dir):
            file_path = os.path.join(category_dir, file)
            data = pd.read_csv(file_path, header=None).values
            data = pd.to_numeric(data.flatten(), errors='coerce').reshape(-1, data.shape[1])
            data = np.nan_to_num(data).astype('float32')  # NaN 값을 0으로 대체하고, float32로 변환
            X.append(data)
            y.append(category)
    return np.array(X), np.array(y)

# 데이터 로드
X_train, y_train = load_data(base_dir, 'train')
X_val, y_val = load_data(base_dir, 'validation')
X_test, y_test = load_data(base_dir, 'test')

In [5]:
# 데이터 차원 변경 (CNN 입력 형식에 맞게)
X_train = np.expand_dims(X_train, axis=-1)
X_val = np.expand_dims(X_val, axis=-1)
X_test = np.expand_dims(X_test, axis=-1)

# 레이블 인코딩
label_encoder = LabelEncoder()
y_train_encoded = label_encoder.fit_transform(y_train)
y_val_encoded = label_encoder.transform(y_val)
y_test_encoded = label_encoder.transform(y_test)

# 원-핫 인코딩
y_train_categorical = to_categorical(y_train_encoded)
y_val_categorical = to_categorical(y_val_encoded)
y_test_categorical = to_categorical(y_test_encoded)

In [6]:
# 원본 CNN 모델 정의
original_model = Sequential([
    Conv1D(filters=64, kernel_size=3, activation='relu', input_shape=(X_train.shape[1], X_train.shape[2])),
    MaxPooling1D(pool_size=2),
    Conv1D(filters=128, kernel_size=3, activation='relu'),
    MaxPooling1D(pool_size=2),
    Flatten(),
    Dense(100, activation='relu'),
    Dropout(0.5),
    Dense(len(categories), activation='softmax')
])

# 모델 컴파일
original_model.compile(optimizer='adam', loss='categorical_crossentropy', metrics=['accuracy'])

In [7]:
# 원본 모델 학습
original_history = original_model.fit(X_train, y_train_categorical, epochs=10, batch_size=32, validation_data=(X_val, y_val_categorical))

Epoch 1/10
Epoch 2/10
Epoch 3/10
Epoch 4/10
Epoch 5/10
Epoch 6/10
Epoch 7/10
Epoch 8/10
Epoch 9/10
Epoch 10/10


In [8]:
# 원본 모델 평가
original_val_loss, original_val_accuracy = original_model.evaluate(X_val, y_val_categorical)
original_test_loss, original_test_accuracy = original_model.evaluate(X_test, y_test_categorical)



In [9]:
# 원본 모델 사이즈 확인 및 저장
original_model.save('original_model.h5')
original_model_size = os.path.getsize('original_model.h5') / (1024 * 1024)  # 모델 사이즈를 MB 단위로 변환
print(f"Original Model Size: {original_model_size:.2f} MB")

# 원본 모델 메모리 사용량 확인
process = psutil.Process(os.getpid())
original_memory_usage = process.memory_info().rss / (1024 * 1024)  # 메모리 사용량을 MB 단위로 변환
print(f"Original Memory Usage: {original_memory_usage:.2f} MB")

  saving_api.save_model(


Original Model Size: 439.50 MB
Original Memory Usage: 7503.37 MB


In [10]:
# 원본 모델 추론 시간 측정
start_time = time.time()
original_y_pred_categorical = original_model.predict(X_test)
end_time = time.time()
original_inference_time = end_time - start_time
print(f"Original Inference Time: {original_inference_time:.2f} seconds")

original_y_pred = np.argmax(original_y_pred_categorical, axis=1)

Original Inference Time: 2.97 seconds


In [11]:
# 혼동 행렬 및 성능 지표 출력 (검증 데이터)
original_y_pred_val = original_model.predict(X_val)
original_y_pred_val_classes = np.argmax(original_y_pred_val, axis=1)

original_conf_matrix_val = confusion_matrix(y_val_encoded, original_y_pred_val_classes)
original_class_report_val = classification_report(y_val_encoded, original_y_pred_val_classes, target_names=categories)

print("Original Confusion Matrix (Validation):")
print(original_conf_matrix_val)
print("\nOriginal Classification Report (Validation):")
print(original_class_report_val)

Original Confusion Matrix (Validation):
[[176   0   0   0]
 [  0 176   0   0]
 [  9   0 165   2]
 [  0   0   0 750]]

Original Classification Report (Validation):
              precision    recall  f1-score   support

      normal       0.95      1.00      0.98       176
    fault_BB       1.00      1.00      1.00       176
    fault_RI       1.00      0.94      0.97       176
    fault_SM       1.00      1.00      1.00       750

    accuracy                           0.99      1278
   macro avg       0.99      0.98      0.99      1278
weighted avg       0.99      0.99      0.99      1278



In [15]:
# 혼동 행렬 및 성능 지표 출력 (테스트 데이터)
original_conf_matrix_test = confusion_matrix(y_test_encoded, original_y_pred)
original_class_report_test = classification_report(y_test_encoded, original_y_pred, target_names=categories)

print("Original Confusion Matrix (Test):")
print(original_conf_matrix_test)
print("\nOriginal Classification Report (Test):")
print(original_class_report_test)

Original Confusion Matrix (Test):
[[176   0   0   0]
 [  0 176   0   0]
 [  7   0 165   4]
 [  0   0   0 750]]

Original Classification Report (Test):
              precision    recall  f1-score   support

      normal       0.96      1.00      0.98       176
    fault_BB       1.00      1.00      1.00       176
    fault_RI       1.00      0.94      0.97       176
    fault_SM       0.99      1.00      1.00       750

    accuracy                           0.99      1278
   macro avg       0.99      0.98      0.99      1278
weighted avg       0.99      0.99      0.99      1278



In [28]:
# Structured Pruning 적용 함수 정의
def apply_structured_pruning(model, pruning_percentage=50):
    for layer in model.layers:
        if isinstance(layer, Conv1D):
            weights, biases = layer.get_weights()
            num_filters = weights.shape[-1]
            num_prune = int(num_filters * pruning_percentage / 100)

            # Compute the L1 norm of the filters
            filter_norms = np.sum(np.abs(weights), axis=(0, 1))
            prune_indices = np.argsort(filter_norms)[:num_prune]

            # Set the weights of the pruned filters to zero
            for idx in prune_indices:
                weights[:, :, idx] = 0

            layer.set_weights([weights, biases])

# 원본 모델 복사 (모델을 재사용하기 위해)
pruned_model = tf.keras.models.clone_model(original_model)

# Build the pruned model (this is the key change)
pruned_model.build(input_shape=original_model.input_shape)

# Now you can set the weights
pruned_model.set_weights(original_model.get_weights())

apply_structured_pruning(pruned_model, pruning_percentage=50)

# 프루닝 후 모델 컴파일
# Pass 'adam' as a string identifier instead of an Adam object
pruned_model.compile(optimizer='adam',
                     loss='categorical_crossentropy',
                     metrics=['accuracy'])

In [39]:
# 프루닝된 모델 재학습
pruned_history = pruned_model.fit(X_train, y_train_categorical, epochs=10, batch_size=32, validation_data=(X_val, y_val_categorical))

RuntimeError: You must compile your model before training/testing. Use `model.compile(optimizer, loss)`.

In [35]:
# 프루닝된 모델 평가
pruned_val_loss, pruned_val_accuracy = pruned_model.evaluate(X_val, y_val_categorical)
pruned_test_loss, pruned_test_accuracy = pruned_model.evaluate(X_test, y_test_categorical)

RuntimeError: You must compile your model before training/testing. Use `model.compile(optimizer, loss)`.

In [None]:
# 스트립 프루닝 적용
pruned_model = tfmot.sparsity.keras.strip_pruning(pruned_model)

In [36]:
# 프루닝된 모델 사이즈 확인 및 저장
pruned_model.save('pruned_model.h5')
pruned_model_size = os.path.getsize('pruned_model.h5') / (1024 * 1024)  # 모델 사이즈를 MB 단위로 변환
print(f"Pruned Model Size: {pruned_model_size:.2f} MB")

  saving_api.save_model(


KeyError: 'module_wrapper_99_input_ib-0'

In [None]:
# 프루닝된 모델 메모리 사용량 확인
pruned_memory_usage = process.memory_info().rss / (1024 * 1024)  # 메모리 사용량을 MB 단위로 변환
print(f"Pruned Memory Usage: {pruned_memory_usage:.2f} MB")

In [None]:
# 프루닝된 모델 추론 시간 측정
start_time = time.time()
pruned_y_pred_categorical = pruned_model.predict(X_test)
end_time = time.time()
pruned_inference_time = end_time - start_time
print(f"Pruned Inference Time: {pruned_inference_time:.2f} seconds")

pruned_y_pred = np.argmax(pruned_y_pred_categorical, axis=1)

In [None]:
# 혼동 행렬 및 성능 지표 출력 (검증 데이터)
pruned_y_pred_val = pruned_model.predict(X_val)
pruned_y_pred_val_classes = np.argmax(pruned_y_pred_val, axis=1)

pruned_conf_matrix_val = confusion_matrix(y_val_encoded, pruned_y_pred_val_classes)
pruned_class_report_val = classification_report(y_val_encoded, pruned_y_pred_val_classes, target_names=categories)

print("Pruned Confusion Matrix (Validation):")
print(pruned_conf_matrix_val)
print("\nPruned Classification Report (Validation):")
print(pruned_class_report_val)

In [None]:
# 혼동 행렬 및 성능 지표 출력 (테스트 데이터)
pruned_conf_matrix_test = confusion_matrix(y_test_encoded, pruned_y_pred)
pruned_class_report_test = classification_report(y_test_encoded, pruned_y_pred, target_names=categories)

print("Pruned Confusion Matrix (Test):")
print(pruned_conf_matrix_test)
print("\nPruned Classification Report (Test):")
print(pruned_class_report_test)

In [None]:
# 양자화 인식 훈련(QAT) 적용
def apply_qat(model):
    quantize_model = tfmot.quantization.keras.quantize_model

    # 기존 모델을 양자화 모델로 래핑
    qat_model = quantize_model(model)

    # 양자화 모델 컴파일
    qat_model.compile(optimizer=tf.keras.optimizers.Adam(learning_rate=0.001),
                      loss='categorical_crossentropy',
                      metrics=['accuracy'])

    return qat_model

In [None]:
# 양자화 인식 훈련 모델 생성 및 재학습
qat_model = apply_qat(pruned_model)
qat_history = qat_model.fit(X_train, y_train_categorical, epochs=10, batch_size=32, validation_data=(X_val, y_val_categorical))

In [None]:
# 양자화 인식 훈련 모델 평가
qat_val_loss, qat_val_accuracy = qat_model.evaluate(X_val, y_val_categorical)
qat_test_loss, qat_test_accuracy = qat_model.evaluate(X_test, y_test_categorical)

In [None]:
# 양자화 인식 훈련 모델 저장
qat_model.save('qat_model.h5')

# 양자화 인식 훈련 모델 스트립 프루닝 적용
qat_model_stripped = tfmot.sparsity.keras.strip_pruning(qat_model)

In [None]:
# 양자화 인식 훈련 모델 평가
qat_val_loss_stripped, qat_val_accuracy_stripped = qat_model_stripped.evaluate(X_val, y_val_categorical)
qat_test_loss_stripped, qat_test_accuracy_stripped = qat_model_stripped.evaluate(X_test, y_test_categorical)

In [None]:
# 양자화 인식 훈련 모델 사이즈 확인 및 저장
qat_model_stripped.save('qat_model_stripped.h5')
qat_model_size_stripped = os.path.getsize('qat_model_stripped.h5') / (1024 * 1024)  # 모델 사이즈를 MB 단위로 변환
print(f"QAT Model Size (Stripped): {qat_model_size_stripped:.2f} MB")

In [None]:
# 양자화 인식 훈련 모델 메모리 사용량 확인
qat_memory_usage = process.memory_info().rss / (1024 * 1024)  # 메모리 사용량을 MB 단위로 변환
print(f"QAT Memory Usage: {qat_memory_usage:.2f} MB")

In [None]:
# 양자화 인식 훈련 모델 추론 시간 측정
start_time = time.time()
qat_y_pred_categorical = qat_model_stripped.predict(X_test)
end_time = time.time()
qat_inference_time = end_time - start_time
print(f"QAT Inference Time: {qat_inference_time:.2f} seconds")

qat_y_pred = np.argmax(qat_y_pred_categorical, axis=1)

In [None]:
# 혼동 행렬 및 성능 지표 출력 (검증 데이터)
qat_y_pred_val = qat_model_stripped.predict(X_val)
qat_y_pred_val_classes = np.argmax(qat_y_pred_val, axis=1)

qat_conf_matrix_val = confusion_matrix(y_val_encoded, qat_y_pred_val_classes)
qat_class_report_val = classification_report(y_val_encoded, qat_y_pred_val_classes, target_names=categories)

print("QAT Confusion Matrix (Validation):")
print(qat_conf_matrix_val)
print("\nQAT Classification Report (Validation):")
print(qat_class_report_val)


In [None]:
# 혼동 행렬 및 성능 지표 출력 (테스트 데이터)
qat_conf_matrix_test = confusion_matrix(y_test_encoded, qat_y_pred)
qat_class_report_test = classification_report(y_test_encoded, qat_y_pred, target_names=categories)

print("QAT Confusion Matrix (Test):")
print(qat_conf_matrix_test)
print("\nQAT Classification Report (Test):")
print(qat_class_report_test)

In [None]:
# 최종 양자화 모델 생성
converter = tf.lite.TFLiteConverter.from_keras_model(qat_model_stripped)
converter.optimizations = [tf.lite.Optimize.DEFAULT]
quantized_tflite_model = converter.convert()

In [None]:
# 양자화된 모델 저장
with open('quantized_model.tflite', 'wb') as f:
    f.write(quantized_tflite_model)

In [None]:
# 양자화된 모델 사이즈 확인
quantized_model_size = os.path.getsize('quantized_model.tflite') / (1024 * 1024)  # 모델 사이즈를 MB 단위로 변환
print(f"Quantized Model Size: {quantized_model_size:.2f} MB")

In [None]:
# 양자화된 모델 평가 함수 정의
def evaluate_tflite_model(tflite_model_path, X_test, y_test_encoded):
    # Load the TFLite model and allocate tensors
    interpreter = tf.lite.Interpreter(model_path=tflite_model_path)
    interpreter.allocate_tensors()

    # Get input and output tensors
    input_details = interpreter.get_input_details()
    output_details = interpreter.get_output_details()

    # Test the model on the test data
    input_shape = input_details[0]['shape']
    correct_predictions = 0
    total_predictions = 0
    all_preds = []

    start_time = time.time()
    for i in range(X_test.shape[0]):
        input_data = np.expand_dims(X_test[i], axis=0).astype(np.float32)
        interpreter.set_tensor(input_details[0]['index'], input_data)
        interpreter.invoke()
        output_data = interpreter.get_tensor(output_details[0]['index'])
        predicted_label = np.argmax(output_data)
        all_preds.append(predicted_label)
        if predicted_label == y_test_encoded[i]:
            correct_predictions += 1
        total_predictions += 1
    end_time = time.time()

    accuracy = correct_predictions / total_predictions
    inference_time = end_time - start_time
    return accuracy, inference_time, all_preds

In [None]:
# 양자화된 모델 평가 수행
quantized_accuracy, quantized_inference_time, quantized_preds = evaluate_tflite_model('quantized_model.tflite', X_test, y_test_encoded)

print(f"Quantized Model Accuracy: {quantized_accuracy:.2f}")
print(f"Quantized Model Inference Time: {quantized_inference_time:.2f} seconds")

In [None]:
# 양자화된 모델 혼동 행렬 및 성능 지표 출력
quantized_conf_matrix = confusion_matrix(y_test_encoded, quantized_preds)
quantized_class_report = classification_report(y_test_encoded, quantized_preds, target_names=categories)

print("Quantized Confusion Matrix (Test):")
print(quantized_conf_matrix)
print("\nQuantized Classification Report (Test):")
print(quantized_class_report)