# 1. Logging APi Calls
In a microservices architecture, you might want lo log all API request for monitoring and debugging.

* Use a decorator to log the details of the API calls dynamically.

In [1]:
import requests

# Logging decorator for API calls.
def api_logger(func):
    def wrapper(*args, **kwargs):
        print(f'API Request: {func.__name__} | args: {args}, kwargs: {kwargs}')
        response = func(*args, **kwargs)
        print(f'Api Response: {response.status_code} | {response.text[:100]}')
        return response
    return wrapper

@api_logger
def fetch_data_from_api(url, params=None):
    return requests.get(url, params=params)

# Use case: Fetch data from an API
fetch_data_from_api('https://jsonplaceholder.typicode.com/posts', params={'userId': 1})


API Request: fetch_data_from_api | args: ('https://jsonplaceholder.typicode.com/posts',), kwargs: {'params': {'userId': 1}}
Api Response: 200 | [
  {
    "userId": 1,
    "id": 1,
    "title": "sunt aut facere repellat provident occaecati excep


<Response [200]>

# 2. Processing Large Log Files
Image you need to analize gigabytes of server logs without loading them entirely into memory.

* Use a generator to process the log file line by line.

*Best Practice.* Generators allow you process large files efficiently without exhausting memory.
 

In [2]:
# Generator to read a log file line by line.
def read_log_file(filepath):
    with open(filepath, 'r') as file:
        for line in file:
            yield line.strip()

# Use case: Search for error messages in the log.
for log_line in read_log_file('server.log'):
    if 'Error' in log_line:
        print(log_line)
    

# 3. Data Validation in ETL Pipeline
In an ETL(Extract, Transform, Load) pipeline, you might want to validate data records dynamically before processing them.

__Solution with Decorators and Generators__
Combine decorators and generators for efficient fata handling.

__Best Practice:__ This aproach dynamically validates data while efficiently handling large datasets with generators.

In [3]:
# Validate decorator.
def validate_record(func):
    def wrapper(record):
        if not record.get('id') or not record.get('value'):
            print(f'invalid record skipped: {record}')
            return None
        return func(record)
    return wrapper

# Generator for porcessing records.
@validate_record
def process_record(record):
    # Simulate processing
    return {'id': record['id'], 'processed_value': record['value']*2}

# Use case: Process records from a large dataset
def record_stream(data):
    for record in data:
        processed = process_record(record)
        if processed:
            yield processed

# Simulated large dataset
dataset = [
    {'id': 1, 'value': 10},
    {'id': None, 'value': 20}, # Invalid
    {'id': 2, 'value': 30}
]

for processed_record in record_stream(dataset):
    print(processed_record)

{'id': 1, 'processed_value': 20}
invalid record skipped: {'id': None, 'value': 20}
{'id': 2, 'processed_value': 60}


# 4. Rate-Limiting Function Calls
In APIs or automation scripts, you might want to limit the number of function calls to prevent overloading the system.

__Use a decorator to throttle function calls.__

__Best Practice.__ The decorator centralizes rate-limiting logic, making it easy to apply across multiple functions.

In [4]:
import time

# Rate-limiting decorator
def rate_limiter(max_calls_per_second):
    interval = 1/ max_calls_per_second
    last_call_time = [0]

    def decorator(func):
        def wrapper(*args, **kwargs):
            elapsed = time.time() - last_call_time[0]
            if elapsed < interval:
                time.sleep(interval-elapsed)
            last_call_time[0] = time.time()
            return func(*args, **kwargs)
        return wrapper
    return decorator

@rate_limiter(2) # Allow two calls per second
def send_request():
    print(f'Request sent at {time.time()}')

# Use case: Simulate sending requests
for _ in range(5):
    send_request()


Request sent at 1736360072.753134
Request sent at 1736360073.258177
Request sent at 1736360073.7619781
Request sent at 1736360074.267002
Request sent at 1736360074.7720268


# 5. Streaming data Analysis
for real-time analytics, you might need to analyze streams of data (e.g., from sensors or Kafka topics).

* Use generator to process the data stream as it arrives.

In [6]:
import random
import time 

# Simulate a live data stream
def data_stream():
    while True:
        yield {'sensor_id': 1, 'value':random.randint(0, 100), 'timestamp':time.time()}
        time.sleep(1) # Simulate delay

# Use case: Process the stream.
for data in data_stream():
    if data['value'] > 80:
        print(f'High value alert: {data}')

High value alert: {'sensor_id': 1, 'value': 86, 'timestamp': 1736360262.23959}


KeyboardInterrupt: 