# Get interpolated data

Rather than getting time series data from a view, it is possible to retrieve time series data directly using requests. Below is a general function that will return a pandas DataFrame of regularly interpolated time series data for given input tags. You can paste this function definition in your MLHub scripts to have an easier and more flexible way to retrieve time series data.

In [14]:
import pandas as pd
import os
import requests
import urllib3


def get_data(
    tagnames: list,
    start: pd.Timestamp,
    end: pd.Timestamp,
    freq: pd.Timedelta,
    tz: str = "UTC",
    verify=True,
) -> pd.DataFrame:
    """Retrieve interpolated time series data from selected tags

    For every tag, the tag details are looked up first; the interpolated data is
    then requested page by page and the pages are concatenated into one Series.
    The Series of all tags are finally combined into a single DataFrame.

    The server URL and the access token are read from the `KERNEL_SERVER_URL` and
    `KERNEL_USER_TOKEN` environment variables.

    Parameters
    ----------
    tagnames : list of str
        Tags for which to retrieve the data. A single tag name passed as a plain
        string is treated as a one-element list.
    start : pandas.Timestamp
        Start of the interval for which to retrieve the data. Localized to `tz` if not timezone-aware.
        Rounded to a multiple of `freq`.
    end : pandas.Timestamp
        End of the interval for which to retrieve the data. Localized to `tz` if not timezone-aware.
        Rounded to a multiple of `freq`.
    freq : pandas.Timedelta
        Frequency of the interpolated data. Interpolated data is exclusively generated from the indexed data in TrendMiner.
        Setting a low value does not result in raw time series data.
    tz : str, default 'UTC'
        Timezone for the timestamps in the returned DataFrame
    verify : bool, default True
        Whether requests should perform SSL certificate validation. Can be turned off in case you are getting errors about certificate issues.

    Returns
    -------
    df : pandas.DataFrame
        DataFrame with a DatetimeIndex in the selected timezone, with a data column (float or str) for each tag name.
        An empty DataFrame (no columns, empty index) is returned when `tagnames` is empty.

    Raises
    ------
    KeyError
        If one of the required environment variables is not set.
    requests.HTTPError
        If the server answers one of the requests with an error status code.
    """
    # A bare string would otherwise be iterated character by character
    tagnames = [tagnames] if isinstance(tagnames, str) else list(tagnames)

    # Nothing to request: return an empty frame rather than failing in pd.concat
    if not tagnames:
        return pd.DataFrame(index=pd.DatetimeIndex([], tz=tz))

    # Add timestamp timezones if not provided
    if start.tz is None:
        start = start.tz_localize(tz)
    if end.tz is None:
        end = end.tz_localize(tz)

    # Rounding to get regular timestamps
    start = start.round(freq)
    end = end.round(freq)

    # Values that are identical for every request
    base_url = os.environ["KERNEL_SERVER_URL"]
    end_date = end.isoformat(timespec="milliseconds")
    step_seconds = freq.total_seconds()  # Data frequency in seconds

    # This authentication header we need to add to every request
    headers = {'Authorization': f'Bearer {os.environ["KERNEL_USER_TOKEN"]}'}

    # Suppress the InsecureRequestWarning when certificate verification is turned off
    if verify is False:
        urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)

    tag_series_list = []
    for tagname in tagnames:
        # Get tag details required to retrieve tag data
        tag_details_response = requests.get(
            url=f'{base_url}/hps/api/tags/details',
            headers=headers,
            params={'tagName': tagname},
            verify=verify,
        )
        # Fail with a clear HTTP error instead of a KeyError further down
        tag_details_response.raise_for_status()
        tag_details = tag_details_response.json()

        # Retrieve tag data; data is split in pages if the request is too large
        tag_series_chunks = []
        current_start = start
        while True:
            tag_data_response = requests.post(
                url=f'{base_url}/compute/interpolatedData/paged',
                headers=headers,
                json={
                    "tag": {
                        "id": tag_details["id"],
                        "shift": 0,  # tag timeshift in seconds; typically 0
                        "interpolationType": tag_details["interpolationType"],
                    },
                    "timePeriod": {
                        "startDate": current_start.isoformat(timespec="milliseconds"),
                        "endDate": end_date,
                    },
                    "step": step_seconds,
                },
                verify=verify,
            )
            tag_data_response.raise_for_status()
            tag_data = tag_data_response.json()

            # Convert response to Series
            tag_series_chunk = (
                pd.DataFrame(tag_data["values"])  # dict to DataFrame
                .assign(ts=lambda df: pd.to_datetime(df["ts"]))  # convert str to timestamps
                .set_index("ts")  # set timestamp as index
                ["value"]  # DataFrame -> Series
            )
            tag_series_chunks.append(tag_series_chunk)

            # Check if there is an additional page; if not, stop the loop.
            # A missing key and an empty/null value both mean "no more pages".
            next_start = tag_data.get("nextStartDate")
            if not next_start:
                break
            current_start = pd.Timestamp(next_start)

        # Concat the different chunks of a single tag
        tag_series = pd.concat(tag_series_chunks)
        tag_series.name = tagname

        # Retrieve the correct string for text tags
        if tag_details["type"] in ["DIGITAL", "STRING"]:
            states = {state["Code"]: state["Name"] for state in tag_details["States"]}
            tag_series = tag_series.map(states)

        # Store data
        tag_series_list.append(tag_series)

    # Concat the different tags into a single DataFrame
    df = pd.concat(tag_series_list, axis=1)

    # Set timezone
    df.index = df.index.tz_convert(tz)

    return df

Below is an example call to our `get_data` function. We retrieve the time series data for 3 tags for January 2023, at a 5-minute resolution, in our local timezone.

In [17]:
# Tags to fetch; each one ends up as a column of the resulting DataFrame
tags = ["TM4-BP2-PRODUCT.1", "TM4-BP2-LEVEL.1", "TM4-BP2-CONC.1"]

# January 2023; the timestamps are naive, so get_data localizes them to `tz`
period_start = pd.Timestamp("2023-01-01")
period_end = pd.Timestamp("2023-02-01")

# One interpolated value every 5 minutes
resolution = pd.Timedelta(minutes=5)

df = get_data(
    tagnames=tags,
    start=period_start,
    end=period_end,
    freq=resolution,
    tz="Europe/Brussels",
)

# Show the first rows of the result
df.head()