<a href="https://colab.research.google.com/github/Amanollahi/CI-CD-for-ML/blob/main/data_structure.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
import unittest

In [None]:
class Node:
    """Doubly linked list node holding one cache entry (key and value)."""

    def __init__(self, key, value):
        self.key = key
        self.value = value
        # Neighbours are wired up when the node is inserted into a list.
        self.prev = None
        self.next = None


class LRUCache:
    """Least-recently-used cache backed by a dict and a doubly linked list.

    The dict maps each key to its list node; the list orders nodes from most
    recently used (right after ``head``) to least recently used (right before
    ``tail``). ``head`` and ``tail`` are sentinel nodes that never hold data.
    """

    def __init__(self, capacity: int):
        """Create an empty cache that holds at most ``capacity`` entries.

        A capacity of zero or less yields a cache that never stores anything.
        """
        self.capacity = capacity
        self.cache = {}  # map keys to nodes
        # Sentinels remove the need for None checks when linking/unlinking.
        self.head = Node(0, 0)
        self.tail = Node(0, 0)
        self.head.next = self.tail
        self.tail.prev = self.head

    def _remove(self, node):
        """Unlink ``node`` from the list by joining its two neighbours."""
        prev = node.prev
        next_node = node.next
        prev.next = next_node
        next_node.prev = prev

    def _add_to_head(self, node):
        """Link ``node`` directly after the head sentinel (most recent slot)."""
        node.prev = self.head
        node.next = self.head.next
        self.head.next.prev = node
        self.head.next = node

    def get(self, key: int) -> int:
        """Return the value stored for ``key`` and mark it most recently used.

        Returns -1 when ``key`` is not in the cache.
        """
        if key not in self.cache:
            return -1
        node = self.cache[key]
        # Accessing an entry refreshes its recency.
        self._remove(node)
        self._add_to_head(node)
        return node.value

    def put(self, key: int, value: int) -> None:
        """Insert or update ``key`` and mark it most recently used.

        When inserting a new key into a full cache, the least recently used
        entry is evicted first. With a capacity of zero or less the call is a
        no-op.
        """
        if key in self.cache:
            # Existing key: overwrite the value and refresh its recency.
            node = self.cache[key]
            node.value = value
            self._remove(node)
            self._add_to_head(node)
            return

        if self.capacity <= 0:
            # Nothing can be stored; without this guard the eviction below
            # would try to unlink the head sentinel and crash.
            return

        if len(self.cache) >= self.capacity:
            # The node just before the tail sentinel is the LRU entry.
            lru = self.tail.prev
            self._remove(lru)
            del self.cache[lru.key]

        new_node = Node(key, value)
        self.cache[key] = new_node
        self._add_to_head(new_node)


In [None]:
import unittest

class TestLRUCache(unittest.TestCase):
    """Behavioural tests for LRUCache, each run against a fresh two-slot cache."""

    def setUp(self):
        # A capacity of 2 makes the third distinct key trigger an eviction.
        self.cache = LRUCache(2)

    def test_basic_operations(self):
        """Stored values can be read back while the cache is not over capacity."""
        for key in (1, 2):
            self.cache.put(key, key)
        for key in (1, 2):
            self.assertEqual(self.cache.get(key), key)

    def test_eviction_policy(self):
        """Inserting a third key evicts the oldest one (key 1)."""
        for key in (1, 2, 3):
            self.cache.put(key, key)
        lookups = ((1, -1), (2, 2), (3, 3))
        for key, expected in lookups:
            self.assertEqual(self.cache.get(key), expected)

    def test_update_existing_key(self):
        """Updating a key refreshes it, so the other key is evicted next."""
        self.cache.put(1, 1)
        self.cache.put(2, 2)
        # Overwrite key 1; it becomes the most recently used entry.
        self.cache.put(1, 10)
        self.assertEqual(self.cache.get(1), 10)
        # Key 2 is now the least recently used and must make room for key 3.
        self.cache.put(3, 3)
        self.assertEqual(self.cache.get(2), -1)

    def test_multiple_evictions(self):
        """Successive inserts evict keys in least-recently-used order."""
        for key in (1, 2, 3):
            self.cache.put(key, key)
        # Key 1 was pushed out by key 3.
        self.assertEqual(self.cache.get(1), -1)
        # Key 4 pushes out key 2.
        self.cache.put(4, 4)
        remaining = ((2, -1), (3, 3), (4, 4))
        for key, expected in remaining:
            self.assertEqual(self.cache.get(key), expected)

    def test_get_on_empty_cache(self):
        """A lookup on an empty cache reports a miss."""
        missing = self.cache.get(1)
        self.assertEqual(missing, -1)

# Run the tests in-process. argv=[''] keeps unittest from parsing the host
# process's own command-line arguments, and exit=False stops it from calling
# sys.exit() once the run finishes.
if __name__ == '__main__':
    unittest.main(argv=[''], verbosity=2, exit=False)


test_basic_operations (__main__.TestLRUCache) ... ok
test_eviction_policy (__main__.TestLRUCache) ... ok
test_get_on_empty_cache (__main__.TestLRUCache) ... ok
test_multiple_evictions (__main__.TestLRUCache) ... ok
test_update_existing_key (__main__.TestLRUCache) ... ok

----------------------------------------------------------------------
Ran 5 tests in 0.018s

OK


In [None]:
def longest_consecutive(nums):
    """Return the length of the longest run of consecutive values in ``nums``.

    Duplicates are ignored and the values need not be sorted. An empty input
    yields 0.
    """
    unique = set(nums)
    best = 0

    for candidate in unique:
        # A value whose predecessor is present sits inside a run that will be
        # counted from its true starting point, so skip it here.
        if candidate - 1 in unique:
            continue

        run_end = candidate
        run_length = 1
        # Walk upward while the next value exists.
        while run_end + 1 in unique:
            run_end += 1
            run_length += 1

        if run_length > best:
            best = run_length

    return best


In [None]:
import unittest

class TestLongestConsecutive(unittest.TestCase):
    """Checks longest_consecutive against hand-computed run lengths."""

    def test_basic_sequence(self):
        # The longest run is 1, 2, 3, 4.
        nums = [100, 4, 200, 1, 3, 2]
        self.assertEqual(longest_consecutive(nums), 4)

    def test_empty_list(self):
        # With no elements there is no run at all.
        nums = []
        self.assertEqual(longest_consecutive(nums), 0)

    def test_single_element(self):
        # A lone element forms a run of length one.
        nums = [1]
        self.assertEqual(longest_consecutive(nums), 1)

    def test_all_consecutive(self):
        # The whole input is a single run.
        nums = [1, 2, 3, 4, 5]
        self.assertEqual(longest_consecutive(nums), 5)

    def test_no_consecutive_elements(self):
        # No two values are adjacent, so every run has length one.
        nums = [10, 5, 100]
        self.assertEqual(longest_consecutive(nums), 1)

    def test_mixed_consecutive_and_non_consecutive(self):
        # 1, 2, 3 beats the shorter run 9, 10.
        nums = [1, 9, 3, 10, 2, 20]
        self.assertEqual(longest_consecutive(nums), 3)

    def test_duplicates(self):
        # The repeated 2 must not lengthen or break the run 1, 2, 3, 4.
        nums = [1, 2, 2, 3, 4]
        self.assertEqual(longest_consecutive(nums), 4)

# Run the tests in-process. argv=[''] keeps unittest from parsing the host
# process's own command-line arguments, and exit=False stops it from calling
# sys.exit() once the run finishes.
if __name__ == '__main__':
    unittest.main(argv=[''], verbosity=2, exit=False)


test_basic_operations (__main__.TestLRUCache) ... ok
test_eviction_policy (__main__.TestLRUCache) ... ok
test_get_on_empty_cache (__main__.TestLRUCache) ... ok
test_multiple_evictions (__main__.TestLRUCache) ... ok
test_update_existing_key (__main__.TestLRUCache) ... ok
test_all_consecutive (__main__.TestLongestConsecutive) ... ok
test_basic_sequence (__main__.TestLongestConsecutive) ... ok
test_duplicates (__main__.TestLongestConsecutive) ... ok
test_empty_list (__main__.TestLongestConsecutive) ... ok
test_mixed_consecutive_and_non_consecutive (__main__.TestLongestConsecutive) ... ok
test_no_consecutive_elements (__main__.TestLongestConsecutive) ... ok
test_single_element (__main__.TestLongestConsecutive) ... ok

----------------------------------------------------------------------
Ran 12 tests in 0.038s

OK


In [None]:
import heapq

class KthLargest:
    """Track the k-th largest value of a growing stream of numbers.

    A min-heap holding at most ``k`` values is maintained; once it holds ``k``
    values its root is the k-th largest value seen so far.
    """

    def __init__(self, k: int, nums: list):
        """Seed the tracker with the initial values in ``nums``.

        :param k: Rank to track (1 means the maximum).
        :param nums: Initial values; the list is copied and left unmodified.
        """
        self.k = k
        # Copy first: heapify and heappop work in place and would otherwise
        # reorder and truncate the caller's list.
        self.min_heap = list(nums)
        heapq.heapify(self.min_heap)

        # Discard the smallest values until only the k largest remain.
        while len(self.min_heap) > k:
            heapq.heappop(self.min_heap)

    def add(self, val: int) -> int:
        """Add ``val`` to the stream and return the current heap root.

        While fewer than ``k`` values have been seen, the root is simply the
        smallest value seen so far.
        """
        if len(self.min_heap) < self.k:
            heapq.heappush(self.min_heap, val)
        else:
            # Heap is full: push the new value and drop the smallest in one
            # step, which keeps exactly the k largest values.
            heapq.heappushpop(self.min_heap, val)

        return self.min_heap[0]


In [None]:
import unittest

class TestKthLargest(unittest.TestCase):
    """Feeds values into KthLargest and checks the reported k-th largest."""

    def test_basic_functionality(self):
        # k=3 seeded with [4, 5, 8, 2]; each pair is (value added, expected result).
        tracker = KthLargest(3, [4, 5, 8, 2])
        steps = [(3, 4), (5, 5), (10, 5), (9, 8), (4, 8)]
        for value, expected in steps:
            self.assertEqual(tracker.add(value), expected)

    def test_initial_size_smaller_than_k(self):
        # k=3 but only two seed values, so the first add fills the heap.
        tracker = KthLargest(3, [5, 6])
        steps = [(3, 3), (10, 5), (9, 6), (4, 6)]
        for value, expected in steps:
            self.assertEqual(tracker.add(value), expected)

    def test_adding_larger_elements(self):
        # k=2 seeded with [1]; every added value is larger than all before it.
        tracker = KthLargest(2, [1])
        steps = [(2, 1), (3, 2), (4, 3)]
        for value, expected in steps:
            self.assertEqual(tracker.add(value), expected)

    def test_adding_smaller_elements(self):
        # k=2 seeded with [10, 20]; 5 is too small to matter, 15 displaces 10.
        tracker = KthLargest(2, [10, 20])
        steps = [(5, 10), (15, 15)]
        for value, expected in steps:
            self.assertEqual(tracker.add(value), expected)

# Run the tests in-process. argv=[''] keeps unittest from parsing the host
# process's own command-line arguments, and exit=False stops it from calling
# sys.exit() once the run finishes.
if __name__ == '__main__':
    unittest.main(argv=[''], verbosity=2, exit=False)


test_adding_larger_elements (__main__.TestKthLargest) ... ok
test_adding_smaller_elements (__main__.TestKthLargest) ... ok
test_basic_functionality (__main__.TestKthLargest) ... ok
test_initial_size_smaller_than_k (__main__.TestKthLargest) ... ok
test_basic_operations (__main__.TestLRUCache) ... ok
test_eviction_policy (__main__.TestLRUCache) ... ok
test_get_on_empty_cache (__main__.TestLRUCache) ... ok
test_multiple_evictions (__main__.TestLRUCache) ... ok
test_update_existing_key (__main__.TestLRUCache) ... ok
test_all_consecutive (__main__.TestLongestConsecutive) ... ok
test_basic_sequence (__main__.TestLongestConsecutive) ... ok
test_duplicates (__main__.TestLongestConsecutive) ... ok
test_empty_list (__main__.TestLongestConsecutive) ... ok
test_mixed_consecutive_and_non_consecutive (__main__.TestLongestConsecutive) ... ok
test_no_consecutive_elements (__main__.TestLongestConsecutive) ... ok
test_single_element (__main__.TestLongestConsecutive) ... ok

-----------------------------

In [None]:
import time
from collections import defaultdict

class RateLimiter:
    """Per-user token-bucket rate limiter.

    Each user starts with ``max_requests`` tokens. Every allowed request
    spends one token, and tokens are refilled continuously at a rate of
    ``max_requests`` per ``time_window`` seconds, capped at ``max_requests``.
    """

    def __init__(self, max_requests: int, time_window: int):
        """
        :param max_requests: Maximum allowed requests within the time window
        :param time_window: Time window in seconds for rate limiting
        """
        self.max_requests = max_requests
        self.time_window = time_window
        # Per-user state, created on first access with a full bucket.
        # "tokens" may hold a fractional value between requests.
        self.user_data = defaultdict(lambda: {"tokens": max_requests, "last_request_time": time.time()})

    def is_allowed(self, user_id: str) -> bool:
        """
        Check if a request from the given user_id is allowed based on the rate limit.

        :param user_id: The identifier for the user making the request
        :return: True if the request is allowed, False if rate limit is exceeded
        """
        # Fetch (or create) the user's state before sampling the clock so a
        # brand-new entry never carries a timestamp later than "now".
        user_info = self.user_data[user_id]
        current_time = time.time()

        # Clamp at zero in case the wall clock stepped backwards.
        time_passed = max(0.0, current_time - user_info["last_request_time"])

        # Accrue fractional tokens. Truncating to whole tokens while also
        # resetting the timestamp would discard the partial progress on every
        # call, so a client polling faster than one refill interval would
        # never be replenished.
        tokens_to_add = time_passed / self.time_window * self.max_requests
        user_info["tokens"] = min(self.max_requests, user_info["tokens"] + tokens_to_add)
        user_info["last_request_time"] = current_time

        # A request costs one whole token.
        if user_info["tokens"] >= 1:
            user_info["tokens"] -= 1
            return True
        return False


In [None]:
import unittest
import time

class TestRateLimiter(unittest.TestCase):
    """Tests for RateLimiter configured with 3 requests per 10 seconds."""

    def setUp(self):
        # Initialize the RateLimiter with 3 requests per 10 seconds
        self.rate_limiter = RateLimiter(max_requests=3, time_window=10)
        self.user_id = "user_123"

    def test_allow_requests_within_limit(self):
        # Allow up to max_requests within the time window
        self.assertTrue(self.rate_limiter.is_allowed(self.user_id))  # 1st request
        self.assertTrue(self.rate_limiter.is_allowed(self.user_id))  # 2nd request
        self.assertTrue(self.rate_limiter.is_allowed(self.user_id))  # 3rd request

    def test_deny_requests_beyond_limit(self):
        # Exceed the max_requests limit within the time window
        self.rate_limiter.is_allowed(self.user_id)  # 1st request
        self.rate_limiter.is_allowed(self.user_id)  # 2nd request
        self.rate_limiter.is_allowed(self.user_id)  # 3rd request
        self.assertFalse(self.rate_limiter.is_allowed(self.user_id))  # 4th request should be denied

    def test_replenish_tokens_after_time_window(self):
        # Drive the limiter with a fake clock instead of sleeping for the
        # whole window, so the test is fast and deterministic.
        from unittest import mock

        start = time.time()

        with mock.patch("time.time", return_value=start):
            # Exceed the limit first
            self.rate_limiter.is_allowed(self.user_id)
            self.rate_limiter.is_allowed(self.user_id)
            self.rate_limiter.is_allowed(self.user_id)

            # Verify the 4th request is denied
            self.assertFalse(self.rate_limiter.is_allowed(self.user_id))

        # Jump the clock forward by one full time window
        later = start + self.rate_limiter.time_window
        with mock.patch("time.time", return_value=later):
            # Tokens should now be replenished, allowing 3 more requests
            self.assertTrue(self.rate_limiter.is_allowed(self.user_id))  # 1st new request
            self.assertTrue(self.rate_limiter.is_allowed(self.user_id))  # 2nd new request
            self.assertTrue(self.rate_limiter.is_allowed(self.user_id))  # 3rd new request

    def test_multiple_users_independent_limits(self):
        user_id_1 = "user_123"
        user_id_2 = "user_456"

        # Each user can independently make up to 3 requests
        for _ in range(3):
            self.assertTrue(self.rate_limiter.is_allowed(user_id_1))
            self.assertTrue(self.rate_limiter.is_allowed(user_id_2))

        # Additional requests should be denied for both users
        self.assertFalse(self.rate_limiter.is_allowed(user_id_1))
        self.assertFalse(self.rate_limiter.is_allowed(user_id_2))

# Run the tests in-process. argv=[''] keeps unittest from parsing the host
# process's own command-line arguments, and exit=False stops it from calling
# sys.exit() once the run finishes.
if __name__ == '__main__':
    unittest.main(argv=[''], verbosity=2, exit=False)


test_adding_larger_elements (__main__.TestKthLargest) ... ok
test_adding_smaller_elements (__main__.TestKthLargest) ... ok
test_basic_functionality (__main__.TestKthLargest) ... ok
test_initial_size_smaller_than_k (__main__.TestKthLargest) ... ok
test_basic_operations (__main__.TestLRUCache) ... ok
test_eviction_policy (__main__.TestLRUCache) ... ok
test_get_on_empty_cache (__main__.TestLRUCache) ... ok
test_multiple_evictions (__main__.TestLRUCache) ... ok
test_update_existing_key (__main__.TestLRUCache) ... ok
test_all_consecutive (__main__.TestLongestConsecutive) ... ok
test_basic_sequence (__main__.TestLongestConsecutive) ... ok
test_duplicates (__main__.TestLongestConsecutive) ... ok
test_empty_list (__main__.TestLongestConsecutive) ... ok
test_mixed_consecutive_and_non_consecutive (__main__.TestLongestConsecutive) ... ok
test_no_consecutive_elements (__main__.TestLongestConsecutive) ... ok
test_single_element (__main__.TestLongestConsecutive) ... ok
test_allow_requests_within_lim

In [None]:
import os
from collections import defaultdict

def categorize_files_by_size(directory):
    """Group the regular files directly inside ``directory`` by size in bytes.

    Sub-directories and other non-file entries are skipped; the listing is not
    recursive. Returns a ``defaultdict(list)`` mapping each file size to the
    names of the files that have that size.
    """
    names_by_size = defaultdict(list)

    # Walk the entries directly inside the directory.
    for entry_name in os.listdir(directory):
        full_path = os.path.join(directory, entry_name)

        # Only regular files are categorized.
        if not os.path.isfile(full_path):
            continue

        # File the name under the bucket for its size.
        names_by_size[os.path.getsize(full_path)].append(entry_name)

    return names_by_size
