In [2]:
from google.colab import drive
drive.mount('/content/drive')


Mounted at /content/drive


In [3]:
path_to_invasive = '/content/drive/MyDrive/SRCNN/water/inv'
path_to_non_invasive = '/content/drive/MyDrive/SRCNN/water/non_inv'


In [4]:
import os
import numpy as np
from keras.preprocessing.image import load_img, img_to_array

def load_images_and_labels(image_paths, label):
    images = []
    labels = []
    try:
        file_names = os.listdir(image_paths)
        if not file_names:
            print("Directory is empty:", image_paths)
        for image_path in file_names:
            full_path = os.path.join(image_paths, image_path)
            print("Loading:", full_path)  # Debug: Print the path of each image being loaded
            img = load_img(full_path, target_size=(32, 32))
            img = img_to_array(img)
            images.append(img)
            labels.append(label)
    except Exception as e:
        print(f"Error loading images: {e}")
    return np.array(images), np.array(labels)



x_invasive, y_invasive = load_images_and_labels(path_to_invasive, 1)
x_non_invasive, y_non_invasive = load_images_and_labels(path_to_non_invasive, 0)


Loading: /content/drive/MyDrive/SRCNN/water/inv/Im149 (27).png
Loading: /content/drive/MyDrive/SRCNN/water/inv/Im153 (9).png
Loading: /content/drive/MyDrive/SRCNN/water/inv/Im152 (21).png
Loading: /content/drive/MyDrive/SRCNN/water/inv/Im146 (37).png
Loading: /content/drive/MyDrive/SRCNN/water/inv/Im156 (2).png
Loading: /content/drive/MyDrive/SRCNN/water/inv/Im154 (8).png
Loading: /content/drive/MyDrive/SRCNN/water/inv/Im153 (10).png
Loading: /content/drive/MyDrive/SRCNN/water/inv/Im146 (38).png
Loading: /content/drive/MyDrive/SRCNN/water/inv/Im149 (28).png
Loading: /content/drive/MyDrive/SRCNN/water/inv/Im154 (9).png
Loading: /content/drive/MyDrive/SRCNN/water/inv/Im152 (22).png
Loading: /content/drive/MyDrive/SRCNN/water/inv/Im156 (3).png
Loading: /content/drive/MyDrive/SRCNN/water/inv/Im153 (11).png
Loading: /content/drive/MyDrive/SRCNN/water/inv/Im149 (29).png
Loading: /content/drive/MyDrive/SRCNN/water/inv/Im152 (23).png
Loading: /content/drive/MyDrive/SRCNN/water/inv/Im156 (4).pn

In [5]:
from sklearn.model_selection import train_test_split

# Concatenate invasive and non-invasive data
x = np.concatenate((x_invasive, x_non_invasive), axis=0)
y = np.concatenate((y_invasive, y_non_invasive), axis=0)

# Split the data into training and testing sets
x_train, x_test, y_train, y_test = train_test_split(x, y, test_size=0.2, random_state=42)


In [6]:
x_train = x_train.astype('float32') / 255
x_test = x_test.astype('float32') / 255


In [7]:
from keras.utils import to_categorical

y_train = to_categorical(y_train, num_classes=2)
y_test = to_categorical(y_test, num_classes=2)


In [8]:
print(f"x_train shape: {x_train.shape} - y_train shape: {y_train.shape}")
print(f"x_test shape: {x_test.shape} - y_test shape: {y_test.shape}")


x_train shape: (1523, 32, 32, 3) - y_train shape: (1523, 2)
x_test shape: (381, 32, 32, 3) - y_test shape: (381, 2)


In [9]:
!pip install tensorflow-addons

Collecting tensorflow-addons
  Downloading tensorflow_addons-0.23.0-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (611 kB)
[?25l     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.0/611.8 kB[0m [31m?[0m eta [36m-:--:--[0m[2K     [91m━━━━━━━━━━━[0m[90m╺[0m[90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m174.1/611.8 kB[0m [31m5.1 MB/s[0m eta [36m0:00:01[0m[2K     [91m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m[91m╸[0m [32m604.2/611.8 kB[0m [31m9.0 MB/s[0m eta [36m0:00:01[0m[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m611.8/611.8 kB[0m [31m7.2 MB/s[0m eta [36m0:00:00[0m
Collecting typeguard<3.0.0,>=2.7 (from tensorflow-addons)
  Downloading typeguard-2.13.3-py3-none-any.whl (17 kB)
Installing collected packages: typeguard, tensorflow-addons
Successfully installed tensorflow-addons-0.23.0 typeguard-2.13.3


In [10]:
import tensorflow as tf
import tensorflow_addons as tfa
import numpy as np
from tensorflow import keras
from tensorflow.keras import layers


TensorFlow Addons (TFA) has ended development and introduction of new features.
TFA has entered a minimal maintenance and release mode until a planned end of life in May 2024.
Please modify downstream libraries to take dependencies from other repositories in our TensorFlow community (e.g. Keras, Keras-CV, and Keras-NLP). 

For more information see: https://github.com/tensorflow/addons/issues/2807 



In [11]:
data_augmentation = keras.Sequential(
    [
        layers.Normalization(),
        layers.RandomFlip("horizontal"),
        layers.RandomRotation(0.02),
    ]
)

# Setting the state of the normalization layer.
data_augmentation.layers[0].adapt(x_train)

In [12]:
num_classes = 2
input_shape = (32, 32, 3)

In [13]:
def create_encoder():
    resnet = keras.applications.ResNet50V2(
        include_top=False, weights=None, input_shape=input_shape, pooling="avg"
    )

    inputs = keras.Input(shape=input_shape)
    augmented = data_augmentation(inputs)
    outputs = resnet(augmented)
    model = keras.Model(inputs=inputs, outputs=outputs, name="cifar10-encoder")
    return model


encoder = create_encoder()
encoder.summary()

learning_rate = 0.001
batch_size = 265
hidden_units = 512
projection_units = 128
num_epochs = 50
dropout_rate = 0.5
temperature = 0.05

Model: "cifar10-encoder"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 input_2 (InputLayer)        [(None, 32, 32, 3)]       0         
                                                                 
 sequential (Sequential)     (None, 32, 32, 3)         7         
                                                                 
 resnet50v2 (Functional)     (None, 2048)              23564800  
                                                                 
Total params: 23564807 (89.89 MB)
Trainable params: 23519360 (89.72 MB)
Non-trainable params: 45447 (177.53 KB)
_________________________________________________________________


In [14]:
from keras.losses import CategoricalCrossentropy

def create_classifier(encoder, trainable=True):
    for layer in encoder.layers:
        layer.trainable = trainable

    inputs = keras.Input(shape=input_shape)
    features = encoder(inputs)
    features = layers.Dropout(dropout_rate)(features)
    features = layers.Dense(hidden_units, activation="relu")(features)
    features = layers.Dropout(dropout_rate)(features)
    outputs = layers.Dense(num_classes, activation="softmax")(features)

    model = keras.Model(inputs=inputs, outputs=outputs, name="cifar10-classifier")
    model.compile(
        optimizer=keras.optimizers.Adam(learning_rate),
        loss=CategoricalCrossentropy(),  # Changed here
        metrics=['accuracy']  # Simplified metric for clarity
    )
    return model

# Create the encoder and classifier again
encoder = create_encoder()
classifier = create_classifier(encoder)

# Now fit the model
history = classifier.fit(x=x_train, y=y_train, batch_size=batch_size, epochs=num_epochs)

# Evaluate the model
accuracy = classifier.evaluate(x_test, y_test)[1]
print(f"Test accuracy: {round(accuracy * 100, 2)}%")


Epoch 1/50
Epoch 2/50
Epoch 3/50
Epoch 4/50
Epoch 5/50
Epoch 6/50
Epoch 7/50
Epoch 8/50
Epoch 9/50
Epoch 10/50
Epoch 11/50
Epoch 12/50
Epoch 13/50
Epoch 14/50
Epoch 15/50
Epoch 16/50
Epoch 17/50
Epoch 18/50
Epoch 19/50
Epoch 20/50
Epoch 21/50
Epoch 22/50
Epoch 23/50
Epoch 24/50
Epoch 25/50
Epoch 26/50
Epoch 27/50
Epoch 28/50
Epoch 29/50
Epoch 30/50
Epoch 31/50
Epoch 32/50
Epoch 33/50
Epoch 34/50
Epoch 35/50
Epoch 36/50
Epoch 37/50
Epoch 38/50
Epoch 39/50
Epoch 40/50
Epoch 41/50
Epoch 42/50
Epoch 43/50
Epoch 44/50
Epoch 45/50
Epoch 46/50
Epoch 47/50
Epoch 48/50
Epoch 49/50
Epoch 50/50
Test accuracy: 88.71%


In [20]:
import tensorflow as tf
import tensorflow_addons as tfa

class SupervisedContrastiveLoss(tf.keras.losses.Loss):
    def __init__(self, temperature=0.05):
        super().__init__()
        self.temperature = temperature

    def call(self, labels, feature_vectors):
        # Normalize the feature vectors
        feature_vectors_normalized = tf.math.l2_normalize(feature_vectors, axis=1)
        # Compute logits
        logits = tf.matmul(feature_vectors_normalized, feature_vectors_normalized, transpose_b=True) / self.temperature
        # Check shapes
        print("Logits shape:", logits.shape)
        print("Labels shape:", labels.shape)
        # Flatten labels to ensure they are one-dimensional
        labels = tf.reshape(labels, [-1])
        return tfa.losses.npairs_loss(labels, logits)

def add_projection_head(encoder):
    inputs = keras.Input(shape=input_shape)
    features = encoder(inputs)
    outputs = layers.Dense(projection_units, activation="relu")(features)
    model = keras.Model(
        inputs=inputs, outputs=outputs, name="with_projection-head"
    )
    return model

In [21]:
encoder = create_encoder()
encoder_with_projection_head = add_projection_head(encoder)

# Assuming your model's projection head outputs embeddings correctly
encoder_with_projection_head.compile(
    optimizer=keras.optimizers.Adam(learning_rate),
    loss=SupervisedContrastiveLoss(temperature)
)

encoder_with_projection_head.summary()




Model: "with_projection-head"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 input_12 (InputLayer)       [(None, 32, 32, 3)]       0         
                                                                 
 cifar10-encoder (Functiona  (None, 2048)              23564807  
 l)                                                              
                                                                 
 dense_5 (Dense)             (None, 128)               262272    
                                                                 
Total params: 23827079 (90.89 MB)
Trainable params: 23781632 (90.72 MB)
Non-trainable params: 45447 (177.53 KB)
_________________________________________________________________


In [24]:
import tensorflow as tf

y_train = tf.cast(y_train, tf.int32)
print(y_train.dtype)  # This should output 'int32'


<dtype: 'int32'>


In [18]:
if y_train.ndim > 1:
    y_train = tf.argmax(y_train, axis=1)

history = encoder_with_projection_head.fit(
    x=x_train, y=y_train, batch_size=batch_size, epochs=num_epochs
)


Epoch 1/50
Logits shape: (None, None)
Labels shape: (None, 1)
Logits shape: (None, None)
Labels shape: (None, 1)
Epoch 2/50
Epoch 3/50
Epoch 4/50
Epoch 5/50
Epoch 6/50
Epoch 7/50
Epoch 8/50
Epoch 9/50
Epoch 10/50
Epoch 11/50
Epoch 12/50
Epoch 13/50
Epoch 14/50
Epoch 15/50
Epoch 16/50
Epoch 17/50
Epoch 18/50
Epoch 19/50
Epoch 20/50
Epoch 21/50
Epoch 22/50
Epoch 23/50
Epoch 24/50
Epoch 25/50
Epoch 26/50
Epoch 27/50
Epoch 28/50
Epoch 29/50
Epoch 30/50
Epoch 31/50
Epoch 32/50
Epoch 33/50
Epoch 34/50
Epoch 35/50
Epoch 36/50
Epoch 37/50
Epoch 38/50
Epoch 39/50
Epoch 40/50
Epoch 41/50
Epoch 42/50
Epoch 43/50
Epoch 44/50
Epoch 45/50
Epoch 46/50
Epoch 47/50
Epoch 48/50
Epoch 49/50
Epoch 50/50


In [26]:
classifier = create_classifier(encoder, trainable=False)
import tensorflow as tf

y_train_one_hot = tf.keras.utils.to_categorical(y_train, num_classes=2)

history = classifier.fit(x=x_train, y=y_train_one_hot, batch_size=batch_size, epochs=num_epochs)


Epoch 1/50
Epoch 2/50
Epoch 3/50
Epoch 4/50
Epoch 5/50
Epoch 6/50
Epoch 7/50
Epoch 8/50
Epoch 9/50
Epoch 10/50
Epoch 11/50
Epoch 12/50
Epoch 13/50
Epoch 14/50
Epoch 15/50
Epoch 16/50
Epoch 17/50
Epoch 18/50
Epoch 19/50
Epoch 20/50
Epoch 21/50
Epoch 22/50
Epoch 23/50
Epoch 24/50
Epoch 25/50
Epoch 26/50
Epoch 27/50
Epoch 28/50
Epoch 29/50
Epoch 30/50
Epoch 31/50
Epoch 32/50
Epoch 33/50
Epoch 34/50
Epoch 35/50
Epoch 36/50
Epoch 37/50
Epoch 38/50
Epoch 39/50
Epoch 40/50
Epoch 41/50
Epoch 42/50
Epoch 43/50
Epoch 44/50
Epoch 45/50
Epoch 46/50
Epoch 47/50
Epoch 48/50
Epoch 49/50
Epoch 50/50


In [29]:
# If `y_test` is one-hot encoded, convert it to class indices
y_test_indices = tf.argmax(y_test, axis=1)
y_test_one_hot = tf.keras.utils.to_categorical(y_test_indices, num_classes=2)
# Evaluate the classifier using the one-hot encoded test labels
accuracy = classifier.evaluate(x_test, y_test_one_hot)[1]
print(f"Test accuracy: {round(accuracy * 100, 2)}%")


Test accuracy: 88.19%
