In [None]:
from pathlib import Path
import sqlparse

def construct_query(sql_file, placeholders):
    """
    Load a SQL template from the reports "sql" folder, substitute its
    placeholders and return the formatted query text.

    Parameters
    ----------
    sql_file : str
        Name of the template file, resolved as "<reports root>/sql/<sql_file>".
        The reports root comes from get_reports_root_folder(), which is defined
        outside this cell.
    placeholders : dict
        Maps placeholder names (without the leading "$") to their values. Each
        "$NAME" occurrence in the template is replaced with str(value), with
        any single quote in the value doubled.

    Returns
    -------
    str
        The query, re-indented by sqlparse with keywords in upper case.
    """
    # Read the query file, collapsing it onto a single line
    # NOTE(review): because newlines become spaces, a "--" line comment in the
    # template would comment out everything that follows it - confirm the
    # templates only use /* ... */ comments
    project_root = get_reports_root_folder()
    query_file_path = Path(f"{project_root}/sql/{sql_file}").absolute()
    with open(query_file_path, "r") as f:
        query = f.read().replace("\n", " ")

    # Replace all placeholders, longest name first, so that a name which is a
    # prefix of another (e.g. "DAY" and "DAYS") cannot clobber the longer one.
    # sorted() is stable, so names of equal length keep their original order
    ordered_keys = sorted(placeholders, key=lambda name: len(str(name)), reverse=True)
    for key in ordered_keys:
        escaped_value = str(placeholders[key]).replace("'", "''")
        query = query.replace(f"${key}", escaped_value)

    # Format the query
    return sqlparse.format(query, reindent=True, keyword_case='upper')

In [None]:
import pandas as pd
import sqlite3
import os

# Name of the environment variable that holds the path to the SQLite database
DB_PATH_VARIABLE = "WEATHER_DB"

def query_data(query):
    """
    Run a query against the SQLite database and return the rows as a DataFrame
    sorted by a timezone-aware (UTC) "timestamp" column.

    Parameters
    ----------
    query : str
        SQL text to execute. The result must contain a timestamp column whose
        name lower-cases to "timestamp".

    Returns
    -------
    pd.DataFrame
        The query results with lower-cased column names, "timestamp" converted
        to UTC datetimes, sorted ascending by "timestamp".

    Raises
    ------
    KeyError
        If the environment variable named by DB_PATH_VARIABLE is not set, or
        the result has no "timestamp" column.
    ValueError
        If the query returns no rows.
    """
    # Connect to the database, execute the query and read the results into a dataframe.
    # The connection is closed explicitly, even if the query fails
    database_path = os.environ[DB_PATH_VARIABLE]
    connection = sqlite3.connect(database_path)
    try:
        df = pd.read_sql_query(query, connection, parse_dates=["timestamp"])
    finally:
        connection.close()

    # Check there is some data
    if len(df) == 0:
        message = "No data found"
        raise ValueError(message)

    # Convert column titles to lowercase
    df.columns = df.columns.str.lower()

    # parse_dates only matches a column named exactly "timestamp", so depending
    # on the column's case in the database the values arrive either as parsed
    # datetimes or as raw strings. Only strings need the trailing Z removed;
    # calling .str on an already-parsed datetime column would raise
    timestamps = df["timestamp"]
    if not pd.api.types.is_datetime64_any_dtype(timestamps):
        timestamps = timestamps.str.rstrip("Z")
    df["timestamp"] = pd.to_datetime(timestamps, utc=True)

    # Sort by the timestamp
    return df.sort_values("timestamp")

In [None]:
def load_sensor_readings(sensor_name, days=None):
    """
    Load the readings for one sensor by running that sensor's SQL file.

    Parameters
    ----------
    sensor_name : str
        Sensor name; its case-folded form plus ".sql" is the query file name.
    days : optional
        Value substituted for the "$DAYS" placeholder. Any falsy value
        (including the default None) is substituted as the text "NULL".

    Returns
    -------
    The DataFrame produced by query_data for the constructed query.
    """
    # Work out which query file belongs to the sensor and what to substitute
    sql_file = f"{sensor_name.casefold()}.sql"
    substitutions = {"DAYS": days or "NULL"}

    # Build the query text, then execute it and hand back the results
    return query_data(construct_query(sql_file, substitutions))

In [None]:
from functools import reduce
import pandas as pd
from typing import List

def merge_sensor_readings(
    data_frames: List[pd.DataFrame],
    tolerance: str = "3s",
    direction: str = "nearest",
    set_index: bool = True,
    on: str = "timestamp"
) -> pd.DataFrame:
    """
    Merge multiple sensor reading DataFrames on a common timestamp column using
    pandas.merge_asof, handling slightly misaligned timestamps

    Parameters
    ----------
    data_frames : list of pd.DataFrame
        One DataFrame per sensor. All must contain the `on` column
        The *first* DataFrame in the list is treated as the "base" timeline
        The inputs are not modified and do not need to be pre-sorted
    tolerance : str or pd.Timedelta, default "3s"
        Maximum allowed time difference when matching rows (e.g. "3s", "500ms").
    direction : {"backward", "forward", "nearest"}, default "nearest"
        Direction for merge_asof matching.
    set_index : bool, default True
        If True, set the merged DataFrame index to the `on` column.
    on : str, default "timestamp"
        Name of the column to merge on.

    Returns
    -------
    pd.DataFrame
        Merged DataFrame containing all columns from all input DataFrames.
        Unmatched rows (outside tolerance) will have NaNs for missing sensors.

    Raises
    ------
    ValueError
        If `data_frames` is empty.
    """

    frames = list(data_frames)
    if not frames:
        raise ValueError("At least one DataFrame is required to merge sensor readings")

    # merge_asof requires both sides to be sorted by the key. sort_values
    # returns a new frame, so the originals are never mutated, and the stable
    # sort leaves already-sorted input in its existing row order
    ordered = [df.sort_values(on, kind="stable") for df in frames]

    # Normalise tolerance to Timedelta
    tol = pd.Timedelta(tolerance)

    # Reduce the list using merge_asof
    def _merge_asof(left: pd.DataFrame, right: pd.DataFrame) -> pd.DataFrame:
        return pd.merge_asof(
            left,
            right,
            on=on,
            direction=direction,
            tolerance=tol
        )

    merged = reduce(_merge_asof, ordered)

    if set_index:
        merged = merged.set_index(on)

    return merged