In [3]:
# import os
# import zipfile
# import cv2
# import numpy as np
# from tensorflow.keras.preprocessing.image import ImageDataGenerator9
# from tensorflow import keras
# from tensorflow.keras import layers
# from sklearn.model_selection import train_test_split
# import matplotlib.pyplot as plt  # For displaying images if desired

In [4]:
# zip_file_name = "crime.zip"  # Ensure this file is in your working directory
# with zipfile.ZipFile(zip_file_name, 'r') as zip_ref:
#     zip_ref.extractall("./")  # Extract in the current directory

# print("Dataset extracted successfully!")

In [1]:
import os
import zipfile
import cv2
import numpy as np
from tensorflow.keras.preprocessing.image import ImageDataGenerator
from tensorflow import keras
from tensorflow.keras import layers
from sklearn.model_selection import train_test_split
import matplotlib.pyplot as plt  # For displaying images if desired

In [2]:
datagen = ImageDataGenerator(rotation_range=20, horizontal_flip=True, zoom_range=0.2)

def augment_images(folder, target_count):
    all_files = os.listdir(folder)
    img_list = [cv2.imread(os.path.join(folder, file)) for file in all_files]

    img_list = np.array([cv2.resize(img, (64, 64)) for img in img_list if img is not None])
    img_list = img_list / 255.0  # Normalize

    i = 0
    for img in datagen.flow(img_list, batch_size=1, save_to_dir=folder, save_prefix="aug", save_format="png"):
        i += 1
        if len(os.listdir(folder)) >= target_count:
            break
        
        # Calculate the number of images in each folder outside of the function
        anomaly_count = len(os.listdir("crime/Anomaly"))
        normal_count = len(os.listdir("crime/Normal"))
        max_count = max(anomaly_count, normal_count)

        augment_images("crime/Anomaly", max_count)
        augment_images("crime/Normal", max_count)

In [3]:

print(len(os.listdir("crime/Anomaly")), len(os.listdir("crime/Normal")))

1889 793


In [4]:
IMG_SIZE = (64, 64)
BATCH_SIZE = 32

def load_images_from_folder(folder, label):
    images = []
    labels = []
    for filename in os.listdir(folder):
        img_path = os.path.join(folder, filename)
        img = cv2.imread(img_path)
        if img is not None:
            img = cv2.resize(img, IMG_SIZE)
            img = img / 255.0  # Normalize
            images.append(img)
            labels.append(label)
    return images, labels

anomaly_path = 'crime/Anomaly'
normal_path = 'crime/Normal'
anomaly_images, anomaly_labels = load_images_from_folder(anomaly_path, 1)
normal_images, normal_labels = load_images_from_folder(normal_path, 0)

X = np.array(anomaly_images + normal_images)
y = np.array(anomaly_labels + normal_labels)

# Reduce dataset size to avoid memory error (optional: change 10000 to 5000 or smaller)
# X = X[:2000]
# y = y[:2000]

X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

In [5]:
model = keras.Sequential([
    layers.Conv2D(32, (3, 3), activation='relu', input_shape=(IMG_SIZE[0], IMG_SIZE[1], 3)),
    layers.MaxPooling2D((2, 2)),
    layers.Dropout(0.25),
    layers.Conv2D(64, (3, 3), activation='relu'),
    layers.MaxPooling2D((2, 2)),
    layers.Dropout(0.25),
    layers.Conv2D(128, (3, 3), activation='relu'),
    layers.MaxPooling2D((2, 2)),
    layers.Dropout(0.25),
    layers.Flatten(),
    layers.Dense(128, activation='relu'),
    layers.Dropout(0.5),
    layers.Dense(1, activation='sigmoid')
])

model.compile(optimizer='adam', loss='binary_crossentropy', metrics=['accuracy'])
model.fit(X_train, y_train, epochs=20, validation_data=(X_test, y_test), batch_size=BATCH_SIZE)
model.save('anomaly_detector.keras')

  super().__init__(activity_regularizer=activity_regularizer, **kwargs)


Epoch 1/20
[1m68/68[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m6s[0m 52ms/step - accuracy: 0.7303 - loss: 0.5402 - val_accuracy: 1.0000 - val_loss: 8.0133e-04
Epoch 2/20
[1m68/68[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m3s[0m 46ms/step - accuracy: 0.9990 - loss: 0.0041 - val_accuracy: 1.0000 - val_loss: 2.4735e-05
Epoch 3/20
[1m68/68[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m4s[0m 54ms/step - accuracy: 1.0000 - loss: 0.0018 - val_accuracy: 1.0000 - val_loss: 1.2605e-05
Epoch 4/20
[1m68/68[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m5s[0m 75ms/step - accuracy: 0.9977 - loss: 0.0039 - val_accuracy: 0.9069 - val_loss: 0.1266
Epoch 5/20
[1m68/68[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m5s[0m 75ms/step - accuracy: 0.9687 - loss: 0.1094 - val_accuracy: 1.0000 - val_loss: 1.1143e-06
Epoch 6/20
[1m68/68[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m4s[0m 66ms/step - accuracy: 0.9993 - loss: 0.0037 - val_accuracy: 1.0000 - val_loss: 3.4617e-05
Epoch 7/20
[1

In [6]:
def predict_and_display_frame(video_path, model, frame_interval=10):
    cap = cv2.VideoCapture(video_path)
    frame_count = 0
    first_frame = None
    detected_anomaly = False
    output_frame = None

    while cap.isOpened():
        ret, frame = cap.read()
        if not ret:
            break

        if frame_count % frame_interval == 0:
            if first_frame is None:
                first_frame = frame.copy()

            frame_resized = cv2.resize(frame, IMG_SIZE)
            frame_norm = frame_resized / 255.0
            input_frame = np.expand_dims(frame_norm, axis=0)
            prediction = model.predict(input_frame)[0][0]

            if prediction > 0.5:
                output_frame = frame.copy()
                cv2.putText(output_frame, "Anomaly", (10, 30),
                            cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 0, 255), 2)
                detected_anomaly = True
                break
        frame_count += 1

    cap.release()

    if not detected_anomaly:
        if first_frame is not None:
            output_frame = first_frame.copy()
            cv2.putText(output_frame, "Normal", (10, 30),
                        cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0), 2)
        else:
            print("No frames processed. Check the video file.")
            return

    # Display the frame. For a GUI window:
    cv2.imshow("Output", output_frame)
    cv2.waitKey(0)
    cv2.destroyAllWindows()
    # Alternatively, to display inline (e.g., in a Jupyter Notebook):
    # plt.imshow(cv2.cvtColor(output_frame, cv2.COLOR_BGR2RGB))
    # plt.axis("off")
    # plt.show()

In [7]:
video_path1 = "crime/Shoplifting005_x264.mp4"
cap = cv2.VideoCapture(video_path1)
if not cap.isOpened():
    print("Error: Could not open video.")
else:
    total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
    print(f"Total frames in video: {total_frames}")
cap.release()

Total frames in video: 1967


In [8]:
video_path2 = "crime/Shoplifting009_x264.mp4"
predict_and_display_frame(video_path2, model)

[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 118ms/step
