In [None]:
import cv2
import numpy as np
import pandas as pd
import os
from skimage.feature import hog, local_binary_pattern
from sklearn.decomposition import PCA
from sklearn.preprocessing import StandardScaler
from concurrent.futures import ProcessPoolExecutor

In [None]:
# Folder that holds the image files; each row's 'filename' is joined onto it in later cells.
image_directory = 'data'

# Label table with one row per image; later cells read its 'filename' and 'label' columns.
labels_df = pd.read_csv('label.csv')

In [None]:
def extract_edges(image):
    """Run Canny edge detection on `image` and return the edge map as a 1-D array.

    The two Canny thresholds are fixed at 100 and 200.
    """
    low_threshold, high_threshold = 100, 200
    edge_map = cv2.Canny(image, low_threshold, high_threshold)
    return edge_map.flatten()

# Function to extract ORB features
def extract_orb_features(image, max_features=128):
    """Return a fixed-length vector built from the ORB descriptors of `image`.

    At most `max_features` descriptor rows are kept (the first ones returned
    by the detector); missing rows are zero-filled so the result always has
    the same length for a given `max_features`.

    Args:
        image: Image array passed straight to ORB's ``detectAndCompute``.
        max_features: Number of descriptor rows in the output.

    Returns:
        1-D float64 array. Its length is ``max_features * 32`` when no
        descriptors are found, otherwise ``max_features`` times the
        descriptor width reported by ORB.
    """
    # Row width used when ORB finds nothing; 32 is the width of an ORB descriptor row.
    fallback_descriptor_size = 32

    orb = cv2.ORB_create()
    _, descriptors = orb.detectAndCompute(image, None)

    if descriptors is None:
        return np.zeros(max_features * fallback_descriptor_size, dtype=np.float64)

    # Fill one float64 buffer for both the "too many" and "too few" cases so
    # the dtype no longer depends on how many keypoints were detected
    # (previously uint8 when truncated, float64 when padded).
    padded = np.zeros((max_features, descriptors.shape[1]), dtype=np.float64)
    kept_rows = min(descriptors.shape[0], max_features)
    padded[:kept_rows] = descriptors[:kept_rows]
    return padded.ravel()

# Function to extract HOG features
def extract_hog_features(image):
    """Compute the HOG descriptor of a BGR image after converting it to grayscale.

    Uses 24x24-pixel cells and 'L2-Hys' block normalisation; every other
    `hog` option is left at its default.
    """
    return hog(
        cv2.cvtColor(image, cv2.COLOR_BGR2GRAY),
        pixels_per_cell=(24, 24),
        block_norm='L2-Hys',
    )

# Function to extract LBP features
def extract_lbp_features(image, radii=(1, 2, 3), n_points=8, method='uniform'):
    """Concatenate normalised LBP histograms computed at several radii.

    Args:
        image: BGR image; it is converted to grayscale before LBP is applied.
        radii: Iterable of radii; one histogram is produced per radius.
        n_points: Number of sampling points handed to `local_binary_pattern`.
        method: LBP variant handed to `local_binary_pattern`.

    Returns:
        1-D float array of length ``len(radii) * (n_points + 2)``; each
        per-radius histogram is divided by its own sum.

    NOTE(review): the ``n_points + 2`` bin layout matches the number of codes
    produced by the 'uniform' method. Other methods can emit codes outside
    these bins, and np.histogram ignores such values -- confirm the binning
    before passing a different `method`.
    """
    gray_image = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)

    # n_points + 3 integer edges -> n_points + 2 unit-width bins. The edges are
    # the same for every radius, so build them once. (The former `range=`
    # argument had no effect because explicit edges were already supplied.)
    bin_edges = np.arange(0, n_points + 3)

    histograms = []
    for radius in radii:
        lbp = local_binary_pattern(gray_image, n_points, radius, method=method)
        counts, _ = np.histogram(lbp.ravel(), bins=bin_edges)
        counts = counts.astype("float")
        # The small epsilon guards against division by zero on an empty histogram.
        histograms.append(counts / (counts.sum() + 1e-7))

    if not histograms:
        # No radii requested: keep returning an empty float array.
        return np.array([])
    return np.concatenate(histograms)

# Function to extract color histogram features
# def extract_color_histogram(image, bins=(8, 8, 8)):
#     hsv_image = cv2.cvtColor(image, cv2.COLOR_BGR2HSV)
#     hist = cv2.calcHist([hsv_image], [0, 1, 2], None, bins, [0, 180, 0, 256, 0, 256])
#     hist = cv2.normalize(hist, hist).flatten()
#     return hist

# Function to extract color histogram features in RGB space
# def extract_color_histogram(image, bins=(8, 8, 8)):
#     # Convert BGR to RGB
#     rgb_image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
    
#     # Calculate the histogram for the RGB channels
#     hist = cv2.calcHist([rgb_image], [0, 1, 2], None, bins, [0, 256, 0, 256, 0, 256])
    
#     # Normalize the histogram
#     hist = cv2.normalize(hist, hist).flatten()
    
#     return hist

def extract_color_histogram(image):
    """Build a 768-element colour feature from per-channel intensity histograms.

    For each of the three channels (B, G, R) a 256-bin histogram over
    [0, 256) is computed, normalised with `cv2.normalize` using its default
    settings, and flattened; the three results are joined in B, G, R order.
    """
    channel_histograms = []
    for channel in (0, 1, 2):  # B, G, R
        counts = cv2.calcHist([image], [channel], None, [256], [0, 256])
        channel_histograms.append(cv2.normalize(counts, counts).flatten())

    return np.concatenate(channel_histograms)


# Combine all features into a single feature vector
def extract_combined_features(image, pca_model):
    """Build the full feature vector for one image.

    The vector is the concatenation, in this order, of: the PCA-reduced Canny
    edge map, the ORB descriptors, the HOG descriptor, the LBP histograms and
    the colour histogram.

    Args:
        image: BGR image (already resized by the caller).
        pca_model: Fitted object exposing ``transform``; it is applied to the
            flattened edge map. In this notebook callers pass the PCA fitted
            on the edge maps of the dataset.

    Returns:
        1-D array holding all features.
    """
    # extract_edges always returns an array, so no None check is needed here.
    # transform expects a 2-D (samples, features) input, hence the reshape;
    # [0] unwraps the single transformed row again.
    edges = extract_edges(image)
    reduced_edges = pca_model.transform(edges.reshape(1, -1))[0]

    feature_parts = (
        reduced_edges,
        extract_orb_features(image),
        extract_hog_features(image),
        extract_lbp_features(image),
        extract_color_histogram(image),
    )
    return np.concatenate(feature_parts)

In [None]:
# image_path = "data/Image_1.jpg"
# image = cv2.imread(image_path)

# # Resize the image if needed
# resized_image = cv2.resize(image, (250, 200))

# # pca = PCA(n_components=100)
# # Extract combined features
# combined_features = extract_combined_features(resized_image)

In [None]:
all_edges = []
skipped_images = 0

# First pass: collect the flattened edge map of every readable image so that
# PCA can be fitted on them.
for index, row in labels_df.iterrows():
    image_path = os.path.join(image_directory, row['filename'])
    image = cv2.imread(image_path)

    # Ensure the image is loaded correctly; unreadable files are counted and skipped.
    if image is None:
        skipped_images += 1
        continue

    # process_image resizes to the same (250, 200), so the edge vectors seen
    # later have the length this PCA is fitted on.
    resized_image = cv2.resize(image, (250, 200))

    # extract_edges always returns an array, so it can be appended directly.
    edges = extract_edges(resized_image)
    print(image_path, edges.shape, end='\r')
    all_edges.append(edges)

if skipped_images:
    print(f"Skipped {skipped_images} image(s) that could not be read.")

if not all_edges:
    # Fail with a clear message instead of an opaque error from PCA.fit on an empty array.
    raise ValueError(f"No readable images found under '{image_directory}'; cannot fit PCA.")

# Fit PCA on the collected edge features
all_edges = np.array(all_edges)

# PCA cannot extract more components than min(n_samples, n_features), so the
# requested 100 is clamped for small datasets rather than letting fit() raise.
n_components = min(100, all_edges.shape[0], all_edges.shape[1])  # Choose appropriate number of components
if n_components < 100:
    print(f"Only {all_edges.shape[0]} samples available; using {n_components} PCA components instead of 100.")

pca = PCA(n_components=n_components)
pca.fit(all_edges)

print(f"PCA model fitted. {all_edges.shape[0]} samples with {all_edges.shape[1]} features.")

In [None]:
# header_written = False

# with open('extracted_features_pca.csv', 'w') as csvfile:
#     for index, row in labels_df.iterrows():
#         image_path = os.path.join(image_directory, row['filename'])
#         image = cv2.imread(image_path)

#         # Ensure the image is loaded correctly
#         if image is None:
#             continue
        
#         # Resize the image if needed
#         resized_image = cv2.resize(image, (250, 200))
        
#         # Extract combined features
#         combined_features = extract_combined_features(resized_image, pca)
#         print(image_path, combined_features.shape, end='\r')

#         # Normalize features
#         scaler = StandardScaler()
#         X = scaler.fit_transform(combined_features)
        
#         # Convert features to a DataFrame row with label and filename
#         combined_row = np.append(X, [row['label'], row['filename']])
        
#         # Convert to DataFrame
#         combined_df = pd.DataFrame([combined_row])
        
#         # Write the row to the CSV, writing the header only once
#         if not header_written:
#             combined_df.to_csv(csvfile, header=['feature_' + str(i) for i in range(len(combined_features))] + ['label', 'filename'], index=False, mode='a')
#             header_written = True
#         else:
#             combined_df.to_csv(csvfile, header=False, index=False, mode='a')

# print("Feature extraction completed and saved to CSV.")

In [None]:
def process_image(row_tuple):
    """Load one labelled image and return its features plus label and filename.

    Args:
        row_tuple: ``(index, row)`` pair as produced by ``DataFrame.iterrows()``;
            `row` must provide 'filename' and 'label'.

    Returns:
        ``None`` when the image cannot be read. Otherwise a 1-D object-dtype
        array whose last two entries are the label and the filename and whose
        preceding entries are the numeric features.
    """
    _, row = row_tuple
    image_path = os.path.join(image_directory, row['filename'])
    image = cv2.imread(image_path)

    if image is None:
        return None

    resized_image = cv2.resize(image, (250, 200))

    # Extract combined features
    combined_features = extract_combined_features(resized_image, pca)
    print(image_path, combined_features.shape, end='\r')

    # Use an object array so the numeric features stay numeric. np.append with
    # the string filename would promote the whole vector to a string dtype and
    # hand every feature (and the label) back as text.
    record = np.empty(combined_features.shape[0] + 2, dtype=object)
    record[:-2] = combined_features
    record[-2] = row['label']
    record[-1] = row['filename']
    return record

In [None]:
# NOTE(review): ProcessPoolExecutor needs worker processes to be able to import
# process_image. With the 'spawn' start method, functions defined in a notebook
# cell may not be importable by the workers -- confirm on the target platform.
with ProcessPoolExecutor() as executor:
    # Directly pass the row tuples from labels_df.iterrows()
    results = list(executor.map(process_image, labels_df.iterrows()))

# Filter out None results (in case any image failed to load)
results = [result for result in results if result is not None]
features_list = list(results)

if not features_list:
    # Fail early with a clear message instead of an opaque scaler/indexing error.
    raise ValueError("No features were extracted: every image failed to load.")

# Convert to DataFrame for easier manipulation; the last two columns are the
# label and the filename appended by process_image.
raw_features_df = pd.DataFrame(features_list)

# Cast explicitly: depending on how the rows were assembled, the feature
# columns may arrive as object/string values rather than floats.
feature_values = raw_features_df.iloc[:, :-2].astype(np.float64).to_numpy()

# Normalize features using StandardScaler
# NOTE(review): the scaler is fitted on every row; if a train/test split
# happens downstream, confirm that sharing these statistics is acceptable.
scaler = StandardScaler()
scaled_values = np.asarray(scaler.fit_transform(feature_values))

# Assemble a fresh frame rather than writing floats back into the raw columns.
feature_columns = ['feature_' + str(i) for i in range(scaled_values.shape[1])]
features_df = pd.DataFrame(scaled_values, columns=feature_columns)
features_df['label'] = raw_features_df.iloc[:, -2].to_numpy()
features_df['filename'] = raw_features_df.iloc[:, -1].to_numpy()

# Write to CSV
features_df.to_csv('extracted_features_pca.csv', index=False)

print("Feature extraction completed and saved to CSV.")