In [1]:
# Import necessary libraries
import cv2
import numpy as np
import pandas as pd
from collections import defaultdict
import matplotlib.pyplot as plt
import seaborn as sns
import time
from sklearn.cluster import KMeans
from sklearn.preprocessing import StandardScaler

In [None]:
import cv2
import numpy as np
import time
from deep_sort_realtime.deepsort_tracker import DeepSort
import matplotlib.pyplot as plt
import seaborn as sns
from collections import defaultdict

# Initialize DeepSORT tracker
tracker = DeepSort(max_age=30)

class CustomerTracker:
    def __init__(self, store_layout_zones=None, dwell_time_threshold=60):
        self.tracks = defaultdict(list)
        self.dwell_times = defaultdict(float)
        self.heatmap_data = np.zeros((720, 1280))
        self.store_zones = store_layout_zones or {
            'entrance': [(0, 0, 400, 720)],
            'checkout': [(880, 0, 1280, 720)],
            'aisle': [(400, 0, 880, 720)]
        }
        self.zone_visits = defaultdict(int)
        self.last_positions = {}
        self.start_times = {}
        self.dwell_time_threshold = dwell_time_threshold

    def update_tracking(self, person_id, x, y, w, h):
        center_x, center_y = x + w // 2, y + h // 2
        self.tracks[person_id].append((center_x, center_y))
        self.heatmap_data[max(0, y):min(720, y + h), max(0, x):min(1280, x + w)] += 1

        current_zone = self.get_current_zone(center_x, center_y)
        if person_id not in self.last_positions:
            self.last_positions[person_id] = current_zone
            self.start_times[person_id] = time.time()
        else:
            if self.last_positions[person_id] != current_zone:
                dwell_time = time.time() - self.start_times[person_id]
                self.dwell_times[self.last_positions[person_id]] += dwell_time
                self.start_times[person_id] = time.time()
                self.zone_visits[current_zone] += 1

            if self.dwell_times.get(self.last_positions[person_id], 0) > self.dwell_time_threshold:
                self.trigger_alert(self.last_positions[person_id], self.dwell_times[self.last_positions[person_id]])

            self.last_positions[person_id] = current_zone

    def trigger_alert(self, zone, dwell_time):
        print(f"Alert: Customer has been in {zone} for {dwell_time:.2f} seconds. Assistance or restocking may be needed.")

    def get_current_zone(self, x, y):
        for zone_name, boxes in self.store_zones.items():
            for (x1, y1, x2, y2) in boxes:
                if x1 <= x <= x2 and y1 <= y <= y2:
                    return zone_name
        return 'other'

    def generate_heatmap(self):
        plt.figure(figsize=(12, 8))
        sns.heatmap(self.heatmap_data, cmap='YlOrRd')
        plt.title('Customer Movement Heatmap')
        plt.savefig('customer_heatmap.png')
        plt.close()

# Video analysis function
def analyze_video(video_path):
    net = cv2.dnn.readNet("yolov4-tiny.weights", "yolov4-tiny.cfg")
    with open("coco.names", "r") as f:
        classes = [line.strip() for line in f.readlines()]
    
    tracker_obj = CustomerTracker()
    cap = cv2.VideoCapture(video_path)
    
    if not cap.isOpened():
        print("Error: Could not open video file")
        return

    # Define the video writer for fast output
    fourcc = cv2.VideoWriter_fourcc(*'mp4v')  
    output_video = cv2.VideoWriter("output_fast.mp4", fourcc, 30.0, 
                                   (int(cap.get(3)), int(cap.get(4))))

    frame_count = 0
    while True:
        ret, frame = cap.read()
        if not ret:
            break
        
        frame_count += 1
        if frame_count % 2 != 0:  # Skip every other frame for speed
            continue
        
        height, width, _ = frame.shape
        frame_resized = cv2.resize(frame, (640, 480))  # Process smaller frame
        blob = cv2.dnn.blobFromImage(frame_resized, 0.00392, (416, 416), (0, 0, 0), True, crop=False)
        net.setInput(blob)
        outs = net.forward(net.getUnconnectedOutLayersNames())
        
        boxes, confidences, class_ids = [], [], []
        for out in outs:
            for detection in out:
                scores = detection[5:]
                class_id = np.argmax(scores)
                confidence = scores[class_id]
                if confidence > 0.3 and classes[class_id] == "person":
                    center_x = int(detection[0] * width)
                    center_y = int(detection[1] * height)
                    w = int(detection[2] * width)
                    h = int(detection[3] * height)
                    x = int(center_x - w / 2)
                    y = int(center_y - h / 2)
                    boxes.append([x, y, w, h])
                    confidences.append(float(confidence))
                    class_ids.append(class_id)
        
        indexes = cv2.dnn.NMSBoxes(boxes, confidences, 0.3, 0.4)
        detections = []
        
        if indexes is not None and len(indexes) > 0:
            for i in indexes.flatten():
                if 0 <= i < len(boxes):
                    x, y, w, h = boxes[i]
                    detections.append(([x, y, x + w, y + h], confidences[i], class_ids[i]))
        
        tracked_objects = tracker.update_tracks(detections, frame=frame)
        for track in tracked_objects:
            if not track.is_confirmed():
                continue
            track_id, bbox = track.track_id, track.to_tlbr()
            x1, y1, x2, y2 = map(int, bbox)
            tracker_obj.update_tracking(track_id, x1, y1, x2 - x1, y2 - y1)
            cv2.rectangle(frame, (x1, y1), (x2, y2), (0, 255, 0), 2)
            cv2.putText(frame, f"ID {track_id}", (x1, y1 - 10), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 255, 0), 2)
        
        output_video.write(frame)  # Save processed frame to output video
    
    cap.release()
    output_video.release()
    cv2.destroyAllWindows()
    tracker_obj.generate_heatmap()

# Run analysis
video_paths = [
    "D:/Rohit/Rohit/video/HD CCTV Camera video 3MP 4MP iProx CCTV HDCCTVCamerasnet retail store.mp4",
    "D:/Rohit/Rohit/video/HD CCTV Camera video 3MP 4MP iProx CCTV HDCCTVCamerasnet retail store-3-23.mp4",
    "D:/Rohit/Rohit/video/new_codec_HD CCTV Camera video 3MP 4MP iProx CCTV HDCCTVCamerasnet retail store-3-23.mp4"
]

for video_path in video_paths:
    analyze_video(video_path)  # Call function separately for each video


Alert: Customer has been in aisle for 62.09 seconds. Assistance or restocking may be needed.
Alert: Customer has been in aisle for 62.09 seconds. Assistance or restocking may be needed.
Alert: Customer has been in aisle for 62.09 seconds. Assistance or restocking may be needed.
Alert: Customer has been in aisle for 62.09 seconds. Assistance or restocking may be needed.
Alert: Customer has been in aisle for 62.09 seconds. Assistance or restocking may be needed.
Alert: Customer has been in aisle for 62.09 seconds. Assistance or restocking may be needed.
Alert: Customer has been in aisle for 62.09 seconds. Assistance or restocking may be needed.
Alert: Customer has been in aisle for 62.09 seconds. Assistance or restocking may be needed.
Alert: Customer has been in aisle for 62.09 seconds. Assistance or restocking may be needed.
Alert: Customer has been in aisle for 62.09 seconds. Assistance or restocking may be needed.
Alert: Customer has been in aisle for 62.09 seconds. Assistance or res