In [None]:
from collections import OrderedDict

import cv2
import numpy as np

In [None]:
video_path = r'C:\Users\es25591\Workspace\360dataset\content\saliency\coaster_saliency.mp4'
cap = cv2.VideoCapture(video_path)

# Report the outcome of opening the file: either the error, or the basic
# stream properties (frame count, frame rate, frame dimensions).
if not cap.isOpened():
    print("Error: Could not open video")
else:
    print(
        "Video loaded successfully",
        f"Frame count: {int(cap.get(cv2.CAP_PROP_FRAME_COUNT))}",
        f"FPS: {cap.get(cv2.CAP_PROP_FPS)}",
        f"Frame size: {int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))} x {int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))}",
        sep="\n",
    )

In [None]:
# Decode the whole video into a list of single-channel (grayscale) frames.
attention_frames = []
count = 0

# Rewind first so re-running this cell always starts from frame 0.
cap.set(cv2.CAP_PROP_POS_FRAMES, 0)

# Priming read, then keep going for as long as the capture reports success.
ret_f, f = cap.read()
while ret_f:
    gray = cv2.cvtColor(f, cv2.COLOR_BGR2GRAY)
    attention_frames.append(gray)
    # print(f"Frame {count}: shape={gray.shape}, dtype={gray.dtype}")
    count += 1
    ret_f, f = cap.read()

In [None]:
class CacheService:
    """
    In-memory LRU cache bounded by a byte budget instead of an item count.

    Every entry is charged an estimated size. When a new entry does not fit,
    entries are evicted (least recently used first) until there is enough
    free capacity to store it. Size is estimated using:
      - numpy arrays: value.nbytes
      - bytes / bytearray: len(value)
      - str: len(value.encode('utf-8'))
      - fallback: 1 (minimal placeholder)

    Values are stored by reference; nothing is copied.

    Attributes:
        capacity_bytes: Maximum total estimated size of all stored values.
        cache: OrderedDict mapping key -> (value, size_bytes), ordered from
            least recently used (first) to most recently used (last).
        used_bytes: Sum of the estimated sizes of the stored values.
    """
    def __init__(self, capacity_bytes=5_000_000):  # default ~5 MB
        self.capacity_bytes = int(capacity_bytes)
        # A single ordered mapping holds both the data and the LRU order, so
        # the two can never drift apart and reordering/eviction are O(1).
        self.cache = OrderedDict()  # key -> (value, size_bytes)
        self.used_bytes = 0

    @property
    def access_order(self):
        """Snapshot list of keys, least recently used first."""
        return list(self.cache)

    def _estimate_size(self, value):
        """Return the estimated size of ``value`` in bytes (see class docstring)."""
        # numpy is optional: only a missing package is tolerated here, any
        # other failure should surface instead of being silently swallowed.
        try:
            import numpy as _np
        except ImportError:
            _np = None
        if _np is not None and isinstance(value, _np.ndarray):
            return int(value.nbytes)
        if isinstance(value, (bytes, bytearray)):
            return len(value)
        if isinstance(value, str):
            return len(value.encode('utf-8'))
        # Fallback minimal size
        return 1

    def available_bytes(self):
        """Return how many bytes of the budget are still unused."""
        return self.capacity_bytes - self.used_bytes

    def usage_ratio(self):
        """Return used/capacity as a float; 0.0 when the capacity is zero."""
        if self.capacity_bytes == 0:
            return 0.0
        return self.used_bytes / self.capacity_bytes

    def get(self, key):
        """
        Return the value stored under ``key`` and mark it most recently used.

        Returns None when the key is not cached.
        """
        entry = self.cache.get(key)
        if entry is None:  # entries are always tuples, so None means "absent"
            return None
        self.cache.move_to_end(key)
        return entry[0]

    def put(self, key, value):
        """
        Store ``value`` under ``key``, evicting LRU entries to make room.

        Any previous entry for ``key`` is discarded first, even when the new
        value turns out to be too large to store: keeping it would make
        ``get`` return data the caller has just tried to overwrite.

        Returns:
            True if the value was stored, False if it is larger than the
            whole capacity and was therefore skipped.
        """
        size = self._estimate_size(value)

        # Drop the old entry (if any) before the capacity check so a rejected
        # replacement never leaves a stale value behind.
        previous = self.cache.pop(key, None)
        if previous is not None:
            self.used_bytes -= previous[1]

        if size > self.capacity_bytes:
            print(f"[Cache] Skipping '{key}': item size {size} > capacity {self.capacity_bytes}")
            return False

        # Evict LRU until enough space
        while self.used_bytes + size > self.capacity_bytes and self.cache:
            lru_key, (_, lru_size) = self.cache.popitem(last=False)
            self.used_bytes -= lru_size
            print(f"[Cache] Evicted '{lru_key}' to free {lru_size} bytes")

        # Store new item (inserted at the most-recently-used end)
        self.cache[key] = (value, size)
        self.used_bytes += size
        return True

    def clear(self):
        """Remove every entry and reset the byte accounting."""
        self.cache.clear()
        self.used_bytes = 0

    def __len__(self):
        return len(self.cache)

    def __repr__(self):
        return (f"CacheService(items={len(self.cache)}, used={self.used_bytes}B, "
                f"capacity={self.capacity_bytes}B, usage={self.usage_ratio():.2%})")

In [None]:
# Create cache service instance with ~20 MB capacity
cache = CacheService(capacity_bytes=20_000_000)

if __name__ == "__main__":
    print("=== Cache Service (Capacity-Based) Simulation ===\n")
    print("Initial:", cache)

    # Helper to report status
    def report(label):
        """Print item count, used/available bytes and usage ratio of `cache`."""
        print(f"{label}: items={len(cache)}, used={cache.used_bytes}B, avail={cache.available_bytes()}B, usage={cache.usage_ratio():.2%}")

    # Helper shared by steps 1 and 3
    def put_frames(indices):
        """Put the requested frames into `cache`, logging the outcome of each.

        Indices beyond the number of decoded frames are reported and skipped
        instead of raising IndexError, so the demo also runs on short videos
        (or when the video could not be opened and no frames were decoded).
        """
        for i in indices:
            if i >= len(attention_frames):
                print(f"   missing frame_{i} (only {len(attention_frames)} frames loaded)")
                continue
            frame = attention_frames[i]
            ok = cache.put(f"frame_{i}", frame)
            status = "stored" if ok else "skipped"
            print(f"   {status:7} frame_{i} (size={cache._estimate_size(frame)}B)")

    # Add some frames to cache
    print("1. Adding frames to cache (attempting first 15 frames):")
    put_frames(range(0, 15))
    report("After initial load")

    # Access some cached frames (testing LRU reorder)
    print("\n2. Accessing some frames to update LRU order:")
    for i in [2, 5, 3]:
        fr = cache.get(f"frame_{i}")
        print(f"   Access frame_{i}: {'hit' if fr is not None else 'miss'}")
    report("After accesses")

    # Add more frames to trigger eviction
    print("\n3. Adding more frames to trigger LRU evictions:")
    put_frames(range(30, 40))
    report("After evictions")

    # Check presence of earlier frames
    print("\n4. Checking if early frames remain:")
    for i in [0, 2, 5, 10]:
        fr = cache.get(f"frame_{i}")
        print(f"   frame_{i}: {'present' if fr is not None else 'evicted'}")
    report("Post check")

    # Clear cache
    print("\n5. Clearing cache:")
    cache.clear()
    report("After clear")

In [None]:
def main(env=None, episodes=1, n=50):
    """
    Drive an environment with randomly sampled actions and log every step.

    Args:
        env: An already created and initialized environment exposing
            ``action_space.sample()`` and ``step(action)``.
        episodes: Number of episodes to simulate.
        n: Maximum number of steps per episode.

    Raises:
        ValueError: If no environment is supplied.
    """
    if env is None:
        # The environment used to be an in-function `...` placeholder, which
        # always crashed on the first attribute access; fail clearly instead.
        raise ValueError("main() requires an already created and initialized env")

    # NOTE(review): the environment is never reset between episodes here;
    # confirm whether the caller is expected to do that when episodes > 1.
    for _ in range(episodes):
        for step in range(n):
            action = env.action_space.sample()
            res = env.step(action)

            if len(res) == 5:
                # Newer Gym / Gymnasium style result:
                # (obs, reward, terminated, truncated, info)
                _, reward, terminated, truncated, _ = res
                done = terminated or truncated
            else:
                # Classic 4-tuple result: (obs, reward, done, info)
                _, reward, done, _ = res

            print(f"Step {step+1}/{n} - action: {action}, reward: {reward}, done: {done}")
            if done:
                break

    print("Episode finished.")

# if __name__ == "__main__":
#     main()