In [None]:
!pip install pyngrok
from pyngrok import ngrok
# Replace "TOKEN" below with the auth token from your ngrok dashboard.
ngrok.set_auth_token("TOKEN")
# Start an ngrok tunnel to local port 8765 and print the URL object returned
# by ngrok.connect so it can be handed to the client.
public_url = ngrok.connect(8765)
print('Public URL:', public_url)

In [None]:
pip install websockets numpy opencv-python deepface nest_asyncio

In [None]:
pip install boto3

In [None]:
import asyncio
import websockets
import base64
import numpy as np
import cv2
from deepface import DeepFace
import json
from datetime import datetime
import boto3
from botocore.exceptions import ClientError
import nest_asyncio
from boto3.dynamodb.conditions import Key, Attr

# AWS credentials and constants
# NOTE(review): the two key values are placeholders that presumably have to be
# replaced by hand before running — avoid committing real keys with the file.
AWS_ACCESS_KEY = 'PASTEKEYANDA'
AWS_SECRET_KEY = 'PASTEKEYANDA'
AWS_REGION = 'ap-southeast-1'
# S3 bucket that holds the reference face images (read under the 'faces/' prefix).
BUCKET_NAME = 'my-face-images'
# DynamoDB table that stores one item per recorded attendance.
TABLE_NAME = 'Attendance'

# Setup AWS clients
# Low-level S3 client, used to list and download the reference images.
s3_client = boto3.client(
    's3',
    aws_access_key_id=AWS_ACCESS_KEY,
    aws_secret_access_key=AWS_SECRET_KEY,
    region_name=AWS_REGION
)

# DynamoDB resource, used to create, query and write the attendance table.
dynamodb = boto3.resource(
    'dynamodb',
    aws_access_key_id=AWS_ACCESS_KEY,
    aws_secret_access_key=AWS_SECRET_KEY,
    region_name=AWS_REGION
)

def check_image_format(image, name=""):
    """Print the shape and dtype of *image* and report whether it is 3-channel.

    Args:
        image: Array-like object exposing ``shape`` and ``dtype`` attributes.
        name: Label included in the printed header.

    Returns:
        True when the shape has exactly three dimensions and the last one
        has size 3, otherwise False.
    """
    shape = image.shape
    print(f"\nChecking image format for {name}:")
    print(f"Shape: {shape}")
    print(f"Data type: {image.dtype}")
    if len(shape) != 3:
        return False
    return shape[2] == 3

def create_attendance_table():
    """Create the DynamoDB attendance table if it doesn't exist.

    The table is keyed by ``PersonName`` (partition key) and ``Timestamp``
    (sort key) and carries a global secondary index, ``AttendanceDateIndex``,
    keyed by ``AttendanceDate`` and ``Timestamp``.

    Returns:
        The boto3 ``Table`` resource for ``TABLE_NAME`` — newly created, or
        the existing one when the table is already present.

    Raises:
        ClientError: For any DynamoDB error other than the table already
            existing (``ResourceInUseException``).
        Exception: Whatever ``wait_until_exists`` raises if the new table
            does not become available in time.
    """
    key_schema = [
        {'AttributeName': 'PersonName', 'KeyType': 'HASH'},   # Partition key
        {'AttributeName': 'Timestamp', 'KeyType': 'RANGE'},   # Sort key
    ]
    attribute_definitions = [
        {'AttributeName': 'PersonName', 'AttributeType': 'S'},
        {'AttributeName': 'Timestamp', 'AttributeType': 'S'},
        {'AttributeName': 'AttendanceDate', 'AttributeType': 'S'},
    ]
    throughput = {'ReadCapacityUnits': 5, 'WriteCapacityUnits': 5}
    date_index = {
        'IndexName': 'AttendanceDateIndex',
        'KeySchema': [
            {'AttributeName': 'AttendanceDate', 'KeyType': 'HASH'},
            {'AttributeName': 'Timestamp', 'KeyType': 'RANGE'},
        ],
        'Projection': {'ProjectionType': 'ALL'},
        'ProvisionedThroughput': dict(throughput),
    }

    try:
        table = dynamodb.create_table(
            TableName=TABLE_NAME,
            KeySchema=key_schema,
            AttributeDefinitions=attribute_definitions,
            GlobalSecondaryIndexes=[date_index],
            ProvisionedThroughput=dict(throughput),
        )
    except ClientError as e:
        if e.response['Error']['Code'] == 'ResourceInUseException':
            print("Table already exists")
            return dynamodb.Table(TABLE_NAME)
        print(f"Error creating table: {e}")
        raise

    # create_table returns while the table is still being provisioned; block
    # until it can actually serve reads and writes before reporting success.
    table.wait_until_exists()
    print("Table created successfully")
    return table

async def get_reference_images():
    """Download and decode the reference face images stored in S3.

    Every object under the ``faces/`` prefix of ``BUCKET_NAME`` is fetched,
    decoded with OpenCV and converted from BGR to RGB. The person name is the
    object's file name up to its first dot.

    Returns:
        dict mapping person name to the decoded RGB image array. Objects that
        cannot be downloaded or decoded are reported and skipped; an S3
        listing error is reported and whatever was loaded so far is returned.
    """
    reference_images = {}

    try:
        # Paginate: a single list_objects_v2 call returns a limited page of keys.
        paginator = s3_client.get_paginator('list_objects_v2')
        pages = paginator.paginate(Bucket=BUCKET_NAME, Prefix='faces/')

        for page in pages:
            for obj in page.get('Contents', []):
                key = obj['Key']

                # Keys ending in '/' are folder placeholders, not images.
                if key.endswith('/'):
                    continue

                try:
                    image_data = s3_client.get_object(
                        Bucket=BUCKET_NAME,
                        Key=key
                    )['Body'].read()

                    # Decode image; imdecode yields None for non-image data.
                    image_array = np.frombuffer(image_data, np.uint8)
                    image = cv2.imdecode(image_array, cv2.IMREAD_COLOR)
                    if image is None:
                        print(f"Error loading image {key}: not a decodable image")
                        continue

                    # Convert BGR to RGB
                    # NOTE(review): confirm RGB is the channel order DeepFace
                    # expects for numpy inputs.
                    image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)

                    # IMREAD_COLOR + BGR2RGB always produces 3 channels, so a
                    # failed check means something unexpected; skip the image.
                    if not check_image_format(image, key):
                        print("✗ Image is not in RGB format, skipping")
                        continue
                    print("✓ Image is in correct RGB format")

                    person_name = key.split('/')[-1].split('.')[0]
                    reference_images[person_name] = image
                    print(f"✓ Loaded reference image for: {person_name}")

                except Exception as e:
                    # Best effort: one bad object must not abort the whole load.
                    print(f"Error loading image {key}: {e}")
                    continue

    except ClientError as e:
        print(f"Error accessing S3: {e}")

    return reference_images

async def get_last_attendance(person_name):
    """Return the time of the person's most recent attendance recorded today.

    Args:
        person_name: Partition-key value to look up in the attendance table.

    Returns:
        A ``datetime`` parsed from the stored ``Timestamp``, or None when the
        query yields no item or fails with a ``ClientError``.
    """
    table = dynamodb.Table(TABLE_NAME)
    today = datetime.now().strftime('%Y-%m-%d')

    # NOTE(review): DynamoDB documents that Limit is applied before
    # FilterExpression, so only the newest item is examined here — confirm
    # that is the intended behaviour.
    query_args = {
        'KeyConditionExpression': Key('PersonName').eq(person_name),
        'FilterExpression': Attr('AttendanceDate').eq(today),
        'ScanIndexForward': False,  # descending order
        'Limit': 1,
    }

    try:
        items = table.query(**query_args)['Items']
    except ClientError as e:
        print(f"Error querying last attendance: {e}")
        return None

    if not items:
        return None
    return datetime.fromisoformat(items[0]['Timestamp'])

async def can_record_attendance(person_name, min_interval_seconds=3600):
    """Check if enough time has passed since the person's last attendance.

    Args:
        person_name: Name to look up via ``get_last_attendance``.
        min_interval_seconds: Minimum number of seconds that must have elapsed
            since the last record. Defaults to 3600 (1 hour).

    Returns:
        True when no previous record was found or when strictly more than
        ``min_interval_seconds`` have elapsed since it; otherwise False.
    """
    last_attendance = await get_last_attendance(person_name)
    if last_attendance is None:
        return True

    elapsed = datetime.now() - last_attendance
    return elapsed.total_seconds() > min_interval_seconds

async def record_attendance(person_name, timestamp):
    """Record attendance in DynamoDB with rate limiting.

    Args:
        person_name: Value stored as the item's ``PersonName``.
        timestamp: ISO-8601 string stored as the item's ``Timestamp``.

    Returns:
        True when the item was written; False when the write was skipped by
        ``can_record_attendance`` or failed with a ``ClientError``.
    """
    if not await can_record_attendance(person_name):
        print(f"Skipping attendance for {person_name} - too soon since last record")
        return False

    # Derive the date from the timestamp being stored so the two attributes
    # cannot disagree (e.g. around midnight); fall back to the current date
    # only when the timestamp is not parseable.
    try:
        attendance_date = datetime.fromisoformat(timestamp).strftime('%Y-%m-%d')
    except (TypeError, ValueError):
        attendance_date = datetime.now().strftime('%Y-%m-%d')

    table = dynamodb.Table(TABLE_NAME)

    try:
        table.put_item(
            Item={
                'PersonName': person_name,
                'Timestamp': timestamp,
                'AttendanceDate': attendance_date
            }
        )
    except ClientError as e:
        print(f"Error recording attendance: {e}")
        return False

    print(f"✓ Recorded attendance for {person_name} at {timestamp}")
    return True

def decode_frame(frame_base64):
    """Decode a base64-encoded image into an RGB array.

    Args:
        frame_base64: Base64 text (or bytes) of an encoded image. A leading
            ``data:...;base64,`` header on a string payload is stripped.

    Returns:
        The decoded image converted from OpenCV's BGR order to RGB.

    Raises:
        ValueError: If the payload is empty or is not a decodable image.
    """
    # Drop an optional data-URL header; its characters would otherwise be
    # fed into the base64 decoder and corrupt the payload.
    if isinstance(frame_base64, str) and frame_base64.startswith('data:'):
        frame_base64 = frame_base64.split(',', 1)[-1]

    frame_bytes = base64.b64decode(frame_base64)
    frame_array = np.frombuffer(frame_bytes, dtype=np.uint8)
    if frame_array.size == 0:
        raise ValueError("Frame payload is empty")

    frame = cv2.imdecode(frame_array, cv2.IMREAD_COLOR)
    # imdecode signals failure by returning None rather than raising.
    if frame is None:
        raise ValueError("Frame payload could not be decoded as an image")

    return cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)  # Convert to RGB

async def handle_websocket(websocket):
    """Handle one WebSocket connection: receive frames, match faces, reply.

    For every received message the frame is decoded, faces are extracted with
    DeepFace and each face is verified against every reference image. A match
    whose attendance was actually recorded is added to the reply. The reply is
    a JSON list of matches, or a JSON object with an ``error`` key when
    processing failed.

    Args:
        websocket: The connection object passed in by ``websockets.serve``.
    """
    print("Client connected!")
    reference_images = await get_reference_images()

    try:
        while True:
            try:
                frame_base64 = await websocket.recv()
                frame = decode_frame(frame_base64)

                if not check_image_format(frame, "webcam frame"):
                    print("⚠ Warning: Webcam frame is not in RGB format")
                    continue

                results = []
                try:
                    faces = DeepFace.extract_faces(
                        frame,
                        enforce_detection=False,
                        align=True,
                        detector_backend='opencv'
                    )

                    for face_dict in faces:
                        face_img = face_dict['face']
                        facial_area = face_dict.get('facial_area', {})

                        bbox = [
                            facial_area.get('x', 0),
                            facial_area.get('y', 0),
                            facial_area.get('w', 0),
                            facial_area.get('h', 0)
                        ]

                        # Only 3-channel face crops are compared.
                        if len(face_img.shape) != 3 or face_img.shape[2] != 3:
                            continue

                        for person_name, ref_img in reference_images.items():
                            try:
                                result = DeepFace.verify(
                                    img1_path=face_img,
                                    img2_path=ref_img,
                                    enforce_detection=False,
                                    model_name="VGG-Face",
                                    distance_metric="cosine"
                                )

                                if result['verified']:
                                    timestamp = datetime.now().isoformat()
                                    # Rate-limited: only newly recorded
                                    # attendances are reported to the client.
                                    if await record_attendance(person_name, timestamp):
                                        results.append({
                                            'name': person_name,
                                            'timestamp': timestamp,
                                            # This is the verification distance
                                            # reported by DeepFace.verify.
                                            'confidence': float(result['distance']),
                                            'bbox': bbox
                                        })
                                        print(f"✓ Verified and recorded match for {person_name}")
                            except Exception as e:
                                print(f"Verification error for {person_name}: {str(e)}")
                                continue

                    await websocket.send(json.dumps(results))

                except websockets.exceptions.ConnectionClosed:
                    # A disconnect is not a detection failure; let the outer
                    # handler deal with it instead of sending on a dead socket.
                    raise
                except Exception as e:
                    print(f"Face detection error: {str(e)}")
                    await websocket.send(json.dumps({"error": f"Face detection failed: {str(e)}"}))

            except websockets.exceptions.ConnectionClosed:
                # Propagate disconnects so they end the loop cleanly.
                raise
            except Exception as e:
                print(f"Frame processing error: {str(e)}")
                await websocket.send(json.dumps({"error": f"Error processing frame: {str(e)}"}))

    except websockets.exceptions.ConnectionClosed:
        print("Client disconnected")
    except Exception as e:
        print(f"Unexpected error in handler: {str(e)}")

async def main():
    """Prepare the attendance table, then serve WebSocket clients forever."""
    # The table must exist before any handler tries to read or write it.
    create_attendance_table()
    print("DynamoDB table is ready")

    server = websockets.serve(handle_websocket, '0.0.0.0', 8765)
    async with server:
        print("Server started at ws://localhost:8765")
        # A future that is never resolved keeps the server running.
        await asyncio.Future()

# Script entry point: start the server when this file is executed directly.
if __name__ == "__main__":
    # NOTE(review): nest_asyncio is presumably applied so asyncio.run() works
    # inside an already-running event loop (e.g. a notebook) — confirm.
    nest_asyncio.apply()
    asyncio.run(main())