<a href="https://colab.research.google.com/github/finduglobe/studies/blob/main/markakod_aws.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
pip install boto3 opencv-python pillow numpy

In [None]:
import boto3
import json
import datetime
import pandas as pd
from botocore.exceptions import ClientError
from typing import Dict, List, Optional
from dataclasses import dataclass

@dataclass
class Customer:
    face_id: str
    start_time: datetime.datetime
    last_seen_time: datetime.datetime
    queue_zone: str
    visit_history: List[str]
    loyalty_status: Optional[str] = None

@dataclass
class QueueZone:
    zone_id: str
    name: str
    wait_threshold: int  # seconds
    camera_ids: List[str]

class EnhancedQueueMonitoringSystem:
    def __init__(self, config_path: str, region: str = "eu-west-1"):
        # AWS clients
        self.kinesis_client = boto3.client('kinesisvideo', region_name=region)
        self.rekognition_client = boto3.client('rekognition', region_name=region)
        self.sns_client = boto3.client('sns', region_name=region)
        self.dynamodb = boto3.resource('dynamodb', region_name=region)

        # System state
        self.cameras: Dict[str, str] = {}  # camera_id: stream_arn
        self.zones: Dict[str, QueueZone] = {}
        self.customers: Dict[str, Customer] = {}
        self.customer_history = self.dynamodb.Table('CustomerHistory')

        # Load configuration
        self.load_config(config_path)

    def load_config(self, config_path: str):
        """Load system configuration from JSON file"""
        with open(config_path) as f:
            config = json.load(f)

        # Initialize queue zones
        for zone_config in config['zones']:
            zone = QueueZone(
                zone_id=zone_config['id'],
                name=zone_config['name'],
                wait_threshold=zone_config['threshold'],
                camera_ids=zone_config['cameras']
            )
            self.zones[zone.zone_id] = zone

        # Initialize cameras
        for camera_config in config['cameras']:
            stream_arn = self.start_stream(camera_config['id'])
            self.cameras[camera_config['id']] = stream_arn

    def start_stream(self, camera_id: str) -> str:
        """Start Kinesis Video Stream for a camera"""
        try:
            stream_info = self.kinesis_client.create_stream(
                StreamName=f"camera-{camera_id}",
                DataRetentionInHours=24,
                MediaType='video/h264'
            )
            return stream_info['StreamARN']
        except ClientError as e:
            print(f"Stream creation error for camera {camera_id}: {e}")
            return ""

    def process_video_frame(self, camera_id: str, frame_data: bytes, zone_id: str):
        """Process a video frame from a specific camera in a specific zone"""
        try:
            # Face detection and recognition
            face_response = self.rekognition_client.detect_faces(
                Image={'Bytes': frame_data},
                Attributes=['ALL']
            )

            # Additional object detection for queue analysis
            object_response = self.rekognition_client.detect_labels(
                Image={'Bytes': frame_data},
                MaxLabels=10
            )

            current_time = datetime.datetime.now()

            for face in face_response['FaceDetails']:
                face_id = face['FaceId']

                if face_id not in self.customers:
                    # New customer detection
                    customer = Customer(
                        face_id=face_id,
                        start_time=current_time,
                        last_seen_time=current_time,
                        queue_zone=zone_id,
                        visit_history=[zone_id]
                    )
                    self.customers[face_id] = customer
                    self.update_customer_history(face_id, "entered", zone_id)
                else:
                    # Update existing customer
                    customer = self.customers[face_id]
                    customer.last_seen_time = current_time

                    if customer.queue_zone != zone_id:
                        customer.queue_zone = zone_id
                        customer.visit_history.append(zone_id)
                        self.update_customer_history(face_id, "moved", zone_id)

                # Check waiting time threshold
                waiting_time = (current_time - customer.start_time).total_seconds()
                zone = self.zones[zone_id]

                if waiting_time > zone.wait_threshold:
                    self.send_alert(customer, waiting_time, zone)

            return True
        except ClientError as e:
            print(f"Frame processing error: {e}")
            return False

    def update_customer_history(self, customer_id: str, action: str, zone_id: str):
        """Update customer history in DynamoDB"""
        try:
            self.customer_history.put_item(
                Item={
                    'customer_id': customer_id,
                    'timestamp': datetime.datetime.now().isoformat(),
                    'action': action,
                    'zone_id': zone_id
                }
            )
        except ClientError as e:
            print(f"History update error: {e}")

    def send_alert(self, customer: Customer, waiting_time: float, zone: QueueZone):
        """Send alert for long wait times"""
        try:
            message = {
                'customer_id': customer.face_id,
                'waiting_time_minutes': waiting_time / 60,
                'zone_name': zone.name,
                'visit_history': customer.visit_history,
                'loyalty_status': customer.loyalty_status,
                'timestamp': datetime.datetime.now().isoformat()
            }

            self.sns_client.publish(
                TopicArn='arn:aws:sns:eu-west-1:123456789012:QueueAlerts',
                Message=json.dumps(message),
                Subject=f'Long Wait Alert - {zone.name}'
            )
        except ClientError as e:
            print(f"Alert sending error: {e}")

    def generate_report(self, start_time: datetime.datetime, end_time: datetime.datetime) -> Dict:
        """Generate analytics report"""
        try:
            # Query customer history
            response = self.customer_history.scan(
                FilterExpression='timestamp BETWEEN :start AND :end',
                ExpressionAttributeValues={
                    ':start': start_time.isoformat(),
                    ':end': end_time.isoformat()
                }
            )

            df = pd.DataFrame(response['Items'])

            report = {
                'period': {
                    'start': start_time.isoformat(),
                    'end': end_time.isoformat()
                },
                'total_customers': len(df['customer_id'].unique()),
                'zone_statistics': {},
                'average_wait_times': {},
                'peak_hours': {}
            }

            # Calculate zone-specific statistics
            for zone_id in self.zones:
                zone_data = df[df['zone_id'] == zone_id]
                report['zone_statistics'][zone_id] = {
                    'total_visits': len(zone_data),
                    'unique_customers': len(zone_data['customer_id'].unique())
                }

            return report
        except Exception as e:
            print(f"Report generation error: {e}")
            return {}

    def cleanup(self):
        """Clean up old records"""
        current_time = datetime.datetime.now()
        expired_customers = []

        for customer_id, customer in self.customers.items():
            if (current_time - customer.last_seen_time).total_seconds() > 3600:  # 1 hour
                expired_customers.append(customer_id)
                self.update_customer_history(customer_id, "exited", customer.queue_zone)

        for customer_id in expired_customers:
            del self.customers[customer_id]

In [None]:
import unittest
from datetime import datetime, timedelta
import json
import os

class TestQueueMonitoringSystem(unittest.TestCase):
    @classmethod
    def setUpClass(cls):
        # Create test configuration
        cls.test_config = {
            "zones": [
                {
                    "id": "main-queue",
                    "name": "Main Queue",
                    "threshold": 1800,
                    "cameras": ["cam1", "cam2"]
                },
                {
                    "id": "vip-queue",
                    "name": "VIP Queue",
                    "threshold": 900,
                    "cameras": ["cam3"]
                }
            ],
            "cameras": [
                {"id": "cam1", "location": "entrance"},
                {"id": "cam2", "location": "middle"},
                {"id": "cam3", "location": "vip"}
            ]
        }

        # Save test configuration
        with open('test_config.json', 'w') as f:
            json.dump(cls.test_config, f)

    def setUp(self):
        self.system = EnhancedQueueMonitoringSystem('test_config.json')

    def test_zone_initialization(self):
        """Test if zones are properly initialized"""
        self.assertEqual(len(self.system.zones), 2)
        self.assertIn('main-queue', self.system.zones)
        self.assertIn('vip-queue', self.system.zones)

    def test_camera_initialization(self):
        """Test if cameras are properly initialized"""
        self.assertEqual(len(self.system.cameras), 3)
        self.assertIn('cam1', self.system.cameras)

    def test_customer_tracking(self):
        """Test customer detection and tracking"""
        # Simulate a frame with a new customer
        dummy_frame = b'dummy_frame_data'
        self.system.process_video_frame('cam1', dummy_frame, 'main-queue')

        # Verify customer count
        self.assertGreaterEqual(len(self.system.customers), 0)

    def test_zone_transfer(self):
        """Test customer moving between zones"""
        # Add test customer
        customer_id = "test_customer"
        self.system.customers[customer_id] = Customer(
            face_id=customer_id,
            start_time=datetime.now(),
            last_seen_time=datetime.now(),
            queue_zone="main-queue",
            visit_history=["main-queue"]
        )

        # Simulate movement to VIP queue
        dummy_frame = b'dummy_frame_data'
        self.system.process_video_frame('cam3', dummy_frame, 'vip-queue')

        # Verify zone transfer
        customer = self.system.customers.get(customer_id)
        if customer:
            self.assertIn('vip-queue', customer.visit_history)

    def test_report_generation(self):
        """Test analytics report generation"""
        start_time = datetime.now() - timedelta(hours=1)
        end_time = datetime.now()

        report = self.system.generate_report(start_time, end_time)

        self.assertIsInstance(report, dict)
        self.assertIn('period', report)
        self.assertIn('zone_statistics', report)

    def test_cleanup(self):
        """Test cleanup of old records"""
        # Add test customer with old timestamp
        customer_id = "old_customer"
        self.system.customers[customer_id] = Customer(
            face_id=customer_id,
            start_time=datetime.now() - timedelta(hours=2),
            last_seen_time=datetime.now() - timedelta(hours=2),
            queue_zone="main-queue",
            visit_history=["main-queue"]
        )

        # Run cleanup
        self.system.cleanup()

        # Verify customer was removed
        self.assertNotIn(customer_id, self.system.customers)

    @classmethod
    def tearDownClass(cls):
        # Clean up test configuration file
        if os.path.exists('test_config.json'):
            os.remove('test_config.json')

if __name__ == '__main__':
    unittest.main()

E
ERROR: /root/ (unittest.loader._FailedTest)
----------------------------------------------------------------------
AttributeError: module '__main__' has no attribute '/root/'

----------------------------------------------------------------------
Ran 1 test in 0.002s

FAILED (errors=1)


SystemExit: True

  warn("To exit: use 'exit', 'quit', or Ctrl-D.", stacklevel=1)


In [None]:
# Testi çalıştırma
if __name__ == '__main__':
    unittest.main()

# Manuel test için
system = EnhancedQueueMonitoringSystem('config.json')

# Kamera simülasyonu
dummy_frame = b'dummy_frame_data'
system.process_video_frame('cam1', dummy_frame, 'main-queue')

# Rapor oluşturma
start_time = datetime.now() - timedelta(hours=24)
end_time = datetime.now()
report = system.generate_report(start_time, end_time)
print(json.dumps(report, indent=2))