# Python Advanced Workshop

#### Einleitung
Dieses Notebook enthält das gesamte Material des Workshops und dient als interaktive Resource.  
Code-Zellen können mit Strg+Enter oder Shift+Enter ausgeführt werden.

## Themen

Es gibt viele Konzepte und Themen die für einen Fortgeschrittenen-Workshop infrage kommen.  
Hier sollen drei Konzepte vorgestellt und mit Beispielen aus der Praxis besprochen werden:
* Decorator
* Contextmanager
* Concurrency

## Decorator
Im Basic Workshop haben wir besprochen, dass Funktionen in Python ebenfalls First Class Objekte  
sind und an andere Funktionen übergeben werden können.

In [None]:
def my_add(a,b):
    return a+b

def hashbox_print(fcn, *args,**kwargs):          # Funktionen sind 1. class Objekte und können ganz normal übergeben werden
    result_string = str( fcn(*args,**kwargs) )   # *args und **kwargs werden weitergereicht
    print("#"*(len(result_string)+4))
    print("# "+result_string+" #")
    print("#"*(len(result_string)+4))

hashbox_print(my_add,2,3)                        # my_add() ruft die Funktion auf, my_add ist das Funktionsobjekt

Auf diesem Prinzip basierend können wir Funktionen bauen, die andere Funktionen erweitern oder deren Rückgabewerte abfangen und verändern.

In [None]:
def hashbox_dec(fcn):                          # Decorator Funktion, bekommt eine Funktion als Parameter
    def wrapper(*args,**kwargs):               # Wrapped die Originalfunktion
        value = fcn(*args,**kwargs)            # Aufruf der Originalfunktion, *args und **kwargs werden weitergereicht
        result_string = str(value)
        print("#"*(len(result_string)+4))
        print("# "+result_string+" #")         # Zusätzliche Funktionalität
        print("#"*(len(result_string)+4))
        return value                           # Rückgabe des ursprünglichen Funktionswertes
    return wrapper                             # Rückgabe der neudefinierten wrapper Funktion

decorated_my_add = hashbox_dec(my_add)         # Dekorieren einer Funktion, umständliche Syntax

decorated_my_add(2,3)

Python bringt syntaktischen Zucker mit um das Dekorieren von Funktionen zu vereinfachen.

In [None]:
@hashbox_dec            # Decorator Syntax
def my_add(a,b):
    return a+b

# Gleichbedeutend mit:
# my_add = hashbox_dec(my_add)

my_add(4,5)

Alternativ können auch Klassen als Decorator verwendet werden.

In [None]:
class OneMore:                                # Decorator Klasse
    def __init__(self, fcn):                  # Init nimmt die zu dekorierende Funktion entgegen
        self.fcn = fcn                        # und speichert diese
        
    def __call__(self,*args,**kwargs):        # Anstelle einer inneren Funktion definieren wir die call Methode
        original = self.fcn(*args,**kwargs)   # Aufruf der originalen Funktion
        return original+1                     # Rückgabe eines veränderten Wertes
    
@OneMore
def my_sub(a,b):
    return a-b

my_sub(5,2)

Dieser sehr simple Weg hat einige Nachteile, darunter der Verlust von Funktionssignatur und Docstrings.

In [None]:
def hashbox_dec(fcn):
    """
    Decorate a function with nice hashbox printing.
    
    Parameters
    ----------
    fcn : callable
        Function to decorate.
    
    Returns
    -------
    callable
        Decorated function
    """
    def wrapper(*args,**kwargs):
        """
        Print the result of the original function in a hashbox before returning the value.
        
        Parameters
        ----------
        *args : tuple
            Positional arguments to original function.
        **kwargs : dict
            Keyword arguments to original function.
        
        Returns
        -------
        any
            Result of original function.
        """
        value = fcn(*args,**kwargs)            
        result_string = str(value)
        print("#"*(len(result_string)+4))
        print("# "+result_string+" #")         
        print("#"*(len(result_string)+4))
        return value                           
    return wrapper    

@hashbox_dec
def my_mult(a,b):
    """
    Multiply two numbers.
    
    Parameters
    ----------
    a : int or float
        First factor.
    b : int or float
        Second factor.
    
    Returns
    -------
    int or float
        Product of `a` and `b`.
    """
    return a*b

In [None]:
my_mult

In [None]:
help(my_mult)

Dieses Problem lässt sich einfach lösen, nämlich mit einem Decorator!

In [None]:
import functools

def hashbox_dec(fcn):
    """
    Print fcn result in nice hashbox before returning the value.
    
    Parameters
    ----------
    fcn : callable
        Function to decorate.
    
    Returns
    -------
    callable
        Decorated function
    """
    @functools.wraps(fcn)                   # <---- Magic happens here!
    def wrapper(*args,**kwargs):
        value = fcn(*args,**kwargs)            
        result_string = str(value)
        print("#"*(len(result_string)+4))
        print("# "+result_string+" #")         
        print("#"*(len(result_string)+4))
        return value                           
    return wrapper    

@hashbox_dec
def my_mult(a,b):
    """
    Multiply two numbers.
    
    Parameters
    ----------
    a : int or float
        First factor.
    b : int or float
        Second factor.
    
    Returns
    -------
    int or float
        Product of `a` and `b`.
    """
    return a*b

In [None]:
my_mult

In [None]:
help(my_mult)

### Best practise: Nutzt immer @functools.wraps wenn ihr Decorator schreibt!

Bei genauem Hinsehen sieht der Code oben komisch aus!   

@functools.wraps(fcn)  
def wrapper(\*args,\*\*kwargs)

Hier wird ein Parameter (fcn) an functools.wraps übergeben und das ganze als decorator verwendet.
Zum Verständnis können wir uns das ganze ohne den Zucker ansehen.  

In [None]:
def wrapper(*args,**kwargs):
    pass

def fcn():
    pass

fcn = functools.wraps(fcn)(wrapper)

functools.wraps(fcn) muss also ein callable Objekt zurückgeben, das eine Funktion als Parameter nimmt.  
Oder andersgesagt functools.wraps() ist kein Decorator sondern eine Decorator-Factory!  

Im Folgenden wird zur Klarheit zwischen Decorator und Factory unterschieden, oft wird das aber nicht klar getrennt und beides wird einfach als Decorator bezeichnet.

In [None]:
def create_count_dec(counter_start):          # Factory, gibt einen Decorator zurück
    def count_dec(fcn):                       # Der eigentliche Decorator
        @functools.wraps(fcn)
        def wrapper(*args,**kwargs):          # Der Wrapper
            wrapper.counter +=1
            return fcn(*args,**kwargs)
        wrapper.counter=counter_start         # Initialisierung des states
        return wrapper
    return count_dec

@create_count_dec(0)
def my_div(a,b):
    return a/b

print(my_div(6,2))
print(my_div(9,3))
print(my_div(100,2))

print(f"my_div was called {my_div.counter} times!")

In [None]:
import functools
class CounterDecFactory:                            # Für Klassen gibt es verschiedene Ansätze
    def __init__(self,counter_start):               # Die init Methode bekommt jetzt die Parameter
        self.counter = counter_start                
        self.mode = "decorating"                    # Hilfsvariable um Verhalten von call zu steuern
        
    def __call__(self,*args,**kwargs):              
        if self.mode == "decorating":
            self.fcn = args[0]                      # Erster Aufruf, erstes Argument ist zu dekorierende Funktion
            self.mode = "calling"
            functools.update_wrapper(self,self.fcn) # in etwa äquivalent zu functools.wraps
            return self                             # Rückgabe der Dekorierten Funktion
         
        self.counter +=1                            # weitere Aufrufe
        return self.fcn(*args, **kwargs)

In [None]:
@CounterDecFactory(5)
def my_div(a,b):
    """
    Divide a by b.
    
    Parameters
    ----------
    a : number
        The dividend.
    b : number
        The divisor.
        
    Returns
    -------
    number
        The quotient a/b.
    """
    return a/b

print(my_div(6,2))
print(my_div(9,3))
print(my_div(100,2))

print(f"my_div was called {my_div.counter} times!")

Häufige Anwendungsfälle für Decorator sind Logging, Authentifizierungschecks, Caching und Metalogik da Informationen über die dekorierte Funktion verfügbar sind.  
Auch beim Definieren von Klassen sind Decorator wie @property, @staticmethod, @classmethod oder @abstractmethod nützlich und häufig anzutreffen.  
Funktionen als Decorator haben den Vorteil, dass die Signatur und Docstrings sowie der Type besser erhalten bleiben.  
Klassen sind besser geeignet wenn ein (komplexer) State erhalten werden soll.

## Contextmanager
Contextmanager definieren einen Block bei dessen Ein- und Austritt spezieller Code ausgeführt wird.  
Sie vereinfachen das Exceptionhandling indem sie sicherstellen, dass egal an welcher Stelle im Inneren eine Exception geworfen wird, der Austrittscode IMMER ausgeführt wird. Dabei sind sie simpler und schlanker zu nutzen als try / except / finally Blöcke.

Häufige Anwendungsfälle sind Resourcenmanagement (file handles, database connections, sockets, locks & semaphores)
und das Standardbeispiel ist das  
with open() as f:  
Statement.

Hier soll es darum gehen wie wir unsere eigenen Contextmanager erstellen können und wie Exceptions verarbeitet werden.

In [None]:
class file:                                            # Contextmanager als Klasse 
                                                       # muss enter und exit Methoden haben    
    def __init__(self, file_name, method):             # Initialisierung
        print("Init file")
        self.file_name = file_name
        self.method = method
        
    def __enter__(self):                               # Eintritt
        print("Entering the context...")
        self.file_obj = open(self.file_name, self.method)
        return self.file_obj
    
    def __exit__(self, exctype, excinst, traceback):   # Austritt
        print("...leaving the context.")
        self.file_obj.close()

In [None]:
with file("foo.txt","w") as ofile:
    print("inside the context")
    ofile.write("bar")

In [None]:
with file("foo.txt","w") as ofile:
    print("inside the context")
    raise Exception("My custom exception text.")     # Exception innerhalb des with Blocks
    ofile.write("bar")

In [None]:
class file:
    
    def __init__(self, file_name, method):            # Initialisierung
        print("Init file")
        self.file_name = file_name
        self.method = method
        
    def __enter__(self):                              # Eintritt
        print("Entering the context...")
        self.file_obj = open(self.file_name, self.method)
        return self.file_obj
    
    def __exit__(self, exctype, excinst, traceback):  # Austritt
        print("...leaving the context.")
        self.file_obj.close()
        if exctype is not None:                       # Zusätzliche Argumente werden bei Exceptions genutzt
            print(exctype)
            print(excinst)
            print(traceback)
            print("Exception has been handled.")
            return True                               # Exception abgefangen, andere return Werte re-raisen

In [None]:
with file("foo.txt","w") as ofile:
    print("inside the context")
    raise Exception("My custom exception text.")     # Exception innerhalb des with Blocks
    ofile.write("bar")

In [None]:
class suppress:                              # Implementierung von contextlib.suppress
    def __init__(self, *exceptions):
        print("Init suppress")
        self._exceptions = exceptions

    def __enter__(self):
        print("Entering suppress")
        pass

    def __exit__(self, exctype, excinst, exctb):
        print("Exiting suppress")
        return exctype is not None and issubclass(exctype, self._exceptions)

with suppress(FileNotFoundError):
    print("inside with")
    open("non_existing_file.txt","r")
    print("after raise")
print("continuing after suppress context")

Oft können Contextmanager sehr einfach gehalten werden und die Klassendefinition ist unnötiger Overhead.  
Das Stdlib Modul contextlib bringt einige Hilfstools zur Vereinfachung mit.

In [None]:
from contextlib import closing     # Contextmanager closing(resource) der in __exit__ resource.close() aufruft
from urllib.request import urlopen

with closing(urlopen('https://www.python.org')) as page:
    for line in page:
        print(line)

In [None]:
import contextlib

@contextlib.contextmanager            # Decorator!
def open_file(name):                  # Generator Funktion
    print("Init from function")       # Initialisierung
    f = open(name, 'w')
    try:                              # wie __enter__
        print("Entering")
        yield f                       # wird an das "as" zurückgegeben. falls Exceptions vorkommen werden sie hier reraised
    finally:                          # wie __exit__
        print("Leaving")
        f.close()

with open_file("foo.txt") as ofile:
    print("inside with block")
    ofile.write("bar")

In [None]:
# Contextmanager die von contextlib.ContextDecorator erben oder solche die 
# von contextlib.contextmanager erzeugt werden können auch als Decorator genutzt werden...
import time

@contextlib.contextmanager
def log_time(name):
    start_time = time.time()
    try:
        yield
    finally:
        print(f"{name} ran {time.time()-start_time:.2f} seconds")

@log_time("output")
def delayed_out(val, delay=1):
    time.sleep(delay)
    print(val)  

# Syntaktischer zucker für
# def delayed_out(val, delay=1):
#     with log_time(output):
#         ....

delayed_out("spam")

with log_time("calculation"):
    print(2*5)
    time.sleep(0.5)


## Ein reales Beispiel: Registrierung und Checkpointing von PySpark Transformationen
Siehe Examples

## Concurrency & Parallelism
In Python gibt es drei Arten von Concurrency:  
* Multiprocessing
* Threading
* Asyncio  

Jedes dieser Konzepte kommt mit Vor- und Nachteilen und es gilt das Richtige für den konkreten Anwendungsfall auszuwählen.  

Dieses Kapitel folgt Vorträgen von Raymond Hettinger und David Beazley:  
[Raymond Hettinger - Keynote on Concurrency, PyBay 2017](https://www.youtube.com/watch?v=9zinZmE3Ogk) und  
[David Beazley - Python Concurrency From the Ground Up: LIVE!, PyCon 2015](https://www.youtube.com/watch?v=MCs5OvhV9S4)  
Von beiden gibt es eine Menge an Vorträgen auf Youtube die sich lohnen!

### Multiprocessing
Multiprocessing nutzt mehrere CPUs eines Systems parallel. Dabei werden mehrere Python Interpreter als individuelle Prozesse ohne shared-memory gestartet, die weitesgehend unabhängig voneinander laufen. Multiprocessing in Python unterscheidet sich damit kaum von Multiprocessing in anderen Programmiersprachen.  

#### Vorteile
* Nutzung mehrerer CPUs (cores)
* Getrennte Speicherbereiche (keine race conditions)

#### Nachteile
* Getrennte Speicherbereiche (langsame Kommunikation)
* Overhead
* Limitierung durch Hardware und Betriebssystem (100e Prozesse sind keine gute Idee)

#### Wann
* CPU limitierter Workload in purem Python

### Multithreading
Python hat einen **Global Interpreter Lock** (GIL) welcher den gleichzeitigen Zugriff auf Python Objekte verhindert da Pythons Memorymanagement nicht thread-safe ist!  
Das bedeutet, das standard Python Code in Threads **nicht** parallel ausgeführt wird, sondern der Interpreter zwischen den Threads hin und her springt.  
Einige potenziell blockende oder lang laufenden Berechnungen wie z.B. I/O oder NumPy Berechnungen geben den GIL frei und können parallel laufen.

#### Vorteile
* Shared Memory
* Funktioniert mit bestehendem Code
* Wenig zusätzliche Tools nötig (Locks, Queues, Pools)

#### Nachteile
* Shared Memory (race conditions)
* Overhead (weniger als bei Multiprocessing)
* Limitierung durch Betriebssystem (1000e Threads sind immernoch keine gute Idee)
* Threadwechsel wird durch Interpreter gesteuert --> Overhead und Locks nötig

#### Wann
* I/O limitierter Workload
* Workload der den GIL freigibt

In [None]:
# Ausgangspunkt: keine concurrency

counter = 0
def worker():
    """Increment and print the current counter"""
    global counter

    counter += 1
    print(f"The count is {counter}")
    print("---------------")

print("Starting up")
for i in range(10):
    worker()
print("Finishing up")

In [None]:
import threading

counter = 0

def worker():
    """Increment and print the current counter"""
    global counter

    counter += 1
    print(f"The count is {counter}")
    print("---------------")

print("Starting up")
for i in range(10):
    threading.Thread(target=worker).start()  # sehr simples threaden des bestehenden codes
print("Finishing up")

In [None]:
import threading, time, random

##########################################################################################
# Fuzzing is a technique for amplifying race condition errors to make them more visible

FUZZ = True

def fuzz():
    if FUZZ:
        time.sleep(random.random())

###########################################################################################

counter = 0

def worker():
    """Increment and print the current counter"""
    global counter

    fuzz()
    oldcnt = counter
    fuzz()
    counter = oldcnt + 1
    fuzz()
    print(f"The count is {counter}",end="")
    fuzz()
    print()
    fuzz()
    print("---------------",end="")
    fuzz()
    print()
    fuzz()

print("Starting up")
fuzz()
for i in range(10):
    threading.Thread(target=worker).start()
    fuzz()
print("Finishing up")
fuzz()

In [None]:
import threading, time, random
##########################################################################################
# Fuzzing is a technique for amplifying race condition errors to make them more visible

FUZZ = True

def fuzz():
    if FUZZ:
        time.sleep(random.random())

###########################################################################################

counter_lock = threading.Lock()  # to guard critical sections
printer_lock = threading.Lock()

counter = 0

def worker():
    """Increment and print the current counter"""
    global counter
    
    with counter_lock: # Critical section! Counter is accessed!
        fuzz()
        oldcnt = counter
        fuzz()
        counter = oldcnt + 1
        fuzz()
        with printer_lock: # Critical section! Print is used
            print(f"The count is {counter}",end="")
            fuzz()
            print()
            fuzz()
            print("---------------",end="")
            fuzz()
            print()
            fuzz()

with printer_lock:
    print("Starting up",end="")
    fuzz()
    print()

worker_threads = []
for i in range(10):
    t = threading.Thread(target=worker)
    worker_threads.append(t)
    t.start()
    fuzz()
for t in worker_threads:
    t.join()
    fuzz()

with printer_lock:
    print('Finishing up', end='')
    fuzz()
    print()

fuzz()

### Performance von Multithreading
Siehe Beispiel

### Asyncio
Python bringt ein weiteres Konzept der Concurrency mit: Coroutines  
Coroutinen baseieren auf und verhalten sich sehr ähnlich zu Generator-Funktionen.  
Das heißt, ihre Ausführung kann unterbrochen und fortgesetzt werden und das mit sehr wenig CPU Zeit!  

Auch asyncio ist kein Weg um den GIL! Es wird weiterhin nur ein CPU-Kern genutzt.

#### Vorteile
* Sehr wenig Overhead (1000e oder sogar 10000e Concurrent Coroutines sind kein Problem)
* Kooperatives Wechseln zwischen Tasks --> keine Locks nötig! Sehr viel einfacher bei komplexem Code

#### Nachteile
* Funktioniert nicht mit bestehendem Code, eigene Funktionen nötig
* Alles muss non-blocking sein!
* Mehr zusätzliche Tools nötig (futures, event loops, 
* Ungewohnter als Threading

#### Wann
* I/O limitierter Workload
* Sehr große Menge an Tasks

### Ein Beispiel: Azure File Upload mit Python SDK -- Threading vs Asyncio

In [None]:
# uncomment and run this if you don't have the packages installed
#!pip install azure-storage-blob aiohttp

In [None]:
adls_uri = <your_adls>
container_name = <your_container>
sas_token = <your_token>

In [None]:
import hashlib
import time
from concurrent.futures import ThreadPoolExecutor
from pathlib import Path

from azure.storage.blob import ContainerClient


def calc_md5_sum(file):
    file = Path(file)
    with file.open("rb") as data:
        md5 = hashlib.md5()
        while chunk := data.read(4096):
            md5.update(chunk)
    return md5.digest()

con_client = ContainerClient(adls_uri,container_name,credential=sas_token)
source_path = Path("files")

def work(file):
    blob_name = f"PythonWorkshop/{file.name}"
    blob_client = con_client.get_blob_client(blob_name)
    change = True
    if blob_client.exists():
        remote_md5 = blob_client.get_blob_properties()["content_settings"]["content_md5"]
        local_md5 = calc_md5_sum(file)
        change = not(local_md5 == remote_md5)
    if change:
        with file.open("rb") as src_file:
            blob_client.upload_blob(src_file,overwrite=True)
    

start = time.time()
with con_client:
    with ThreadPoolExecutor() as executor:    # Tooling Teil 1
        for file in source_path.iterdir():
            executor.submit(work,file)        # Tooling Teil 2
end = time.time()
print(f"upload took {end-start:.2f}s")

In [None]:
import asyncio
import hashlib
import time
from pathlib import Path

from azure.storage.blob.aio import ContainerClient as ContainerClientAIO


con_client = ContainerClientAIO(adls_uri,container_name,credential=sas_token)
source_path = Path("files")

background_tasks = set()                                          # Tooling

def calc_md5_sum(file):   # Blocking! Kein async keyword
    file = Path(file)
    with file.open("rb") as data:
        md5 = hashlib.md5()
        while chunk := data.read(4096):
            md5.update(chunk)
    return md5.digest()

async def getFiles():  # Tooling
    for f in source_path.iterdir():
        yield f

async def work(file):                                              # Tooling
    blob_name = f"PythonWorkshop/{file.name}"
    blob_client = con_client.get_blob_client(blob_name)
    change = True
    if await blob_client.exists():                                 # Tooling
        blob_prop = await blob_client.get_blob_properties()        # Tooling
        remote_md5 = blob_prop["content_settings"]["content_md5"]
        local_md5 = await asyncio.to_thread(calc_md5_sum,file)     # Tooling² + Threading for blocking function!
        change = not(local_md5 == remote_md5)
    if change:
        with file.open("rb") as src_file:
            await blob_client.upload_blob(src_file,overwrite=True) # Tooling

async def main():                                                  # Tooling
    async with con_client:                                         # Tooling
        async for file in getFiles():                              # Tooling
            task = asyncio.create_task(work(file))                 # Tooling
            background_tasks.add(task)                             # Tooling
            task.add_done_callback(background_tasks.discard)       # Tooling
        await asyncio.gather(*background_tasks)                    # Tooling

In [None]:
start = time.time()
await main()

In [None]:
end = time.time()
print(f"upload took {end-start:.2f}s")