In [None]:
# Mount Google Drive at /content/drive; the dataset and model paths used by
# the later cells all live under /content/drive/MyDrive.
from google.colab import drive
drive.mount('/content/drive')

Divide the RESISC45 dataset into train, validation, and test folders for training our ResNet50 model.

In [None]:
import os
import shutil
import random
from tqdm import tqdm

def split_dataset(dataset_dir, output_dir, train_ratio=0.8, val_ratio=0.1, test_ratio=0.1, seed=None):
    """Split a class-per-folder image dataset into train/validation/test folders.

    Every sub-directory of ``dataset_dir`` is treated as one class. Its files
    are shuffled and MOVED (not copied) into
    ``output_dir/{train,validation,test}/<class_name>/``, so the source class
    folders are left empty afterwards.

    Args:
        dataset_dir: Directory containing one sub-directory per class.
        output_dir: Directory that receives the ``train``, ``validation`` and
            ``test`` trees (created if missing).
        train_ratio: Fraction of each class that goes to ``train``.
        val_ratio: Fraction of each class that goes to ``validation``.
        test_ratio: Fraction for ``test``. It is only used to validate the
            ratios; the test split receives whatever remains after the train
            and validation slices (including rounding leftovers).
        seed: Optional seed for a reproducible shuffle. When ``None`` the
            module-level ``random`` generator is used.

    Raises:
        AssertionError: If the three ratios do not sum to 1.
    """
    import math  # local import: the notebook's import cell does not provide it

    # Compare with a tolerance: 0.7 + 0.2 + 0.1 evaluates to 0.9999999999999999
    # in floating point, so an exact `== 1` check rejects valid ratios.
    assert math.isclose(train_ratio + val_ratio + test_ratio, 1.0), \
        "Train, validation and test ratios must sum to 1"

    # A dedicated generator keeps the split reproducible when a seed is given.
    shuffle = random.Random(seed).shuffle if seed is not None else random.shuffle

    # Sort directory listings: os.listdir order is arbitrary, which would make
    # a seeded shuffle depend on the filesystem.
    classes = sorted(
        d for d in os.listdir(dataset_dir)
        if os.path.isdir(os.path.join(dataset_dir, d))
    )

    for class_name in classes:
        class_dir = os.path.join(dataset_dir, class_name)
        # Only regular files are images; nested folders are left where they are.
        images = sorted(
            name for name in os.listdir(class_dir)
            if os.path.isfile(os.path.join(class_dir, name))
        )
        shuffle(images)

        train_split = int(train_ratio * len(images))
        val_split = int(val_ratio * len(images))

        splits = (
            ('train', images[:train_split]),
            ('validation', images[train_split:train_split + val_split]),
            ('test', images[train_split + val_split:]),
        )

        # Create all three target folders first so each class exists in every
        # split even when a split receives no images.
        for split_name, _ in splits:
            os.makedirs(os.path.join(output_dir, split_name, class_name), exist_ok=True)

        # Move images to the corresponding directories
        for split_name, split_images in splits:
            split_dir = os.path.join(output_dir, split_name, class_name)
            for img in tqdm(split_images, desc=f'Moving {split_name} images for class {class_name}'):
                shutil.move(os.path.join(class_dir, img), os.path.join(split_dir, img))

# Example usage: the three ratios passed below must add up to 1.
dataset_dir = '/content/drive/MyDrive/darpa/data_in'
output_dir = '/content/drive/MyDrive/darpa/data_out'

# 80% train, 10% validation, 10% test
split_dataset(
    dataset_dir,
    output_dir,
    train_ratio=0.8,
    val_ratio=0.1,
    test_ratio=0.1,
)

Train and validate a ResNet50 model on the 45-class RESISC45 dataset; the trained model is then used to find the class consistency.

In [None]:
import tensorflow as tf
from tensorflow.keras.layers import Input, Conv2D, BatchNormalization, Activation, MaxPooling2D, Add, GlobalAveragePooling2D, Dense
from tensorflow.keras.models import Model, load_model
from tensorflow.keras.preprocessing.image import ImageDataGenerator
from tensorflow.keras.callbacks import ModelCheckpoint, EarlyStopping
from tensorflow.keras import mixed_precision
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.losses import CategoricalCrossentropy
import os

In [None]:
# Turn on memory growth for every visible GPU. Iterating an empty list is a
# no-op, so no separate "are there any GPUs" guard is needed.
gpus = tf.config.experimental.list_physical_devices('GPU')
for gpu in gpus:
    tf.config.experimental.set_memory_growth(gpu, True)

In [None]:
def identity_block(X, f, filters, stage, block):
    """Residual block whose shortcut is the unmodified input.

    The main path is 1x1 conv -> f x f conv ('same' padding) -> 1x1 conv, each
    followed by batch normalization, with ReLU after the first two. The input
    is then added to the main path and a final ReLU is applied, so the input
    must already have the same shape as the main-path output.

    Args:
        X: Input tensor (batch normalization is applied on axis 3).
        f: Kernel size of the middle convolution.
        filters: Three filter counts, one per convolution of the main path.
        stage: Stage number, used only to build layer names.
        block: Block label (string), used only to build layer names.

    Returns:
        Output tensor of the block.
    """
    n_first, n_middle, n_last = filters
    tag = str(stage) + block
    conv_prefix = 'res' + tag + '_branch'
    bn_prefix = 'bn' + tag + '_branch'

    # Keep a handle on the input for the skip connection.
    skip = X

    # First component: 1x1 convolution
    out = Conv2D(n_first, (1, 1), name=conv_prefix + '2a')(X)
    out = BatchNormalization(axis=3, name=bn_prefix + '2a')(out)
    out = Activation('relu')(out)

    # Second component: f x f convolution, spatial size preserved
    out = Conv2D(n_middle, (f, f), padding='same', name=conv_prefix + '2b')(out)
    out = BatchNormalization(axis=3, name=bn_prefix + '2b')(out)
    out = Activation('relu')(out)

    # Third component: 1x1 convolution, no activation before the merge
    out = Conv2D(n_last, (1, 1), name=conv_prefix + '2c')(out)
    out = BatchNormalization(axis=3, name=bn_prefix + '2c')(out)

    # Merge with the skip connection, then apply the final non-linearity.
    merged = Add()([out, skip])
    return Activation('relu')(merged)

def convolutional_block(X, f, filters, stage, block, s=2):
    """Residual block whose shortcut is a strided 1x1 convolution.

    The main path is 1x1 conv (stride ``s``) -> f x f conv ('same' padding) ->
    1x1 conv, each followed by batch normalization, with ReLU after the first
    two. The shortcut path applies its own 1x1 conv (stride ``s``, ``F3``
    filters) and batch normalization so that it matches the main path before
    the two are added; a final ReLU follows the addition.

    Args:
        X: Input tensor (batch normalization is applied on axis 3).
        f: Kernel size of the middle convolution.
        filters: Three filter counts ``(F1, F2, F3)``; ``F3`` is also used for
            the shortcut convolution.
        stage: Stage number, used only to build layer names.
        block: Block label (string), used only to build layer names.
        s: Stride of the first main-path convolution and of the shortcut
            convolution.

    Returns:
        Output tensor of the block.
    """
    conv_name_base = 'res' + str(stage) + block + '_branch'
    bn_name_base = 'bn' + str(stage) + block + '_branch'

    F1, F2, F3 = filters
    X_shortcut = X

    # First component of the main path
    X = Conv2D(F1, (1, 1), strides=(s, s), name=conv_name_base + '2a')(X)
    X = BatchNormalization(axis=3, name=bn_name_base + '2a')(X)
    X = Activation('relu')(X)

    # Second component of the main path
    X = Conv2D(F2, (f, f), padding='same', name=conv_name_base + '2b')(X)
    X = BatchNormalization(axis=3, name=bn_name_base + '2b')(X)
    X = Activation('relu')(X)

    # Third component: no activation here, it is applied after the merge
    X = Conv2D(F3, (1, 1), name=conv_name_base + '2c')(X)
    X = BatchNormalization(axis=3, name=bn_name_base + '2c')(X)

    # Shortcut path: same stride and filter count as the main path output
    X_shortcut = Conv2D(F3, (1, 1), strides=(s, s), name=conv_name_base + '1')(X_shortcut)
    X_shortcut = BatchNormalization(axis=3, name=bn_name_base + '1')(X_shortcut)

    # Without the ReLUs the block would be a purely linear map (up to batch
    # norm), which defeats the purpose of stacking residual blocks.
    X = Add()([X, X_shortcut])
    X = Activation('relu')(X)

    return X

def ResNet50(input_shape=(224, 224, 3), classes=45):
    """Build a ResNet50-style classifier from the residual blocks above.

    Layout: 7x7/2 convolution + batch norm + ReLU + 3x3/2 max pooling, then
    four stages of one ``convolutional_block`` followed by 2, 3, 5 and 2
    ``identity_block``s, then global average pooling and a softmax layer.

    Args:
        input_shape: Shape of one input image, channels last.
        classes: Number of output classes (units of the final softmax layer).

    Returns:
        An uncompiled Keras ``Model`` named ``'ResNet50'`` whose output has
        shape ``(batch, classes)``.
    """
    X_input = Input(input_shape)

    # Stem
    X = Conv2D(64, (7, 7), strides=(2, 2), name='conv1')(X_input)
    X = BatchNormalization(axis=3, name='bn_conv1')(X)
    X = Activation('relu')(X)
    X = MaxPooling2D((3, 3), strides=(2, 2))(X)

    # (stage number, filters, labels of the identity blocks, stride of the
    # leading convolutional block). Stage 2 keeps stride 1 because the stem's
    # max pooling has already downsampled the feature map.
    stage_specs = [
        (2, [64, 64, 256], 'bc', 1),
        (3, [128, 128, 512], 'bcd', 2),
        (4, [256, 256, 1024], 'bcdef', 2),
        (5, [512, 512, 2048], 'bc', 2),
    ]
    for stage, filters, identity_labels, stride in stage_specs:
        X = convolutional_block(X, f=3, filters=filters, stage=stage, block='a', s=stride)
        for label in identity_labels:
            X = identity_block(X, 3, filters, stage=stage, block=label)

    # Classification head. Without it the model would output a 4-D feature map
    # that cannot be matched against one-hot labels by categorical
    # cross-entropy.
    X = GlobalAveragePooling2D(name='avg_pool')(X)
    # Keep the softmax in float32 so the probabilities stay numerically stable
    # when a mixed_float16 policy is active (no effect under float32).
    X = Dense(classes, activation='softmax', dtype='float32', name='fc' + str(classes))(X)

    model = Model(inputs=X_input, outputs=X, name='ResNet50')

    return model

# Set mixed precision policy
policy = mixed_precision.Policy('mixed_float16')
mixed_precision.set_global_policy(policy)

# Create a MirroredStrategy
strategy = tf.distribute.MirroredStrategy()

with strategy.scope():
    # Training images are rescaled to [0, 1] and randomly augmented;
    # validation images are only rescaled.
    train_datagen = ImageDataGenerator(rescale=1./255,
                                       shear_range=0.2,
                                       zoom_range=0.2,
                                       horizontal_flip=True)

    val_datagen = ImageDataGenerator(rescale=1./255)

    train_generator = train_datagen.flow_from_directory('/content/drive/MyDrive/darpa/data_out/train',
                                                        target_size=(224, 224),
                                                        batch_size=16,  # Reduce batch size
                                                        class_mode='categorical')

    validation_generator = val_datagen.flow_from_directory('/content/drive/MyDrive/darpa/data_out/validation',
                                                           target_size=(224, 224),
                                                           batch_size=16,  # Reduce batch size
                                                           class_mode='categorical')

    # The number of classes comes from the folder structure, so the model
    # head always matches the data on disk.
    model = ResNet50(input_shape=(224, 224, 3), classes=train_generator.num_classes)
    model.compile(optimizer=Adam(), loss=CategoricalCrossentropy(), metrics=['accuracy'])

    # Keep a copy of the best epoch (lowest validation loss) on local disk.
    checkpoint = ModelCheckpoint('best_model.h5', monitor='val_loss', save_best_only=True, mode='min')
    # restore_best_weights=True: when training stops early, roll the model back
    # to the best epoch. Without it, the model saved below would carry the
    # weights from `patience` epochs AFTER the best one.
    early_stopping = EarlyStopping(monitor='val_loss', patience=5, mode='min',
                                   restore_best_weights=True)

    model.fit(train_generator,
              steps_per_epoch=train_generator.samples // train_generator.batch_size,
              validation_data=validation_generator,
              validation_steps=validation_generator.samples // validation_generator.batch_size,
              epochs=20,
              callbacks=[checkpoint, early_stopping])

    # Save the final model
    model.save('/content/drive/MyDrive/darpa/resnet50_model1.h5')


Measure the accuracy of our trained model on the test dataset.

In [None]:
import numpy as np
from tensorflow.keras.models import load_model
from tensorflow.keras.preprocessing.image import ImageDataGenerator
from sklearn.metrics import classification_report, confusion_matrix
import seaborn as sns
import matplotlib.pyplot as plt

# Load the trained model
model = load_model('/content/drive/MyDrive/darpa/resnet50_model1.h5')

# Prepare the test data generator. shuffle=False keeps the batches in the same
# order as test_generator.classes, which the confusion matrix relies on.
test_datagen = ImageDataGenerator(rescale=1./255)
test_generator = test_datagen.flow_from_directory('/content/drive/MyDrive/darpa/data_out/test',
                                                  target_size=(224, 224),
                                                  batch_size=16,
                                                  class_mode='categorical',
                                                  shuffle=False)

# Number of batches needed to visit every test image exactly once (ceiling
# division). Flooring would drop the last partial batch from the evaluation,
# and "floor + 1" yields one batch too many whenever the sample count is an
# exact multiple of the batch size, making the predictions longer than the
# label vector.
test_steps = int(np.ceil(test_generator.samples / test_generator.batch_size))

# Evaluate the model on the test data
test_loss, test_accuracy = model.evaluate(test_generator, steps=test_steps)

print(f'Test Accuracy: {test_accuracy * 100:.2f}%')

# Predict the classes of the test data. Rewind the generator first so the
# predictions start at the first image and line up with test_generator.classes.
test_generator.reset()
Y_pred = model.predict(test_generator, steps=test_steps)
y_pred = np.argmax(Y_pred, axis=1)

# Compute confusion matrix
conf_matrix = confusion_matrix(test_generator.classes, y_pred)
class_names = list(test_generator.class_indices.keys())

# Plot confusion matrix
plt.figure(figsize=(10, 8))
sns.heatmap(conf_matrix, annot=True, fmt='d', cmap='Blues', xticklabels=class_names, yticklabels=class_names)
plt.xlabel('Predicted')
plt.ylabel('True')
plt.title('Confusion Matrix')
plt.show()

# Classification report
report = classification_report(test_generator.classes, y_pred, target_names=class_names)
print(report)

Find the class consistency by predicting the top 5 classes of a provided image with our trained ResNet50 model.

In [None]:
from keras.models import load_model
from keras.preprocessing import image
import numpy as np

# Load your trained ResNet50 model
model_path = "/content/drive/MyDrive/darpa/resnet50_model1.h5"  # Replace with your model path
model = load_model(model_path)

# Load and preprocess the image the same way as during training:
# resize to 224x224, add a batch axis, and rescale pixels to [0, 1].
file_path = "/content/drive/MyDrive/darpa/findclass_level/patched_image_new_1424.png"  # Replace with your image path
img = image.load_img(file_path, target_size=(224, 224))
img_array = image.img_to_array(img)
img_array = np.expand_dims(img_array, axis=0)
img_array = img_array / 255.0  # Normalize pixel values

# Predict classes
predictions = model.predict(img_array)

# Decode predictions manually for 45 classes.
# The list is indexed by the model's output position.
# NOTE(review): this assumes the training class folders carried exactly these
# names, listed here in alphabetical order — confirm against
# train_generator.class_indices.
class_names = [
    'airplane', 'airport', 'baseball_diamond', 'basketball_court', 'beach',
    'bridge', 'chaparral', 'church', 'circular_farmland', 'cloud',
    'commercial_area', 'dense_residential', 'desert', 'forest', 'freeway',
    'golf_course', 'ground_track_field', 'harbor', 'industrial_area', 'intersection',
    'island', 'lake', 'meadow', 'medium_residential', 'mobile_home_park',
    'mountain', 'overpass', 'palace', 'parking_lot', 'railway',
    'railway_station', 'rectangular_farmland', 'river', 'roundabout', 'runway',
    'sea_ice', 'ship', 'snowberg', 'sparse_residential', 'stadium',
    'storage_tank', 'tennis_court', 'terrace', 'thermal_power_station', 'wetland'
]

# Predict top 5 classes of a given image with our trained model:
# take the single row of the batch, sort class indices by descending
# probability, and keep the first five together with their scores.
probabilities = predictions[0]
top_classes = np.argsort(probabilities)[::-1][:5]
top_scores = probabilities[top_classes]

# Display top predictions
for i, class_idx in enumerate(top_classes):
    class_name = class_names[class_idx]
    score = top_scores[i]
    print(f"{i + 1}: {class_name} ({score:.2f})")

With our trained model, sort the images into two folders, attacked and non-attacked, based on class consistency.

In [None]:
from keras.models import load_model
from keras.preprocessing import image
import numpy as np
import os
import shutil

# Load your trained ResNet50 model
model_path = "/content/drive/MyDrive/darpa/resnet50_model1.h5"  # Replace with your model path
model = load_model(model_path)

# Paths to the folders containing images and where to move classified images
image_folder = "/content/drive/MyDrive/darpa/test/terrace"  # Replace with your folder path
attacked_folder = "/content/drive/MyDrive/darpa/attackedd/"  # Replace with your folder path
non_attacked_folder = "/content/drive/MyDrive/darpa/non_attackedd/"  # Replace with your folder path

# Create directories if they don't exist
os.makedirs(attacked_folder, exist_ok=True)
os.makedirs(non_attacked_folder, exist_ok=True)

# Define class names, indexed by the model's output position.
# NOTE(review): this assumes the training class folders carried exactly these
# names, listed here in alphabetical order — confirm against
# train_generator.class_indices.
class_names = [
    'airplane', 'airport', 'baseball_diamond', 'basketball_court', 'beach',
    'bridge', 'chaparral', 'church', 'circular_farmland', 'cloud',
    'commercial_area', 'dense_residential', 'desert', 'forest', 'freeway',
    'golf_course', 'ground_track_field', 'harbor', 'industrial_area', 'intersection',
    'island', 'lake', 'meadow', 'medium_residential', 'mobile_home_park',
    'mountain', 'overpass', 'palace', 'parking_lot', 'railway',
    'railway_station', 'rectangular_farmland', 'river', 'roundabout', 'runway',
    'sea_ice', 'ship', 'snowberg', 'sparse_residential', 'stadium',
    'storage_tank', 'tennis_court', 'terrace', 'thermal_power_station', 'wetland'
]

# Minimum margin by which the top-1 probability must exceed the 2nd and 3rd
# probabilities for an image to count as non-attacked. With 0.95 only images
# predicted with a very dominant top class pass.
# NOTE(review): confirm that such a strict margin is intended.
probability_threshold = 0.95

# Process each image in the folder
for filename in os.listdir(image_folder):
    file_path = os.path.join(image_folder, filename)
    if not os.path.isfile(file_path):
        continue

    # Load and preprocess the image
    img = image.load_img(file_path, target_size=(224, 224))
    img_array = image.img_to_array(img)
    img_array = np.expand_dims(img_array, axis=0)
    img_array = img_array / 255.0  # Normalize pixel values

    # Predict top 5 classes of a given image with our trained model:
    # sort class indices by descending probability and keep the first five.
    # verbose=0 silences the per-call progress bar inside the loop.
    predictions = model.predict(img_array, verbose=0)
    probabilities = predictions[0]
    top_classes = np.argsort(probabilities)[::-1][:5]
    top_scores = probabilities[top_classes]

    # Determine if the image is attacked or non-attacked: it is attacked when
    # the top class leads the 2nd or 3rd class by less than the threshold.
    attacked = False
    for i in range(1, 3):  # Compare top class with 2nd and 3rd classes
        if abs(top_scores[0] - top_scores[i]) < probability_threshold:
            attacked = True
            break

    status = "attacked" if attacked else "non-attacked"

    # Move the image to the corresponding folder
    if status == "attacked":
        shutil.move(file_path, os.path.join(attacked_folder, filename))
    else:
        shutil.move(file_path, os.path.join(non_attacked_folder, filename))

    # Display result
    print(f"Image: {filename}")
    print("Top 5 predicted classes:")
    for i, class_idx in enumerate(top_classes):
        class_name = class_names[class_idx]
        score = top_scores[i]
        print(f"{i + 1}: {class_name} ({score:.2f})")
    print(f"Status: {status}")
    print()