In [None]:
metro_areas = [
    ('Tokyo', 'Japan', 36.933, (35.66667722, 139.6505789)),
    ('Delhi NCR', 'India', 26.454, (28.6139391, 77.2090212)),
    ('Mexico City', 'Mexico', 21.782, (19.4326077, -99.1332087)),
    ('New York-Newark', 'United States', 21.186, (40.7127281, -74.0060152)),
    ('Sao Paulo', 'Brazil', 20.186, (-23.5505199, -46.6333094)),
    ('Shanghai', 'China', 19.200, (31.230416, 121.473701)),
    ('Cairo', 'Egypt', 18.772, (30.0444196, 31.2357116)),
    ('Dhaka', 'Bangladesh', 18.237, (23.8103, 90.4125)),
    ('Mumbai', 'India', 18.042, (19.0760, 72.8777)),
    ('Beijing', 'China', 17.180, (39.9042, 116.4074)),
]

print(f'{"":15} | {"lattitube": >9} | {"longitude": >9}')
for record in metro_areas:
    match record:
        case [name, *_, (lat, lon)] if lon <= 0:
            print(f'{name:15} | {lat:9.4f} | {lon:9.4f}')
        case [name, _, _, (lat, lon)] if lon > 0:
            print(f'{name:15} | {lat:9.4f} | {lon:9.4f} *')

In [None]:
import sys
import numpy as np


def container_sizes(factory, limit=50):
    """Return ``sys.getsizeof`` of ``factory(<generator of n ints>)`` for n in 0..limit-1.

    ``factory`` receives a generator expression, exactly as the containers
    were built before, so the reported sizes for tuple/list/set are unchanged.
    """
    return [sys.getsizeof(factory(x for x in range(n))) for n in range(limit)]


def _array_from(items):
    # np.array(<generator>) wraps the generator object itself in a 0-d object
    # array, so its size never changes; np.fromiter actually consumes the items.
    return np.fromiter(items, dtype=int)


# tuple is immutable; list, set and ndarray are compared against it.
for position, factory in enumerate((tuple, list, set, _array_from)):
    if position:
        print('\n')
    print(*container_sizes(factory), sep=', ', end=', ')

In [None]:
# Demo: `t[2] += [...]` extends the inner list in place and only then fails
# when the result is assigned back into the (immutable) tuple.
t = (1, 2, [34, 34])
try:
    t[2] += [1, 2]
except TypeError as e:
    # The error is reported, yet the list inside t has already been extended.
    print(e, end=' But still updates the value of t ')
    print(t)

In [None]:
import array
# 'l' is the typecode for signed long integers.
a = array.array('l', [-11111111, 22222222, -33333333, 44444444])
# memoryview exposes the array's underlying buffer without copying it.
m = memoryview(a)

In [None]:
from datetime import datetime, timedelta
from dateutil import rrule
from collections.abc import Iterable

class ISIN:
    """A 12-character security identifier with value semantics.

    Only the shape of the code is validated (two letters, nine alphanumerics,
    one trailing digit); the check digit itself is not verified.

    Two ISIN objects with the same code compare equal and hash alike, and an
    ISIN also compares equal to the plain string of its code, so lookups such
    as ``bond.isin == isin`` work by value rather than by identity.

    Raises:
        ValueError: if ``code`` is not a string of the expected shape.
    """

    def __init__(self, code: str):
        if not self._validate_isin(code):
            raise ValueError(f"Invalid ISIN: {code}")
        self.code = code

    @staticmethod
    def _validate_isin(code: str) -> bool:
        """Validate the ISIN format."""
        # Non-strings are rejected here instead of crashing in len().
        if not isinstance(code, str) or len(code) != 12:
            return False
        return code[:2].isalpha() and code[2:11].isalnum() and code[-1].isdigit()

    def __eq__(self, other):
        if isinstance(other, ISIN):
            return self.code == other.code
        if isinstance(other, str):
            return self.code == other
        return NotImplemented

    def __hash__(self):
        # Must agree with __eq__: equal codes (and equal strings) hash alike.
        return hash(self.code)

    def __repr__(self):
        return f"ISIN('{self.code}')"

    def __str__(self):
        return self.code


class BondPrice:
    __slots__ = ('__bond_yield', '__price', '__coupon_rate', '__expiry_date', '__face_value', '__coupon_dates')

    def __init__(self, bond_yield: float, coupon_rate: float, expiry_date: datetime, face_value: float, semi_annual: bool = True):
        self.__expiry_date = expiry_date  # Store expiry_date as a datetime object
        self.__bond_yield = bond_yield / 2 if semi_annual else bond_yield
        self.__coupon_rate = coupon_rate / 2 if semi_annual else coupon_rate
        self.__face_value = face_value
        self.__coupon_dates = []  # Initialize coupon dates
        self.__set_coupon_dates(semi_annual=semi_annual)
        self.calculate_price()

    def calculate_price(self, dirty=False):
        """Calculate the price of the bond based on yield, coupon rate, and expiry date."""
        time_to_maturity = (self.__expiry_date - datetime.now()).days / 365.0
        self.__price = self.__face_value / (1 + self.__bond_yield) ** time_to_maturity + self.__calculate_coupon_price()
        if dirty:
            self.__price += self.__calculate_accrued_coupon()

    def get_price(self):
        """Get the calculated price of the bond."""
        return self.__price

    def get_coupon_dates(self):
        """Get the coupon dates for the bond."""
        return self.__coupon_dates

    def __set_coupon_dates(self, semi_annual=True):
        """Set the coupon dates for the bond."""
        payment_months = (1, 7) if semi_annual else (1,)
        self.__coupon_dates = [
            datetime(year=date.year, month=date.month, day=min(self.__expiry_date.day, 28))
            for date in rrule.rrule(rrule.MONTHLY, dtstart=datetime.now(), until=self.__expiry_date)
            if date.month in payment_months
        ]

    @staticmethod
    def __get_last_coupon_date(semi_annual=True):
        """Get the last coupon date based on the current date."""
        dates = list(rrule.rrule(rrule.MONTHLY, dtstart=datetime.now() - timedelta(days=365), until=datetime.now()))
        for i in reversed(dates):
            if semi_annual:
                if i.month in (1, 7):  # Check for January or July
                    return i
            else:
                if i.month == 1:  # Check for January
                    return i
        return None  # Return None if no matching date is found

    def __calculate_accrued_coupon(self, semi_annual=True):
        """Calculate the accrued coupon based on the coupon rate and time to maturity."""
        last_coupon_date = self.__get_last_coupon_date(semi_annual)
        if not last_coupon_date:
            return 0  # No accrued coupon if no last coupon date is found
        days_since_last_coupon = (datetime.now() - last_coupon_date).days
        return (self.__coupon_rate * self.__face_value) * (days_since_last_coupon / 365)

    def __calculate_coupon_price(self):
        """Calculate the coupon price based on the coupon rate and time to maturity."""
        coupon_price = 0
        for coupon_date in self.__coupon_dates:
            time_to_coupon = (coupon_date - datetime.now()).days / 365.0
            coupon_price += self.__coupon_rate * self.__face_value / (1 + self.__bond_yield) ** time_to_coupon
        return coupon_price

    def __repr__(self):
        return f'Price: {self.__price}'


class Bond:
    def __init__(self, isin: ISIN, issuer: str, bond_yield: float, coupon_rate: float, face_value: float):
        self.isin = isin
        self.name = issuer
        self.bond_yield = bond_yield
        self.coupon_rate = coupon_rate
        self.face_value = face_value

    def __repr__(self):
        return f'Bond({self.name}, {self.bond_yield}, {self.coupon_rate})'

    def __str__(self):
        return f'{self.name} - {self.bond_yield} - {self.coupon_rate}'


class BondPortfolio:
    def __init__(self, bonds: Iterable[Bond]):
        if not isinstance(bonds, Iterable):
            raise TypeError("Bonds must be an iterable.")
        self.bonds = bonds

    def __iter__(self):
        yield from self.bonds

    def __len__(self):
        return len(self.bonds)

    def __getitem__(self, isin):
        for bond in self.bonds:
            if bond.isin == isin:
                return bond
        raise KeyError(f"Bond with ISIN {isin} not found.")

    def __contains__(self, bond):
        return bond in self.bonds

    def __repr__(self):
        return ','.join([str(bond.isin) for bond in self.bonds])


# Example usage: price a bond expiring 365 * 20 days from today, using the
# default semi-annual schedule (semi_annual is not passed).
bond_price = BondPrice(
    bond_yield=0.075,
    coupon_rate=0.05,
    expiry_date=datetime.today() + timedelta(days=365 * 20),
    face_value=1000
)
print(f"Bond Price: {bond_price.get_price()}")
print(f"Coupon Dates: {bond_price.get_coupon_dates()}")

In [None]:
class Strange:
    """Number wrapper whose ``+`` multiplies two wrappers but adds plain values."""

    def __init__(self, val):
        self.val = val

    def __add__(self, other):
        # Operands without a usable .val fall back to ordinary addition.
        try:
            product = self.val * other.val
        except AttributeError:
            return self.val + other
        return product

    def __radd__(self, other):
        # Reached for `plain + Strange`, after the left operand declines.
        total = self.val + other
        return total

a = Strange(3)
b = Strange(2)

print(a + b)    # Uses __add__: both operands have .val, so 3 * 2 = 6
print(10 + a)   # Uses __radd__: 3 + 10 = 13
print(a + 10)   # Uses __add__: int has no .val, so the AttributeError fallback gives 3 + 10 = 13

In [17]:
# Demo: a nested class can be subclassed alongside its outer class.
class Organism:
    def describe(self):
        return "Basic lifeform"

    # Nested class, reachable as Organism.Cell.
    class Cell:
        def function(self):
            return "Generic cell"

class Animal(Organism):
    def describe(self):
        return "I am an animal"

    # Shadows the inherited Organism.Cell with a subclass of it.
    class Cell(Organism.Cell):
        def function(self):
            return "Animal cell with specialized roles"

print(Animal().describe(), Animal.Cell().function())

I am an animal Animal cell with specialized roles


In [18]:
from itertools import zip_longest

arr = [1] * 2       # two items
arr2 = [1, 3] * 2   # four items
arr3 = [1] * 3      # three items
ans = 0

# zip_longest runs to the longest input (four rounds) and substitutes
# fillvalue=2 for the inputs that are already exhausted.
for i, j, k in zip_longest(arr, arr2, arr3, fillvalue=2):
    print((i + j), k)
    ans += (i + j) * k

print(ans)

2 1
4 1
3 1
5 2
19


In [None]:
# Quiz: name mangling only applies inside a class body.
class A():
    __num = 5   # stored on the class as _A__num

    def getNum(self):
        return self.__num   # compiled as self._A__num

a = A()
a.__num = 4   # outside the class: creates a plain instance attribute named '__num'

# getNum() still reads the class attribute, __dict__ holds the one new
# attribute, and a.__num is that new attribute -> prints 514.
print(f"{a.getNum()}{len(a.__dict__)}{a.__num}")

In [None]:
# Quiz: cooperative super() follows the MRO of the instance's class (D, B, C, A).
class A():
    def normalMethod(self):
        print(1, end = '')

class B(A):
    def normalMethod(self):
        print(2, end = '')
        # For a D instance the next class after B in the MRO is C, not A.
        return super().normalMethod()

class C(A):
    def normalMethod(self):
        # No super() call here, so A.normalMethod is never reached.
        print(3, end = '')

class D(B, C):
    def normalMethod(self):
        print(4, end = '')
        super().normalMethod()

d = D()
d.normalMethod()   # prints 423

In [None]:
# Quiz: `==` returns whatever __eq__ returns unless that value is NotImplemented.
class MyClass():
    def __init__(self, num):
        self.mine = num

    def __eq__(self, other):
        if isinstance(other, MyClass):
            return self.mine - other.mine
        # Falls through and returns None (not NotImplemented) for other types.

class YourClass():

    def __init__(self, num):
        self.yours = num

    def __eq__(self, other):
        if isinstance(other, YourClass):
            return self.yours + other.yours
        elif isinstance(other, MyClass):
            return self.yours * other.mine
        else:
            return 3

# The left operand's __eq__ runs first and returns None, so the reflected
# YourClass.__eq__ is never tried: prints None.
print(MyClass(4) == YourClass(2))

In [None]:
text = "Python"[::-1][::-1]
print(text)

In [None]:
import contextlib

class Origin:
    def __init__(self):
        print("The origin")

class A(Origin):
    def __init__(self):
        print("A's init")
        super().__init__()

class B(Origin):
    def __init__(self):
        print("B's init")
        super().__init__()

# Class-based context manager. It defines no __init__, so constructing it runs
# the cooperative chain A -> B -> Origin.
class TestContext(A, B):
    def __enter__(self):
        # The value bound by `with ... as name`.
        return 'Has Entered and been initialized'

    def __exit__(self, exec_type, exec_value, traceback):
        # Prints the exception details; returns None, so exceptions propagate.
        print("---------")
        print(exec_type)
        print(exec_value)
        print(traceback)

@contextlib.contextmanager
def testcontext():
    """Generator-based context manager that yields a greeting string.

    A ``@contextmanager`` generator must yield exactly once. The former second
    ``yield`` made every successful ``with`` block end in
    ``RuntimeError: generator didn't stop``.

    Any ``Exception`` raised in the ``with`` body is printed and suppressed
    (the generator finishes normally after catching it); ``'origin'`` is
    printed on exit in every case.
    """
    message = 'origin'

    try:
        yield 'Has Entered and been initialized'
        # Reached only when the with-body finished without raising.
        print("SUCCC")
    except Exception as e:
        print(e)
    finally:
        print(message)



# te is a str, so `te + 1` raises TypeError inside the with-body; testcontext
# catches it, prints the message and suppresses it.
with testcontext() as te:
    print(te + 1)

# with TestContext() as text:
#     print(text + 1)

# @testcontext()
# def test():
#     print("Testing things")

In [None]:
from collections import ChainMap
from typing import Any


class Environment(ChainMap[int, Any]):
    """ChainMap that can update a key in whichever mapping already defines it."""

    def change(self, key: int, value: Any):
        """Overwrite ``key`` in the first mapping of the chain that contains it.

        Raises:
            KeyError: carrying ``key`` when no mapping in the chain defines it.
        """
        for mapping in self.maps:
            if key in mapping:
                mapping[key] = value
                return

        raise KeyError(key)
    
inner_env = { 1: 'something ', 2: 'else'}
outer_env = { 3: 'or is it ', 4: 'this'}

env = Environment(inner_env, outer_env)
# Plain item assignment on a ChainMap writes to the first mapping (inner_env),
# so outer_env[3] is shadowed rather than updated; change() is what updates in place.
env[3] = 'new stuff'
env

In [None]:
id(env)

In [None]:
# math.fma (fused multiply-add) was added in Python 3.13.
from math import fma

# 1_10_0 is the integer 1100 (underscores are digit separators): 2 * 2 + 1100 -> 1104.
print(int(fma(2, 2, 1_10_0)))

In [None]:
arr1 = [1, 2]
arr2 = [1, 2, 3, 4]
arr3 = list(zip(arr1,arr2))
print(arr3)

In [None]:
# Quiz on `is` with integers. Identity of equal ints is an implementation
# detail: CPython caches small ints, while larger or runtime-built values may
# or may not share an object (literals compiled together are often merged).
# NOTE(review): the letters printed can differ between interpreters/versions.
a = 1
b = 1
if (a is b):
    print("A")
else:
    print("B")

a = 257
b = 257
if (a is b):
    print("C")
else:
    print("D")

a = int("-6")
b = -6
if (a is b):
    print("E")
else:
    print("F")

In [None]:
# `+=` on a list stored in a tuple extends the list in place, then fails when
# the result is assigned back into the tuple. The error is caught and shown so
# the notebook keeps running under Restart & Run All; t still ends up mutated.
t = (1, 2, [30, 40])
try:
    t[2] += [50, 60]
except TypeError as exc:
    print(f"TypeError: {exc}")

In [None]:
t

In [None]:
a = 257
b = 257
if (a is b):
    print("C")
else:
    print("D")

In [None]:
A = [1,2,3,4,5,6,612434,2342343,34,34234]
B = A.copy()
A.sort()

In [None]:
B

In [None]:
A

In [None]:
sorted(B)

In [None]:
class A():
    __num = 5   # class attribute, stored as _A__num

    def getNum(self):
        return self.__num

a = A()
print(a.__dict__)   # empty: the instance has no attributes of its own yet
a.__num = 4         # not mangled outside the class body: adds the key '__num'
a.__dict__

In [None]:
import os
os.cpu_count()

In [None]:
import httpx

client = httpx.AsyncClient()
task = client.post('http://localhost:8080')
response = await task
print(response)

In [None]:
from datetime import datetime

datetime.now().__class__

In [None]:
def config(default=None, **kwargs):
    """Return the extra keyword arguments with ``'default'`` added as the last key.

    A ``default=`` argument always binds to the named parameter, so ``kwargs``
    itself can never contain the key ``'default'``.
    """
    return {**kwargs, 'default': kwargs.get('default', default)}

result = config(setting=1, default=2)
print(result['default'])

In [None]:
# Quiz: order of metaclass hooks. Creating class B runs A.__new__ (3) then
# A.__init__ (1); calling B() runs A.__call__ (2), which runs B.__init__ (4).
class A(type):

    def __init__(c, *args, **kwargs):
        print("1", end = "")
        super().__init__(*args, **kwargs)

    def __call__(c, *args, **kwargs):
        print("2", end = "")
        return super().__call__(*args, **kwargs)

    def __new__(mcs, name, bases, namespace):
        print("3", end = "")
        return super().__new__(mcs, name, bases, namespace)

class B(metaclass=A):

    def __init__(self):
        print("4", end = "")

_ = B()   # overall output: 3124

In [None]:
# Quiz: dict keys collide only when both hash and equality agree.
class A:
    def __init__(self, value):
        self.val = value
    # Only __hash__ is customised; equality stays the default identity check.
    def __hash__(self):
        return hash(self.val)

class B:
    def __init__(self, value):
        self.val = value

    def __eq__(self, other):
        return isinstance(other, B) and self.val == other.val

    def __hash__(self):
        return hash(self.val)

dict1 = dict()

a = True
b = 1
dict1[a] = 1
dict1[b] = 2   # True == 1 with equal hashes: same key, value overwritten

c = A(3)
d = A(3)
dict1[c] = 3
dict1[d] = 4   # equal hashes but c != d: two separate keys

e = B(4)
f = B(4)
dict1[e] = 5
dict1[f] = 6   # equal hash and e == f: same key, value overwritten

print(dict1.values())

In [None]:
dict1

In [None]:
import math

class Vector2d:
    """Two-dimensional vector with float components ``x`` and ``y``.

    Supports unpacking/iteration, ``repr``/``str``, ``bytes`` round-tripping
    via ``frombytes``, equality with any iterable of two numbers, ``abs``
    (magnitude) and truthiness (False only for the zero vector).
    """

    typecode = 'd'  # array typecode used when exporting to bytes (double)

    def __init__(self, x, y):
        self.x = float(x)
        self.y = float(y)

    def __iter__(self):
        # Makes the vector unpackable: x, y = v and cls(*v).
        return (i for i in (self.x, self.y))

    def __repr__(self):
        class_name = type(self).__name__
        return '{}({!r}, {!r})'.format(class_name, *self)

    def __str__(self):
        return str(tuple(self))

    def __bytes__(self):
        # Local import: the module-level name `array` may be bound to the
        # array module (or to something else) elsewhere in the notebook.
        from array import array
        # One typecode byte followed by the raw components.
        return bytes([ord(self.typecode)]) + bytes(array(self.typecode, self))

    def __eq__(self, value):
        try:
            other = tuple(value)
        except TypeError:
            # Not iterable: let Python try the reflected comparison.
            return NotImplemented
        return tuple(self) == other

    def __abs__(self):
        return math.hypot(self.x, self.y)

    def __bool__(self):
        return bool(abs(self))

    @classmethod
    def frombytes(cls, octets):
        """Rebuild a vector from the output of ``bytes(vector)``."""
        typecode = chr(octets[0])
        # The payload after the typecode byte is reinterpreted as numbers.
        memv = memoryview(octets[1:]).cast(typecode)
        return cls(*memv)

In [None]:
[*memoryview(bytes([ord('b')]) + bytes(list([1,2,3]))).cast('b')]

In [None]:
import sys

a = 5
sys.getrefcount(a)

In [None]:
import weakref

# Create a simple class
class MyClass:
    pass

obj = MyClass()  # Create an instance of MyClass

# Create a weak reference to obj.
# A weak reference does not increase the reference count of the object.
weak_obj = weakref.ref(obj)

print(weak_obj())  # Prints the object, since it still exists

del obj  # Remove the strong reference to the object

# Now, since there are no strong references, the object is garbage collected.
print(weak_obj())  # Prints None, as the object has been collected

# Explanation:
# - weakref.ref(obj) creates a weak reference to obj.
# - As long as there is at least one strong reference, weak_obj() returns the object.
# - Once all strong references are gone (after del obj), weak_obj() returns None.

# Why use weak references?
# Weak references are useful when you want to reference an object without preventing it from being garbage collected.
# This is helpful in caching, memoization, or tracking objects without creating memory leaks.
# For example, in observer patterns or caches, you may want to keep references to objects only as long as they exist elsewhere.

In [None]:
class TestingProps:
    """Demo of a property with a side effect: every read advances the stored value."""

    def __init__(self, initial=0):
        self.__initial = initial

    @property
    def doing_the_calc(self):
        # Each access adds 10 and then 20 to the stored value and returns it.
        advanced = self.__initial + 10 + 20
        self.__initial = advanced
        return advanced

tp = TestingProps(10)
print(tp.doing_the_calc)
print(tp.doing_the_calc)
print(tp.doing_the_calc)

In [None]:
import math
from functools import cached_property

class Circle:
    """Circle whose area is computed once and then cached on the instance."""

    def __init__(self, radius):
        self.radius = radius

    @cached_property
    def area(self):
        """Area for the radius at first access.

        The cached value is not refreshed if ``radius`` is changed later;
        ``del circle.area`` clears it.
        """
        print("Calculating area...")
        # math.pi replaces the truncated literal 3.14159.
        return math.pi * self.radius ** 2

c = Circle(5)
print(c.area)  # First access, calculates and caches
print(c.area)  # Second access, uses cached value

In [None]:
class Example:
    """Traces object construction (__new__, __init__) and instance calls (__call__)."""

    def __new__(cls, value):
        # Allocation step: runs before __init__ and produces the bare instance.
        print("__new__ called")
        instance = super().__new__(cls)
        return instance

    def __init__(self, value):
        # Initialisation step: receives the instance created by __new__.
        print("__init__ called")
        self.value = value

    def __call__(self, x):
        # Lets the instance be used like a function.
        print("__call__ called")
        scaled = self.value * x
        return scaled

# Creating an instance - __new__ is called
obj = Example(5)
# Output:
# __new__ called
# __init__ called

# Using the instance as a function - __call__ is called
result = obj(3)  # This calls __call__
# Output:
# __call__ called

In [None]:
# Quiz: operands are evaluated left to right, but ** groups right to left.
def f():
    print('f', end='')
    return 2
def g():
    print('g', end='')
    return 3
def h():
    print('h', end='')
    return 2

# Calls run in the order f, g, h; the value is 2 ** (3 ** 2) = 512 -> "fgh512".
print(f() ** g() ** h(), end='')

In [None]:
from functools import cache, wraps
def log(func):
    """Decorator that prints each result of ``func`` (without a newline).

    Callables already marked with a truthy ``_log`` attribute are returned
    unchanged, so applying the decorator twice does not log twice.
    """
    already_logged = getattr(func, "_log", False)
    if already_logged:
        return func

    def wrapper(*args, **kwargs):
        outcome = func(*args, **kwargs)
        print(outcome, end='')
        return outcome

    wrapper = wraps(func)(wrapper)
    # Marker checked above on later applications.
    wrapper._log = True
    return wrapper

@log
@cache
@log
def foo(a):
    return a
    
foo(1)
foo(2)
foo(1)
foo(3)
foo(1)
foo(2)

In [None]:
# Pitfall demo: the outer `* 3` repeats a reference to one and the same inner
# list, so writing grid[0][0] shows up in every row.
grid = [[0] * 3] * 3
grid[0][0] = 3
grid

In [None]:
print(min(-0.0, False, round(7/2) - 6.0, float("nan"), key = lambda x: -x))

In [None]:
type('t')

In [None]:
a = (1)
print(type(a))

In [None]:
def func(x):
    """Return a new list equal to ``x`` with 4 appended; ``x`` itself is not modified."""
    return x + [4]

a = [1, 2, 3]
b = func(a)
print(a)

In [None]:
import sys
arr = [1,2,3]
arr1 = arr
s = max(arr)
arr2 = [arr, arr1, s]
arr3 = arr[:]
arr4 = [arr, arr1, arr3]
print(sys.getrefcount(arr))

In [None]:
x = "Hello my name is Harold"

# x[-6::2] takes every second character of the last six ("Harold" -> "Hrl");
# :>10 right-aligns it in a field of width 10.
print(f"abc{x[-6::2]:>10}123")

In [None]:
x = "Hello my name is Harold"

f"{x[-6::2]}"

In [None]:
"""2"""
def myFunc():
    #3
    """4"""
    #5
    """6"""
    return None
    """7"""
    #8
#9
"""10"""


# Quiz: only the first statement of the function body, if it is a string
# literal, becomes __doc__; comments before it do not count as statements.
print(myFunc.__doc__)

In [None]:
# Quiz: float formatting rounds exact halves to the nearest even digit, and
# ',' adds a thousands separator -> 2, 4, 4, 1,000
print(f"{2.5:,.0f}, {3.5:,.0f}, {4.5:,.0f}, {999.5:,.0f}")

# Metaclass Usecase

In [None]:
# Experiment: a metaclass __call__ intercepts instantiation of its classes.
class MetaClass(type):
    def __new__(cls, name, bases, cls_dict):
        # Runs once for every class created with this metaclass (A and B below).
        print(f"Meta class guy. {name}")
        return super().__new__(cls, name, bases, cls_dict)

    def something(self, a: str):
        return a + " says hello"

    def __call__(self, *args, **kwds):
        # Replaces normal instantiation: no instance is created and __init__
        # is not run; the bound method `something` is returned instead.
        print("Meta class call")
        return self.something

class A(metaclass=MetaClass):
    def __init__(self):
        print("Initialized")

class B(A):
    # Instance-level __call__; never reached below because B() does not
    # produce a B instance.
    def __call__(self, *args, **kwds):
        print("B called")
        return super().__call__(*args, **kwds)

b = B()          # b is the bound method MetaClass.something, not a B instance
b("Something")   # so this calls something(B, "Something")

import inspect
inspect.signature(b)

In [None]:
from inspect import Parameter, Signature

def make_signature(names: list) -> Signature:
    """Build a Signature with one positional-or-keyword parameter per name, in order."""
    parameters = [Parameter(field, Parameter.POSITIONAL_OR_KEYWORD) for field in names]
    return Signature(parameters)

class MachineLearningClass(type):
    """Metaclass that attaches a ``__signature__`` built from the class's ``_fields``."""

    def __new__(cls, name, bases, class_dict):
        # Build the class first, then derive its signature from the _fields it
        # defines or inherits.
        new_class = super().__new__(cls, name, bases, class_dict)
        new_class.__signature__ = make_signature(new_class._fields)
        return new_class

# Base class whose constructor arguments are defined by `_fields`: the metaclass
# turns them into __signature__, and __init__ binds the call against it.
# (No class docstring on purpose: __repr__ prints the class __dict__, and a
# docstring would change that output.)
class MachineLearningBase(metaclass=MachineLearningClass):
    _fields = []

    def __repr__(self):
        # Shows the class namespace, not the instance attributes.
        return str(type(self).__dict__)

    def __init__(self, *args, **kwargs):
        # bind() raises TypeError for missing or unexpected arguments.
        bound_args = self.__signature__.bind(*args, **kwargs)
        bound_args.apply_defaults()
        # Every bound argument becomes an instance attribute of the same name.
        for attr_name, attr_value in bound_args.arguments.items():
            setattr(self, attr_name, attr_value)


# Property-backed `type_of_algo`; the value itself lives in `_type_of_algo`.
class KNNGetterSetter(MachineLearningBase):
    _fields = ['_type_of_algo', 'n_of_cluster']

    @property
    def type_of_algo(self):
        return self._type_of_algo

    @type_of_algo.setter
    def type_of_algo(self, algo_type):
        # Unknown names are reported and replaced by the default.
        if algo_type not in ['pearsons', 'kendall', 'spearman']:
            print(f"Invalid algorithm type '{algo_type}'. Using default 'pearsons'")
            algo_type = 'pearsons'
        self._type_of_algo = algo_type


# Test the fixed code
# NOTE: the constructor field is '_type_of_algo', so '12' is stored directly on
# the private attribute and does not pass through the property setter's validation.
b = KNNGetterSetter('12', n_of_cluster=5)
print(f"n_of_cluster: {b.n_of_cluster}")
print(f"_type_of_algo: {b._type_of_algo}")

# Test the property
b.type_of_algo = 'pearsons'
print(f"type_of_algo (via property): {b.type_of_algo}")

# Test invalid algorithm type: the setter prints a message and falls back to 'pearsons'
b.type_of_algo = 'invalid_algo'
print(f"type_of_algo after invalid input: {b.type_of_algo}")

### Getter Setter in Python

In [None]:
# Property-backed `type_of_algo`. The getter and setter must use the private
# `_type_of_algo` attribute: referring to `self.type_of_algo` inside them
# re-enters the property and recurses until RecursionError.
class KNNGetterSetter(MachineLearningBase):
    _fields = ['_type_of_algo', 'n_of_cluster']

    @property
    def type_of_algo(self):
        return self._type_of_algo  # Use private attribute

    @type_of_algo.setter
    def type_of_algo(self, algo_type):
        if algo_type in ['pearsons', 'kendall', 'spearman']:
            self._type_of_algo = algo_type  # Use private attribute
        else:
            print(f"Invalid algorithm type '{algo_type}'. Using default 'pearsons'")
            self._type_of_algo = 'pearsons'
        # Optionally, you could raise an exception instead of falling back to a default
b = KNNGetterSetter('12', n_of_cluster=5)
b.type_of_algo

## Descriptor

In [None]:
class UpperCaseDescriptor:
    """Data descriptor that stores its value upper-cased in the instance ``__dict__``.

    The storage key is the attribute name the descriptor was assigned to
    (received through ``__set_name__``). The old key interpolated
    ``instance.__class__.__str__`` -- a method object -- where the class name
    was intended.
    """

    def __set_name__(self, owner, name):
        # For a private class attribute `name` arrives already mangled
        # (e.g. '_Person__name').
        self._storage_name = name

    def _key(self, instance):
        try:
            return self._storage_name
        except AttributeError:
            # Descriptor attached after class creation: __set_name__ never ran.
            return f'_{type(instance).__name__}__name'

    def __get__(self, instance, owner):
        if instance is None:
            # Accessed on the class itself: expose the descriptor object.
            return self
        return instance.__dict__.get(self._key(instance), '')

    def __set__(self, instance, value):
        instance.__dict__[self._key(instance)] = value.upper()

    def __delete__(self, instance):
        instance.__dict__.pop(self._key(instance), None)

class OnlyPositiveDescriptor:
    def __get__(self, instance, owner) -> float | int:
        return instance.__dict__.get(f'_{instance.__class__.__str__}__price', '')
    
    def __set__(self, instance, value):
        if isinstance(value, float) or isinstance(value, int):
            if value < 0:
                raise ValueError("Needs to be positive number.")
        else:
            raise TypeError("Must be an integer or a float")
        instance.__dict__[f'_{instance.__class__.__str__}__price'] = value

class Person:
    # Inside the class body these names are mangled to _Person__name and
    # _Person__price.
    __name = UpperCaseDescriptor()
    __price = OnlyPositiveDescriptor()

    def __init__(self, name, price):
        # Both assignments go through the descriptors' __set__ methods.
        self.__name = name # Mangling
        self.__price = price
        print(self.__dict__)

p = Person('alice', 100)
print(p.__dict__)
# Outside the class body `p.__name` is NOT mangled, so it would raise
# AttributeError and never reach the descriptor. The mangled names are spelled
# out here so that reads and writes actually go through __get__ / __set__.
# (The descriptors also run during __init__, not only here.)
print(p._Person__name)
p._Person__name = 'bob'
print(p._Person__name)
try:
    p._Person__price = -123
except ValueError as e:
    print(e)
print(p._Person__price)

# Metaprogramming

In [None]:
class FrozenJSON:
    """
    A read-only facade for navigating a JSON-like object using attribute notation
    """

    def __init__(self, mapping):
        # Shallow copy via the dict builtin (the old call used `dict1`, an
        # unrelated notebook variable).
        self.__data = dict(mapping)

    def __getattr__(self, name):
        """Resolve ``name`` as a dict method first, then as a key of the data.

        Raises:
            AttributeError: if ``name`` is neither a dict attribute nor a key.
        """
        try:
            return getattr(self.__data, name)
        except AttributeError:
            pass
        try:
            return FrozenJSON.build(self.__data[name])
        except KeyError:
            raise AttributeError(name) from None

    @classmethod
    def build(cls, obj):
        """Wrap dicts, recurse into lists, and return any other value unchanged."""
        if isinstance(obj, dict):
            return cls(obj)
        if isinstance(obj, list):
            return [cls.build(item) for item in obj]
        return obj
        

In [None]:
import random

# Three strings, each the concatenation of three random integers in 0..100, so
# their lengths differ from run to run (no seed is set).
random_2d = ["".join([str(random.randint(0, 100)) for _ in range(3)]) for _ in range(3)]
print(random_2d)

# Transposes character by character; zip stops at the shortest string, so
# trailing characters of the longer strings are dropped.
list(map(list, zip(*random_2d)))

In [None]:
def remove_extra_spaces(s):
    """Collapse every run of whitespace to one space and trim both ends."""
    words = s.split()
    return ' '.join(words)

# Example usage
text = "  This   is   a   string   with   extra   spaces.  "
cleaned_text = remove_extra_spaces(text)
print(cleaned_text)

In [None]:
def flatten_array(array):
    """Return a flat list of the items of ``array``, descending into nested lists.

    Only ``list`` instances are expanded; every other item is kept as is.
    """
    flat = []
    for element in array:
        if not isinstance(element, list):
            flat.append(element)
            continue
        flat.extend(flatten_array(element))
    return flat

array = [1, [2, 3], [4, [5, 6]], 7]
flatten_array(array)

In [None]:
# Build a class at runtime: exec the source of its methods into a dict, then
# hand that dict to type() as the class namespace.
body = """
def __init__(self, name):
    print(f"In the init {name}")

def hello(self):
    print("Hello")
"""
clsdict = {}
# The functions defined by `body` are collected in clsdict (the locals mapping).
exec(body, globals(), clsdict)
# type(name, bases, namespace) creates the class object.
Class = type("NewClass", (object,), clsdict)

a = Class("demo")
a.hello()

In [None]:
import tracemalloc
import sys

# Start tracing memory allocations
tracemalloc.start()

# Workload being measured: a list of 100000 formatted strings.
data = []
for i in range(100000):
    data.append(f"string_{i}")

# get_traced_memory() returns (current, peak) sizes in bytes since start().
current, peak = tracemalloc.get_traced_memory()
print(f"Current memory usage: {current / 1024 / 1024:.2f} MB")
print(f"Peak memory usage: {peak / 1024 / 1024:.2f} MB")

# Group the traced allocations by source line.
snapshot = tracemalloc.take_snapshot()
top_stats = snapshot.statistics('lineno')

print("\nTop 10 memory allocations:")
for stat in top_stats[:10]:
    # format()[-1] is the last line of the formatted traceback for this entry.
    print(f"{stat.traceback.format()[-1]}: {stat.size / 1024:.2f} KB")

tracemalloc.stop()

In [None]:
import asyncio
import logging
import time
from contextlib import asynccontextmanager
from enum import Enum

class TransactionResult(Enum):
    # Outcome labels for a transaction; nothing in this cell reads them yet
    # (the logging and deadlock hooks of TradingDBTransaction are stubs).
    SUCCESS = "success"
    ROLLBACK = "rollback"
    DEADLOCK_RETRY = "deadlock_retry"
    TIMEOUT = "timeout"

class TradingDBTransaction:
    """Async context manager wrapping a database client for one transaction.

    ``AsyncDBClient`` and ``DBException`` are not defined in this notebook and
    must be provided by the surrounding project.

    Args:
        connection_pool: Pool handed to the client.
        retry_count: Passed through to the client.
        timeout: Passed through to the client.
        isolation_level: Passed through to the client.
    """

    def __init__(self, connection_pool, retry_count=3, timeout=30, isolation_level="READ_COMMITTED"):
        self.connection_pool = connection_pool
        self.retry_count = retry_count
        self.timeout = timeout
        self.isolation_level = isolation_level
        # Set in __aenter__; one attribute name is used throughout (the code
        # used to mix `db_conn`, `self.db_conn` and `self.db`).
        self.db_conn = None

    async def __aenter__(self):
        """Create the client and return it as the ``async with`` target."""
        self.db_conn = AsyncDBClient(self.connection_pool,
                                     self.retry_count,
                                     self.timeout,
                                     self.isolation_level)
        return self.db_conn

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """Commit when the body succeeded, then always close the client.

        Returns None, so an exception raised in the body propagates.
        """
        try:
            # Do not commit the work of a body that raised.
            # NOTE(review): presumably an explicit rollback belongs here if the
            # client offers one -- confirm against AsyncDBClient.
            if exc_type is None:
                self.db_conn.commit()
        except DBException:
            print("error commiting")
        finally:
            # NOTE(review): commit/close/execute are called without await;
            # confirm they are synchronous on AsyncDBClient.
            self.db_conn.close()

    async def execute(self, query, params=None):
        """Run ``query`` on the current client and return its result."""
        return self.db_conn.execute(query, params)

    async def _handle_deadlock(self, attempt):
        # Implement deadlock retry logic
        pass

    async def _log_transaction(self, result, duration, queries_executed):
        # Audit logging
        pass

## Classic Coroutine

In [None]:
def averager():
    """Classic coroutine computing a running average of the values sent to it.

    Prime it with ``next()``, then ``send()`` numbers. Sending ``'DONE'``
    prints a notice and hands back the average as the result of that send.
    """
    running_total = 0.0
    n_terms = 0
    mean = 0.0
    while True:
        term = yield
        if term == 'DONE':
            print("Avg calc over")
            yield mean
            return
        running_total += term
        n_terms += 1
        mean = running_total / n_terms


couroutine = averager()
next(couroutine)
couroutine.send(10)
couroutine.send(332)
couroutine.send(20.21e1)
couroutine.send('DONE')

## Generator

In [None]:
def walk(array):
    """Lazily yield the leaf items of ``array``, descending into nested lists."""
    for element in array:
        if not isinstance(element, list):
            yield element
            continue
        # Delegate to a nested generator for the sub-list.
        for leaf in walk(element):
            yield leaf

gen = walk([1,2,3,4,5,[1,3,4,[324,4,3,4,32]], [12.343, 12]])
# Drain the generator; list() stops at StopIteration on its own.
answer = list(gen)

print(answer)

## Property Factory
A property factory is a higher-order function that creates a parameterized set of accessor functions and builds a custom `property` instance from them.

This prevents repetitive code.


In [None]:
def bond_price(price_variable) -> property:
    """Property factory: build a validated property stored under ``price_variable``.

    The returned property keeps its value in the instance ``__dict__`` under
    the key ``price_variable``.

    Setting a negative value raises ``ValueError``; reading before any value
    was set raises ``AttributeError``.
    """
    def set_quantity(instance, price):
        print("Setting quantity")
        if price < 0:
            raise ValueError("Price cannot be negative")
        instance.__dict__[price_variable] = price

    def get_quantity(instance) -> float:
        print("Getting quantity")
        try:
            return instance.__dict__[price_variable]
        except KeyError:
            # The KeyError is an implementation detail; hide it from the traceback.
            raise AttributeError("No Value set") from None

    return property(fget=get_quantity, fset=set_quantity)

class BondPrice:
    # Two properties produced by the same factory, one per storage key.
    price = bond_price('price')
    nominal = bond_price('nominal')

    def __init__(self, nominal, price):
        self.price = price # Assignment runs set_quantity for 'price'
        self.nominal = nominal # Assignment runs set_quantity again, for 'nominal'

# NOTE: this rebinds the name `bond_price` from the factory function to an instance.
bond_price = BondPrice(1000, 234)
print(bond_price.nominal) # Reading runs get_quantity
print(bond_price.price)

## Descriptor

In [None]:
class Price:
    """Descriptor for a non-negative value stored in the managed instance's ``__dict__``."""

    def __set_name__(self, owner_name, storage_name):
        # Called when the owning class is created; the second argument is the
        # attribute name the descriptor was bound to.
        self.storage_name = storage_name

    def __set__(self, instance, value):
        print("Print Set: ", instance, value)
        if value < 0:
            raise ValueError("Price cannot be negative")
        vars(instance)[self.storage_name] = value

    def __get__(self, instance, owner):
        # `owner` is the managed class; `instance` is None for class-level access.
        print("Print Get: ", instance, owner)
        if instance is not None:
            try:
                return vars(instance)[self.storage_name]
            except KeyError:
                raise AttributeError(f"No attribute {self.storage_name}")
        return self  # class-level access returns the descriptor itself

    # Note: __get__ is not strictly required when __set_name__ is used, because
    # the storage key matches the attribute name in the managed instance.

class BondPrice:
    # One Price descriptor per managed attribute; __set_name__ gives each its name.
    pv = Price()
    notional = Price()

    def __init__(self, pv, notional):
        # Both assignments are routed through Price.__set__.
        self.pv = pv
        self.notional = notional


BondPrice.pv # Will return Price ref if instance is None

In [None]:
bond_1 = BondPrice(100, 20)
bond_1.notional

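Storing the value on the descriptor itself (```self.__dict__[self.storage_name] = value```) is a bad idea: `BondPrice` can have 1000 instances, but there are only two `Price` descriptor instances (`pv` and `notional`), shared by all of them. Store the value in `instance.__dict__` instead.

Note that descriptors can be overridden.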

In [None]:
import abc


class Validated(abc.ABC):
    """Abstract descriptor: validates a value, then stores it in the instance ``__dict__``."""

    def __set_name__(self, owner, name):
        self.storage_name = name

    def __set__(self, instance, value):
        # Subclasses decide what is valid and may normalise the value.
        value = self.validate(self.storage_name, value)
        instance.__dict__[self.storage_name] = value

    @abc.abstractmethod
    def validate(self, name, value):
        """
        Return the validated value or raise ValueError
        """

## Overriding Descriptors
class Quantity(Validated):
    """Accepts only numbers strictly greater than zero."""

    def validate(self, name, value):
        if value <=0 :
            raise ValueError("Value is 0 or negative")
        return value

## Overriding Descriptors
class NonBlank(Validated):
    """Accepts only strings with at least one non-whitespace character; stores them stripped."""

    def validate(self, name, value):
        # Message typo fixed: "Bank value" -> "Blank value".
        if name == "" or value == "":
            raise ValueError(f"Blank value for '{name}'")
        value = value.strip()
        if not value:
            raise ValueError(f"Blank value for '{name}'")
        return value

class SomePrice:
    # Each attribute is guarded by its own validating descriptor.
    price = Quantity()
    message = NonBlank()

    def __init__(self, price, message):
        # Assignments pass through the descriptors, so invalid constructor
        # arguments raise ValueError as well.
        self.price = price
        self.message = message

some_item = SomePrice(102, "Hello there")
print(some_item.message)
print(some_item.price)
try:
    # An empty message is rejected by NonBlank.validate.
    some_item.message = ""
except ValueError as e:
    print(e)

In [None]:
from contextlib import contextmanager

def connection(host, user, password):
    """Return a ready-to-use context manager for a (simulated) connection.

    Entering yields a description string; leaving prints "Connection closed.".
    ``password`` is accepted but not used by this placeholder.
    """
    @contextmanager
    def _session():
        try:
            # Replace with actual connection logic, e.g., using psycopg2 or mysql.connector
            yield f"Connected to {host} as {user}"
        finally:
            # Replace with actual cleanup logic
            print("Connection closed.")

    return _session()

class Connection:
    """Class-based counterpart of a (simulated) connection context manager."""

    def __init__(self, host, user, password):
        self.host, self.user, self.password = host, user, password

    def __enter__(self):
        # Replace with actual connection logic
        self.conn = "Connected to {} as {}".format(self.host, self.user)
        return self.conn

    def __exit__(self, exc_type, exc_val, exc_tb):
        # Replace with actual cleanup logic; returns None, so exceptions propagate.
        print("Connection closed.")

with Connection("localhost", "admin", "secret") as conn:
    print(conn)

with connection("localhost", "admin", "secret") as conn:
    print(conn)

Connected to localhost as admin
Connection closed.
Connected to localhost as admin
Connection closed.


In [7]:
dict1 = {"a": 1}
dict2 = {"b": 2}
# The union operator builds a new dict; on duplicate keys the right operand wins.
merged = dict1 | dict2
print(merged)

{'a': 1, 'b': 2}


In [15]:
A_iter = [[1,2],[3,4]]
list(map(tuple, zip(*A_iter)))

[(1, 3), (2, 4)]

In [None]:
# 'your_file.txt' is a placeholder path: this cell raises FileNotFoundError
# unless such a file exists in the working directory.
with open('your_file.txt', 'r') as file:
    # Iterating the file object reads one line at a time.
    for line in file:
        print(line.rstrip())

In [12]:
# Quiz: metaclass hook order when B is instantiated twice.
class A(type):
    def __init__(c, *args, **kwargs):
        print("1", end = "")
        super().__init__(*args, **kwargs)

    def __call__(c, *args, **kwargs):
        print("2", end = "")
        # The result of type.__call__ is not returned, so B() evaluates to None.
        type.__call__(c, *args, **kwargs)

    def __new__(c, *args, **kwargs):
        print("3", end = "")
        return super().__new__(c, *args, **kwargs)

class B(metaclass=A):
    def __init__(s):
        print("4", end = "")

# Class creation prints 3 then 1; each B() call prints 2 then 4.
_ = B()
_ = B()

312424

In [18]:
sorted(A_iter) # Returns a new sorted list; CPython uses the same stable Timsort-family algorithm as list.sort()
A_iter.sort() # Sorts in place with that same algorithm (it is not quicksort)

In [28]:
# Quiz: dict keys collide only when both hash and equality agree.
class A:
    def __init__(self, value):
        self.val = value
    # Only __hash__ is customised; equality stays the default identity check.
    def __hash__(self):
        return hash(self.val)

class B:
    def __init__(self, value):
        self.val = value
    def __eq__(self, other):
        return isinstance(other, B) and self.val == other.val
    def __hash__(self):
        return hash(self.val)

dict1 = {}

a = True
b = 1
dict1[a] = 1
dict1[b] = 2   # True == 1 with equal hashes: the key stays True, the value becomes 2

c = A(3)
d = A(3)
dict1[c] = 3
dict1[d] = 4   # equal hashes but c != d: two separate keys

e = B(4)
f = B(4)
dict1[e] = 5
dict1[f] = 6   # equal hash and e == f: same key, value overwritten

print(sum(dict1.values()))   # 2 + 3 + 4 + 6
dict1

15


{True: 2,
 <__main__.A at 0x1d0c2fb3010>: 3,
 <__main__.A at 0x1d0c2fb2a10>: 4,
 <__main__.B at 0x1d0c2fb1c90>: 6}