## Get data from bucket

In [ ]:
import boto3
import matplotlib.pyplot as plt
from PIL import Image
from io import BytesIO
import os, sys
from PIL import Image



In [ ]:
# Define your S3 bucket and the image key
bucket_name = 's3-avalanche-guard'
positive_image_key = 'data/experiments/exp01-terrain-binary/train/positive/2015-03-07 rosskogel-windegg (3).jpg'
negative_image_key = 'data/experiments/exp01-terrain-binary/train/negative/ILSVRC2012_val_00000004_n04263257.JPEG'


In [ ]:
# Initialize a session using Amazon S3
s3 = boto3.client('s3')

# Get the image from S3
response = s3.get_object(Bucket=bucket_name, Key=positive_image_key)
image_data = response['Body'].read()

# Open the image
image = Image.open(BytesIO(image_data))

# Plot the image
plt.imshow(image)
plt.axis('off')  # Hide the axis
plt.show()

# Negative Image
# Get the image from S3
response = s3.get_object(Bucket=bucket_name, Key=negative_image_key)
image_data = response['Body'].read()

# Open the image
image = Image.open(BytesIO(image_data))

# Plot the image
plt.imshow(image)
plt.axis('off')  # Hide the axis
plt.show()


## Train a Custom CNN model for binary classification

In [ ]:
!pip install -q tensorflow==2.11.0 

In [ ]:
!pip install -q pydot

In [ ]:
!pip install -q graphviz

In [ ]:
#!pip install -q --upgrade keras
#!pip install -q --upgrade tensorflow

#!pip install -q keras-nightly
#!pip install -q tensorflow==2.12.0 --user


In [ ]:
import tensorflow as tf
from tensorflow.keras.preprocessing import image_dataset_from_directory
from tensorflow.keras.applications import EfficientNetB4
from tensorflow.keras.applications import EfficientNetV2B0
from tensorflow.keras.applications.efficientnet_v2 import EfficientNetV2S
from tensorflow.keras.layers import Dense, GlobalAveragePooling2D
from tensorflow.keras.models import Model
from tensorflow.keras.optimizers import Adam

from keras import layers
import numpy as np

import matplotlib.pyplot as plt
import matplotlib.image as mpimg
from keras import utils as kutils
#from keras import ops as kops

import keras



import boto3
import os

In [ ]:
print(tf.__version__)
#print(layers.__version__)

In [ ]:
print(tf.__version__)

In [ ]:
# Set the S3 bucket paths
# Set up the S3 bucket paths
s3_bucket = 's3-avalanche-guard'
s3_directory_key = 'data/experiments/exp01-terrain-binary/'

#s3_train ='cropped_images_noaugm_TrainValTest_balanced/train'
#s3_val = 'cropped_images_noaugm_TrainValTest_balanced/val'
#s3_test = 'cropped_images_noaugm_TrainValTest_balanced/test'

In [ ]:
# Local directory to save images
local_directory = 'AR/images/local_image_directory'

# Create the local directory if it doesn't exist
if not os.path.exists(local_directory):
    os.makedirs(local_directory)


In [ ]:
#### DO NOT RUN
RUN_THIS = False

# Initialize a session using Amazon S3
s3 = boto3.client('s3')

# List all objects in the S3 directory
paginator = s3.get_paginator('list_objects_v2')
pages = paginator.paginate(Bucket=bucket_name, Prefix=s3_directory_key)



if RUN_THIS:
    # Download each image and recreate the folder structure
    for page in pages:
        for obj in page['Contents']:
            key = obj['Key']
            if key.endswith('/'):  # Skip directories
                continue
            # Recreate the directory structure locally
            relative_path = os.path.relpath(key, s3_directory_key)
            local_file_path = os.path.join(local_directory, relative_path)
            local_file_dir = os.path.dirname(local_file_path)
            if not os.path.exists(local_file_dir):
                os.makedirs(local_file_dir)
            # Download the file
            s3.download_file(bucket_name, key, local_file_path)

            # resize
            size = 224, 224
            #outfile = os.path.splitext(local_file_path)[0] + ".jpg"
            im = Image.open(local_file_path)
            im.thumbnail(size, Image.Resampling.LANCZOS)
    #im.save(local_file_path, "JPEG")
else:
    print("Skipped downloading images from s3")

In [ ]:
# Local Directory paths

local_train = os.path.join(local_directory, 'train')
local_val =  os.path.join(local_directory, 'val')
local_test =  os.path.join(local_directory, 'test')

In [ ]:
# Function to load datasets
def load_datasets(img_dir: [str], img_size=(224, 224), batch_size=32):

    train_dataset = None
    val_dataset = None
    test_dataset = None

    if len(img_dir) >= 1:
        train_dir = img_dir[0]
        print(f"train dir:{train_dir}")
        train_dataset = image_dataset_from_directory(
            train_dir,
            labels='inferred',
            class_names=['negative', 'positive'],
            label_mode='int',
            batch_size=batch_size,
            image_size=img_size,
            shuffle=True
        )
        val_dataset = None
        test_dataset = None
    
    if len(img_dir) >= 2:
        val_dir = img_dir[1]
        print(f"val dir:{val_dir}")
        val_dataset = image_dataset_from_directory(
            val_dir,
            labels='inferred',
            class_names=['negative', 'positive'],
            label_mode='int',
            batch_size=batch_size,
            image_size=img_size,
            shuffle=False
        )

    if len(img_dir) >= 3:
        test_dir = img_dir[2]
        print(f"test dir:{test_dir}")
        test_dataset = image_dataset_from_directory(
            test_dir,
            labels='inferred',
            class_names=['negative', 'positive'],
            label_mode='int',
            batch_size=batch_size,
            image_size=img_size,
            shuffle=False
        )

        
    return train_dataset, val_dataset, test_dataset

In [ ]:
# Load datasets
train_dataset, val_dataset, test_dataset = load_datasets([local_train, local_val, local_test],(224, 224),32)

In [ ]:
type(train_dataset)

In [ ]:
# Get cardinality
# Cardinality is images/batch size
print(f"Number of training samples: {train_dataset.cardinality()}")
print(f"Number of validation samples: {val_dataset.cardinality()}")
print(f"Number of test samples: {test_dataset.cardinality()}")
print(train_dataset.class_names)

#### Augment data

some reference from https://keras.io/guides/transfer_learning/

In [ ]:
augmentation_layers = [
    layers.RandomFlip("horizontal"),
    layers.RandomRotation(0.1),
]


def data_augmentation(x):
    for layer in augmentation_layers:
        x = layer(x)
    return x


# aument only train dataset
train_dataset_aug = train_dataset.map(lambda x, y: (data_augmentation(x), y))


In [ ]:
#visualize the augmented data
print(train_dataset)
for images, labels in train_dataset.take(1):
    plt.figure(figsize=(10, 10))
    first_image = images[0]
    for i in range(9):
        ax = plt.subplot(3, 3, i + 1)
        augmented_image = data_augmentation(np.expand_dims(first_image, 0))
        plt.imshow(np.array(augmented_image[0]).astype("int32"))
        plt.title(int(labels[0]))
        plt.axis("off")


In [ ]:
# Function to build the model
def xx_build_model(num_classes):
    base_model = EfficientNetV2B0EfficientNetV2S(include_top=False, weights='imagenet', input_shape=(224, 224, 3))
    x = base_model.output
    x = GlobalAveragePooling2D()(x)
    x = Dense(1024, activation='relu')(x)
    x = layers.Dropout(0.2)(x)  # Regularize with dropout
    predictions = Dense(num_classes, activation='softmax')(x)
    
    model = Model(inputs=base_model.input, outputs=predictions)
    
    for layer in base_model.layers:
        layer.trainable = False
    
    layerRange = []
    for _layer in model.layers:
        if _layer not in base_model.layers:
            layerRange.append(_layer.name)
            
    trainable_layers = len(layerRange)
    for idx,_layer in enumerate(layerRange):
        if idx != 0 and idx != trainable_layers-1:
            layerRange.remove(_layer)
        
    model.summary(show_trainable=True, layer_range=layerRange)    
        
        
    
    return model

In [ ]:
def xx2_build_model(p_input_shape):
    
    base_model = keras.applications.Xception(
        weights="imagenet",  # Load weights pre-trained on ImageNet.
        input_shape=p_input_shape,
        include_top=False,
    )  # Do not include the ImageNet classifier at the top.

    # Freeze the base_model
    base_model.trainable = False

    # Create new model on top
    inputs = keras.Input(shape=(224, 224, 3))

    # Pre-trained Xception weights requires that input be scaled
    # from (0, 255) to a range of (-1., +1.), the rescaling layer
    # outputs: `(inputs * scale) + offset`
    scale_layer = keras.layers.Rescaling(scale=1 / 127.5, offset=-1)
    x = scale_layer(inputs)

    # The base model contains batchnorm layers. We want to keep them in inference mode
    # when we unfreeze the base model for fine-tuning, so we make sure that the
    # base_model is running in inference mode here.
    x = base_model(x, training=False)
    x = keras.layers.GlobalAveragePooling2D()(x)
    x = keras.layers.Dropout(0.2)(x)  # Regularize with dropout
    outputs = keras.layers.Dense(1)(x)
    model = keras.Model(inputs, outputs)
    
    # Print summary of only trainable 
    layerRange = []
    for _layer in model.layers:
        if layer.trainable:
            layerRange.append(_layer.name)
            
    trainable_layers = len(layerRange)
    for idx,_layer in enumerate(layerRange):
        if idx != 0 and idx != trainable_layers-1:
            layerRange.remove(_layer)
        
    model.summary(show_trainable=True, layer_range=layerRange)    

    

    return model


##### Build From Scratch 
(ref https://keras.io/examples/vision/image_classification_from_scratch/)

In [ ]:
def build_model(input_shape, num_classes):
    inputs = keras.Input(shape=input_shape)

    # Entry block
    x = layers.Rescaling(1.0 / 255)(inputs)
    x = layers.Conv2D(128, 3, strides=2, padding="same")(x)
    x = layers.BatchNormalization()(x)
    x = layers.Activation("relu")(x)

    previous_block_activation = x  # Set aside residual

    for size in [256, 512, 728]:
        x = layers.Activation("relu")(x)
        x = layers.SeparableConv2D(size, 3, padding="same")(x)
        x = layers.BatchNormalization()(x)

        x = layers.Activation("relu")(x)
        x = layers.SeparableConv2D(size, 3, padding="same")(x)
        x = layers.BatchNormalization()(x)

        x = layers.MaxPooling2D(3, strides=2, padding="same")(x)

        # Project residual
        residual = layers.Conv2D(size, 1, strides=2, padding="same")(
            previous_block_activation
        )
        x = layers.add([x, residual])  # Add back residual
        previous_block_activation = x  # Set aside next residual

    x = layers.SeparableConv2D(1024, 3, padding="same")(x)
    x = layers.BatchNormalization()(x)
    x = layers.Activation("relu")(x)

    x = layers.GlobalAveragePooling2D()(x)
    if num_classes == 2:
        units = 1
    else:
        units = num_classes

    x = layers.Dropout(0.25)(x)
    # We specify activation=None so as to return logits
    outputs = layers.Dense(units, activation=None)(x)
    
    return keras.Model(inputs, outputs)





In [ ]:
print(train_dataset.class_names)
y = set(np.concatenate([y for x, y in train_dataset_aug], axis=0))
print(f"y is {y}")

In [ ]:
image_size = (224,224)
model = build_model(input_shape=image_size + (3,), num_classes=2)
model.summary(show_trainable=True, layer_range=None)    

#keras.utils.plot_model(model, show_shapes=True)

In [ ]:
# Build the model
####num_classes = len(train_dataset.class_names)
####model = build_model(num_classes)

# Compile the model
####model.compile(optimizer=Adam(), 
####              loss='sparse_categorical_crossentropy', 
####              metrics=['accuracy'])



In [ ]:
# Train the model

'''
_epochs = 5

history = model.fit(train_dataset_aug, validation_data=val_dataset, epochs=_epochs)
'''

epochs = 25

####callbacks = [
####    keras.callbacks.ModelCheckpoint("save_at_{epoch}.keras"),
####]

model.compile(
    optimizer=keras.optimizers.Adam(3e-4),
    loss=keras.losses.BinaryCrossentropy(from_logits=True),
    metrics=[keras.metrics.BinaryAccuracy(name="acc")],
)

history = model.fit(
    train_dataset_aug,
    epochs=epochs,
    validation_data=val_dataset,
)




In [ ]:
print(model)

In [ ]:
#model.summary()

In [ ]:
local_model_path = 'AR/models/Binary_TerrainOrNot_ARCNN_1'  # Local path to save the model
model.save(local_model_path, save_format='tf')

In [ ]:
'''
tf.keras.utils.plot_model(
    model,
    to_file='AR/models/Binary_TerrainOrNot_EfficientNetV2S_1.png',
    show_shapes=False,
    show_dtype=False,
    show_layer_names=False,
    rankdir='TB',
    expand_nested=False,
    dpi=200,
    show_layer_activations=False,
    show_trainable=False,
    **kwargs
)
'''

#tf.keras.utils.plot_model(model)



In [ ]:


acc = history.history['']
val_acc = history.history['val_acc']

loss = history.history['loss']
val_loss = history.history['val_loss']

epochs_range = range()

plt.figure(figsize=(8, 8))
plt.subplot(1, 2, 1)
plt.plot(epochs_range, acc, label='Training Accuracy')
plt.plot(epochs_range, val_acc, label='Validation Accuracy')
plt.legend(loc='lower right')
plt.title('Training and Validation Accuracy')

plt.subplot(1, 2, 2)
plt.plot(epochs_range, loss, label='Training Loss')
plt.plot(epochs_range, val_loss, label='Validation Loss')
plt.legend(loc='upper right')
plt.title('Training and Validation Loss')
plt.show()


#### Run Inference from the model

In [146]:
# load test dataset 

test_dataset = image_dataset_from_directory(
        local_test,
        labels='inferred',
        label_mode='int',
        batch_size=32,
        image_size=(224, 224),
        shuffle=False
    )
    

Found 722 files belonging to 2 classes.


Found 722 files belonging to 2 classes.


In [148]:
output = model.predict(traintest_dataset) 





In [133]:
print(output.shape)
print(tf.data.experimental.cardinality(test_dataset))
print(test_dataset.cardinality().numpy())


(5043, 2)
tf.Tensor(23, shape=(), dtype=int64)
23


In [None]:
print(output)

In [156]:
print(np.array(train_dataset.take(42)))

<TakeDataset element_spec=(TensorSpec(shape=(None, 224, 224, 3), dtype=tf.float32, name=None), TensorSpec(shape=(None,), dtype=tf.int32, name=None))>


In [166]:
plt.figure(figsize=(10, 10))
for images, labels in train_dataset.take(42):
    #print(np.array(images))
    #print(np.array(labels))
    for i in range(32):
        ax = plt.subplot(8, 4, i + 1)
        
        #neg_prob = float(output[i][0]) #round(float(output[i][0]),2)
        #pos_prob = float(output[i][1]) #round(float(output[i][1]),2)

        neg_prob = round(float(output[i][0]),2)
        pos_prob = round(float(output[i][1]),2)
        
        if pos_prob > neg_prob:
            pred_class = "Terrain"
        else:
            pred_class = "Not-a-Terrain"
        
        #print(labels[i],pos_prob)
        print(f"OLabel={labels[i]},PLabel={neg_prob},{pos_prob}")
        #plt.imshow(np.array(images[i]).astype("uint8"))
        #plt.title(f"OLabel={labels[i]},PLabel={neg_prob},{pos_prob}", fontsize = 6)
        #plt.axis("off")

#print(train_dataset.label)

Motivation - https://keras.io/examples/vision/image_classification_from_scratch/

In [ ]:

# import required module

#print(directory) 

# iterate over files in
# that directory
i=0
LIMIT = 70
directory = local_test + "/negative"
image_size = (224,224)


for filename in os.listdir(directory):
    i=i+1
    ##pred_class_arr = output[i-1]
    ##if pred_class_arr[1] > pred_class_arr[0]:
    ##    pred_class = "Terrain"
    ##else:
    ##    pred_class = "Not-a-Terrain"
    ##print(f"Index = {i} - Class = {pred_class}  -  ClassArray={pred_class_arr}")
    if i> LIMIT:
        break
    f = os.path.join(directory, filename)
    # checking if it is a file
    if os.path.isfile(f):
        
        img = kutils.load_img(f, target_size=image_size)
        #plt.imshow(img)

        img_array = kutils.img_to_array(img)
        img_array = np.expand_dims(img_array, 0)  # Create batch axis

        predictions = model.predict(img_array)
        print(predictions) 
        #pred_neg = predictions[0][0]
        #pred_pos = predictions[0][1]
        prediction_score_raw = predictions[0,0]
        prediction_score = 1/(1+np.exp(-prediction_score_raw) )
        
        
        #pos_score = round(pred_pos, 5)
        #print(f"** Pred_neg_raw = {pred_neg} , Pred_pos_raw = {pred_pos} , Istoal 100%{pred_pos + pred_neg}. This image is {100 * (1 - pos_score):.2f}% Not-A-Terrain and {100 * pos_score:.2f}% Terrain.")
        print(f"{100 * prediction_score:.2f}% Terrain.")

        # Open the image
        ###image = mpimg.imread(f)
        plt.figure(figsize=(6, 3))

        # Plot the image
        plt.imshow(img)
        plt.axis('off')  # Hide the axis
        plt.show()

        
