<a href="https://colab.research.google.com/github/abdohamdyy/Leaf-Disease-Classification-using-Pretrained-ResNet-50/blob/main/2DCNN%2BLSTM.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
from google.colab import drive
drive.mount('/content/drive')


Mounted at /content/drive


In [2]:
!unzip /content/drive/MyDrive/Shop_DataSet.zip -d /content/drive/MyDrive/Shop_DataSet

Archive:  /content/drive/MyDrive/Shop_DataSet.zip
   creating: /content/drive/MyDrive/Shop_DataSet/Shop DataSet/non shop lifters/
  inflating: /content/drive/MyDrive/Shop_DataSet/Shop DataSet/non shop lifters/shop_lifter_n_0.mp4  
  inflating: /content/drive/MyDrive/Shop_DataSet/Shop DataSet/non shop lifters/shop_lifter_n_0_1.mp4  
  inflating: /content/drive/MyDrive/Shop_DataSet/Shop DataSet/non shop lifters/shop_lifter_n_1.mp4  
  inflating: /content/drive/MyDrive/Shop_DataSet/Shop DataSet/non shop lifters/shop_lifter_n_1_1.mp4  
  inflating: /content/drive/MyDrive/Shop_DataSet/Shop DataSet/non shop lifters/shop_lifter_n_10.mp4  
  inflating: /content/drive/MyDrive/Shop_DataSet/Shop DataSet/non shop lifters/shop_lifter_n_10_1.mp4  
  inflating: /content/drive/MyDrive/Shop_DataSet/Shop DataSet/non shop lifters/shop_lifter_n_100.mp4  
  inflating: /content/drive/MyDrive/Shop_DataSet/Shop DataSet/non shop lifters/shop_lifter_n_100_1.mp4  
  inflating: /content/drive/MyDrive/Shop_DataSet

In [12]:
import cv2
import numpy as np
import os
from sklearn.preprocessing import LabelEncoder
from tensorflow.keras.utils import Sequence
import multiprocessing
from functools import partial

def extract_frames(video_path, target_frames=10, target_size=(112, 112)):
    """Extract a fixed number of frames from a video, with reduced resolution."""
    frames = []
    video = cv2.VideoCapture(video_path)
    total_frames = int(video.get(cv2.CAP_PROP_FRAME_COUNT))

    if total_frames == 0:
        return None

    step = max(total_frames // target_frames, 1)
    frame_indices = range(0, min(total_frames, target_frames * step), step)

    for i in frame_indices:
        video.set(cv2.CAP_PROP_POS_FRAMES, i)
        ret, frame = video.read()
        if ret:
            frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
            frame = cv2.resize(frame, target_size)
            frames.append(frame)

    video.release()

    # Pad or truncate to ensure we have exactly target_frames
    if len(frames) < target_frames:
        last_frame = frames[-1]
        frames.extend([last_frame] * (target_frames - len(frames)))
    elif len(frames) > target_frames:
        frames = frames[:target_frames]

    return np.array(frames)

def process_video(video_path, label, target_frames=10, target_size=(112, 112)):
    """Process a single video and return frames and label."""
    frames = extract_frames(video_path, target_frames, target_size)
    return frames, label

def load_video_paths(data_dir):
    """Load video paths and labels from the directory structure."""
    video_paths = []
    labels = []
    for label in ['shop_lifters', 'non_shop_lifters']:
        label_dir = os.path.join(data_dir, label)
        if not os.path.isdir(label_dir):
            print(f"Warning: {label_dir} not found.")
            continue
        for video_file in os.listdir(label_dir):
            if video_file.endswith(('.mp4', '.avi', '.mov')):
                video_paths.append(os.path.join(label_dir, video_file))
                labels.append(label)
    return video_paths, labels

class VideoDataGenerator(Sequence):
    def __init__(self, video_paths, labels, batch_size=32, target_frames=10, target_size=(112, 112)):
        self.video_paths = video_paths
        self.labels = labels
        self.batch_size = batch_size
        self.target_frames = target_frames
        self.target_size = target_size
        self.label_encoder = LabelEncoder()
        self.encoded_labels = self.label_encoder.fit_transform(self.labels)

    def __len__(self):
        return int(np.ceil(len(self.video_paths) / float(self.batch_size)))
    def __getitem__(self, idx):
        batch_paths = self.video_paths[idx * self.batch_size:(idx + 1) * self.batch_size]
        batch_labels = self.encoded_labels[idx * self.batch_size:(idx + 1) * self.batch_size]

        # Create a list of tuples (video_path, label) for the batch
        batch_data = list(zip(batch_paths, [self.labels[i] for i in batch_labels]))

        with multiprocessing.Pool() as pool:
            # Use pool.starmap to unpack the tuples and provide both arguments to process_video
            results = pool.starmap(partial(process_video,
                                          target_frames=self.target_frames,
                                          target_size=self.target_size),
                                  batch_data)

        batch_frames, _ = zip(*results)
        batch_frames = np.array(batch_frames).astype('float32') / 255.0
        batch_labels = to_categorical(batch_labels)

        return batch_frames, batch_labels

# Example usage:
data_dir = '/content/drive/MyDrive/Shop_DataSet/Shop_DataSet'  # Should contain 'fall' and 'no_fall' subfolders
video_paths, labels = load_video_paths(data_dir)

# Create train and validation generators
from sklearn.model_selection import train_test_split

train_paths, val_paths, train_labels, val_labels = train_test_split(video_paths, labels, test_size=0.2, random_state=42)

train_generator = VideoDataGenerator(train_paths, train_labels)
val_generator = VideoDataGenerator(val_paths, val_labels)

print("Number of training samples:", len(train_paths))
print("Number of validation samples:", len(val_paths))

# Now you can use these generators with your model
# model.fit(train_generator, validation_data=val_generator, epochs=50, ...)

Number of training samples: 684
Number of validation samples: 171


In [None]:
import tensorflow as tf
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Conv2D, MaxPooling2D, LSTM, Dense, Dropout, Flatten, TimeDistributed
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.callbacks import EarlyStopping, ModelCheckpoint



def create_model(input_shape, num_classes):
    model = Sequential([
        TimeDistributed(Conv2D(32, (3, 3), activation='relu'), input_shape=input_shape),
        TimeDistributed(MaxPooling2D((2, 2))),
        TimeDistributed(Conv2D(64, (3, 3), activation='relu')),
        TimeDistributed(MaxPooling2D((2, 2))),
        TimeDistributed(Conv2D(64, (3, 3), activation='relu')),
        TimeDistributed(MaxPooling2D((2, 2))),
        TimeDistributed(Flatten()),
        LSTM(100, return_sequences=True),
        LSTM(100),
        Dense(64, activation='relu'),
        Dropout(0.5),
        Dense(num_classes, activation='softmax')
    ])
    return model

# Set up model parameters
input_shape = (10, 112, 112, 3)  # (frames, height, width, channels)
num_classes = 2  # fall and no_fall

# Create the model
model = create_model(input_shape, num_classes)

# Compile the model
model.compile(optimizer=Adam(learning_rate=0.001),
              loss='categorical_crossentropy',
              metrics=['accuracy'])
# Define callbacks
early_stopping = EarlyStopping(monitor='val_loss', patience=10, restore_best_weights=True)
#model_checkpoint = ModelCheckpoint('best_model.keras', save_best_only=True, monitor='val_accuracy') # Change .h5 to .keras

# Train the model
history = model.fit(
    train_generator,
    validation_data=val_generator,
    epochs=50,
    callbacks=[early_stopping],
)



Epoch 1/50


  self._warn_if_super_not_called()


[1m22/22[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m532s[0m 18s/step - accuracy: 0.5507 - loss: 0.6922 - val_accuracy: 0.6082 - val_loss: 0.6702
Epoch 2/50
[1m22/22[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m472s[0m 16s/step - accuracy: 0.6356 - loss: 0.6655 - val_accuracy: 0.6082 - val_loss: 0.6688
Epoch 3/50
[1m22/22[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m468s[0m 16s/step - accuracy: 0.6335 - loss: 0.6649 - val_accuracy: 0.6082 - val_loss: 0.6718
Epoch 4/50
[1m22/22[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m469s[0m 16s/step - accuracy: 0.6375 - loss: 0.6619 - val_accuracy: 0.6082 - val_loss: 0.6710
Epoch 5/50
[1m22/22[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m515s[0m 18s/step - accuracy: 0.6499 - loss: 0.6634 - val_accuracy: 0.6082 - val_loss: 0.6653
Epoch 6/50
[1m22/22[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m507s[0m 16s/step - accuracy: 0.6549 - loss: 0.6421 - val_accuracy: 0.6082 - val_loss: 0.7182
Epoch 7/50
[1m22/22[0m [32m━━━━━━━━━

In [None]:
# Evaluate the model
val_loss, val_accuracy = model.evaluate(val_generator)
print(f"Validation Loss: {val_loss:.4f}")
print(f"Validation Accuracy: {val_accuracy:.4f}")

# Plot training history
import matplotlib.pyplot as plt

plt.figure(figsize=(12, 4))
plt.subplot(1, 2, 1)
plt.plot(history.history['accuracy'], label='Training Accuracy')
plt.plot(history.history['val_accuracy'], label='Validation Accuracy')
plt.title('Model Accuracy')
plt.xlabel('Epoch')
plt.ylabel('Accuracy')
plt.legend()

plt.subplot(1, 2, 2)
plt.plot(history.history['loss'], label='Training Loss')
plt.plot(history.history['val_loss'], label='Validation Loss')
plt.title('Model Loss')
plt.xlabel('Epoch')
plt.ylabel('Loss')
plt.legend()

plt.tight_layout()
plt.show()