This tutorial can be found and run in the TIDE GitHub repository: https://github.com/BuildingEnergySimulationTools/tide.
Complementary documentation can be found here: https://python-tide.readthedocs.io/en/main/?badge=main.

# Plumber Tutorial — Coheating Test Use Case

This notebook serves as a tutorial for the plumber Python library — a tool for building robust and traceable data processing pipelines.
It walks through the processing and analysis of data collected from a coheating experiment performed on a 1 m³ test cell, using the <code>Plumber</code> class.

## Description of the test bench and the coheating test
A coheating test is a method used to assess the thermal performance of a building envelope by maintaining a constant indoor temperature using electric heaters, while measuring:
- Heat input
- Internal and external environmental conditions

The test bench consists of five identical opaque walls and a sixth wall with glazing, designed to include and characterize a complex shading system. In this tutorial, the tests are conducted without the shading system in place, in order to establish a baseline performance of the test cell.

| Figure: Test bench with and without the tested shading system |
| :---: |
| <img src="images/instrumented_miniBET.jpg"  style="height:300px;">  <img src="images/tilted_oriented_slats.jpg"  style="height:300px;"> <img src="images/tilted_oriented_slats2.jpg"  style="height:300px;">   | 

This baseline allows for:

- Accurate determination of the U-values of each envelope component
- Identification of infiltration rates and thermal bridging
- Assessment of the thermal inertia of the structure
- Evaluation of the glazing performance without interference from shading

## Used dataset
The dataset used in this tutorial comes from measurements performed in a lab, under constant outside conditions and without solar radiation. It includes:

- Constant heating using a simple heat source (light bulb)
- Indoor air and surface temperatures
- Heat fluxes on interior wall surfaces


## Tutorial Overview
This tutorial demonstrates how to use the plumber library to structure and process our data.

We explore several features of plumber to build a modular and transparent data workflow.

1. **Load and inspect the raw dataset**
Use <code>show()</code> to quickly examine structure, time coverage, and metadata from the raw CSV file.

2. **Build a parsing and resampling pipeline**
Create a <code>Plumber</code> pipeline to rename columns, parse datetime formats, and resample the time series to regular intervals (e.g. 1 min, 10 min) for better readability and processing.

3. **Visualize cleaned signals**
Use the plotting capabilities of <code>show()</code> to explore temperature and heat flux trends over time, and verify pipeline outputs.

4. **Compute and export thermal metrics**
Extend the pipeline to calculate key indicators (e.g. U-values, integrated fluxes) and prepare the dataset for modeling or reporting.

Each transformation step remains traceable, reproducible, and easy to debug thanks to plumber's node-based architecture.

## Import data - graphtec file

First, let's load some generic libraries (pandas, Path)

In [None]:
from pathlib import Path
import pandas as pd
import re
from datetime import datetime, timedelta
import numpy as np
import os

In [None]:
TUTORIAL_DIR = Path(os.getcwd()).as_posix()

Now we load and clean the datafile containing the raw measurements. Channels were renamed following the structure: Name__Unit__Bloc__Sub_bloc.

This structured format allows plumber to automatically parse the dataset and assign metadata to each column, enabling smart filtering, grouping, and visualization across different processing nodes.

- <code>name</code>: the physical quantity (e.g., T_air, heat_flux)

- <code>unit</code>: the measurement unit (e.g., degC, W/m2)

- <code>bloc</code>: the logical system or location (e.g., indoor, outdoor, wall_A)

- <code>sub_bloc</code> (optional): a more specific designation (e.g., surface, air, glass)

This structured column naming is central to plumber's ability to:

- Dynamically group signals by type or location

- Enable semantic-aware plotting and filtering using <code>select()</code>
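
To make the naming convention concrete, here is a minimal sketch of how such labels can be split into tags and filtered with plain pandas. It is not Plumber's implementation: the helper `parse_tags` and the `TAG_KEYS` list are hypothetical names introduced for illustration, and the column labels are taken from this tutorial.

```python
import pandas as pd

# Tag order assumed from the naming convention described above
TAG_KEYS = ["name", "unit", "bloc", "sub_bloc"]


def parse_tags(column):
    """Split a 'name__unit__bloc__sub_bloc' label into a dict of tags."""
    parts = column.split("__")
    parts += [None] * (len(TAG_KEYS) - len(parts))  # sub_bloc is optional
    return dict(zip(TAG_KEYS, parts))


columns = [
    "T_surf_d__degC__temperature__door",
    "hf_fl__mV__heatflux__floor",
    "P__V__lamp__in",
]

# One row of tags per column label
tags = pd.DataFrame([parse_tags(c) for c in columns], index=columns)

# Filter labels by unit or by position
degc_columns = tags.index[tags["unit"] == "degC"].tolist()
door_columns = tags.index[tags["sub_bloc"] == "door"].tolist()
print(tags)
```

Keeping the tags in the column label itself means that no separate metadata file has to travel with the data.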


In [None]:
df = pd.read_csv(
    Path(TUTORIAL_DIR) / "resources/coheating_test_minibet.csv",
    index_col=0,
    parse_dates=[0],
    date_format="%Y-%m-%d %H:%M:%S%z" 
)
df.head()
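
Before handing a dataframe to Plumber, it can help to confirm that the index really is a sorted, timezone-aware datetime index. The sketch below runs these checks on a tiny synthetic CSV (the `timestamp` header and the values are made up for illustration); the same checks can be applied to `df`.

```python
import io

import pandas as pd

# Synthetic stand-in for the measurement file (values are illustrative)
csv_text = """timestamp,T_surf_d__degC__temperature__door
2025-03-07 00:00:00+00:00,21.4
2025-03-07 00:00:01+00:00,21.5
2025-03-07 00:00:02+00:00,21.5
"""

sample = pd.read_csv(io.StringIO(csv_text), index_col=0)
# Parse the index explicitly with the same format string as above
sample.index = pd.to_datetime(sample.index, format="%Y-%m-%d %H:%M:%S%z")

is_time_indexed = isinstance(sample.index, pd.DatetimeIndex)
is_tz_aware = sample.index.tz is not None
is_sorted = sample.index.is_monotonic_increasing
# Median spacing between consecutive samples
sampling_step = sample.index.to_series().diff().median()

print(is_time_indexed, is_tz_aware, is_sorted, sampling_step)
```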

# Examples of pipeline operations

## Plumber class

In [None]:
from tide.plumbing import Plumber

First, let's instantiate the Plumber class, giving it the measurement dataframe as an argument, with the correct column names.

In [None]:
plumber = Plumber(
   df,  
)

You can check whether the data is correctly structured using Plumber's <code>show</code> method.

In [None]:
plumber.show()

You can select specific data using Plumber's <code>select</code> method, with labels such as:
- the full name of a sensor, e.g. "hf_d_1"
- a unit, e.g. "degC"
- a type of sensor, e.g. "temperature"
- a position, e.g. "door"

In [None]:
plumber.select("door")

You can also check whether data is missing, depending on the chosen category (position, unit, or type of sensor) or on a time step.

In [None]:
plumber.plot_gaps_heatmap(time_step="5min")

Here it appears that some data is occasionally missing for a few inputs. Let's look into that:

In [None]:
plumber.get_gaps_description()

Overall, little data is missing for T_surf_ci__degC__temperature__ceiling, but 60 of hf_fl__mV__heatflux__floor appears to be missing (example of gaps below). These gaps should be interpolated if these inputs are needed.
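
The gap inspection above can be approximated in plain pandas. The sketch below uses a synthetic series (not the tutorial data) and does not reproduce `get_gaps_description`; it only illustrates how runs of missing values can be located and then filled by linear interpolation.

```python
import numpy as np
import pandas as pd

# Synthetic 1-minute series with two gaps (illustrative values)
index = pd.date_range("2025-03-07", periods=10, freq="1min", tz="UTC")
series = pd.Series(np.arange(10, dtype=float), index=index, name="hf_fl")
series.iloc[[2, 3, 7]] = np.nan

# Give each run of consecutive NaN values its own identifier
is_gap = series.isna()
block_id = (is_gap != is_gap.shift(fill_value=False)).cumsum()

# Summarize each gap: first and last missing timestamp, number of samples
rows = []
for _, block in series[is_gap].groupby(block_id[is_gap]):
    rows.append(
        {"start": block.index[0], "end": block.index[-1], "n_missing": len(block)}
    )
gaps = pd.DataFrame(rows)

# Fill the gaps by linear interpolation between the neighbouring samples
filled = series.interpolate(method="linear")
print(gaps)
```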

## Pipeline transformations

The pipeline is defined as a **dictionary (`pipe_dict`)** where:

- **Keys** are **custom names** for processing steps (e.g., `"resampling"`, `"conversion"`, `"cleaning"`).
- **Values** specify the **processing functions**:
  - A **list (`[]`)** applies a transformation to the **entire dataset**.
  - A **dictionary (`{}`)** applies transformations **only to specific columns**.
- **Processing functions (strings)** → These come from **Tide's processing module**.
    - Example: `"GaussianFilter1D"`, `"ApplyExpression"`, `"Resample"`, etc.
    - These function names **must match exactly** what’s in the Tide documentation.


---

**Example Pipeline**

Below is an example of a structured pipeline:

```python
pipe_dict = {
    # Step 1: Resample data to 1-minute intervals
    "resampling_1min": [["Resample", ["1min", "mean"]]],
    
    # Step 2: Unit conversions
    "conversion": {
        "V__pyranometer": [
            ["ApplyExpression", ["(X-0.88)/(4.4-0.88)*1600", "W/m²"]],
        ],
        "V__anemometer": [
            ["ApplyExpression", ["(X*35/10)", "m/s"]],
        ],
    },
    
    # Step 3: Calibration adjustments using sensitivity coefficients
    "calibration": {
        'hf_gr_cd': [['ApplyExpression', ['X*9.174', 'W/m²']]],
        'hf_gr_cg_w': [['ApplyExpression', ['X*7.537', 'W/m²']]]
    },

    # Step 4: Apply Gaussian smoothing filter
    "smoothing": [["GaussianFilter1D",  dict(sigma=10, mode="nearest", truncate=15)]],

    # Step 5: Handling missing data
    "handling_missing": {
        "degC": [["Interpolate", ["linear"]]],  # Linear interpolation for temperature
        "pyranometer": [["Ffill", [None]]],  # Forward fill for radiation data
    },

    # Step 6: Renaming columns
    "rename_columns": [["RenameColumns", [{"old_col": "new_col"}]]],

    # Step 7: Combine columns
    "average_of_surface": [
        ["CombineColumns", dict(function=np.mean, function_kwargs={"axis":1}, tide_format_columns="ground", drop_columns=True, result_column_name="T_gr__degC__temperature__ground")],
    ]
}

```

---
This pipeline applies a sequence of transformations to the dataset.
Processing steps are defined in pipe_dict and executed in order.

To find **valid processing functions**, check the documentation: [Tide Processing Documentation](https://github.com/BuildingEnergySimulationTools/tide/blob/main/tide/processing.py)

Functions are listed in `tide.processing`, and you should **use their exact names** in your pipeline.

**Example valid function names:**

| Function Name | Purpose |
|--------------|---------|
| `"Resample"` | Resamples time series data (e.g., `"1min", "mean"`). |
| `"ApplyExpression"` | Applies a mathematical formula to columns. |
| `"GaussianFilter1D"` | Applies Gaussian smoothing to reduce noise. |
| `"Dropna"` | Drops rows/columns with missing values. |
| `"ReplaceThreshold"` | Replaces values beyond a threshold. |
| `"STLFilter"` | Detects anomalies using Seasonal-Trend decomposition. |
| `"Interpolate"` | Fills missing values using interpolation. |
| `"Ffill"` | Forward-fills missing values. |
| `"Bfill"` | Backward-fills missing values. |
| `"FillNa"` | Replaces NaN values with a specified constant. |
| `"CombineColumns"` | Aggregates multiple columns (e.g., `"mean"`). |
| `"RenameColumns"` | Renames columns based on a mapping. |

---
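
As a rough mental model, several of these nodes have close pandas counterparts. The sketch below does not call Tide and is only an assumption about the behaviour of `Resample`, `Interpolate` and `Ffill`; it runs on synthetic 1 s data with an illustrative column name.

```python
import numpy as np
import pandas as pd

# Three minutes of synthetic 1 s data, with the middle minute lost
index = pd.date_range("2025-03-07", periods=180, freq="1s", tz="UTC")
raw = pd.DataFrame(
    {"T_air_in_up__degC__temperature__air in": np.linspace(20.0, 21.0, 180)},
    index=index,
)
raw.iloc[60:120, 0] = np.nan

# Assumed counterpart of ["Resample", ["1min", "mean"]]
resampled = raw.resample("1min").mean()

# Assumed counterpart of ["Interpolate", ["linear"]]
interpolated = resampled.interpolate(method="linear")

# Assumed counterpart of ["Ffill", [None]]
forward_filled = resampled.ffill()

print(pd.concat([resampled, interpolated, forward_filled], axis=1))
```

Interpolation bridges the empty minute with the midpoint of its neighbours, while forward fill repeats the last valid value, which is why the choice of node depends on the physical quantity.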


# Pipeline for cleaning our data 

Here we want to: 
- resample the data first (measurements are taken every second so far)
- convert the pyranometer and anemometer signals (from volts to W/m² and m/s); this is not really necessary here, since measurements were performed in a lab, at night
- apply a filter on wind (likewise, there is no wind in the lab)
- convert heat flux meter voltages into heat flux densities (using the sensitivities provided by the manufacturer)
- apply calibration factors to the temperature measurements
- create average temperatures
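
The smoothing steps in the pipeline defined below use `GaussianFilter1D` with `sigma`, `mode` and `truncate` arguments. These names match those of `scipy.ndimage.gaussian_filter1d`; assuming the node behaves similarly, the NumPy sketch below shows what `sigma=1, mode="nearest", truncate=1` amounts to: a three-point weighted moving average, with edge values repeated at the boundaries. The helper name `gaussian_smooth` and the test signal are illustrative.

```python
import numpy as np


def gaussian_smooth(values, sigma=1.0, truncate=1.0):
    """Gaussian-weighted moving average with repeated edge values."""
    # Kernel half-width, using the same rounding rule as SciPy
    radius = int(truncate * sigma + 0.5)
    offsets = np.arange(-radius, radius + 1)
    weights = np.exp(-0.5 * (offsets / sigma) ** 2)
    weights /= weights.sum()  # weights sum to 1, so a flat signal is unchanged
    # "nearest" boundary handling: repeat the first and last samples
    padded = np.pad(np.asarray(values, dtype=float), radius, mode="edge")
    return np.convolve(padded, weights, mode="valid")


signal = np.array([10.0, 10.0, 10.0, 16.0, 10.0, 10.0, 10.0])
smoothed = gaussian_smooth(signal, sigma=1, truncate=1)
print(smoothed.round(3))
```

A small `truncate` keeps the kernel short, so an isolated spike is damped without spreading far into neighbouring samples.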

## Dictionary definition

In [None]:
conversion_sensitivity_dict = {  # measurement is converted from V into W/m² here
    'hf_fw': [['ApplyExpression', ['X/9.174*1e6', 'W/m²']]],
    'hf_d_1': [['ApplyExpression', ['X/7.537*1e6', 'W/m²']]],
    'hf_gl_corner': [['ApplyExpression', ['X/10.447*1e6', 'W/m²']]],
    'hf_ci_1': [['ApplyExpression', ['X/8.684*1e6', 'W/m²']]],
    'hf_gl_cen': [['ApplyExpression', ['X/62.54*1e6', 'W/m²']]],
    'hf_fe': [['ApplyExpression', ['X/127.12*1e6', 'W/m²']]],
    'hf_fl': [['ApplyExpression', ['X/127.12*1e6', 'W/m²']]],
    'hf_d_2': [['ApplyExpression', ['X/129.0*1e6', 'W/m²']]],
    'hf_ci_2': [['ApplyExpression', ['X/129.0*1e6', 'W/m²']]]
}


conversion_calib_dict =  { # affine transformation here
        'T_surf_fe_cen': [['ApplyExpression', ['X * 1.0029 + -0.2311', 'degC']]],
        'T_surf_ci': [['ApplyExpression', ['X * 0.9969 + -0.186', 'degC']]],
        'T_surf_gl_in_cen': [['ApplyExpression', ['X * 0.9977 + -0.1604', 'degC']]],
        'T_air_in_up': [['ApplyExpression', ['X * 0.997 + -0.1746', 'degC']]],
        'T_surf_d': [['ApplyExpression', ['X * 0.9965 + -0.1535', 'degC']]],
        'T_surf_fl_cen': [['ApplyExpression', ['X * 0.9967 + -0.1638', 'degC']]],
        'T_air_in_down': [['ApplyExpression', ['X * 0.997 + -0.1618', 'degC']]],
        'T_surf_fl_cg_e': [['ApplyExpression', ['X * 0.998 + -0.2555', 'degC']]],
        'T_surf_ext_gl': [['ApplyExpression', ['X * 0.9973 + -0.2009', 'degC']]],
        'T_surf_fl_cd': [['ApplyExpression', ['X * 0.9973 + -0.2147', 'degC']]],
        'T_air_ext_1': [['ApplyExpression', ['X * 0.9956 + 0.0933', 'degC']]],
        'T_surf_fw': [['ApplyExpression', ['X * 0.9967 + 0.0053', 'degC']]],
        'T_air_ext_2': [['ApplyExpression', ['X * 0.9955 + 0.0443', 'degC']]],
        'T_surf_gl_up_ext': [['ApplyExpression', ['X * 0.9966 + 0.0103', 'degC']]],
        'T_surf_fl_cg_w': [['ApplyExpression', ['X * 0.9963 + -0.0166', 'degC']]],
        'T_surf_gl_in_up': [['ApplyExpression', ['X * 0.9966 + 0.0641', 'degC']]]
    }


pipe_dict = {
    
    "resampling_1min": [["Resample", ["1min", "mean"]]], 
    
    "conversion": {
        "mV": [
            ["ApplyExpression", ["X/1000", "V"]],
    ]},
    
    "conversion_heatfluxes": conversion_sensitivity_dict,

    "handling_missing": {
        "degC": [["Interpolate", ["linear"]]],
        "W/m²": [["Interpolate", ["linear"]]],
    },
    
    "moving_av": {
        "heatflux": [
                ["GaussianFilter1D", dict(sigma=1, mode="nearest", truncate=1)],
            ],

    },
    
    "moving_av_specific": {
        "hf_d_1": [
            ["GaussianFilter1D", dict(sigma=2, mode="nearest", truncate=2)],
        ]
    },
    
    "goodsigns": {
        "hf_fe": [
            ["ApplyExpression", ["-X", "W/m²"]],
        ],
        "hf_fw": [
            ["ApplyExpression", ["-X", "W/m²"]],
        ],
        "hf_gl_cen": [
            ["ApplyExpression", ["-X", "W/m²"]],
        ],
    },
    
    "calibration_temp_sensors": conversion_calib_dict,

   
    "average_temp": {
        "air ext__degC":[
          ["CombineColumns", dict(
            function="mean", 
            drop_columns=True, 
            result_column_name="T_air_ext_av__degC__temperature__air ext")
        ]], 
        "air in__degC":[
          ["CombineColumns", dict(
            function="mean", 
            drop_columns=True, 
            result_column_name="T_air_in_av__degC__temperature__air in")
        ]], 
    },


    "average_HF": {
        "heatflux__ceiling":[
          ["CombineColumns", dict(
            function="mean", 
            drop_columns=True, 
            result_column_name="hf_ci_av__W/m²__heatflux__ceiling")
        ]], 
    },

   
    "power_input": [
        ["ExpressionCombine", [
            {
            "X":  'P__V__lamp__in',
            },
            "(X > 1) * 148",
            'P__W__lamp__in',
        ]],
    ],  
}
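
To check the arithmetic of the conversion steps by hand, the sketch below applies the same expressions to a few made-up readings. The input values are illustrative, and reading the sensitivity coefficients as µV per W/m² is an assumption inferred from the `1e6` factor applied to a signal in volts.

```python
import pandas as pd

# Heat flux meter: mV -> V ("conversion"), then V -> W/m² ("conversion_heatfluxes")
raw_mV = pd.Series([0.25, 0.50, 1.00])  # illustrative raw readings
sensitivity = 9.174  # coefficient used for hf_fw, assumed to be in µV/(W/m²)
volts = raw_mV / 1000
flux = volts / sensitivity * 1e6

# Temperature calibration: affine correction, here with the T_surf_d coefficients
raw_temp = pd.Series([20.0, 25.0])  # illustrative raw readings in degC
calibrated = raw_temp * 0.9965 + -0.1535

# Lamp power: 148 W whenever the monitored voltage exceeds 1 V, else 0 W
lamp_voltage = pd.Series([0.0, 0.4, 4.8])  # illustrative readings in V
lamp_power = (lamp_voltage > 1) * 148

print(flux.round(2).tolist(), calibrated.round(4).tolist(), lamp_power.tolist())
```

Note that the `goodsigns` step then flips the sign of three of the converted fluxes, so the order of the steps in `pipe_dict` matters.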

In [None]:
plumber = Plumber(
    df,
    pipe_dict
)

We can now use the <code>show</code> method again to see the final result once the pipeline is executed.

In [None]:
plumber.show()

## Plot transformed data and compare different steps of the pipeline

Let's plot the data. Likewise, you can decide to show data:
- after specific steps only
- for a specific time range
- filtered by labels (only W/m² or degC labels, or "ext" sensors if it is a label)
- with their gaps, if any

In the example below, we can check the transformation at different steps for the interior door surface temperature measurement:
- first ("steps"), only the resampling_1min step is applied
- second ("steps_2"), all steps up to and including the "calibration_temp_sensors" step are applied

Note that for better visibility, you can plot graphs in browser rather than in the notebook using: 

`import plotly.io as pio`, `pio.renderers.default = "browser" `

In [None]:
plumber.plot(
    steps=slice("resampling_1min"),
    steps_2=slice(None, "calibration_temp_sensors"),
    select="degC__door",
    # start="2025-03-07 00:00",
    # stop="2025-03-07 04:00",
    data_2_mode="lines"
)
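
According to the description above, `slice("resampling_1min")` keeps only the first step and `slice(None, "calibration_temp_sensors")` keeps every step up to and including the named one. pandas label slicing with `.loc` is also end-inclusive, so it can serve as a mental model. The sketch below is an analogy that assumes Plumber resolves step slices the same way; the step names are those of `pipe_dict`.

```python
import pandas as pd

# Step names in the order they appear in pipe_dict
step_names = [
    "resampling_1min",
    "conversion",
    "conversion_heatfluxes",
    "handling_missing",
    "moving_av",
    "moving_av_specific",
    "goodsigns",
    "calibration_temp_sensors",
    "average_temp",
    "average_HF",
    "power_input",
]
steps = pd.Series(range(len(step_names)), index=step_names)

# slice("resampling_1min") is the same object as slice(None, "resampling_1min")
first_only = steps.loc[slice("resampling_1min")].index.tolist()

# Label slices include the end label, unlike positional slices
up_to_calibration = steps.loc[slice(None, "calibration_temp_sensors")].index.tolist()

print(first_only)
print(up_to_calibration)
```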

To plot data after all operations, simply call <code>plot()</code> with no step specified. Note that you can select one or several labels using the '|' separator. For instance, to select all degC **and** heat flux columns, you can use: <code>"label1|label2"</code>

Let's compare the data with and without interpolation for hf_fl__mV__heatflux__floor:

In [None]:
plumber.plot(
    steps=slice(None,"conversion_heatfluxes"), #up to conversion_heatfluxes step
    steps_2=slice(None, "handling_missing"), #up to interpolation step
    select="hf_fl__mV__heatflux__floor", 
    plot_gaps=True,
    data_2_mode="lines"
)

You can visualize the entire pipeline using the <code>get_pipeline</code> method,
or only specific steps of the pipeline using the "steps" argument:

In [None]:
plumber.get_pipeline(steps="goodsigns")

## Get corrected data

Finally, you can access your corrected data using the <code>get_corrected_data</code> method.
Likewise, "steps" and "select" arguments are available:

In [None]:
plumber.get_corrected_data(steps=slice(None, "calibration_temp_sensors"), verbose=True).head()

In [None]:
clean_df = plumber.get_corrected_data()

In [None]:
clean_df.head()

# Use of pipeline for U-values calculations

## Definition of a new pipeline dictionary for calculation steps

Now, let's define a new dictionary. 

The following Ucalc_dict defines a set of modular processing steps used to compute thermal transmittance values (U-values) based on measured heat fluxes and temperatures. It is structured around the use of ExpressionCombine and other processing nodes within the plumber framework. Each entry in the dictionary corresponds to a specific calculation block, grouped by purpose:

- Estimation of a global U-value for the entire envelope using the heating power input (P), the indoor temperature (Tin), and the outdoor temperature (Text). This approach assumes a steady-state condition and is useful for overall balance checks.
- Calculation of surface U-values from the measured heat flux on internal surfaces, the measured surface temperatures, and the outdoor air temperature.
- Calculation of an overall U-value, combining all individual surface U-values into a global equivalent U-value, weighted by relative surface area and glazing transmittance assumptions.
- Infiltrations, as the difference between the power-balance-derived global U-value and the global U-value derived from the surface heat fluxes.
- A stability check of the data, to remove time periods where the U-values vary too rapidly, ensuring only quasi-steady-state data is retained.
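
For reference, the expressions used in the dictionary below can be written as follows. This is a transcription of the `ExpressionCombine` formulas, not an additional model. Reading the division by 6 as six faces of about 1 m² each is an assumption based on the 1 m³ cell, and the symbols $q_i$, $T_{s,i}$ and $\bar{U}_{op}$ are notation introduced here; $U_g$ is the value measured at the center of the glazing.

$$
\begin{aligned}
U_{tot,power} &= \frac{P}{6\,(T_{in} - T_{ext})} \\
U_i &= \frac{q_i}{T_{s,i} - T_{ext}}, \quad i \in \{w, e, d, f, c, g\} \\
\bar{U}_{op} &= \frac{U_w + U_e + U_d + U_f + U_c}{5} \\
U_{tot,hfs} &= \frac{U_w + U_e + U_d + U_f + U_c + \left[\,0.885\,U_g + (1 - 0.885)\,\bar{U}_{op}\right]}{6} \\
U_{inf} &= U_{tot,power} - U_{tot,hfs}
\end{aligned}
$$

The bracketed term treats the sixth face as 88.5 % glazing and 11.5 % opaque surface, the latter taken as the mean of the five opaque faces.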

In [None]:
Ucalc_dict = {

      "U_value_powerbalance": [
          
          ["ExpressionCombine", [ 
            {
                "X":  'P__W__lamp__in',
                "Tin":  'T_air_in_av__degC__temperature__air in',
                "Text":  'T_air_ext_av__degC__temperature__air ext',
            },
            "X/(Tin-Text)/6",
            'Utot_power__W/m²K__Ucalc_tot__in',
        ]],],


        "U_value_surfaces_to_ext": [
          
          ["ExpressionCombine", [  #westwall
            {
                "HF":  'hf_fw__W/m²__heatflux__westwall',
                "Tin":  'T_surf_fw__degC__temperature__westwall',
                "Text":  'T_air_ext_av__degC__temperature__air ext',
            },
            "HF/(Tin-Text)",
            'Uw__W/m²K__Ucalc__westwall',
        ]],
            
           ["ExpressionCombine", [  #eastwall
            {
                "HF":  'hf_fe__W/m²__heatflux__eastwall',
                "Tin":  'T_surf_fe_cen__degC__temperature__eastwall',
                "Text":  'T_air_ext_av__degC__temperature__air ext',
            },
            "HF/(Tin-Text)",
            'Ue__W/m²K__Ucalc__eastwall',
        ]],  
            
          ["ExpressionCombine", [  ##door
             {
                "HF":  'hf_d_2__W/m²__heatflux__door',
                "Tin":  'T_surf_d__degC__temperature__door',
                "Text":  'T_air_ext_av__degC__temperature__air ext',
            },
            "HF/(Tin-Text)",
            'Ud__W/m²K__Ucalc__door',
        ]],       
        
              
          ["ExpressionCombine", [  ##floor
             {
                "HF":  'hf_fl__W/m²__heatflux__floor',
                "Tin":  'T_surf_fl_cen__degC__temperature__floor',
                "Text":  'T_air_ext_av__degC__temperature__air ext',
            },
            "HF/(Tin-Text)",
            'Uf__W/m²K__Ucalc__floor',
        ]],
            
          ["ExpressionCombine", [  ## ceiling
             {
                "HF":  'hf_ci_av__W/m²__heatflux__ceiling',
                "Tin":  'T_surf_ci__degC__temperature__ceiling',
                "Text":  'T_air_ext_av__degC__temperature__air ext',
            },
            "HF/(Tin-Text)",
            'Uc__W/m²K__Ucalc__ceiling',
        ]],  

           ["ExpressionCombine", [  ## window_center _ surface to ext
             {
                "HF":  'hf_gl_cen__W/m²__heatflux__glazing',
                "Tin":  'T_surf_gl_in_cen__degC__temperature__glazingIn',
                "Text":  'T_air_ext_av__degC__temperature__air ext',
            },
            "HF/(Tin-Text)",
            'Ug_cen__W/m²K__Ucalc__glazing',
        ]],   

            ["ExpressionCombine", [  ## window_corner _ surface to ext
             {
                "HF":  'hf_gl_corner__W/m²__heatflux__glazing',
                "Tin":  'T_surf_gl_in_up__degC__temperature__glazingIn',
                "Text":  'T_air_ext_av__degC__temperature__air ext',
            },
            "HF/(Tin-Text)",
            'Ug_cor__W/m²K__Ucalc__glazing',
        ]],   
            
    ],  

      "U_value_additioned": [
          ["ExpressionCombine", [ 
            {
                "Uw":  "Uw__W/m²K__Ucalc__westwall",
                "Ue":  "Ue__W/m²K__Ucalc__eastwall",
                "Ud":  "Ud__W/m²K__Ucalc__door",
                "Uf":  "Uf__W/m²K__Ucalc__floor",
                "Ux":  "Uc__W/m²K__Ucalc__ceiling",
                "Ug":  "Ug_cen__W/m²K__Ucalc__glazing",
            },
        "(Uw + Ue + Ud + Uf + Ux + (Ug * 0.885 + (1 - 0.885) * (Uw + Ue + Ud + Uf + Ux) / 5)) / 6",
            'Utot_from_hfs__W/m²K__Ucalc_tot__in',
        ]],],  

      "infiltrations": [
          ["ExpressionCombine", [ 
            {
                "Utot_power":  "Utot_power__W/m²K__Ucalc_tot__in",
                "Utot_hfs":  "Utot_from_hfs__W/m²K__Ucalc_tot__in",
            },
        "Utot_power - Utot_hfs",
            'Uinf__W/m²K__Ucalc_tot__in',
        ]],],   

    "stability_check": {
        "Ucalc_tot": [
            ["DropTimeGradient", dict(
                dropna=True,
                upper_rate=0.15*0.7/3600,  # 15% of the reference value (0.7 here), per hour (divided by 3600 s)
            )],
        ],
    }
}
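
To see what the stability threshold means in practice, the sketch below evaluates the `upper_rate` expression and applies a simple rate-of-change filter to a synthetic U-value series. It is a plain pandas illustration under stated assumptions and does not reproduce the exact rule used by `DropTimeGradient`.

```python
import pandas as pd

# Same expression as in Ucalc_dict: about 2.9e-5 W/m²K per second
upper_rate = 0.15 * 0.7 / 3600

# Synthetic 1-minute U-values with one abrupt jump (illustrative values)
index = pd.date_range("2025-03-07", periods=6, freq="1min", tz="UTC")
u_values = pd.Series([0.700, 0.701, 0.702, 0.760, 0.761, 0.762], index=index)

# Absolute rate of change per second between consecutive samples
elapsed_seconds = u_values.index.to_series().diff().dt.total_seconds()
rate = u_values.diff().abs() / elapsed_seconds

# Keep samples whose rate stays below the threshold (the first one has no rate)
stable = u_values.where(~(rate > upper_rate)).dropna()

print(f"upper_rate = {upper_rate:.3e} per second")
print(stable)
```

With 1-minute data, this threshold allows a change of about 0.00175 W/m²K between two consecutive samples, so only the jump at the fourth sample is rejected here.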

In [None]:
plumber_U = Plumber(
    clean_df,
    Ucalc_dict
)

In [None]:
plumber_U.show()

## Visualization of U-values

After computing new columns (e.g. U-values) using custom expressions in the Ucalc_dict, we pass the resulting calc_data directly to a new Plumber instance for visualization.

This approach is necessary because these columns are generated outside the main plumber pipeline — typically using scikit-plumber logic or custom post-processing steps. Unlike standard pipeline nodes, scikit-based computations do not automatically integrate back into the original pipeline graph.

Using calc_data as a direct input allows us to:
- Visualize derived signals with show() just like raw or cleaned data
- Apply filtering, grouping, and selection tools available in plumber


In [None]:
calc_data = plumber_U.get_corrected_data()

In [None]:
plumber_final = Plumber(
    calc_data
)

In [None]:
plumber_final.plot(
    select="Ucalc_tot"
)

In [None]:
plumber_final.plot(
    select="Ucalc"
)

In [None]:
plumber_final.plot(
    select="W/m²__heatflux"
)