# Poultry Disease Classification from Fecal Images

This notebook classifies poultry diseases based on fecal images into four classes:
- Healthy
- Coccidiosis (cocci)
- Salmonella (salmo)
- Newcastle Disease (ncd)

The model is optimized for Mac M2 with 8GB RAM.

## 1. Import Libraries

In [None]:

import os
import time
import zipfile
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
sns.set_style('darkgrid')
from sklearn.model_selection import train_test_split
from sklearn.metrics import confusion_matrix, classification_report


import tensorflow as tf
from tensorflow import keras
from tensorflow.keras.models import Sequential
from tensorflow.keras.optimizers import Adam, Adamax
from tensorflow.keras.preprocessing.image import ImageDataGenerator
from tensorflow.keras.layers import Conv2D, MaxPooling2D, Flatten, Dense, Activation, Dropout, BatchNormalization
from tensorflow.keras import regularizers

# Ignore warnings
import warnings
warnings.filterwarnings("ignore")
print('Modules loaded')

## 2. Enable GPU Acceleration (for Mac M2)

In [None]:
# For Mac M2, enable Metal GPU acceleration
try:
    physical_devices = tf.config.list_physical_devices('GPU')
    if len(physical_devices) > 0:
        tf.config.experimental.set_memory_growth(physical_devices[0], True)
        print("GPU acceleration enabled")
except:
    print("No GPU found or GPU acceleration not available")

# Set smaller image size for better performance on 8GB RAM
IMG_SIZE = (160, 160)  # Reduced from 224x224
CHANNELS = 3
BATCH_SIZE = 32  # Reduced from 40
IMG_SHAPE = (IMG_SIZE[0], IMG_SIZE[1], CHANNELS)

## 3. Custom Callback for Training Monitoring

In [None]:
class MyCallback(keras.callbacks.Callback):
    def __init__(self, model, patience=1, stop_patience=3, threshold=0.9, factor=0.5, batches=None, epochs=None, ask_epoch=None):
        super(MyCallback, self).__init__()
        self._model = model
        self.patience = patience 
        self.stop_patience = stop_patience
        self.threshold = threshold
        self.factor = factor
        self.batches = batches
        self.epochs = epochs
        
        self.count = 0
        self.stop_count = 0
        self.best_epoch = 1
        
        try:
            self.current_lr = 0.001  # Default fallback
            if hasattr(model.optimizer, 'learning_rate'):
                lr = model.optimizer.learning_rate
                if hasattr(lr, 'numpy'):
                    self.current_lr = float(lr.numpy())
            elif hasattr(model.optimizer, 'lr'):
                lr = model.optimizer.lr
                if hasattr(lr, 'numpy'):
                    self.current_lr = float(lr.numpy())
        except:
            pass
            
        self.initial_lr = self.current_lr
        self.highest_tracc = 0.0
        self.lowest_vloss = np.inf
        self.best_weights = self._model.get_weights()
        self.initial_weights = self._model.get_weights()

    def on_train_begin(self, logs=None):
        msg = '{0:^8s}{1:^10s}{2:^9s}{3:^9s}{4:^9s}{5:^9s}{6:^9s}{7:^10s}{8:10s}{9:^8s}'.format('Epoch', 'Loss', 'Accuracy', 'V_loss', 'V_acc', 'LR', 'Next LR', 'Monitor','% Improv', 'Duration')
        print(msg)
        self.start_time = time.time()

    def on_train_end(self, logs=None):
        stop_time = time.time()
        tr_duration = stop_time - self.start_time
        hours = tr_duration // 3600
        minutes = (tr_duration - (hours * 3600)) // 60
        seconds = tr_duration - ((hours * 3600) + (minutes * 60))

        msg = f'Training elapsed time was {str(hours)} hours, {minutes:4.1f} minutes, {seconds:4.2f} seconds'
        print(msg)

        self._model.set_weights(self.best_weights)

    def on_train_batch_end(self, batch, logs=None):
        # get batch accuracy and loss
        acc = logs.get('accuracy') * 100
        loss = logs.get('loss')

        # prints over on the same line to show running batch count
        msg = '{0:20s}processing batch {1:} of {2:5s}-   accuracy=  {3:5.3f}   -   loss: {4:8.5f}'.format(' ', str(batch), str(self.batches), acc, loss)
        print(msg, '\r', end='')

    def on_epoch_begin(self, epoch, logs=None):
        self.ep_start = time.time()
    

    def _update_lr(self, new_lr):
        
        self.current_lr = new_lr
        
        # modern way to update learning rate - recreate optimizer
        optimizer_config = self._model.optimizer.get_config()
        if 'learning_rate' in optimizer_config:
            optimizer_config['learning_rate'] = new_lr
        elif 'lr' in optimizer_config:
            optimizer_config['lr'] = new_lr
            
        optimizer_name = self._model.optimizer.__class__.__name__
        new_optimizer = None
        
        # create the new optimizer with the updated learning rate
        if optimizer_name == 'Adamax':
            new_optimizer = tf.keras.optimizers.Adamax.from_config(optimizer_config)
        elif optimizer_name == 'Adam':
            new_optimizer = tf.keras.optimizers.Adam.from_config(optimizer_config)
        else:
            # For other optimizers, try generic approach
            new_optimizer = getattr(tf.keras.optimizers, optimizer_name).from_config(optimizer_config)
            
        # Compile with the new optimizer
        self._model.compile(
            optimizer=new_optimizer,
            loss=self._model.loss,
            metrics=self._model.metrics_names
        )
        
        return new_lr

    def on_epoch_end(self, epoch, logs=None):
        ep_end = time.time()
        duration = ep_end - self.ep_start

        # get current learning rate safely
        current_lr = self.current_lr
        acc = logs.get('accuracy')
        v_acc = logs.get('val_accuracy')
        loss = logs.get('loss')
        v_loss = logs.get('val_loss')
        
        # default next learning rate
        next_lr = current_lr

        if acc < self.threshold:
            monitor = 'accuracy'
            if epoch == 0:
                pimprov = 0.0
            else:
                pimprov = (acc - self.highest_tracc) * 100 / self.highest_tracc

            if acc > self.highest_tracc:
                self.highest_tracc = acc
                self.best_weights = self._model.get_weights()
                self.count = 0
                self.stop_count = 0
                if v_loss < self.lowest_vloss:
                    self.lowest_vloss = v_loss
                self.best_epoch = epoch + 1
            else:
                if self.count >= self.patience - 1:
                    next_lr = current_lr * self.factor
                    # update learning rate safely
                    self._update_lr(next_lr)
                    self.count = 0
                    self.stop_count = self.stop_count + 1
                    if v_loss < self.lowest_vloss:
                        self.lowest_vloss = v_loss
                else:
                    self.count = self.count + 1
        else:
            monitor = 'val_loss'
            if epoch == 0:
                pimprov = 0.0
            else:
                pimprov = (self.lowest_vloss - v_loss) * 100 / self.lowest_vloss

            if v_loss < self.lowest_vloss:
                self.lowest_vloss = v_loss
                self.best_weights = self._model.get_weights()
                self.count = 0
                self.stop_count = 0
                self.best_epoch = epoch + 1
            else:
                if self.count >= self.patience - 1:
                    next_lr = current_lr * self.factor
                    # update learning rate safely
                    self._update_lr(next_lr)
                    self.stop_count = self.stop_count + 1
                    self.count = 0
                else:
                    self.count = self.count + 1

                if acc > self.highest_tracc:
                    self.highest_tracc = acc

        msg = f'{str(epoch + 1):^3s}/{str(self.epochs):4s} {loss:^9.3f}{acc * 100:^9.3f}{v_loss:^9.5f}{v_acc * 100:^9.3f}{current_lr:^9.5f}{next_lr:^9.5f}{monitor:^11s}{pimprov:^10.2f}{duration:^8.2f}'
        print(msg)

        if self.stop_count > self.stop_patience - 1:
            msg = f'Training has been halted at epoch {epoch + 1} after {self.stop_patience} adjustments of learning rate with no improvement'
            print(msg)
            self.model.stop_training = True

## 4. Data Preparation Functions

In [None]:
def extract_zip_files(zip_dir, extract_dir):
    """
    Extract all zip files containing the image data
    """
    # create extract directory if it doesn't exist
    os.makedirs(extract_dir, exist_ok=True)
    
    # list of expected zip files
    expected_zips = ['healthy.zip', 'cocci.zip', 'salmo.zip', 'ncd.zip']
    
    for zip_file in expected_zips:
        zip_path = os.path.join(zip_dir, zip_file)
        if os.path.exists(zip_path):
            print(f"Extracting {zip_file}...")
            with zipfile.ZipFile(zip_path, 'r') as zip_ref:
                # extract to a subfolder named after the class (removing .zip extension)
                class_name = zip_file.split('.')[0]
                class_dir = os.path.join(extract_dir, class_name)
                os.makedirs(class_dir, exist_ok=True)
                zip_ref.extractall(class_dir)
            print(f"Extracted {zip_file} to {class_dir}")
        else:
            print(f"Warning: {zip_file} not found in {zip_dir}")

def create_csv_from_directory(data_dir, output_csv_path):
    """
    Create a CSV file with image paths and labels from directory structure
    """
    filepaths = []
    labels = []
    
    # Iterate through class directories
    for class_name in os.listdir(data_dir):
        class_dir = os.path.join(data_dir, class_name)
        if os.path.isdir(class_dir):
            # Iterate through images in the class directory
            for image_file in os.listdir(class_dir):
                if image_file.endswith(('.jpg', '.jpeg', '.png')):
                    # Use relative paths so it works in different environments
                    image_path = os.path.join(class_name, image_file)
                    filepaths.append(image_path)
                    labels.append(class_name)
    

    df = pd.DataFrame({'filepaths': filepaths, 'labels': labels})
    df.to_csv(output_csv_path, index=False)
    print(f"Created CSV file with {len(df)} images")
    return df

def split_data(data_dir, csv_path):
    """
    Split the data into train, validation, and test sets
    """

    if os.path.exists(csv_path):
        df = pd.read_csv(csv_path)
        df.columns = ['filepaths', 'labels']
    else:
        df = create_csv_from_directory(data_dir, csv_path)
    
    # Add full paths
    df['filepaths'] = df['filepaths'].apply(lambda x: os.path.join(data_dir, x))
    
    # Create train df
    strat = df['labels']
    train_df, dummy_df = train_test_split(df, train_size=0.8, shuffle=True, random_state=123, stratify=strat)
    
    # Valid and test dataframe
    strat = dummy_df['labels']
    valid_df, test_df = train_test_split(dummy_df, train_size=0.5, shuffle=True, random_state=123, stratify=strat)
    
    return train_df, valid_df, test_df

## 5. Data Generator Functions

In [None]:
def create_gens(train_df, valid_df, test_df, batch_size, img_size=(160, 160)):
    """
    Create image data generators for train, validation, and test sets
    """
    channels = 3
    color = 'rgb'
    
    # Calculate test batch size
    ts_length = len(test_df)
    test_batch_size = max(sorted([ts_length // n for n in range(1, ts_length + 1) 
                                  if ts_length % n == 0 and ts_length/n <= 80]))
    
    # Function for preprocessing
    def scalar(img):
        return img
    
    # Create generators with augmentation for training
    tr_gen = ImageDataGenerator(
        preprocessing_function=scalar,
        horizontal_flip=True,
        rotation_range=20,
        width_shift_range=0.2,
        height_shift_range=0.2,
        brightness_range=[0.8, 1.2],
        zoom_range=0.2
    )
    
    ts_gen = ImageDataGenerator(preprocessing_function=scalar)
    
    # Flow from dataframes
    train_gen = tr_gen.flow_from_dataframe(
        train_df, x_col='filepaths', y_col='labels', target_size=img_size,
        class_mode='categorical', color_mode=color, shuffle=True, batch_size=batch_size
    )
    
    valid_gen = ts_gen.flow_from_dataframe(
        valid_df, x_col='filepaths', y_col='labels', target_size=img_size,
        class_mode='categorical', color_mode=color, shuffle=True, batch_size=batch_size
    )
    
    test_gen = ts_gen.flow_from_dataframe(
        test_df, x_col='filepaths', y_col='labels', target_size=img_size,
        class_mode='categorical', color_mode=color, shuffle=False, batch_size=test_batch_size
    )
    
    return train_gen, valid_gen, test_gen

def show_images(gen):
    """
    Show sample images from the generator
    """
    g_dict = gen.class_indices
    classes = list(g_dict.keys())
    images, labels = next(gen)
    
    length = len(labels)
    sample = min(length, 25)
    
    plt.figure(figsize=(20, 20))
    for i in range(sample):
        plt.subplot(5, 5, i + 1)
        image = images[i] / 255
        plt.imshow(image)
        index = np.argmax(labels[i])
        class_name = classes[index]
        plt.title(class_name, color='blue', fontsize=12)
        plt.axis('off')
    plt.show()

## 6. Visualization Functions

In [None]:
def plot_training(hist):
    """
    Plot training history
    """
    tr_acc = hist.history['accuracy']
    tr_loss = hist.history['loss']
    val_acc = hist.history['val_accuracy']
    val_loss = hist.history['val_loss']
    
    index_loss = np.argmin(val_loss)
    val_lowest = val_loss[index_loss]
    index_acc = np.argmax(val_acc)
    acc_highest = val_acc[index_acc]
    
    Epochs = [i+1 for i in range(len(tr_acc))]
    loss_label = f'best epoch= {str(index_loss + 1)}'
    acc_label = f'best epoch= {str(index_acc + 1)}'
    
    plt.figure(figsize=(20, 8))
    plt.style.use('fivethirtyeight')
    
    plt.subplot(1, 2, 1)
    plt.plot(Epochs, tr_loss, 'r', label='Training loss')
    plt.plot(Epochs, val_loss, 'g', label='Validation loss')
    plt.scatter(index_loss + 1, val_lowest, s=150, c='blue', label=loss_label)
    plt.title('Training and Validation Loss')
    plt.xlabel('Epochs')
    plt.ylabel('Loss')
    plt.legend()
    
    plt.subplot(1, 2, 2)
    plt.plot(Epochs, tr_acc, 'r', label='Training Accuracy')
    plt.plot(Epochs, val_acc, 'g', label='Validation Accuracy')
    plt.scatter(index_acc + 1, acc_highest, s=150, c='blue', label=acc_label)
    plt.title('Training and Validation Accuracy')
    plt.xlabel('Epochs')
    plt.ylabel('Accuracy')
    plt.legend()
    
    plt.tight_layout()
    plt.show()

def plot_confusion_matrix(cm, classes, normalize=False, title='Confusion Matrix', cmap=plt.cm.Blues):
    """
    Plot confusion matrix
    """
    plt.figure(figsize=(10, 10))
    plt.imshow(cm, interpolation='nearest', cmap=cmap)
    plt.title(title)
    plt.colorbar()
    
    tick_marks = np.arange(len(classes))
    plt.xticks(tick_marks, classes, rotation=45)
    plt.yticks(tick_marks, classes)
    
    if normalize:
        cm = cm.astype('float') / cm.sum(axis=1)[:, np.newaxis]
        print('Normalized Confusion Matrix')
    else:
        print('Confusion Matrix, Without Normalization')
    
    print(cm)
    
    thresh = cm.max() / 2.
    for i in range(cm.shape[0]):
        for j in range(cm.shape[1]):
            plt.text(j, i, cm[i, j], horizontalalignment='center', 
                     color='white' if cm[i, j] > thresh else 'black')
    
    plt.tight_layout()
    plt.ylabel('True Label')
    plt.xlabel('Predicted Label')
    plt.show()

## 7. Model Building and Evaluation Functions

In [None]:
def build_model(class_count, img_shape=(160, 160, 3)):
    """
    Build the model (using EfficientNetB0 instead of B3 for better performance on Mac M2)
    """
    # Create pre-trained model - using B0 which is smaller than B3
    base_model = tf.keras.applications.EfficientNetB0(
        include_top=False, 
        weights="imagenet", 
        input_shape=img_shape, 
        pooling='max'
    )
    
    # Freeze base model layers
    base_model.trainable = False
    
    model = Sequential([
        base_model,
        BatchNormalization(axis=-1, momentum=0.99, epsilon=0.001),
        Dense(128, kernel_regularizer=regularizers.l2(0.016),  # Fixed: removed 'l=' parameter
              activity_regularizer=regularizers.l1(0.006),
              bias_regularizer=regularizers.l1(0.006), activation='relu'),
        Dropout(rate=0.45, seed=123),
        Dense(class_count, activation='softmax')
    ])
    
    model.compile(
        Adamax(learning_rate=0.001), 
        loss='categorical_crossentropy', 
        metrics=['accuracy']
    )
    
    return model

def train_model(model, train_gen, valid_gen, epochs=20):
    """
    Train the model with custom callback
    """
    patience = 1
    stop_patience = 3
    threshold = 0.9
    factor = 0.5
    ask_epoch = 5
    batch_size = BATCH_SIZE
    batches = int(np.ceil(len(train_gen.labels) / batch_size))
    
    callbacks = [MyCallback(
        model=model, patience=patience, stop_patience=stop_patience, 
        threshold=threshold, factor=factor, batches=batches, 
        epochs=epochs, ask_epoch=ask_epoch
    )]
    
    history = model.fit(
        x=train_gen, epochs=epochs, verbose=0, callbacks=callbacks,
        validation_data=valid_gen, validation_steps=None, shuffle=False
    )
    
    return history

def evaluate_model(model, test_gen):
    """
    Evaluate model on test set and display results
    """
    # Reset generator to start
    test_gen.reset()
    
    # Predict
    predictions = model.predict(test_gen, steps=len(test_gen), verbose=1)
    predicted_classes = np.argmax(predictions, axis=1)
    
    # Get true classes
    true_classes = test_gen.classes
    class_labels = list(test_gen.class_indices.keys())
    
    # Create confusion matrix
    cm = confusion_matrix(true_classes, predicted_classes)
    plot_confusion_matrix(cm, class_labels, title='Confusion Matrix')
    
    # Print classification report
    print('\nClassification Report')
    print(classification_report(true_classes, predicted_classes, target_names=class_labels))
    
    # Calculate overall accuracy
    accuracy = np.sum(predicted_classes == true_classes) / len(true_classes)
    print(f'\nOverall Accuracy: {accuracy:.4f}')

## 8. Data Preparation and Model Training

In [None]:
# Set your directories here
ZIP_DIR = "chicken_feces_Zips"  
EXTRACT_DIR = "extracted_images"
CSV_PATH = "poultry_data.csv"

# Step 1: Extract zip files if needed
print("Step 1: Extract zip files if needed")
if not os.path.exists(EXTRACT_DIR) or len(os.listdir(EXTRACT_DIR)) < 4:
    extract_zip_files(ZIP_DIR, EXTRACT_DIR)

    

In [None]:
# Set directories
ZIP_DIR = "chicken_feces_Zips"  
EXTRACT_DIR = "extracted_images"
CSV_PATH = "poultry_data.csv"


print("\nPrepare data")
def create_fixed_csv():
    filepaths = []
    labels = []
    
    # Function to recursively find all image files in a directory and its subdirectories
    def find_images_recursive(base_dir, class_name):
        count = 0
        for root, dirs, files in os.walk(base_dir):
            for file in files:
                if file.lower().endswith(('.jpg', '.jpeg', '.png')):
                    file_path = os.path.join(root, file)
                    filepaths.append(file_path)
                    labels.append(class_name)
                    count += 1
        return count
    
    # Process each class directory
    total_count = 0
    for class_name in ['healthy', 'cocci', 'salmo', 'ncd']:
        class_dir = os.path.join(EXTRACT_DIR, class_name)
        if os.path.isdir(class_dir):
            print(f"Processing {class_name} directory...")
            found = find_images_recursive(class_dir, class_name)
            print(f"  Found {found} images in {class_name}")
            total_count += found
    
    # Create DataFrame
    df = pd.DataFrame({'filepaths': filepaths, 'labels': labels})
    
    # Save to CSV
    df.to_csv(CSV_PATH, index=False)
    print(f"Created CSV file with {len(df)} images ({total_count} total)")
    return df


full_df = create_fixed_csv()

if len(full_df) > 0:
    # Split into train and temp dfs
    strat = full_df['labels']
    train_df, dummy_df = train_test_split(full_df, train_size=0.8, shuffle=True, random_state=123, stratify=strat)
    
    # Split temp into valid and test
    strat = dummy_df['labels']
    valid_df, test_df = train_test_split(dummy_df, train_size=0.5, shuffle=True, random_state=123, stratify=strat)
    
    print(f"Training samples: {len(train_df)}")
    print(f"Validation samples: {len(valid_df)}")
    print(f"Test samples: {len(test_df)}")
    

    print("\nCreate generators")
    train_gen, valid_gen, test_gen = create_gens(train_df, valid_df, test_df, BATCH_SIZE, img_size=IMG_SIZE)
    
    print("\nBuilding the model")
    class_count = len(train_gen.class_indices)
    model = build_model(class_count, img_shape=IMG_SHAPE)
    model.summary()


    print("\nTrain the model")
    EPOCHS = 20  # Set your desired epochs
    history = train_model(model, train_gen, valid_gen, epochs=EPOCHS)


    print("\nPlot training history")
    plot_training(history)


    print("\nEvaluate model on test set")
    evaluate_model(model, test_gen)


    print("\n Save the model")
    model.save('poultry_disease_model.h5')
    print("Model saved successfully!")

else:
    print("No images found! Make sure the zip files were properly extracted with image files inside.")
    print("Check the following directories:")
    for class_name in ['healthy', 'cocci', 'salmo', 'ncd']:
        path = os.path.join(EXTRACT_DIR, class_name)
        if os.path.exists(path):
            print(f"  • {path} exists")
            # List some files in this directory
            files = os.listdir(path)
            print(f"    - Contains {len(files)} files/directories")
            if files:
                print(f"    - Sample items: {files[:5]}")
        else:
            print(f"  • {path} does not exist")