In [8]:
import os
import numpy as np
import tensorflow as tf
import keras as ks
from tensorflow.python.keras.utils import np_utils

In [17]:
# Show the Keras version exposed through TensorFlow, then the one from the
# standalone `keras` package, one per line.
print(tf.keras.__version__, ks.__version__, sep="\n")

3.6.0
3.6.0


In [19]:
# Short module-level names for the Keras helpers used by the loader functions.
load_img = ks.preprocessing.image.load_img
img_to_array = ks.preprocessing.image.img_to_array
to_categorical = ks.utils.to_categorical

In [22]:
# Function to load original data and save as .npy files
def load_and_save_data(data_dir, img_shape=(80, 112), num_frames=60, save_path=''):
    """Load the original recordings (``*_1`` / ``*_11`` folders) and save them as .npy files.

    Every sub-folder of ``data_dir`` is treated as ``<word>_<n>``; the text before
    the first underscore is the word. A folder is used only if it contains the
    frames ``0.png`` ... ``<num_frames - 1>.png`` without a gap.

    Args:
        data_dir: Directory containing the ``<word>_<n>`` folders.
        img_shape: ``target_size`` passed to ``load_img`` for every frame.
        num_frames: Number of consecutive frames a folder must provide.
        save_path: Directory that receives ``X_data.npy`` and ``y_data.npy``.
            It is created when missing; an empty string means the current
            working directory.

    Returns:
        Tuple ``(X, y)``: the stacked frames divided by 255 and the one-hot
        labels produced by ``to_categorical``.

    Raises:
        ValueError: If no selected folder contains a complete frame sequence.
    """
    X = []
    y = []

    # os.listdir() returns entries in arbitrary order; sorting makes the sample
    # order inside the saved arrays reproducible.
    all_folders = sorted(
        folder for folder in os.listdir(data_dir)
        if os.path.isdir(os.path.join(data_dir, folder))
    )

    # Label indices are derived from every word present in the dataset folder.
    unique_words = {folder.split('_')[0] for folder in all_folders}
    label_dict = {word: idx for idx, word in enumerate(sorted(unique_words))}

    # Only the _1 and _11 folders of each word hold the original recordings.
    original_folders = [folder for folder in all_folders if folder.endswith(('_1', '_11'))]

    for folder in original_folders:
        print(f"Processing folder: {folder}")
        folder_path = os.path.join(data_dir, folder)
        frames = []
        for i in range(num_frames):
            img_path = os.path.join(folder_path, f"{i}.png")
            if not os.path.exists(img_path):
                break
            frames.append(img_to_array(load_img(img_path, target_size=img_shape)))

        if len(frames) != num_frames:
            # Report incomplete folders instead of dropping them silently.
            print(f"Skipping folder {folder}: found {len(frames)} of {num_frames} frames")
            continue

        X.append(np.stack(frames, axis=0))
        y.append(label_dict[folder.split('_')[0]])

    if not X:
        # Fail before anything is written so existing .npy files are not
        # replaced by empty arrays.
        raise ValueError(
            f"No folder ending in '_1' or '_11' with {num_frames} frames was found in {data_dir!r}"
        )

    # Convert to numpy arrays and scale pixel values by 1/255.
    X = np.array(X) / 255.0
    y = to_categorical(y, num_classes=len(label_dict))

    # np.save() does not create missing directories.
    if save_path:
        os.makedirs(save_path, exist_ok=True)
    np.save(os.path.join(save_path, 'X_data.npy'), X)
    np.save(os.path.join(save_path, 'y_data.npy'), y)

    print(f"Data saved successfully at {save_path} as 'X_data.npy' and 'y_data.npy'.")

    return X, y

# Destination for the .npy files and source directory with the frame folders.
save_path = r'D:\PycharmProjects\pro_dis_2\collected_data\npy_files_seven'
base_dir = r'D:\PycharmProjects\pro_dis_2\collected_data\complete_datasets_crop_six'

# Build and save the original-data arrays; the returned pair is bound to X and y.
X, y = load_and_save_data(data_dir=base_dir, save_path=save_path)

Processing folder: bat_1
Processing folder: bat_11
Processing folder: big_1
Processing folder: big_11
Processing folder: book_1
Processing folder: book_11
Processing folder: car_1
Processing folder: car_11
Processing folder: cat_1
Processing folder: cat_11
Processing folder: cup_1
Processing folder: cup_11
Processing folder: dog_1
Processing folder: dog_11
Processing folder: drop_1
Processing folder: drop_11
Processing folder: eat_1
Processing folder: eat_11
Processing folder: fast_1
Processing folder: fast_11
Processing folder: fish_1
Processing folder: fish_11
Processing folder: fun_1
Processing folder: fun_11
Processing folder: go_1
Processing folder: go_11
Processing folder: hat_1
Processing folder: hat_11
Processing folder: hot_1
Processing folder: hot_11
Processing folder: jump_1
Processing folder: jump_11
Processing folder: kick_1
Processing folder: kick_11
Processing folder: leg_1
Processing folder: leg_11
Processing folder: milk_1
Processing folder: milk_11
Processing folder: 

In [23]:
# Function to load and save augmented data
def load_and_save_augmented_data(data_dir, suffixes, img_shape=(80, 112), num_frames=60,
                                 save_path='', file_tag='augmented'):
    """Load the folders whose name ends in one of ``suffixes`` and save them as .npy files.

    Every sub-folder of ``data_dir`` is treated as ``<word>_<n>``; the text before
    the first underscore is the word. A folder is used only if it contains the
    frames ``0.png`` ... ``<num_frames - 1>.png`` without a gap.

    Args:
        data_dir: Directory containing the ``<word>_<n>`` folders.
        suffixes: Iterable of suffix values; a folder is selected when its name
            ends with ``_<suffix>`` for any of them.
        img_shape: ``target_size`` passed to ``load_img`` for every frame.
        num_frames: Number of consecutive frames a folder must provide.
        save_path: Directory that receives the output files. It is created
            when missing; an empty string means the current working directory.
        file_tag: Middle part of the output names ``X_<file_tag>_data.npy`` and
            ``y_<file_tag>_data.npy``, so one definition can serve several
            suffix ranges.

    Returns:
        None. The arrays are only written to disk.

    Raises:
        ValueError: If no selected folder contains a complete frame sequence.
    """
    X = []
    y = []

    # os.listdir() returns entries in arbitrary order; sorting makes the sample
    # order inside the saved arrays reproducible.
    all_folders = sorted(
        folder for folder in os.listdir(data_dir)
        if os.path.isdir(os.path.join(data_dir, folder))
    )

    # Materialise the endings once: `suffixes` may be a one-shot iterator, and
    # str.endswith() accepts a tuple of candidates.
    suffix_endings = tuple(f'_{suffix}' for suffix in suffixes)
    selected_folders = [folder for folder in all_folders if folder.endswith(suffix_endings)]

    # Build the labels from ALL folders, not only the selected ones, so a word
    # keeps the same class index (and y the same width) in every saved file even
    # when a word has no folder with one of the requested suffixes.
    unique_words = {folder.split('_')[0] for folder in all_folders}
    label_dict = {word: idx for idx, word in enumerate(sorted(unique_words))}

    for folder in selected_folders:
        print(f"Processing folder: {folder}")
        folder_path = os.path.join(data_dir, folder)
        frames = []
        for i in range(num_frames):
            img_path = os.path.join(folder_path, f"{i}.png")
            if not os.path.exists(img_path):
                break
            frames.append(img_to_array(load_img(img_path, target_size=img_shape)))

        if len(frames) != num_frames:
            # Report incomplete folders instead of dropping them silently.
            print(f"Skipping folder {folder}: found {len(frames)} of {num_frames} frames")
            continue

        X.append(np.stack(frames, axis=0))
        y.append(label_dict[folder.split('_')[0]])

    if not X:
        # Fail before anything is written so existing .npy files are not
        # replaced by empty arrays.
        raise ValueError(
            f"No folder matching suffixes {suffix_endings} with {num_frames} frames was found in {data_dir!r}"
        )

    # Convert to numpy arrays and scale pixel values by 1/255.
    X = np.array(X) / 255.0
    y = to_categorical(y, num_classes=len(label_dict))

    # np.save() does not create missing directories.
    if save_path:
        os.makedirs(save_path, exist_ok=True)
    x_name = f'X_{file_tag}_data.npy'
    y_name = f'y_{file_tag}_data.npy'
    np.save(os.path.join(save_path, x_name), X)
    np.save(os.path.join(save_path, y_name), y)

    # 'augmented' -> 'Augmented', 'additional_augmented' -> 'Additional augmented'
    description = file_tag.replace('_', ' ').capitalize()
    print(f"{description} data saved successfully at {save_path} as '{x_name}' and '{y_name}'.")

# Source directory with the frame folders and destination for the .npy files.
base_dir = r'D:\PycharmProjects\pro_dis_2\collected_data\complete_datasets_crop_six'
save_path = r'D:\PycharmProjects\pro_dis_2\collected_data\npy_files_seven'

# Select the folders suffixed _2 through _10 (the upper bound of range is exclusive).
suffixes = range(2, 11)

# Load those folders and write the resulting arrays to save_path.
load_and_save_augmented_data(base_dir, suffixes, save_path=save_path)

Processing folder: bat_10
Processing folder: bat_2
Processing folder: bat_3
Processing folder: bat_4
Processing folder: bat_5
Processing folder: bat_6
Processing folder: bat_7
Processing folder: bat_8
Processing folder: bat_9
Processing folder: big_10
Processing folder: big_2
Processing folder: big_3
Processing folder: big_4
Processing folder: big_5
Processing folder: big_6
Processing folder: big_7
Processing folder: big_8
Processing folder: big_9
Processing folder: book_10
Processing folder: book_2
Processing folder: book_3
Processing folder: book_4
Processing folder: book_5
Processing folder: book_6
Processing folder: book_7
Processing folder: book_8
Processing folder: book_9
Processing folder: car_10
Processing folder: car_2
Processing folder: car_3
Processing folder: car_4
Processing folder: car_5
Processing folder: car_6
Processing folder: car_7
Processing folder: car_8
Processing folder: car_9
Processing folder: cat_10
Processing folder: cat_2
Processing folder: cat_3
Processing 

In [24]:
# Function to load and save additional augmented data
def load_and_save_augmented_data(data_dir, suffixes, img_shape=(80, 112), num_frames=60,
                                 save_path='', file_tag='additional_augmented'):
    """Load the folders whose name ends in one of ``suffixes`` and save them as .npy files.

    Every sub-folder of ``data_dir`` is treated as ``<word>_<n>``; the text before
    the first underscore is the word. A folder is used only if it contains the
    frames ``0.png`` ... ``<num_frames - 1>.png`` without a gap.

    Args:
        data_dir: Directory containing the ``<word>_<n>`` folders.
        suffixes: Iterable of suffix values; a folder is selected when its name
            ends with ``_<suffix>`` for any of them.
        img_shape: ``target_size`` passed to ``load_img`` for every frame.
        num_frames: Number of consecutive frames a folder must provide.
        save_path: Directory that receives the output files. It is created
            when missing; an empty string means the current working directory.
        file_tag: Middle part of the output names ``X_<file_tag>_data.npy`` and
            ``y_<file_tag>_data.npy``, so one definition can serve several
            suffix ranges.

    Returns:
        None. The arrays are only written to disk.

    Raises:
        ValueError: If no selected folder contains a complete frame sequence.
    """
    X = []
    y = []

    # os.listdir() returns entries in arbitrary order; sorting makes the sample
    # order inside the saved arrays reproducible.
    all_folders = sorted(
        folder for folder in os.listdir(data_dir)
        if os.path.isdir(os.path.join(data_dir, folder))
    )

    # Materialise the endings once: `suffixes` may be a one-shot iterator, and
    # str.endswith() accepts a tuple of candidates.
    suffix_endings = tuple(f'_{suffix}' for suffix in suffixes)
    selected_folders = [folder for folder in all_folders if folder.endswith(suffix_endings)]

    # Build the labels from ALL folders, not only the selected ones, so a word
    # keeps the same class index (and y the same width) in every saved file even
    # when a word has no folder with one of the requested suffixes.
    unique_words = {folder.split('_')[0] for folder in all_folders}
    label_dict = {word: idx for idx, word in enumerate(sorted(unique_words))}

    for folder in selected_folders:
        print(f"Processing folder: {folder}")
        folder_path = os.path.join(data_dir, folder)
        frames = []
        for i in range(num_frames):
            img_path = os.path.join(folder_path, f"{i}.png")
            if not os.path.exists(img_path):
                break
            frames.append(img_to_array(load_img(img_path, target_size=img_shape)))

        if len(frames) != num_frames:
            # Report incomplete folders instead of dropping them silently.
            print(f"Skipping folder {folder}: found {len(frames)} of {num_frames} frames")
            continue

        X.append(np.stack(frames, axis=0))
        y.append(label_dict[folder.split('_')[0]])

    if not X:
        # Fail before anything is written so existing .npy files are not
        # replaced by empty arrays.
        raise ValueError(
            f"No folder matching suffixes {suffix_endings} with {num_frames} frames was found in {data_dir!r}"
        )

    # Convert to numpy arrays and scale pixel values by 1/255.
    X = np.array(X) / 255.0
    y = to_categorical(y, num_classes=len(label_dict))

    # np.save() does not create missing directories.
    if save_path:
        os.makedirs(save_path, exist_ok=True)
    x_name = f'X_{file_tag}_data.npy'
    y_name = f'y_{file_tag}_data.npy'
    np.save(os.path.join(save_path, x_name), X)
    np.save(os.path.join(save_path, y_name), y)

    # 'additional_augmented' -> 'Additional augmented', 'augmented' -> 'Augmented'
    description = file_tag.replace('_', ' ').capitalize()
    print(f"{description} data saved successfully at {save_path} as '{x_name}' and '{y_name}'.")

# Source directory with the frame folders and destination for the .npy files.
base_dir = r'D:\PycharmProjects\pro_dis_2\collected_data\complete_datasets_crop_six'
save_path = r'D:\PycharmProjects\pro_dis_2\collected_data\npy_files_seven'

# Select the folders suffixed _12 through _20 (the upper bound of range is exclusive).
suffixes = range(12, 21)

# Load those folders and write the resulting arrays to save_path.
load_and_save_augmented_data(base_dir, suffixes, save_path=save_path)

Processing folder: bat_12
Processing folder: bat_13
Processing folder: bat_14
Processing folder: bat_15
Processing folder: bat_16
Processing folder: bat_17
Processing folder: bat_18
Processing folder: bat_19
Processing folder: bat_20
Processing folder: big_12
Processing folder: big_13
Processing folder: big_14
Processing folder: big_15
Processing folder: big_16
Processing folder: big_17
Processing folder: big_18
Processing folder: big_19
Processing folder: big_20
Processing folder: book_12
Processing folder: book_13
Processing folder: book_14
Processing folder: book_15
Processing folder: book_16
Processing folder: book_17
Processing folder: book_18
Processing folder: book_19
Processing folder: book_20
Processing folder: car_12
Processing folder: car_13
Processing folder: car_14
Processing folder: car_15
Processing folder: car_16
Processing folder: car_17
Processing folder: car_18
Processing folder: car_19
Processing folder: car_20
Processing folder: cat_12
Processing folder: cat_13
Pro

In [6]:
import dlib
import cv2
import os
from imutils import face_utils
from PIL import Image
from concurrent.futures import ThreadPoolExecutor

# Build the dlib models once at module level so every call to the cropping
# function reuses them. The landmark model file is looked up relative to the
# current working directory.
predictor = dlib.shape_predictor('shape_predictor_68_face_landmarks.dat')
detector = dlib.get_frontal_face_detector()

# Function to detect, crop, and resize the mouth region without compression
def crop_and_resize_mouth(image_path, output_path, output_size=(112, 112)):
    """Crop a square region around the mouth of the largest detected face and save it as PNG.

    The mouth is located with the module-level dlib ``detector`` and
    ``predictor`` (landmarks 48..67). The crop is a square 10% larger than the
    longer side of the mouth bounding box, centred on the mouth and shifted back
    inside the image when it would cross a border, so the resize to
    ``output_size`` does not change its aspect ratio.

    Args:
        image_path: Path of the frame to read with ``cv2.imread``.
        output_path: Path the PNG crop is written to.
        output_size: ``(width, height)`` passed to ``cv2.resize``.

    Returns:
        True if a crop was written to ``output_path``; False if the file could
        not be read as an image, no face was detected, or the crop was empty.
    """
    image = cv2.imread(image_path)
    if image is None:
        # cv2.imread() returns None (it does not raise) for missing or
        # non-image files such as stray thumbnails.
        print(f"Skipping {image_path}: not a readable image")
        return False

    gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)
    faces = detector(gray)
    if len(faces) == 0:
        # Make the missing frame visible; nothing is written for this image.
        print(f"No face detected in {image_path}")
        return False

    # Every detection used to be written to the same output_path, so only the
    # last one survived. Pick the largest face explicitly instead.
    face = max(faces, key=lambda rect: rect.width() * rect.height())

    shape = face_utils.shape_to_np(predictor(gray, face))

    # Bounding box of the mouth landmarks (48 to 67).
    (x, y, w, h) = cv2.boundingRect(shape[48:68])
    center_x = x + w // 2
    center_y = y + h // 2

    img_h, img_w = image.shape[:2]

    # Square side: mouth box plus 10% context, never larger than the image.
    side = min(int(max(w, h) * 1.1), img_w, img_h)

    # Clamp the top-left corner so the whole square lies inside the image;
    # truncating at the border would give a non-square crop that the resize
    # below would stretch.
    new_x = min(max(0, center_x - side // 2), img_w - side)
    new_y = min(max(0, center_y - side // 2), img_h - side)

    mouth_region = image[new_y:new_y + side, new_x:new_x + side]
    if mouth_region.size == 0:
        # cv2.resize() raises on an empty source array.
        print(f"Empty mouth crop for {image_path}")
        return False

    resized_mouth = cv2.resize(mouth_region, output_size)

    # OpenCV arrays are BGR; convert to RGB before handing the data to PIL.
    resized_mouth_rgb = cv2.cvtColor(resized_mouth, cv2.COLOR_BGR2RGB)
    Image.fromarray(resized_mouth_rgb).save(output_path, format='PNG')
    return True

# Function to process all frames in a word folder
def process_word_folder(word_path, output_word_path, set_name, word_name):
    """Run ``crop_and_resize_mouth`` on every file of one word folder in parallel.

    Args:
        word_path: Folder containing the input frames.
        output_word_path: Folder that receives the crops under the same file
            names; created when missing.
        set_name: Name of the set, used only in the progress message.
        word_name: Name of the word, used only in the progress message.

    Returns:
        None. A frame whose processing raises is reported on stdout and skipped
        so that one bad file does not abort the remaining frames.
    """
    print(f"Processing set: {set_name}, word folder: {word_name}")  # Print progress message

    # exist_ok=True already covers the "directory exists" case.
    os.makedirs(output_word_path, exist_ok=True)

    # Only regular files are frames; sub-directories are ignored.
    image_names = [
        name for name in os.listdir(word_path)
        if os.path.isfile(os.path.join(word_path, name))
    ]

    # Use ThreadPoolExecutor to process images in parallel within the word folder
    with ThreadPoolExecutor() as executor:
        futures = {
            executor.submit(
                crop_and_resize_mouth,
                os.path.join(word_path, image_name),
                os.path.join(output_word_path, image_name),
            ): image_name
            for image_name in image_names
        }

        # Wait for every task. result() re-raises the worker's exception, so it
        # is handled per frame instead of letting the first failure stop the run.
        for future, image_name in futures.items():
            try:
                future.result()
            except Exception as exc:
                print(f"Failed to process {os.path.join(word_path, image_name)}: {exc}")

# Main folder processing loop: every <set>/<word> folder below input_folder is
# processed into the same relative location below output_folder.
input_folder = r'D:\PycharmProjects\pro_dis_2\collected_data\complete_datasets_not_crop_three'
output_folder = r'D:\PycharmProjects\pro_dis_2\collected_data\cropped_datasets_four'

# os.listdir() returns entries in arbitrary order; sort for a stable run order.
for set_folder in sorted(os.listdir(input_folder)):
    set_path = os.path.join(input_folder, set_folder)
    if not os.path.isdir(set_path):
        continue
    for word_folder in sorted(os.listdir(set_path)):
        word_path = os.path.join(set_path, word_folder)
        if not os.path.isdir(word_path):
            # A stray file next to the word folders would make the os.listdir()
            # call inside process_word_folder raise.
            continue
        output_word_path = os.path.join(output_folder, set_folder, word_folder)
        # process_word_folder prints the set/word being handled itself.
        process_word_folder(word_path, output_word_path, set_folder, word_folder)

Processing set: set_1, word folder: bat
Processing set: set_1, word folder: big
Processing set: set_1, word folder: book
Processing set: set_1, word folder: car
Processing set: set_1, word folder: cat
Processing set: set_1, word folder: cup
Processing set: set_1, word folder: dog
Processing set: set_1, word folder: drop
Processing set: set_1, word folder: eat
Processing set: set_1, word folder: fast
Processing set: set_1, word folder: fish
Processing set: set_1, word folder: fun
Processing set: set_1, word folder: go
Processing set: set_1, word folder: hat
Processing set: set_1, word folder: hot
Processing set: set_1, word folder: jump
Processing set: set_1, word folder: kick
Processing set: set_1, word folder: leg
Processing set: set_1, word folder: milk
Processing set: set_1, word folder: no
Processing set: set_1, word folder: pen
Processing set: set_1, word folder: pick
Processing set: set_1, word folder: red
Processing set: set_1, word folder: run
Processing set: set_1, word folder