In [None]:
#| default_exp distributed.timegpt

In [None]:
#| hide 
%load_ext autoreload
%autoreload 2

In [None]:
#| export
from typing import Any, Dict, List, Optional, Union

import numpy as np
import pandas as pd
import fugue
import fugue.api as fa
from fugue.execution.factory import make_execution_engine
from triad import Schema

In [None]:
#| export
class _DistributedTimeGPT:
    """Run TimeGPT forecasts on distributed dataframes through fugue.

    The input is partitioned by series identifier and every partition is
    forecast by a freshly instantiated `_TimeGPT` client, so any backend that
    fugue can infer from the input (e.g. Spark, Dask, Ray) can be used.
    """

    def forecast(
            self,
            token: str,
            df: fugue.AnyDataFrame,
            h: int,
            freq: Optional[str] = None,
            id_col: str = 'unique_id',
            time_col: str = 'ds',
            target_col: str = 'y',
            X_df: Optional[pd.DataFrame] = None,
            level: Optional[List[Union[int, float]]] = None,
            finetune_steps: int = 0,
            clean_ex_first: bool = True,
            validate_token: bool = False,
            add_history: bool = False,
            date_features: Union[bool, List[str]] = False,
            date_features_to_one_hot: Union[bool, List[str]] = True,
            num_partitions: Optional[int] = None,
        ) -> fugue.AnyDataFrame:
        """Forecast `h` steps ahead for every series of a distributed dataframe.

        Parameters
        ----------
        token : str
            TimeGPT token, used to build a `_TimeGPT` client for each partition.
        df : fugue.AnyDataFrame
            Input data; must contain `id_col`. The execution engine is
            inferred from this object.
        h, freq, target_col, finetune_steps, clean_ex_first, validate_token,
        add_history, date_features, date_features_to_one_hot
            Forwarded unchanged to `_TimeGPT._forecast` for each partition.
        id_col, time_col : str
            Names of the series identifier and timestamp columns. They are
            forwarded to `_TimeGPT._forecast` and used in the output schema.
        X_df : pd.DataFrame, optional
            Future exogenous values. Not supported yet; must be None.
        level : list of int or float, optional
            Prediction-interval levels. Forwarded to `_TimeGPT._forecast` and
            used to add `TimeGPT-lo-{lv}` / `TimeGPT-hi-{lv}` output columns.
        num_partitions : int, optional
            Number of partitions. Defaults to the engine's current parallelism.

        Returns
        -------
        fugue.AnyDataFrame
            Forecasts as the engine's native dataframe, with columns
            `id_col`, `time_col`, `TimeGPT` and the interval columns, if any.

        Raises
        ------
        ValueError
            If `id_col` is not a column of `df`.
        NotImplementedError
            If `X_df` is provided.
        """
        if id_col not in fa.get_column_names(df):
            raise ValueError(
                'Distributed environment is meant to forecast '
                'multiple time series at once. You did not provide '
                'an identifier for each time series.'
            )
        # Validate all arguments before creating an execution engine.
        if X_df is not None:
            raise NotImplementedError(
                'Exogenous variables not supported for '
                'distributed environments yet. '
                'Please raise an issue at https://github.com/Nixtla/nixtla/issues/new '
                'requesting the feature.'
            )
        kwargs = dict(
            h=h,
            freq=freq,
            id_col=id_col,
            time_col=time_col,
            target_col=target_col,
            level=level,
            finetune_steps=finetune_steps,
            clean_ex_first=clean_ex_first,
            validate_token=validate_token,
            add_history=add_history,
            date_features=date_features,
            date_features_to_one_hot=date_features_to_one_hot,
        )
        schema = self._get_forecast_schema(id_col=id_col, time_col=time_col, level=level)
        engine = make_execution_engine(infer_by=[df])
        if num_partitions is None:
            num_partitions = engine.get_current_parallelism()
        # Partitioning by `id_col` keeps all rows of a series together.
        # The 'coarse' algorithm lets several series share a partition.
        partition = dict(by=id_col, num=num_partitions, algo='coarse')
        fcst_df = fa.transform(
            df,
            self._forecast,
            params=dict(token=token, kwargs=kwargs,),
            schema=schema,
            engine=engine,
            partition=partition,
            as_fugue=True,
        )
        return fa.get_native_as_df(fcst_df)

    def _instantiate_timegpt(self, token: str):
        """Build a `_TimeGPT` client authenticated with `token`."""
        # Imported lazily, inside the function that runs per partition.
        from nixtlats.timegpt import _TimeGPT
        timegpt = _TimeGPT(token=token)
        return timegpt

    def _forecast(
            self,
            df: pd.DataFrame,
            token: str,
            kwargs,
        ) -> pd.DataFrame:
        """Forecast a single pandas partition with a new `_TimeGPT` client.

        `kwargs` holds the forecast options collected in `forecast`.
        """
        # NOTE: fugue inspects these annotations to pick the input/output
        # conversion for the transformer, so keep them as-is.
        timegpt = self._instantiate_timegpt(token)
        return timegpt._forecast(df=df, **kwargs)

    @staticmethod
    def _get_forecast_schema(id_col, time_col, level):
        """Build the fugue output schema of the distributed forecast.

        Columns are `id_col` (string), `time_col` (datetime) and `TimeGPT`
        (double). For each requested level they are followed by
        `TimeGPT-lo-{lv}` in descending level order and then
        `TimeGPT-hi-{lv}` in ascending level order. An empty `level` adds
        no interval columns.
        """
        cols = [f'{id_col}:string', f'{time_col}:datetime', 'TimeGPT:double']
        if level is not None:
            level = sorted(level)
            cols.extend(f'TimeGPT-lo-{lv}:double' for lv in reversed(level))
            cols.extend(f'TimeGPT-hi-{lv}:double' for lv in level)
        return Schema(','.join(cols))

In [None]:
#| hide
import os

from fastcore.test import test_eq
from dotenv import load_dotenv
from utilsforecast.data import generate_series

# Load variables from a local `.env` file into the environment; the tests below
# read the API token from `os.environ['TIMEGPT_TOKEN']`.
load_dotenv()

In [None]:
#| hide
def test_forecast(
        df: fugue.AnyDataFrame, 
        horizon: int = 12,
        id_col: str = 'unique_id',
        time_col: str = 'ds',
        **fcst_kwargs,
    ):
    """Forecast `df` with `distributed_timegpt` and validate the output.

    Checks that the output has one row per series and forecast step. Also
    checks that its columns are `id_col`, `time_col` and `TimeGPT`, plus the
    interval columns when `level` is passed in `fcst_kwargs`: lower bounds in
    descending level order, then upper bounds in ascending level order.
    """
    fcst_df = distributed_timegpt.forecast(
        token=os.environ['TIMEGPT_TOKEN'], 
        df=df, 
        h=horizon,
        id_col=id_col,
        time_col=time_col,
        **fcst_kwargs,
    )
    fcst_df = fa.as_pandas(fcst_df)
    # Expected rows = series in the input * `horizon`. Count the series from
    # the input instead of a global, and use the actual horizon, not a
    # hard-coded 12.
    n_input_series = fa.as_pandas(df)[id_col].nunique()
    test_eq(n_input_series * horizon, len(fcst_df))
    cols = fcst_df.columns.to_list()
    exp_cols = [id_col, time_col, 'TimeGPT']
    if 'level' in fcst_kwargs:
        level = sorted(fcst_kwargs['level'])
        exp_cols.extend([f'TimeGPT-lo-{lv}' for lv in reversed(level)])
        exp_cols.extend([f'TimeGPT-hi-{lv}' for lv in level])
    test_eq(cols, exp_cols)

In [None]:
#| hide
def test_same_results_num_partitions(
        df: fugue.AnyDataFrame, 
        horizon: int = 12, 
        id_col: str = 'unique_id',
        time_col: str = 'ds',
        **fcst_kwargs,
    ):
    """Assert that the forecast does not depend on the number of partitions.

    Runs the distributed forecast with 1 and with 2 partitions. After sorting
    both by (`id_col`, `time_col`), the results must be identical.
    """
    def _sorted_forecast(n_parts):
        # Forecast, bring to pandas and put rows in a canonical order.
        native = distributed_timegpt.forecast(
            token=os.environ['TIMEGPT_TOKEN'],
            df=df,
            h=horizon,
            num_partitions=n_parts,
            id_col=id_col,
            time_col=time_col,
            **fcst_kwargs
        )
        return (
            fa.as_pandas(native)
            .sort_values([id_col, time_col])
            .reset_index(drop=True)
        )

    single = _sorted_forecast(1)
    double = _sorted_forecast(2)
    pd.testing.assert_frame_equal(single, double)

In [None]:
#| hide
def test_dataframe(df: fugue.AnyDataFrame):
    """Run the forecast checks on `df` using the default column names."""
    # Point forecasts first, then with prediction intervals.
    for extra in ({}, {'level': [90, 80]}):
        test_forecast(df, num_partitions=1, **extra)
    test_same_results_num_partitions(df)

In [None]:
#| hide
def test_dataframe_diff_cols(df: fugue.AnyDataFrame, id_col: str = 'id_col', time_col: str = 'time_col'):
    """Run the forecast checks on `df` whose id/time columns have custom names."""
    col_names = dict(id_col=id_col, time_col=time_col)
    # Point forecasts first, then with prediction intervals.
    for extra in ({}, {'level': [90, 80]}):
        test_forecast(df, num_partitions=1, **col_names, **extra)
    test_same_results_num_partitions(df, **col_names)

In [None]:
#| hide
# Module-level instance referenced by the test helpers defined above
# (`test_forecast`, `test_same_results_num_partitions`).
distributed_timegpt = _DistributedTimeGPT()

In [None]:
#| hide
# Synthetic test data: `n_series` series with string identifiers.
n_series = 4
# NOTE(review): `horizon` is not passed to the test helpers above, which use
# their own default of 12.
horizon = 7

series = generate_series(n_series)
series['unique_id'] = series['unique_id'].astype(str)

# Same data with non-default id/time column names; `rename` already returns a
# new frame, so no explicit copy is needed.
series_diff_cols = series.rename(columns={'unique_id': 'id_col', 'ds': 'time_col'})

## Spark

In [None]:
#| hide
from pyspark.sql import SparkSession

In [None]:
#| hide
spark = SparkSession.builder.getOrCreate()
# One Spark DataFrame per pandas input: default names, then custom names.
spark_df, spark_diff_cols_df = (
    spark.createDataFrame(pdf) for pdf in (series, series_diff_cols)
)
test_dataframe(spark_df)
test_dataframe_diff_cols(spark_diff_cols_df)

## Dask

In [None]:
#| hide
import dask.dataframe as dd

In [None]:
#| hide
# Split each pandas input into 2 Dask partitions: default names, then custom names.
dask_df, dask_diff_cols_df = (
    dd.from_pandas(pdf, npartitions=2) for pdf in (series, series_diff_cols)
)
test_dataframe(dask_df)
test_dataframe_diff_cols(dask_diff_cols_df)

## Ray

In [None]:
#| hide
import ray
from ray.cluster_utils import Cluster

In [None]:
#| hide
# Local Ray cluster: a head node plus one extra node, to simulate a multi-node setup.
ray_cluster = Cluster(
    initialize_head=True,
    head_node_args={"num_cpus": 2}
)
try:
    ray.init(address=ray_cluster.address, ignore_reinit_error=True)
    # add mock node to simulate a cluster
    mock_node = ray_cluster.add_node(num_cpus=2)
    ray_df = ray.data.from_pandas(series)
    ray_diff_cols_df = ray.data.from_pandas(series_diff_cols)
    test_dataframe(ray_df)
    test_dataframe_diff_cols(ray_diff_cols_df)
finally:
    # Always disconnect the driver and tear down the cluster's nodes, even when
    # a test fails; `ray.shutdown()` alone does not stop the Cluster's nodes.
    ray.shutdown()
    ray_cluster.shutdown()