In [1]:
import cv2
import librosa
import mediapipe as mp
import numpy as np
from scipy.signal import resample
from sklearn.preprocessing import StandardScaler
import pandas as pd
from scipy.linalg import svd
from IPython.display import clear_output
import matplotlib.pyplot as plt
import math
import subprocess
import os
from sklearn.cross_decomposition import CCA
from scipy.stats import spearmanr
import warnings
from scipy.spatial import distance as dist
from scipy.signal import find_peaks
warnings.filterwarnings("ignore", category=FutureWarning, module="librosa")

In [2]:
# Calculate the Eye Aspect Ratio (EAR)
def eye_aspect_ratio(eye_landmarks):
    # Compute the Euclidean distances between the vertical eye landmarks
    A = dist.euclidean(eye_landmarks[1], eye_landmarks[5])
    B = dist.euclidean(eye_landmarks[2], eye_landmarks[4])
    # Compute the Euclidean distance between the horizontal eye landmarks
    C = dist.euclidean(eye_landmarks[0], eye_landmarks[3])
    # Compute the eye aspect ratio
    ear = (A + B) / (C)
    return ear
def extract_video_features(video_path):
    """Extracts lip landmarks from the video using MediaPipe."""
    mp_face_mesh = mp.solutions.face_mesh
    face_mesh = mp_face_mesh.FaceMesh(static_image_mode=False, max_num_faces=1, refine_landmarks=True)
    cap = cv2.VideoCapture(video_path)
    # Get video frame rate using OpenCV    
    fps = cap.get(cv2.CAP_PROP_FPS)  # Frames per second    
    upper_lip_points = [185,40,39,37,0,267,269,270,409]
    lower_lip_points = [146,91,181,84,17,314,405,321,375]    

    # Variables for blink detection
    EYE_STATUS = 0  # OPEN = 0 and CLOSED = 1
    total_blinks = 0  # Total number of blinks detected
    EAR = 0
    # Landmark indices for left and right eyes
    LEFT_EYE_INDICES = [362, 385, 387, 263, 373, 380]
    RIGHT_EYE_INDICES = [33, 160, 158, 133, 153, 144]
    lip_features = []
    # Constants for blink detection
    EAR_THRESHOLD = 0.25  # Eye aspect ratio threshold for blink detection    
    while cap.isOpened():
        ret, frame = cap.read()
        if not ret:
            break
        frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
        results = face_mesh.process(frame_rgb)
        if results.multi_face_landmarks:
            for landmarks in results.multi_face_landmarks:                           
                face_width = math.sqrt((landmarks.landmark[323].x - landmarks.landmark[93].x)**2 + (landmarks.landmark[323].y - landmarks.landmark[93].y)**2)
                
                # Lip movement feature extraction
                upper_lips = [(lm.x, lm.y) for i, lm in enumerate(landmarks.landmark) if i in upper_lip_points]
                lower_lips = [(lm.x, lm.y) for i, lm in enumerate(landmarks.landmark) if i in lower_lip_points]
                distances = [math.sqrt((ul[0] - ll[0])**2 + (ul[1] - ll[1])**2) / face_width for ul, ll in zip(upper_lips, lower_lips)]                            
                lip_features.append(np.array(distances))  # Append the list of distances for each frame to lip_features

                # Eye movement feature extraction                
                # Extract landmarks for left and right eyes
                left_eye = []
                right_eye = []
                for idx in LEFT_EYE_INDICES:
                    landmark = landmarks.landmark[idx]
                    x = int(landmark.x * frame.shape[1])
                    y = int(landmark.y * frame.shape[0])
                    left_eye.append((x, y))
                for idx in RIGHT_EYE_INDICES:
                    landmark = landmarks.landmark[idx]
                    x = int(landmark.x * frame.shape[1])
                    y = int(landmark.y * frame.shape[0])
                    right_eye.append((x, y))

                # Calculate EAR for both eyes
                left_ear = eye_aspect_ratio(left_eye)
                right_ear = eye_aspect_ratio(right_eye)
                EAR = (left_ear + right_ear) / 2.0
                
                if EAR < EAR_THRESHOLD:
                    EYE_STATUS = 1                    
                else:
                    if EYE_STATUS == 1:
                        total_blinks += 1
                    EYE_STATUS = 0                    
    cap.release()
    return np.array(lip_features),total_blinks, fps
def get_audio_energy(video_path,fps):
    # Load audio using librosa
    y, sr = librosa.load(video_path, sr=16000, mono=True)        
    
    # Calculate hop_length and frame_length to match video frames
    hop_length = int(sr / fps)  # Hop length to match video frame rate
    frame_length = int(sr * 0.05)  # 50ms window (can be adjusted)
    
    # Calculate short-time energy
    energy = np.array([
        sum(abs(y[i:i+frame_length])**2)
        for i in range(0, len(y), hop_length)
    ])
    
    return energy
# Min-max normalization function
def min_max_normalize(data):
    return (data - np.min(data)) / (np.max(data) - np.min(data))
def moving_average(data, window_size):
    # Ensure the window size is odd
    if window_size % 2 == 0:
        raise ValueError("Window size must be odd to maintain symmetry.")
    
    # Create a window of ones and normalize it
    window = np.ones(window_size) / window_size
    
    # Pad the data to handle edges
    pad_size = window_size // 2
    padded_data = np.pad(data, (pad_size, pad_size), mode='edge')  # Reflect padding
    
    # Apply convolution to compute the moving average
    smoothed_data = np.convolve(padded_data, window, mode='valid')
    
    return smoothed_data
def detect_peaks(data):
    # Compute absolute difference between consecutive frames
    movement = np.abs(np.diff(data))
    peaks, _ = find_peaks(movement, height=np.mean(movement) + np.std(movement),
                      prominence=0.05, width=1, distance=2)
    return peaks    

In [3]:
def get_video_features(video_path):
    start_delay = 10
    window_size = 9
    #print(f'[Processing] Video')

    lip_feat,blink_count,fps = extract_video_features(video_path)
    mean_dist = np.sum(lip_feat,axis=1)
    

    audio_energy = get_audio_energy(video_path,fps)

    min_len = min(len(mean_dist),len(audio_energy))
    normalized_dist = min_max_normalize(mean_dist)[start_delay:min_len]
    normalized_audio = min_max_normalize(audio_energy)[start_delay:min_len]    
    avg_dist = moving_average(normalized_dist, window_size)
    avg_audio = moving_average(normalized_audio, window_size)

    # Detect lip movement
    lip_movements = len(detect_peaks(avg_dist))
    
    # Detect Audio Change
    audio_changes = len(detect_peaks(audio_energy))

    # Calculate Pearson correlation coefficient
    correlation, _ = spearmanr(avg_dist, avg_audio)    

    return blink_count,lip_movements,audio_changes,correlation     

In [4]:
df = pd.DataFrame(columns=['blink_count','lip_movements','audio_changes','correlation','label'])

In [None]:
test = 'real'
sets = ['real','fake']

#video_folder_path = f'C:/Users/aanki/Downloads/live_videos'

start_delay = 10
for test_set in sets:
    video_folder_path = f'D:/Programming/Python/AI/Basics/AMNIL Tech/Liveness Detection/lip_movement/video/{test_set}'
    #video_folder_path = f'C:/Users/aanki/Downloads/live_videos/{test_set}'
    videos = os.listdir(f'{video_folder_path}')
    for video in videos:
        video_path = f'{video_folder_path}/{video}'
        print(f'[Processing] {video_path}')
        blink_count,lip_movements,audio_changes,correlation = get_video_features(video_path)      
        label = 1 if test_set == 'real' else 0           
        df.loc[len(df)] = [blink_count,lip_movements,audio_changes,correlation,label]
        

[Processing] C:/Users/aanki/Downloads/live_videos/real/0001d815c0--61dd5f24a14d0d2b3b211c83.mp4


  y, sr = librosa.load(video_path, sr=16000, mono=True)


[Processing] C:/Users/aanki/Downloads/live_videos/real/0001d815c0--61de4edf6292055ac305e885.mp4


  y, sr = librosa.load(video_path, sr=16000, mono=True)


[Processing] C:/Users/aanki/Downloads/live_videos/real/0001d815c0--61e28d132e6cac25f79a3346.MOV


  y, sr = librosa.load(video_path, sr=16000, mono=True)


[Processing] C:/Users/aanki/Downloads/live_videos/real/0001d815c0--61ece94de8e9ba2b3bc01c59.mkv


  y, sr = librosa.load(video_path, sr=16000, mono=True)


[Processing] C:/Users/aanki/Downloads/live_videos/real/0001d815c0--61f276f01c8b5c57e89244a1.mp4


  y, sr = librosa.load(video_path, sr=16000, mono=True)


[Processing] C:/Users/aanki/Downloads/live_videos/real/0001d815c0--61f5617b66629e059dbeaef5.webm


  y, sr = librosa.load(video_path, sr=16000, mono=True)


[Processing] C:/Users/aanki/Downloads/live_videos/real/0001d815c0--61f82a2956ef241fa1f94ff0.mp4


  y, sr = librosa.load(video_path, sr=16000, mono=True)


[Processing] C:/Users/aanki/Downloads/live_videos/real/0001d815c0--6203fa2ad1a403449593c5cb.mp4


  y, sr = librosa.load(video_path, sr=16000, mono=True)


[Processing] C:/Users/aanki/Downloads/live_videos/real/0001d815c0--620984e9f002b8749797ec67.mp4


  y, sr = librosa.load(video_path, sr=16000, mono=True)


[Processing] C:/Users/aanki/Downloads/live_videos/real/0001db3fa7--61e8f5380569a62870798ef9.mp4


  y, sr = librosa.load(video_path, sr=16000, mono=True)


In [7]:
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import accuracy_score

In [24]:
X = df[['blink_count','lip_movements','audio_changes','correlation']]
y = df['label']

In [9]:
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

In [11]:
# Normalize features
scaler = StandardScaler()
X_train = scaler.fit_transform(X_train)
X_test = scaler.transform(X_test)



In [12]:
# Train model (Random Forest)
model = RandomForestClassifier(n_estimators=100, random_state=42)
model.fit(X_train, y_train)

# Predict and evaluate
y_pred = model.predict(X_test)
accuracy = accuracy_score(y_test, y_pred)
print(f"Model Accuracy: {accuracy:.2f}")

Model Accuracy: 0.80


In [20]:
print('Pred: ',y_pred)
print('Actual: ',y_test.values)

Pred:  [1. 1. 0. 1. 1.]
Actual:  [1. 0. 0. 1. 1.]


In [21]:
y_pred_train = model.predict(X_train)
accuracy = accuracy_score(y_train, y_pred_train)
print(f"Model Accuracy: {accuracy:.2f}")
print('Pred: ',y_pred_train)
print('Actual: ',y_train.values)

Model Accuracy: 1.00
Pred:  [1. 0. 1. 0. 0. 0. 1. 1. 0. 1. 0. 1. 0. 0. 0. 1.]
Actual:  [1. 0. 1. 0. 0. 0. 1. 1. 0. 1. 0. 1. 0. 0. 0. 1.]


In [25]:
y_pred_X = model.predict(X)
accuracy = accuracy_score(y, y_pred_X)
print(f"Model Accuracy: {accuracy:.2f}")

Model Accuracy: 0.81




In [35]:
d_path = r"C:\Users\aanki\OneDrive\Pictures\Camera Roll\WIN_20250129_17_58_48_Pro.mp4"

blink_count,lip_movements,audio_changes,correlation = get_video_features(d_path)
df_test = pd.DataFrame(columns=['blink_count','lip_movements','audio_changes','correlation'])
df_test.loc[len(df_test)] = [blink_count,lip_movements,audio_changes,correlation]

  y, sr = librosa.load(video_path, sr=16000, mono=True)


In [36]:
model.predict(df_test)



array([1.])

In [37]:
df_test

Unnamed: 0,blink_count,lip_movements,audio_changes,correlation
0,0.0,1.0,8.0,0.60786
