In [1]:
import cv2
import numpy as np
import tensorflow as tf
import mediapipe as mp
import os
import json
import hashlib
from datetime import datetime
import time
from collections import deque
import logging
from flask import Flask, render_template, request, redirect, url_for, session, jsonify, Response
from werkzeug.serving import make_server
import threading

# Create Flask application
app = Flask(__name__, template_folder='templates')
# The session-signing key must not be committed with the source. It is read
# from the FLASK_SECRET_KEY environment variable; when that is unset a random
# per-process key is used, which invalidates existing sessions on restart.
app.secret_key = os.environ.get('FLASK_SECRET_KEY') or os.urandom(32)

# Route INFO-and-above records from the root logger to a file in the working directory
logging.basicConfig(
    level=logging.INFO,
    filename='auth_liveness.log',
)

# On-disk layout: <DATA_DIR>/users holds the per-user JSON records
DATA_DIR = "user_data"
USERS_DIR = os.path.join(DATA_DIR, "users")
# Create the whole tree up front; an already existing directory is not an error
os.makedirs(name=USERS_DIR, exist_ok=True)

# Detection parameters
# Input size used when resizing frames for the models
IMG_HEIGHT = IMG_WIDTH = 224
# Class labels, indexed by the argmax of each model's output
LIVENESS_CLASSES = [
    "Real",
    "Fake",
    "Photo",
    "Mask",
]
ACCESSORY_CLASSES = [
    "No Accessory",
    "Glasses",
    "Headwear",
    "Medical Mask",
]
# Capacity of the deque of recent accessory predictions
SLIDING_WINDOW_SIZE = 10
# Length of the timed test, compared against time.time() differences (seconds)
TEST_DURATION = 5
# The models run on every PREDICTION_INTERVAL-th processed frame
PREDICTION_INTERVAL = 5

# Initialize Mediapipe: one shared face detector used for every processed frame
mp_face_detection = mp.solutions.face_detection
face_detection = mp_face_detection.FaceDetection(
    min_detection_confidence=0.7,
)

# Load models
def _load_model(path, label):
    """Load the Keras model stored at ``path``.

    Prints a success or failure line mentioning ``label`` and returns the
    model, or None when loading raised.
    """
    try:
        model = tf.keras.models.load_model(path)
        print(f"✓ {label.capitalize()} model loaded")
        return model
    except Exception as e:
        print(f"✗ Error loading {label} model: {e}")
        return None


# A value of None means "model unavailable"; callers check for it before predicting
liveness_model = _load_model("liveness_model.h5", "liveness")
accessory_model = _load_model("accessory_model.h5", "accessory")

# Global variables for video
# Lazily created camera wrapper (None until the first video request)
current_camera = None
# Module-level count of processed frames
frame_counter = 0
# State shared between the frame-processing code and the Flask routes
detection_results = dict(
    face_detected=False,
    liveness_status='No Face',
    accessory_status='No Face',
    is_real=False,
    no_accessories=False,
    test_start_time=None,
    test_passed=False,
    redirect_pending=False,
    video_active=True,
)

# Functions for user management
def hash_password(password):
    """Return the hex-encoded SHA-256 digest of ``password``.

    NOTE(review): this is a single unsalted SHA-256 pass, which is weak for
    password storage. Switching schemes changes the stored format, so it needs
    a coordinated migration of existing user files.
    """
    digest = hashlib.sha256()
    digest.update(password.encode())
    return digest.hexdigest()

def save_user(username, password):
    """Create the JSON record for a new user.

    Args:
        username: Account name; it becomes the record's file name inside USERS_DIR.
        password: Plain-text password; only its hash is stored.

    Returns:
        A ``(success, message)`` tuple. ``success`` is False when the name is
        not usable as a file name or when the user already exists.
    """
    # The name arrives from a web form and is turned into a path, so anything
    # that could escape USERS_DIR (path separators, NUL) is rejected.
    if (not isinstance(username, str) or not username
            or any(ch in username for ch in ('/', '\\', '\0'))):
        return False, "Недопустимое имя пользователя"

    user_file = os.path.join(USERS_DIR, f"{username}.json")

    user_data = {
        'username': username,
        'password': hash_password(password),
        'created_at': datetime.now().isoformat()
    }

    try:
        # Mode 'x' creates the file exclusively, so two simultaneous
        # registrations of the same name cannot overwrite each other.
        with open(user_file, 'x', encoding='utf-8') as f:
            json.dump(user_data, f, ensure_ascii=False, indent=2)
    except FileExistsError:
        return False, "Пользователь уже существует"

    return True, "Пользователь создан"

def verify_user(username, password):
    """Check a username/password pair against the stored user record.

    Args:
        username: Account name; selects the record file inside USERS_DIR.
        password: Plain-text password to verify.

    Returns:
        ``(True, user_data)`` with the stored record on success, otherwise
        ``(False, message)`` with a user-facing error message.
    """
    # Missing names and names that could point outside USERS_DIR are treated
    # exactly like unknown users.
    if (not isinstance(username, str) or not username
            or any(ch in username for ch in ('/', '\\', '\0'))):
        return False, "Пользователь не найден"

    user_file = os.path.join(USERS_DIR, f"{username}.json")

    try:
        # Open directly instead of testing for existence first, so a file
        # removed in between cannot cause an unhandled error.
        with open(user_file, 'r', encoding='utf-8') as f:
            user_data = json.load(f)
    except FileNotFoundError:
        return False, "Пользователь не найден"

    # A form submitted without a password field yields None, which cannot be hashed.
    if not isinstance(password, str):
        return False, "Неверный пароль"

    if user_data['password'] != hash_password(password):
        return False, "Неверный пароль"

    return True, user_data

# Image processing functions
def preprocess_frame(frame):
    """Resize ``frame`` to IMG_WIDTH x IMG_HEIGHT and scale it for the model.

    Returns a float32 array divided by 255 with a new leading axis of size 1.
    """
    resized = cv2.resize(frame, (IMG_WIDTH, IMG_HEIGHT))
    scaled = resized.astype(np.float32) / 255.0
    return scaled[np.newaxis, ...]

def preprocess_image(image):
    """Contrast-equalise a BGR image and prepare it as a model input.

    CLAHE is applied to the first (lightness) channel in LAB colour space, the
    image is converted back to BGR, resized to IMG_WIDTH x IMG_HEIGHT, scaled
    by 1/255 as float32 and given a leading axis of size 1.
    """
    img_lab = cv2.cvtColor(image, cv2.COLOR_BGR2LAB)
    clahe = cv2.createCLAHE(clipLimit=2.0, tileGridSize=(8, 8))
    img_lab[:, :, 0] = clahe.apply(img_lab[:, :, 0])
    img = cv2.cvtColor(img_lab, cv2.COLOR_LAB2BGR)
    # cv2.resize expects dsize as (width, height); the previous (height, width)
    # order only worked because both constants are currently equal.
    img = cv2.resize(img, (IMG_WIDTH, IMG_HEIGHT))
    img = img.astype(np.float32) / 255.0
    img = np.expand_dims(img, axis=0)
    return img

class VideoCamera:
    """Webcam capture that annotates frames with liveness and accessory results.

    Every processed frame updates the module-level ``detection_results`` dict,
    which the Flask routes read to decide whether the timed test has passed.
    """

    # The averaged accessory label is published only once the sliding window
    # holds more than this many predictions.
    _ACCESSORY_WARMUP = 5

    def __init__(self):
        self.video = cv2.VideoCapture(0)
        # Most recent accessory prediction vectors; they are averaged before
        # the label is chosen.
        self.accessory_queue = deque(maxlen=SLIDING_WINDOW_SIZE)
        # Frames seen by process_detection(); the models run on every
        # PREDICTION_INTERVAL-th one.
        self.frame_counter = 0

    def __del__(self):
        # __init__ may have raised before self.video was assigned.
        video = getattr(self, 'video', None)
        if video is not None:
            video.release()

    def get_frame(self, process_detection=False):
        """Grab one frame and return it JPEG-encoded.

        Args:
            process_detection: When True the frame is run through
                process_detection() and annotated before encoding.

        Returns:
            The JPEG bytes, or None when no frame could be read or encoded.
        """
        success, frame = self.video.read()
        if not success:
            print("Ошибка: не удалось получить кадр с камеры.")
            detection_results['video_active'] = False
            return None

        detection_results['video_active'] = True

        if process_detection:
            try:
                frame = self.process_detection(frame)
            except Exception as e:
                print(f"Ошибка в process_detection: {e}")
                detection_results['video_active'] = False
                # Fall through and stream the frame as it is: callers
                # concatenate the result with bytes, so a raw array must never
                # be returned from here.

        encoded, jpeg = cv2.imencode('.jpg', frame)
        if not encoded:
            print("Ошибка: не удалось закодировать кадр в JPEG.")
            return None
        return jpeg.tobytes()

    def process_detection(self, frame):
        """Process liveness and accessory detection for one frame.

        Detects a face, runs the models on every PREDICTION_INTERVAL-th frame,
        advances the timed test and draws the status overlay.

        Args:
            frame: BGR image; it is annotated in place.

        Returns:
            The same, annotated, frame.
        """
        self.frame_counter += 1
        frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
        detection = face_detection.process(frame_rgb)
        h, w, _ = frame.shape

        if not detection.detections:
            print("Лицо не обнаружено")
            # Nothing can be verified without a face. Dropping the previous
            # verdicts prevents the test from passing on stale values once the
            # person has left the frame.
            detection_results.update({
                'face_detected': False,
                'liveness_status': 'No Face',
                'accessory_status': 'No Face',
                'is_real': False,
                'no_accessories': False,
            })
            # Placed below the status lines so the texts do not overlap.
            cv2.putText(frame, "No face detected", (10, 150),
                        cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 0, 255), 2)
        else:
            detection_results['face_detected'] = True
            print("Лицо обнаружено")

            bbox = detection.detections[0].location_data.relative_bounding_box
            x, y, width, height = self._expand_face_box(bbox, w, h)

            # Run the models only on every PREDICTION_INTERVAL-th frame
            if self.frame_counter % PREDICTION_INTERVAL == 0:
                self._update_liveness(frame)
                self._update_accessory(frame[y:y + height, x:x + width])

            # Drawn after the predictions so the outline is never part of a
            # model input.
            cv2.rectangle(frame, (x, y), (x + width, y + height), (0, 255, 0), 2)

        elapsed_time = self._advance_test_timer()
        self._draw_overlay(frame, elapsed_time)
        return frame

    @staticmethod
    def _expand_face_box(bbox, frame_w, frame_h):
        """Convert a relative box to pixels, enlarge it and clip it to the frame."""
        x = int(bbox.xmin * frame_w)
        y = int(bbox.ymin * frame_h)
        width = int(bbox.width * frame_w)
        height = int(bbox.height * frame_h)

        # Grow by 20% of the width on each side and by 50% of the height upwards
        x_offset = int(width * 0.2)
        y_offset = int(height * 0.5)
        x = max(0, x - x_offset)
        y = max(0, y - y_offset)
        width = min(frame_w - x, width + 2 * x_offset)
        height = min(frame_h - y, height + y_offset)
        return x, y, width, height

    def _update_liveness(self, frame):
        """Classify the frame with the liveness model and publish the verdict."""
        if liveness_model is None:
            print("Liveness model not loaded")
            return
        try:
            # NOTE(review): the whole frame, not the face crop, is classified -
            # confirm this matches how the liveness model was trained.
            preprocessed = preprocess_frame(frame)
            liveness_pred = liveness_model.predict(preprocessed, verbose=0)[0]
            label = LIVENESS_CLASSES[int(np.argmax(liveness_pred))]
            detection_results['liveness_status'] = label
            detection_results['is_real'] = (label == "Real")
            print(f"Liveness prediction: {label}, is_real: {detection_results['is_real']}")
        except Exception as e:
            print(f"Ошибка в liveness_model.predict(): {e}")
            detection_results['liveness_status'] = 'Error'
            detection_results['is_real'] = False

    def _update_accessory(self, face_img):
        """Classify the face crop and publish the window-averaged accessory label."""
        if accessory_model is None or face_img.size == 0:
            print("Accessory model not loaded or face_img empty")
            return
        try:
            processed_img = preprocess_image(face_img)
            accessory_pred = accessory_model.predict(processed_img, verbose=0)[0]
            self.accessory_queue.append(accessory_pred)

            if len(self.accessory_queue) > self._ACCESSORY_WARMUP:
                avg_probs = np.mean(self.accessory_queue, axis=0)
                label = ACCESSORY_CLASSES[int(np.argmax(avg_probs))]
                detection_results['accessory_status'] = label
                detection_results['no_accessories'] = (label == "No Accessory")
                print(f"Accessory prediction: {label}, no_accessories: {detection_results['no_accessories']}")
        except Exception as e:
            print(f"Ошибка в accessory_model.predict(): {e}")
            detection_results['accessory_status'] = 'Error'
            detection_results['no_accessories'] = False

    def _advance_test_timer(self):
        """Start the timed test if needed, update its verdict, return elapsed seconds."""
        # Work on a local copy: a route may reset the shared value to None
        # between the check and the subtraction.
        start_time = detection_results['test_start_time']
        if start_time is None:
            start_time = time.time()
            detection_results['test_start_time'] = start_time
            print("Тест начался, test_start_time установлен:", start_time)

        elapsed_time = time.time() - start_time
        print(f"Прошло времени: {elapsed_time:.2f} секунд, Test passed: {detection_results['test_passed']}")

        if elapsed_time >= TEST_DURATION:
            detection_results['test_passed'] = detection_results['is_real'] and detection_results['no_accessories']
            if detection_results['test_passed']:
                detection_results['redirect_pending'] = True
                print("Тест завершен успешно, redirect_pending установлен в True")
            else:
                print("Тест завершен, но не пройден: is_real =", detection_results['is_real'], ", no_accessories =", detection_results['no_accessories'])
        return elapsed_time

    def _draw_overlay(self, frame, elapsed_time):
        """Draw the status lines, the countdown and the verdict onto the frame."""
        status_color = (0, 255, 0) if detection_results['is_real'] else (0, 0, 255)
        cv2.putText(frame, f"Liveness: {detection_results['liveness_status']}",
                    (10, 30), cv2.FONT_HERSHEY_SIMPLEX, 0.7, status_color, 2)

        accessory_color = (0, 255, 0) if detection_results['no_accessories'] else (0, 0, 255)
        cv2.putText(frame, f"Accessory: {detection_results['accessory_status']}",
                    (10, 60), cv2.FONT_HERSHEY_SIMPLEX, 0.7, accessory_color, 2)

        remaining_time = max(0, TEST_DURATION - elapsed_time)
        cv2.putText(frame, f"Time remaining: {int(remaining_time)}s",
                    (10, 90), cv2.FONT_HERSHEY_SIMPLEX, 0.7, (255, 255, 255), 2)

        if detection_results['test_passed']:
            cv2.putText(frame, "TEST PASSED!", (10, 120),
                        cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 255, 0), 2)
        elif elapsed_time >= TEST_DURATION:
            # A failure is only reported once the test window has elapsed;
            # before that the test is still in progress.
            cv2.putText(frame, "TEST FAILED!", (10, 120),
                        cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 0, 255), 2)

def gen(camera):
    """Yield multipart MJPEG chunks built from ``camera`` frames.

    Each frame is requested with detection processing enabled. The generator
    ends as soon as the camera returns None instead of a frame.
    """
    part_header = b'--frame\r\n' b'Content-Type: image/jpeg\r\n\r\n'
    part_footer = b'\r\n\r\n'

    frame = camera.get_frame(process_detection=True)
    while frame is not None:
        yield part_header + frame + part_footer
        frame = camera.get_frame(process_detection=True)
    print("Прекращаем поток видео: кадры не поступают.")

# Flask routes
@app.route('/')
def index():
    """Render the landing page template."""
    landing_page = render_template('index.html')
    return landing_page

@app.route('/register', methods=['GET', 'POST'])
def register():
    """Show the registration form and create the account on POST.

    A successful registration stores the user in the session, resets the
    shared detection state and redirects to the liveness test. Any failure
    re-renders the form with an error message.
    """
    if request.method != 'POST':
        return render_template('register.html')

    username = request.form.get('username')
    password = request.form.get('password')

    if not username or not password:
        return render_template('register.html', error="Заполните все поля")

    success, message = save_user(username, password)
    if not success:
        return render_template('register.html', error=message)

    session['username'] = username
    session['stage'] = 'liveness_test'
    # Reset every field, including the verdicts of a previous run, so an
    # earlier user's "real / no accessories" result cannot carry over.
    detection_results.update({
        'face_detected': False,
        'liveness_status': 'No Face',
        'accessory_status': 'No Face',
        'is_real': False,
        'no_accessories': False,
        'test_start_time': None,
        'test_passed': False,
        'redirect_pending': False,
        'video_active': True,
    })
    return redirect(url_for('liveness_test'))

@app.route('/login', methods=['GET', 'POST'])
def login():
    """Show the login form and check the credentials on POST.

    A successful password check stores the user in the session, resets the
    shared detection state and redirects to face verification. Any failure
    re-renders the form with an error message.
    """
    if request.method != 'POST':
        return render_template('login.html')

    username = request.form.get('username')
    password = request.form.get('password')

    # Missing fields arrive as None; reject them before the credential check.
    if not username or not password:
        return render_template('login.html', error="Заполните все поля")

    success, user_data = verify_user(username, password)
    if not success:
        # On failure the second element is the error message.
        return render_template('login.html', error=user_data)

    session['username'] = username
    session['stage'] = 'face_verification'
    # Reset every field, including the verdicts of a previous run, so an
    # earlier user's "real / no accessories" result cannot carry over.
    detection_results.update({
        'face_detected': False,
        'liveness_status': 'No Face',
        'accessory_status': 'No Face',
        'is_real': False,
        'no_accessories': False,
        'test_start_time': None,
        'test_passed': False,
        'redirect_pending': False,
        'video_active': True,
    })
    return redirect(url_for('face_verification'))

@app.route('/liveness_test')
def liveness_test():
    """Render the liveness-test page for the user stored in the session.

    Requests without a username in the session are redirected to the index page.
    """
    if 'username' in session:
        return render_template('liveness_test.html', username=session['username'])
    return redirect(url_for('index'))

@app.route('/face_verification')
def face_verification():
    """Render the face-verification page for the user stored in the session.

    Requests without a username in the session are redirected to the index page.
    """
    if 'username' in session:
        return render_template('face_verification.html', username=session['username'])
    return redirect(url_for('index'))

@app.route('/video_feed')
def video_feed():
    """Stream annotated camera frames as a multipart MJPEG response.

    The shared camera object is created on first use and reused afterwards.
    """
    global current_camera
    current_camera = current_camera or VideoCamera()
    stream = gen(current_camera)
    return Response(stream, mimetype='multipart/x-mixed-replace; boundary=frame')

@app.route('/get_detection_status')
def get_detection_status():
    """Return the current detection state as JSON.

    Once the test window has elapsed without a recorded pass, the verdict is
    recomputed from the latest flags. When a redirect is pending the response
    only carries the redirect instruction; otherwise it is the full state plus
    the remaining time and a completion flag.
    """
    global detection_results
    state = detection_results

    started_at = state['test_start_time']
    if started_at:
        elapsed_time = time.time() - started_at
    else:
        elapsed_time = 0
        print("test_start_time не установлен!")

    test_completed = elapsed_time >= TEST_DURATION

    # Finalise the verdict if the window is over and no pass was recorded yet
    if test_completed and not state['test_passed']:
        state['test_passed'] = state['is_real'] and state['no_accessories']
        print(f"Тест завершен: test_passed={state['test_passed']}, is_real={state['is_real']}, no_accessories={state['no_accessories']}")
        if state['test_passed']:
            state['redirect_pending'] = True
            print("Тест успешен, redirect_pending установлен в True")

    if state['redirect_pending']:
        # A redirect is already pending: answer with the redirect instruction only
        print("Отправляем auto_redirect: тест уже пройден.")
        response = dict(
            auto_redirect=True,
            redirect_url=url_for('dashboard'),
            message='Тест пройден',
        )
    else:
        response = dict(
            state,
            remaining_time=max(0, TEST_DURATION - elapsed_time),
            test_completed=test_completed,
        )

    print("Ответ от /get_detection_status:", response)
    return jsonify(response)

@app.route('/dashboard')
def dashboard():
    """Render the dashboard for a user who has passed the camera check.

    Requests without a username in the session go to the index page. A
    session that has only passed the password/registration step is sent back
    to its pending camera stage, so the check cannot be skipped by opening
    this URL directly.
    """
    if 'username' not in session:
        return redirect(url_for('index'))

    # 'redirect_pending' is set once the timed test succeeds and is cleared
    # again on login, registration and logout.
    if not detection_results.get('redirect_pending'):
        stage = session.get('stage')
        # The stage names stored in the session match the endpoint names.
        target = stage if stage in ('liveness_test', 'face_verification') else 'index'
        return redirect(url_for(target))

    username = session['username']
    return render_template('dashboard.html', username=username)

@app.route('/logout')
def logout():
    """Clear the session, release the camera and reset the detection state."""
    global current_camera
    session.clear()

    if current_camera:
        camera, current_camera = current_camera, None
        # Release the device explicitly: dropping the name alone does not free
        # it while a running stream generator still references the object.
        camera.video.release()

    # Reset every field, including the last verdicts, so nothing from this
    # session is visible to the next one.
    detection_results.update({
        'face_detected': False,
        'liveness_status': 'No Face',
        'accessory_status': 'No Face',
        'is_real': False,
        'no_accessories': False,
        'test_start_time': None,
        'test_passed': False,
        'redirect_pending': False,
        'video_active': True,
    })
    return redirect(url_for('index'))

# Server thread class for Jupyter
class ServerThread(threading.Thread):
    """Thread that serves the Flask app with a Werkzeug server on 127.0.0.1:5000."""

    def __init__(self, app):
        super().__init__()
        # The listening socket is bound here, in the constructing thread.
        self.server = make_server('127.0.0.1', 5000, app)
        # Push an application context in the thread that constructs the server.
        self.ctx = app.app_context()
        self.ctx.push()

    def run(self):
        """Serve requests until shutdown() is called."""
        print('Server started at http://localhost:5000')
        self.server.serve_forever()

    def shutdown(self):
        """Stop the serve loop and close the listening socket."""
        self.server.shutdown()
        # shutdown() only ends serve_forever(); without server_close() the port
        # stays bound and a later restart cannot bind to it again.
        self.server.server_close()

# Jupyter functions
def start_server():
    """Start server in Jupyter.

    Creates a ServerThread for the app and starts it. If a server started
    earlier is still running, nothing new is started: a second instance would
    fail to bind the same port and the first one could no longer be stopped.
    """
    global server
    # 'server' does not exist before the first call, so look it up defensively.
    running = globals().get('server')
    if running is not None and running.is_alive():
        print("Server is already running at http://localhost:5000")
        return

    server = ServerThread(app)
    server.start()
    print("Server started! Open http://localhost:5000")
    print("To stop the server, use: stop_server()")

def stop_server():
    """Stop server in Jupyter.

    Releases the camera, shuts the server thread down and forgets it. Calling
    this when no server was started only prints a notice.
    """
    global server, current_camera
    if current_camera:
        camera, current_camera = current_camera, None
        # Release the device explicitly: dropping the name alone does not free
        # it while a running stream generator still references the object.
        camera.video.release()

    # 'server' does not exist until start_server() has run.
    running = globals().get('server')
    if running is None:
        print("Server is not running")
        return

    running.shutdown()
    if running.is_alive():
        # Give the serve loop a moment to finish before reporting success.
        running.join(timeout=5)
    server = None
    print("Server stopped")

# Check if running directly (not in Jupyter)
if __name__ == '__main__' and '__file__' in globals():
    # Regular Python run.
    # The interactive Werkzeug debugger lets anyone who can reach the server
    # execute code, so debug mode is opt-in (FLASK_DEBUG=1) and the server
    # binds to loopback unless APP_HOST names another interface.
    debug_enabled = os.environ.get('FLASK_DEBUG') == '1'
    bind_host = os.environ.get('APP_HOST', '127.0.0.1')
    app.run(debug=debug_enabled, host=bind_host, port=5000)
else:
    # Jupyter run: only print usage instructions; the server is started on demand
    print("\n" + "="*50)
    print("Flask application ready to run in Jupyter!")
    print("="*50)
    print("\nTo start the server, run:")
    print(">>> start_server()")
    print("\nTo stop the server, run:")
    print(">>> stop_server()")
    print("="*50)

✓ Liveness model loaded
✓ Accessory model loaded

Flask application ready to run in Jupyter!

To start the server, run:
>>> start_server()

To stop the server, run:
>>> stop_server()


In [2]:
start_server()

Server started at http://localhost:5000
Server started! Open http://localhost:5000
To stop the server, use: stop_server()


In [8]:
stop_server()

Server stopped
