## Data Pipelines - Introduction to Prefect
### Big Data Tools 
#### M.Sc. in Applied Analytics (coterminal course)
Fac. de Ingeniería -  Universidad de la Sabana<br>
Prof.: Hugo Franco, Ph.D.

In [4]:
from prefect import flow, task, get_run_logger
from typing import Callable
from functools import wraps
import time


# Custom Timing Decorator for Prefect Tasks
def timing_decorator(func: Callable) -> Callable:
    '''Custom decorator to time task execution'''
    @wraps(func)
    def wrapper(*args, **kwargs):
        logger = get_run_logger()
        start_time = time.time()

        # Execute the original function
        result = func(*args, **kwargs)

        end_time = time.time()
        execution_time = end_time - start_time

        logger.info(f"Task '{func.__name__}' completed in {execution_time:.2f} seconds")
        return result

    return wrapper

@task
def get_users():
    return ["juan@example-mail.com", "ana@mail-example.com"]

@task
@timing_decorator
def send_email(user):
    print(f"Sending email to {user}")

@flow
def notify_users():
    users = get_users()
    for user in users:
        send_email(user)

notify_users()

Sending email to juan@example-mail.com


Sending email to ana@mail-example.com
