In [None]:
#| default_exp etl

# ETL

> Helper to make ETL pipelines easier.

## Our plan

We want to build up some functionality for creating ETL pipelines for pandas operations.
We want to define classic extract, transform and load functions that we can string together in a pipeline.
We would also like to be able to inspect the intermediate DataFrames and some of their key properties.

## Our background

We work as data science consultants, mainly with SMEs in manufacturing. We often have to build ETL pipelines from various sources, which we then use for analytics or ML workloads.
The pipelines can get pretty complex with custom business logic, which is why we want a way to quickly show all the steps we take.

In [None]:
#| hide
from dialoghelper import *
from fastcore.tools import *

#| hide

Tools available from `dialoghelper`:

- &`curr_dialog`: Get the current dialog info.
- &`msg_idx`: Get absolute index of message in dialog.
- &`add_html`: Send HTML to the browser to be swapped into the DOM using hx-swap-oob.
- &`find_msg_id`: Get the current message id.
- &`find_msgs`: Find messages in current specific dialog that contain the given information.
  - (solveit can often get this id directly from its context, and will not need to use this if the required information is already available to it.)
- &`read_msg`: Get the message indexed in the current dialog.
  - To get the exact message use `n=0` and `relative=True` together with `msgid`.
  - To get a relative message use `n` (relative position index).
  - To get the nth message use `n` with `relative=False`, e.g `n=0` first message, `n=-1` last message.
- &`del_msg`: Delete a message from the dialog.
- &`add_msg`: Add/update a message to the queue to show after code execution completes.
- &`update_msg`: Update an existing message.
- &`url2note`: Read URL as markdown, and add a note below current message with the result
- &`msg_insert_line`: Insert text at a specific location in a message.
- &`msg_str_replace`: Find and replace text in a message.
- &`msg_strs_replace`: Find and replace multiple strings in a message.
- &`msg_replace_lines`: Replace a range of lines in a message with new content.
  - Always first use `read_msg( msgid=msgid, n=0, relative=True, nums=True)` to view the content with line numbers.

#| hide

Tools available from `fastcore.tools`:

- &`rg`: Run the `rg` command with the args in `argstr` (no need to backslash escape)
- &`sed`: Run the `sed` command with the args in `argstr` (e.g for reading a section of a file)
- &`view`: View directory or file contents with optional line range and numbers
- &`create`: Creates a new file with the given content at the specified path
- &`insert`: Insert new_str at specified line number
- &`str_replace`: Replace first occurrence of old_str with new_str in file
- &`strs_replace`: Replace for each str pair in old_strs,new_strs
- &`replace_lines`: Replace lines in file using start and end line-numbers

In [None]:
#| export
import numpy as np
import pandas as pd
from datetime import datetime, timedelta
from time import sleep
from functools import wraps

In [None]:
# Sample manufacturing data
data = {
    'order_id': [101, 102, 103, 104, 105],
    'product': ['Widget A', 'Widget B', 'Widget A', 'Widget C', 'Widget B'],
    'quantity': [50, 30, 75, 20, 45],
    'defects': [2, 1, 3, 0, 2],
    'production_time': [120, 95, 150, 80, 110]  # in minutes
}

df = pd.DataFrame(data)
df

   order_id   product  quantity  defects  production_time
0       101  Widget A        50        2              120
1       102  Widget B        30        1               95
2       103  Widget A        75        3              150
3       104  Widget C        20        0               80
4       105  Widget B        45        2              110


In [None]:
#| export
def get_demo_data():
    data = {
    'order_id': [101, 102, 103, 104, 105],
    'product': ['Widget A', 'Widget B', 'Widget A', 'Widget C', 'Widget B'],
    'quantity': [50, 30, 75, 20, 45],
    'defects': [2, 1, 3, 0, 2],
    'production_time': [120, 95, 150, 80, 110]  # in minutes
    }
    return pd.DataFrame(data)

In [None]:
df[df["product"] != "Widget C"].groupby(["product"])[["quantity", "defects", "production_time"]].sum()

          quantity  defects  production_time
product
Widget A       125        5              270
Widget B        75        3              205


# Solution space exploration

#| hide

Say we have a pipeline like this. What are some approaches to implement our plan?

##### 🤖Reply🤖

Looking at your pipeline, there are a few solid approaches to consider:

1. **Function composition with decorators**: wrap each step (filter, groupby, etc.) in a decorator that logs metadata (shape, columns, timing). Simple to implement, minimal overhead.

2. **Class-based pipeline**: create a `Pipeline` class that chains `Step` objects. Each step stores its transformation function plus inspection data. More structure, easier to serialize/visualize.

3. **Method chaining with a wrapper**: wrap DataFrames in a custom class that records operations as you chain them (like a "verbose" DataFrame).

4. **Existing libraries**: tools like `hamilton`, `prefect`, or `dagster` already solve this, though they may be heavier than you need for SME consulting.

Given your context (manufacturing SMEs, custom business logic, need for transparency), I'd lean toward **option 1 or 2**. They're lightweight, easy to explain to clients, and don't add external dependencies.

Which approach sounds most appealing to you? Or would you like me to sketch out a quick comparison of the tradeoffs?
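Option 3 can also be approximated without a custom wrapper class: pandas' built-in `DataFrame.pipe` lets a plain function run inside a method chain. A minimal sketch, where `log_shape` is a hypothetical helper that prints the shape and passes the DataFrame through unchanged:

```python
import pandas as pd

def log_shape(df, label):
    """Print the shape of df and return df unchanged, so it can sit inside a .pipe() chain."""
    print(f"{label}: {df.shape}")
    return df

df = pd.DataFrame({
    "product": ["Widget A", "Widget B", "Widget A", "Widget C", "Widget B"],
    "quantity": [50, 30, 75, 20, 45],
    "defects": [2, 1, 3, 0, 2],
})

result = (
    df
    .pipe(log_shape, "input")
    .query("product != 'Widget C'")
    .pipe(log_shape, "after filter")
    .groupby("product")[["quantity", "defects"]].sum()
    .pipe(log_shape, "after aggregate")
)
```

This should print `input: (5, 3)`, `after filter: (4, 3)` and `after aggregate: (2, 2)`. It keeps the pandas chaining style, but the log only exists as printed text, not as an object you can inspect later.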

#| hide

Show a simple example for each please

##### 🤖Reply🤖

I'll show you a simple example skeleton for each approach. Since you asked explicitly, I'll write out the code examples:

**1. Pipeline class approach:**
```python
class ETLPipeline:
    def __init__(self, df):
        self.steps = []
        self.df = df
        
    def add_step(self, func, name):
        self.df = func(self.df)
        self.steps.append({'name': name, 'shape': self.df.shape})
        return self
    
    def get_result(self):
        return self.df
```

**2. Decorator approach:**
```python
def track_step(step_name):
    def decorator(func):
        def wrapper(df):
            result = func(df)
            print(f"{step_name}: {result.shape}")
            return result
        return wrapper
    return decorator

@track_step("Filter")
def filter_products(df):
    return df[df["product"] != "Widget C"]
```

**3. Function list with runner:**
```python
def run_pipeline(df, steps):
    results = []
    for name, func in steps:
        df = func(df)
        results.append({'step': name, 'shape': df.shape})
    return df, results
```

Which of these feels most natural for your workflow? Would you like to try implementing one with your actual pipeline example?

#| hide

How would we deal with methods vs functions in each approach?

##### 🤖Reply🤖

Good question! Methods vs functions is an important consideration for each approach.

**Pipeline class**: Works naturally with both - you can pass either functions or methods as long as they take a dataframe and return a dataframe. Methods would need to be bound or wrapped.

**Decorator**: Best suited for functions. For methods, you'd need to handle `self` carefully in the wrapper, which adds complexity.

**Function list runner**: Similar to the pipeline class - works with both as long as they're callable and follow the df-in, df-out pattern.

The key question is: are you planning to use pandas methods (like `.groupby()`, `.merge()`) or write your own functions that wrap those operations? This might influence which approach fits best.

What's your typical style - do you prefer chaining pandas methods or writing custom transform functions?
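Regarding methods, one way to fit pandas methods into the df-in, df-out pattern without writing a wrapper function for each one is to pass the unbound method through `functools.partial`, or to build the call with `operator.methodcaller`. A minimal sketch in the function-list style (the step names are made up for illustration):

```python
import operator
from functools import partial

import pandas as pd

df = pd.DataFrame({
    "product": ["Widget A", "Widget B", "Widget A", "Widget C", "Widget B"],
    "quantity": [50, 30, 75, 20, 45],
})

steps = [
    # Unbound method: pd.DataFrame.sort_values(df, by=...) is the same call as df.sort_values(by=...)
    ("sort", partial(pd.DataFrame.sort_values, by="quantity")),
    # methodcaller returns a callable that calls .rename(columns=...) on whatever it receives
    ("rename", operator.methodcaller("rename", columns={"quantity": "qty"})),
    # A plain function (here a lambda) fits the same pattern
    ("top_2", lambda d: d.head(2)),
]

for name, func in steps:
    df = func(df)
    print(name, df.shape)
```

All three entries are ordinary callables taking one DataFrame, so the runner loop does not need to know whether a step started life as a method or a function.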

#| hide

Can you write our small example transformation for each approach

##### 🤖Reply🤖

Since you've explicitly asked me to write the code, I'll create the full example for each approach using your filter + groupby transformation:

**1. Pipeline class approach:**
```python
class ETLPipeline:
    def __init__(self, df):
        self.steps = []
        self.df = df
        self.history = [{'step': 'initial', 'shape': df.shape, 'columns': list(df.columns)}]
        
    def add_step(self, func, name):
        self.df = func(self.df)
        self.history.append({'step': name, 'shape': self.df.shape, 'columns': list(self.df.columns)})
        return self
    
    def get_result(self):
        return self.df
    
    def show_history(self):
        return pd.DataFrame(self.history)

# Usage
pipeline = ETLPipeline(df)
result = (pipeline
    .add_step(lambda d: d[d["product"] != "Widget C"], "filter_widget_c")
    .add_step(lambda d: d.groupby(["product"])[["quantity", "defects", "production_time"]].sum(), "aggregate")
    .get_result())

print(pipeline.show_history())
print("\nResult:")
print(result)
```

**2. Decorator approach:**
```python
steps_log = []

def track_step(step_name):
    def decorator(func):
        def wrapper(df):
            result = func(df)
            steps_log.append({'step': step_name, 'shape': result.shape, 'columns': list(result.columns)})
            return result
        return wrapper
    return decorator

@track_step("filter_widget_c")
def filter_products(df):
    return df[df["product"] != "Widget C"]

@track_step("aggregate")
def aggregate_by_product(df):
    return df.groupby(["product"])[["quantity", "defects", "production_time"]].sum()

# Usage
result = aggregate_by_product(filter_products(df))
print(pd.DataFrame(steps_log))
print("\nResult:")
print(result)
```

**3. Function list with runner:**
```python
def run_pipeline(df, steps):
    history = [{'step': 'initial', 'shape': df.shape, 'columns': list(df.columns)}]
    current_df = df
    
    for name, func in steps:
        current_df = func(current_df)
        history.append({'step': name, 'shape': current_df.shape, 'columns': list(current_df.columns)})
    
    return current_df, pd.DataFrame(history)

# Define steps
steps = [
    ("filter_widget_c", lambda d: d[d["product"] != "Widget C"]),
    ("aggregate", lambda d: d.groupby(["product"])[["quantity", "defects", "production_time"]].sum())
]

# Usage
result, history = run_pipeline(df, steps)
print(history)
print("\nResult:")
print(result)
```

Try running these! Which approach feels most intuitive for your workflow?

# Our preferred approach
For the exploration of the approaches and the discussion with AI, see the notebooks in the `nbs` folder.

So we will write a decorator that adds a logging keyword (`vrbs`) to each function; then we can decide for each function in the pipeline what, and how much, to log. We want to keep it as simple as possible: the pipeline should just take a df and a list of steps, each a `(function, kwargs)` pair.

In [None]:
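def pipeline(df, steps, vrbs_default=True):
    for func, func_kwargs in steps:
        # build a new kwargs dict so the caller's step definitions are not modified;
        # a "vrbs" given for the step overrides the pipeline default
        step_kwargs = {"vrbs": vrbs_default, **func_kwargs}
        df = func(df, **step_kwargs)
    return df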

In [None]:
def track(func):
    def wrapper(in_df, *args, vrbs=False, **kwargs):  # vrbs is keyword-only, so extra positional args go to func
        if vrbs:
            print(f"\n{'*'*10} Step: {func.__name__} {'*'*10}")
            print(f"\nInput DataFrame shape: {in_df.shape}")
            print(in_df)
        out_df = func(in_df, *args, **kwargs)
        if vrbs:
            print(f"\nOutput DataFrame shape: {out_df.shape}")
            print(out_df)
        return out_df
    return wrapper

In [None]:
@track
def filter_products(df):
    return df[df["product"] != "Widget C"]

@track
def aggregate_by_product(df):
    return df.groupby(["product"])[["quantity", "defects", "production_time"]].sum()

The pipeline also lets individual steps override the default `vrbs` setting. In this example, nothing is printed for `aggregate_by_product`:

In [None]:
steps = [
    (filter_products, {}),
    (aggregate_by_product, {"vrbs": False}),
]

In [None]:
pipeline(df, steps)


********** Step: filter_products **********

Input DataFrame shape: (5, 5)
   order_id   product  quantity  defects  production_time
0       101  Widget A        50        2              120
1       102  Widget B        30        1               95
2       103  Widget A        75        3              150
3       104  Widget C        20        0               80
4       105  Widget B        45        2              110

Output DataFrame shape: (4, 5)
   order_id   product  quantity  defects  production_time
0       101  Widget A        50        2              120
1       102  Widget B        30        1               95
2       103  Widget A        75        3              150
4       105  Widget B        45        2              110


          quantity  defects  production_time
product
Widget A       125        5              270
Widget B        75        3              205


We want to add basic profiling of time and memory. 
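A minimal sketch of what basic profiling could look like, assuming "memory" means two things: the size of the input and output DataFrames (`DataFrame.memory_usage(deep=True)`) and the peak of the allocations traced by Python's `tracemalloc` while the step runs. It also uses `time.perf_counter()` for the duration, since the Python docs describe it as the clock for measuring short durations; `datetime.now()` remains useful for human-readable start and end times. `profile_step` is a hypothetical name.

```python
import time
import tracemalloc

import pandas as pd

def profile_step(func, df, **kwargs):
    """Run func(df, **kwargs) and return its output plus basic time/memory figures."""
    in_bytes = int(df.memory_usage(deep=True).sum())  # memory held by the input DataFrame
    tracemalloc.start()
    start = time.perf_counter()  # high-resolution clock meant for durations
    try:
        out = func(df, **kwargs)
        seconds = time.perf_counter() - start
        _, peak = tracemalloc.get_traced_memory()  # (current, peak) traced bytes
    finally:
        tracemalloc.stop()  # stop tracing even if the step raises
    stats = {
        "seconds": seconds,
        "in_bytes": in_bytes,
        "out_bytes": int(out.memory_usage(deep=True).sum()),
        "peak_traced_bytes": peak,
    }
    return out, stats

df = pd.DataFrame({"product": ["Widget A", "Widget C"], "quantity": [50, 20]})
out, stats = profile_step(lambda d: d[d["product"] != "Widget C"], df)
print(stats)
```

`tracemalloc` only sees allocations that go through Python's tracing hooks (recent NumPy versions report their array buffers to it), so the peak is best treated as a rough indicator rather than total process memory.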

In [None]:
start_time = datetime.now()
sleep(3.14)
end_time = datetime.now()
total_time = end_time - start_time
total_time

datetime.timedelta(seconds=3, microseconds=140231)

In [None]:
start_time

datetime.datetime(2026, 1, 21, 13, 48, 8, 13919)

In [None]:
def track(func):
    def wrapper(in_df, *args, vrbs=False, **kwargs):  # vrbs is keyword-only
        start_time = datetime.now()
        if vrbs:
            print(f"\n{'*'*10} Step: {func.__name__} {'*'*10}")
            print(f"\nInput DataFrame shape: {in_df.shape}")
            print(f"Start time: {start_time}")
            print(in_df)
        out_df = func(in_df, *args, **kwargs)

        end_time = datetime.now()
        total_time = end_time - start_time
        if vrbs:
            print(f"\nOutput DataFrame shape: {out_df.shape}")
            print(f"End time: {end_time}")
            print(f"Total time: {total_time}")
            print(out_df)
        return out_df
    return wrapper

In [None]:
@track
def filter_products(df):
    return df[df["product"] != "Widget C"]

@track
def aggregate_by_product(df):
    return df.groupby(["product"])[["quantity", "defects", "production_time"]].sum()

In [None]:
steps = [
    (filter_products, {}),
    (aggregate_by_product, {}),
]

In [None]:
_df = pipeline(df, steps)


********** Step: filter_products **********

Input DataFrame shape: (5, 5)
Start time: 2026-01-21 13:48:11.183573
   order_id   product  quantity  defects  production_time
0       101  Widget A        50        2              120
1       102  Widget B        30        1               95
2       103  Widget A        75        3              150
3       104  Widget C        20        0               80
4       105  Widget B        45        2              110

Output DataFrame shape: (4, 5)
End time: 2026-01-21 13:48:11.185250
Total time: 0:00:00.001677
   order_id   product  quantity  defects  production_time
0       101  Widget A        50        2              120
1       102  Widget B        30        1               95
2       103  Widget A        75        3              150
4       105  Widget B        45        2              110

********** Step: aggregate_by_product **********

Input DataFrame shape: (4, 5)
Start time: 2026-01-21 13:48:11.186039
   order_id   product  quantity

# first reflection and next steps
- right now we still print the whole DataFrame. We want to change that to show just the first 3 rows, 5 random rows and the last 3 rows, maybe with a visual cue that the rows are truncated.
- we would also like to show the DataFrames to the right of the aggregate information (shape, etc.)
- include information like `df.info()`
- include information like `df.describe()`
- include a column diff (number of columns changed, columns only in the input, columns only in the output)
- include a row diff (maybe just the number of rows)
- source code of the functions
- description of the step, based on its docstring

- nicer representation, maybe in HTML or via `display`

- new wrapper to include assertions and/or sanity checks (property based, for example the same number of mat_ids)

- function to generate a pipeline report, in Excel (or something else)
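The column and row diff items could start from a helper like the hypothetical `diff_frames` below. It assumes "number of columns changed" means columns dropped plus columns added; columns whose values change but whose names stay the same are not detected.

```python
import pandas as pd

def diff_frames(in_df, out_df):
    """Summarise how the columns and the row count changed between two DataFrames."""
    in_cols, out_cols = list(in_df.columns), list(out_df.columns)
    return {
        "cols_only_in": [c for c in in_cols if c not in out_cols],   # dropped by the step
        "cols_only_out": [c for c in out_cols if c not in in_cols],  # added by the step
        "n_cols_changed": len(set(in_cols) ^ set(out_cols)),          # dropped + added
        "rows_diff": len(out_df) - len(in_df),
    }

in_df = pd.DataFrame({
    "product": ["Widget A", "Widget B", "Widget A", "Widget C"],
    "quantity": [50, 30, 75, 20],
    "defects": [2, 1, 3, 0],
})
out_df = in_df.groupby("product")[["quantity"]].sum()
print(diff_frames(in_df, out_df))
```

Note that `groupby` moves `product` into the index, so it shows up as a dropped column here.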

After picking the problem back up, our first idea is to store the step information in a dictionary and then write separate functions to display it.

In [None]:
def track(func):
    @wraps(func)
    def wrapper(in_df, *args, vrbs=False, **kwargs):  # vrbs is keyword-only
        meta_dict = {
            'step_name':func.__name__, # name of the pipeline step
            'in_time':datetime.now(), # time when the pipeline step starts
            'in_df_shape':in_df.shape, # shape of the input dataframe
            'in_df_head':in_df.head(3), # head of the input dataframe
            'in_df_sample':in_df.sample(min(in_df.shape[0], 5)), # sample of the input dataframe
            'in_df_tail':in_df.tail(3), # tail of the input dataframe
        }

        out_df = func(in_df, *args, **kwargs)

        out_time = datetime.now()
        total_time = out_time - meta_dict['in_time']

        meta_dict.update({
            'out_time':out_time, # time when the pipeline step stops (same value used for total_time)
            'out_df_shape':out_df.shape, # shape of the output dataframe
            'out_df_head':out_df.head(3), # head of the output dataframe
            'out_df_sample':out_df.sample(min(out_df.shape[0], 5)), # sample of the output dataframe
            'out_df_tail':out_df.tail(3), # tail of the output dataframe
            'total_time':total_time, # difference between in_time and out_time
        })

        if vrbs:
            print('Here we use a fuction to display the information')
            print(meta_dict)

        return out_df
    return wrapper

In [None]:
@track
def filter_products(df):
    return df[df["product"] != "Widget C"]

@track
def aggregate_by_product(df):
    return df.groupby(["product"])[["quantity", "defects", "production_time"]].sum()

In [None]:
steps = [
    (filter_products, {}),
    (aggregate_by_product, {}),
]

In [None]:
_df = pipeline(df, steps)

Here we use a fuction to display the information
{'step_name': 'filter_products', 'in_time': datetime.datetime(2026, 1, 21, 13, 48, 11, 209353), 'in_df_shape': (5, 5), 'in_df_head':    order_id   product  quantity  defects  production_time
0       101  Widget A        50        2              120
1       102  Widget B        30        1               95
2       103  Widget A        75        3              150, 'in_df_sample':    order_id   product  quantity  defects  production_time
4       105  Widget B        45        2              110
0       101  Widget A        50        2              120
2       103  Widget A        75        3              150
1       102  Widget B        30        1               95
3       104  Widget C        20        0               80, 'in_df_tail':    order_id   product  quantity  defects  production_time
2       103  Widget A        75        3              150
3       104  Widget C        20        0               80
4       105  Widget B        45 
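For the pipeline report item, one possible starting point, assuming `track` were changed to also append each `meta_dict` to a list (it does not do that yet), is a table with one row per step that keeps only the non-DataFrame entries. `meta_dicts_to_report` is a hypothetical name, and the two dictionaries are cut-down stand-ins whose shapes come from the example pipeline:

```python
import pandas as pd

def meta_dicts_to_report(meta_dicts):
    """One row per pipeline step, keeping only the entries that are not DataFrames."""
    rows = [{k: v for k, v in m.items() if not isinstance(v, pd.DataFrame)} for m in meta_dicts]
    return pd.DataFrame(rows)

# cut-down stand-ins for the meta_dicts of the example pipeline (only some keys)
head_stub = pd.DataFrame({"order_id": [101]})
meta_dicts = [
    {"step_name": "filter_products", "in_df_shape": (5, 5), "out_df_shape": (4, 5), "in_df_head": head_stub},
    {"step_name": "aggregate_by_product", "in_df_shape": (4, 5), "out_df_shape": (2, 3), "in_df_head": head_stub},
]

report = meta_dicts_to_report(meta_dicts)
print(report)
# report.to_excel("pipeline_report.xlsx")  # writing .xlsx needs an engine such as openpyxl installed
```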

# display functions
Here we create a sample `meta_dict` to experiment with different functions for displaying the information.

In [None]:
meta_dict = {
    'step_name': 'filter_products',
    'step_description': 'Exclude products which are Widget C',
    'in_time': datetime.now(),
    'in_df_shape': (5, 5),
    'in_df_head': df.head(3),
    'in_df_sample': df.sample(min(df.shape[0], 5)),
    'in_df_tail': df.tail(3),
    'out_time': datetime.now(),
    'out_df_shape': (4, 5),
    'out_df_head': df[df["product"] != "Widget C"].head(3),
    'out_df_sample': df[df["product"] != "Widget C"].sample(min(4, 5)),
    'out_df_tail': df[df["product"] != "Widget C"].tail(3),
    'total_time': timedelta(microseconds=1287)
}

meta_dict

{'step_name': 'filter_products',
 'step_description': 'Exclude products which are Widget C',
 'in_time': datetime.datetime(2026, 1, 21, 13, 48, 11, 223583),
 'in_df_shape': (5, 5),
 'in_df_head':    order_id   product  quantity  defects  production_time
 0       101  Widget A        50        2              120
 1       102  Widget B        30        1               95
 2       103  Widget A        75        3              150,
 'in_df_sample':    order_id   product  quantity  defects  production_time
 0       101  Widget A        50        2              120
 1       102  Widget B        30        1               95
 4       105  Widget B        45        2              110
 2       103  Widget A        75        3              150
 3       104  Widget C        20        0               80,
 'in_df_tail':    order_id   product  quantity  defects  production_time
 2       103  Widget A        75        3              150
 3       104  Widget C        20        0               80
 4    

## print name and time

In [None]:
print(15*'*' + ' ' + meta_dict['step_name'] + ' ' + 15*'*')

*************** filter_products ***************


In [None]:
print(f'Total Time: {meta_dict["total_time"]}')
print(f'Start: {meta_dict["in_time"]}')
print(f'End: {meta_dict["out_time"]}')

Total Time: 0:00:00.001287
Start: 2026-01-21 13:48:11.223583
End: 2026-01-21 13:48:11.223818


In [None]:
#| export
def format_timedelta(td):
    total_seconds = td.total_seconds()
    if total_seconds < 0.001: return f"{total_seconds * 1_000_000:.0f} µs"
    elif total_seconds < 1: return f"{total_seconds * 1000:.2f} ms"
    elif total_seconds < 60: return f"{total_seconds:.2f} s"
    elif total_seconds < 3600: return f"{total_seconds / 60:.2f} min"
    else: return f"{total_seconds / 3600:.2f} h"

In [None]:
print(f'Total Time: {format_timedelta(meta_dict["total_time"])}')

Total Time: 1.29 ms


In [None]:
#| export
def print_step_name(meta_dict): print(15*'*' + ' ' + meta_dict['step_name'] + ' ' + 15*'*')

In [None]:
print_step_name(meta_dict)

*************** filter_products ***************


In [None]:
#| export
def print_time(meta_dict):
    print(f'Total Time: {format_timedelta(meta_dict["total_time"])}')
    print('')
    print(f'Start: {meta_dict["in_time"]}')
    print(f'  End: {meta_dict["out_time"]}')

In [None]:
print_time(meta_dict)

Total Time: 1.29 ms

Start: 2026-01-21 13:48:11.223583
  End: 2026-01-21 13:48:11.223818


## print sample df

In [None]:
pd.concat([meta_dict['in_df_head'], meta_dict['in_df_sample'], meta_dict['in_df_tail']])

   order_id   product  quantity  defects  production_time
0       101  Widget A        50        2              120
1       102  Widget B        30        1               95
2       103  Widget A        75        3              150
0       101  Widget A        50        2              120
1       102  Widget B        30        1               95
4       105  Widget B        45        2              110
2       103  Widget A        75        3              150
3       104  Widget C        20        0               80
2       103  Widget A        75        3              150
3       104  Widget C        20        0               80


In [None]:
pd.DataFrame(np.nan, index=range(1), columns=meta_dict['in_df_head'].columns)

   order_id  product  quantity  defects  production_time
0       NaN      NaN       NaN      NaN              NaN


In [None]:
pd.DataFrame(np.nan, index=range(1), columns=meta_dict['in_df_head'].columns).fillna('...')

  order_id product quantity defects production_time
0      ...     ...      ...     ...             ...


In [None]:
pd.DataFrame(np.nan, index=range(3), columns=meta_dict['in_df_head'].columns).fillna('.')

  order_id product quantity defects production_time
0        .       .        .       .               .
1        .       .        .       .               .
2        .       .        .       .               .


In [None]:
pd.DataFrame(np.nan, index=range(1), columns=meta_dict['in_df_head'].columns).fillna(':')

  order_id product quantity defects production_time
0        :       :        :       :               :


In [None]:
#| export
def fill_between_df_parts(df):
    return pd.DataFrame(np.nan, index=[''], columns=df.columns).fillna(':')

In [None]:
fill_between_df_parts(meta_dict['in_df_head'])

 order_id product quantity defects production_time
        :       :        :       :               :


In [None]:
pd.concat(
    [meta_dict['in_df_head'], 
    fill_between_df_parts(meta_dict['in_df_head']),
    meta_dict['in_df_sample'], 
    fill_between_df_parts(meta_dict['in_df_head']),
    meta_dict['in_df_tail']]
    )

  order_id   product quantity defects production_time
0      101  Widget A       50       2             120
1      102  Widget B       30       1              95
2      103  Widget A       75       3             150
         :         :        :       :               :
0      101  Widget A       50       2             120
1      102  Widget B       30       1              95
4      105  Widget B       45       2             110
2      103  Widget A       75       3             150
3      104  Widget C       20       0              80
         :         :        :       :               :


We want to build two functions:
1. a display function for within our wrapper which only uses the meta_dict
2. a display function that just takes in one df and gets the head, sample, tail from there

In the future we might want to turn this into one function but for now we don't want to store the whole df in meta_dict
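For the `df.info()` and `df.describe()` items from our next steps, one option in line with not storing the whole DataFrame is to keep only compact summaries. `df.info()` prints by default, but its `buf` parameter redirects the text into any writable buffer. A sketch with a hypothetical `summarise_df` whose keys follow the existing `meta_dict` naming:

```python
import io

import pandas as pd

def summarise_df(df, prefix):
    """Return compact summaries of df (info text and describe table) instead of the full DataFrame."""
    buf = io.StringIO()
    df.info(buf=buf)  # write the info text into the buffer instead of printing it
    return {
        f"{prefix}_df_info": buf.getvalue(),
        f"{prefix}_df_describe": df.describe(),  # numeric columns only by default
    }

df = pd.DataFrame({"product": ["Widget A", "Widget B"], "quantity": [50, 30]})
meta = summarise_df(df, "in")
print(meta["in_df_info"])
print(meta["in_df_describe"])
```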

In [None]:
#| export
def print_sample_from_meta_dict(meta_dict, mode='in'):
    print(pd.concat([
        meta_dict[f'{mode}_df_head'], 
        fill_between_df_parts(meta_dict[f'{mode}_df_head']),
        meta_dict[f'{mode}_df_sample'], 
        fill_between_df_parts(meta_dict[f'{mode}_df_head']),
        meta_dict[f'{mode}_df_tail']
        ]))

In [None]:
print_sample_from_meta_dict(meta_dict)

  order_id   product quantity defects production_time
0      101  Widget A       50       2             120
1      102  Widget B       30       1              95
2      103  Widget A       75       3             150
         :         :        :       :               :
0      101  Widget A       50       2             120
1      102  Widget B       30       1              95
4      105  Widget B       45       2             110
2      103  Widget A       75       3             150
3      104  Widget C       20       0              80
         :         :        :       :               :
2      103  Widget A       75       3             150
3      104  Widget C       20       0              80
4      105  Widget B       45       2             110
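In this output rows 0, 1 and 2 appear in both the head and the sample, because `df.sample` draws from the whole DataFrame. A possible refinement, sketched as a hypothetical `head_sample_tail`, samples only from the rows between head and tail and keeps the sampled rows in their original order. For the 5-row demo data the middle part is empty, and head and tail still overlap whenever a DataFrame has fewer than `2 * n_edge` rows.

```python
import numpy as np
import pandas as pd

def head_sample_tail(df, n_edge=3, n_sample=5, seed=None):
    """Return head, random middle sample and tail; the sample never repeats head or tail rows."""
    head, tail = df.head(n_edge), df.tail(n_edge)
    middle = df.iloc[n_edge:len(df) - n_edge]  # empty when df has <= 2 * n_edge rows
    k = min(n_sample, len(middle))
    if k == 0:
        return head, middle, tail
    rng = np.random.default_rng(seed)
    # draw k distinct row positions and sort them so the sample keeps the original row order
    positions = np.sort(rng.choice(len(middle), size=k, replace=False))
    return head, middle.iloc[positions], tail

big = pd.DataFrame({"x": range(20)})
head, sample, tail = head_sample_tail(big, seed=0)
print(list(head.index), list(sample.index), list(tail.index))
```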


In [None]:
#| export
def display_sample_from_df(df):
    return pd.concat([
            df.head(3), 
            fill_between_df_parts(df),
            df.sample(min(5, df.shape[0])), 
            fill_between_df_parts(df),
            df.tail(3)]
    )

In [None]:
display_sample_from_df(df)

  order_id   product quantity defects production_time
0      101  Widget A       50       2             120
1      102  Widget B       30       1              95
2      103  Widget A       75       3             150
         :         :        :       :               :
2      103  Widget A       75       3             150
4      105  Widget B       45       2             110
1      102  Widget B       30       1              95
0      101  Widget A       50       2             120
3      104  Widget C       20       0              80
         :         :        :       :               :


We also have to consider what happens when there are too many columns.

Our idea is to truncate them as well and add a dummy column with '...' as a sign that the columns are truncated.
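A minimal sketch of that idea, with a hypothetical `truncate_columns` that keeps the first and last `n_edge` columns and inserts a `'...'` marker column between them. It selects columns by name and uses `DataFrame.insert`, so no index alignment is involved (the concatenated samples above contain duplicate index labels).

```python
import pandas as pd

def truncate_columns(df, n_edge=3, marker="..."):
    """Keep the first and last n_edge columns and insert a marker column between them."""
    if df.shape[1] <= 2 * n_edge:
        return df  # nothing to truncate
    keep = list(df.columns[:n_edge]) + list(df.columns[-n_edge:])
    out = df[keep].copy()
    out.insert(n_edge, marker, marker)  # the scalar value is broadcast to every row
    return out

wide = pd.DataFrame([[10 * i + j for j in range(10)] for i in range(2)],
                    columns=[f"c{j}" for j in range(10)])
print(truncate_columns(wide))
```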

## print shape and shape change

In [None]:
print('(rows, columns) =', meta_dict['in_df_shape'])
print('        |          ')
print('        V          ')
print('(rows, columns) =', meta_dict['out_df_shape'])

(rows, columns) = (5, 5)
        |          
        V          
(rows, columns) = (4, 5)


In [None]:
print('(rows, columns) =', meta_dict['in_df_shape'])
print(' ↓ '*8)
print('(rows, columns) =', meta_dict['out_df_shape'])

(rows, columns) = (5, 5)
 ↓  ↓  ↓  ↓  ↓  ↓  ↓  ↓ 
(rows, columns) = (4, 5)


In [None]:
in_str = f"{meta_dict['in_df_shape'][0]} rows, {meta_dict['in_df_shape'][1]} columns"
print(in_str)
print('↓ ↓ ↓'.center(len(in_str), ' '))
print(f"{meta_dict['out_df_shape'][0]} rows, {meta_dict['out_df_shape'][1]} columns")

5 rows, 5 columns
      ↓ ↓ ↓      
4 rows, 5 columns


Is there a convenient way to space out a number of strings over a specific length?

##### 🤖Reply🤖

You could use string formatting with padding! Here are a couple of approaches:

**1. Using f-strings with alignment:**
```python
s1, s2, s3 = "5 rows", "4 columns", "time: 1.2ms"
total_width = 50
print(f"{s1:<{total_width//3}}{s2:<{total_width//3}}{s3}")
```

**2. Using `str.ljust()` / `str.center()` / `str.rjust()`:**
```python
width = 20
print(s1.ljust(width) + s2.center(width) + s3.rjust(width))
```

**3. For evenly spacing multiple strings, you could calculate spacing:**
```python
def space_strings(strings, total_width):
    gap = (total_width - sum(len(s) for s in strings)) // (len(strings) - 1)
    return (' ' * gap).join(strings)
```

Which approach fits what you're trying to do with your pipeline display?

In [None]:
s1, s2, s3 = "5 rows", "4 columns", "time: 1.2ms"
total_width = 50
print(f"{s1:<{total_width//3}}{s2:<{total_width//3}}{s3}")

5 rows          4 columns       time: 1.2ms


In [None]:
width = 20
print(s1.ljust(width) + s2.center(width) + s3.rjust(width))

5 rows                   4 columns               time: 1.2ms


In [None]:
#| export
def space_strings(strings, total_width):
    gap = (total_width - sum(len(s) for s in strings)) // (len(strings) - 1)
    return (' ' * gap).join(strings)

In [None]:
space_strings(['|', '|', '|'], 20)

'|        |        |'

In [None]:
len(space_strings(['|', '|', '|'], 20))

19
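The length is 19 rather than 20 because of the integer division in `space_strings`: 17 free characters over 2 gaps gives 8 spaces per gap, and the leftover space is dropped (a single string would also raise a `ZeroDivisionError`). A sketch of a variant that hands out the leftover spaces from the left, under the hypothetical name `space_strings_exact`:

```python
def space_strings_exact(strings, total_width):
    """Join strings so the result is total_width long when they fit (at least one space per gap)."""
    if len(strings) == 1:
        return strings[0].ljust(total_width)
    n_gaps = len(strings) - 1
    free = max(total_width - sum(len(s) for s in strings), n_gaps)
    base, extra = divmod(free, n_gaps)  # the first `extra` gaps get one additional space
    out = strings[0]
    for i, s in enumerate(strings[1:]):
        out += " " * (base + (1 if i < extra else 0)) + s
    return out

print(repr(space_strings_exact(['|', '|', '|'], 20)))
```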

In [None]:
in_str = f"{meta_dict['in_df_shape'][0]} rows, {meta_dict['in_df_shape'][1]} columns"
print(in_str)
print(space_strings(['↓','↓','↓'], len(in_str)))
print(f"{meta_dict['out_df_shape'][0]} rows, {meta_dict['out_df_shape'][1]} columns")

5 rows, 5 columns
↓       ↓       ↓
4 rows, 5 columns


In [None]:
in_rows, in_cols = meta_dict['in_df_shape']
out_rows, out_cols = meta_dict['out_df_shape']

in_str = f"{in_rows} rows, {in_cols} columns"
print(in_str)
print(space_strings(['↓','↓','↓'], len(in_str)))

print(f'{str(out_rows-in_rows).center(len(in_str)//2)}', f'{str(out_cols-in_cols).center(len(in_str)//2)}')

print(space_strings(['↓','↓','↓'], len(in_str)))
print(f"{out_rows} rows, {out_cols} columns")

5 rows, 5 columns
↓       ↓       ↓
   -1       0    
↓       ↓       ↓
4 rows, 5 columns


In [None]:
in_rows, in_cols = meta_dict['in_df_shape']
out_rows, out_cols = meta_dict['out_df_shape']
diff_rows, diff_cols = out_rows - in_rows, out_cols - in_cols

diff_rows_str = f"{diff_rows:+d}" if diff_rows != 0 else "0"
diff_cols_str = f"{diff_cols:+d}" if diff_cols != 0 else "0"

# Right-align each column
row_width = max(len(str(in_rows)), len(str(out_rows)), len(diff_rows_str))
col_width = max(len(str(in_cols)), len(str(out_cols)), len(diff_cols_str))

print(f"Input:  {in_rows:>{row_width}} rows, {in_cols:>{col_width}} cols")
print(f"        {' '*row_width}   ↓   {' '*col_width}   ↓")
print(f"Diff:   {diff_rows_str:>{row_width}} rows, {diff_cols_str:>{col_width}} cols")
print(f"        {' '*row_width}   ↓   {' '*col_width}   ↓")
print(f"Output: {out_rows:>{row_width}} rows, {out_cols:>{col_width}} cols")

Input:   5 rows, 5 cols
             ↓       ↓
Diff:   -1 rows, 0 cols
             ↓       ↓
Output:  4 rows, 5 cols


In [None]:
print(f"""
Input:  {in_rows:>{row_width}} rows, {in_cols:>{col_width}} cols
        {' '*row_width}   ↓   {' '*col_width}   ↓
Diff:   {diff_rows_str:>{row_width}} rows, {diff_cols_str:>{col_width}} cols
        {' '*row_width}   ↓   {' '*col_width}   ↓
Output: {out_rows:>{row_width}} rows, {out_cols:>{col_width}} cols
""")


Input:   5 rows, 5 cols
             ↓       ↓
Diff:   -1 rows, 0 cols
             ↓       ↓
Output:  4 rows, 5 cols



In [None]:
#| export
def print_shape_change(meta_dict):
    in_rows, in_cols = meta_dict['in_df_shape']
    out_rows, out_cols = meta_dict['out_df_shape']
    diff_rows, diff_cols = out_rows - in_rows, out_cols - in_cols
    
    diff_rows_str = f"{diff_rows:+d}" if diff_rows != 0 else "0"
    diff_cols_str = f"{diff_cols:+d}" if diff_cols != 0 else "0"
    
    row_width = max(len(str(in_rows)), len(str(out_rows)), len(diff_rows_str))
    col_width = max(len(str(in_cols)), len(str(out_cols)), len(diff_cols_str))
    
    print(
        f"""
        Input:  {in_rows:>{row_width}} rows, {in_cols:>{col_width}} cols
                {' '*row_width}   ↓   {' '*col_width}   ↓
        Diff:   {diff_rows_str:>{row_width}} rows, {diff_cols_str:>{col_width}} cols
                {' '*row_width}   ↓   {' '*col_width}   ↓
        Output: {out_rows:>{row_width}} rows, {out_cols:>{col_width}} cols
        """
        )

In [None]:
print_shape_change(meta_dict)


        Input:   5 rows, 5 cols
                     ↓       ↓
        Diff:   -1 rows, 0 cols
                     ↓       ↓
        Output:  4 rows, 5 cols
        

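The indentation in the output above comes from the triple-quoted f-string: every line of the literal keeps the spaces of the function body it sits in. If that indentation is unwanted, one option (a sketch, not the notebook's implementation; `shape_block` is a hypothetical name) is to pass the string through `textwrap.dedent`, which removes the whitespace common to all non-blank lines:

```python
import textwrap

def shape_block(in_rows, out_rows):
    # The literal's lines carry the function body's indentation (8 spaces here)
    raw = f"""
        Input:  {in_rows} rows
        Output: {out_rows} rows
        """
    return raw

raw = shape_block(5, 4)
print(repr(raw))

# dedent strips the common 8-space margin; strip("\n") drops the blank first/last lines
clean = textwrap.dedent(raw).strip("\n")
print(clean)
```

Since `dedent` runs after the f-string has been formatted, the right-alignment computed with `row_width` and `col_width` is preserved; only the shared left margin disappears.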

Now we want to display the docstring of the ETL function, so that each step carries a text describing what it does. How do we get the docstring from a function?

##### 🤖Reply🤖<!-- SOLVEIT_SEPARATOR_7f3a9b2c -->


You can access a function's docstring using the `__doc__` attribute:

```python
def my_func():
    """This is the docstring."""
    pass

print(my_func.__doc__)
```

Or use `inspect.getdoc()`, which also cleans up the indentation:

```python
import inspect
print(inspect.getdoc(my_func))
```

Try adding a docstring to one of your ETL functions (like `filter_products`) and accessing it to see how it works!
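The difference between the two only shows up with multi-line docstrings. A small comparison using a hypothetical `load_orders` step:

```python
import inspect

def load_orders(df):
    """Load the raw orders.

    Keeps every column unchanged.
    """
    return df

# __doc__ is the raw docstring; before Python 3.13 it also keeps the
# indentation of the continuation lines
print(repr(load_orders.__doc__))

# inspect.getdoc() removes that indentation and strips leading/trailing blank lines
print(inspect.getdoc(load_orders))

def no_doc(df):
    return df

# Without a docstring both return None
print(no_doc.__doc__, inspect.getdoc(no_doc))  # None None
```

Note that a step without a docstring yields `None`, so a description printer built on `__doc__` would show `None` for it.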

In [None]:
def my_func():
    """This is the docstring."""
    pass

print(my_func.__doc__)

This is the docstring.


In [None]:
import inspect
print(inspect.getdoc(my_func))

This is the docstring.


In [None]:
@track
def my_func():
    """This is the docstring."""
    pass

print(my_func.__doc__)

This is the docstring.


It seems that when using the decorator we get the docstring of the wrapper function. Is there a way to get the docstring of the actual function?

##### 🤖Reply🤖<!-- SOLVEIT_SEPARATOR_7f3a9b2c -->


Yes! Use `functools.wraps` in your decorator. It copies the original function's metadata (including `__doc__`, `__name__`, etc.) over to the wrapper:

```python
from functools import wraps

def track(func):
    @wraps(func)
    def wrapper(in_df, vrbs=False, *args, **kwargs):
        # ... your tracking code ...
        return func(in_df, *args, **kwargs)
    return wrapper
```

The `@wraps(func)` decorator on the inner `wrapper` function does the magic. Try updating your `track` decorator with this and see if `my_func.__doc__` now returns the original docstring!
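To see the difference `@wraps` makes, here is a small comparison using two hypothetical decorators, `track_plain` and `track_wrapped`, which do no tracking at all and only forward the call:

```python
from functools import wraps

def track_plain(func):
    # No @wraps: callers see the wrapper's own metadata
    def wrapper(*args, **kwargs):
        return func(*args, **kwargs)
    return wrapper

def track_wrapped(func):
    @wraps(func)  # copies __name__, __qualname__, __doc__, __module__, ... onto wrapper
    def wrapper(*args, **kwargs):
        return func(*args, **kwargs)
    return wrapper

@track_plain
def step_a(df):
    """Docstring of step_a."""
    return df

@track_wrapped
def step_b(df):
    """Docstring of step_b."""
    return df

print(step_a.__name__, step_a.__doc__)  # wrapper None
print(step_b.__name__, step_b.__doc__)  # step_b Docstring of step_b.
print(step_b.__wrapped__.__name__)      # step_b
```

`@wraps` also sets `__wrapped__`, so the undecorated function stays reachable, for example to test a step without the tracking overhead.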

In [None]:
from functools import wraps
from datetime import datetime

def track(func):
    @wraps(func) # keep func's __name__ and __doc__ on the wrapper
    def wrapper(in_df, *args, vrbs=False, **kwargs): # vrbs is keyword-only, so positional args reach func
        meta_dict = {
            'step_name':func.__name__, # name of the pipeline step
            'step_description':func.__doc__, # description based on docstring
            'in_time':datetime.now(), # time when the pipeline step starts
            'in_df_shape':in_df.shape, # shape of the input dataframe
            'in_df_head':in_df.head(3), # head of the input dataframe
            'in_df_sample':in_df.sample(min(in_df.shape[0], 5)), # sample of the input dataframe
            'in_df_tail':in_df.tail(3), # tail of the input dataframe
        }

        out_df = func(in_df, *args, **kwargs)

        out_time = datetime.now()
        total_time = out_time - meta_dict['in_time']

        meta_dict.update({
            'out_time':out_time, # time when the pipeline step stops (same value used for total_time)
            'out_df_shape':out_df.shape, # shape of the output dataframe
            'out_df_head':out_df.head(3), # head of the output dataframe
            'out_df_sample':out_df.sample(min(out_df.shape[0], 5)), # sample of the output dataframe
            'out_df_tail':out_df.tail(3), # tail of the output dataframe
            'total_time':total_time, # difference between in_time and out_time
        })

        if vrbs:
            print('Here we use a function to display the information')
            print(meta_dict)

        return out_df
    return wrapper

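A side note on the wrapper signature: if `vrbs` is placed before `*args`, any extra positional argument meant for the wrapped step is bound to `vrbs` instead. Placing `vrbs` after `*args` makes it keyword-only. A minimal demonstration with two hypothetical decorators that only forward the call and report the flag, plus a hypothetical `head` step working on a plain list:

```python
def flag_before_args(func):
    # vrbs sits before *args, so it can be filled positionally
    def wrapper(in_df, vrbs=False, *args, **kwargs):
        return func(in_df, *args, **kwargs), vrbs
    return wrapper

def flag_after_args(func):
    # vrbs after *args is keyword-only
    def wrapper(in_df, *args, vrbs=False, **kwargs):
        return func(in_df, *args, **kwargs), vrbs
    return wrapper

def head(rows, n=2):
    return rows[:n]

rows = [1, 2, 3, 4]
print(flag_before_args(head)(rows, 3))            # ([1, 2], 3): the 3 went into vrbs
print(flag_after_args(head)(rows, 3))             # ([1, 2, 3], False)
print(flag_after_args(head)(rows, 3, vrbs=True))  # ([1, 2, 3], True)
```

The `pipeline` passes `vrbs` by keyword, so both layouts work there; the keyword-only form only matters once a step takes positional arguments.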
## print docstring

In [None]:
#| export
def print_step_description(meta_dict): print(f"'''{meta_dict['step_description']}'''")

In [None]:
print_step_description(meta_dict)

'''Exclude products which are Widget C'''


## print all step info

In [None]:
#| export
def print_step_info(meta_dict):
    print_step_name(meta_dict)
    print_step_description(meta_dict)
    print('\n')
    print_time(meta_dict)
    print("\nInput DataFrame:")
    print_sample_from_meta_dict(meta_dict, mode='in')
    print_shape_change(meta_dict)
    print("Output DataFrame:")
    print_sample_from_meta_dict(meta_dict, mode='out')
    print('\n')

In [None]:
print_step_info(meta_dict)

*************** filter_products ***************
'''Exclude products which are Widget C'''


Total Time: 1.29 ms

Start: 2026-01-21 13:48:11.223583
  End: 2026-01-21 13:48:11.223818

Input DataFrame:
  order_id   product quantity defects production_time
0      101  Widget A       50       2             120
1      102  Widget B       30       1              95
2      103  Widget A       75       3             150
         :         :        :       :               :
0      101  Widget A       50       2             120
1      102  Widget B       30       1              95
4      105  Widget B       45       2             110
2      103  Widget A       75       3             150
3      104  Widget C       20       0              80
         :         :        :       :               :
2      103  Widget A       75       3             150
3      104  Widget C       20       0              80
4      105  Widget B       45       2             110

        Input:   5 rows, 5 cols
            

# Testing the pipeline

The issue is that `func_kwargs.update()` **mutates the original dictionary** in your `steps` list. So if you ran the pipeline earlier with `vrbs_default=True`, the empty `{}` for `aggregate_by_product` got modified to `{'vrbs': True}`, and that change persists!

Try adding `.copy()` before updating:

```python
func_kwargs = func_kwargs.copy()
```

That way you're modifying a copy, not the original dictionary in `steps`.
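The effect can be reproduced without any dataframes. A sketch with two hypothetical helpers, `run_steps_mutating` and `run_steps_copying`, that only replicate the kwargs handling of `pipeline` and never call the steps:

```python
def run_steps_mutating(steps, vrbs_default):
    for func, func_kwargs in steps:
        # update() writes straight into the dict stored in steps
        func_kwargs.update({"vrbs": func_kwargs.get("vrbs", vrbs_default)})

def run_steps_copying(steps, vrbs_default):
    for func, func_kwargs in steps:
        func_kwargs = func_kwargs.copy()  # shallow copy; steps keeps its own dict
        func_kwargs.update({"vrbs": func_kwargs.get("vrbs", vrbs_default)})

# None stands in for a step function; neither helper calls it
steps_a = [(None, {})]
run_steps_mutating(steps_a, vrbs_default=True)
run_steps_mutating(steps_a, vrbs_default=False)
print(steps_a[0][1])  # {'vrbs': True}: the first run's value stuck

steps_b = [(None, {})]
run_steps_copying(steps_b, vrbs_default=True)
run_steps_copying(steps_b, vrbs_default=False)
print(steps_b[0][1])  # {}
```

A shallow copy is enough here because only top-level keys are added; nested mutable values inside the kwargs would still be shared.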

In [None]:
#| export
def pipeline(df, steps, vrbs_default=True):
    for func, func_kwargs in steps:
        func_kwargs = func_kwargs.copy() # don't mutate the dicts stored in steps
        vrbs = func_kwargs.get("vrbs", vrbs_default) # per-step setting wins over the default
        func_kwargs.update({"vrbs": vrbs})
        df = func(df, **func_kwargs)
    return df

In [None]:
#| export
from functools import wraps
from datetime import datetime

def track(func):
    @wraps(func) # keep func's __name__ and __doc__ on the wrapper
    def wrapper(in_df, *args, vrbs=False, **kwargs): # vrbs is keyword-only, so positional args reach func
        meta_dict = {
            'step_name':func.__name__, # name of the pipeline step
            'step_description':func.__doc__, # description based on docstring
            'in_time':datetime.now(), # time when the pipeline step starts
            'in_df_shape':in_df.shape, # shape of the input dataframe
            'in_df_head':in_df.head(3), # head of the input dataframe
            'in_df_sample':in_df.sample(min(in_df.shape[0], 5)), # sample of the input dataframe
            'in_df_tail':in_df.tail(3), # tail of the input dataframe
        }

        out_df = func(in_df, *args, **kwargs)

        out_time = datetime.now()
        total_time = out_time - meta_dict['in_time']

        meta_dict.update({
            'out_time':out_time, # time when the pipeline step stops (same value used for total_time)
            'out_df_shape':out_df.shape, # shape of the output dataframe
            'out_df_head':out_df.head(3), # head of the output dataframe
            'out_df_sample':out_df.sample(min(out_df.shape[0], 5)), # sample of the output dataframe
            'out_df_tail':out_df.tail(3), # tail of the output dataframe
            'total_time':total_time, # difference between in_time and out_time
        })

        if vrbs: print_step_info(meta_dict)

        return out_df
    return wrapper

In [None]:
@track
def filter_products(df):
    '''Exclude products which are Widget C.'''
    return df[df["product"] != "Widget C"]

In [None]:
@track
def aggregate_by_product(df):
    '''Aggregate by product and sum the columns "quantity", "defects" and "production_time".'''
    return df.groupby(["product"])[["quantity", "defects", "production_time"]].sum()

In [None]:
steps = [
    (filter_products, {'vrbs':True}),
    (aggregate_by_product, {}),
]

In [None]:
_df = pipeline(df, steps, vrbs_default=False)

*************** filter_products ***************
'''Exclude products which are Widget C.'''


Total Time: 855 µs

Start: 2026-01-21 13:49:23.814167
  End: 2026-01-21 13:49:23.815024

Input DataFrame:
  order_id   product quantity defects production_time
0      101  Widget A       50       2             120
1      102  Widget B       30       1              95
2      103  Widget A       75       3             150
         :         :        :       :               :
1      102  Widget B       30       1              95
3      104  Widget C       20       0              80
2      103  Widget A       75       3             150
0      101  Widget A       50       2             120
4      105  Widget B       45       2             110
         :         :        :       :               :
2      103  Widget A       75       3             150
3      104  Widget C       20       0              80
4      105  Widget B       45       2             110

        Input:   5 rows, 5 cols
           

In [None]:
#| hide
import nbdev; nbdev.nbdev_export()