<a href="https://colab.research.google.com/github/TheCaveOfAdullam/study2/blob/main/marchine6.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
!pip install tensorflow-model-optimization

Collecting tensorflow-model-optimization
  Downloading tensorflow_model_optimization-0.8.0-py2.py3-none-any.whl (242 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m242.5/242.5 kB[0m [31m4.5 MB/s[0m eta [36m0:00:00[0m
Installing collected packages: tensorflow-model-optimization
Successfully installed tensorflow-model-optimization-0.8.0


In [None]:
import os
import numpy as np
import matplotlib.pyplot as plt
from PIL import Image
from keras.models import Sequential
from keras.layers import Conv2D, MaxPooling2D, Dropout, Flatten, Dense, BatchNormalization
from keras.optimizers import Adam
from keras.utils import to_categorical
from sklearn.metrics import confusion_matrix, classification_report
import tensorflow_model_optimization as tfmot
import tensorflow as tf
import time
import psutil

In [None]:
# Set the directory path
base_path = '/content/drive/MyDrive/marine_data'
os.chdir(base_path)

# Function to resize images
def resize_img(img, size):
    return img.resize(size)

# Function to load and resize images from a directory
def load_img(file_path):
    data = []
    full_path = os.path.join(base_path, file_path)
    for f in os.listdir(full_path):
        data.append(resize_img(Image.open(os.path.join(full_path, f)), (64, 64)))
    return data

In [None]:
# Load images for different sets
train_normal = load_img('train/normal')
test_normal = load_img('test/normal')
val_normal = load_img('validation/normal')

train_fault_BB = load_img('train/fault_BB')
test_fault_BB = load_img('test/fault_BB')
val_fault_BB = load_img('validation/fault_BB')

train_fault_SM = load_img('train/fault_SM')
test_fault_SM = load_img('test/fault_SM')
val_fault_SM = load_img('validation/fault_SM')

train_fault_RI = load_img('train/fault_RI')
test_fault_RI = load_img('test/fault_RI')
val_fault_RI = load_img('validation/fault_RI')

In [None]:
# Print the lengths of the datasets
print(len(train_normal), len(test_normal), len(val_normal))
print(len(train_fault_BB), len(test_fault_BB), len(val_fault_BB))
print(len(train_fault_SM), len(test_fault_SM), len(val_fault_SM))
print(len(train_fault_RI), len(test_fault_RI), len(val_fault_RI))

# Function to convert images to arrays and normalize
def img_to_array(img):
    return np.array(img, dtype='float32') / 255.0

3000 3000 1500
3004 3000 1500
3000 3000 1500
3000 3000 1500


In [None]:
# Convert images to arrays and define labels
train_normal_arr = np.array([img_to_array(normal) for normal in train_normal])
test_normal_arr = np.array([img_to_array(normal) for normal in test_normal])
val_normal_arr = np.array([img_to_array(normal) for normal in val_normal])

train_fault_BB_arr = np.array([img_to_array(fault) for fault in train_fault_BB])
test_fault_BB_arr = np.array([img_to_array(fault) for fault in test_fault_BB])
val_fault_BB_arr = np.array([img_to_array(fault) for fault in val_fault_BB])

train_fault_SM_arr = np.array([img_to_array(fault) for fault in train_fault_SM])
test_fault_SM_arr = np.array([img_to_array(fault) for fault in test_fault_SM])
val_fault_SM_arr = np.array([img_to_array(fault) for fault in val_fault_SM])

train_fault_RI_arr = np.array([img_to_array(fault) for fault in train_fault_RI])
test_fault_RI_arr = np.array([img_to_array(fault) for fault in test_fault_RI])
val_fault_RI_arr = np.array([img_to_array(fault) for fault in val_fault_RI])

train_normal_sol = np.array([0] * len(train_normal))
test_normal_sol = np.array([0] * len(test_normal))
val_normal_sol = np.array([0] * len(val_normal))

train_fault_BB_sol = np.array([1] * len(train_fault_BB))
test_fault_BB_sol = np.array([1] * len(test_fault_BB))
val_fault_BB_sol = np.array([1] * len(val_fault_BB))

train_fault_SM_sol = np.array([2] * len(train_fault_SM))
test_fault_SM_sol = np.array([2] * len(test_fault_SM))
val_fault_SM_sol = np.array([2] * len(val_fault_SM))

train_fault_RI_sol = np.array([3] * len(train_fault_RI))
test_fault_RI_sol = np.array([3] * len(test_fault_RI))
val_fault_RI_sol = np.array([3] * len(val_fault_RI))

In [None]:
# Combine data and labels
train_img = np.concatenate((train_normal_arr, train_fault_BB_arr, train_fault_SM_arr, train_fault_RI_arr))
train_sol = np.concatenate((train_normal_sol, train_fault_BB_sol, train_fault_SM_sol, train_fault_RI_sol))

test_img = np.concatenate((test_normal_arr, test_fault_BB_arr, test_fault_SM_arr, test_fault_RI_arr))
test_sol = np.concatenate((test_normal_sol, test_fault_BB_sol, test_fault_SM_sol, test_fault_RI_sol))

val_img = np.concatenate((val_normal_arr, val_fault_BB_arr, val_fault_SM_arr, val_fault_RI_arr))
val_sol = np.concatenate((val_normal_sol, val_fault_BB_sol, val_fault_SM_sol, val_fault_RI_sol))

# One-hot encode the labels
train_sol = to_categorical(train_sol, 4)
test_sol = to_categorical(test_sol, 4)
val_sol = to_categorical(val_sol, 4)

In [None]:
# Build the original model
model = Sequential()

# First block
model.add(BatchNormalization(input_shape=train_img.shape[1:]))
model.add(Conv2D(filters=64, kernel_size=(5, 5), padding='same', activation='elu'))
model.add(MaxPooling2D(pool_size=(2, 2), strides=(2, 2)))
model.add(Dropout(0.25))

# Second block
model.add(BatchNormalization())
model.add(Conv2D(128, (5, 5), padding='same', activation='elu'))
model.add(MaxPooling2D(pool_size=(2, 2)))
model.add(Dropout(0.25))

# Third block
model.add(BatchNormalization())
model.add(Conv2D(256, (5, 5), padding='same', activation='elu'))
model.add(MaxPooling2D(pool_size=(2, 2), strides=(2, 2)))
model.add(Dropout(0.25))

# Fully connected layers
model.add(Flatten())
model.add(Dense(256, activation='elu'))
model.add(Dropout(0.5))
model.add(Dense(128, activation='elu'))
model.add(Dense(4, activation='softmax'))

# Compile the original model
model.compile(loss='categorical_crossentropy', optimizer=Adam(learning_rate=1e-3), metrics=['accuracy'])

In [None]:
# Apply pruning
pruning_params = {
    'pruning_schedule': tfmot.sparsity.keras.PolynomialDecay(
        initial_sparsity=0.0, final_sparsity=0.50, begin_step=0, end_step=np.ceil(len(train_img) / 128).astype(np.int32) * 10)
}

pruned_layers = []
for layer in model.layers:
    if isinstance(layer, tf.keras.layers.Conv2D) or isinstance(layer, tf.keras.layers.Dense):
        pruned_layers.append(tfmot.sparsity.keras.prune_low_magnitude(layer, **pruning_params))
    else:
        pruned_layers.append(layer)

pruned_model = Sequential(pruned_layers)

pruned_model.compile(loss='categorical_crossentropy', optimizer=Adam(learning_rate=1e-3), metrics=['accuracy'])

In [None]:
# Train the pruned model
callbacks = [tfmot.sparsity.keras.UpdatePruningStep()]

pruned_model.fit(x=train_img, y=train_sol, batch_size=128, epochs=10, verbose=1, validation_data=(val_img, val_sol), callbacks=callbacks)

Epoch 1/30
Epoch 2/30
Epoch 3/30
Epoch 4/30
Epoch 5/30
Epoch 6/30
Epoch 7/30
Epoch 8/30
Epoch 9/30
Epoch 10/30
Epoch 11/30
Epoch 12/30
Epoch 13/30
Epoch 14/30
Epoch 15/30
Epoch 16/30
Epoch 17/30
Epoch 18/30
Epoch 19/30
Epoch 20/30
Epoch 21/30
Epoch 22/30
Epoch 23/30
Epoch 24/30
Epoch 25/30
Epoch 26/30
Epoch 27/30
Epoch 28/30
Epoch 29/30
Epoch 30/30


<keras.src.callbacks.History at 0x7e06f410c6d0>

In [None]:

# Strip pruning wrappers from the model
pruned_model = tfmot.sparsity.keras.strip_pruning(pruned_model)

In [None]:
# Save models
model.save('original_model.h5')
pruned_model.save('pruned_model.h5')

  saving_api.save_model(


In [None]:
# Model size
original_size = os.path.getsize('original_model.h5')
pruned_size = os.path.getsize('pruned_model.h5')
print(f"Original Model Size: {original_size / 1024:.2f} KB")
print(f"Pruned Model Size: {pruned_size / 1024:.2f} KB")

Original Model Size: 20601.56 KB
Pruned Model Size: 61721.56 KB


In [None]:
# Inference time measurement function
def measure_inference_time(model, data, num_runs=100):
    start_time = time.time()
    for _ in range(num_runs):
        model.predict(data)
    end_time = time.time()
    return (end_time - start_time) / num_runs

In [None]:
# Measure inference time
original_inference_time = measure_inference_time(model, test_img)
pruned_inference_time = measure_inference_time(pruned_model, test_img)
print(f"Original Model Inference Time: {original_inference_time:.6f} seconds per sample")
print(f"Pruned Model Inference Time: {pruned_inference_time:.6f} seconds per sample")



In [None]:
# Memory usage measurement function
def measure_memory_usage(model, data):
    process = psutil.Process(os.getpid())
    start_memory = process.memory_info().rss
    model.predict(data)
    end_memory = process.memory_info().rss
    return (end_memory - start_memory) / (1024 * 1024)  # in MB

In [None]:
# Measure memory usage
original_memory_usage = measure_memory_usage(model, test_img)
pruned_memory_usage = measure_memory_usage(pruned_model, test_img)
print(f"Original Model Memory Usage: {original_memory_usage:.2f} MB")
print(f"Pruned Model Memory Usage: {pruned_memory_usage:.2f} MB")

In [None]:
# Evaluate the pruned model on test data
loss, accuracy = pruned_model.evaluate(test_img, test_sol, batch_size=128)
print(f"Test Loss: {loss}")
print(f"Test Accuracy: {accuracy}")

In [None]:
# Predict the labels
test_pred = pruned_model.predict(test_img)
test_pred_classes = np.argmax(test_pred, axis=1)
test_true_classes = np.argmax(test_sol, axis=1)

In [None]:
# Confusion matrix
conf_matrix = confusion_matrix(test_true_classes, test_pred_classes)
print("Confusion Matrix:")
print(conf_matrix)

In [None]:
# Classification report
class_report = classification_report(test_true_classes, test_pred_classes, target_names=['Normal', 'Fault_BB', 'Fault_SM', 'Fault_RI'])
print("Classification Report:")
print(class_report)

In [None]:
# Plot confusion matrix
plt.figure(figsize=(8, 8))
plt.imshow(conf_matrix, interpolation='nearest', cmap=plt.cm.Blues)
plt.title('Confusion Matrix')
plt.colorbar()
tick_marks = np.arange(4)
plt.xticks(tick_marks, ['Normal', 'Fault_BB', 'Fault_SM', 'Fault_RI'], rotation=45)
plt.yticks(tick_marks, ['Normal', 'Fault_BB', 'Fault_SM', 'Fault_RI'])

fmt = 'd'
thresh = conf_matrix.max() / 2.
for i, j in np.ndindex(conf_matrix.shape):
    plt.text(j, i, format(conf_matrix[i, j], fmt),
             ha="center", va="center",
             color="white" if conf_matrix[i, j] > thresh else "black")

plt.ylabel('True label')
plt.xlabel('Predicted label')
plt.tight_layout()
plt.show()