In [54]:
from tensorflow.keras import layers, models
from tensorflow.keras.optimizers import Adam
import tensorflow as tf
from sklearn.model_selection import train_test_split
from sklearn.utils import shuffle
import os
from glob import glob
import cv2
import numpy as np
from scipy.fft import fft

In [55]:
# Load labeled frames and their corresponding labels
def load_labeled_frames(data_folder):
    images = []
    labels = []

    for image_path in glob(os.path.join(data_folder, '*.jpg')):
        image = cv2.imread(image_path)
        image = cv2.resize(image, (30, 30))

        label = os.path.splitext(os.path.basename(image_path))[0].split('_')[-1]

        images.append(image)

        if label == '0':
            labels.append(0)
        elif label == '1':
            labels.append(1)

    return images, labels

# Preprocess the data
def preprocess_data(images, labels):
    images = np.array(images)
    labels = np.array(labels)
    
    # duplicate the images and labels
    images = np.concatenate((images, images), axis=0)
    labels = np.concatenate((labels, labels), axis=0)

    # Normalize pixel values to be between 0 and 1
    images = images / 255.0

    # Shuffle and split the data into training and testing sets
    images, labels = shuffle(images, labels, random_state=46)
    X_train, X_test, y_train, y_test = train_test_split(images, labels, test_size=0.2, random_state=45)

    return X_train, X_test, y_train, y_test

In [56]:
# Example usage
output_folder = 'labeled_frames_binary_test'

# Load labeled frames and their corresponding labels
images, labels = load_labeled_frames(output_folder)

# Preprocess the data
X_train, X_test, y_train, y_test = preprocess_data(images, labels)

input_shape = X_train[0].shape
print(input_shape)

(30, 30, 3)


In [57]:
y_train

array([1, 1, 1, 0, 0, 1, 0, 0, 0, 1, 1, 1, 0, 1, 1, 0, 1, 1, 1, 1, 0, 1,
       1, 1, 1, 1, 1, 1, 0, 0, 1, 1, 1, 1, 1, 0, 0, 0, 1, 1, 0, 1, 0, 1,
       1, 1, 1, 1, 0, 1, 0, 0, 1, 1, 1, 0, 1, 1, 1, 1, 0, 0, 1, 1, 1, 1,
       1, 1, 0, 0, 1, 1, 0, 1, 0, 1, 0, 1, 1, 0, 0, 1, 1, 0, 1, 0])

In [105]:
# Build the model
model = models.Sequential()
model.add(layers.Conv2D(32, (3, 3), activation='relu', input_shape=input_shape))
model.add(layers.MaxPooling2D())
model.add(layers.Conv2D(64, (3, 3), activation='relu'))
model.add(layers.MaxPooling2D())
model.add(layers.Conv2D(128, (3, 3), activation='relu'))
model.add(layers.MaxPooling2D())
model.add(layers.Flatten())
model.add(layers.Dense(128, activation='relu'))
model.add(layers.Dense(1, activation='sigmoid'))

# Compile the model
model.compile(optimizer=Adam(),
              loss='binary_crossentropy',  # Use binary crossentropy for binary classification
              metrics=['accuracy', 'AUC'])

# Train the model
history = model.fit(X_train, y_train, epochs=30, validation_data=(X_test, y_test))


Epoch 1/30
Epoch 2/30
Epoch 3/30
Epoch 4/30
Epoch 5/30
Epoch 6/30
Epoch 7/30
Epoch 8/30
Epoch 9/30
Epoch 10/30
Epoch 11/30
Epoch 12/30
Epoch 13/30
Epoch 14/30
Epoch 15/30
Epoch 16/30
Epoch 17/30
Epoch 18/30
Epoch 19/30
Epoch 20/30
Epoch 21/30
Epoch 22/30
Epoch 23/30
Epoch 24/30
Epoch 25/30
Epoch 26/30
Epoch 27/30
Epoch 28/30
Epoch 29/30
Epoch 30/30


In [58]:
# Evaluate the model
test_loss, test_acc, test_auc = model.evaluate(X_test, y_test, verbose=2)

print('\nTest AUC:', test_auc)

1/1 - 3s - loss: 0.0059 - accuracy: 1.0000 - auc: 1.0000 - 3s/epoch - 3s/step

Test AUC: 1.0


In [59]:
framelist = []
data_folder = 'labeled_frames_fixed'
video = './vidscapstone/vid10.mp4'

for image_path in glob(os.path.join(data_folder, '*.jpg')):
    framelist.append(image_path)

# framelist = [
#              'labeled_frames_fixed\\vid9.mp4_00054_2.jpg', 
#              'labeled_frames_fixed\\vid9.mp4_00047_0.jpg', 
#              'labeled_frames_fixed\\vid9.mp4_00053_3.jpg', 
#              'labeled_frames_fixed\\vid9.mp4_00046_2.jpg',
#              ]
    
# Load model
model = tf.keras.models.load_model('testing_binary_class_classification/testing_binary_class_classification_model_Adam.h5')

In [60]:
def extract_blinking_lights(img):

    if img is None:
        print(f"Error: Unable to load image from {image_path}")
    else:
        # Convert the image to grayscale
        grey = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
        
        # Apply thresholding to highlight the lights
        _, thresholded = cv2.threshold(grey, 200, 255, cv2.THRESH_BINARY)
        
        # Find contours in the thresholded image
        contours, _ = cv2.findContours(thresholded, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
        
        # Extract ROIs based on contours
        rois = []
        for contour in contours:
            x, y, w, h = cv2.boundingRect(contour)
            w = w + 20
            h = h + 20
            x = x - 10
            y = y - 10
            
            rois.append((x,y,w,h))
        
        return rois

def are_similar_coordinates(coord1, coord2):
    x1, y1, w1, h1 = coord1
    x2, y2, w2, h2 = coord2
    
    # Adjust these threshold values based on your requirements
    position_threshold = 50  # Adjust as needed
    size_threshold = 50  # Adjust as needed
    
    # Check if the coordinates are similar in terms of position and size
    position_diff = abs(x1 - x2) + abs(y1 - y2)
    size_diff = abs(w1 - w2) + abs(h1 - h2)
    
    return position_diff < position_threshold and size_diff < size_threshold
    

In [61]:
merged_rois = []
all_rois = []

cap = cv2.VideoCapture(video)

fps = cap.get(cv2.CAP_PROP_FPS)
total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))

# Calculate the duration of the video in seconds
video_duration = total_frames / fps

print(f"Video Duration: {video_duration:.2f} seconds")

framelistVideo = []

if not cap.isOpened():
    print("Error: Could not open the video file.")
    exit()

while True:
    ret, frame = cap.read()

    if not ret:
        break

    img = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)

    rois = extract_blinking_lights(img)
    for roi in rois:
        all_rois.append(roi)
    
    framelistVideo.append(img)

if all_rois is not None:
    for roi_coordinates in all_rois:
        add_to_merged = True

        for merged_roi_coordinates in merged_rois:
            if are_similar_coordinates(roi_coordinates, merged_roi_coordinates):
                add_to_merged = False
                break
        
        if add_to_merged:
            merged_rois.append(roi_coordinates)

Video Duration: 3.13 seconds


In [62]:
rois_in_frames = []
for img in framelistVideo:
    on_off = []
    breaker = False
    for roi_coordinates in merged_rois:
        x, y, w, h = roi_coordinates
        x = x - 15
        y = y - 15
        w = w + 30
        h = h + 30
        
        cv2.rectangle(img, (x, y), (x+w, y+h), (0, 255, 0), 2)
        
        # Extract the ROI
        roi = img[y:y+h, x:x+w]

        # Convert the ROI to grayscale and apply thresholding
        grey = cv2.cvtColor(roi, cv2.COLOR_BGR2GRAY)
        
        # Apply thresholding to highlight the lights
        _, thresholded = cv2.threshold(grey, 150, 255, cv2.THRESH_BINARY)
        
        # Find contours in the thresholded image
        contours, _ = cv2.findContours(thresholded, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)

        if len(contours) == 0:
            on_off.append(0)
        else:
            on_off.append(1)
    rois_in_frames.append(on_off)
print(rois_in_frames)


[[1, 1, 1, 0], [1, 1, 0, 1], [1, 1, 1, 1], [0, 1, 1, 1], [1, 0, 1, 1], [1, 1, 1, 1], [1, 1, 1, 0], [1, 1, 0, 1], [1, 1, 1, 1], [0, 1, 1, 1], [1, 0, 1, 1], [1, 1, 1, 1], [1, 1, 1, 0], [1, 1, 0, 1], [1, 1, 1, 1], [0, 1, 1, 1], [1, 0, 1, 1], [1, 1, 1, 1], [1, 1, 1, 0], [1, 1, 0, 1], [1, 1, 1, 1], [0, 1, 1, 1], [1, 0, 1, 1], [1, 1, 1, 1], [1, 1, 1, 0], [1, 1, 0, 1], [1, 1, 1, 1], [0, 1, 1, 1], [1, 0, 1, 1], [1, 1, 1, 1], [1, 1, 1, 0], [1, 1, 0, 1], [1, 1, 1, 1], [0, 1, 1, 1], [1, 0, 1, 1], [1, 1, 1, 1], [1, 1, 1, 0], [1, 1, 0, 1], [1, 1, 1, 1], [0, 1, 1, 1], [1, 0, 1, 1], [1, 1, 1, 1], [1, 1, 1, 0], [1, 1, 0, 1], [1, 1, 1, 1], [0, 1, 1, 1], [1, 0, 1, 1], [1, 1, 1, 1], [1, 1, 1, 0], [1, 1, 0, 1], [1, 1, 1, 1], [0, 1, 1, 1], [1, 0, 1, 1], [1, 1, 1, 1], [1, 1, 1, 0], [1, 1, 0, 1], [1, 1, 1, 1], [0, 1, 1, 1], [1, 0, 1, 1], [1, 1, 1, 1], [1, 1, 1, 0], [1, 1, 0, 1], [1, 1, 1, 1], [0, 1, 1, 1], [0, 0, 1, 1], [1, 0, 1, 1], [1, 1, 1, 0], [1, 1, 0, 1], [1, 1, 1, 1], [0, 1, 1, 1], [1, 0, 1, 1], [1, 1

In [63]:
# Transpose the array using zip
transposed_array = list(zip(*rois_in_frames))

for i, separated_array in enumerate(transposed_array):
    filtered_tuple = [separated_array[0]] 

    for i in range(1, len(separated_array)):
        if separated_array[i] != separated_array[i - 1]:
            filtered_tuple.append(separated_array[i])
            
    frequency = filtered_tuple.count(1)/video_duration
    print(f"Frequency of light {i}: {frequency}")

Frequency of light 93: 5.1063829787234045
Frequency of light 93: 4.787234042553192
Frequency of light 93: 5.425531914893617
Frequency of light 93: 5.1063829787234045


In [64]:
print(f'Number of blinking lights: {len(merged_rois)}')

#print all coordinates
for i, coordinates in enumerate(merged_rois):
    print(coordinates)
    #use coordinates to create image roi
    x, y, w, h = coordinates
    #extract from original image
    img = cv2.imread(framelist[0])
    roi = img[y:y+h, x:x+w]
    #display image
    cv2.imshow(f'ROI {i}', roi)

cv2.waitKey(0)
cv2.destroyAllWindows()

Number of blinking lights: 4
(1374, 773, 27, 27)
(458, 283, 27, 27)
(1403, 280, 23, 24)
(504, 824, 23, 22)


In [81]:
import cv2
import numpy as np
from scipy.fft import fft
import concurrent.futures

# Function to process each ROI
def process_roi(roi):
    x, y, w, h = roi
    roi_processed = frame[y:y+h, x:x+w]
    roi_processed = cv2.resize(roi_processed, (30, 30), interpolation=cv2.INTER_LINEAR)
    roi_processed = cv2.convertScaleAbs(roi_processed)
    roi_processed = np.array(roi_processed)
    roi_processed = roi_processed / 255.0
    roi_processed = np.expand_dims(roi_processed, axis=0)
    return roi_processed

# Function to calculate frequency using FFT
def calculate_frequency(signal, fps):
    threshold = np.mean(signal)
    
    # Find peaks (zero-crossings) in the signal
    peaks = np.where((signal[:-1] <= threshold) & (signal[1:] > threshold))[0]
    
    # Calculate time intervals between peaks
    time_intervals = np.diff(peaks) / fps
    
    # Skip frequency calculation if there are not enough data points
    if len(time_intervals) <= 1:
        return 0.0
    
    # Calculate the average frequency
    avg_frequency = 1 / np.mean(time_intervals)
    
    return avg_frequency

# Open the video file
video_path = './vidscapstone/vid10.mp4'
cap = cv2.VideoCapture(video_path)

if not cap.isOpened():
    print("Error opening video file")
    exit()

fps = cap.get(cv2.CAP_PROP_FPS)  # Get the frames per second from the video

# Initialize roi_frequencies for each ROI
roi_frequencies = {i: [] for i in range(len(merged_rois))}
reset_interval = 100 

frame_count = 0

while True:
    ret, frame = cap.read()

    if not ret:
        break

    # Process ROIs in parallel
    with concurrent.futures.ThreadPoolExecutor() as executor:
        rois_processed = list(executor.map(process_roi, merged_rois))

    rois_processed = np.vstack(rois_processed)  # Stack ROIs to create a batch

    # Calculate the intensity for each ROI
    for i, roi_coordinates in enumerate(merged_rois):
        x, y, w, h = roi_coordinates
        roi_intensity = np.mean(frame[y:y+h, x:x+w])
        roi_frequencies[i].append(roi_intensity)

    # Calculate and print the frequency for each ROI
    for i, frequencies in roi_frequencies.items():
        avg_frequency = calculate_frequency(frequencies, fps)
        print(f'Frequency for ROI {i}: {avg_frequency} Hz')

    frame_count += 1

    # Reset frequencies after a certain number of frames
    if frame_count % reset_interval == 0:
        roi_frequencies = {i: [] for i in range(len(merged_rois))}

cap.release()

# Close all OpenCV windows
cv2.destroyAllWindows()


Frequency for ROI 0: 0.0 Hz
Frequency for ROI 1: 0.0 Hz
Frequency for ROI 2: 0.0 Hz
Frequency for ROI 3: 0.0 Hz
Frequency for ROI 0: 0.0 Hz
Frequency for ROI 1: 0.0 Hz
Frequency for ROI 2: 0.0 Hz
Frequency for ROI 3: 0.0 Hz
Frequency for ROI 0: 0.0 Hz
Frequency for ROI 1: 0.0 Hz
Frequency for ROI 2: 0.0 Hz
Frequency for ROI 3: 0.0 Hz
Frequency for ROI 0: 0.0 Hz
Frequency for ROI 1: 0.0 Hz
Frequency for ROI 2: 0.0 Hz
Frequency for ROI 3: 0.0 Hz
Frequency for ROI 0: 0.0 Hz
Frequency for ROI 1: 0.0 Hz
Frequency for ROI 2: 0.0 Hz
Frequency for ROI 3: 0.0 Hz
Frequency for ROI 0: 0.0 Hz
Frequency for ROI 1: 0.0 Hz
Frequency for ROI 2: 0.0 Hz
Frequency for ROI 3: 0.0 Hz
Frequency for ROI 0: 0.0 Hz
Frequency for ROI 1: 0.0 Hz
Frequency for ROI 2: 0.0 Hz
Frequency for ROI 3: 0.0 Hz
Frequency for ROI 0: 0.0 Hz
Frequency for ROI 1: 0.0 Hz
Frequency for ROI 2: 0.0 Hz
Frequency for ROI 3: 0.0 Hz
Frequency for ROI 0: 0.0 Hz
Frequency for ROI 1: 0.0 Hz
Frequency for ROI 2: 0.0 Hz
Frequency for ROI 3:

In [80]:
import pickle

directory = 'testing_binary_class_classification/'

model.save(directory + 'testing_binary_class_classification_model_Adam.h5')

with open(directory + 'testing_binary_class_classification_model_Adam_history', 'wb') as file_pi:
    pickle.dump(history, file_pi)

with open(directory + 'testing_binary_class_classification_model_Adam_X_test', 'wb') as file_pi:
    pickle.dump(X_test, file_pi)

with open(directory + 'testing_binary_class_classification_model_Adam_y_test', 'wb') as file_pi:
    pickle.dump(y_test, file_pi)

  saving_api.save_model(
