# 0.1  collections.deque()

`deque` (pronounced "deck") stands for "double-ended queue." It's a generalization of a stack and a queue, allowing you to add and remove elements from both ends. In Python, `deque` is part of the `collections` module.

Here's a breakdown of what it is, where it's used, and why you might use it:

### What is `deque`?

Think of a regular list or array. If you want to add or remove an element from the beginning of a large list, all subsequent elements need to be shifted, which can be inefficient (O(n) time complexity).

A `deque`, on the other hand, is implemented in CPython as a doubly linked list of fixed-size blocks. This means that adding or removing elements from *either* end (the "left" or "right") takes constant time (O(1)). The trade-off is that indexing into the middle of a `deque` is O(n), whereas a list indexes in O(1).

**Key characteristics:**

  * **Appends/Pops from both ends:** You can add elements to the left (`appendleft()`) or right (`append()`), and remove elements from the left (`popleft()`) or right (`pop()`).
  * **Efficient:** O(1) time complexity for appending and popping from either end.
  * **Iterable:** You can iterate over a `deque` just like a list.
  * **Fixed-size (optional):** You can create a `deque` with a `maxlen` argument, which will automatically discard elements from the opposite end when new elements are added, maintaining a fixed size.
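
The `maxlen` behaviour in the last bullet can be sketched as follows. This is a minimal illustration with made-up variable names; it shows that the discarded element always comes from the end opposite the insertion.

```python
from collections import deque

# A bounded deque that holds at most three items.
window = deque([1, 2, 3], maxlen=3)

# Appending on the right pushes the leftmost item out.
window.append(4)
after_append = list(window)      # [2, 3, 4]

# Appending on the left pushes the rightmost item out.
window.appendleft(0)
after_appendleft = list(window)  # [0, 2, 3]

print(after_append, after_appendleft)
```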

### Where is `deque` used?

`deque` is particularly useful in scenarios where you need efficient additions and removals from both ends of a sequence. Common use cases include:

1.  **Implementing Queues and Stacks:**

      * **Queue (FIFO - First-In, First-Out):** You can use `append()` to add to one end and `popleft()` to remove from the other. This is more efficient than using a standard list for a queue, where `pop(0)` is slow.
      * **Stack (LIFO - Last-In, First-Out):** You can use `append()` to push onto the stack and `pop()` to pop from the stack. While a list is also efficient for a stack (`append()` and `pop()` are O(1)), `deque` offers the flexibility of also being a queue.

2.  **Breadth-First Search (BFS) in Graphs and Trees:**

      * BFS algorithms explore a graph level by level. A `deque` is ideal for storing the nodes to visit, as you add new neighbors to one end and process nodes from the other.

3.  **Recent History or Log Files:**

      * If you need to keep track of the last N items (e.g., last 10 commands, last 5 search queries), a `deque` with a `maxlen` is perfect. When a new item is added, the oldest item is automatically discarded.

4.  **Sliding Window Problems:**

      * In algorithms that involve a "sliding window" over a sequence (e.g., finding the maximum in a sliding window), a `deque` can efficiently store and manage elements within that window.

5.  **Undo/Redo Functionality:**

      * You can use two deques (one for undo, one for redo) to manage actions that can be reversed and then reapplied.

6.  **Producer-Consumer Scenarios:**

      * When one part of your program produces data and another consumes it, a `deque` can act as the buffer between them. Its `append()` and `popleft()` operations are thread-safe in CPython, but it offers no blocking or signalling, so in multi-threaded code you'd typically use `queue.Queue`, which uses a `deque` internally.
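
As a sketch of the BFS use case listed above, the function below keeps the nodes still to be visited in a `deque`. The function name `bfs_order` and the example graph are hypothetical, chosen only for illustration.

```python
from collections import deque

def bfs_order(graph, start):
    """Return nodes in the order BFS visits them, starting from `start`."""
    visited = {start}
    order = []
    queue = deque([start])
    while queue:
        node = queue.popleft()          # O(1) removal from the left
        order.append(node)
        for neighbor in graph.get(node, []):
            if neighbor not in visited:
                visited.add(neighbor)
                queue.append(neighbor)  # O(1) insertion on the right
    return order

# Hypothetical example graph as an adjacency list.
graph = {
    'A': ['B', 'C'],
    'B': ['D'],
    'C': ['D'],
    'D': [],
}
print(bfs_order(graph, 'A'))  # ['A', 'B', 'C', 'D']
```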

### Why use `deque`?

You should use `deque` when:

1.  **Performance is critical for appends/pops from both ends:** If your operations frequently add or remove elements at the beginning of a sequence, `deque` will significantly outperform a standard Python list. At the right end the two perform the same:

      * **List `insert(0, item)` and `pop(0)` are O(n).**
      * **`deque` `appendleft()` and `popleft()` are O(1).**
      * **List `append()` and `pop()` are O(1).**
      * **`deque` `append()` and `pop()` are O(1).**

2.  **You need a fixed-size collection that automatically discards old items:** The `maxlen` argument is a very convenient feature for managing limited-size historical data.

3.  **You are implementing algorithms that naturally fit the double-ended queue pattern:** As seen in BFS, sliding windows, and undo/redo systems.
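
The sliding-window pattern mentioned above is often solved with a deque of indices whose values are kept in decreasing order. The following is one possible sketch, not the only approach; `sliding_window_max` is a hypothetical helper name.

```python
from collections import deque

def sliding_window_max(values, k):
    """Return the maximum of every length-k window of `values`."""
    candidates = deque()  # indices; their values are kept in decreasing order
    result = []
    for i, value in enumerate(values):
        # Drop the index that has slid out of the window on the left.
        if candidates and candidates[0] <= i - k:
            candidates.popleft()
        # Drop smaller values on the right; they can never be a maximum again.
        while candidates and values[candidates[-1]] <= value:
            candidates.pop()
        candidates.append(i)
        if i >= k - 1:
            result.append(values[candidates[0]])
    return result

print(sliding_window_max([1, 3, -1, -3, 5, 3, 6, 7], 3))  # [3, 3, 5, 5, 6, 7]
```

Each index is appended and popped at most once, so the whole pass is O(n) regardless of the window size.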

**Example of `deque` usage:**

```python
from collections import deque

# Basic deque
d = deque()
d.append('a')
d.append('b')
d.appendleft('c')
print(d)  # deque(['c', 'a', 'b'])

d.pop()
print(d)  # deque(['c', 'a'])

d.popleft()
print(d)  # deque(['a'])

# Deque with a maximum length
history = deque(maxlen=3)
history.append('search 1')
history.append('search 2')
history.append('search 3')
print(history)  # deque(['search 1', 'search 2', 'search 3'])

history.append('search 4')
print(history)  # deque(['search 2', 'search 3', 'search 4'])
# 'search 1' was automatically discarded

```

In [16]:
from collections import deque

# make a new deque with three items 
d = deque('ghi')

for element in d:
    print(element.upper())

    

G
H
I


In [18]:
# add a new entry to the right side 
d.append('j')

# add new entry to the left side 
d.appendleft('f')


# remove the rightmost item 
d.pop()

# remove the leftmost item 
d.popleft()


# list the content from the deque 
list(d)

print(d)


deque(['g', 'h', 'i'])


# 0.2 unittest

You can run this test case from the <u>test.py</u> file.

In [None]:
import unittest


class TestStringMethods(unittest.TestCase):

    def test_upper(self):
        self.assertEqual('foo'.upper(), 'FOO')

    def test_isupper(self):
        self.assertTrue('FOO'.isupper())
        self.assertFalse('Foo'.isupper())

    def test_split(self):
        s = 'hello world'
        self.assertEqual(s.split(), ['hello', 'world'])
        # check that s.split fails when the separator is not a string 
        with self.assertRaises(TypeError):
            s.split(2)


            
if __name__ == "__main__":
    unittest.main()
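
Inside a notebook, `unittest.main()` tends to be awkward because it parses the kernel's command-line arguments and exits the interpreter when it finishes. One way around that, sketched here under the assumption that you want to stay in the notebook, is to build a suite and run it with `TextTestRunner`. `TestDequeBasics` is a hypothetical test case used only for illustration.

```python
import unittest
from collections import deque

class TestDequeBasics(unittest.TestCase):
    """Hypothetical test case, used only to illustrate the runner."""

    def test_append_both_ends(self):
        d = deque('ghi')
        d.append('j')
        d.appendleft('f')
        self.assertEqual(list(d), ['f', 'g', 'h', 'i', 'j'])

    def test_pop_from_empty_raises(self):
        with self.assertRaises(IndexError):
            deque().pop()

# Build a suite from the test case and run it without exiting the interpreter.
suite = unittest.TestLoader().loadTestsFromTestCase(TestDequeBasics)
result = unittest.TextTestRunner(verbosity=2).run(suite)
print(result.testsRun, result.wasSuccessful())
```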

    

# 0.3 torch.distributed

## [A] torch.distributed.is_available()
- `torch.distributed.is_available()` returns `True` if the `torch.distributed` package, which is essential for parallel and distributed computing, is available on the current system.

In [1]:
import torch 

if torch.distributed.is_available():
    print("torch.distributed package is available on this system.")

else:
    print("The torch.distributed package is not available on this system.")

torch.distributed package is available on this system.


## [B] torch.distributed.is_initialized()

- The `torch.distributed.is_initialized()` function is a crucial check in PyTorch for distributed training.
- It returns `True` if the default distributed process group has been initialized and `False` otherwise.

In [None]:
import torch 
import torch.distributed as dist 
import os 


def setup(rank, world_size):
    """ 
    Initializes the distributed environment.
    """

    # set the MASTER_ADDR and MASTER_PORT environment variables 
    # This is a common way to set up the communication
    os.environ['MASTER_ADDR'] = 'localhost'
    os.environ['MASTER_PORT'] = '12355'

    # Initialize the process group with a backend (e.g. 'gloo' for CPU)
    dist.init_process_group(backend='gloo',
                            rank=rank,
                            world_size=world_size)
    

def cleanup():

    """Destroys the distributed process group."""

    dist.destroy_process_group()


def run_distributed_job(rank,
                        world_size):
    
    print(f"Rank {rank}: Checking if distributed is initialized...")
    # This will be False before the setup function is called 
    print(f"Rank {rank}: Initialized status before setup: {dist.is_initialized()}")

    # Initialize the process group; without this call, all_reduce below would fail
    setup(rank, world_size)
    print(f"Rank {rank}: Initialized status after setup: {dist.is_initialized()}")

    # Now that the process group is initialized, we can perform distributed operations,
    # for example, a simple All-Reduce to sum tensors across all processes 
    tensor = torch.tensor([float(rank)]) # Each process has a different value 
    dist.all_reduce(tensor, op=dist.ReduceOp.SUM)

    print(f"Rank {rank}: After all_reduce, the tensor value is {tensor.item()}")

    cleanup()
    print(f"Rank {rank}: Initialized status after cleanup: {dist.is_initialized()}")



if __name__ == "__main__":

    world_size = 1  # single-process demonstration
    print("A proper run would look like: `torchrun --nproc_per_node=2 your_script.py`")

    try:
        # setup() sets MASTER_ADDR and MASTER_PORT, which the default
        # env:// initialization requires before the process group is created
        setup(rank=0, world_size=world_size)
        print("Example with a single process:")
        print(f"Is distributed initialized ? {dist.is_initialized()}")
        cleanup()
        print(f"Is distributed initialized after cleanup ? {dist.is_initialized()}")

    except Exception as e:
        print(e)
        
    





In [11]:
x = torch.randn(2, 32, 16, 512, 512)
temb = torch.randn(2, 512)

x_temb = temb[:, None, None, None, :]
output = x + x_temb
output.shape

torch.Size([2, 32, 16, 512, 512])

In [None]:
x = torch.randn(2, 32, 8, 512, 512)
temb = torch.randn(2, 32)
temb = temb[:, :, None, None]

# output = x * (1 + temb) + temb

# [2, 32, 8, 512, 512] * [2, 32, 1, 1]
output1 = x * (1 + temb)
output1.shape



RuntimeError: The size of tensor a (8) must match the size of tensor b (512) at non-singleton dimension 2
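
The cell above fails because broadcasting aligns shapes starting from the last dimension, so a 4-D `temb` does not line up with the batch and channel axes of a 5-D `x`. A possible fix, sketched here with smaller, hypothetical spatial sizes to keep it cheap to run, is to add one trailing singleton axis per remaining dimension of `x`. The name `temb_5d` is introduced only for this example.

```python
import torch

# Small hypothetical sizes: (batch, channels, frames, height, width).
x = torch.randn(2, 32, 8, 4, 4)
temb = torch.randn(2, 32)

# Three trailing singleton axes so that temb lines up as (batch, channels, 1, 1, 1).
temb_5d = temb[:, :, None, None, None]

output = x * (1 + temb_5d) + temb_5d
print(output.shape)  # torch.Size([2, 32, 8, 4, 4])
```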

# 0.4 dataclass

- for more info: https://docs.python.org/3/library/dataclasses.html

In [1]:
from dataclasses import dataclass

@dataclass
class InventoryItem:
    """Class for keeping track of an item in inventory."""
    name: str
    unit_price: float
    quantity_on_hand: int = 0

    def total_cost(self) -> float:
        return self.unit_price * self.quantity_on_hand

In this example, `@dataclass` will add, among other things, an `__init__()` that looks like:

In [2]:
def __init__(self, name: str, unit_price: float, quantity_on_hand: int = 0):
    self.name = name
    self.unit_price = unit_price
    self.quantity_on_hand = quantity_on_hand
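
Besides `__init__()`, the decorator also generates `__repr__()` and `__eq__()` by default. A short usage sketch of the `InventoryItem` class from above, with made-up field values:

```python
from dataclasses import dataclass

@dataclass
class InventoryItem:
    """Class for keeping track of an item in inventory."""
    name: str
    unit_price: float
    quantity_on_hand: int = 0

    def total_cost(self) -> float:
        return self.unit_price * self.quantity_on_hand

item = InventoryItem('widget', 2.5, quantity_on_hand=4)
print(item)               # InventoryItem(name='widget', unit_price=2.5, quantity_on_hand=4)
print(item.total_cost())  # 10.0
print(item == InventoryItem('widget', 2.5, 4))  # True
```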

# 0.5 dataclasses.field

**dataclasses.field(*, default=MISSING, default_factory=MISSING, init=True, repr=True, hash=None, compare=True, metadata=None, kw_only=MISSING)**

- For common and simple use cases, no other functionality is required. There are, however, some dataclass features that require additional per-field information. To satisfy this need for additional information, you can replace the default field value with a call to the provided field() function. For example:

In [3]:
from dataclasses import field

@dataclass
class C:
    mylist: list[int] = field(default_factory=list)

c = C()
c.mylist += [1, 2, 3]
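
The reason `default_factory` exists can be seen in a small sketch: every instance gets its own fresh list, while a plain mutable default is rejected when the class is created. The class name `Broken` is hypothetical and only there to trigger the error.

```python
from dataclasses import dataclass, field

@dataclass
class C:
    mylist: list[int] = field(default_factory=list)

a = C()
b = C()
a.mylist.append(1)
print(a.mylist, b.mylist)  # [1] []

# A plain mutable default is rejected when the class is created.
try:
    @dataclass
    class Broken:
        mylist: list = []
except ValueError as exc:
    error_message = str(exc)
    print(error_message)
```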

# 0.6 dataclasses.is_dataclass

- Return True if its parameter is a dataclass (including subclasses of a dataclass) or an instance of one, otherwise return False.

- If you need to know whether an object is an instance of a dataclass (and not a dataclass itself), add a further check for `not isinstance(obj, type)`:

In [11]:
from dataclasses import is_dataclass

def is_dataclass_instance(obj):
    return is_dataclass(obj) and not isinstance(obj, type)
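
A quick check of the difference between the two functions, using a hypothetical `Point` dataclass:

```python
from dataclasses import dataclass, is_dataclass

def is_dataclass_instance(obj):
    return is_dataclass(obj) and not isinstance(obj, type)

@dataclass
class Point:  # hypothetical dataclass for illustration
    x: int
    y: int

p = Point(1, 2)
print(is_dataclass(Point), is_dataclass(p))                    # True True
print(is_dataclass_instance(Point), is_dataclass_instance(p))  # False True
print(is_dataclass(42))                                        # False
```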




# 0.7 functools.partial

In [None]:
from functools import partial
basetwo = partial(int, base=2)
basetwo.__doc__ = 'Convert base 2 string to an int.'
basetwo('10010') # binary 10010 is 18 in decimal

18
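
A `partial` object keeps the wrapped callable and the frozen arguments as attributes, and keyword arguments passed at call time override the frozen ones. A small sketch reusing `basetwo` from above:

```python
from functools import partial

basetwo = partial(int, base=2)

# The partial object remembers the wrapped callable and frozen arguments.
print(basetwo.func is int)   # True
print(basetwo.args)          # ()
print(basetwo.keywords)      # {'base': 2}

# Keyword arguments given at call time override the frozen ones.
print(basetwo('10010'))           # 18
print(basetwo('10010', base=3))   # 84
```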

# 0.8 torch.searchsorted()

In [15]:
import torch 


values = torch.tensor([3, 1, 9])


sorted_sequence_1d = torch.tensor([1, 3, 5, 7, 9])
# sorted_sequence_1d
print(torch.searchsorted(sorted_sequence_1d, values))

tensor([1, 0, 4])
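
For each value, `torch.searchsorted` returns the index at which it could be inserted into the sorted sequence while keeping it sorted. The `right` flag decides which side of equal elements is chosen. A short sketch, with one extra value (`4`) added that does not occur in the sequence:

```python
import torch

sorted_sequence = torch.tensor([1, 3, 5, 7, 9])
values = torch.tensor([3, 1, 9, 4])

# Default (right=False): first index i with sorted_sequence[i] >= value.
left = torch.searchsorted(sorted_sequence, values)
# right=True: first index i with sorted_sequence[i] > value.
right = torch.searchsorted(sorted_sequence, values, right=True)

print(left)   # tensor([1, 0, 4, 2])
print(right)  # tensor([2, 1, 5, 2])
```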


# 0.9 torch.autograd.grad()

`torch.autograd.grad()` is a low-level function in PyTorch's automatic differentiation system that computes and returns the gradients of given tensors with respect to some other tensors.

In [16]:
import torch 

x = torch.tensor(2.0, requires_grad=True)

# A simple function: y = x^2 + 3x 
y = x **2 + 3 * x 

# compute dy/dx using torch.autograd.grad 
grad_x = torch.autograd.grad(outputs=y,
                             inputs=x)

print(f"y: {y.item()}")
print("dy/dx:", grad_x[0].item())

y: 10.0
dy/dx: 7.0
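
Because `torch.autograd.grad()` returns the gradients instead of accumulating them into `.grad`, it is convenient for higher-order derivatives. The sketch below uses a different function, y = x^3 + 3x, chosen for illustration so that the first and second derivatives differ.

```python
import torch

x = torch.tensor(2.0, requires_grad=True)
y = x ** 3 + 3 * x

# create_graph=True records the gradient computation so it can be differentiated again.
(dy_dx,) = torch.autograd.grad(outputs=y, inputs=x, create_graph=True)
(d2y_dx2,) = torch.autograd.grad(outputs=dy_dx, inputs=x)

print(dy_dx.item())    # 15.0  (3x^2 + 3 at x = 2)
print(d2y_dx2.item())  # 12.0  (6x at x = 2)
print(x.grad)          # None, since nothing was accumulated into x.grad
```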


# 0.10 max()

In [1]:
a = max(4, 3)
print(a)

4


# 0.11 torch.numel()

In [4]:
import torch

a = torch.randn(1, 2, 3, 4, 5)
torch.numel(a)

# a = torch.zeros(4,4)
# torch.numel(a)

120