In [215]:
#!pip install pillow


In [216]:
from PIL import Image, ImageOps
import shutil
import os
import pandas as pd
import numpy as np
from collections import Counter

import matplotlib.pyplot as plt

from sklearn.model_selection import train_test_split
import tensorflow as tf
from tensorflow.keras.preprocessing.image import ImageDataGenerator, img_to_array, load_img
from tensorflow.keras.utils import to_categorical
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Conv2D, MaxPooling2D, Flatten, Dense


In [217]:
# Parameters
# Every setting is stored in the `param` dictionary so that the whole
# configuration can be written to disk next to the preprocessed data.
param = {}

# if we want to try one part of our dataset (1 = use every row)
param['frac'] = 1 
param['random_state'] = 42 # for part of dataset and train_test_split

param['use_white_line_cropping'] = True         # removes 16 pixels at the bottom if its white
param['use_logo_cropping'] = True               # removes the orange logo in the bottom left corner
param['use_padding'] = True                     # it makes photos look square due to black borders at the top and bottom 
param['channels_number'] = 1                    # 3 = RGB, 1 = black & white after cropping the orange logo
param['use_augmentation'] = False
param['low_classes'] = ['bird', 'hog']          # for augmentation

param['data_folder'] = '../data'                # if it's the same folder use '.'
# Read the value back from `param`: the bare name `data_folder` is only created
# by the globals() loop further down, so using it here raises NameError on a
# fresh kernel.
param['temp_folder'] = os.path.join(param['data_folder'], 'temp')

# Check if the temp folder already exists and delete it to have only files from this session
if os.path.exists(param['temp_folder']):
    # Delete the directory along with its contents
    shutil.rmtree(param['temp_folder'])
# create empty temp folder
os.makedirs(param['temp_folder'])

# Set paths for input data folders
param['input_train_folder'] = os.path.join(param['data_folder'], 'train_features')
param['input_test_folder'] = os.path.join(param['data_folder'], 'test_features')
param['label_csv'] = os.path.join(param['data_folder'], 'train_labels.csv')
param['train_features_csv'] = os.path.join(param['data_folder'], 'train_features.csv')
param['test_features_csv'] = os.path.join(param['data_folder'], 'test_features.csv')

# Set paths for output data folders
param['output_train_and_val_folder'] = os.path.join(param['temp_folder'], 'train_and_val') # if we don't do any augmentation
param['output_test_folder'] = os.path.join(param['temp_folder'], 'test')

# Set paths for output label files
param['output_train_label_csv'] = os.path.join(param['temp_folder'], 'train_labels.csv')
param['output_val_label_csv'] = os.path.join(param['temp_folder'], 'val_labels.csv')

# Set paths for output feature files
param['output_train_features_csv'] = os.path.join(param['temp_folder'], 'train_features.csv')
param['output_val_features_csv'] = os.path.join(param['temp_folder'], 'val_features.csv')
param['output_test_features_csv'] = os.path.join(param['temp_folder'], 'test_features.csv')

# Set path to save the results
param['results_csv_path'] = os.path.join(param['temp_folder'], 'test_predictions.csv')



# Expose every parameter as a notebook-level variable, so that later cells can
# write for example    frac    or    low_classes    instead of param['frac'].
for key, value in param.items():
    globals()[key] = value


# Convert dictionary to DataFrame with parameters as index and save to csv.
# The file goes into the temp folder configured above instead of a hard-coded
# path, so changing `data_folder` moves it together with the other outputs.
df_param = pd.DataFrame(list(param.items()), columns=['parameter', 'value']).set_index('parameter')
df_param.to_csv(os.path.join(param['temp_folder'], 'parameters.csv'), index=True)


# The commented-out snippet below is meant for the models notebook: it reloads the saved parameters so that both notebooks use the same settings

# # Convert 'value' column to appropriate data types
# def convert_value(value):
#     try:
#         return int(value)
#     except ValueError:
#         try:
#             return float(value)
#         except ValueError:
#             return value  # Return as string if it can't be converted to number

# param_path = '../data/temp/parameters.csv'

# if os.path.exists(param_path):
#     df_param = pd.read_csv(param_path)
#     df_param.set_index('parameter', inplace=True)
#     # Apply the conversion function to the 'value' column
#     df_param['value'] = df_param['value'].apply(convert_value)
#     param = df_param.to_dict()['value']

# else:
#     param = {}

#     # if we want to try one part of our dataset
#     param['frac'] = 0.1 
#     param['random_state'] = 1 
    
#     param['data_folder'] = '../data' # if this file and folders train_features and test_features are in the same folder use '.'
#     param['label_csv'] = os.path.join(data_folder, 'train_labels.csv')
#     param['train_features_csv'] = os.path.join(data_folder, 'train_features.csv')
#     param['test_features_csv'] = os.path.join(data_folder, 'test_features.csv')

#     # Convert dictionary to DataFrame with parameters as index and save to csv
#     df_param = pd.DataFrame(list(param.items()), columns=['parameter', 'value']).set_index('parameter')


# # Create variables and assign values
# for key, value in param.items():
#     globals()[key] = value
# print(df_param)


# Print the parameter table built above as a quick check of the settings in use
print(df_param)


                                                         value
parameter                                                     
frac                                                         1
random_state                                                42
use_white_line_cropping                                   True
use_logo_cropping                                         True
use_padding                                               True
channels_number                                              1
use_augmentation                                         False
low_classes                                        [bird, hog]
data_folder                                            ../data
temp_folder                                       ../data/temp
input_train_folder                      ../data/train_features
input_test_folder                        ../data/test_features
label_csv                             ../data/train_labels.csv
train_features_csv                  ../data/train_featu

In [218]:
# Decide whether a color counts as "white"
def is_white_color(color, threshold=230):
    """Return True when every component of `color` is at least `threshold`.

    `color` is any iterable of values comparable with `threshold`.
    An empty iterable yields True.
    """
    for component in color:
        if not (component >= threshold):
            # One component below the threshold is enough to reject the color
            return False
    return True


# Remove the white strip at the bottom of an image when one is present.
# Adjust `threshold` based on what is considered "close to white".
def crop_white_bottom_from_image(img, check_hight=8, check_width=10, start_from=60, threshold=230):
    """Crop the bottom of `img` if a probe region there is close to white.

    A probe rectangle `check_width` wide and `check_hight` tall is taken from
    the bottom edge, starting `start_from` pixels from the left. When every
    value in it is at least `threshold`, the bottom `check_hight + 8` rows
    are cut off (16 rows with the defaults).

    Parameters:
        img: image object providing `.size` and `.crop(box)`; the cropped
            region must be convertible with `np.array`.
        check_hight: height of the probe region in pixels.
        check_width: width of the probe region in pixels.
        start_from: x coordinate where the probe region starts.
        threshold: minimum channel value that still counts as white.

    Returns:
        The cropped image, or `img` unchanged when the probe region is not
        white or the image is too short to crop.
    """
    width, height = img.size
    # Define the region to check for white color
    box = (start_from, height - check_hight, start_from + check_width, height)
    region_np = np.array(img.crop(box))

    # Compare every value at once. The caller's threshold is honoured here;
    # previously it was accepted but never used, so 230 always applied.
    region_is_white = region_np.size > 0 and bool(np.all(region_np >= threshold))

    # Height that remains once the strip (probe height + 8 rows) is removed
    new_height = height - check_hight - 8
    if region_is_white and new_height > 0:
        img = img.crop((0, 0, width, new_height))
    return img


def padding_images(img):
    """Pad `img` with black borders so that it becomes square.

    The side of the square is the larger of the image's width and height.
    The missing pixels are split between the two opposite borders; when the
    difference is odd, the right/bottom border gets the extra pixel.
    """
    side = max(img.width, img.height)

    # Total number of pixels missing in each direction
    extra_width = side - img.width
    extra_height = side - img.height

    # Left/top get the rounded-down half, right/bottom get the remainder
    left, top = extra_width // 2, extra_height // 2
    right, bottom = extra_width - left, extra_height - top

    # fill=0 produces a black border
    return ImageOps.expand(img, (left, top, right, bottom), fill=0)


def set_channel(img,channel):
    """Convert `img` to RGB when `channel` is 3, otherwise to grayscale ('L').

    Returns whatever `img.convert(mode)` returns.
    """
    mode = 'RGB' if channel == 3 else 'L'
    return img.convert(mode)


def is_bright_orange(rgb):
    """Tell whether an (r, g, b) triple falls inside the bright-orange range.

    The range is approximate: red above 180, green strictly between 50 and
    200, blue below 110. `rgb` must unpack into exactly three values.
    """
    red, green, blue = rgb
    red_is_strong = red > 180
    green_is_medium = 50 < green < 200
    blue_is_weak = blue < 110
    return red_is_strong and green_is_medium and blue_is_weak

def replace_logo_with_neighboring_colors(img,search_area_height=50, search_area_width=50):
    """Paint over a bright-orange logo in the bottom-left corner of `img`.

    The bottom-left `search_area_width` x `search_area_height` rectangle is
    scanned with `is_bright_orange`. If any orange pixel is found, the area
    from the left edge up to 3 pixels right of the right-most orange pixel,
    and from 5 pixels above the top-most orange pixel down to the bottom
    edge, is overwritten column by column with the color of the pixel just
    above that area.

    The image is modified in place through `img.load()` and also returned.
    Pixels must be 3-value tuples, because `is_bright_orange` unpacks
    r, g, b.

    All coordinates are clamped to the image, so images smaller than the
    search area (or a logo touching the edge of the area) no longer index
    outside the image.
    """
    # Get the image dimensions
    width, height = img.size
    pixels = img.load()

    # Limit the search window to the part that actually lies inside the image
    search_right = min(search_area_width, width)
    search_top = max(height - search_area_height, 0)

    # Coordinates of the orange pixels inside the search window
    orange_points = [
        (x, y)
        for x in range(search_right)
        for y in range(search_top, height)
        if is_bright_orange(pixels[x, y])
    ]

    # Nothing that looks like a logo: leave the image untouched
    if not orange_points:
        return img

    # Bounding corner of the logo plus a small safety margin, kept in bounds
    fill_right = min(max(x for x, _ in orange_points) + 3, width)
    fill_top = max(min(y for _, y in orange_points) - 5, 0)
    # Row the replacement color is copied from (directly above the filled area;
    # falls back to the first row when the area starts at the top of the image)
    source_row = max(fill_top - 1, 0)

    # Replace logo with vertical lines. Color is like color of pixel above
    for x in range(fill_right):
        color = pixels[x, source_row]
        for y in range(fill_top, height):
            pixels[x, y] = color
    return img



In [219]:
# Augmentation for birds and hogs

def augment_class_images(input_folder, train_df, target_classes):
    """Generate augmented images for under-represented classes.

    For every class in `target_classes`, randomly chosen images of that class
    are transformed with an `ImageDataGenerator` and written to
    `<input_folder>/<class name>` until the class would reach the average
    image count of the remaining classes. An existing output folder for the
    class is deleted first.

    Parameters:
        input_folder: folder containing the source images; the per-class
            output folders are created inside it.
        train_df: DataFrame with an 'id' column (image file name, with or
            without extension) and one numeric 0/1 column per class.
        target_classes: names of the class columns to augment.

    Raises:
        ValueError: if no class column is left to compute the average from.
    """
    target_classes = list(target_classes)

    # Calculate the average number of images across all other classes.
    # Only numeric columns are label columns; text columns such as id,
    # filepath or site must not be summed.
    class_counts = (
        train_df.select_dtypes(include='number')
        .drop(columns=target_classes, errors='ignore')
        .sum(axis=0)
    )
    if class_counts.empty:
        raise ValueError("train_df has no class columns besides target_classes")
    avg_count = int(class_counts.mean())

    # Create a generator for data augmentation
    datagen = ImageDataGenerator(
        rotation_range=10,       # Rotations
        width_shift_range=0.1,   # Width shifts
        height_shift_range=0.1,  # Height shifts
        shear_range=0.1,         # Shear angle shifts
        zoom_range=0.1,          # Zooming
        horizontal_flip=True,    # Horizontal flipping
        fill_mode='nearest'      # Filling new pixels after transformation
    )
    # Perform augmentation for each target class
    for target_class in target_classes:
        # Filter the frame that was passed in (not a notebook-level global),
        # so validation rows can never be used as augmentation sources.
        class_df = train_df[train_df[target_class] == 1]
        # Calculate how many images need to be generated; a class that is
        # already above average needs none.
        num_images_needed = max(avg_count - len(class_df), 0)

        # Create output folder for the augmented images
        output_folder = os.path.join(input_folder, target_class)

        # Start from an empty folder so that only images of this run remain
        if os.path.exists(output_folder):
            shutil.rmtree(output_folder)
        os.makedirs(output_folder, exist_ok=True)

        if class_df.empty:
            # Nothing to sample from
            print(f"No images of class '{target_class}' found, nothing to augment.")
            continue

        for _ in range(num_images_needed):
            random_id = class_df['id'].sample(n=1).iloc[0]
            # Ids may already have had their extension stripped; add it back
            file_name = random_id if os.path.splitext(random_id)[1] else random_id + '.jpg'
            img = load_img(os.path.join(input_folder, file_name))
            x = img_to_array(img)  # Convert image to numpy array
            x = x.reshape((1,) + x.shape)  # Add dimension to work with ImageDataGenerator
            # Generate exactly one new image; flow() writes it to output_folder.
            # NOTE(review): flow() picks the output file name itself - confirm
            # the names cannot collide, otherwise fewer files than reported
            # end up on disk.
            for batch in datagen.flow(x, batch_size=1, save_to_dir=output_folder, save_prefix=target_class, save_format='jpeg'):
                break

        print(f"Created {num_images_needed} augmented images for class '{target_class}' in folder '{output_folder}'.")




In [220]:
def preprocessing_features(input_folder,output_folder,input_df=None,use_white_line_cropping=use_white_line_cropping,use_logo_cropping=use_logo_cropping,
                           use_padding=use_padding,channels_number=channels_number):
    """Preprocess images from `input_folder` and save them to `output_folder`.

    The files to process are taken from `input_df['id']` when a non-empty
    DataFrame is given, otherwise from the directory listing of
    `input_folder`. Only names ending in .png, .jpg or .jpeg (case
    insensitive) are processed; each result is saved under the same file name.

    Steps per image: convert to RGB, optionally crop the white bottom strip,
    optionally paint over the orange logo, convert to `channels_number`
    channels, optionally pad to a square.

    Note: the defaults of the four option parameters are the values the
    notebook-level variables had when this definition was executed.
    """
    # Make sure the destination exists
    if not os.path.exists(output_folder):
        os.makedirs(output_folder)

    # Pick the source of file names
    if input_df is None or input_df.empty:
        names = os.listdir(input_folder)
    else:
        names = input_df['id']

    image_suffixes = ('.png', '.jpg', '.jpeg')
    saved_count = 0
    for filename in names:
        # Skip anything that is not an image file
        if not filename.lower().endswith(image_suffixes):
            continue

        with Image.open(os.path.join(input_folder, filename)) as source:
            # RGB is needed to detect the orange logo
            picture = set_channel(source, 3)
            if use_white_line_cropping:
                picture = crop_white_bottom_from_image(picture)
            if use_logo_cropping:
                picture = replace_logo_with_neighboring_colors(picture)
            picture = set_channel(picture, channels_number)
            if use_padding:
                picture = padding_images(picture)

            picture.save(os.path.join(output_folder, filename))
            saved_count += 1
    print(f"{saved_count} rows were added in {output_folder}")


In [221]:
# Read the per-image features and the labels, then join them on the image id
df_features = pd.read_csv(train_features_csv)
df_labels = pd.read_csv(label_csv)
df = pd.merge(df_features, df_labels, on='id')
df

Unnamed: 0,id,filepath,site,antelope_duiker,bird,blank,civet_genet,hog,leopard,monkey_prosimian,rodent
0,ZJ000000,train_features/ZJ000000.jpg,S0120,0.0,1.0,0.0,0.0,0.0,0.0,0.0,0.0
1,ZJ000001,train_features/ZJ000001.jpg,S0069,0.0,0.0,0.0,0.0,0.0,0.0,1.0,0.0
2,ZJ000002,train_features/ZJ000002.jpg,S0009,0.0,1.0,0.0,0.0,0.0,0.0,0.0,0.0
3,ZJ000003,train_features/ZJ000003.jpg,S0008,0.0,0.0,0.0,0.0,0.0,0.0,1.0,0.0
4,ZJ000004,train_features/ZJ000004.jpg,S0036,0.0,0.0,0.0,0.0,0.0,1.0,0.0,0.0
...,...,...,...,...,...,...,...,...,...,...,...
16483,ZJ016483,train_features/ZJ016483.jpg,S0093,0.0,0.0,1.0,0.0,0.0,0.0,0.0,0.0
16484,ZJ016484,train_features/ZJ016484.jpg,S0043,0.0,0.0,0.0,0.0,0.0,1.0,0.0,0.0
16485,ZJ016485,train_features/ZJ016485.jpg,S0089,0.0,0.0,0.0,1.0,0.0,0.0,0.0,0.0
16486,ZJ016486,train_features/ZJ016486.jpg,S0095,0.0,1.0,0.0,0.0,0.0,0.0,0.0,0.0


In [222]:
# Label columns: everything after the first three columns of df
class_names = list(df.columns[3:])

# This cell rewrites `df` (sampling + '.jpg' suffix), so it must start from the
# freshly loaded frame. Fail early with a clear message instead of failing
# later on file names like 'xxx.jpg.jpg'.
if df['id'].str.endswith('.jpg').any():
    raise RuntimeError(
        "df['id'] already ends with '.jpg': this cell has been run before. "
        "Re-run the cell that loads `df` and then run this cell again."
    )

df = df.sample(frac=frac, random_state=random_state)

# Turn every id into the image file name
df['id'] = df['id'] + '.jpg'

# Perform stratified split based on the labels
train_df, val_df = train_test_split(df, test_size=0.25, random_state=random_state, stratify=df[class_names])


# Train and validation images go to the same folder; the csv files written
# below record which image belongs to which split.
preprocessing_features(input_train_folder, output_train_and_val_folder, train_df)
preprocessing_features(input_train_folder, output_train_and_val_folder, val_df)
preprocessing_features(input_test_folder, output_test_folder)

#if use_augmentation:
    #augment_class_images(middle_train_folder, train_df, low_classes)


def _export_split(split_df, images_folder, label_csv_path, features_csv_path):
    """Write the label and feature csv files for one split and return the updated frame.

    Works on a copy, so the frame returned by train_test_split is never
    modified (depending on library versions that could otherwise trigger
    pandas' SettingWithCopyWarning). In the returned frame 'filepath' points
    to the preprocessed image and 'id' has its '.jpg' suffix removed again.
    """
    exported = split_df.copy()
    exported['filepath'] = exported['id'].apply(lambda name: os.path.join(images_folder, name))
    exported['id'] = exported['id'].str.replace('.jpg', '', regex=False)
    # Labels: id + class columns; features: id, filepath, site
    exported.drop(columns=['filepath', 'site']).to_csv(label_csv_path, index=False)
    exported[['id', 'filepath', 'site']].to_csv(features_csv_path, index=False)
    return exported


train_df = _export_split(train_df, output_train_and_val_folder, output_train_label_csv, output_train_features_csv)
val_df = _export_split(val_df, output_train_and_val_folder, output_val_label_csv, output_val_features_csv)

# The test set has no labels: only point 'filepath' to the preprocessed images
test_df = pd.read_csv(test_features_csv)
test_df['filepath'] = test_df['id'].apply(lambda x: os.path.join(output_test_folder, x+'.jpg'))
test_df.to_csv(output_test_features_csv, index=False)

print(f"Train labels ({len(train_df)}) saved to {output_train_label_csv}")
print(f"Validation labels ({len(val_df)}) saved to {output_val_label_csv}")

print(f"Train features saved to {output_train_features_csv}")
print(f"Validation features saved to {output_val_features_csv}")

print(f"Test features saved to {output_test_features_csv}")



12366 rows were added in ../data/temp/train_and_val
4122 rows were added in ../data/temp/train_and_val
4464 rows were added in ../data/temp/test
Train labels (12366) saved to ../data/temp/train_labels.csv
Validation labels (4122) saved to ../data/temp/val_labels.csv
Train features saved to ../data/temp/train_features.csv
Validation features saved to ../data/temp/val_features.csv
Test features saved to ../data/temp/test_features.csv
