In [2]:
import cv2
import mediapipe as mp
import os
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader
from torch.optim import Adam


In [3]:
# Initialize Mediapipe Face Mesh
mp_face_mesh = mp.solutions.face_mesh.FaceMesh(max_num_faces=1)


In [4]:
# Function to extract lip region
def extract_lips(video_path, output_dir):
    cap = cv2.VideoCapture(video_path)
    frame_count = 0

    while cap.isOpened():
        ret, frame = cap.read()
        if not ret:
            break

        # Convert frame to RGB for Mediapipe
        rgb_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
        results = mp_face_mesh.process(rgb_frame)

        if results.multi_face_landmarks:
            # Extract lip landmarks
            landmarks = results.multi_face_landmarks[0]
            h, w, _ = frame.shape
            lip_landmarks = [(int(lm.x * w), int(lm.y * h)) for lm in landmarks.landmark[61:81]]
            
            # Crop the lip region
            min_x = min([x for x, y in lip_landmarks])
            max_x = max([x for x, y in lip_landmarks])
            min_y = min([y for x, y in lip_landmarks])
            max_y = max([y for x, y in lip_landmarks])
            lip_roi = frame[min_y:max_y, min_x:max_x]

            # Resize the cropped region
            if lip_roi.size > 0:
                lip_roi_resized = cv2.resize(lip_roi, (100, 50))
                # Save the frame
                frame_path = os.path.join(output_dir, f"frame_{frame_count:04d}.jpg")
                cv2.imwrite(frame_path, lip_roi_resized)
                frame_count += 1

    cap.release()


In [5]:
import cv2

def find_working_camera():
    for cam_index in range(5):  # Test up to 5 indices
        cap = cv2.VideoCapture(cam_index)
        if cap.isOpened():
            print(f"Camera {cam_index} is accessible.")
            ret, frame = cap.read()
            if ret:
                cv2.imshow(f"Camera {cam_index}", frame)
                if cv2.waitKey(2000) & 0xFF == ord('q'):  # Press 'q' to close window
                    cv2.destroyAllWindows()
                    cap.release()
                    return cam_index
            cap.release()
        else:
            print(f"Camera {cam_index} not accessible.")
    cv2.destroyAllWindows()
    return -1

camera_index = find_working_camera()
if camera_index != -1:
    print(f"Working camera index: {camera_index}")
else:
    print("No accessible cameras found.")


Camera 0 not accessible.
Camera 1 is accessible.
Camera 2 is accessible.
Camera 3 not accessible.
Camera 4 not accessible.
No accessible cameras found.


In [6]:
# Define the LipReadingModel
class LipReadingModel(nn.Module):
    def __init__(self, vocab_size):
        super(LipReadingModel, self).__init__()
        
        # Conv3d for processing the sequence of frames
        self.conv1 = nn.Conv3d(1, 32, kernel_size=(3, 3, 3), stride=(1, 1, 1), padding=(1, 1, 1))
        self.conv2 = nn.Conv3d(32, 64, kernel_size=(3, 3, 3), stride=(1, 1, 1), padding=(1, 1, 1))
        
        # LSTM layer to capture temporal relationships
        self.lstm = nn.LSTM(input_size=64 * 100 * 50, hidden_size=128, num_layers=2, batch_first=True)
        
        # Output layer
        self.fc = nn.Linear(128, vocab_size)

    def forward(self, x):
        # Apply Conv3d layers
        x = F.relu(self.conv1(x))
        x = F.relu(self.conv2(x))
        
        # Flatten the spatial dimensions for LSTM
        x = x.view(x.size(0), x.size(2), -1)  # [batch_size, sequence_length, features]
        
        # LSTM to process the sequence
        x, _ = self.lstm(x)
        
        # Fully connected layer
        x = self.fc(x)
        
        return x


In [10]:
class LipReadingDataset(Dataset):
    def __init__(self, frames, texts):
        self.frames = frames  # List of frame sequences
        self.texts = texts  # List of corresponding transcriptions
    
    def __len__(self):
        return len(self.frames)
    
    def __getitem__(self, idx):
        return torch.tensor(self.frames[idx]), torch.tensor(self.texts[idx])


In [11]:
# Example data (replace with actual data)
frames = [torch.randn(30, 1, 100, 50)]  # List of sequences of lip images (e.g., 30 frames)
texts = [[1, 2, 3]]  # Corresponding transcriptions (encoded as integers)


In [12]:
# Create dataset and dataloader
dataset = LipReadingDataset(frames, texts)
dataloader = DataLoader(dataset, batch_size=4, shuffle=True)

# Define model, optimizer, and loss function
vocab_size = 40  # Example vocab size (can be expanded)
model = LipReadingModel(vocab_size)

optimizer = Adam(model.parameters(), lr=0.001)
ctc_loss = nn.CTCLoss()


In [13]:
# Training loop
for epoch in range(10):  # Example 10 epochs
    for frames, texts in dataloader:
        optimizer.zero_grad()
        
        # Reshape frames to [batch_size, channels, frames, height, width]
        frames = frames.view(frames.size(0), 1, 30, 100, 50)
        
        output = model(frames)
        
        # Calculate lengths for CTC loss
        input_lengths = torch.full((frames.size(0),), output.size(1), dtype=torch.long)
        target_lengths = torch.tensor([len(text) for text in texts], dtype=torch.long)
        
        # Compute CTC loss
        loss = ctc_loss(output.permute(1, 0, 2), texts, input_lengths, target_lengths)
        loss.backward()
        optimizer.step()
        
    print(f"Epoch {epoch + 1}, Loss: {loss.item()}")


  return torch.tensor(self.frames[idx]), torch.tensor(self.texts[idx])


Epoch 1, Loss: -4.999701976776123
Epoch 2, Loss: -5.078636646270752
Epoch 3, Loss: -5.055662631988525
Epoch 4, Loss: -4.725643634796143
Epoch 5, Loss: -4.349334239959717
Epoch 6, Loss: -3.7118170261383057
Epoch 7, Loss: -3.292073965072632
Epoch 8, Loss: -2.6562249660491943
Epoch 9, Loss: -1.8031082153320312
Epoch 10, Loss: -0.7379097938537598


In [None]:
# Initialize webcam
camera_index = 0  # Replace with the correct index for DroidCam (e.g., 1 or 2 if 0 doesn't work)
cap = cv2.VideoCapture(camera_index)

if not cap.isOpened():
    print(f"Failed to open camera at index {camera_index}. Exiting...")
    exit()

sequence = []

# Loop to capture frames from the webcam
while cap.isOpened():
    ret, frame = cap.read()
    if not ret:
        print("Failed to read frame. Exiting...")
        break

    # Preprocess frame to extract lip region
    try:
        lip_roi = preprocess_frame(frame)  # Ensure this function is implemented
        if lip_roi is not None:  # Only append if a valid ROI is extracted
            sequence.append(lip_roi)
    except Exception as e:
        print(f"Error during preprocessing: {e}")
        continue

    # If we have 30 frames, process the sequence
    if len(sequence) == 30:
        try:
            input_tensor = torch.tensor(sequence).unsqueeze(0).unsqueeze(0).float()  # Add batch dimensions
            with torch.no_grad():
                prediction = model(input_tensor)  # Pass through the model
            predicted_text = decode_prediction(prediction)  # Decode predictions
            print("Predicted Text:", predicted_text)
        except Exception as e:
            print(f"Error during prediction or decoding: {e}")

        # Reset the sequence for the next batch of frames
        sequence = []

    # Display the live feed (Optional: Disable this if it slows down processing)
    cv2.imshow("Webcam Feed", frame)

    # Exit the loop when 'q' is pressed
    if cv2.waitKey(1) & 0xFF == ord('q'):
        break

# Release resources
cap.release()
cv2.destroyAllWindows()
