<h1 align="center">Machine Learning and Vision Lab</h1>
<h4 align="center">Dr. Mohammadzadeh</h4>
<h4 align="center">Sharif University of Technology, Fall 2023</h4>
<h4 align="center">Amir Hossein Yari - 99102507</h4>
<h4 align="center">Lab 8 - Optical Flow</h4>

In [148]:
# Import required package
import cv2
import numpy as np
import os
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import accuracy_score
from sklearn.svm import SVC
from sklearn.ensemble import RandomForestClassifier
from sklearn.ensemble import AdaBoostClassifier
from sklearn.tree import DecisionTreeClassifier

#### Sparse Optical Flow

In [2]:
# Open a video capture object (camera)
cap = cv2.VideoCapture(0)

# Hyperparameters
maxCorners = 100

# Generate random colors for visualizing feature points
color = np.random.randint(0, 255, (maxCorners, 3))

# Parameters for goodFeaturesToTrack
feature_params = dict(maxCorners=maxCorners,
                      qualityLevel=0.3,
                      minDistance=7,
                      blockSize=7)

# Parameters for Lucas-Kanade optical flow
lk_params = dict(winSize=(15, 15),
                 maxLevel=2,
                 criteria=(cv2.TERM_CRITERIA_EPS | cv2.TERM_CRITERIA_COUNT, 10, 0.03))

# Take the first frame and find corners in it
ret, old_frame = cap.read()
old_gray = cv2.cvtColor(old_frame, cv2.COLOR_BGR2GRAY)

# Find good features to track in the first frame
p0 = cv2.goodFeaturesToTrack(old_gray, mask=None, **feature_params)

# Create a mask image for drawing purposes
mask = np.zeros_like(old_frame)

while True:
    # Read the current frame
    ret, frame = cap.read()
    frame_gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)

    # Calculate optical flow using Lucas-Kanade method
    p1, st, err = cv2.calcOpticalFlowPyrLK(old_gray, frame_gray, p0, None, **lk_params)

    # Select good points
    good_new = p1[st == 1]
    good_old = p0[st == 1]

    # Draw the tracks
    for i, (new, old) in enumerate(zip(good_new, good_old)):
        a, b = new.ravel().astype(int)
        c, d = old.ravel().astype(int)

        # Draw line and circle for each feature point
        mask = cv2.line(mask, (a, b), (c, d), color[i].tolist(), 2)
        frame = cv2.circle(frame, (a, b), 5, color[i].tolist(), -1)

    # Combine the frame with the mask to visualize the tracks
    img = cv2.add(frame, mask)

    # Display the result
    cv2.imshow('Optical Flow', img)

    # Press 'Esc' to exit the loop
    k = cv2.waitKey(25)
    if k == 27:
        break

    # Update the previous frame and feature points
    old_gray = frame_gray.copy()
    p0 = good_new.reshape(-1, 1, 2)

# Release the video capture object and close all windows
cv2.destroyAllWindows()
cap.release()

#### Dense Optical Flow

In [3]:
# Open a video capture object (camera)
cap = cv2.VideoCapture(0)

# Read the first frame
ret, first_frame = cap.read()

# Convert the first frame to grayscale
prev_gray = cv2.cvtColor(first_frame, cv2.COLOR_BGR2GRAY)

# Create a mask with maximum saturation
mask = np.zeros_like(first_frame)
mask[..., 1] = 255  # Saturation to maximum

while True:
    # Read the current frame
    ret, frame = cap.read()

    # Display the input frame
    cv2.imshow("Input", frame)

    # Convert the current frame to grayscale
    gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)

    # Calculate dense optical flow using the Farneback method
    flow = cv2.calcOpticalFlowFarneback(prev_gray, gray, None, 0.5, 3, 15, 3, 5, 1.2, 0)

    # Calculate magnitude and angle of the 2D vectors
    magnitude, angle = cv2.cartToPolar(flow[..., 0], flow[..., 1])

    # Set image hue according to the optical flow direction
    mask[..., 0] = angle * 180 / np.pi / 2

    # Set image value according to the optical flow magnitude (normalized)
    mask[..., 2] = cv2.normalize(magnitude, None, 0, 255, cv2.NORM_MINMAX)

    # Convert HSV to RGB (BGR) color representation
    rgb = cv2.cvtColor(mask, cv2.COLOR_HSV2BGR)

    # Display the dense optical flow
    cv2.imshow("Dense Optical Flow", rgb)

    # Update the previous frame
    prev_gray = gray

    # Press 'Esc' to exit the loop
    k = cv2.waitKey(25)
    if k == 27:
        break

# Release the video capture object and close all windows
cap.release()
cv2.destroyAllWindows()

#### Happy Video Dense Optical Flow

In [25]:
# Set the path to the main folder
main_folder = "happy videos"

# Create a dictionary to store optical flow results for each subfolder
optical_flow_dict_happy = []

# Iterate through each subfolder
for subfolder in os.listdir(main_folder):
    subfolder_path = os.path.join(main_folder, subfolder)
    hist_list = []

    first_time = True
    for image_file in os.listdir(subfolder_path):
        image_path = os.path.join(subfolder_path, image_file)

        if first_time:
            # Read the first frame
            prev_frame = cv2.imread(image_path, cv2.IMREAD_GRAYSCALE)
            prev_frame = prev_frame[100:750, 350:800]
            first_time = False
            continue

        current_frame = cv2.imread(image_path, cv2.IMREAD_GRAYSCALE)
        current_frame = current_frame[100:750, 350:800]

        # Calculate dense optical flow using Farneback method
        flow = cv2.calcOpticalFlowFarneback(prev_frame, current_frame, None, 0.5, 3, 15, 3, 5, 1.2, 0)

        # Calculate histograms of flow magnitudes in both x and y directions
        mag, ang = cv2.cartToPolar(flow[..., 0], flow[..., 1])
        hist= np.histogram(ang.ravel(), bins=np.linspace(0,2*np.pi,9))[0]

        # Store the optical flow results for the current frame
        hist_list.append(hist)

        # Update the previous frame for the next iteration
        prev_frame = current_frame

    hists_array = np.array(hist_list)
    combine_results = np.concatenate((np.mean(hists_array, axis=0), np.max(hists_array, axis=0)))
    # Add the list of optical flow results to the dictionary with the subfolder name as the key
    optical_flow_dict_happy.append(combine_results)

#### Surprise Video Dense Optical Flow

In [26]:
# Set the path to the main folder
main_folder = "surprise videos"

# Create a dictionary to store optical flow results for each subfolder
optical_flow_dict_surprise = []

# Iterate through each subfolder
for subfolder in os.listdir(main_folder):
    subfolder_path = os.path.join(main_folder, subfolder)
    hist_list = []

    first_time = True
    for image_file in os.listdir(subfolder_path):
        image_path = os.path.join(subfolder_path, image_file)

        if first_time:
            # Read the first frame
            prev_frame = cv2.imread(image_path, cv2.IMREAD_GRAYSCALE)
            prev_frame = prev_frame[100:750, 350:800]
            first_time = False
            continue

        current_frame = cv2.imread(image_path, cv2.IMREAD_GRAYSCALE)
        current_frame = current_frame[100:750, 350:800]

        # Calculate dense optical flow using Farneback method
        flow = cv2.calcOpticalFlowFarneback(prev_frame, current_frame, None, 0.5, 3, 15, 3, 5, 1.2, 0)

        # Calculate histograms of flow magnitudes in both x and y directions
        mag, ang = cv2.cartToPolar(flow[..., 0], flow[..., 1])
        hist= np.histogram(ang.ravel(), bins=np.linspace(0,2*np.pi,9))[0]

        # Store the optical flow results for the current frame
        hist_list.append(hist)

        # Update the previous frame for the next iteration
        prev_frame = current_frame

    hists_array = np.array(hist_list)
    combine_results = np.concatenate((np.mean(hists_array, axis=0), np.max(hists_array, axis=0)))
    # Add the list of optical flow results to the dictionary with the subfolder name as the key
    optical_flow_dict_surprise.append(combine_results)

In [29]:
optical_flow_dict_surprise = np.array(optical_flow_dict_surprise)
optical_flow_dict_happy = np.array(optical_flow_dict_happy)

#### SVM Classifier

In [164]:
surprise_labels = np.zeros(len(optical_flow_dict_surprise))
happy_labels = np.ones(len(optical_flow_dict_happy))

# Concatenate all data and labels
all_data = np.concatenate((optical_flow_dict_surprise, optical_flow_dict_happy))
all_labels = np.concatenate((surprise_labels, happy_labels))

# Split data into train and test sets
train_data, test_data, train_labels, test_labels = train_test_split(
    all_data, all_labels, test_size=0.5, random_state=0
)

# Reshape data for SVM
train_data_reshaped = train_data.reshape(len(train_data), -1)
test_data_reshaped = test_data.reshape(len(test_data), -1)

scaler = StandardScaler()
scaler.fit(train_data_reshaped)
train_data_reshaped = scaler.transform(train_data_reshaped)

scaler.fit(test_data_reshaped)
test_data_reshaped = scaler.transform(test_data_reshaped)

# Create and train the SVM classifier
classifier = SVC(C=100)
classifier.fit(train_data_reshaped, train_labels)
# Make predictions on the training set
train_predictions = classifier.predict(train_data_reshaped)
# Evaluate the performance on the training set
train_accuracy = accuracy_score(train_labels, train_predictions)
# Make predictions on the test set
predictions = classifier.predict(test_data_reshaped)
# Evaluate the performance
accuracy = accuracy_score(test_labels, predictions)

print(f"Training Accuracy: {train_accuracy:.2f}")
print(f"SVM Classifier Accuracy: {accuracy:.2f}")

Training Accuracy: 1.00
SVM Classifier Accuracy: 0.73


#### Random Forest Classifier

In [165]:
# Split data into train and test sets
train_data, test_data, train_labels, test_labels = train_test_split(
    all_data, all_labels, test_size=0.1, random_state=0
)

# Reshape data for SVM
train_data_reshaped = train_data.reshape(len(train_data), -1)
test_data_reshaped = test_data.reshape(len(test_data), -1)

scaler = StandardScaler()
scaler.fit(train_data_reshaped)
train_data_reshaped = scaler.transform(train_data_reshaped)

scaler.fit(test_data_reshaped)
test_data_reshaped = scaler.transform(test_data_reshaped)

# Create and train the Random Forest classifier
rf_classifier = RandomForestClassifier(n_estimators=50, random_state=0)
rf_classifier.fit(train_data_reshaped, train_labels)

# Make predictions on the training set
train_predictions = rf_classifier.predict(train_data_reshaped)

# Evaluate the performance on the training set
train_accuracy = accuracy_score(train_labels, train_predictions)

# Make predictions on the test set
rf_predictions = rf_classifier.predict(test_data_reshaped)

# Evaluate the performance of Random Forest Classifier
rf_accuracy = accuracy_score(test_labels, rf_predictions)

print(f"Training Accuracy: {train_accuracy:.2f}")
print(f"Random Forest Classifier Accuracy: {rf_accuracy:.2f}")

Training Accuracy: 1.00
Random Forest Classifier Accuracy: 0.83


#### Ada Boost Classifier

In [166]:
# Split data into train and test sets
train_data, test_data, train_labels, test_labels = train_test_split(
    all_data, all_labels, test_size=0.3, random_state=0
)

# Reshape data for SVM
train_data_reshaped = train_data.reshape(len(train_data), -1)
test_data_reshaped = test_data.reshape(len(test_data), -1)

scaler = StandardScaler()
scaler.fit(train_data_reshaped)
train_data_reshaped = scaler.transform(train_data_reshaped)

scaler.fit(test_data_reshaped)
test_data_reshaped = scaler.transform(test_data_reshaped)

# Create a base Decision Tree classifier
base_classifier = DecisionTreeClassifier(max_depth=3)

# Create and train the AdaBoost classifier with the base classifier
adaboost_classifier = AdaBoostClassifier(base_classifier, n_estimators=15, random_state=0)
adaboost_classifier.fit(train_data_reshaped, train_labels)

# Make predictions on the training set
train_predictions = adaboost_classifier.predict(train_data_reshaped)

# Evaluate the performance on the training set
train_accuracy = accuracy_score(train_labels, train_predictions)


# Make predictions on the test set
adaboost_predictions = adaboost_classifier.predict(test_data_reshaped)

# Evaluate the performance of AdaBoost Classifier
adaboost_accuracy = accuracy_score(test_labels, adaboost_predictions)

print(f"Training Accuracy: {train_accuracy:.2f}")
print(f"AdaBoost Classifier Accuracy: {adaboost_accuracy:.2f}")

Training Accuracy: 1.00
AdaBoost Classifier Accuracy: 0.67
