<a href="https://colab.research.google.com/github/danort92/Bear-Detection/blob/main/Bear_detection.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

#BEAR DETECTOR

##LIBRARIES

In [None]:
!pip install tensorflow keras opencv-python
!pip install tensorflow-addons
!pip install --upgrade typeguard
!pip install ultralytics==8.0.196
!pip install roboflow
!pip install pyyaml

In [2]:
import os
import cv2
import glob
import torch
import random
import shutil
import logging
import zipfile
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
import tensorflow as tf
import yaml
from sklearn.utils import class_weight
from sklearn.model_selection import train_test_split
from sklearn.metrics import confusion_matrix, ConfusionMatrixDisplay, classification_report
from tensorflow.keras.preprocessing.image import ImageDataGenerator
from tensorflow.keras.applications import MobileNetV2
from tensorflow.keras.layers import Dense, GlobalAveragePooling2D
from tensorflow.keras.models import Model
from tensorflow.keras.callbacks import EarlyStopping
from tensorflow.keras.metrics import Recall
from roboflow import Roboflow
from ultralytics import YOLO
from google.colab.patches import cv2_imshow
from tensorflow.keras import backend as K
from google.colab import files
from datetime import datetime

##DATASET

###DATASET IMPORT

In [None]:
!git clone https://github.com/danort92/Bear-Detection.git
%cd Bear-Detection

###DATASET AUGMENTATION

In [None]:
# Source folders (one per class) and the destination roots that
# flow_from_directory reads from.
bear_path = 'ct/bear_ct'
other_path = 'ct/other_ct'
train_dir = 'train'
val_dir = 'val'

# Collect the image paths of each class.
# os.listdir returns entries in arbitrary order, so the listing is sorted:
# without it, random_state=42 below would not give a reproducible split.
bear_files = sorted(
    os.path.join(bear_path, f)
    for f in os.listdir(bear_path)
    if os.path.isfile(os.path.join(bear_path, f))
)
other_files = sorted(
    os.path.join(other_path, f)
    for f in os.listdir(other_path)
    if os.path.isfile(os.path.join(other_path, f))
)

# 80/20 train/validation split, done per class so both splits keep the
# original class proportions.
train_bear, val_bear = train_test_split(bear_files, test_size=0.2, random_state=42)
train_other, val_other = train_test_split(other_files, test_size=0.2, random_state=42)

# Copy every file into <split>/<class>/, the layout flow_from_directory expects.
# NOTE: existing files are not removed, so if the source folders change between
# runs, delete 'train' and 'val' first to avoid stale files leaking across splits.
for class_name, split_dir, split_files in (
    ('bear', train_dir, train_bear),
    ('other', train_dir, train_other),
    ('bear', val_dir, val_bear),
    ('other', val_dir, val_other),
):
    target_dir = os.path.join(split_dir, class_name)
    os.makedirs(target_dir, exist_ok=True)
    for f in split_files:
        shutil.copy(f, target_dir)

# Data generators: augmentation is applied to the training images only;
# validation images are just rescaled to [0, 1].
train_datagen = ImageDataGenerator(
    rescale=1./255,
    rotation_range=20,
    width_shift_range=0.2,
    height_shift_range=0.2,
    shear_range=0.2,
    zoom_range=0.2,
    horizontal_flip=True
)
val_datagen = ImageDataGenerator(rescale=1./255)

train_generator = train_datagen.flow_from_directory(
    train_dir,
    target_size=(224, 224),
    batch_size=32,
    class_mode='binary'
)
# shuffle=False keeps validation batches in a fixed order.
val_generator = val_datagen.flow_from_directory(
    val_dir,
    target_size=(224, 224),
    batch_size=32,
    class_mode='binary',
    shuffle=False
)


###PLOT DATASET IMAGES

In [None]:
def plot_images_with_labels(generator, num_images=20):
    """
    Plot a random sample of images from a data generator with their true labels.

    One full pass over the generator (``len(generator)`` batches) is loaded
    into memory and ``num_images`` of those images are sampled without
    replacement. If fewer images are available, all of them are plotted.

    Args:
        generator: Data generator yielding ``(images, labels)`` batches and
            exposing ``class_indices``.
        num_images: Maximum number of images to plot.
    """
    num_batches = len(generator)
    if num_batches == 0:
        print("No images available to plot.")
        return

    # Gather the batches first and concatenate once; concatenating inside the
    # loop would re-copy the growing array on every iteration.
    batches = [next(generator) for _ in range(num_batches)]
    all_images = np.concatenate([batch_images for batch_images, _ in batches])
    all_labels = np.concatenate([batch_labels for _, batch_labels in batches])

    # Sampling without replacement requires num_images <= number of samples.
    num_samples = len(all_images)
    num_images = min(num_images, num_samples)
    if num_images <= 0:
        print("No images available to plot.")
        return
    random_indices = np.random.choice(num_samples, num_images, replace=False)

    # Invert class_indices (name -> index) to look names up by label index.
    class_labels = {v: k for k, v in generator.class_indices.items()}

    # Five images per row; the row count grows with num_images so any
    # requested sample size fits the grid (20 images -> 4 x 5, 15 x 10 in).
    n_cols = 5
    n_rows = (num_images + n_cols - 1) // n_cols
    plt.figure(figsize=(15, 2.5 * n_rows))

    for i, idx in enumerate(random_indices):
        true_label = class_labels[int(all_labels[idx])]

        plt.subplot(n_rows, n_cols, i + 1)
        plt.imshow(all_images[idx])
        plt.title(f'True: {true_label}')
        plt.axis('off')

    plt.tight_layout()
    plt.show()

print("Training images:")
plot_images_with_labels(train_generator, num_images=20)

print("Validation images:")
plot_images_with_labels(val_generator, num_images=20)


##BEAR - NO BEAR CLASSIFICATION MODEL

###TRAIN MODEL

In [None]:
# Drop any previous Keras graph/state so re-running this cell starts clean.
K.clear_session()

# Compute class weights
# 'balanced' weights compensate for unequal numbers of images per class.
class_weights = class_weight.compute_class_weight(
    'balanced',
    classes=np.unique(train_generator.classes),
    y=train_generator.classes
)
# model.fit expects a {class_index: weight} mapping.
class_weights = dict(enumerate(class_weights))

# Load pre-trained model
# MobileNetV2 without its ImageNet classifier, followed by a small binary head:
# global average pooling -> Dense(128, relu) -> Dense(1, sigmoid).
# NOTE(review): the generators feed images rescaled to [0, 1]; the MobileNetV2
# preprocess_input helper scales to [-1, 1] - confirm the input range is intended.
base_model = MobileNetV2(weights='imagenet', include_top=False, input_shape=(224, 224, 3))
x = base_model.output
x = GlobalAveragePooling2D()(x)
x = Dense(128, activation='relu')(x)
predictions = Dense(1, activation='sigmoid')(x)
model = Model(inputs=base_model.input, outputs=predictions)

# Freeze the base model layers
# Only the two new Dense layers are trained.
for layer in base_model.layers:
    layer.trainable = False

# Compile the model
# NOTE(review): Recall() scores the class whose label is 1. flow_from_directory
# assigns indices alphanumerically, so with folders 'bear' and 'other' that is
# presumably 'other' - check train_generator.class_indices.
model.compile(optimizer='adam', loss='binary_crossentropy', metrics=['accuracy', Recall()])

# Define the early stopping callback
early_stopping = EarlyStopping(
    monitor='val_accuracy',    # Monitor validation accuracy
    patience=2,
    restore_best_weights=True  # Restore the best weights
)

# Train the model with early stopping
# `history` is read by the plotting cell below; `model` by the prediction cells.
# With only 3 epochs, patience=2 leaves little room for stopping early.
history = model.fit(
    train_generator,
    epochs=3,
    validation_data=val_generator,
    callbacks=[early_stopping],
    class_weight=class_weights
)

###PLOT MODEL PREDICTIONS TO SEE ACCURACY

In [None]:
# Cut-off applied to the model's sigmoid output; shared by the plotting,
# evaluation and upload cells.
THRESHOLD = 0.3

def plot_random_predictions(model, generator, num_images=20, threshold=None):
    """
    Plot random images from a generator with their true and predicted labels.

    One full pass over the generator is loaded into memory and scored with
    ``model``. An image is assigned class index 1 when the sigmoid output is
    greater than ``threshold``, otherwise class index 0; indices are mapped to
    names through ``generator.class_indices``.

    Args:
        model: Trained Keras model with a single sigmoid output.
        generator: Data generator yielding ``(images, labels)`` batches.
        num_images: Maximum number of random images to plot.
        threshold: Cut-off on the sigmoid output. Defaults to the
            notebook-wide ``THRESHOLD``.
    """
    if threshold is None:
        threshold = THRESHOLD

    num_batches = len(generator)
    if num_batches == 0:
        print("No images available to plot.")
        return

    # Gather all batches, then concatenate once.
    batches = [next(generator) for _ in range(num_batches)]
    all_images = np.concatenate([batch_images for batch_images, _ in batches])
    all_true_labels = np.concatenate([batch_labels for _, batch_labels in batches])

    # model.predict returns shape (N, 1); flatten so each prediction is a
    # scalar (int() on a 1-element array is deprecated in recent NumPy).
    # NOTE(review): upload_and_predict labels an image 'Bear' when the output
    # is < 1 - threshold, a different cut-off from the one used here -
    # confirm which rule is intended.
    predictions = np.ravel(model.predict(all_images))
    predicted_labels = (predictions > threshold).astype(int)

    # Invert class_indices (name -> index) to look names up by label index.
    class_labels = {v: k for k, v in generator.class_indices.items()}

    # Sampling without replacement requires num_images <= number of samples.
    num_samples = len(all_images)
    num_images = min(num_images, num_samples)
    if num_images <= 0:
        print("No images available to plot.")
        return
    random_indices = np.random.choice(num_samples, num_images, replace=False)

    # Five images per row; rows grow with num_images (20 -> 4 x 5, 15 x 10 in).
    n_cols = 5
    n_rows = (num_images + n_cols - 1) // n_cols
    plt.figure(figsize=(15, 2.5 * n_rows))
    for i, idx in enumerate(random_indices):
        true_label = class_labels[int(all_true_labels[idx])]
        predicted_label = class_labels[int(predicted_labels[idx])]

        plt.subplot(n_rows, n_cols, i + 1)
        plt.imshow(all_images[idx])
        plt.title(f'True: {true_label}\nPredicted: {predicted_label}')
        plt.axis('off')

    plt.tight_layout()
    plt.show()

# Plot random validation images with true and predicted labels
plot_random_predictions(model, val_generator, num_images=20)

###PLOT MODEL METRICS

In [None]:
def plot_training_history(history):
    """
    Draw accuracy, recall and loss curves side by side.

    Each panel shows the training series and its ``val_`` counterpart
    against the epoch index.

    Args:
        history: Object returned by model.fit(); its ``history`` dict must
            hold 'accuracy', 'recall', 'loss' and the matching 'val_*' keys.
    """
    # (history key, human-readable name) for each panel, left to right.
    panels = (
        ('accuracy', 'Accuracy'),
        ('recall', 'Recall'),
        ('loss', 'Loss'),
    )

    plt.figure(figsize=(14, 5))

    for position, (key, name) in enumerate(panels, start=1):
        plt.subplot(1, 3, position)
        plt.plot(history.history[key], label=f'Train {name}')
        plt.plot(history.history[f'val_{key}'], label=f'Validation {name}')
        plt.xlabel('Epoch')
        plt.ylabel(name)
        plt.title(f'Model {name}')
        plt.legend()

    plt.tight_layout()
    plt.show()

# Show the curves for the run above
plot_training_history(history)

In [None]:
def evaluate_with_threshold(model, generator, threshold=THRESHOLD):
    """
    Evaluate the model using a custom threshold and compute metrics.

    Runs one full pass over ``generator``, assigns class index 1 to every
    sample whose sigmoid output is greater than ``threshold`` (index 0
    otherwise), then shows a confusion matrix and prints a classification
    report. Class names are taken from ``generator.class_indices`` so the
    displayed names always match the label indices.

    Args:
        model: Trained Keras model with a single sigmoid output.
        generator: Data generator yielding ``(images, labels)`` batches.
        threshold: Cut-off applied to the sigmoid output.

    Returns:
        Tuple ``(predicted_labels, all_true_labels)`` of 1-D NumPy arrays.
    """
    all_predictions = []
    all_true_labels = []

    # One full pass: len(generator) batches.
    for _ in range(len(generator)):
        images, true_labels = next(generator)
        all_predictions.extend(model.predict(images).flatten())
        all_true_labels.extend(true_labels.flatten())

    all_predictions = np.array(all_predictions)
    all_true_labels = np.array(all_true_labels)

    # NOTE(review): upload_and_predict labels an image 'Bear' when the output
    # is < 1 - threshold, a different cut-off from the one used here -
    # confirm which rule is intended.
    predicted_labels = (all_predictions > threshold).astype(int)

    # Derive display names from the generator instead of hard-coding them:
    # flow_from_directory assigns indices alphanumerically ('bear' -> 0,
    # 'other' -> 1), so a fixed ['Other', 'Bear'] list swaps the two classes.
    index_to_name = {index: name for name, index in generator.class_indices.items()}
    label_ids = sorted(index_to_name)
    display_names = [index_to_name[i].capitalize() for i in label_ids]

    # Integer copy for the metrics; passing `labels` keeps the matrix shape
    # stable even when a class is missing from the data or the predictions.
    true_label_ids = all_true_labels.astype(int)

    cm = confusion_matrix(true_label_ids, predicted_labels, labels=label_ids)
    disp = ConfusionMatrixDisplay(confusion_matrix=cm, display_labels=display_names)
    disp.plot(cmap=plt.cm.Blues)
    plt.title(f'Confusion Matrix (Threshold = {threshold})')
    plt.show()

    report = classification_report(
        true_label_ids,
        predicted_labels,
        labels=label_ids,
        target_names=display_names,
    )
    print(f"\n Classification Report (Threshold = {threshold}):\n{report}")

    return predicted_labels, all_true_labels

predicted_labels, true_labels = evaluate_with_threshold(model, val_generator, threshold=THRESHOLD)

In [10]:
# File extensions (lower-case) accepted as images inside an uploaded archive.
_IMAGE_EXTENSIONS = ('.jpg', '.jpeg', '.png')


def _classify_and_save_image(model, image_path, threshold, bear_dir, other_dir):
    """Classify one image file and copy it into the folder of its predicted class.

    Returns the predicted label ('Bear' or 'Other'), or None when the file
    cannot be read as an image.
    """
    image = cv2.imread(image_path)
    if image is None:
        print(f"Error loading image: {image_path}")
        return None

    # cv2.imread returns BGR, while the Keras generators used for training
    # load RGB; convert so the model sees the channel order it was trained on.
    image_rgb = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
    image_resized = cv2.resize(image_rgb, (224, 224))
    image_rescaled = image_resized / 255.0
    image_batch = np.expand_dims(image_rescaled, axis=0)

    print(f"Image Shape: {image_batch.shape}")
    print(f"Image Max Value: {np.max(image_batch)}, Min Value: {np.min(image_batch)}")

    prediction = model.predict(image_batch)
    print(f"Prediction Array: {prediction}")

    # The sigmoid output is the score of class index 1.
    # NOTE(review): assumes index 1 is 'other' (flow_from_directory orders
    # class folders alphanumerically), so "output < 1 - threshold" means
    # "bear score > threshold" - confirm against train_generator.class_indices.
    score = float(np.ravel(prediction)[0])
    label = 'Bear' if score < (1 - threshold) else 'Other'
    print(f"Image: {os.path.basename(image_path)}, Label: {label}")

    # Save the original (unresized, BGR) image in the matching folder.
    target_dir = bear_dir if label == 'Bear' else other_dir
    cv2.imwrite(os.path.join(target_dir, os.path.basename(image_path)), image)
    return label


def upload_and_predict(model, threshold=THRESHOLD):
    """
    Upload images through the Colab file picker and classify them.

    The user can upload either individual image files or zip archives.
    Every readable image is classified as 'Bear' or 'Other' and saved under
    ``predictions/<timestamp>/predicted_bears`` or
    ``predictions/<timestamp>/predicted_others``. Archives are searched
    recursively and image extensions are matched case-insensitively.

    Args:
        model: Trained Keras model.
        threshold: Classification threshold.
    """
    upload_choice = input("Do you want to upload a new image or zip file? (yes/no): ").lower()
    if upload_choice not in ['yes', 'y']:
        print("No images uploaded.")
        return

    # Create the output folders only once the user has confirmed an upload,
    # so declining does not leave empty timestamped directories behind.
    timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
    base_dir = f"predictions/{timestamp}"
    bear_dir = os.path.join(base_dir, "predicted_bears")
    other_dir = os.path.join(base_dir, "predicted_others")
    os.makedirs(bear_dir, exist_ok=True)
    os.makedirs(other_dir, exist_ok=True)

    file_choice = input("Do you want to upload a zip file? (yes/no): ").lower()

    if file_choice in ['yes', 'y']:
        print("Please upload a zip file containing your images.")
        uploaded = files.upload()

        # Start from an empty extraction folder on every call.
        uploaded_dir = "uploaded_images"
        if os.path.exists(uploaded_dir):
            shutil.rmtree(uploaded_dir)
        os.makedirs(uploaded_dir)

        for filename in uploaded:
            if filename.lower().endswith('.zip'):
                with zipfile.ZipFile(filename, 'r') as zip_ref:
                    zip_ref.extractall(uploaded_dir)
                print(f"Extracted files to '{uploaded_dir}' directory.")

        # Archives often wrap their images in a sub-folder, so walk the tree.
        # Images are saved by base name, so same-named files overwrite each other.
        uploaded_images = sorted(
            os.path.join(root, name)
            for root, _dirs, names in os.walk(uploaded_dir)
            for name in names
            if name.lower().endswith(_IMAGE_EXTENSIONS)
        )

        labels = [
            _classify_and_save_image(model, image_path, threshold, bear_dir, other_dir)
            for image_path in uploaded_images
        ]
        bear_count = labels.count('Bear')
        other_count = labels.count('Other')

        print(f"\nSummary:\nTotal Images: {len(uploaded_images)}\nBear Images: {bear_count}\nOther Images: {other_count}")

    else:
        print("Please upload an image file.")
        uploaded = files.upload()

        # The file picker stores each upload in the working directory under
        # its own name, so the dict key doubles as the path.
        for filename in uploaded:
            _classify_and_save_image(model, filename, threshold, bear_dir, other_dir)

In [None]:
# Run the interactive upload-and-classify flow with the trained classifier
# and the notebook-wide THRESHOLD.
upload_and_predict(model, threshold=THRESHOLD)

###NOTES
The model effectively recognizes whether bears are present in camera-trap images and saves the user's uploaded pictures into a timestamped `predictions/` folder, sorted by predicted class.

## BEAR DETECTION MODEL



###TRAIN YOLOv8 MODEL

In [None]:
def find_yaml_file(directory, filename="data.yaml"):
    """Search ``directory`` recursively for a file called ``filename``.

    Returns the path of the first match in os.walk order, or None when no
    folder under ``directory`` contains such a file.
    """
    matches = (
        os.path.join(folder, filename)
        for folder, _subfolders, entries in os.walk(directory)
        if filename in entries
    )
    return next(matches, None)

def find_data_folders(base_dir):
    """Locate the training and validation image folders under ``base_dir``.

    A folder is a candidate when its path *relative to* ``base_dir`` contains
    "images" together with "train" (training set) or "test" (used as the
    validation set). Matching on the relative path keeps a ``base_dir`` whose
    own name contains "train" or "test" from affecting the result.
    Sub-folders are visited in sorted order and the first match for each
    split is kept, so the result is deterministic.

    NOTE(review): the "test" split is what gets used for validation; datasets
    that ship a separate "valid" folder will not have it picked up - confirm
    this is intended.

    Args:
        base_dir: Root directory of the dataset.

    Returns:
        Tuple ``(train_dir, val_dir)``; either entry is None when no matching
        folder exists.
    """
    train_dir = None
    val_dir = None

    for dirpath, dirnames, _filenames in os.walk(base_dir):
        # Sorting in place fixes the order in which os.walk descends.
        dirnames.sort()

        relative = os.path.relpath(dirpath, base_dir)
        if "images" not in relative:
            continue

        if "train" in relative:
            if train_dir is None:
                train_dir = dirpath
        elif "test" in relative:
            if val_dir is None:
                val_dir = dirpath

        if train_dir is not None and val_dir is not None:
            break

    return train_dir, val_dir

def setup_bear_detection(api_key, workspace_name, project_name, version_number):
    """Download a Roboflow dataset in YOLOv8 format and point its data.yaml at it.

    After the download, the dataset's ``data.yaml`` is rewritten so that its
    ``train`` and ``val`` entries hold the image folders found on disk.

    Args:
        api_key: Roboflow API key.
        workspace_name: Roboflow workspace name.
        project_name: Roboflow project name.
        version_number: Dataset version to download.

    Returns:
        Tuple ``(yaml_file_path, version_number)``.

    Raises:
        FileNotFoundError: If data.yaml or the train/validation image folders
            cannot be found in the downloaded dataset.
    """
    rf = Roboflow(api_key=api_key)
    project = rf.workspace(workspace_name).project(project_name)
    version = project.version(version_number)
    dataset = version.download("yolov8")

    # Use the folder Roboflow reports for the download, so projects with any
    # name work; fall back to the previous hard-coded path if it is missing.
    base_dir = getattr(dataset, "location", None) or (
        f"/content/Bear-Detection/Bear-detection-{version_number}/"
    )
    yaml_file_path = find_yaml_file(base_dir)
    train_path, val_path = find_data_folders(base_dir)

    if not (yaml_file_path and train_path and val_path):
        raise FileNotFoundError(
            f"Required files or directories not found under '{base_dir}' "
            f"(data.yaml: {yaml_file_path}, train: {train_path}, val: {val_path})."
        )

    # Rewrite the split locations with the paths actually present on disk.
    with open(yaml_file_path, 'r') as file:
        data = yaml.safe_load(file)
    data['train'] = train_path
    data['val'] = val_path

    with open(yaml_file_path, 'w') as file:
        yaml.safe_dump(data, file)

    return yaml_file_path, version_number

def setup_bear_detection_from_local(base_dir):
    """Point the data.yaml of a dataset already on disk at its image folders.

    The ``train`` and ``val`` entries of the ``data.yaml`` found under
    ``base_dir`` are overwritten with the image folders located on disk.

    Args:
        base_dir: Root directory of the local YOLO dataset.

    Returns:
        Path of the updated data.yaml file.

    Raises:
        FileNotFoundError: If data.yaml or the train/validation image folders
            cannot be found. Previously the function returned None here, and
            the failure only surfaced later inside training.
    """
    yaml_file_path = find_yaml_file(base_dir)
    train_path, val_path = find_data_folders(base_dir)

    if not (yaml_file_path and train_path and val_path):
        raise FileNotFoundError(
            f"Required files or directories not found under '{base_dir}' "
            f"(data.yaml: {yaml_file_path}, train: {train_path}, val: {val_path})."
        )

    with open(yaml_file_path, 'r') as file:
        data = yaml.safe_load(file)
    data['train'] = train_path
    data['val'] = val_path

    with open(yaml_file_path, 'w') as file:
        yaml.safe_dump(data, file)

    return yaml_file_path

def handle_zip_upload(zip_file_path, extract_path='/content/Bear-Detection/zip_extracted'):
    """Extract a zip archive and return the folder it was extracted into.

    Args:
        zip_file_path: Path of the zip archive to extract.
        extract_path: Destination folder; created if missing. Defaults to the
            notebook's standard extraction folder.

    Returns:
        ``extract_path``.
    """
    os.makedirs(extract_path, exist_ok=True)
    with zipfile.ZipFile(zip_file_path, 'r') as zip_ref:
        zip_ref.extractall(extract_path)
    return extract_path

# Step 1: Ask the user if they want to use pre-trained weights
use_pretrained_weights = input("Do you want to use pre-trained weights? (yes/no): ").lower()

if use_pretrained_weights in ['yes', 'y']:
    print("Please upload the pre-trained weights file (or a zip containing the weights).")
    uploaded = files.upload()
    if not uploaded:
        # Cancelling the picker yields an empty dict; fail with a clear message.
        raise FileNotFoundError("No weights file was uploaded.")
    uploaded_file = next(iter(uploaded))

    if uploaded_file.endswith('.zip'):
        extract_path = handle_zip_upload(uploaded_file)
        # find_yaml_file is a plain recursive search by file name, so it also
        # locates best.pt when the archive wraps it in a sub-folder.
        weights_path = find_yaml_file(extract_path, filename="best.pt")
        if weights_path is None:
            raise FileNotFoundError(f"No 'best.pt' found inside '{uploaded_file}'.")
    else:
        weights_path = uploaded_file

    loaded_model = YOLO(weights_path)
    print("Pre-trained weights loaded successfully. Skipping training.")
else:
    # Step 2: Ask the user if they want to use the locally available dataset or their own Roboflow dataset
    dataset_choice = input("Do you want to use the pre-labeled dataset from your local files? (yes/no): ").lower()

    if dataset_choice in ['yes', 'y']:
        base_dir = "/content/Bear-Detection/Bear detection.v3i.yolov8-obb"  # Adjust this path if needed
        data_yaml_path = setup_bear_detection_from_local(base_dir)
    else:
        api_key = input("Enter your Roboflow API key: ")
        workspace_name = input("Enter your Roboflow workspace name: ")
        project_name = input("Enter your Roboflow project name: ")
        version_number = int(input("Enter the version number of the dataset: "))
        data_yaml_path, version_number = setup_bear_detection(api_key, workspace_name, project_name, version_number)

    # Load YOLOv8 model and train the model if no pre-saved weights are used
    # NOTE(review): in Ultralytics, mixed-precision training is governed by
    # `amp`; `half` is listed under validation/prediction settings - confirm
    # this flag has the intended effect.
    loaded_model = YOLO("yolov8n.pt")
    loaded_model.train(
        data=data_yaml_path,
        epochs=1,
        imgsz=416,
        batch=16,
        optimizer="AdamW",
        lr0=0.001,
        weight_decay=0.0005,
        augment=False,
        half=True
    )

    # After training, ask the user if they want to save the trained weights
    save_weights = input("Do you want to save the trained weights? (yes/no): ").lower()

    if save_weights in ['yes', 'y']:
        # Ultralytics numbers its run folders (train, train2, ...), so take the
        # folder of the run that just finished instead of always exporting
        # 'runs/detect/train'. Fall back to the old path if it is unavailable.
        trainer = getattr(loaded_model, "trainer", None)
        run_dir = getattr(trainer, "save_dir", None)
        if run_dir is not None:
            model_dir = os.path.join(str(run_dir), "weights")
        else:
            model_dir = "/content/Bear-Detection/runs/detect/train/weights"

        zip_file_path = "/content/exported_model.zip"
        shutil.make_archive("/content/exported_model", 'zip', model_dir)

        print("Zipped model files:", os.listdir("/content/"))

        # Download the zipped model file
        files.download(zip_file_path)

# Print confirmation
print("Process completed successfully!")


In [None]:
def process_video_with_yolo(video_path, model, output_path=None):
    """Run a YOLO model on every frame of a video and draw the detections.

    Each detected box is drawn in green with the caption 'Predicted BB'.
    When ``output_path`` is given, the annotated frames are written to it as
    an mp4v-encoded video with the source resolution and frame rate.

    Args:
        video_path: Path of the input video.
        model: Ultralytics YOLO model used for prediction.
        output_path: Optional path of the annotated output video.

    Returns:
        None. Prints a message and returns early if the video cannot be opened.
    """
    cap = cv2.VideoCapture(video_path)
    if not cap.isOpened():
        print(f"Error opening video file: {video_path}")
        return

    out = None
    try:
        # Get video properties
        frame_width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
        frame_height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
        # Some containers report 0 fps, which the writer cannot use.
        fps = cap.get(cv2.CAP_PROP_FPS) or 30.0

        # Create a VideoWriter object if output_path is provided
        if output_path:
            out = cv2.VideoWriter(
                output_path,
                cv2.VideoWriter_fourcc(*'mp4v'),
                fps,
                (frame_width, frame_height),
            )

        # Suppress output from the model prediction
        logging.getLogger('ultralytics').setLevel(logging.ERROR)

        while True:
            ret, frame = cap.read()
            if not ret:
                break

            # Ultralytics treats NumPy inputs as OpenCV-style BGR images, so
            # the frame is passed as read; converting it to RGB first would
            # swap the red and blue channels seen by the model.
            results = model.predict(frame, verbose=False)

            # Draw bounding boxes
            for bbox in results[0].boxes.xyxy:
                x1, y1, x2, y2 = map(int, bbox)
                cv2.rectangle(frame, (x1, y1), (x2, y2), (0, 255, 0), 2)
                cv2.putText(frame, 'Predicted BB', (x1, y1 - 10), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 255, 0), 2, cv2.LINE_AA)

            # Write the frame to the output video if output_path is provided
            if out is not None:
                out.write(frame)
    finally:
        # Release the capture and writer even if prediction raises. No window
        # is opened here, so there is nothing for destroyAllWindows to close.
        cap.release()
        if out is not None:
            out.release()

def setup_and_process_videos(model=None):
    """Upload videos through Colab and write annotated copies of them.

    Uploaded files are stored in ``/content/Bear-Detection/video_files``.
    Every ``.mp4`` file in that folder (including earlier uploads) is then
    processed, and the result is saved to
    ``/content/Bear-Detection/processed_videos`` as ``<name>_processed.mp4``.

    Args:
        model: YOLO model used for detection. When omitted, the notebook's
            ``loaded_model`` (the trained or uploaded bear detector) is used
            if it exists; otherwise the generic ``yolov8n.pt`` weights are
            loaded.
    """
    # Directories
    video_files_dir = '/content/Bear-Detection/video_files'
    processed_videos_dir = '/content/Bear-Detection/processed_videos'

    # Create directories if they do not exist
    os.makedirs(video_files_dir, exist_ok=True)
    os.makedirs(processed_videos_dir, exist_ok=True)

    # Upload video files
    print("Upload video files:")
    uploaded = files.upload()

    # List uploaded files
    video_files = list(uploaded.keys())
    print("Uploaded video files:", video_files)

    # Save uploaded files to video_files_dir
    for video_file in video_files:
        file_path = os.path.join(video_files_dir, video_file)
        with open(file_path, 'wb') as f:
            f.write(uploaded[video_file])
        print(f"Uploaded and saved {video_file} to {video_files_dir}")

    # Find all mp4 files in video_files_dir, matching the extension
    # case-insensitively and in a stable order.
    video_files = sorted(
        os.path.join(video_files_dir, f)
        for f in os.listdir(video_files_dir)
        if f.lower().endswith('.mp4')
    )

    # Prefer the detector prepared earlier in the notebook; loading the stock
    # yolov8n.pt unconditionally would ignore the trained bear model.
    if model is None:
        model = globals().get("loaded_model")
    if model is None:
        model = YOLO("yolov8n.pt")

    # Process each video file and save the output
    for video_file in video_files:
        # splitext changes only the extension, whatever its case.
        stem, _ext = os.path.splitext(os.path.basename(video_file))
        output_file = os.path.join(processed_videos_dir, f"{stem}_processed.mp4")
        print(f"Processing video: {video_file}")
        process_video_with_yolo(video_file, model, output_file)
        print(f"Processed video saved as: {output_file}")

    # Print the directory where the processed videos are saved
    print(f"\nAll processed videos are saved in: {processed_videos_dir}")

# Run the setup and processing function
setup_and_process_videos()


### NOTES

This project provides a comprehensive setup for bear detection using YOLOv8. It offers flexibility in using pre-trained models or training from scratch with local or cloud datasets. The video processing functionality further extends the project’s capabilities, allowing for real-time bear detection in video footage. This tool can be valuable for wildlife monitoring and research, offering an efficient method to identify and track bears in various environments.

It can still be improved by using a larger camera-trap dataset and adding more images of standing bears. Thanks to its flexibility, the model can easily be adapted to track animals other than bears with small code adaptations.

Future developments, with enough available data: single bear detection (work in progress...)