# **DataPipeline Guide**

This notebook is a guide for DataPipeline, a configurable streaming dataset loader. DataPipeline's core functionality is to convert each row of a tabular dataset into a vector that Bolt can understand. What makes DataPipeline unique is that it lets you configure which features to extract from a dataset and how to encode them as vectors. We do this with what we call "blocks": objects that read specific columns of each row in a dataset and add a feature segment to the row's corresponding vector.
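
To make the idea of blocks concrete, here is a minimal pure-Python sketch of how per-block feature segments could be concatenated into one sparse vector. The names `ToyUniGramBlock`, `ToyCategoricalBlock`, and `featurize_row` are hypothetical and exist only for this illustration; the real blocks are implemented in C++ and may hash and lay out features differently.

```python
import zlib
from typing import Dict, List, Tuple


class ToyCategoricalBlock:
    """Hypothetical block: one-hot encodes an integer ID column."""

    def __init__(self, col: int, n_classes: int):
        self.col = col
        self.dim = n_classes

    def featurize(self, row: List[str]) -> Dict[int, float]:
        return {int(row[self.col]): 1.0}


class ToyUniGramBlock:
    """Hypothetical block: hashes each word of a text column into `dim` buckets."""

    def __init__(self, col: int, dim: int):
        self.col = col
        self.dim = dim

    def featurize(self, row: List[str]) -> Dict[int, float]:
        segment: Dict[int, float] = {}
        for word in row[self.col].split():
            # crc32 is used only because it is deterministic; this is an assumption,
            # not the hash function used by the real implementation.
            index = zlib.crc32(word.encode("utf-8")) % self.dim
            segment[index] = segment.get(index, 0.0) + 1.0
        return segment


def featurize_row(row: List[str], blocks) -> Tuple[Dict[int, float], int]:
    """Concatenate block segments by shifting each segment by a running offset."""
    vector: Dict[int, float] = {}
    offset = 0
    for block in blocks:
        for index, value in block.featurize(row).items():
            vector[offset + index] = value
        offset += block.dim
    return vector, offset


row = "1,good stuff".split(",")
toy_blocks = [ToyCategoricalBlock(col=0, n_classes=3), ToyUniGramBlock(col=1, dim=1000)]
vector, total_dim = featurize_row(row, toy_blocks)
print(total_dim)  # 1003: 3 categorical dimensions followed by 1000 text dimensions
```

Each block owns a contiguous range of the output vector, so adding or removing a block changes the total dimension without affecting how the other blocks encode their columns.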

To illustrate, let's take a look at an example use case. First, we generate a mock dataset. Each row of this dataset has two comma-delimited columns. The first column contains a class ID and the second column contains a sentence. This is typical of a text classification dataset, where the task is to predict the class ID from the given text.

In [None]:
n_classes = 3
n_samples = 10_000

with open("test.csv", "w") as f:
    for i in range(n_samples):
        sentiment = i % 3
        if sentiment == 0:
            f.write("0,bad stuff\n")
        elif sentiment == 1:
            f.write("1,good stuff\n")
        else:
            f.write("2,neutral stuff\n")

Given the shape of our dataset and the task that it is used for, we want to pass vectors that represent text from the second column to our Bolt model as input and pass vectors that represent categorical information from the first column as labels. This is what that looks like with DataPipeline.

In [None]:
from thirdai.dataset import DataPipeline, blocks

pipeline = DataPipeline(
    filename="test.csv", 
    input_blocks=[blocks.TextUniGram(col=1, dim=100_000)],
    label_blocks=[blocks.NumericalId(col=0, n_classes=n_classes)],
    batch_size=256)

That covers the main ideas. Now let's take a closer look at the other configuration options: combining several input blocks, handling a header row, and choosing a delimiter.

In [None]:
# DataPipeline and blocks were imported in the previous cell.

pipeline = DataPipeline(
    filename="test.csv", 
    input_blocks=[
        blocks.TextUniGram(col=1, dim=100_000),
        blocks.TextPairGram(col=1, dim=100_000),
        blocks.TextCharKGram(col=1, k=3, dim=100_000), # Character trigrams
        blocks.NumericalId(col=0, n_classes=n_classes), # Default encoding for categorical block; column 0 holds IDs 0 to 2
    ],
    label_blocks=[blocks.NumericalId(col=0, n_classes=n_classes)],
    batch_size=256,
    has_header=False, # test.csv has no header row. Set to True to have the pipeline discard one.
    delimiter=',') # Any character is a valid delimiter.
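
The three text blocks in the cell above differ in which tokens they extract before encoding. The sketch below shows one plausible reading of each tokenization in plain Python. It assumes that pair-grams are all ordered pairs of words (including a word paired with itself) and that character k-grams are sliding windows over the raw string; the actual C++ blocks may define these differently, and the helper names are hypothetical.

```python
from typing import List, Tuple


def unigrams(text: str) -> List[str]:
    """Split on whitespace: one feature per word."""
    return text.split()


def pairgrams(text: str) -> List[Tuple[str, str]]:
    """All ordered word pairs (i <= j); an assumed definition for illustration."""
    words = text.split()
    return [
        (words[i], words[j])
        for i in range(len(words))
        for j in range(i, len(words))
    ]


def char_kgrams(text: str, k: int) -> List[str]:
    """Sliding windows of k characters, spaces included."""
    return [text[i : i + k] for i in range(len(text) - k + 1)]


sentence = "good stuff"
print(unigrams(sentence))        # ['good', 'stuff']
print(pairgrams(sentence))       # [('good', 'good'), ('good', 'stuff'), ('stuff', 'stuff')]
print(char_kgrams(sentence, 3))  # ['goo', 'ood', 'od ', 'd s', ' st', 'stu', 'tuf', 'uff']
```

In a hashed encoding, each of these tokens would then be mapped to an index below the block's `dim`.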

# **Use case**
As shown above, DataPipeline is quite flexible; in fact, it is too flexible to be given to customers. That flexibility is useful, however, when we don't yet know how best to encode features for a new dataset or task. We can try different combinations of features without writing new data processing scripts or recompiling C++ code after each iteration, while keeping the speed of a data-parallel C++ implementation. We can also use this tool to prototype new end-to-end models. For example, suppose I want to create an end-to-end sequential recommendation system. Instead of building a data loader from scratch, I can wrap DataPipeline in an "autotuner" of sorts. Of course, a hard-coded specialized system can be faster than a generic one, so we can then profile this prototype and hard-code the parts that need to be faster.
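
As a rough illustration of the "autotuner" idea, the sketch below enumerates every non-empty combination of candidate input blocks and picks the one with the best score. The helpers `block_combinations` and `pick_best` are hypothetical, the block names are plain strings rather than real block objects, and `evaluate` is a placeholder: in practice it would build a DataPipeline from the combination, train a model, and return a validation metric.

```python
from itertools import combinations
from typing import Callable, Iterator, Sequence, Tuple


def block_combinations(candidates: Sequence[str]) -> Iterator[Tuple[str, ...]]:
    """Yield every non-empty subset of the candidate block names."""
    for size in range(1, len(candidates) + 1):
        yield from combinations(candidates, size)


def pick_best(
    candidates: Sequence[str],
    evaluate: Callable[[Tuple[str, ...]], float],
) -> Tuple[str, ...]:
    """Return the combination with the highest score under `evaluate`."""
    return max(block_combinations(candidates), key=evaluate)


candidates = ["TextUniGram", "TextPairGram", "TextCharKGram"]
configs = list(block_combinations(candidates))
print(len(configs))  # 7 combinations for 3 candidate blocks

# Placeholder score for demonstration only: prefers combinations with more blocks.
best = pick_best(candidates, evaluate=len)
print(best)  # ('TextUniGram', 'TextPairGram', 'TextCharKGram')
```

An exhaustive search grows as 2^n in the number of candidate blocks, so a real autotuner would likely need a budget or a smarter search strategy.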

# **Limitations**
You may also have noticed that we don't have many blocks yet. This is because we have only implemented the bare minimum for a specific use case. The good news is that the block interface is highly extensible, and we can keep adding new blocks as we see fit. To do this, add a new implementation of the `Block` abstract class found in `dataset/src/blocks/BlockInterface.h`, then write Python bindings and tests as necessary. You can refer to `dataset/src/blocks/Text.h` as an example `Block` implementation.

# **Using DataPipeline with Bolt**
Finally, we'll look at how we can pass the data to Bolt. While DataPipeline supports streaming, the Bolt Python API currently does not support it, so we will load the entire dataset into memory for now.

In [None]:
from thirdai import bolt

pipeline = DataPipeline(
    filename="test.csv", 
    input_blocks=[blocks.TextUniGram(col=1, dim=100_000)],
    label_blocks=[blocks.NumericalId(col=0, n_classes=n_classes)],
    batch_size=256)

# Load into memory and get input as well as label dimensions.
data, labels = pipeline.load_in_memory()
input_dim = pipeline.get_input_dim()
n_classes = pipeline.get_label_dim()

layers = [
    bolt.FullyConnected(
        dim=1000,
        sparsity=0.1,
        activation_function=bolt.ActivationFunctions.ReLU,
    ),
    bolt.FullyConnected(
        dim=n_classes, activation_function=bolt.ActivationFunctions.Softmax
    ),
]

network = bolt.Network(layers=layers, input_dim=input_dim)

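# Training hyperparameters.
batch_size = 256
learning_rate = 0.001
epochs = 1

# Train one epoch per iteration so that we can evaluate after every epoch.
for _ in range(epochs):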
    network.train(
        train_data=data,
        train_labels=labels,
        batch_size=batch_size,
        loss_fn=bolt.CategoricalCrossEntropyLoss(),
        learning_rate=learning_rate,
        epochs=1,
        verbose=True,
    )
    # Evaluate on the training data, since this mock dataset has no separate test split.
    metrics, preds = network.predict(
        test_data=data,
        test_labels=labels,
        batch_size=batch_size,
        metrics=["categorical_accuracy"],
        verbose=True,
    )
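
For contrast with the in-memory loading above, the sketch below shows what streaming batches from a CSV source could look like in plain Python. It is only an illustration of the concept, not how DataPipeline streams internally; `stream_batches` is a hypothetical helper, and an in-memory `io.StringIO` stands in for a file on disk.

```python
import csv
import io
from typing import Iterable, Iterator, List


def stream_batches(lines: Iterable[str], batch_size: int) -> Iterator[List[List[str]]]:
    """Yield lists of parsed rows, holding at most one batch in memory at a time."""
    batch: List[List[str]] = []
    for row in csv.reader(lines):
        batch.append(row)
        if len(batch) == batch_size:
            yield batch
            batch = []
    if batch:
        # Emit the final, possibly smaller, batch.
        yield batch


# Ten mock rows in the same "label,text" layout as test.csv.
mock_file = io.StringIO("".join(f"{i % 3},stuff\n" for i in range(10)))
batch_sizes = [len(batch) for batch in stream_batches(mock_file, batch_size=4)]
print(batch_sizes)  # [4, 4, 2]
```

Because the generator reads rows lazily, memory use is bounded by the batch size rather than the dataset size, which is the property that in-memory loading gives up.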