In [None]:
import tensorflow as tf
from tensorflow.keras.models import load_model
import numpy as np
import cv2
from flask import Flask
from flask_socketio import SocketIO
from flask_cors import CORS
import eventlet
import time
import base64

In [None]:
cam_port = 0
mean = 128.33125
std = 15.842568


def preprocess_image(img, top, bottom, left, right): # img is a np array (captured image)
        # Crop center 256x256 pixel array
        cropped = img[top:bottom, left:right]

        # Change to grayscale
        gray = cv2.cvtColor(cropped, cv2.COLOR_BGR2GRAY)

        # Scale the image to (128, 128)
        resized = cv2.resize(gray, (0,0), fx = 0.5, fy = 0.5)

        # Scale pixel values to [-1, 1]
        normalized = (resized - mean) / std

        # Add batch size (1) and channel dimension (grayscale is 1 channel)
        processed_image = np.expand_dims(normalized, axis=(0, -1))  # Shape: (1, 128, 128, 1)

        return processed_image


def main():
    cam = cv2.VideoCapture(cam_port)

    modelPath = '../models/COSC307_limited_data_CNN2.keras'

    # Load the trained Keras model
    model = load_model(modelPath)  # Replace with the path to your model file

    valid, frame = cam.read()

    frameSize = frame.shape
    x = frameSize[1]
    y = frameSize[0]

    left = int((x - 256) / 2)
    right = left + 256
    top = int((y - 256) / 2)
    bottom = top + 256


    start_point = (left, top)    # top left corner of box
    end_point = (right, bottom)      # bottom right corner of box
    color = (0, 0, 255)         # red (B,G,R)
    thickness = 2

    counter = 0
    while True:

        valid, frame = cam.read()   # frame holds the image object (np array)

        # rectangle is 256x256
        rectangleOverlay = cv2.rectangle(frame, start_point, end_point, color, thickness)
        cv2.imwrite("../image_assets/frameRect.png", rectangleOverlay)

        cv2.imshow("Current Frame", rectangleOverlay)

        if counter >= 5:
            counter = 0

            processed_image = preprocess_image(frame, top, bottom, left, right)

            # Make a prediction
            prediction = model.predict(processed_image, batch_size=1)

            letters = ['P','Y','R','O','V']
            maxIndex = prediction[0].argmax()

            print(f"Most Likely:  {letters[maxIndex]}, {int(prediction[0][maxIndex] * 100)}% chance")
        
        else:
            counter += 1

        if cv2.waitKey(1) == ord('q'):
            break


    cam.release()
    cv2.destroyAllWindows()


main()

In [None]:
import tensorflow as tf
from tensorflow.keras.models import load_model
import numpy as np
import cv2
from flask import Flask
from flask_socketio import SocketIO
from flask_cors import CORS
from gevent import monkey
import time
import base64

app = Flask(__name__)
CORS(app,resources={r"/*":{"origins":"*"}})
socketio = SocketIO(app, cors_allowed_origins="*")  # Allow any frontend to connect

send_data_flag = True

cam_port = 0
mean = 128.33125
std = 15.842568


def preprocess_image(img, top, bottom, left, right): # img is a np array (captured image)
        # Crop center 256x256 pixel array
        cropped = img[top:bottom, left:right]

        # Change to grayscale
        gray = cv2.cvtColor(cropped, cv2.COLOR_BGR2GRAY)

        # Scale the image to (128, 128)
        resized = cv2.resize(gray, (0,0), fx = 0.5, fy = 0.5)

        # Scale pixel values to [-1, 1]
        normalized = (resized - mean) / std

        # Add batch size (1) and channel dimension (grayscale is 1 channel)
        processed_image = np.expand_dims(normalized, axis=(0, -1))  # Shape: (1, 128, 128, 1)

        return processed_image


def encode_image_to_base64(image):
    _, buffer = cv2.imencode('.png', image)  # Encode as PNG
    byte_image = buffer.tobytes()  # Convert to byte array
    encoded_image = base64.b64encode(byte_image).decode('utf-8')  # Base64 encode
    return encoded_image


def send_data():
    cam = cv2.VideoCapture(cam_port)

    modelPath = '../models/COSC307_limited_data_CNN2.keras'

    # Load the trained Keras model
    model = load_model(modelPath)  # Replace with the path to your model file

    valid, frame = cam.read()

    frameSize = frame.shape
    x = frameSize[1]
    y = frameSize[0]

    left = int((x - 256) / 2)
    right = left + 256
    top = int((y - 256) / 2)
    bottom = top + 256


    start_point = (left, top)    # top left corner of box
    end_point = (right, bottom)      # bottom right corner of box
    color = (0, 0, 255)         # red (B,G,R)
    thickness = 2

    counter = 0
    while send_data_flag:

        valid, frame = cam.read()   # frame holds the image object (np array)

        # rectangle is 256x256
        rectangleOverlay = cv2.rectangle(frame, start_point, end_point, color, thickness)
        cv2.imwrite("../image_assets/frameRect.png", rectangleOverlay)

        # cv2.imshow("Current Frame", rectangleOverlay)

        if counter >= 5:
            counter = 0

            processed_image = preprocess_image(frame, top, bottom, left, right)

            # Make a prediction
            prediction = model.predict(processed_image, batch_size=1)

            letters = ['P','Y','R','O','V']
            maxIndex = prediction[0].argmax()

            print(f"Most Likely:  {letters[maxIndex]}, {int(prediction[0][maxIndex] * 100)}% chance")
        
        else:
            counter += 1

        # socketio.emit('server_event', {'data': 'Hello from Flask! ' + str(i)})

        encoded_image = encode_image_to_base64(rectangleOverlay)

        print(f"Encoded image length: {len(encoded_image)}")
        # print(f"First 100 chars of encoded image: {encoded_image[:100]}")
        
        socketio.emit('receive_data', {'image': encoded_image, 'data': 'Hello from Flask!'})

        time.sleep(1)

    cam.release()
    cv2.destroyAllWindows()


@socketio.on('connect')
def handle_connect():
    print("Client connected")
    socketio.start_background_task(send_data)
    global send_data_flag
    send_data_flag = True


@socketio.on('disconnect')
def handle_disconnect():
    global send_data_flag
    print('Client Disconnected')
    send_data_flag = False


if __name__ == '__main__':
    monkey.patch_all()

    socketio.run(app, port=5001)

For later reference?

In [None]:
import threading
import cv2

def capture_video(cam_port):
    cam = cv2.VideoCapture(cam_port)
    while True:
        valid, frame = cam.read()
        if not valid:
            break
        # Process your frame
    cam.release()

# In your `send_data` function, run the video capture in a separate thread
threading.Thread(target=capture_video, args=(0,)).start()
