# Collections: Counter, namedtuple, OrderDict, defaultdict, deque

In [13]:
from collections import Counter, namedtuple, defaultdict, OrderedDict, deque

## 1. Counter

In [4]:
a = "aaaaaabbbbdddfd"
count = Counter(a)
count

Counter({'a': 6, 'b': 4, 'd': 4, 'f': 1})

In [5]:
count.keys()

dict_keys(['a', 'b', 'd', 'f'])

In [6]:
count.values()

dict_values([6, 4, 4, 1])

In [7]:
count.most_common(1)

[('a', 6)]

In [9]:
count.most_common(1)[0][0]

'a'

In [11]:
list(count.elements())

['a', 'a', 'a', 'a', 'a', 'a', 'b', 'b', 'b', 'b', 'd', 'd', 'd', 'd', 'f']

## 2. namedtuple

In [14]:
point = namedtuple('point', 'x, y')
pt = point(1, -1)
pt

point(x=1, y=-1)

## 3. OrderedDict

In [None]:
ordered = OrderedDict()
ordered = {} # Remembers the order of insertion
ordered['a'] = 1
ordered['b'] = 2
ordered['c'] = 3
ordered['d'] = 4
ordered

{'a': 1, 'b': 2, 'c': 3, 'd': 1}

## 4. defaultdict

In [19]:
d = defaultdict(int)
d['a'] = 1
d['b'] = 2
d['c'] = 3
d['d'] = 4
print(d['b'])

2


In [20]:
print(d['e']) # Returns 0, not an error

0


## 4. deque

In [21]:
d = deque()
d.append(1)
d.append(2)
d.append(3)
d

deque([1, 2, 3])

In [22]:
d.pop()
d

deque([1, 2])

In [23]:
d.popleft()
d

deque([2])

In [25]:
d.clear()
d

deque([])

In [26]:
d.extend([1, 2, 3])
d

deque([1, 2, 3])

In [27]:
d.extendleft([4, 5, 6])
d

deque([6, 5, 4, 1, 2, 3])

In [28]:
d.rotate(1)
d

deque([3, 6, 5, 4, 1, 2])

In [29]:
d.rotate(-1)
d

deque([6, 5, 4, 1, 2, 3])

# Itertools

In [54]:
from itertools import product, permutations, combinations,combinations_with_replacement, accumulate, groupby, count, cycle, repeat
import operator

## 1. product

In [None]:
a = [1, 2, 3]
b = [4, 5, 6]
prod = product(a, b) # Cartesian Product
list(prod)

[(1, 4), (1, 5), (1, 6), (2, 4), (2, 5), (2, 6), (3, 4), (3, 5), (3, 6)]

In [39]:
prod = product([1, 2], [8, 9], repeat = 1)
list(prod)

[(1, 8), (1, 9), (2, 8), (2, 9)]

## 2. permutations

In [40]:
perm = permutations(a)
list(perm)

[(1, 2, 3), (1, 3, 2), (2, 1, 3), (2, 3, 1), (3, 1, 2), (3, 2, 1)]

In [41]:
perm = permutations([1, 2, 3], 2)
list(perm)

[(1, 2), (1, 3), (2, 1), (2, 3), (3, 1), (3, 2)]

## 3. combinations

In [42]:
a = [1, 2, 3, 4]
comb = combinations(a, 2)
list(comb)

[(1, 2), (1, 3), (1, 4), (2, 3), (2, 4), (3, 4)]

### combination_with_replacement

In [46]:
comb = combinations_with_replacement(a, 2)
list(comb)

[(1, 1),
 (1, 2),
 (1, 3),
 (1, 4),
 (2, 2),
 (2, 3),
 (2, 4),
 (3, 3),
 (3, 4),
 (4, 4)]

## 4. accumulate

In [49]:
acc = accumulate(a) # Works kinda like a prefix sum
print(a)
list(acc)

[1, 2, 3, 4]


[1, 3, 6, 10]

In [51]:
acc = accumulate(a, func=operator.mul) # Works kinda like a prefix product
print(a)
list(acc)

[1, 2, 3, 4]


[1, 2, 6, 24]

## 5. groupby

In [53]:
grp = groupby(a, key= lambda x: x > 2)
for k, v in grp:
    print(k, list(v))


False [1, 2]
True [3, 4]


## 6. count, cycle, repeat

In [58]:
for i in count(10):
    print(i)
    if i == 100: break

10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100


In [59]:
for i in cycle('abc'):
    print(i)
    if i == 'c': break

a
b
c


In [62]:
for i in repeat(10):
    print(i)
    if i == 10: break

10


# LOGGING

In [63]:
import logging

In [73]:
# force=True is used to override the default logging configuration
# and set the logging level to DEBUG
# This wil run the .debug and .info methods
logging.basicConfig(level=logging.DEBUG, format='%(asctime)s - %(levelname)s - %(message)s', datefmt='%Y-%m-%d %H:%M:%S', force=True)
logging.debug("This is a debug message")
logging.info("This is an info message")
logging.warning("This is a warning message")
logging.error("This is an error message")
logging.critical("This is a critical message")

2025-05-13 11:59:23 - DEBUG - This is a debug message
2025-05-13 11:59:23 - INFO - This is an info message
2025-05-13 11:59:23 - ERROR - This is an error message
2025-05-13 11:59:23 - CRITICAL - This is a critical message


In [74]:
logger = logging.getLogger(__name__)
logger.info("Hello from the helper")

2025-05-13 12:09:15 - INFO - Hello from the helper


# JSON

In [13]:
import json
from json import JSONEncoder

In [3]:
person = {'name': 'John', 'age': 30, 'city': 'New York'}
personJSON = json.dumps(person, indent = 4, sort_keys = True)

In [4]:
personJSON

'{\n    "age": 30,\n    "city": "New York",\n    "name": "John"\n}'

In [82]:
with open('person.json', 'w') as f:
    json.dump(person, f, indent = 4)

In [9]:
with open('person.json', 'r') as file:
    person = json.load(file)

person

{'name': 'John', 'age': 30, 'city': 'New York'}

In [5]:
person = json.loads(personJSON)
person

{'age': 30, 'city': 'New York', 'name': 'John'}

In [12]:
class User:
    def __init__(self, name, age):
        self.name = name
        self.age = age

user = User('Ishu', 27)

def encode(o):
    if isinstance(o, User):
        return {'name': o.name, 'age': o.age, o.__class__.__name__: True}
    else:
        raise TypeError("Object of type user is not JSON serializable")
    

userJSON = json.dumps(user, default= encode)
userJSON

'{"name": "Ishu", "age": 27, "User": true}'

In [14]:
class UserEncoder(JSONEncoder):
    def default(self, o):
        if isinstance(o, User):
            return {'name': o.name, 'age': o.age, o.__class__.__name__: True}
        return JSONEncoder.default(self, o)
    
userJSON = json.dumps(user, cls = UserEncoder)
userJSON

'{"name": "Ishu", "age": 27, "User": true}'

In [17]:
def decode(dct):
    if User.__name__ in dct:
        return User(name = dct['name'], age = dct['age'])
    return dct

user = json.loads(userJSON, object_hook= decode)
print(type(user))
user.name

<class '__main__.User'>


'Ishu'

# RANDOM

In [1]:
import random

In [2]:
a = random.random()
a

0.6727372891425132

In [3]:
a = random.uniform(0, 3)
a

2.253546976298391

In [None]:
a = random.randint(0, 10) # Includes Upper Bound
print(a)

1


In [10]:
a = random.normalvariate(0.2, 5) # Random value from normal distribution with mean = 0 & S.D = 1
a

-2.820246212571695

In [12]:
l = list("ABCDEFGHIJKLMNOP")
a = random.choice(l)
a

'B'

In [13]:
l = list("ABCDEFGHIJKLMNOP")
a = random.sample(l, 3)
a

['H', 'I', 'K']

In [14]:
l = list("ABCDEFGHIJKLMNOP")
a = random.choices(l, k = 3)
a

['D', 'D', 'M']

In [17]:
l = list("ABCDEFGHIJKLMNOP")
random.shuffle(l)
l

['G',
 'D',
 'P',
 'F',
 'J',
 'I',
 'E',
 'K',
 'C',
 'O',
 'N',
 'H',
 'A',
 'L',
 'M',
 'B']

In [18]:
random.seed(1)
print(random.random())
print(random.randint(1, 10))

random.seed(3)
print(random.random())
print(random.randint(1, 10))

0.13436424411240122
2
0.23796462709189137
9


In [19]:
import secrets

In [20]:
a = secrets.randbelow(10)
a

2

In [23]:
a = secrets.randbits(4)
a

7

In [24]:
l = list("ABCSFDN")
a = secrets.choice(l)
a

'N'

# DECORATORS

## Function Decorator

It is a function that takes another function as argument and extends the behaviour of this function without explicitly modifying it.

In [2]:
# @myDecorator
# def dosomething():
#     pass

def start_end_decorator(func):
    def wrapper():
        print("Start")
        func()
        print("End")
    return wrapper

@start_end_decorator
def print_name():
    print("Ishu")

print_name()

Start
Ishu
End


In [3]:
print_name = start_end_decorator(print_name) # The decorator will replace this line
print_name()

Start
Start
Ishu
End
End


In [8]:
import functools
# Function with arguments

def start_end_decorator(func):
    # args and kwargs helps so that we can use as many arguments and keyword arguments as we want.
    @functools.wraps(func)
    def wrapper(*args, **kwargs): 
        print("Start")
        res = func(*args, **kwargs)
        print("End")
        return res
    return wrapper

@start_end_decorator
def add(x):
    return x + 5

res = add(10)
print(res)

print(add.__name__)

Start
End
15
add


In [11]:
# Repeat decorator
def repeat(num_times):
    def decorator_rep(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            for _ in range(num_times):
                res = func(*args, **kwargs)
            return res
        return wrapper
    return decorator_rep

@repeat(3)
def greet(name):
    print(f"Hello, {name}")

greet("Ishu")

Hello, Ishu
Hello, Ishu
Hello, Ishu


## Class Decorator

It works similar work to function decorator but it's typically used if we want to maintain and update a state.

In [14]:
class CountCalls:
    def __init__(self, func):
        self.func = func
        self.num_calls = 0
    
    def __call__(self, *args, **kwargs):
        self.num_calls += 1
        print(f"This is executed {self.num_calls} times")
        return self.func(*args, **kwargs)


@CountCalls
def hello():
    print("Hello")

hello()
hello()

This is executed 1 times
Hello
This is executed 2 times
Hello


In [15]:
hello()

This is executed 3 times
Hello


# GENERATORS

These are very memory efficient and can be used for large data.

In [None]:
def mygenerator():
    yield 1
    yield 2
    yield 3

g = mygenerator()
val = next(g) # Repeat it for the other numbers.
print(val)

1


In [18]:
sorted(g)

[2, 3]

In [19]:
def countdown(s):
    print("Starting")
    while s > 0:
        yield s
        s -= 1

cd = countdown(4)

In [20]:
val = next(cd)
val

Starting


4

In [21]:
print(next(cd))

3


In [22]:
print(next(cd))

2


In [23]:
print(next(cd))

1


In [24]:
def firstn(n):
    nums = []
    s = 0
    while s < n:
        nums.append(s)
        s  += 1
    return nums

print(firstn(10))

[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]


In [26]:
import sys

In [28]:
# Using generator instead
def firstN(n):
    s = 0
    while s < n:
        yield s
        s += 1

print(sys.getsizeof(firstn(10)))
print(sys.getsizeof(firstN(10)))

184
104


In [29]:
def fibo(limit):
    a, b = 0, 1
    while a < limit:
        yield a
        a, b = b, a + b

fib = fibo(30)
for i in fib:
    print(i)

0
1
1
2
3
5
8
13
21


In [None]:
mygen = (i for i in range(10) if i % 2 == 0) # () for generator and [] for list
for i in mygen:
    print(i)

0
2
4
6
8


# MULTIPROCESSING

In [79]:
from multiprocessing import Process, Value, Array, Lock, Queue, Pool
import os
import time

In [35]:
processes = []
num_processes = os.cpu_count()

def square_nums():
    for i in range(100):
        i * i
        time.sleep(0.1)

# Create processes
for i in range(num_processes):
    p = Process(target = square_nums)
    processes.append(p)

# start
for p in processes:
    p.start()

# Join
for p in processes:
    p.join()

print("End main")

End main


In [71]:
def add100(num, lock):
    for i in range(100):
        time.sleep(0.01)
        # lock.acquire()
        with lock:
            num.value += 1
        # lock.release()

if __name__ == "__main__":
    lock = Lock()
    shared = Value('i', 0)
    print("Number at beginning is: ", shared.value)
    p1 = Process(target = add100, args = (shared, lock))
    p2 = Process(target = add100, args = (shared, lock))

    p1.start()
    p2.start()

    p1.join()
    p2.join()

    print("Number at ending is: ", shared.value)

Number at beginning is:  0
Number at ending is:  200


In [75]:
# For arrays

def add100(num, lock):
    for i in range(100):
        time.sleep(0.01)
        for i in range(len(num)):
            with lock:
                num[i] += 1

if __name__ == "__main__":
    lock = Lock()
    shared = Array('d', [0.0, 100.0, 200.0])
    print("Array at beginning is: ", shared[:])
    p1 = Process(target = add100, args = (shared, lock))
    p2 = Process(target = add100, args = (shared, lock))

    p1.start()
    p2.start()

    p1.join()
    p2.join()

    print("Number at ending is: ", shared[:])

Array at beginning is:  [0.0, 100.0, 200.0]
Number at ending is:  [200.0, 300.0, 400.0]


In [78]:
def square(num, queue):
    for i in num:
        queue.put(i * i)

def negative(num, queue):
    for i in num:
        queue.put(-1 * i)

if __name__ == "__main__":
    q = Queue()
    num = range(1, 6)
    p1 = Process(target=square, args = (num, q))
    p2 = Process(target = negative, args = (num, q))

    p1.start()
    p2.start()

    p1.join()
    p2.join()

    while not q.empty():
        print(q.get())


1
4
9
16
25
-1
-2
-3
-4
-5


In [85]:
# Process Pool

def cube(num):
    return num * num * num

if __name__ == "_main__":
    pool = Pool()
    numbers = range(10)

    # map, apply, join, close
    res = pool.map(cube, numbers)
    print(res)
    pool.apply(cube, numbers[0])
    pool.close()
    pool.join()

# MULTITHREADING

In [40]:
def square_nums():
    for i in range(100):
        i * i
        time.sleep(0.1)

In [53]:
from threading import Thread, Lock, current_thread
import os
import time

In [None]:
threads = []
num_threads = os.cpu_count()

# Create threads
for i in range(num_threads):
    p = Thread(target = square_nums)
    threads.append(p)

# start threads
for t in threads:
    t.start()

# Join threads: wait for them to complete
for t in threads:
    t.join()

print("End main")

End main


In [88]:
from queue import Queue

In [46]:
database_value = 0

def increase(lock):
    global database_value

    # lock.acquire()
    with lock:
        local_copy = database_value
        
        # Processing
        local_copy += 1
        time.sleep(0.1) # Switches to thread2 from thread1 and vice versa
        database_value = local_copy
    # lock.release()

if __name__ == "__main__":
    lock = Lock()
    print("Start value:", database_value)
    thread1 = Thread(target = increase, args = (lock,))
    thread2 = Thread(target = increase, args = (lock,))
    thread1.start()
    thread2.start()
    thread1.join()
    thread2.join()

    print("End value", database_value)
    print("End main")

Start value: 0
End value 2
End main


In [52]:
if __name__ == "__main__":
    q = Queue()

    q.put(1)
    q.put(2)
    q.put(3)

    # 3 2 1 -->
    print(q.get()) # 1

    q.task_done()
    # q.join()

    print("End Main")


1
End Main


In [89]:
def worker(q, lock):
    while True:
        val = q.get()
        with lock:
            # Processing
            print(f"in {current_thread().name} got {val}")
        q.task_done()

if __name__ == "__main__":
    q = Queue()
    numT = 10
    for i in range(numT):
        thread = Thread(target = worker, args = (q, lock))
        thread.daemon = True # Background thread which dies when main thread
        thread.start()
    
    for i in range(1, 21):
        q.put(i)
    
    q.join()
    print("End main")

in Thread-1280 (worker) got 1
in Thread-1280 (worker) got 2
in Thread-1280 (worker) got 3
in Thread-1280 (worker) got 4
in Thread-1280 (worker) got 5
in Thread-1280 (worker) got 6
in Thread-1280 (worker) got 7
in Thread-1280 (worker) got 8
in Thread-1280 (worker) got 9
in Thread-1280 (worker) got 10
in Thread-1280 (worker) got 11
in Thread-1280 (worker) got 12
in Thread-1280 (worker) got 13
in Thread-1280 (worker) got 14
in Thread-1280 (worker) got 15
in Thread-1280 (worker) got 16
in Thread-1280 (worker) got 17
in Thread-1280 (worker) got 18
in Thread-1280 (worker) got 19
in Thread-1280 (worker) got 20
End main


# CONTEXT MANAGER

In [94]:
class ManagedFile:
    def __init__(self, filename):
        print("init")
        self.filename = filename
    
    def __enter__(self):
        print("Enter")
        self.file = open(self.filename, 'w')
        return self.file

    def __exit__(self, exc_type, exc_value, exc_traceback):
        if self.file:
            self.file.close()
        if exc_type is not None: # Checks the exception
            print("Exception has been handled.")
        print("exit")
        return True

with ManagedFile('notes.txt') as file:
    print("Do Something")
    file.write("Something to do")
    file.somemethod() # For checking an exception

print("Continuing")

init
Enter
Do Something
Exception has been handled.
exit
Continuing


In [95]:
from contextlib import contextmanager

In [96]:
@contextmanager
def open_managed_file(filename):
    f = open(filename, 'w')
    try:
        yield f
    finally:
        f.close()

with open_managed_file('notes.txt') as f:
    f.write("Something to do...")