<img src="https://raw.githubusercontent.com/fluidml/fluidml/main/logo/fluid_ml_logo.png" width="400px"/>

# Transformer-based Sequence-to-Sequence Translation using FluidML
In this notebook, we utilize FluidML to implement a complete ML pipeline that performs text translation from German to English.  
Our translation pipeline consists of the following tasks:
- **Dataset loading**: Downloads and parses the Multi30K dataset used for translation.
- **Tokenizer training**: Trains and saves a Byte Pair Encoding (BPE) Tokenizer used for text encoding and decoding.
- **Dataset encoding**: Encodes the dataset with the trained BPE Tokenizer.
- **Model training**: Trains the Transformer-based sequence-to-sequence model.
- **Model selection**: Selects the best-performing model among all trained variations (one per hyperparameter combination) based on the validation loss.
- **Model evaluation**: Evaluates the best-performing model on the test set by calculating the test loss and the BLEU score.

With FluidML, each of these steps is implemented as an individual task that registers its dependencies; the tasks are then chained together into a task graph. FluidML executes this graph in parallel and stores all results persistently in a local file store (see the Storage section for details).

## Setup

To run this example, we recommend installing FluidML together with the additional example requirements. Alternatively, you can install all dependencies manually; see `transformer_seq2seq_translation.py` for a complete list.

In [None]:
!pip install fluidml[examples]

**Note**: Due to limitations of multiprocessing in Jupyter, we have to import our task definitions and some helper classes from a separate script. Our tasks are therefore defined in `transformer_seq2seq_translation.py`, which implements not only the tasks but the entire functionality of this example, so the interested reader can also execute that script directly. To keep this notebook self-explanatory, we show each task implementation as a Markdown code snippet at the place where we would otherwise have defined the task.

In [1]:
# Python internal imports
import multiprocessing
import os
from typing import List, Dict, Any, Optional, Tuple

# External imports
import torch

# FluidML imports
from fluidml import Flow, TaskSpec
from fluidml.visualization import visualize_graph_in_console
# Task imports, file store import and resource class import (see above note)
from transformer_seq2seq_translation import DatasetLoading, TokenizerTraining, DatasetEncoding, Training, ModelSelection, Evaluation
from transformer_seq2seq_translation import TaskResource, MyLocalFileStore

**Note 2**: If you want to use FluidML's logging capability, please configure a logger using Python's `logging` API. For convenience, we provide a simple utility function which configures a visually appealing logger (using a specific handler from the `rich` library).

In [2]:
from fluidml.logging import configure_logging
configure_logging(level='INFO')
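
If you prefer to set up logging yourself, a minimal sketch using only Python's standard `logging` module could look like the following. The helper name `configure_basic_logging`, the logger name `my_pipeline` and the format string are illustrative assumptions and not part of FluidML.

```python
import logging
import sys


def configure_basic_logging(level: int = logging.INFO) -> logging.Logger:
    """Attach one stream handler to the root logger (illustrative helper, not FluidML API)."""
    root = logging.getLogger()
    root.setLevel(level)
    handler = logging.StreamHandler(sys.stdout)
    # include the process name, which is handy when several workers log at once
    handler.setFormatter(
        logging.Formatter("%(asctime)s | %(processName)s | %(levelname)s | %(message)s")
    )
    root.addHandler(handler)
    return root


root_logger = configure_basic_logging()
logging.getLogger("my_pipeline").info("Logging is configured.")
```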

## Storage - Saving Objects with FluidML

Before we start with the implementation of our translation pipeline, we take a brief look at FluidML's storage API.  
Out of the box, FluidML provides three storage options that share the same interface. Users can also implement their own storage, as long as the storage class inherits from `fluidml.storage.base.ResultsStore` and implements all abstract methods.
- **InMemoryStore**: The default if no store object is provided. Every saved object is stored in an in-memory manager dictionary, which is shared across all tasks and processes. Once the entire pipeline has been executed, the results dictionary is returned to the user, and it is the user's responsibility to actually save the results, e.g. to disk. This store is only recommended for quick prototyping and small result objects, since intermediate task results are not stored persistently and the available memory might not be sufficient to hold all task results.
- **LocalFileStore**: A persistent file store implementation that supports saving files as .json and .pickle out of the box. It can easily be extended by the user to support arbitrary file types and save options.
- **MongoDBStore**: A persistent store implementation that saves objects as binary strings via GridFS in a MongoDB database.

In this example we utilize the `LocalFileStore` and extend it with our own custom saving types.  
In order to save an object within a task, one simply calls
```python
self.save(obj=model_state_dict, name="best_model", type_="torch", sub_dir="models")
self.save(obj=some_dict, name="some_dict", type_="json")
self.save(obj=some_serializable_obj, name="some_serializable_obj", type_="pickle", sub_dir="some/sub/dir")
```
Below, we implement `MyLocalFileStore`, which inherits from `LocalFileStore` and extends it by adding save and load functions for torch models and tokenizer objects. 
We register new types to the `_type_registry` dictionary from `LocalFileStore` by providing a `TypeInfo` dataclass instance containing the type's save and load function and file extension.

```python
class MyLocalFileStore(LocalFileStore):
    def __init__(self, base_dir: str):
        super().__init__(base_dir=base_dir)

        self._type_registry["torch"] = TypeInfo(
            save_fn=self._save_torch, load_fn=self._load_torch, extension="pt", needs_path=True
        )
        self._type_registry["tokenizer"] = TypeInfo(
            save_fn=self._save_tokenizer, load_fn=self._load_tokenizer, extension="json", needs_path=True
        )

    @staticmethod
    def _save_torch(obj: Any, path: str):
        torch.save(obj, f=path)

    @staticmethod
    def _load_torch(path: str) -> Any:
        return torch.load(path)

    @staticmethod
    def _save_tokenizer(obj: Tokenizer, path: str):
        obj.save(path=path)

    @staticmethod
    def _load_tokenizer(path: str) -> Tokenizer:
        return Tokenizer.from_file(path)
```
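
To illustrate the idea behind such a type registry without any FluidML or PyTorch dependency, here is a minimal stdlib-only sketch. `SimpleTypeInfo` and `TinyFileStore` are hypothetical stand-ins written for this illustration; they are not FluidML's `TypeInfo` or `LocalFileStore` and omit everything except the lookup of save and load functions by type name.

```python
import json
import os
import tempfile
from dataclasses import dataclass
from typing import Any, Callable, Dict


@dataclass
class SimpleTypeInfo:
    """Hypothetical stand-in for a registry entry (not FluidML's TypeInfo)."""

    save_fn: Callable[[Any, str], None]
    load_fn: Callable[[str], Any]
    extension: str


def save_json(obj: Any, path: str) -> None:
    with open(path, "w", encoding="utf-8") as f:
        json.dump(obj, f)


def load_json(path: str) -> Any:
    with open(path, "r", encoding="utf-8") as f:
        return json.load(f)


def save_text(obj: str, path: str) -> None:
    with open(path, "w", encoding="utf-8") as f:
        f.write(obj)


def load_text(path: str) -> str:
    with open(path, "r", encoding="utf-8") as f:
        return f.read()


class TinyFileStore:
    """Toy store that dispatches to save/load functions by type name."""

    def __init__(self, base_dir: str):
        self.base_dir = base_dir
        self.type_registry: Dict[str, SimpleTypeInfo] = {
            "json": SimpleTypeInfo(save_json, load_json, "json"),
        }

    def _path(self, name: str, type_: str) -> str:
        extension = self.type_registry[type_].extension
        return os.path.join(self.base_dir, f"{name}.{extension}")

    def save(self, obj: Any, name: str, type_: str) -> None:
        self.type_registry[type_].save_fn(obj, self._path(name, type_))

    def load(self, name: str, type_: str) -> Any:
        return self.type_registry[type_].load_fn(self._path(name, type_))


with tempfile.TemporaryDirectory() as tmp_dir:
    store = TinyFileStore(base_dir=tmp_dir)
    # register an additional type, analogous to the "torch" and "tokenizer" entries above
    store.type_registry["text"] = SimpleTypeInfo(save_text, load_text, "txt")
    store.save({"valid_loss": 1.23}, name="metric", type_="json")
    store.save("hello", name="greeting", type_="text")
    loaded_metric = store.load(name="metric", type_="json")
    loaded_text = store.load(name="greeting", type_="text")
```

The point of the registry is that `save()` and `load()` never need to change when a new file type is added; only a new entry has to be registered.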

## Task Definitions

The following 6 sections describe our task definitions in detail.

### 1. Dataset Loading

For this example we use the [Multi30K](https://github.com/multi30k/dataset) translation dataset, considering only German and English. The dataset was published with a fixed split of 29,000 training, 1,014 validation and 1,000 test German-English sentence pairs.

We implement this task by creating a custom `DatasetLoading` class, which inherits from FluidML's `Task` class. To comply with the `Task` interface, a custom task just has to implement a `run()` method, which FluidML executes internally.

Here is our complete implementation of `DatasetLoading` (imported above):


```python
import gzip
import requests


class DatasetLoading(Task):
    def __init__(self, base_url: str, data_split_names: Dict[str, List]):
        super().__init__()
        self.base_url = base_url
        self.data_split_names = data_split_names

    @staticmethod
    def download_and_extract_gz_from_url(url: str) -> List[str]:
        # download gz compressed data
        data_gz = requests.get(url=url)
        # decompress downloaded gz data to bytes object
        data_bytes = gzip.decompress(data_gz.content)
        # decode bytes object to utf-8 encoded str and convert to list by splitting on new line chars
        data = data_bytes.decode("utf-8").splitlines()
        return data

    def run(self):
        task_run_dir = self.get_store_context().run_dir
        logger.info(f'Download and save raw dataset to "{task_run_dir}".')

        for split_name, files in self.data_split_names.items():
            dataset = {}
            for file_name in files:
                # create download url
                url = self.base_url + file_name
                language = file_name.split(".")[1]
                # download and parse data
                data = DatasetLoading.download_and_extract_gz_from_url(url=url)
                dataset[language] = data
            # save train-, valid- and test-data as json via local file store
            self.save(obj=dataset, name=f"{split_name}_data", type_="json")

```
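
The decompression and parsing steps can be tried out offline. The following sketch replaces the HTTP download with locally compressed sample text; the helper names `extract_gz_lines` and `language_from_file_name` and the sample sentences are made up for this illustration.

```python
import gzip
from typing import List


def extract_gz_lines(payload: bytes) -> List[str]:
    """Decompress gzip bytes and split the decoded UTF-8 text into lines."""
    return gzip.decompress(payload).decode("utf-8").splitlines()


def language_from_file_name(file_name: str) -> str:
    """Return the language code, e.g. "de" for "train.de.gz"."""
    return file_name.split(".")[1]


# simulate the body of an HTTP response with locally compressed sample text
sample_payload = gzip.compress("Ein Hund läuft.\nZwei Männer reden.\n".encode("utf-8"))
sample_lines = extract_gz_lines(sample_payload)
sample_language = language_from_file_name("test_2016_flickr.de.gz")
```

Note that extracting the language via `split(".")[1]` relies on the `<split>.<language>.gz` naming scheme of the files listed in `data_split_names`.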

### 2. Tokenizer Training

We utilize Hugging Face's [Tokenizers](https://github.com/huggingface/tokenizers) library to train two Byte Pair Encoding (BPE) tokenizers, one for the German and one for the English sentences.


```python
from tokenizers.implementations import CharBPETokenizer
from tokenizers.processors import TemplateProcessing


class TokenizerTraining(Task):
    def __init__(self, vocab_size: int, min_frequency: int):
        super().__init__()
        self.vocab_size = vocab_size
        self.min_frequency = min_frequency

    def train_tokenizer(self, data: List[str]):
        # initialize and train a tokenizer
        tokenizer = CharBPETokenizer()
        tokenizer.train_from_iterator(
            iterator=data,
            vocab_size=self.vocab_size,
            min_frequency=self.min_frequency,
            special_tokens=["<unk>", "<bos>", "<eos>", "<pad>"],
            show_progress=True,
        )

        # add template rule to automatically add <bos> and <eos> to the encoding
        tokenizer.post_processor = TemplateProcessing(
            single="<bos> $A <eos>",
            pair=None,
            special_tokens=[
                ("<bos>", tokenizer.token_to_id("<bos>")),
                ("<eos>", tokenizer.token_to_id("<eos>")),
            ],
        )
        return tokenizer

    def run(self, train_data: Dict[str, List[str]]):
        task_run_dir = self.get_store_context().run_dir

        # train german tokenizer
        de_tokenizer = self.train_tokenizer(data=train_data["de"])

        # train english tokenizer
        en_tokenizer = self.train_tokenizer(data=train_data["en"])

        # save tokenizers
        logger.info(f'Save trained tokenizers to "{task_run_dir}".')
        self.save(obj=de_tokenizer, name="de_tokenizer", type_="tokenizer")
        self.save(obj=en_tokenizer, name="en_tokenizer", type_="tokenizer")

```
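
For readers unfamiliar with BPE, the core training loop can be sketched in a few lines of plain Python. This is a simplified toy version for illustration only and not how the Tokenizers library is implemented; the function names, the `</w>` end-of-word marker handling and the tie-breaking rule are assumptions of this sketch.

```python
from collections import Counter
from typing import Dict, List, Tuple

Word = Tuple[str, ...]


def count_pairs(words: Dict[Word, int]) -> Counter:
    """Count adjacent symbol pairs, weighted by word frequency."""
    pairs: Counter = Counter()
    for symbols, freq in words.items():
        for left, right in zip(symbols, symbols[1:]):
            pairs[(left, right)] += freq
    return pairs


def merge_pair(words: Dict[Word, int], pair: Tuple[str, str]) -> Dict[Word, int]:
    """Replace every occurrence of `pair` by the concatenated symbol."""
    merged: Dict[Word, int] = {}
    for symbols, freq in words.items():
        out: List[str] = []
        i = 0
        while i < len(symbols):
            if i + 1 < len(symbols) and (symbols[i], symbols[i + 1]) == pair:
                out.append(symbols[i] + symbols[i + 1])
                i += 2
            else:
                out.append(symbols[i])
                i += 1
        key = tuple(out)
        merged[key] = merged.get(key, 0) + freq
    return merged


def train_toy_bpe(corpus: List[str], num_merges: int) -> List[Tuple[str, str]]:
    """Learn up to `num_merges` merge rules from whitespace-split text."""
    word_counts = Counter(word for line in corpus for word in line.split())
    # start from single characters plus an end-of-word marker
    words = {tuple(word) + ("</w>",): freq for word, freq in word_counts.items()}
    merges: List[Tuple[str, str]] = []
    for _ in range(num_merges):
        pairs = count_pairs(words)
        if not pairs:
            break
        # most frequent pair; ties are broken lexicographically to stay deterministic
        best_pair = max(pairs, key=lambda p: (pairs[p], p))
        merges.append(best_pair)
        words = merge_pair(words, best_pair)
    return merges


toy_merges = train_toy_bpe(["low low low lower", "newest newest widest"], num_merges=3)
```

Each learned merge adds one new symbol to the vocabulary, which is why a `vocab_size` budget, as passed to `TokenizerTraining`, bounds how many merges are learned.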

### 3. Dataset Encoding

We use the trained tokenizers for German and English to encode the previously saved datasets and save them as json files.  
**Note**: FluidML automatically collects the required task inputs from the saved predecessor task results; in this case the datasets saved from `DatasetLoading` and the tokenizers trained in `TokenizerTraining`.


```python
class DatasetEncoding(Task):
    def __init__(self):
        super().__init__()

    @staticmethod
    def encode_data(
        data: Dict[str, List[str]], src_tokenizer: Tokenizer, trg_tokenizer: Tokenizer
    ) -> List[Tuple[List[int], List[int]]]:

        src_encoded = src_tokenizer.encode_batch(data["de"])
        trg_encoded = trg_tokenizer.encode_batch(data["en"])
        return [(src.ids, trg.ids) for src, trg in zip(src_encoded, trg_encoded)]

    def run(
        self,
        train_data: Dict[str, List[str]],
        valid_data: Dict[str, List[str]],
        test_data: Dict[str, List[str]],
        de_tokenizer: Tokenizer,
        en_tokenizer: Tokenizer,
    ):
        task_run_dir = self.get_store_context().run_dir

        train_encoded = DatasetEncoding.encode_data(train_data, de_tokenizer, en_tokenizer)
        valid_encoded = DatasetEncoding.encode_data(valid_data, de_tokenizer, en_tokenizer)
        test_encoded = DatasetEncoding.encode_data(test_data, de_tokenizer, en_tokenizer)

        logger.info(f'Save encoded dataset to "{task_run_dir}".')
        self.save(obj=train_encoded, name="train_encoded", type_="json")
        self.save(obj=valid_encoded, name="valid_encoded", type_="json")
        self.save(obj=test_encoded, name="test_encoded", type_="json")

```

### 4. Model Training

Our dataset is now encoded using the trained BPE tokenizers, so the next step is to train the actual translation model. We utilize the well-known transformer architecture first described in the [Attention is all you need](https://papers.nips.cc/paper/2017/file/3f5ee243547dee91fbd053c1c4a845aa-Paper.pdf) paper. Both the encoder and the decoder consist of several multi-head attention layers, which are the backbone of this architecture. Since this tutorial is about FluidML, we won't go into the implementation details of a transformer model. We refer the interested reader to [Ben Trevett's tutorial on transformers for translation](https://github.com/bentrevett/pytorch-seq2seq/blob/master/6%20-%20Attention%20is%20All%20You%20Need.ipynb), from which the PyTorch transformer implementation used in this example is taken.

**Note**: The implementations of the custom PyTorch dataset, `TranslationDataset`, and the batch collate callable, `BatchCollator`, can also be found in the above-mentioned script, `transformer_seq2seq_translation.py`, from which the task classes are imported. Due to its complexity, the transformer model implementation lives in its own script, `transformer_model.py`. The `set_seed()` function is also defined in `transformer_seq2seq_translation.py`.

You might also notice that we use `self.resource.device` in our training task without explicitly defining it in the `__init__()` method. When initializing FluidML's main classes `Flow` and `Swarm`, the user can optionally provide a list of resources that will be made available to all tasks and processes. In a machine learning context such resources could be, but are not limited to, CUDA devices (see this example). Below we will go through how to define and provide the list of resources to `Flow` and how the resources are automatically distributed to tasks.


```python
import math
import os
from datetime import datetime

import torch
from torch import nn
from torch.utils.data import Dataset, DataLoader
from tqdm import tqdm


class Training(Task):
    def __init__(
        self,
        hid_dim: int,
        enc_layers: int,
        dec_layers: int,
        enc_heads: int,
        dec_heads: int,
        enc_pf_dim: int,
        dec_pf_dim: int,
        enc_dropout: float,
        dec_dropout: float,
        learning_rate: float,
        clip_grad: float,
        train_batch_size: int,
        valid_batch_size: int,
        num_epochs: int,
        seed: int,
    ):
        super().__init__()

        # transformer model parameters
        self.hid_dim = hid_dim
        self.enc_layers = enc_layers
        self.dec_layers = dec_layers
        self.enc_heads = enc_heads
        self.dec_heads = dec_heads
        self.enc_pf_dim = enc_pf_dim
        self.dec_pf_dim = dec_pf_dim
        self.enc_dropout = enc_dropout
        self.dec_dropout = dec_dropout

        # optimizer parameters
        self.learning_rate = learning_rate
        self.clip_grad = clip_grad

        # dataloader and training loop parameters
        self.train_batch_size = train_batch_size
        self.valid_batch_size = valid_batch_size
        self.num_epochs = num_epochs
        self.seed = seed

    def _init_training(self, input_dim: int, output_dim: int, src_pad_idx: int, trg_pad_idx: int):
        """Initialize all training components."""

        # initialize the encoder and decoder block
        enc = Encoder(
            input_dim,
            self.hid_dim,
            self.enc_layers,
            self.enc_heads,
            self.enc_pf_dim,
            self.enc_dropout,
            self.resource.device,
        )

        dec = Decoder(
            output_dim,
            self.hid_dim,
            self.dec_layers,
            self.dec_heads,
            self.dec_pf_dim,
            self.dec_dropout,
            self.resource.device,
        )

        # initialize the full transformer sequence to sequence model
        model = Seq2SeqTransformer(enc, dec, src_pad_idx, trg_pad_idx, self.resource.device).to(self.resource.device)

        # initialize the optimizer
        optimizer = torch.optim.Adam(model.parameters(), lr=self.learning_rate)

        # initialize the loss criterion
        criterion = nn.CrossEntropyLoss(ignore_index=trg_pad_idx)
        return model, optimizer, criterion

    def _train_epoch(self, model, iterator, optimizer, criterion):
        """Train loop to iterate over batches"""
        model.train()

        epoch_loss = 0

        for i, (src, trg) in enumerate(iterator):

            optimizer.zero_grad()

            output, _ = model(src, trg[:, :-1])
            output_dim = output.shape[-1]
            output = output.contiguous().view(-1, output_dim)  # [batch size * trg len - 1, output dim]
            trg = trg[:, 1:].contiguous().view(-1)  # [batch size * trg len - 1]

            loss = criterion(output, trg)
            loss.backward()

            torch.nn.utils.clip_grad_norm_(model.parameters(), self.clip_grad)

            optimizer.step()
            epoch_loss += loss.item()
        return epoch_loss / len(iterator)

    @staticmethod
    def validate_epoch(model, iterator, criterion):
        """Validation loop to iterate over batches"""
        model.eval()

        epoch_loss = 0

        with torch.no_grad():
            for src, trg in iterator:

                output, _ = model(src, trg[:, :-1])
                output_dim = output.shape[-1]
                output = output.contiguous().view(-1, output_dim)  # [batch size * trg len - 1, output dim]
                trg = trg[:, 1:].contiguous().view(-1)  # [batch size * trg len - 1]

                loss = criterion(output, trg)
                epoch_loss += loss.item()
        return epoch_loss / len(iterator)

    def _train(self, model, train_iterator, valid_iterator, optimizer, criterion):
        """Train loop."""
        task_run_dir = self.get_store_context().run_dir
        model_dir = os.path.join(task_run_dir, "models")
        logger.info(f'Save model checkpoints to "{model_dir}".')

        best_valid_loss = float("inf")
        best_model = None

        for epoch in range(self.num_epochs):

            start_time = datetime.now()
            train_loss = self._train_epoch(model, train_iterator, optimizer, criterion)
            valid_loss = Training.validate_epoch(model, valid_iterator, criterion)
            end_time = datetime.now()

            # if the current validation loss is below the previous best, update the best loss and
            # save the new best model.
            if valid_loss < best_valid_loss:
                best_valid_loss = valid_loss
                best_model = model.state_dict()
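                # store the checkpoint in the "models" sub-directory announced in the log message above
                self.save(obj=best_model, name="best_model", type_="torch", sub_dir="models")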
                self.save(obj={"epoch": epoch, "valid_loss": best_valid_loss}, name="best_model_metric", type_="json")

            logger.info(
                f"\nEpoch: {epoch + 1:02} | Time: {end_time - start_time}\n"
                f"\tTrain Loss: {train_loss:.3f} | Train PPL: {math.exp(train_loss):7.3f}\n"
                f"\t Val. Loss: {valid_loss:.3f} |  Val. PPL: {math.exp(valid_loss):7.3f}"
            )

        assert best_model is not None
        return best_model, best_valid_loss

    def run(
        self,
        train_encoded: List[Tuple[List[int], List[int]]],
        valid_encoded: List[Tuple[List[int], List[int]]],
        de_tokenizer: Tokenizer,
        en_tokenizer: Tokenizer,
    ):
        set_seed(self.seed)

        # instantiate the collate fn for the dataloader
        batch_collator = BatchCollator(
            de_pad_idx=de_tokenizer.token_to_id("<pad>"),
            en_pad_idx=en_tokenizer.token_to_id("<pad>"),
            device=self.resource.device,
        )

        # instantiate train and validation datasets using a pytorch's Dataset class
        train_dataset = TranslationDataset(data=train_encoded)
        valid_dataset = TranslationDataset(data=valid_encoded)

        # instantiate train and validation dataloader
        train_iterator = DataLoader(
            train_dataset, batch_size=self.train_batch_size, shuffle=True, collate_fn=batch_collator
        )
        valid_iterator = DataLoader(
            valid_dataset, batch_size=self.valid_batch_size, shuffle=False, collate_fn=batch_collator
        )

        input_dim = de_tokenizer.get_vocab_size()
        output_dim = en_tokenizer.get_vocab_size()
        src_pad_idx = de_tokenizer.token_to_id("<pad>")
        trg_pad_idx = en_tokenizer.token_to_id("<pad>")

        # instantiate all training components
        model, optimizer, criterion = self._init_training(
            input_dim=input_dim, output_dim=output_dim, src_pad_idx=src_pad_idx, trg_pad_idx=trg_pad_idx
        )

        # train the model on the training set and evaluate after every epoch on the validation set
        self._train(
            model=model,
            train_iterator=train_iterator,
            valid_iterator=valid_iterator,
            optimizer=optimizer,
            criterion=criterion,
        )

```
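
The slicing `trg[:, :-1]` and `trg[:, 1:]` in the training and validation loops implements teacher forcing: the decoder receives the target sequence without its last token and is trained to predict the same sequence shifted by one position. The following sketch shows this shift with plain Python lists instead of tensors; the token ids are illustrative assumptions.

```python
import math
from typing import List, Tuple

# illustrative special-token ids (assumed here, not read from a real tokenizer)
BOS, EOS, PAD = 1, 2, 3


def shift_for_teacher_forcing(trg: List[List[int]]) -> Tuple[List[List[int]], List[List[int]]]:
    """Split a padded target batch into decoder inputs and prediction labels."""
    decoder_input = [row[:-1] for row in trg]  # corresponds to trg[:, :-1]
    labels = [row[1:] for row in trg]  # corresponds to trg[:, 1:]
    return decoder_input, labels


batch = [
    [BOS, 10, 11, 12, EOS],
    [BOS, 20, 21, EOS, PAD],
]
decoder_input, labels = shift_for_teacher_forcing(batch)

# the perplexity reported in the logs is the exponential of the mean cross-entropy loss
example_loss = 2.0
example_perplexity = math.exp(example_loss)
```

At every position the label is the token that follows the corresponding decoder input token. Label positions that contain the padding id do not contribute to the loss because the criterion is created with `ignore_index=trg_pad_idx`.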

### 5. Model Selection

We have not yet talked about grid search and training several models with different hyperparameter combinations in parallel. FluidML provides a simple interface to allow just that, which we will describe further below when instantiating our tasks. For now let's assume that the previous tasks might have been executed multiple times with a set of different parameter combinations yielding several trained model variations.

This task is a so-called reduce task (we set `reduce=True` when creating its task spec), which means that it collects the results from all predecessor variations in order to compare them and select the best-performing one. In this example it selects the best model variation based on the validation loss.

**Note**: The `run` method of a reduce task loads the defined predecessor results as `List[Sweep]` objects. In this example `best_model_metric` is a `List[Sweep]` which holds the respective value and config of all expanded training runs. A `Sweep` object is a simple `dataclass` with 2 attributes, `value` and `config`.


```python
class ModelSelection(Task):
    def __init__(self):
        super().__init__()

    @staticmethod
    def _select_best_model_from_sweeps(best_model_metric: List[Sweep]) -> Dict:
        config = None
        best_valid_loss = float("inf")
        for sweep in best_model_metric:
            if sweep.value["valid_loss"] <= best_valid_loss:
                best_valid_loss = sweep.value["valid_loss"]
                config = sweep.config
        return config

    def run(self, best_model_metric: List[Sweep]):
        task_run_dir = self.get_store_context().run_dir

        # select the best run config by comparing model performances from different parameter sweeps
        # on the validation set
        best_run_config = self._select_best_model_from_sweeps(best_model_metric=best_model_metric)

        logger.info(f'Save best run config to "{task_run_dir}".')
        self.save(obj=best_run_config, name="best_run_config", type_="json")

```
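
The selection logic can be tried in isolation with a small stand-in for the `Sweep` objects. `ToySweep`, `select_best_config` and the example values are hypothetical and only mirror the two attributes, `value` and `config`, described above.

```python
from dataclasses import dataclass
from typing import Any, Dict, List, Optional


@dataclass
class ToySweep:
    """Hypothetical stand-in for a sweep result with `value` and `config`."""

    value: Any
    config: Dict


def select_best_config(sweeps: List[ToySweep]) -> Optional[Dict]:
    """Return the config of the sweep with the lowest validation loss."""
    if not sweeps:
        return None
    best = min(sweeps, key=lambda sweep: sweep.value["valid_loss"])
    return best.config


toy_sweeps = [
    ToySweep(value={"epoch": 7, "valid_loss": 1.91}, config={"Training": {"learning_rate": 0.0005}}),
    ToySweep(value={"epoch": 9, "valid_loss": 1.74}, config={"Training": {"learning_rate": 0.001}}),
]
best_config = select_best_config(toy_sweeps)
```

One difference to the task above: `min()` keeps the first sweep in case of a tie, whereas the loop with `<=` keeps the last one.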

### 6. Model Evaluation

This is the final task in our pipeline. It expects the previously determined `best_run_config` dictionary as input, loads the corresponding best model and tokenizers and evaluates the model on the test dataset.  
First, we calculate the test set loss and perplexity. Second, we calculate the test set BLEU score, since this is the standard metric for evaluating machine translation models.



```python
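# assumed source of bleu_score: torchtext's metrics module
from torchtext.data.metrics import bleu_score


class Evaluation(Task):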
    def __init__(self, test_batch_size: int, seed: int):
        super().__init__()

        self.batch_size = test_batch_size
        self.seed = seed

    def _init_model(
        self, train_config: Dict, input_dim: int, output_dim: int, src_pad_idx: int, trg_pad_idx: int
    ) -> nn.Module:
        """Initialize the model and its components."""

        enc = Encoder(
            input_dim,
            train_config["hid_dim"],
            train_config["enc_layers"],
            train_config["enc_heads"],
            train_config["enc_pf_dim"],
            train_config["enc_dropout"],
            self.resource.device,
        )

        dec = Decoder(
            output_dim,
            train_config["hid_dim"],
            train_config["dec_layers"],
            train_config["dec_heads"],
            train_config["dec_pf_dim"],
            train_config["dec_dropout"],
            self.resource.device,
        )

        model = Seq2SeqTransformer(enc, dec, src_pad_idx, trg_pad_idx, self.resource.device).to(self.resource.device)
        return model

    def translate_sentence(self, src_encoded, bos_idx, eos_idx, model, max_len=50):
        """Translate an encoded sentence."""

        model.eval()

        src_tensor = torch.LongTensor(src_encoded).unsqueeze(0).to(self.resource.device)
        src_mask = model.make_src_mask(src_tensor)

        with torch.no_grad():
            enc_src = model.encoder(src_tensor, src_mask)

        trg_indices = [bos_idx]
        for i in range(max_len):
            trg_tensor = torch.LongTensor(trg_indices).unsqueeze(0).to(self.resource.device)
            trg_mask = model.make_trg_mask(trg_tensor)
            with torch.no_grad():
                output, attention = model.decoder(trg_tensor, enc_src, trg_mask, src_mask)
            pred_token = output.argmax(2)[:, -1].item()
            trg_indices.append(pred_token)
            if pred_token == eos_idx:
                break

        return trg_indices[1:]

    def calculate_bleu(self, data_encoded, en_tokenizer, model, max_len=50):
        """Calculate the bleu score on the test set."""

        trgs = []
        pred_trgs = []
        bos_idx = en_tokenizer.token_to_id("<bos>")
        eos_idx = en_tokenizer.token_to_id("<eos>")

        worker_name = multiprocessing.current_process().name
        with tqdm(
            desc=f"{worker_name} - Calculating BLEU",
            total=len(data_encoded),
            unit="sample",
            ascii=False,
        ) as progress_bar:
            for src, trg in data_encoded:

                pred_trg = self.translate_sentence(src, bos_idx, eos_idx, model, max_len)

                # cut off <eos> token
                pred_trg = pred_trg[:-1]

                pred_trg_decoded = en_tokenizer.decode(pred_trg)
                pred_trgs.append(pred_trg_decoded.split())

                trg_decoded = en_tokenizer.decode(trg)
                trgs.append([trg_decoded.split()])
                progress_bar.update()

        return bleu_score(pred_trgs, trgs)

    def run(self, best_run_config: Dict):
        set_seed(self.seed)

        # load the best model, test-data and the tokenizers based on the previously selected best run config
        model_state_dict = self.load(name="best_model", task_name="Training", task_unique_config=best_run_config)
        test_encoded = self.load(name="test_encoded", task_name="DatasetEncoding", task_unique_config=best_run_config)
        de_tokenizer = self.load(name="de_tokenizer", task_name="TokenizerTraining", task_unique_config=best_run_config)
        en_tokenizer = self.load(name="en_tokenizer", task_name="TokenizerTraining", task_unique_config=best_run_config)

        # instantiate the batch collator
        batch_collator = BatchCollator(
            de_pad_idx=de_tokenizer.token_to_id("<pad>"),
            en_pad_idx=en_tokenizer.token_to_id("<pad>"),
            device=self.resource.device,
        )

        # instantiate the test dataset
        test_dataset = TranslationDataset(data=test_encoded)

        # instantiate the test dataloader
        test_iterator = DataLoader(test_dataset, batch_size=self.batch_size, shuffle=False, collate_fn=batch_collator)

        input_dim = de_tokenizer.get_vocab_size()
        output_dim = en_tokenizer.get_vocab_size()
        src_pad_idx = de_tokenizer.token_to_id("<pad>")
        trg_pad_idx = en_tokenizer.token_to_id("<pad>")

        # instantiate the transformer model
        model = self._init_model(
            train_config=best_run_config["Training"],
            input_dim=input_dim,
            output_dim=output_dim,
            src_pad_idx=src_pad_idx,
            trg_pad_idx=trg_pad_idx,
        )
        model.load_state_dict(model_state_dict)

        # instantiate the loss criterion
        criterion = nn.CrossEntropyLoss(ignore_index=trg_pad_idx)

        # evaluate the model on the test set -> calculate the test set loss and perplexity
        test_loss = Training.validate_epoch(model=model, iterator=test_iterator, criterion=criterion)
        logger.info(f"| Test Loss: {test_loss:.3f} | Test PPL: {math.exp(test_loss):7.3f} |")

        # calculate the model's bleu score on the test set
        bleu = self.calculate_bleu(test_encoded, en_tokenizer, model)
        logger.info(f"BLEU score = {bleu * 100:.2f}")

```
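
The decoding strategy in `translate_sentence` is greedy decoding: starting from `<bos>`, the most likely next token is appended until `<eos>` is produced or `max_len` is reached. The sketch below isolates this loop from the model; `toy_next_token` is a hypothetical scripted "model" and the token ids are assumptions.

```python
from typing import Callable, List


def greedy_decode(
    next_token_fn: Callable[[List[int]], int], bos_idx: int, eos_idx: int, max_len: int = 50
) -> List[int]:
    """Append the predicted next token until <eos> or `max_len` is reached."""
    trg_indices = [bos_idx]
    for _ in range(max_len):
        pred_token = next_token_fn(trg_indices)
        trg_indices.append(pred_token)
        if pred_token == eos_idx:
            break
    # drop the leading <bos> token, as in translate_sentence above
    return trg_indices[1:]


def toy_next_token(prefix: List[int]) -> int:
    """Hypothetical model: emits 10, 11, 12 and then <eos> (id 2)."""
    script = [10, 11, 12, 2]
    return script[min(len(prefix) - 1, len(script) - 1)]


decoded = greedy_decode(toy_next_token, bos_idx=1, eos_idx=2)
truncated = greedy_decode(toy_next_token, bos_idx=1, eos_idx=2, max_len=2)
```

The second call shows that the output does not end with `<eos>` when `max_len` is hit first. In that case, unconditionally cutting off the last token, as done in `calculate_bleu`, removes a regular token instead of `<eos>`.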

## Create and Run the Pipeline/Task-Graph via FluidML

So far, we have implemented the individual pipeline steps using FluidML's `Task` class, which was very straightforward.
You might be wondering how to put these tasks together so that they work as a single pipeline.

Thanks to FluidML's TaskSpec API, you can connect these tasks like Lego blocks :)

### 1. Instantiate Task Specs
`TaskSpec` is a simple wrapper class that allows specification of task details and task arguments which will be used during instantiation of the task.
Let's go ahead and create specs for all our tasks.

**Note 1**: If you want to do a grid search on specific tasks, e.g. `TokenizerTraining` and `Training`, you should set their `expand` parameter to either `"product"` or `"zip"`. For a task where no hyperparameter tuning or grid search is required, `expand` should be set to `None` or simply not declared.

Below we define for each task the necessary parameter dictionary which we feed into the task spec class.  

**Note 2**: For a `TaskSpec` whose `expand` parameter is set to `"product"` or `"zip"`, `Flow` automatically expands all config parameters that are given as lists, according to the selected expansion method. With `expand="product"`, a separate task instance is created for every combination in the cross product of the parameter lists. For example, for the `Training` task, `Flow` internally instantiates 4 training tasks, one for each combination of the two `train_batch_size` and the two `learning_rate` values. Alternatively, `expand="zip"` expands the config by zipping the parameter lists. For example, `config = {"learning_rate": [0.1, 0.01, 0.001], "batch_size": [64, 128, 256]}` expands to three distinct configs with the learning rate and batch size pairs `(0.1, 64), (0.01, 128), (0.001, 256)`. When choosing `"zip"`, all parameter lists must have the same length.

**Note 3**: If a `TaskSpec` with `expand` set to `"product"` or `"zip"` needs a parameter that is itself of type `List` and should not be expanded, you have to wrap it in a second list, e.g. `layer_dimensions: [[64, 128, 64]]`.

**Note 4**: The `ModelSelection` task is a so-called reduce task (`reduce=True`), which means it receives the combined results of all expanded direct predecessor tasks (in this case the 4 training tasks).
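
The expansion rules from Notes 2 and 3 can be sketched in plain Python with `itertools.product` and index-wise zipping. This only illustrates the semantics described above and is not FluidML's implementation; the function names, and the choice to treat non-list values as constants in the zip case, are assumptions of this sketch.

```python
import itertools
from typing import Any, Dict, List


def expand_product(config: Dict[str, Any]) -> List[Dict[str, Any]]:
    """One config per element of the cross product of all list-valued parameters."""
    keys = list(config)
    options = [value if isinstance(value, list) else [value] for value in config.values()]
    return [dict(zip(keys, combination)) for combination in itertools.product(*options)]


def expand_zip(config: Dict[str, Any]) -> List[Dict[str, Any]]:
    """One config per index; all list-valued parameters must have equal length."""
    lengths = {len(value) for value in config.values() if isinstance(value, list)}
    if len(lengths) > 1:
        raise ValueError("All parameter lists must have the same length for 'zip'.")
    num_configs = lengths.pop() if lengths else 1
    return [
        {key: (value[i] if isinstance(value, list) else value) for key, value in config.items()}
        for i in range(num_configs)
    ]


product_configs = expand_product(
    {"learning_rate": [0.0005, 0.001], "train_batch_size": [128, 256], "num_epochs": 10}
)
zip_configs = expand_zip({"learning_rate": [0.1, 0.01, 0.001], "batch_size": [64, 128, 256]})

# a list that is wrapped in a second list is passed through unexpanded (Note 3)
wrapped_configs = expand_product({"layer_dimensions": [[64, 128, 64]], "learning_rate": [0.1, 0.01]})
```

With these inputs, the product expansion yields four configs, the zip expansion three, and every config in `wrapped_configs` carries the full `[64, 128, 64]` list.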

In [3]:
# define the parameters for all tasks
seed = 1234
dataset_loading_params = {
    "base_url": "https://raw.githubusercontent.com/multi30k/dataset/master/data/task1/raw/",
    "data_split_names": {
        "train": ["train.de.gz", "train.en.gz"],
        "valid": ["val.de.gz", "val.en.gz"],
        "test": ["test_2016_flickr.de.gz", "test_2016_flickr.en.gz"],
    },
}

tokenizer_training_params = {"vocab_size": 30000, "min_frequency": 2}

training_params = {
    "hid_dim": 256,
    "enc_layers": 3,
    "dec_layers": 3,
    "enc_heads": 8,
    "dec_heads": 8,
    "enc_pf_dim": 512,
    "dec_pf_dim": 512,
    "enc_dropout": 0.1,
    "dec_dropout": 0.1,
    "learning_rate": [0.0005, 0.001],
    "clip_grad": 1.0,
    "train_batch_size": [128, 256],
    "valid_batch_size": 128,
    "num_epochs": 10,
    "seed": seed,
}

evaluation_params = {"test_batch_size": 128, "seed": seed}

# create all task specs
dataset_loading = TaskSpec(task=DatasetLoading, config=dataset_loading_params)
tokenizer_training = TaskSpec(task=TokenizerTraining, config=tokenizer_training_params)
dataset_encoding = TaskSpec(task=DatasetEncoding)
training = TaskSpec(task=Training, config=training_params, expand="product")
model_selection = TaskSpec(task=ModelSelection, reduce=True)
evaluate = TaskSpec(task=Evaluation, config=evaluation_params)

### 2. Registering all Task Dependencies
After having instantiated all task specs, we utilize the `requires()` method to register dependencies between tasks.

Using these task dependencies, FluidML's `Flow` class expands all task specs that set the `expand` parameter and creates a task graph. FluidML's `Swarm` class then internally schedules and executes the tasks in parallel, respecting the registered dependencies.
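The effect of the registered dependencies on the execution order can be sketched with Python's standard library (`graphlib`, available from Python 3.9). This is only an illustration of dependency ordering and not how FluidML schedules tasks internally; the `dependencies` dictionary mirrors the `requires()` calls of this section using the task class names.

```python
from graphlib import TopologicalSorter

# Each key maps a task to the set of tasks it requires.
dependencies = {
    "TokenizerTraining": {"DatasetLoading"},
    "DatasetEncoding": {"TokenizerTraining", "DatasetLoading"},
    "Training": {"DatasetEncoding", "TokenizerTraining"},
    "ModelSelection": {"Training"},
    "Evaluation": {"ModelSelection"},
}

# A topological order lists every task after all of its predecessors.
order = list(TopologicalSorter(dependencies).static_order())
print(order)
```

In this task spec graph every task depends on the one before it, so only one valid order exists. Parallelism therefore comes from the expanded `Training` tasks, which are independent of each other.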

In [4]:
# register dependencies between tasks

tokenizer_training.requires(dataset_loading)
dataset_encoding.requires(tokenizer_training, dataset_loading)
training.requires(dataset_encoding, tokenizer_training)
model_selection.requires(training)
evaluate.requires(model_selection)

### 3. Creating the List of Task Spec Instances
We pack all these task specs in a list which gets passed to FluidML. `Flow` internally creates the task instances.

In [5]:
# all tasks
tasks = [
    dataset_loading,
    tokenizer_training,
    dataset_encoding,
    training,
    model_selection,
    evaluate,
]

### 4. Setting all Meta Parameters

In [6]:
# get the current working directory (the directory this notebook is run from)
current_dir = os.path.abspath('')

# define the base directory where all our task results will be stored in a structured way using LocalFileStore
base_dir = os.path.join(current_dir, 'seq2seq_experiments')

# select the number of workers (processes used to execute tasks in parallel)
num_workers = 4
 
# set force to
#  1) 'all' if all tasks in the pipeline have to be force-executed.
#  2) None if already existing tasks are skipped so that results can be loaded from the store.
#  3) a task name (e.g. "PreProcessTask") or list of task names (e.g. ["PreProcessTask1", "PreProcessTask2"])
#     Additionally, each task name can have the suffix '+' to also re-run its successors (e.g. "PreProcessTask+")
force = None

# try to use cuda GPUs if available
use_cuda = True

# define which GPUs should be used
cuda_ids = [0]

### 5. Define and instantiate Resources to share across all Tasks

We already mentioned in the `Training` task that FluidML enables the user to conveniently share resources across all tasks instead of providing them explicitly to each task individually.  
Users achieve this by creating their own resource dataclass or any other resource object, like a list of available GPUs. In the dataclass used here we define all resources, in our case only the cuda device, which are made available to all tasks through the `self.resource` attribute.

```python
@dataclass
class TaskResource():
    device: str
```

Further, we utilize a little helper function to distribute our available cuda devices equally across the number of defined workers.  
E.g. assuming we selected `num_workers = 4` and have access to two GPUs, the function below would return the following balanced list of devices:

```python
print(devices)
# -> ['cuda:0', 'cuda:1', 'cuda:0', 'cuda:1']
```

In [7]:
def get_balanced_devices(
    count: Optional[int] = None, use_cuda: bool = True, cuda_ids: Optional[List[int]] = None
) -> List[str]:
    count = count if count is not None else multiprocessing.cpu_count()
    if use_cuda and torch.cuda.is_available():
        if cuda_ids is not None:
            devices = [f"cuda:{id_}" for id_ in cuda_ids]
        else:
            devices = [f"cuda:{id_}" for id_ in range(torch.cuda.device_count())]
    else:
        devices = ["cpu"]
    # repeat the device list until every worker has a device assigned
    factor, remainder = divmod(count, len(devices))
    devices = devices * factor + devices[:remainder]
    return devices

devices = get_balanced_devices(count=num_workers, use_cuda=use_cuda, cuda_ids=cuda_ids)

Finally, we create our list of resource objects, which we will pass to `flow.run()`.  
**Note**: `len(resources) == num_workers`

In [8]:
# create list of resources
resources = [TaskResource(device=devices[i]) for i in range(num_workers)]

### 6. Instantiate the previously defined File Store

In [9]:
# create local file storage used for versioning
results_store = MyLocalFileStore(base_dir=base_dir)

### 7. Run the Pipeline/Task-Graph

All that is left is to create the `Flow` object, which builds the user-defined task spec graph and the expanded task graph. Both graphs can be accessed as attributes and visualized with our console rendering utility function. Finally, we execute the expanded task graph in parallel by calling `flow.run()`.

All saved task results will be available in the previously selected output directory (if not changed: `seq2seq_experiments`).  
Feel free to play around with different hyperparameter combinations to improve your model's translation performance.

In [10]:
# create flow (expanded task graph)
flow = Flow(tasks=tasks)

# visualize graphs
visualize_graph_in_console(flow.task_spec_graph, use_pager=True, use_unicode=True)
visualize_graph_in_console(flow.task_graph, use_pager=True, use_unicode=True)

# run sequentially without swarm if num_workers is set to 1,
# else run the graph in parallel using multiprocessing
# the list of resources is distributed among the workers,
# e.g. so that each worker has dedicated access to specific gpus
flow.run(
    num_workers=num_workers,
    resources=resources,
    results_store=results_store,
    project_name="transformer_seq2seq_translation_example",
    force=force,
)

task spec graph
                 ╭────────────────╮                   
                 │ DatasetLoading │                   
                 ╰────────────────╯                   
                  ··             ···                  
               ···                  ···               
             ··                        ··             
╭───────────────────╮                    ··           
│ TokenizerTraining │                     ·           
╰───────────────────╯···                  ·           
          ·             ······            ·           
          ·                   ·····       ·           
          ·                        ···    ·           
          ··                    ╭─────────────────╮   
            ···                 │ DatasetEncoding │   
               ···              ╰─────────────────╯   
                  ··             ···                  
                    ···        ··                     
                       ··    ··                  

task graph
                                                        ╭────────────────╮                                                           
                                                        │ DatasetLoading │                                                           
                                                        ╰────────────────╯·                                                          
                                                           ···             ····                                                      
                                                          ·                    ···                                                   
                                                       ···                        ····                                               
                                                      ·                               ····                                           
                                                   

Dolphin-2 - Calculating BLEU: 100%|██████████| 1000/1000 [00:52<00:00, 19.05sample/s]

