# Taipy Core

Taipy Core is the Taipy component that handles pipeline orchestration. There are several reasons to use it:

- Taipy Core efficiently manages the execution of your functions/pipelines.

- Taipy Core manages data sources and monitors KPIs.

- Taipy Core provides easy management of multiple pipelines and end-user scenarios, which comes in handy in the context of Machine Learning or Mathematical optimization.

To understand the Scenario Management aspect of Taipy, you need to know four essential concepts.


## Four fundamental concepts in Taipy Core:
- Data Nodes: the Taipy counterpart of variables. A Data Node doesn't contain the data itself but knows how to retrieve it. It can refer to any data: a Python object (string, int, list, dict, model, dataframe, etc.), a Pickle file, a CSV file, an SQL database, etc. Data Nodes know how to read and write data. You can even write your own custom Data Node to access a particular data format.

- Tasks: the Taipy counterpart of functions.

- Pipelines: lists of tasks executed with intelligent scheduling that Taipy creates automatically. They usually represent a sequence of Tasks/functions corresponding to different algorithms, such as a simple baseline algorithm or a more sophisticated Machine Learning pipeline.

- Scenarios: end users very often need to modify various parameters to reflect different business situations. Taipy Scenarios provide the framework to "play"/"execute" pipelines under different conditions/variations (i.e., data/parameters modified by the end user).
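To make the four concepts more concrete, here is a minimal plain-Python sketch. It does not use Taipy: the names `SketchDataNode`, `SketchTask`, `run_pipeline` and `run_scenario` are hypothetical stand-ins that only mirror the roles described above.

```python
from dataclasses import dataclass
from typing import Any, Callable, Dict, List


@dataclass
class SketchDataNode:
    """Hypothetical stand-in for a Data Node: it knows where the data lives."""
    name: str
    storage: Dict[str, Any]  # pretend backend (a file, a database, ...)

    def read(self) -> Any:
        return self.storage[self.name]

    def write(self, value: Any) -> None:
        self.storage[self.name] = value


@dataclass
class SketchTask:
    """Hypothetical stand-in for a Task: a function plus its input/output Data Nodes."""
    function: Callable[..., Any]
    inputs: List[SketchDataNode]
    output: SketchDataNode

    def run(self) -> None:
        self.output.write(self.function(*(dn.read() for dn in self.inputs)))


def run_pipeline(tasks: List[SketchTask]) -> None:
    """In this sketch a pipeline is simply an ordered list of tasks."""
    for task in tasks:
        task.run()


def run_scenario(value: Any) -> Any:
    """A 'scenario' replays the same pipeline with its own data."""
    storage: Dict[str, Any] = {"raw": value}
    raw = SketchDataNode("raw", storage)
    doubled = SketchDataNode("doubled", storage)
    run_pipeline([SketchTask(lambda x: x * 2, [raw], doubled)])
    return doubled.read()


print(run_scenario(21), run_scenario(54))
```

Each call to `run_scenario` builds its own storage, so two scenarios built from the same structure never overwrite each other's data.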


## What is a configuration?

A configuration is the structure, or model, of our scenario. It represents our Directed Acyclic Graph, but also how we want our data to be stored and how our code is run. Taipy can create multiple instances of this structure with different data, so we need a way to define it: this is the configuration step.


## Taipy Studio for configuration

There are two ways to configure Taipy Core, either by Python code or with Taipy Studio. We strongly recommend using Taipy Studio. 

Taipy Studio is a VS Code extension that provides a graphical editor to describe pipelines. Configuration is easier and faster with Taipy Studio.


Let's create our first configuration and then create our entities to submit.



In [1]:
!rmdir /s /q .data

The system cannot find the file specified.


In [2]:
from taipy import Config
import taipy as tp

# Normal function used by Taipy
def double(nb):
    return nb * 2

Here is the code to configure a simple scenario.

Two Data Nodes are configured: 'input' and 'output'. The 'input' Data Node has a _default_data_ of 21. Both are stored as Pickle files automatically and are unique to their scenario.

The task links the two Data Nodes through the Python function _double_.

The pipeline will contain this one task and the scenario will contain this one pipeline.

______________________ Taipy Studio ______________________
- Create new file: 'config.toml'
- Open Taipy Studio view
- Go to the 'Config files' section of Taipy Studio
- Right-click the configuration file you want to edit
- Choose 'Taipy: Show View'
- Add your first Data Node by clicking the button in the top-right corner of the window
- Create a name for it and change its details in the 'Details' section of Taipy Studio
        - name: input
        - Details: default_data=21, storage_type=pickle
- Do the same for the output
        - name: output
        - Details: storage_type=pickle
- Add a task and choose a function to associate with `<module>.<name>:function`
        - name: double
        - Details: function=`__main__.double:function`
- Link the Data Nodes and the task
- Add a pipeline and link it to the task
- Add a scenario and link it to the pipeline

In [3]:
# Configuration of Data Nodes
input_data_node_cfg = Config.configure_data_node("input", default_data=21)
output_data_node_cfg = Config.configure_data_node("output")

# Configuration of tasks
task_cfg = Config.configure_task("double",
                                 double,
                                 input_data_node_cfg,
                                 output_data_node_cfg)

# Configuration of the pipeline and scenario
pipeline_cfg = Config.configure_pipeline("my_pipeline", [task_cfg])
scenario_cfg = Config.configure_scenario("my_scenario", [pipeline_cfg])

In [4]:
# Run of the Core
tp.Core().run()

# Creation of the scenario and execution
scenario = tp.create_scenario(scenario_cfg)
tp.submit(scenario)

print("Value at the end of task", scenario.output.read())

[2022-12-22 16:20:02,740][Taipy][INFO] job JOB_double_699613f8-7ff4-471b-b36c-d59fb6688905 is completed.
Value at the end of task 42


## Basic functions

Let's go over the basic functions that come with Taipy.

- _write_: this is how data is changed through Taipy. _write_ updates the _last_edit_date_ of the Data Node, which determines whether a task can be skipped.

- _tp.get_scenarios()_: returns the list of all scenarios.

- _tp.get()_: returns an entity from its id.

- _tp.delete(ID)_: deletes the entity with the given id, along with its nested elements.

## Utility of having scenarios

Taipy lets the user create multiple instances of the same configuration. Data can differ between instances, which makes it possible to compare scenarios.

Data may differ naturally, depending on the input Data Nodes or the randomness of functions. The user can also change it with the _write_ function.

In [5]:
scenario = tp.create_scenario(scenario_cfg, name="Scenario")
tp.submit(scenario)
print("First submit", scenario.output.read())

[2022-12-22 16:20:02,874][Taipy][INFO] job JOB_double_a5ecfa4d-1963-4776-8f68-0859d22970b9 is completed.
First submit 42


By using _write_, the data of a Data Node can be changed. The syntax is `<Scenario>.<Pipeline>.<Data Node>.write(value)`. If there is only one pipeline, `<Scenario>.<Data Node>.write(value)` is enough.

In [6]:
print("Before write", scenario.input.read())
scenario.input.write(54)
print("After write",scenario.input.read())

Before write 21
After write 54


The submission of the scenario will update the output values.

In [7]:
tp.submit(scenario)
print("Second submit",scenario.output.read())

[2022-12-22 16:20:03,011][Taipy][INFO] job JOB_double_7eee213f-062c-4d67-b0f8-4b54c04e45e7 is completed.
Second submit 108


In [8]:
# Basic functions of Taipy Core 
# how to access all the scenarios
print([s.input.read() for s in tp.get_scenarios()])

# how to get a scenario from its id
scenario = tp.get(scenario.id)

# how to delete a scenario
tp.delete(scenario.id)

[21, 54]


## Different types of Data Nodes:

- *Pickle* (default): Taipy can store and read any kind of serializable data.

- *CSV*: Taipy can read and store any dataframe as a CSV file.

- *JSON*: Taipy can read and store any JSON-serializable data as a JSON file.

- *SQL*: Taipy can read and store a table or database.

- *Generic*: Taipy provides a generic Data Node that can read and store any data based on reading and writing functions created by the user.
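As an illustration of the Generic type, the sketch below shows the kind of user-defined reading and writing functions such a Data Node relies on. It is plain Python with no Taipy import: the function names `read_lines`, `write_lines` and `round_trip`, and the one-integer-per-line text format, are assumptions made for this example. How the functions are registered with Taipy is not shown here.

```python
import os
import tempfile
from typing import List


def write_lines(path: str, values: List[int]) -> None:
    """Hypothetical writing function: stores one integer per line."""
    with open(path, "w", encoding="utf-8") as f:
        f.write("\n".join(str(v) for v in values))


def read_lines(path: str) -> List[int]:
    """Hypothetical reading function: parses the file back into integers."""
    with open(path, encoding="utf-8") as f:
        return [int(line) for line in f if line.strip()]


def round_trip(values: List[int]) -> List[int]:
    """Write then read in a temporary directory to check both functions agree."""
    with tempfile.TemporaryDirectory() as tmp:
        path = os.path.join(tmp, "values.txt")
        write_lines(path, values)
        return read_lines(path)


print(round_trip([21, 42, 108]))
```

The important property is that the reading function returns exactly what the writing function stored, so downstream tasks see the same data whichever task produced it.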

The execution graph used to explain the different concepts is quite simple.

Three Data Nodes:
- historical_data: initial CSV DataFrame
- month_data: DataFrame after filtering on the month (pd.DataFrame as a Pickle file)
- nb_of_values: number of values in this month (int as a Pickle file)

Two tasks linking these Data Nodes:
- filter: keeps the rows of the dataframe that belong to the current month
- count_values: calculates the number of elements in this month

One pipeline in a scenario gathers these two tasks.

![](config.png)
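The graph described above can also be written down as plain dependencies. The sketch below uses Python's standard `graphlib` module (Python 3.9+), not Taipy, to derive one valid execution order. The dictionary layout and the variable names are assumptions chosen for illustration.

```python
from graphlib import TopologicalSorter

# Each key depends on the items in its set:
# a task depends on its input Data Nodes, a Data Node on the task producing it.
dependencies = {
    "filter": {"historical_data"},
    "month_data": {"filter"},
    "count_values": {"month_data"},
    "nb_of_values": {"count_values"},
}

# static_order() yields every node after all of its dependencies.
order = list(TopologicalSorter(dependencies).static_order())
tasks_in_order = [node for node in order if node in ("filter", "count_values")]
print(tasks_in_order)
```

Because the graph is a simple chain, only one order is possible: the filtering task has to run before the counting task.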

______________________ Taipy Studio ______________________
- Create new file: 'config.toml'
- Open Taipy Studio view
- Go to the 'Config files' section of Taipy Studio
- Right-click the configuration file you want to edit
- Choose 'Taipy: Show View'
- Add your first Data Node by clicking the button in the top-right corner of the window
- Create a name for it and change its details in the 'Details' section of Taipy Studio
        - name: historical_data
        - Details: default_path=xxxx/yyyy.csv, storage_type=csv
- Do the same for month_data and nb_of_values
        - name: month_data and nb_of_values
        - Details: storage_type=pickle
- Add a task and choose a function to associate with `<module>.<name>:function`
        - name: filter_current
        - Details: function=`__main__.filter_current:function`
- Do the same for count_values
- Link the Data Nodes and the tasks
- Add a pipeline and link it to the tasks
- Add a scenario and link it to the pipeline

In [9]:
import datetime as dt
import pandas as pd

In [10]:
def filter_current(df):
    current_month = dt.datetime.now().month
    df['Date'] = pd.to_datetime(df['Date']) 
    df = df[df['Date'].dt.month == current_month]
    return df

def count_values(df):
    return len(df)

In [11]:
# here is a CSV Data Node
historical_data_cfg = Config.configure_csv_data_node(id="historical_data",
                                                     default_path="time_series.csv")
month_values_cfg =  Config.configure_data_node(id="month_data")
nb_of_values_cfg = Config.configure_data_node(id="nb_of_values")

In [12]:
task_filter_current_cfg = Config.configure_task(id="filter_current",
                                                 function=filter_current,
                                                 input=historical_data_cfg,
                                                 output=month_values_cfg)

task_count_values_cfg = Config.configure_task(id="count_values",
                                                 function=count_values,
                                                 input=month_values_cfg,
                                                 output=nb_of_values_cfg)

In [13]:
pipeline_cfg = Config.configure_pipeline(id="my_pipeline",
                                         task_configs=[task_filter_current_cfg,
                                                       task_count_values_cfg])

scenario_cfg = Config.configure_scenario(id="my_scenario",
                                         pipeline_configs=[pipeline_cfg])

#scenario_cfg = Config.configure_scenario_from_tasks(id="my_scenario",
#                                                    task_configs=[task_filter_current_cfg,
#                                                                  task_count_values_cfg])

In [14]:
tp.Core().run()

scenario_1 = tp.create_scenario(scenario_cfg, creation_date=dt.datetime(2022,10,7), name="Scenario 2022/10/7")
scenario_1.submit()

scenario_2 = tp.create_scenario(scenario_cfg, creation_date=dt.datetime(2022,10,7), name="Scenario 2022/10/7")
scenario_2.submit()

[2022-12-22 16:20:03,424][Taipy][INFO] job JOB_filter_current_257edf8d-3ca3-46f5-aec6-c8a413c86c43 is completed.
[2022-12-22 16:20:03,510][Taipy][INFO] job JOB_count_values_90c9b3c7-91e7-49ef-9064-69963d60f52a is completed.
[2022-12-22 16:20:03,755][Taipy][INFO] job JOB_filter_current_4adc91ee-cd64-4ebf-819b-8643da0282fd is completed.
[2022-12-22 16:20:03,901][Taipy][INFO] job JOB_count_values_968c8c34-2ed4-4f89-995c-a4137af82beb is completed.


{'PIPELINE_my_pipeline_b751f808-87de-4de1-866f-c1b3dd7bba19': [<taipy.core.job.job.Job at 0x219403b6080>,
  <taipy.core.job.job.Job at 0x219403b7c40>]}

## Cycles

So far, we have talked about how having different scenarios helps us to oversee our assumptions about the future. For example, in business, it is critical to weigh different options to come up with an optimal solution. However, this decision-making process isn’t just a one-time task but rather a recurrent operation that happens over a time period. This is why we want to introduce Cycles.

A cycle can be thought of as a place to store different and recurrent scenarios within a time frame. In Taipy Core, each Cycle will have a unique primary scenario representing the reference scenario for a time period.


Typically, in a Machine Learning problem, many scenarios are created daily to predict the next day, for example. Among all those scenarios, there is only one primary scenario per cycle. In this step's example, scenarios are attached to a MONTHLY cycle. Cycles are useful because specific Taipy functions exist to navigate through them: Taipy can get all the scenarios created in a month by providing the Cycle, and you can also get every primary scenario ever made to quickly see their progress over time.

Creating a Cycle is straightforward: the frequency parameter of a scenario configuration determines the type of Cycle. In the code below, frequency=Frequency.MONTHLY is specified, so each scenario is automatically attached to the correct period (month) when it is created.
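The idea of attaching a scenario to "the correct period" can be sketched in plain Python. This is not Taipy's implementation: the helpers `monthly_period` and `group_by_month` are hypothetical, and the half-open [start, end) bounds are an assumption made for this example.

```python
import datetime as dt
from collections import defaultdict
from typing import Dict, List, Tuple


def monthly_period(creation_date: dt.datetime) -> Tuple[dt.datetime, dt.datetime]:
    """Return the [start, end) bounds of the calendar month containing creation_date."""
    start = creation_date.replace(day=1, hour=0, minute=0, second=0, microsecond=0)
    # First day of the next month; December rolls over to January of the next year.
    if start.month == 12:
        end = start.replace(year=start.year + 1, month=1)
    else:
        end = start.replace(month=start.month + 1)
    return start, end


def group_by_month(dates: List[dt.datetime]) -> Dict[dt.datetime, List[dt.datetime]]:
    """Group creation dates by the start of their month (one group per sketched cycle)."""
    groups: Dict[dt.datetime, List[dt.datetime]] = defaultdict(list)
    for d in dates:
        groups[monthly_period(d)[0]].append(d)
    return dict(groups)


dates = [dt.datetime(2022, 10, 7), dt.datetime(2022, 10, 5), dt.datetime(2021, 9, 1)]
for start, members in group_by_month(dates).items():
    print(start.date(), len(members))
```

With these three creation dates, the two October 2022 dates fall into one group and the September 2021 date into another, which is the grouping a monthly cycle is meant to express.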

In [15]:
#[2022-12-15 10:12:40,104][Taipy][INFO] job JOB_filter_by_month_9d986a1d-63fd-4eb1-8f75-2544eb34424b is completed.
#[2022-12-15 10:12:40,155][Taipy][INFO] job JOB_count_values_3b515b56-21b6-40f6-94dd-32e4a3da35dd is completed.
#[2022-12-15 10:12:40,345][Taipy][INFO] job JOB_filter_by_month_d540c831-9d73-4555-8d69-a2a546a77467 is completed.
#[2022-12-15 10:12:40,400][Taipy][INFO] job JOB_count_values_ec296987-6178-43cd-8f28-1a990893733a is completed.
#[2022-12-15 10:12:40,590][Taipy][INFO] job JOB_filter_by_month_42d4f0db-d911-40f5-b820-02e750f77ba5 is completed.
#[2022-12-15 10:12:40,643][Taipy][INFO] job JOB_count_values_53caf39c-0060-46c1-8d48-3dc6b5e74d02 is completed.

Also, as you can see, every scenario has been submitted and executed entirely, although the results of these tasks are all the same. Caching helps to skip such redundant tasks.

______________________ Taipy Studio ______________________
- Create new file: 'config.toml'
- Open Taipy Studio view
- Go to the 'Config files' section of Taipy Studio
- Right-click the configuration file you want to edit
- Choose 'Taipy: Show View'
- Add your first Data Node by clicking the button in the top-right corner of the window
- Create a name for it and change its details in the 'Details' section of Taipy Studio
        - name: historical_data
        - Details: default_path=xxxx/yyyy.csv, storage_type=csv
- Do the same for month, month_data and nb_of_values
        - name: month, month_data and nb_of_values
        - Details: storage_type=pickle
- Add a task and choose a function to associate with `<module>.<name>:function`
        - name: filter_by_month
        - Details: function=`__main__.filter_by_month:function`
- Do the same for count_values
- Link the Data Nodes and the tasks
- Add a pipeline and link it to the tasks
- Add a scenario and link it to the pipeline
- Add the frequency property and set it to "MONTHLY:FREQUENCY" (DAILY, WEEKLY, MONTHLY, YEARLY)

In [16]:
!rmdir /s /q .data

In [17]:
from taipy.core.config import Frequency

In [18]:
def filter_by_month(df, month):
    df['Date'] = pd.to_datetime(df['Date']) 
    df = df[df['Date'].dt.month == month]
    return df

def count_values(df):
    return len(df)

In [19]:
historical_data_cfg = Config.configure_csv_data_node(id="historical_data",
                                                     default_path="time_series.csv")
month_cfg =  Config.configure_data_node(id="month")
month_values_cfg =  Config.configure_data_node(id="month_data")
nb_of_values_cfg = Config.configure_data_node(id="nb_of_values")


task_filter_by_month_cfg = Config.configure_task(id="filter_by_month",
                                                 function=filter_by_month,
                                                 input=[historical_data_cfg, month_cfg],
                                                 output=month_values_cfg)

task_count_values_cfg = Config.configure_task(id="count_values",
                                                 function=count_values,
                                                 input=month_values_cfg,
                                                 output=nb_of_values_cfg)

pipeline_cfg = Config.configure_pipeline(id="my_pipeline",
                                         task_configs=[task_filter_by_month_cfg,
                                                       task_count_values_cfg])

scenario_cfg = Config.configure_scenario(id="my_scenario",
                                         pipeline_configs=[pipeline_cfg],
                                         frequency=Frequency.MONTHLY)


#scenario_cfg = Config.configure_scenario_from_tasks(id="my_scenario",
#                                                    task_configs=[task_filter_by_month_cfg,
#                                                    task_count_values_cfg])

In [20]:
tp.Core().run()

scenario_1 = tp.create_scenario(scenario_cfg,
                                creation_date=dt.datetime(2022,10,7),
                                name="Scenario 2022/10/7")
scenario_2 = tp.create_scenario(scenario_cfg,
                                creation_date=dt.datetime(2022,10,5),
                                name="Scenario 2022/10/5")

Scenarios 1 and 2 belong to the same cycle, but they don't share the same Data Nodes. Each scenario has its own.

In [21]:
scenario_1.month.write(10)
scenario_2.month.write(10)


print("Month Data Node of Scenario 1", scenario_1.month.read())
print("Month Data Node of Scenario 2", scenario_2.month.read())

scenario_1.submit()
scenario_2.submit()

scenario_3 = tp.create_scenario(scenario_cfg,
                                creation_date=dt.datetime(2021,9,1),
                                name="Scenario 2021/9/1")
scenario_3.month.write(9)
scenario_3.submit()

Month Data Node of Scenario 1 10
Month Data Node of Scenario 2 10
[2022-12-22 16:20:04,746][Taipy][INFO] job JOB_filter_by_month_a4d3c4a7-5ec9-4cca-8a1b-578c910e255a is completed.
[2022-12-22 16:20:04,833][Taipy][INFO] job JOB_count_values_a81b2f60-e9f9-4848-aa58-272810a0b755 is completed.
[2022-12-22 16:20:05,026][Taipy][INFO] job JOB_filter_by_month_22a3298b-ac8d-4b55-b51f-5fab0971cc9e is completed.
[2022-12-22 16:20:05,084][Taipy][INFO] job JOB_count_values_a52b910a-4024-443e-8ea2-f3cdda6c1c9d is completed.
[2022-12-22 16:20:05,317][Taipy][INFO] job JOB_filter_by_month_8643e5cf-e863-434f-a1ba-18222d6faab8 is completed.
[2022-12-22 16:20:05,376][Taipy][INFO] job JOB_count_values_72ab71be-f923-4898-a8a8-95ec351c24d9 is completed.


{'PIPELINE_my_pipeline_8f1e1475-9294-41be-a9da-70539491524a': [<taipy.core.job.job.Job at 0x21940433580>,
  <taipy.core.job.job.Job at 0x21940431750>]}

## Scoping 

Scoping determines how Data Nodes are shared between cycles, scenarios, and pipelines. Indeed, multiple scenarios can have their own Data Nodes or share the same one. For example, the initial/historical dataset is usually shared by all the scenarios/pipelines/cycles. It has a Global Scope and will be unique in the entire application.

In [22]:
!rmdir /s /q .data

In [23]:
from taipy.core.config import Scope

In [24]:
def filter_by_month(df, month):
    df['Date'] = pd.to_datetime(df['Date']) 
    df = df[df['Date'].dt.month == month]
    return df

def count_values(df):
    return len(df)

- **Pipeline** scope: two pipelines can reference different Data Nodes even if their names are the same. For example, the _prediction_ Data Node of an ARIMA model (ARIMA pipeline) and the _prediction_ Data Node of a RandomForest model (RandomForest pipeline) are two distinct Data Nodes.

- **Scenario** scope: pipelines share the same Data Node within a scenario.

- **Cycle** scope: scenarios from the same cycle share the same Data Node.

- **Global** scope: a unique Data Node for all the scenarios/pipelines/cycles.
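One way to reason about the four scopes is to ask which entity "owns" a Data Node. The sketch below is plain Python, not Taipy: the `SCENARIOS` table, the scope strings and the helpers `owner_key` and `shares_node` are hypothetical and only model the sharing rules listed above.

```python
from typing import Optional, Tuple

# Hypothetical mapping: scenario id -> cycle id. Scenarios s1 and s2 share a cycle.
SCENARIOS = {"s1": "2022-10", "s2": "2022-10", "s3": "2021-09"}


def owner_key(scope: str, scenario_id: str, pipeline_id: str) -> Tuple[str, Optional[str]]:
    """Return a key identifying the entity that owns a Data Node for a given scope."""
    if scope == "GLOBAL":
        return ("GLOBAL", None)
    if scope == "CYCLE":
        return ("CYCLE", SCENARIOS[scenario_id])
    if scope == "SCENARIO":
        return ("SCENARIO", scenario_id)
    if scope == "PIPELINE":
        return ("PIPELINE", f"{scenario_id}/{pipeline_id}")
    raise ValueError(f"unknown scope: {scope}")


def shares_node(scope: str, a: str, b: str) -> bool:
    """True when scenarios a and b resolve to the same Data Node under this scope."""
    return owner_key(scope, a, "my_pipeline") == owner_key(scope, b, "my_pipeline")


for scope in ("PIPELINE", "SCENARIO", "CYCLE", "GLOBAL"):
    print(scope, shares_node(scope, "s1", "s2"), shares_node(scope, "s1", "s3"))
```

In this model, two scenarios of the same cycle share a Data Node only from the Cycle scope upwards, and scenarios of different cycles share one only at the Global scope.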

______________________ Taipy Studio ______________________
- Create new file: 'config.toml'
- Open Taipy Studio view
- Go to the 'Config files' section of Taipy Studio
- Right-click the configuration file you want to edit
- Choose 'Taipy: Show View'
- Add your first Data Node by clicking the button in the top-right corner of the window
- Create a name for it and change its details in the 'Details' section of Taipy Studio
        - name: historical_data
        - Details: default_path=xxxx/yyyy.csv, storage_type=csv
- Do the same for month, month_data and nb_of_values
        - name: month, month_data and nb_of_values
        - Details: storage_type=pickle
- Add a task and choose a function to associate with `<module>.<name>:function`
        - name: filter_by_month
        - Details: function=`__main__.filter_by_month:function`
- Do the same for count_values
- Link the Data Nodes and the tasks
- Add a pipeline and link it to the tasks
- Add a scenario and link it to the pipeline
- Add the frequency property and set it to "MONTHLY:FREQUENCY" (DAILY, WEEKLY, MONTHLY, YEARLY)

In [25]:
historical_data_cfg = Config.configure_csv_data_node(id="historical_data",
                                                 default_path="time_series.csv",
                                                 scope=Scope.GLOBAL)
month_cfg =  Config.configure_data_node(id="month", scope=Scope.CYCLE)

month_values_cfg = Config.configure_data_node(id="month_data",
                                               scope=Scope.CYCLE)
nb_of_values_cfg = Config.configure_data_node(id="nb_of_values")


task_filter_by_month_cfg = Config.configure_task(id="filter_by_month",
                                                 function=filter_by_month,
                                                 input=[historical_data_cfg,month_cfg],
                                                 output=month_values_cfg)

task_count_values_cfg = Config.configure_task(id="count_values",
                                                 function=count_values,
                                                 input=month_values_cfg,
                                                 output=nb_of_values_cfg)

pipeline_cfg = Config.configure_pipeline(id="my_pipeline",
                                         task_configs=[task_filter_by_month_cfg,
                                                       task_count_values_cfg])

scenario_cfg = Config.configure_scenario(id="my_scenario",
                                         pipeline_configs=[pipeline_cfg],
                                         frequency=Frequency.MONTHLY)


#scenario_cfg = Config.configure_scenario_from_tasks(id="my_scenario",
#                                                    task_configs=[task_filter_by_month_cfg,
#                                                                  task_count_values_cfg])

In [26]:
tp.Core().run()

scenario_1 = tp.create_scenario(scenario_cfg,
                                creation_date=dt.datetime(2022,10,7),
                                name="Scenario 2022/10/7")
scenario_2 = tp.create_scenario(scenario_cfg,
                               creation_date=dt.datetime(2022,10,5),
                               name="Scenario 2022/10/5")
scenario_3 = tp.create_scenario(scenario_cfg,
                                creation_date=dt.datetime(2021,9,1),
                                name="Scenario 2021/9/1")

Scenarios 1 and 2 belong to the same cycle, so the month only needs to be defined once for both of them, because the month Data Node has a Cycle scope.


In [27]:
scenario_1.month.write(10)
print("Scenario 1: month", scenario_1.month.read())
print("Scenario 2: month", scenario_2.month.read())

Scenario 1: month 10
Scenario 2: month 10


In [28]:
print("\nScenario 1: submit")
scenario_1.submit()
print("Value", scenario_1.nb_of_values.read())


Scenario 1: submit
[2022-12-22 16:20:05,810][Taipy][INFO] job JOB_filter_by_month_d71cfd10-f674-40c8-b7a5-c66bea8773ef is completed.
[2022-12-22 16:20:05,902][Taipy][INFO] job JOB_count_values_cbe0b3b3-2531-440a-9413-48845c9cfdf1 is completed.
Value 849


In [29]:
print("\nScenario 2: first submit")
scenario_2.submit()
print("Value", scenario_2.nb_of_values.read())
print("Scenario 2: second submit")
scenario_2.submit()
print("Value", scenario_2.nb_of_values.read())


Scenario 2: first submit
[2022-12-22 16:20:06,101][Taipy][INFO] job JOB_filter_by_month_2e474e1b-dc0b-464c-8d14-d64a59535717 is completed.
[2022-12-22 16:20:06,162][Taipy][INFO] job JOB_count_values_259039e2-20ed-4400-b11a-06119939f081 is completed.
Value 849
Scenario 2: second submit
[2022-12-22 16:20:06,356][Taipy][INFO] job JOB_filter_by_month_705fcb69-64fc-4f66-a5f3-90169e09f8bf is completed.
[2022-12-22 16:20:06,426][Taipy][INFO] job JOB_count_values_5a8eea88-4477-48ab-a401-8036bada2267 is completed.
Value 849


In [30]:
print("\nScenario 3: submit")
scenario_3.month.write(9)
scenario_3.submit()
print("Value", scenario_3.nb_of_values.read())


Scenario 3: submit
[2022-12-22 16:20:07,404][Taipy][INFO] job JOB_filter_by_month_d0eefd6f-af96-46b8-b916-7cb94303ae4d is completed.
[2022-12-22 16:20:07,578][Taipy][INFO] job JOB_count_values_469f6ae3-7a8a-4b65-9175-899399013f60 is completed.
Value 1012


In [31]:
print("Scenario 3: change in historical data")
scenario_3.historical_data.write(pd.read_csv('time_series_2.csv'))
scenario_3.submit()
print("Value", scenario_3.nb_of_values.read())

Scenario 3: change in historical data
[2022-12-22 16:20:08,150][Taipy][INFO] job JOB_filter_by_month_8bab73fb-733b-4d8e-93fb-c550213bb2f1 is completed.
[2022-12-22 16:20:08,300][Taipy][INFO] job JOB_count_values_909c5bfa-a622-4959-aa2e-864abef8a0b8 is completed.
Value 1012


## Caching

Caching is an important feature of Taipy. A task can be skipped depending on whether its input Data Nodes have changed. If none of the input Data Nodes has changed since a previous submission, the task is skipped, saving time and resources.
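The skipping rule can be pictured with a small plain-Python sketch. It is not Taipy's implementation: `NodeState` and `can_skip` are hypothetical, and the integer "clock ticks" stand in for the last edit dates of the Data Nodes.

```python
from dataclasses import dataclass
from typing import List, Optional


@dataclass
class NodeState:
    """Hypothetical record of when a Data Node was last written (a logical clock tick)."""
    name: str
    last_edit: Optional[int] = None  # None means never written


def can_skip(inputs: List[NodeState], output: NodeState) -> bool:
    """Skip only if the output exists and was written after every input."""
    if output.last_edit is None:
        return False
    return all(i.last_edit is not None and i.last_edit < output.last_edit for i in inputs)


historical = NodeState("historical_data", last_edit=1)
month = NodeState("month", last_edit=2)
month_data = NodeState("month_data")

print(can_skip([historical, month], month_data))  # nothing has been computed yet
month_data.last_edit = 3                          # a first submission writes the output
print(can_skip([historical, month], month_data))  # inputs unchanged since then
historical.last_edit = 4                          # an input is rewritten
print(can_skip([historical, month], month_data))  # the output is stale again
```

In this model, writing to an input Data Node is what invalidates the cached output, which is why a task runs again after one of its inputs changes.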

In [32]:
!rmdir /s /q .data

In [33]:
def filter_by_month(df, month):
    df['Date'] = pd.to_datetime(df['Date']) 
    df = df[df['Date'].dt.month == month]
    return df

def count_values(df):
    return len(df)

______________________ Taipy Studio ______________________
- Create new file: 'config.toml'
- Open Taipy Studio view
- Go to the 'Config files' section of Taipy Studio
- Right-click the configuration file you want to edit
- Choose 'Taipy: Show View'
- Add your first Data Node by clicking the button in the top-right corner of the window
- Create a name for it and change its details in the 'Details' section of Taipy Studio
        - name: historical_data
        - Details: default_path=xxxx/yyyy.csv, storage_type=csv
- Do the same for month, month_data and nb_of_values
        - name: month, month_data and nb_of_values
        - Details: storage_type=pickle, cacheable=True (for month_data and nb_of_values)
- Add a task and choose a function to associate with `<module>.<name>:function`
        - name: filter_by_month
        - Details: function=`__main__.filter_by_month:function`
- Do the same for count_values
- Link the Data Nodes and the tasks
- Add a pipeline and link it to the tasks
- Add a scenario and link it to the pipeline
- Add the frequency property and set it to "MONTHLY:FREQUENCY" (DAILY, WEEKLY, MONTHLY, YEARLY)

The configuration is otherwise the same. `cacheable=True` is added to the output Data Nodes that we want to be cached.

In [34]:
historical_data_cfg = Config.configure_csv_data_node(id="historical_data",
                                                 default_path="time_series.csv",
                                                 scope=Scope.GLOBAL)
month_cfg =  Config.configure_data_node(id="month", scope=Scope.CYCLE)
month_values_cfg =  Config.configure_data_node(id="month_data",
                                               scope=Scope.CYCLE,
                                               cacheable=True)

nb_of_values_cfg = Config.configure_data_node(id="nb_of_values",
                                              cacheable=True)


task_filter_by_month_cfg = Config.configure_task(id="filter_by_month",
                                                 function=filter_by_month,
                                                 input=[historical_data_cfg, month_cfg],
                                                 output=month_values_cfg)

task_count_values_cfg = Config.configure_task(id="count_values",
                                                 function=count_values,
                                                 input=month_values_cfg,
                                                 output=nb_of_values_cfg)

pipeline_cfg = Config.configure_pipeline(id="my_pipeline",
                                         task_configs=[task_filter_by_month_cfg,
                                                       task_count_values_cfg])

scenario_cfg = Config.configure_scenario(id="my_scenario",
                                         pipeline_configs=[pipeline_cfg],
                                         frequency=Frequency.MONTHLY)

#scenario_cfg = Config.configure_scenario_from_tasks(id="my_scenario",
#                                                    task_configs=[task_filter_by_month_cfg,
#                                                    task_count_values_cfg])

Creation of three different scenarios with different creation dates and names.

In [35]:
tp.Core().run()

scenario_1 = tp.create_scenario(scenario_cfg,
                                creation_date=dt.datetime(2022,10,7),
                                name="Scenario 2022/10/7")
scenario_2 = tp.create_scenario(scenario_cfg,
                               creation_date=dt.datetime(2022,10,5),
                               name="Scenario 2022/10/5")
scenario_3 = tp.create_scenario(scenario_cfg,
                                creation_date=dt.datetime(2021,9,1),
                                name="Scenario 2021/9/1")

In [36]:
# scenarios 1 and 2 belong to the same cycle, so
# defining the month for scenario 1 defines it for every scenario in the cycle
scenario_1.month.write(10)
print("Scenario 1: month", scenario_1.month.read())
print("Scenario 2: month", scenario_2.month.read())

Scenario 1: month 10
Scenario 2: month 10


No task has been executed yet, so every task will run.

In [37]:
print("Scenario 1: submit")
scenario_1.submit()
print("Value", scenario_1.nb_of_values.read())

Scenario 1: submit
[2022-12-22 16:20:09,079][Taipy][INFO] job JOB_filter_by_month_0d7836eb-70eb-4fe6-b954-0e56967831b6 is completed.
[2022-12-22 16:20:09,177][Taipy][INFO] job JOB_count_values_91214241-ce81-42d8-9025-e83509652133 is completed.
Value 849


When the second scenario is executed, the first task is skipped. Indeed, the two scenarios share the same Data Nodes for this task, and no input Data Node has changed.

In [38]:
# first task has already been executed by scenario 1 because scenario 2 shares the same data node for this task
print("Scenario 2: first submit")
scenario_2.submit()
print("Value", scenario_2.nb_of_values.read())

Scenario 2: first submit
[2022-12-22 16:20:09,317][Taipy][INFO] job JOB_filter_by_month_c1db1f0c-6e0a-4691-b0a3-331d473c4c42 is skipped.
[2022-12-22 16:20:09,371][Taipy][INFO] job JOB_count_values_271cefd0-8648-47fa-8948-ed49e93e3eee is completed.
Value 849


Resubmitting the same scenario without any change will just skip every task.

In [39]:
# every task has already been executed so everything will be skipped
print("Scenario 2: second submit")
scenario_2.submit()
print("Value", scenario_2.nb_of_values.read())

Scenario 2: second submit
[2022-12-22 16:20:09,516][Taipy][INFO] job JOB_filter_by_month_da2762d1-6f24-40c1-9bd1-d6786fee7a8d is skipped.
[2022-12-22 16:20:09,546][Taipy][INFO] job JOB_count_values_9071dff4-37b2-4095-a7ed-34ef81daad27 is skipped.
Value 849


Scenario 3 is not in the same cycle. We change its month to 9, and every task is executed.

In [40]:
# scenario 3 has no connection to the other scenarios so everything will be executed
print("Scenario 3: submit")
scenario_3.month.write(9)
scenario_3.submit()
print("Value", scenario_3.nb_of_values.read())

Scenario 3: submit
[2022-12-22 16:20:10,071][Taipy][INFO] job JOB_filter_by_month_c4d06eba-a149-4b79-9194-78972c7b7a18 is completed.
[2022-12-22 16:20:10,257][Taipy][INFO] job JOB_count_values_817df173-6bae-4742-a2c0-b8b8eba52872 is completed.
Value 1012


Here, we change the input data node of the pipeline, so Taipy reruns the affected tasks to make sure that everything is up to date.

In [41]:
# changing an input data node causes the tasks that depend on it to be re-executed
print("Scenario 3: change in historical data")
scenario_3.historical_data.write(pd.read_csv('time_series_2.csv'))
scenario_3.submit()
print("Value", scenario_3.nb_of_values.read())

Scenario 3: change in historical data
[2022-12-22 16:20:10,870][Taipy][INFO] job JOB_filter_by_month_92f32135-b410-41f0-b9f3-a852c2eb07cd is completed.
[2022-12-22 16:20:10,932][Taipy][INFO] job JOB_count_values_a6a75e13-4cd4-4f7e-bc4e-d14a86733440 is completed.
Value 1012
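
The skipping behavior seen in the logs above can be pictured with a small toy model. The sketch below is plain Python and not Taipy's implementation: `ToyDataNode`, `ToyTask`, and the logical clock are hypothetical names, and the rule it applies (skip a task when its output was written after the last edit of all its inputs) is a simplifying assumption made for illustration.

```python
import itertools

# Logical clock: a larger number means a more recent write.
_clock = itertools.count(1)


class ToyDataNode:
    """Hypothetical stand-in for a data node: stores a value and its last edit time."""

    def __init__(self, name):
        self.name = name
        self.value = None
        self.last_edit = 0  # 0 means "never written"

    def write(self, value):
        self.value = value
        self.last_edit = next(_clock)

    def read(self):
        return self.value


class ToyTask:
    """Hypothetical stand-in for a task with input data nodes and one output."""

    def __init__(self, function, inputs, output):
        self.function = function
        self.inputs = inputs
        self.output = output

    def run(self):
        newest_input = max(dn.last_edit for dn in self.inputs)
        # Assumed rule: the output is still valid if it is newer than every input.
        if self.output.last_edit > newest_input:
            return "skipped"
        self.output.write(self.function(*(dn.read() for dn in self.inputs)))
        return "completed"


historical_data = ToyDataNode("historical_data")
month = ToyDataNode("month")
month_data = ToyDataNode("month_data")
nb_of_values = ToyDataNode("nb_of_values")

filter_task = ToyTask(lambda rows, m: [r for r in rows if r == m],
                      [historical_data, month], month_data)
count_task = ToyTask(len, [month_data], nb_of_values)


def submit():
    return [filter_task.run(), count_task.run()]


historical_data.write([9, 10, 10, 11])
month.write(10)
first = submit()   # nothing has been computed yet
second = submit()  # no input has changed
month.write(9)
third = submit()   # an input has changed
print(first, second, third)
# ['completed', 'completed'] ['skipped', 'skipped'] ['completed', 'completed']
```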


## Execution modes

Taipy can execute jobs in two different modes:
- standalone: asynchronous. Jobs can run in parallel, depending on the execution graph, if max_nb_of_workers > 1
- development: synchronous

Options of submit:
- wait: if wait is True, the submission is synchronous and waits for all the jobs to finish (indefinitely if timeout is not defined)
- timeout: if wait is True, Taipy waits for the jobs to finish for at most this amount of time
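
The `wait` and `timeout` options behave much like waiting on futures in Python's standard library. The sketch below is an analogy built on `concurrent.futures`, not Taipy code; `submit`, `wait_for_jobs`, and `slow_job` are hypothetical names chosen for illustration.

```python
import time
from concurrent.futures import ThreadPoolExecutor, wait

# Two workers, so two jobs can run at the same time.
_executor = ThreadPoolExecutor(max_workers=2)


def slow_job(seconds):
    time.sleep(seconds)
    return seconds


def submit(durations, wait_for_jobs=False, timeout=None):
    """Start one job per duration and optionally block until they finish."""
    futures = [_executor.submit(slow_job, d) for d in durations]
    if wait_for_jobs:
        # Blocks until every job is done, or until `timeout` seconds have passed.
        wait(futures, timeout=timeout)
    return futures


# Asynchronous submission: the call returns while the job is still running.
asynchronous = submit([0.2])
done_right_after_submit = asynchronous[0].done()

# Blocking submission: the call returns once the job has finished.
blocking = submit([0.2], wait_for_jobs=True)
done_after_blocking_submit = blocking[0].done()

# Blocking submission with a timeout: the call stops waiting, the job keeps running.
timed_out = submit([1.0], wait_for_jobs=True, timeout=0.1)
done_after_timeout = timed_out[0].done()

print(done_right_after_submit, done_after_blocking_submit, done_after_timeout)
```

In this analogy, a timeout only limits how long the caller waits; it does not cancel the job.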

In [42]:
!rmdir /s /q .data
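
The `rmdir /s /q` command above only works in a Windows shell. A portable sketch using the standard library is shown here; the helper name `remove_directory` is hypothetical, and it assumes that `.data` in the working directory is the folder to delete.

```python
import shutil
from pathlib import Path


def remove_directory(path=".data"):
    """Delete a directory tree if it exists; return True if something was removed."""
    target = Path(path)
    if target.is_dir():
        shutil.rmtree(target)
        return True
    return False


remove_directory(".data")
```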

In [43]:
#  Taipy Core Data nodes - CSV, pickle
from taipy.core.config import Config, Scope, Frequency
import taipy as tp
import datetime as dt
import pandas as pd
import time

In [44]:
def filter_by_month(df, month):
    df['Date'] = pd.to_datetime(df['Date']) 
    df = df[df['Date'].dt.month == month]
    return df

def count_values(df):
    print("Wait 10 seconds")
    time.sleep(10)
    return len(df)

In [45]:
Config.configure_job_executions(mode="standalone", max_nb_of_workers=2)

<taipy.core.config.job_config.JobConfig at 0x2193e63c3d0>

In [46]:
historical_data_cfg = Config.configure_csv_data_node(id="historical_data",
                                                 default_path="time_series.csv",
                                                 scope=Scope.GLOBAL)

month_cfg = Config.configure_data_node(id="month",
                                       scope=Scope.CYCLE)

month_values_cfg =  Config.configure_data_node(id="month_data",
                                               scope=Scope.CYCLE,
                                               cacheable=True)

nb_of_values_cfg = Config.configure_data_node(id="nb_of_values",
                                              cacheable=True)


task_filter_by_month_cfg = Config.configure_task(id="filter_by_month",
                                                 function=filter_by_month,
                                                 input=[historical_data_cfg, month_cfg],
                                                 output=month_values_cfg)

task_count_values_cfg = Config.configure_task(id="count_values",
                                                 function=count_values,
                                                 input=month_values_cfg,
                                                 output=nb_of_values_cfg)

pipeline_cfg = Config.configure_pipeline(id="my_pipeline",
                                         task_configs=[task_filter_by_month_cfg,
                                                       task_count_values_cfg])

scenario_cfg = Config.configure_scenario(id="my_scenario",
                                         pipeline_configs=[pipeline_cfg],
                                         frequency=Frequency.MONTHLY)

#scenario_cfg = Config.configure_scenario_from_tasks(id="my_scenario",
#                                                    task_configs=[task_filter_by_month_cfg,
#                                                    task_count_values_cfg])

In [47]:
if __name__=="__main__":
    tp.Core().run()
    scenario_1 = tp.create_scenario(scenario_cfg, creation_date=dt.datetime(2022,10,7), name="Scenario 2022/10/7")
    scenario_1.month.write(10)
    scenario_1.submit()
    scenario_1.submit()

    time.sleep(30)

exception calling callback for <Future at 0x21940406590 state=finished raised PicklingError>
concurrent.futures.process._RemoteTraceback: 
"""
Traceback (most recent call last):
  File "C:\Users\jacta\AppData\Local\R-MINI~1\envs\notebook\lib\multiprocessing\queues.py", line 245, in _feed
    obj = _ForkingPickler.dumps(obj)
  File "C:\Users\jacta\AppData\Local\R-MINI~1\envs\notebook\lib\multiprocessing\reduction.py", line 51, in dumps
    cls(buf, protocol).dump(obj)
_pickle.PicklingError: Can't pickle <function filter_by_month at 0x00000219403DDA20>: it's not the same object as __main__.filter_by_month
"""

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "C:\Users\jacta\AppData\Local\R-MINI~1\envs\notebook\lib\concurrent\futures\_base.py", line 330, in _invoke_callbacks
    callback(self)
  File "C:\Users\jacta\AppData\Local\R-MINI~1\envs\notebook\lib\site-packages\taipy\core\_scheduler\_dispatcher\_standalone_job_dispatc

In [48]:
if __name__=="__main__":
    tp.Core().run()
    scenario_1 = tp.create_scenario(scenario_cfg, creation_date=dt.datetime(2022,10,7), name="Scenario 2022/10/7")
    scenario_1.month.write(10)
    scenario_1.submit(wait=True)
    scenario_1.submit(wait=True, timeout=5)

exception calling callback for <Future at 0x21940367790 state=finished raised PicklingError>
concurrent.futures.process._RemoteTraceback: 
"""
Traceback (most recent call last):
  File "C:\Users\jacta\AppData\Local\R-MINI~1\envs\notebook\lib\multiprocessing\queues.py", line 245, in _feed
    obj = _ForkingPickler.dumps(obj)
  File "C:\Users\jacta\AppData\Local\R-MINI~1\envs\notebook\lib\multiprocessing\reduction.py", line 51, in dumps
    cls(buf, protocol).dump(obj)
_pickle.PicklingError: Can't pickle <function filter_by_month at 0x00000219403DDA20>: it's not the same object as __main__.filter_by_month
"""

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "C:\Users\jacta\AppData\Local\R-MINI~1\envs\notebook\lib\concurrent\futures\_base.py", line 330, in _invoke_callbacks
    callback(self)
  File "C:\Users\jacta\AppData\Local\R-MINI~1\envs\notebook\lib\site-packages\taipy\core\_scheduler\_dispatcher\_standalone_job_dispatc

KeyboardInterrupt: 
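
Both tracebacks above end with the same `PicklingError`. Python pickles a function by reference, that is, by its module and name, and the tracebacks show that standalone mode goes through `multiprocessing`, which has to pickle the task function. The sketch below reproduces the same class of error with the standard library only. It assumes, without confirmation from the tracebacks, that a function object was kept somewhere while its name was rebound, for example by re-running a notebook cell. The names `task_function` and `stale_reference` are illustrative.

```python
import pickle


def task_function(x):
    return x + 1


# Keep a reference to the first definition, as a configuration object might.
stale_reference = task_function


# Rebinding the name (for example by re-running a notebook cell) creates a new object.
def task_function(x):
    return x + 1


try:
    # The lookup by module and name no longer returns the object being pickled.
    pickle.dumps(stale_reference)
    error = None
except pickle.PicklingError as exc:
    error = exc

print(type(error).__name__)
```

If this is the cause, defining the task functions in a separate module and importing them is one way to keep the name and the object in sync.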

## Callback on scenarios

To trigger an action when a job changes status, we can subscribe a function to a scenario. This function is called each time the status of one of the scenario's jobs changes.

In [None]:
def callback_scenario_state(scenario, job):
    """Callback subscribed to a scenario. It is called whenever the status of a job changes.
    It prints the scenario name and, for the job status checked below, the content of the output data nodes of the task.

    Args:
        scenario (Scenario): the scenario that the job belongs to
        job (Job): the job whose status has changed
    """
    print(scenario.name)
    # the status value 7 is assumed here to correspond to a completed job
    if job.status.value == 7:
        for data_node in job.task.output.values():
            print(data_node.read())


if __name__=="__main__":
    tp.Core().run()
    scenario_1 = tp.create_scenario(scenario_cfg, creation_date=dt.datetime(2022,10,7), name="Scenario 2022/10/7")
    scenario_1.subscribe(callback_scenario_state)

    scenario_1.submit(wait=True)
    scenario_1.submit(wait=True, timeout=5)
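
The subscription mechanism described above follows the observer pattern. The sketch below is a toy model in plain Python, not Taipy code: `ToyScenario`, `ToyStatus`, and `on_status_change` are hypothetical names, and the callback here receives a job name and a status rather than a Taipy job object. It also shows that comparing against a named status reads more clearly than comparing against a raw number.

```python
from enum import Enum


class ToyStatus(Enum):
    """Hypothetical job statuses; the real set of statuses may differ."""
    RUNNING = "running"
    COMPLETED = "completed"


class ToyScenario:
    """Hypothetical scenario that notifies its subscribers on every status change."""

    def __init__(self, name):
        self.name = name
        self._subscribers = []

    def subscribe(self, callback):
        self._subscribers.append(callback)

    def _notify(self, job_name, status):
        for callback in self._subscribers:
            callback(self, job_name, status)

    def submit(self, job_names):
        # Each job goes through two status changes in this toy model.
        for job_name in job_names:
            self._notify(job_name, ToyStatus.RUNNING)
            self._notify(job_name, ToyStatus.COMPLETED)


events = []


def on_status_change(scenario, job_name, status):
    events.append((scenario.name, job_name, status.name))
    if status is ToyStatus.COMPLETED:
        print(f"{job_name} completed in {scenario.name}")


scenario = ToyScenario("Scenario 2022/10/7")
scenario.subscribe(on_status_change)
scenario.submit(["filter_by_month", "count_values"])
```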


## Comparison functions

Taipy provides a way to compare scenarios by providing a function directly into the configuration of the scenario.

_data_node_results_ holds the results to compare, one per scenario passed to the comparator. We iterate through it to compare every pair of scenarios.

In [None]:
def compare_function(*data_node_results):
    # Build a table of pairwise differences between the results of all scenarios
    compare_result = {}
    for current_res_i, current_res in enumerate(data_node_results):
        compare_result[current_res_i] = {}
        for next_res_i, next_res in enumerate(data_node_results):
            print(f"comparing result {current_res_i} with result {next_res_i}")
            compare_result[current_res_i][next_res_i] = next_res - current_res
    return compare_result
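
As a quick check of what the comparator returns, the sketch below calls it directly on two plain numbers standing in for the `month` values of two scenarios (10 and 8 are the values written later in this section). The function is repeated here, without its `print` call, so that the snippet runs on its own.

```python
def compare_function(*data_node_results):
    # Pairwise differences: entry [i][j] is result j minus result i.
    compare_result = {}
    for i, current_res in enumerate(data_node_results):
        compare_result[i] = {}
        for j, next_res in enumerate(data_node_results):
            compare_result[i][j] = next_res - current_res
    return compare_result


result = compare_function(10, 8)
print(result)  # {0: {0: 0, 1: -2}, 1: {0: 2, 1: 0}}
```

The diagonal is always zero, and the table is antisymmetric: entry `[i][j]` is the opposite of entry `[j][i]`.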

The data node compared here is the 'month' data node, as indicated in the comparators parameter of _configure_scenario_.

In [None]:
scenario_cfg = Config.configure_scenario("multiply_scenario",
                                         [pipeline_cfg],
                                         comparators={month_cfg.id: compare_function},
                                         frequency=Frequency.MONTHLY)

#scenario_cfg = Config.configure_scenario_from_tasks(id="my_scenario",
#                                                    task_configs=[task_filter_by_month_cfg,
#                                                                  task_count_values_cfg])

In [None]:
tp.Core().run()

scenario_1 = tp.create_scenario(scenario_cfg,
                                creation_date=dt.datetime(2022,10,7),
                                name="Scenario 2022/10/7")
scenario_2 = tp.create_scenario(scenario_cfg,
                                creation_date=dt.datetime(2022,8,5),
                                name="Scenario 2022/8/5")

scenario_1.month.write(10)
scenario_2.month.write(8)
print("Scenario 1: month", scenario_1.month.read())
print("Scenario 2: month", scenario_2.month.read())

print("\nScenario 1: submit")
scenario_1.submit()
print("Value", scenario_1.nb_of_values.read())

print("\nScenario 2: first submit")
scenario_2.submit()
print("Value", scenario_2.nb_of_values.read())


print(tp.compare_scenarios(scenario_1, scenario_2))

## Taipy Rest

Taipy Rest lets the user navigate through the entities of the application, and also create and submit scenarios.

In [None]:
tp.Rest().run()