In this tutorial, we walk through setting up an end-to-end, always-on workflow that continuously ingests data.

We will cover the following:

- Preparing your dataset for synthetic data generation.
- Using the Rockfish Recommendation Engine to automatically select the most suitable model for training, along with the key configurations and settings needed for onboarding.
- Generating and evaluating synthetic data with the Rockfish Synthetic Data Assessor, which helps you improve the quality of your synthetic datasets.
- Setting up an always-on workflow using the settings generated by the onboarding process.
- Applying custom labels to the models that are trained by the workflow.
- Searching for a previously trained model in Rockfish's model store.
- Using the model to generate synthetic data.


### Install and Import Rockfish SDK


In [1]:
%%capture
%pip install -U 'rockfish[labs]' -f 'https://docs.rockfish.ai/packages/index.html'

In [2]:
import rockfish as rf
import rockfish.actions as ra
from rockfish.labs.dataset_properties import (
    DatasetPropertyExtractor,
    FieldType,
    EncoderType,
)
from rockfish.labs.steps import Recommender
from rockfish.labs.metrics import marginal_dist_score
from rockfish.labs.sda import SDA

import time

### Connect to the Rockfish Platform

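❗❗ Replace `API_KEY` with your API key. If your deployment uses a different endpoint, also replace the API URL passed to `rf.Connection.remote`.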


In [3]:
api_key = "API_KEY"

conn = rf.Connection.remote("https://api.rockfish.ai", api_key)
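Instead of pasting the key into the notebook, the connection settings can be read from the environment. The following is a minimal sketch; the variable names `ROCKFISH_API_KEY` and `ROCKFISH_API_URL` and the helper `load_rockfish_settings` are assumptions made for this example and are not defined by the Rockfish SDK.

```python
import os


def load_rockfish_settings(env=None):
    """Return (api_url, api_key) read from environment-style settings.

    ROCKFISH_API_KEY and ROCKFISH_API_URL are hypothetical variable names
    chosen for this sketch; they are not defined by the Rockfish SDK.
    """
    env = os.environ if env is None else env
    api_key = env.get("ROCKFISH_API_KEY")
    if not api_key:
        raise RuntimeError("Set ROCKFISH_API_KEY before connecting")
    # fall back to the URL used in the cell above
    api_url = env.get("ROCKFISH_API_URL", "https://api.rockfish.ai")
    return api_url, api_key


# Example with an explicit mapping instead of the real environment:
api_url, api_key = load_rockfish_settings({"ROCKFISH_API_KEY": "API_KEY"})
```

The returned pair can then be passed to `rf.Connection.remote(api_url, api_key)`, as in the cell above.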

# 1. Onboard the dataset onto Rockfish


### Load the Dataset

Other data formats are also supported; refer to the documentation for details.


In [4]:
%%capture
!wget --no-clobber https://docs.rockfish.ai/tutorials/finance-1.csv
dataset = rf.Dataset.from_csv("finance", "finance-1.csv")



In [5]:
dataset.to_pandas()

Unnamed: 0,customer,age,gender,merchant,category,amount,fraud,timestamp
0,C100045114,4,M,M348934600,transportation,35.13,0,2023-01-01 00:00:00.000000000
1,C100045114,4,M,M348934600,transportation,27.63,0,2023-01-01 08:00:00.000000000
2,C100045114,4,M,M348934600,transportation,13.46,0,2023-01-01 16:00:00.000000000
3,C100045114,4,M,M348934600,transportation,28.86,0,2023-01-02 00:00:00.000000000
4,C100045114,4,M,M151143676,barsandrestaurants,64.99,0,2023-01-02 08:00:00.000000000
...,...,...,...,...,...,...,...,...
8774,C1335108214,3,M,M348934600,transportation,62.58,0,2023-01-09 15:53:05.114045618
8775,C1335108214,3,M,M348934600,transportation,11.99,0,2023-01-09 23:53:05.114045618
8776,C1335108214,3,M,M348934600,transportation,18.85,0,2023-01-10 07:53:05.114045618
8777,C1335108214,3,M,M348934600,transportation,41.86,0,2023-01-10 15:53:05.114045618


### Onboard the dataset onto Rockfish

The onboarding workflow is a quick way to get a first synthetic version of your dataset.

To get good synthetic data, provide domain-specific information about your dataset, such as the session key and the metadata fields. Rockfish’s Recommendation Engine uses this information to tailor the workflow to your data.


In [6]:
dataset_properties = DatasetPropertyExtractor(
    dataset,
    session_key="customer",
    metadata_fields=["age", "gender"],
    additional_property_keys=["association_rules"],
).extract()
recommender_output = Recommender(dataset_properties).run()
print(recommender_output.report)

# _________________________________________________________________________
#
# RECOMMENDED CONFIGURATIONS
#
# (Remove or change any actions or configurations that are inappropriate
#  for your use case, or add missing ones)
# _________________________________________________________________________


We detected a timeseries dataset with the following properties:
Dimensions of dataset: (8779 x 8)
Metadata fields: ['age', 'gender']
Measurement fields: ['category', 'amount', 'merchant', 'fraud']
Timestamp field: timestamp
Session key field: customer
Number of sessions: 658

# _________________________________________________________________________
#
# ~~~~~ Pre-processing recommendations ~~~~~
# _________________________________________________________________________



# _________________________________________________________________________
#
# ~~~~~ Model recommendations ~~~~~
# _________________________________________________________________________


We recommend using the Tim
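The report above groups rows into sessions by `customer` and treats `age` and `gender` as metadata. The sketch below illustrates one way to sanity-check that choice before onboarding, under the assumption that metadata fields are meant to stay constant within a session. It uses only the standard library, the helper `inconsistent_metadata` is hypothetical, and the sample rows are adapted from the dataset preview (timestamps shortened).

```python
import csv
import io
from collections import defaultdict

# A few rows adapted from the dataset preview above (timestamps shortened).
SAMPLE = """customer,age,gender,merchant,category,amount,fraud,timestamp
C100045114,4,M,M348934600,transportation,35.13,0,2023-01-01 00:00:00
C100045114,4,M,M348934600,transportation,27.63,0,2023-01-01 08:00:00
C100045114,4,M,M151143676,barsandrestaurants,64.99,0,2023-01-02 08:00:00
C1335108214,3,M,M348934600,transportation,62.58,0,2023-01-09 15:53:05
"""


def inconsistent_metadata(rows, session_key, metadata_fields):
    """Return session keys whose metadata values differ between rows."""
    seen = defaultdict(set)
    for row in rows:
        seen[row[session_key]].add(tuple(row[f] for f in metadata_fields))
    return sorted(key for key, values in seen.items() if len(values) > 1)


rows = list(csv.DictReader(io.StringIO(SAMPLE)))
sessions = {row["customer"] for row in rows}
bad = inconsistent_metadata(rows, "customer", ["age", "gender"])
print(f"{len(sessions)} sessions, inconsistent metadata in: {bad}")
```

If the check reports any session keys, the listed fields may be better treated as measurements than as metadata.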

#### Run the recommended workflow to get a synthetic dataset


In [7]:
rec_actions = recommender_output.actions
save = ra.DatasetSave({"name": "synthetic"})

# use recommended actions in a Rockfish workflow
builder = rf.WorkflowBuilder()
builder.add_path(dataset, *rec_actions, save)

# run the Rockfish workflow
pre_workflow = await builder.start(conn)
print(f"Workflow: {pre_workflow.id()}")

Workflow: 7hOPVXb0i0FM8kb4Z6SrVf


View logs for the running workflow:


In [8]:
async for log in pre_workflow.logs():
    print(log)

2024-11-02T01:05:27Z dataset-load: INFO Loading dataset '4RiOKAAJcbPQyuZ90BWIqF' with 8779 rows
2024-11-02T01:05:27Z train-time-gan: WARN Unsafe time cast on timestamp
2024-11-02T01:05:28Z train-time-gan: INFO Starting DG training job
2024-11-02T01:05:31Z train-time-gan: INFO Epoch 1 completed.
2024-11-02T01:05:35Z train-time-gan: INFO Epoch 2 completed.
2024-11-02T01:05:39Z train-time-gan: INFO Epoch 3 completed.
2024-11-02T01:05:42Z train-time-gan: INFO Epoch 4 completed.
2024-11-02T01:05:46Z train-time-gan: INFO Epoch 5 completed.
2024-11-02T01:05:49Z train-time-gan: INFO Epoch 6 completed.
2024-11-02T01:05:53Z train-time-gan: INFO Epoch 7 completed.
2024-11-02T01:05:57Z train-time-gan: INFO Epoch 8 completed.
2024-11-02T01:06:00Z train-time-gan: INFO Epoch 9 completed.
2024-11-02T01:06:04Z train-time-gan: INFO Epoch 10 completed.
2024-11-02T01:06:06Z train-time-gan: INFO Training completed. Uploaded model a2e041c3-98b6-11ef-9a67-22aa4ec1349b
2024-11-02T01:06:06Z generate-time-gan: 

Download and view the synthetic dataset locally:


In [9]:
syn = await pre_workflow.datasets().last()
syn = await syn.to_local(conn)
syn.to_pandas()

Unnamed: 0,timestamp,amount,age,gender,merchant,category,fraud,session_key
0,2023-01-05 21:03:45.274,5017.255356,2,F,M1823072687,transportation,0,0.0
1,2023-01-06 08:18:59.735,4701.608062,2,F,M348934600,transportation,0,0.0
2,2023-01-06 21:14:20.711,4288.048567,2,F,M348934600,transportation,0,0.0
3,2023-01-07 11:20:16.057,4005.334858,2,F,M348934600,transportation,0,0.0
4,2023-01-08 01:53:06.147,3778.009436,2,F,M348934600,transportation,0,0.0
...,...,...,...,...,...,...,...,...
3713,2023-01-11 05:10:29.262,-25.121359,0,M,M348934600,transportation,0,199.0
3714,2023-01-11 13:50:43.573,-29.509062,0,M,M348934600,transportation,0,199.0
3715,2023-01-11 22:02:48.081,-31.650354,0,M,M348934600,transportation,0,199.0
3716,2023-01-12 05:44:28.499,-30.706333,0,M,M348934600,transportation,0,199.0
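Before computing a fidelity score, a simple range check can flag synthetic values that fall outside what the source data contains. This is a minimal sketch, not part of the Rockfish SDK: the helper `out_of_range` is hypothetical, and the amounts are copied from the two previews shown in this tutorial, so the range below reflects only those few rows. In practice the range would be computed over the full source column.

```python
def out_of_range(values, low, high):
    """Return the values that fall outside the closed interval [low, high]."""
    return [v for v in values if not (low <= v <= high)]


# Amounts copied from the source and synthetic previews shown above.
source_amounts = [35.13, 27.63, 13.46, 28.86, 64.99, 62.58, 11.99, 18.85, 41.86]
synthetic_amounts = [5017.255356, 4701.608062, 4288.048567, -25.121359, -29.509062]

low, high = min(source_amounts), max(source_amounts)
flagged = out_of_range(synthetic_amounts, low, high)
print(f"source range: [{low}, {high}]; flagged {len(flagged)} of {len(synthetic_amounts)}")
```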


### Evaluate the synthetic dataset


In [10]:
# Helper function `get_fidelity_score()`: calculates the marginal distribution score

import copy


def get_fidelity_score(source, source_dataset_properties, syn):
    # work on copies so the caller's datasets are left unchanged
    source = copy.deepcopy(source)
    syn = copy.deepcopy(syn)

    # drop the session key from the source dataset before comparing
    columns_to_drop = [source_dataset_properties.session_key]
    source.table = source.table.drop_columns(columns_to_drop)

    # the synthetic dataset stores its session key in a "session_key" column
    columns_to_drop = ["session_key"]
    syn.table = syn.table.drop_columns(columns_to_drop)

    # measurement fields that are encoded as categorical
    categorical_measurements = source_dataset_properties.filter_fields(
        ftype=FieldType.MEASUREMENT, etype=EncoderType.CATEGORICAL
    )

    return marginal_dist_score(
        source,
        syn,
        metadata=source_dataset_properties.metadata_fields,
        other_categorical=categorical_measurements,
    )

In [11]:
get_fidelity_score(
    source=dataset, source_dataset_properties=dataset_properties, syn=syn
)

0.6394508595969407
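To give an intuition for what a marginal-distribution comparison measures, the sketch below scores a single categorical field as one minus the total variation distance between the real and synthetic category frequencies. This is an illustration only: it is an assumption that the idea is comparable, and it is not a description of how `marginal_dist_score` is computed. The helper `marginal_similarity` is hypothetical and expects non-empty inputs.

```python
from collections import Counter


def marginal_similarity(real_values, synthetic_values):
    """Return 1 - total variation distance between two categorical marginals.

    1.0 means identical category frequencies; 0.0 means no overlap.
    Both inputs must be non-empty.
    """
    real = Counter(real_values)
    syn = Counter(synthetic_values)
    n_real, n_syn = sum(real.values()), sum(syn.values())
    categories = set(real) | set(syn)
    # Counter returns 0 for categories missing on one side
    tvd = 0.5 * sum(
        abs(real[c] / n_real - syn[c] / n_syn) for c in categories
    )
    return 1.0 - tvd


real_gender = ["M", "M", "F", "F"]
syn_gender = ["M", "F", "F", "F"]
score = marginal_similarity(real_gender, syn_gender)
```

Here the real data is 50% `M` and the synthetic data is 25% `M`, so the distance is 0.25 and the similarity is 0.75.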

### Since the actions look good, we can reuse them to set up the always-on workflow.


In [12]:
rec_actions

[<rockfish.actions.dg.TrainTimeGAN at 0x13714e7e0>,
 <rockfish.actions.dg.GenerateTimeGAN at 0x1078cd790>]

In [13]:
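# every action except the last one trains the model (here: TrainTimeGAN)
train_actions = rec_actions[:-1]
# the last action generates synthetic data (here: GenerateTimeGAN)
generate_actions = rec_actions[-1:]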

# 2. Set up an always-on workflow for continuous data ingestion


### Use the `DatastreamLoad` action to keep the workflow always on


In [14]:
# reduce the batch size for the following ingested data stream as the batch
# size should be smaller than the number of sessions in the dataset
train_actions[0].config().doppelganger.batch_size = 14
stream = ra.DatastreamLoad()

builder = rf.WorkflowBuilder()
builder.add(stream, alias="input")
builder.add_path(*train_actions, parents=["input"], alias="train_actions")
ingest_workflow = await builder.start(conn)
print(f"Ingestion Workflow ID: {ingest_workflow.id()}")

Ingestion Workflow ID: BFzmU53Fdt9heeFeTgAOw
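The comment in the cell above says the batch size should be smaller than the number of sessions in each ingested dataset. The sketch below shows one way to derive such a value from the session key column. The helper `pick_batch_size` and its `preferred` upper bound are assumptions made for this example, not Rockfish settings.

```python
def pick_batch_size(session_ids, preferred=100):
    """Return a batch size strictly smaller than the number of sessions.

    `preferred` is a hypothetical upper bound chosen for this sketch.
    """
    n_sessions = len(set(session_ids))
    if n_sessions < 2:
        raise ValueError("need at least two sessions to pick a batch size")
    return min(preferred, n_sessions - 1)


# Session key values as they might appear in the `customer` column.
customers = ["C1", "C1", "C2", "C3", "C3", "C3"]
batch_size = pick_batch_size(customers)
```

Computing the value for the smallest file you expect to stream keeps a single setting valid for every batch.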


### Write the data files to the workflow stream

- Each input is a dataset.
- Each output is a trained model stored in the model store.



### Download the sample files for the datastream workflow


In [15]:
%%capture
!wget --no-clobber https://docs.rockfish.ai/tutorials/finance-2.csv
!wget --no-clobber https://docs.rockfish.ai/tutorials/finance-3.csv
!wget --no-clobber https://docs.rockfish.ai/tutorials/finance-4.csv



### Replace the workflow ID with the ID of the workflow you just set up

Retrieving the workflow by its ID also lets you run the data-ingestion step in a separate process.


In [16]:
# Retrieve the workflow by its ID, without needing to rebuild it
workflow_id = ingest_workflow.id()  # insert workflow ID here
workflow = await conn.get_workflow(workflow_id)

In [17]:
for file_num in range(2, 5):
    data = rf.Dataset.from_csv("finance", f"finance-{file_num}.csv")
    # "input" is the alias given to the datastream when the workflow was built
    await workflow.write_datastream("input", data)
    print(f"Writing finance-{file_num} to datastream...")
    # wait before writing the next file
    time.sleep(10)
# close the datastream once all files have been written
await workflow.close_datastream("input")

Writing finance-2 to datastream...
Writing finance-3 to datastream...
Writing finance-4 to datastream...
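The write-then-close pattern used above can be sketched without a live workflow. In the example below, `FakeDatastreamWorkflow` is a hypothetical stand-in that only records what is written; it is not part of the Rockfish SDK. The sketch also uses `asyncio.sleep` for the pause, which yields to the event loop instead of blocking it, and closes the stream in a `finally` block so that it is closed even if a write fails. Both are design choices for this illustration, not requirements stated by the tutorial.

```python
import asyncio


class FakeDatastreamWorkflow:
    """Hypothetical stand-in that records writes; not part of the Rockfish SDK."""

    def __init__(self):
        self.batches = []
        self.closed = False

    async def write_datastream(self, alias, data):
        if self.closed:
            raise RuntimeError(f"datastream '{alias}' is closed")
        self.batches.append((alias, data))

    async def close_datastream(self, alias):
        self.closed = True


async def ingest(workflow, alias, batches, pause_seconds=0.0):
    """Write each batch, pausing between writes, and always close the stream."""
    try:
        for batch in batches:
            await workflow.write_datastream(alias, batch)
            await asyncio.sleep(pause_seconds)  # yields to the event loop
    finally:
        await workflow.close_datastream(alias)


fake = FakeDatastreamWorkflow()
asyncio.run(ingest(fake, "input", ["finance-2", "finance-3", "finance-4"]))
```

Inside a notebook, where an event loop is already running, `await ingest(...)` would replace the `asyncio.run(...)` call.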


In [18]:
# check the status of the workflow
async for log in workflow.logs():
    print(log)

2024-11-02T01:06:14Z train_actions: WARN Unsafe time cast on timestamp
2024-11-02T01:06:15Z train_actions: INFO Starting DG training job
2024-11-02T01:06:19Z train_actions: INFO Epoch 1 completed.
2024-11-02T01:06:23Z train_actions: INFO Epoch 2 completed.
2024-11-02T01:06:14Z input: INFO Read batch with 8850 rows from datastream
2024-11-02T01:06:25Z input: INFO Read batch with 9080 rows from datastream
2024-11-02T01:06:27Z train_actions: INFO Epoch 3 completed.
2024-11-02T01:06:35Z input: INFO Read batch with 6979 rows from datastream
2024-11-02T01:06:31Z train_actions: INFO Epoch 4 completed.
2024-11-02T01:06:35Z train_actions: INFO Epoch 5 completed.
2024-11-02T01:06:39Z train_actions: INFO Epoch 6 completed.
2024-11-02T01:06:43Z train_actions: INFO Epoch 7 completed.
2024-11-02T01:06:47Z train_actions: INFO Epoch 8 completed.
2024-11-02T01:06:51Z train_actions: INFO Epoch 9 completed.
2024-11-02T01:06:56Z train_actions: INFO Epoch 10 completed.
2024-11-02T01:06:57Z train_actions: I

### Optional: Add custom labels to the models that are generated

These labels can be used later to filter models by custom parameters.


In [19]:
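usage = ["experimental", "staging", "production"]
i = 0
# label each model trained by this workflow, in the order the models are listed
async for model in conn.list_models(labels={"workflow_id": workflow_id}):
    if i >= len(usage):
        break  # more models than labels: leave the remaining models unlabeled
    await model.add_labels(conn, usage=usage[i])
    i += 1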
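The cell above assigns labels in listing order. If the intent is for the most recently trained model to receive the last label, one option is to sort by creation time first. The sketch below shows that idea with stand-in objects: `ModelRecord` and `list_records` are hypothetical and only mimic the shape of an `async for` listing; they are not Rockfish classes.

```python
import asyncio
from dataclasses import dataclass, field
from datetime import datetime, timezone


@dataclass
class ModelRecord:
    """Hypothetical stand-in for a model entry; not the Rockfish Model class."""
    id: str
    create_time: datetime
    labels: dict = field(default_factory=dict)


async def list_records(records):
    """Stand-in async iterator, mimicking an `async for` listing."""
    for record in records:
        yield record


async def label_in_creation_order(listing, usage_labels):
    """Collect the listing, sort by creation time, and pair with labels."""
    records = [record async for record in listing]
    records.sort(key=lambda r: r.create_time)
    # zip stops at the shorter input, so extra models stay unlabeled
    for record, usage in zip(records, usage_labels):
        record.labels["usage"] = usage
    return records


def at(minute):
    return datetime(2024, 11, 2, 1, minute, tzinfo=timezone.utc)


records = [ModelRecord("c", at(9)), ModelRecord("a", at(7)), ModelRecord("b", at(8))]
labeled = asyncio.run(
    label_in_creation_order(
        list_records(records), ["experimental", "staging", "production"]
    )
)
```

Inside a notebook, `await label_in_creation_order(...)` would replace the `asyncio.run(...)` call.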

# 3. Generate synthetic data using the trained model


### Query the model store by label to find matching models

This works for models that were tagged earlier. By default, every model carries a `workflow_id` label, which holds the ID of the workflow that trained it.


In [20]:
async for model in conn.list_models(labels={"usage": "production"}):
    print(model)

Model(id='2cdd3986-98b0-11ef-a2bf-7af83af06a78', labels={'usage': 'production', 'workflow_id': '6piKEAAJARevbodbSUIzMf'}, create_time=datetime.datetime(2024, 11, 2, 0, 19, 50, tzinfo=datetime.timezone.utc), size_bytes=256840192)
Model(id='2108ac09-98b5-11ef-9a67-22aa4ec1349b', labels={'usage': 'production', 'workflow_id': '6PynH6YbcdNT6AzqOUcj4M'}, create_time=datetime.datetime(2024, 11, 2, 0, 55, 18, tzinfo=datetime.timezone.utc), size_bytes=256840192)
Model(id='347fa99a-98b6-11ef-a2bf-7af83af06a78', labels={'usage': 'production', 'workflow_id': '6W6Mls8b0Q7wc3zCTMYOVD'}, create_time=datetime.datetime(2024, 11, 2, 1, 3, tzinfo=datetime.timezone.utc), size_bytes=256835072)
Model(id='da7b689e-98b6-11ef-9a67-22aa4ec1349b', labels={'usage': 'production', 'workflow_id': 'BFzmU53Fdt9heeFeTgAOw'}, create_time=datetime.datetime(2024, 11, 2, 1, 7, 38, tzinfo=datetime.timezone.utc), size_bytes=171769856)
Model(id='c00744e0-98a8-11ef-a2bf-7af83af06a78', labels={'usage': 'production', 'workflow_i
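The listing above returns production models from several workflows, so a query on `usage` alone may not identify a single model. The sketch below shows, on plain dictionaries, how combining labels narrows the result and how the newest match can be picked by creation time. It assumes that a label query matches when every queried label is equal; the helper `matches` is hypothetical, and the records are copied from the listing above.

```python
from datetime import datetime, timezone


def matches(model_labels, query):
    """True when every queried label is present with the same value."""
    return all(model_labels.get(key) == value for key, value in query.items())


# IDs, labels, and creation times copied from the listing above.
models = [
    {"id": "2cdd3986-98b0-11ef-a2bf-7af83af06a78",
     "labels": {"usage": "production", "workflow_id": "6piKEAAJARevbodbSUIzMf"},
     "create_time": datetime(2024, 11, 2, 0, 19, 50, tzinfo=timezone.utc)},
    {"id": "da7b689e-98b6-11ef-9a67-22aa4ec1349b",
     "labels": {"usage": "production", "workflow_id": "BFzmU53Fdt9heeFeTgAOw"},
     "create_time": datetime(2024, 11, 2, 1, 7, 38, tzinfo=timezone.utc)},
]

# narrow the query to the ingestion workflow from this tutorial
query = {"usage": "production", "workflow_id": "BFzmU53Fdt9heeFeTgAOw"}
candidates = [m for m in models if matches(m["labels"], query)]
newest = max(candidates, key=lambda m: m["create_time"])
```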

### Select a model from the list of queried models and fetch it from remote


In [22]:
model = await rf.Model.from_id(
    conn,
    model.id,  # insert the ID of the model selected from the query results
)
print(model)

Model(id='c00744e0-98a8-11ef-a2bf-7af83af06a78', labels={'usage': 'production', 'workflow_id': '3S0ArPDbXDAD5cNeF9cnlJ'}, create_time=datetime.datetime(2024, 11, 1, 23, 26, 41, tzinfo=datetime.timezone.utc), size_bytes=209343488)


### Provide the model and the synthesis configuration to a workflow to generate a synthetic dataset


In [23]:
builder = rf.WorkflowBuilder()
builder.add(model)
builder.add(*generate_actions, parents=[model], alias="gen")
builder.add(ra.DatasetSave(name="syn_data"), parents=["gen"])
workflow = await builder.start(conn)
print(f"Workflow ID: {workflow.id()}")

Workflow ID: 4P0fla7gp6MU85Tqx5DuyQ


In [25]:
async for log in workflow.logs():
    print(log)

2024-11-02T01:10:13Z dataset-save: INFO using field 'session_key' to concatenate tables
2024-11-02T01:10:13Z dataset-save: INFO Saved dataset 'BC4sYkI65pgGwZ2QctElx' with 35068 rows
2024-11-02T01:09:58Z gen: INFO Downloading model with model_id='c00744e0-98a8-11ef-a2bf-7af83af06a78'...
2024-11-02T01:10:11Z gen: INFO Generating 200 sessions...
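Log lines from different actions can arrive out of chronological order, as in the output above. The sketch below sorts lines by their leading timestamp, assuming each log entry is available as a string that starts with an ISO 8601 UTC timestamp, as in the printed output. The helper `sort_logs` is hypothetical, and the lines are copied from the output above.

```python
from datetime import datetime


def sort_logs(lines):
    """Sort log lines by their leading ISO 8601 UTC timestamp."""
    def timestamp(line):
        stamp = line.split(" ", 1)[0]
        return datetime.strptime(stamp, "%Y-%m-%dT%H:%M:%SZ")
    # sorted() is stable, so lines with equal timestamps keep their order
    return sorted(lines, key=timestamp)


logs = [
    "2024-11-02T01:10:13Z dataset-save: INFO using field 'session_key' to concatenate tables",
    "2024-11-02T01:10:13Z dataset-save: INFO Saved dataset 'BC4sYkI65pgGwZ2QctElx' with 35068 rows",
    "2024-11-02T01:09:58Z gen: INFO Downloading model with model_id='c00744e0-98a8-11ef-a2bf-7af83af06a78'...",
    "2024-11-02T01:10:11Z gen: INFO Generating 200 sessions...",
]
for line in sort_logs(logs):
    print(line)
```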


In [26]:
syn_data = await workflow.datasets().concat(conn)
syn_data.to_pandas()

Unnamed: 0,timestamp,amount,age,gender,merchant,category,fraud,session_key
0,2023-01-06 23:59:38.418,5163.387593,5,M,M1946091778,fashion,1,0.0
1,2023-01-07 20:43:36.007,6987.574420,4,M,M732195782,leisure,1,1.0
2,2023-01-06 00:13:24.170,1.165829,0,E,M349281107,contents,1,2.0
3,2023-01-07 07:06:30.770,4697.402428,1,F,M480139044,wellnessandbeauty,0,3.0
4,2023-01-06 13:44:56.971,106.728580,5,M,M349281107,wellnessandbeauty,0,4.0
...,...,...,...,...,...,...,...,...
35063,2023-01-07 01:01:53.614,403.839851,2,F,M2122776122,hyper,0,197.0
35064,2023-01-06 04:17:57.393,6550.755616,0,E,M1649169323,wellnessandbeauty,0,198.0
35065,2023-01-06 06:36:17.908,6514.552014,0,E,M1649169323,leisure,0,198.0
35066,2023-01-06 08:54:38.501,6511.151229,0,E,M1649169323,wellnessandbeauty,0,198.0
