## It's coding time!

<center>
<div>
<img src="Images/Lecture-3/programming_skills.png" width="1800" alt='programming_skills'/>
</div>
</center>

<br/>
<br/>
<br/>

<center>
<div>
<img src="Images/Lecture-3/framework_knowledge.png" width="2200" alt='framework_knowledge'/>
</div>
</center>

## Why do we need this lecture?

Whether you like it or not, experimental settings might require you to do some **coding**.

Coding translates to: 

1. Transparency (*don't you dare do some cheap tricks!*)
2. Correctness (*your code should reflect your paper statements*) 
3. **Readability** (*please, don't make this a nightmare*)
4. **Efficiency** (*time is money*)
5. **Maintainability** (*I'm sure you'll re-use this code*)

We should have an idea about [1-2] from past lectures!

We mainly focus on [4] in this lecture, and we provide some tips & tricks concerning [3, 5] in the next lecture.

## What are we going to cover?

- Dataset encoding and pre-processing pipeline
- Modeling

# Preliminaries

### Requirements.txt

In [None]:
## Install if needed
absl_py==1.4.0
keras==2.9.0
matplotlib==3.5.3
memory_profiler==0.61.0
numpy==1.21.6
pandas==1.3.5
scikit_learn==1.0.2
tensorflow_gpu==2.9.0
torch==1.13.1
torchdata==0.5.1
torchtext==0.14.1
tqdm==4.64.1

#### How did you do that?

In [None]:
!pip install pipreqs

# Scans the project imports and writes requirements.txt in the project directory
!pipreqs /path/to/project-directory

### Working environment

In this lecture, we are going to alternate between Jupyter notebooks and PyCharm scripts

#### Why?

- Coding on Jupyter is hellish (*don't do it!*)
- I suck with notebooks and I wasn't even able to run a script
- **Debugging** (*you know what I'm talking about, don't you?*)

### Task

For simplicity, we are going to consider an NLP classification task on Argument Mining

### Data

We are going to consider a medium-size dataset to show the benefits of efficient coding $\rightarrow$ time is precious!

#### IBM2015 text dataset

- Dataset for Argument Mining: we use it for argument sentence detection (binary classification task)
- ~82k sentences from Wikipedia articles
- 58 controversial topics (e.g., violent videogames)


### Data Loading

First of all, we need to load our dataset

#### Script

```dataset_loading/ibm2015_loader.py```

In [None]:
from typing import AnyStr, Tuple

import pandas as pd
from sklearn.model_selection import GroupShuffleSplit


class IBM2015Loader:

    def __init__(
            self,
            load_path: AnyStr
    ):
        self.load_path = load_path

    def load(
            self
    ) -> Tuple[pd.DataFrame, pd.DataFrame, pd.DataFrame]:
        df = pd.read_csv(self.load_path)
        train_test_df = df[df['Data-set'] == 'train and test']
        val_df = df[df['Data-set'] == 'held-out']

        # Split by topic: all sentences of a topic end up on the same side of the split
        # (pass random_state=... to make the split reproducible across runs)
        splitter = GroupShuffleSplit(n_splits=1, test_size=0.2, train_size=0.8)
        train_indexes, test_indexes = next(splitter.split(X=train_test_df['Sentence'].values,
                                                          y=train_test_df['Label'].values,
                                                          groups=train_test_df['Topic id'].values))
        train_df = train_test_df.iloc[train_indexes]
        test_df = train_test_df.iloc[test_indexes]

        return train_df, val_df, test_df

In [None]:
@evaluate_time
def load_ibm2015_dataset(
        samples_amount: int = -1
):
    # Setting
    this_path = os.path.dirname(os.path.abspath(__file__))
    base_dir = os.path.normpath(os.path.join(this_path,
                                             os.pardir,
                                             os.pardir,
                                             os.pardir))
    log_dir = os.path.join(base_dir, 'logs')
    if not os.path.isdir(log_dir):
        os.makedirs(log_dir)

    Logger.set_log_path(name='logger',
                        log_path=log_dir)
    logger = Logger.get_logger(__name__)

    load_path = os.path.normpath(os.path.join(base_dir,
                                              'data',
                                              'lecture_three',
                                              'dataset.csv'))
    logger.info(f'Attempting to load dataset from path: {load_path}')

    # Actually loading the data
    loader = IBM2015Loader(load_path=load_path)
    train_df, val_df, test_df = loader.load()

    if samples_amount > 0:
        logger.info(f'Samples amount given: {samples_amount} -- Taking a slice of retrieved datasets...')
        train_df = train_df[:samples_amount]
        val_df = val_df[:samples_amount]
        test_df = test_df[:samples_amount]

    logger.info(f'''Loaded data: 
                Train: {train_df.shape}
                Val: {val_df.shape}
                Test: {test_df.shape}
                ''')
    return train_df, val_df, test_df
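In `IBM2015Loader.load`, `GroupShuffleSplit` receives the topic ids as `groups`, so all sentences of a topic land on the same side of the split and the test set only contains unseen topics. Note that `test_size=0.2` is then a fraction of *topics* (rounded up, as the scikit-learn docs describe), not of sentences, so the number of test sentences depends on which topics are drawn; without `random_state` the split also changes at every run. The stdlib-only sketch below illustrates the idea with a hypothetical `group_shuffle_split` helper; it is not scikit-learn's implementation.

```python
import math
import random
from collections import defaultdict
from typing import Dict, Hashable, List, Sequence, Tuple


def group_shuffle_split(
        groups: Sequence[Hashable],
        test_fraction: float = 0.2,
        seed: int = 42
) -> Tuple[List[int], List[int]]:
    """Return (train_indexes, test_indexes) such that no group appears in both."""
    # Collect the row indexes belonging to each group (e.g., each topic id)
    rows_per_group: Dict[Hashable, List[int]] = defaultdict(list)
    for row_idx, group in enumerate(groups):
        rows_per_group[group].append(row_idx)

    # Shuffle the *groups*, not the rows, and send a fraction of them to the test split
    unique_groups = list(rows_per_group)
    random.Random(seed).shuffle(unique_groups)
    n_test_groups = math.ceil(test_fraction * len(unique_groups))
    test_groups = set(unique_groups[:n_test_groups])

    train_indexes, test_indexes = [], []
    for group, rows in rows_per_group.items():
        if group in test_groups:
            test_indexes.extend(rows)
        else:
            train_indexes.extend(rows)
    return sorted(train_indexes), sorted(test_indexes)
```

With 5 topics and `test_fraction=0.2`, exactly one topic (and all of its sentences) ends up in the test split, however many sentences that topic has.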

# Dataset encoding and pre-processing

## Tensorflow

Tensorflow has good support for efficiently handling **data streams** via [tf.data.Dataset](https://www.tensorflow.org/api_docs/python/tf/data/Dataset)

### What are we going to see

- Naive pipeline
- tf.data.Dataset.from_generator pipeline
- tf.data.Dataset.from_generator w/ tf.py_function pipeline
- tf.data.Dataset.from_tensor_slices pipeline
- tf.data.TFRecordDataset pipeline
- tf.data.TFRecordDataset w/ multiprocessing
- [**bonus**] tf.data.Dataset w/ checkpointing

### Naive Pipeline

In [None]:
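from typing import Iterator, Tuple

import numpy as np
import pandas as pd
from tensorflow.keras.preprocessing.sequence import pad_sequences
from tensorflow.keras.preprocessing.text import Tokenizer


class Preprocessor:

    def __init__(
            self
    ):
        self.tokenizer = Tokenizer()

    def setup(
            self,
            train_df: pd.DataFrame
    ):
        ...  # fit the tokenizer on the training texts (see below)

    def preprocess_text(
            self,
            text: str
    ) -> str:
        ...  # normalize a single sentence

    def parse_inputs(
            self,
            df: pd.DataFrame,
    ) -> Tuple[np.ndarray, np.ndarray]:
        ...  # sentences -> token ids, plus labels

    def get_steps(
            self,
            data: np.ndarray,
            batch_size: int = 32
    ) -> int:
        ...  # number of batches per epoch

    def make_iterator(
            self,
            df: pd.DataFrame,
            batch_size: int = 32,
            shuffle: bool = False
    ) -> Iterator:
        ...  # yields padded batches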


In [None]:
    def make_iterator(
            self,
            df: pd.DataFrame,
            batch_size: int = 32,
            shuffle: bool = False
    ) -> Iterator:
        texts, labels = self.parse_inputs(df=df)

        assert len(texts) == len(labels), 'Inconsistent number of texts and labels'

        num_batches = self.get_steps(data=texts, batch_size=batch_size)
        for batch_idx in range(num_batches):
            if shuffle:
                # Sampling *with* replacement: some examples may be repeated within an epoch
                batch_indexes = np.random.randint(low=0, high=len(texts), size=batch_size)
            else:
                start_index = batch_idx * batch_size
                end_index = min(start_index + batch_size, len(texts))
                batch_indexes = np.arange(start_index, end_index)

            assert len(batch_indexes) <= batch_size

            batch_texts = texts[batch_indexes]
            # Pad only up to the longest sequence of this batch (dynamic padding)
            text_max_length = max(len(t) for t in batch_texts)

            batch_texts = pad_sequences(sequences=batch_texts,
                                        maxlen=text_max_length,
                                        padding='post',
                                        truncating='post')

            yield batch_texts, labels[batch_indexes]

In [None]:
    def setup(
            self,
            train_df: pd.DataFrame
    ):
        # Fit the vocabulary on the training split only
        texts = train_df['Sentence'].values
        texts = [self.preprocess_text(t) for t in texts]
        self.tokenizer.fit_on_texts(texts=texts)

    def preprocess_text(
            self,
            text: str
    ) -> str:
        text = text.lower()
        text = text.strip()
        return text

    def parse_inputs(
            self,
            df: pd.DataFrame,
    ) -> Tuple[np.ndarray, np.ndarray]:
        texts = df['Sentence'].values
        labels = df['Label'].values

        texts = [self.preprocess_text(t) for t in texts]
        texts = self.tokenizer.texts_to_sequences(texts)

        # dtype=object since sequences have different lengths
        return np.array(texts, dtype=object), labels

    def get_steps(
            self,
            data: np.ndarray,
            batch_size: int = 32
    ) -> int:
        num_batches = int(np.ceil(len(data) / batch_size))
        return num_batches

Let's try it out!

#### Script

```dataset_loading/tf_naive_pipeline.py```
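In the naive `make_iterator`, the `shuffle=True` branch draws indexes with `np.random.randint`, i.e., *with replacement*: within one pass some sentences are repeated and others are never seen. A common alternative is to shuffle a permutation of the indexes once per epoch and slice it into batches. The plain-Python sketch below shows that variant together with per-batch ("post") padding; `iterate_batches` and `pad_post` are hypothetical helpers standing in for the lecture's code and for Keras' `pad_sequences(..., padding='post')`. The number of batches is still `ceil(len(texts) / batch_size)`.

```python
import random
from typing import Iterator, List, Sequence, Tuple


def pad_post(sequences: Sequence[List[int]], pad_value: int = 0) -> List[List[int]]:
    """Right-pad every sequence to the longest one in the batch ('post' padding)."""
    max_len = max(len(seq) for seq in sequences)
    return [list(seq) + [pad_value] * (max_len - len(seq)) for seq in sequences]


def iterate_batches(
        texts: Sequence[List[int]],
        labels: Sequence[int],
        batch_size: int = 32,
        shuffle: bool = False,
        seed: int = 0
) -> Iterator[Tuple[List[List[int]], List[int]]]:
    """Yield padded batches; every example is visited exactly once per epoch."""
    indexes = list(range(len(texts)))
    if shuffle:
        # A permutation samples *without* replacement, unlike np.random.randint
        random.Random(seed).shuffle(indexes)
    for start in range(0, len(indexes), batch_size):
        batch_indexes = indexes[start:start + batch_size]
        yield (pad_post([texts[i] for i in batch_indexes]),
               [labels[i] for i in batch_indexes])
```

For example, `iterate_batches([[1], [2, 3], [4, 5, 6], [7]], [0, 1, 0, 1], batch_size=3)` yields a first batch padded to length 3 and a second batch containing only `[[7]]`.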

### tf.data.Dataset

First of all, how does tf.data.Dataset work?


<center>
<div>
<img src="Images/Lecture-3/za-warudo.gif" width="1800" alt='JOJO_ZAWARUDO'/>
</div>
</center>


Simply put, tf.data.Dataset defines a **stream** of data

<center>
<div>
<img src="Images/Lecture-3/tf.data-simple-pipeline.png" width="2200" alt='tf.data'/>
</div>
</center>

In particular, we can

- Load data (from generators, files, multiple inputs, arrays, etc.)
- Transform data (tf.data.Dataset.map)
- Shuffle data (tf.data.Dataset.shuffle)
- Prefetch data, overlapping pre-processing with model execution (tf.data.Dataset.prefetch)
- Interleave reads from multiple sources, e.g., multiple files (tf.data.Dataset.interleave)
- Batch data (tf.data.Dataset.batch, tf.data.Dataset.padded_batch)
- Split data across multiple workers (tf.data.Dataset.shard)
- ...
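Note that `shuffle(buffer_size)` does not shuffle the whole dataset: according to the `tf.data.Dataset.shuffle` documentation, it fills a buffer with `buffer_size` elements, samples uniformly from that buffer, and replaces the emitted element with the next one from the input; a perfect shuffle requires a buffer at least as large as the dataset. With `buffer_size=100`, as used throughout this lecture, elements only move within a window of roughly 100 positions. The pure-Python sketch below (a hypothetical `buffered_shuffle` generator, not TensorFlow's code) reproduces that behaviour.

```python
import random
from typing import Iterable, Iterator, List, TypeVar

T = TypeVar('T')


def buffered_shuffle(stream: Iterable[T], buffer_size: int, seed: int = 0) -> Iterator[T]:
    """Shuffle a stream using a fixed-size buffer (the idea behind Dataset.shuffle)."""
    if buffer_size < 1:
        raise ValueError('buffer_size must be >= 1')
    rng = random.Random(seed)
    buffer: List[T] = []
    for item in stream:
        if len(buffer) < buffer_size:
            # Fill the buffer before emitting anything
            buffer.append(item)
            continue
        # Emit a random buffered element and put the incoming one in its slot
        slot = rng.randrange(buffer_size)
        yield buffer[slot]
        buffer[slot] = item
    # Input exhausted: drain the remaining elements in random order
    rng.shuffle(buffer)
    yield from buffer
```

For instance, the first element emitted from `range(1000)` with `buffer_size=100` is always one of the first 100 elements, and `buffer_size=1` leaves the order unchanged.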

### tf.data.Dataset.from_generator

In [None]:
    # Note: from_generator runs self.light_iterator through tf.numpy_function, so the dataset and
    # its iterator must be used in the same Python process that called tf.data.Dataset.from_generator!
    def make_iterator(
            self,
            df: pd.DataFrame,
            batch_size: int = 32,
            shuffle: bool = False,
            prefetch: bool = False,
    ):
        data_generator = partial(self.light_iterator, df=df)
        data = tf.data.Dataset.from_generator(generator=data_generator,
                                              output_signature=(
                                                  tf.TensorSpec(shape=(), dtype=tf.string),
                                                  tf.TensorSpec(shape=(), dtype=tf.int64)
                                              ))
        if shuffle:
            data = data.shuffle(buffer_size=100)

        data = data.map(map_func=self.parse_inputs,
                        num_parallel_calls=tf.data.AUTOTUNE)
 
        data = data.padded_batch(batch_size=batch_size,
                                 padded_shapes=([None], []))

        if prefetch:
            data = data.prefetch(buffer_size=tf.data.AUTOTUNE)

        return iter(data)

In [None]:
    def __init__(
            self
    ):
        self.tokenizer = TextVectorization()

    def setup(
            self,
            train_df: pd.DataFrame
    ):
        texts = train_df['Sentence'].values
        data = tf.data.Dataset.from_tensors(texts)
        self.tokenizer.adapt(data=data)

    def parse_inputs(
            self,
            text: tf.Tensor,
            label: tf.Tensor
    ) -> Tuple[tf.Tensor, tf.Tensor]:
        text = self.tokenizer(tf.expand_dims(text, 0))[0]   # expand to add 'batch' dimension
        return text, label
    
    def light_iterator(
            self,
            df: pd.DataFrame,
    ) -> Iterator:
        texts, labels = df['Sentence'].values, df['Label'].values

        assert len(texts) == len(labels), f'Inconsistent number of texts and labels'

        for (text, label) in zip(texts, labels):
            yield text, label

Let's try it out!

#### Script

```dataset_loading/tf_data_pipeline_gen.py```

### tf.data.Dataset.from_generator w/ tf.py_function

In [None]:
    def make_iterator(
            self,
            df: pd.DataFrame,
            batch_size: int = 32,
            shuffle: bool = False,
            prefetch: bool = False,
    ):
        data_generator = partial(self.light_iterator, df=df)
        # The generator only yields row indexes; the actual parsing happens in tf.py_function
        data = tf.data.Dataset.from_generator(generator=data_generator,
                                              output_signature=tf.TensorSpec(shape=(), dtype=tf.int32))
        if shuffle:
            data = data.shuffle(buffer_size=100)

        data = data.map(map_func=lambda idx: tf.py_function(func=partial(self.parse_inputs, df=df),
                                                            inp=[idx],
                                                            Tout=[tf.int32, tf.int32]),
                        num_parallel_calls=tf.data.AUTOTUNE)

        data = data.padded_batch(batch_size=batch_size,
                                 padded_shapes=([None], []))

        if prefetch:
            data = data.prefetch(buffer_size=tf.data.AUTOTUNE)

        return iter(data)

In [None]:
    def preprocess_text(
            self,
            text: str
    ) -> str:
        text = text.lower()
        text = text.strip()
        return text

    def parse_inputs(
            self,
            index: tf.Tensor,
            df: pd.DataFrame
    ) -> [tf.Tensor, tf.Tensor]:
        # tf.py_function runs eagerly, so .numpy() gives back the Python index
        row = df.iloc[index.numpy()]
        text = self.preprocess_text(row['Sentence'])
        tokens = self.tokenizer.texts_to_sequences([text])[0]
        return tokens, row['Label']

    def get_steps(
            self,
            data: np.ndarray,
            batch_size: int = 32
    ) -> int:
        num_batches = int(np.ceil(len(data) / batch_size))
        return num_batches

    def light_iterator(
            self,
            df: pd.DataFrame,
    ) -> Iterator:
        for idx in range(df.shape[0]):
            yield idx

Let's try it out!

#### Script

```dataset_loading/tf_data_pipeline_gen_pyfunc.py```

### tf.data.Dataset.from_tensor_slices

In [None]:
    def make_iterator(
            self,
            df: pd.DataFrame,
            batch_size: int = 32,
            shuffle: bool = False,
            prefetch: bool = False,
    ):
        data = tf.data.Dataset.from_tensor_slices(dict(df))

        if shuffle:
            data = data.shuffle(buffer_size=100)

        data = data.map(map_func=self.parse_inputs,
                        num_parallel_calls=tf.data.AUTOTUNE)

        data = data.padded_batch(batch_size=batch_size,
                                 padded_shapes=([None], []))

        if prefetch:
            data = data.prefetch(buffer_size=tf.data.AUTOTUNE)

        return iter(data)

In [None]:
    def setup(
            self,
            train_df: pd.DataFrame
    ):
        texts = train_df['Sentence'].values
        data = tf.data.Dataset.from_tensors(texts)
        self.tokenizer.adapt(data=data)

    def parse_inputs(
            self,
            inputs: Dict[str, tf.Tensor],
    ) -> Tuple[tf.Tensor, tf.Tensor]:

        text = inputs['Sentence']
        text = self.tokenizer(tf.expand_dims(text, 0))[0]   # expand to add 'batch' dimension

        return text, inputs['Label']

### tf.data.TFRecordDataset

In [None]:
    def parse_inputs(
            self,
            texts: np.ndarray,
            labels: np.ndarray
    ) -> Tuple[List[List[int]], np.ndarray]:
        texts = [self.preprocess_text(t) for t in texts]
        texts = self.tokenizer.texts_to_sequences(texts)

        return texts, labels
    
    def serialize_data(
            self,
            df: pd.DataFrame,
            suffix: str
    ):
        texts, labels = df['Sentence'].values, df['Label'].values
        texts, labels = self.parse_inputs(texts=texts, labels=labels)

        Logger.get_logger(__name__).info('Serializing data...')
        with tf.io.TFRecordWriter(self.serialization_path + f'_{suffix}') as writer:
            with tqdm(total=len(texts)) as pbar:
                for idx, (text, label) in enumerate(zip(texts, labels)):
                    feature = InputFeature(token_ids=text,
                                           label_id=label)
                    feature_records = InputFeature.get_feature_records(feature=feature)
                    tf_example = tf.train.Example(features=tf.train.Features(feature=feature_records))
                    writer.write(tf_example.SerializeToString())

                    pbar.set_description(desc=f'Serializing example {idx}/{len(texts)}')
                    pbar.update(1)

In [None]:
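import tensorflow as tf


def create_int_feature(values) -> tf.train.Feature:
    # Wrap a list of integers into an int64 tf.train.Feature
    return tf.train.Feature(int64_list=tf.train.Int64List(value=[int(v) for v in values]))


class InputFeature: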

    def __init__(
            self,
            token_ids: np.ndarray,
            label_id: int
    ):
        self.token_ids = token_ids
        self.label_id = label_id

    @classmethod
    def get_mappings(
            cls,
    ):
        mappings = {
            'token_ids': tf.io.VarLenFeature(tf.int64),
            'label_id': tf.io.FixedLenFeature([1], tf.int64),
        }
        return mappings

    @classmethod
    def get_feature_records(
            cls,
            feature
    ):
        features = dict()
        features['token_ids'] = create_int_feature(feature.token_ids)
        features['label_id'] = create_int_feature([feature.label_id])
        return features

    @classmethod
    def get_dataset_selector(cls):
        def _selector(record):
            x = record['token_ids']
            y = record['label_id']
            return x, y

        return _selector

In [None]:
    def make_iterator(
            self,
            df: pd.DataFrame,
            suffix: str,
            batch_size: int = 32,
            shuffle: bool = False,
            prefetch: bool = False,
    ):
        # Serialize only if needed!
        if not os.path.isfile(self.serialization_path + f'_{suffix}'):
            self.serialize_data(df=df,
                                suffix=suffix)

        data = tf.data.TFRecordDataset(self.serialization_path + f'_{suffix}')

        if shuffle:
            data = data.shuffle(buffer_size=100)

        data = data.map(lambda record: self.decode_record(record,
                                                          name_to_features=InputFeature.get_mappings()),
                        num_parallel_calls=tf.data.AUTOTUNE)
        data = data.padded_batch(batch_size=batch_size,
                                 padded_shapes=({'token_ids': [None],
                                                 'label_id': []}))

        if prefetch:
            data = data.prefetch(buffer_size=tf.data.AUTOTUNE)

        return iter(data)

In [None]:
    def decode_record(
            self,
            record,
            name_to_features
    ):
        """
        TPU does not support int64
        """
        example = tf.io.parse_single_example(record, name_to_features)

        for name in list(example.keys()):
            t = example[name]
            if t.dtype == tf.int64:
                t = tf.cast(t, tf.int32)
            example[name] = t

        example['token_ids'] = tf.sparse.to_dense(example['token_ids'])
        example['label_id'] = tf.reshape(example['label_id'], ())

        return example

Let's try it out!

#### Script

```dataset_loading/tf_data_pipeline_pyrecord.py```
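Under the hood, a TFRecord file is just a sequence of length-prefixed binary records. As described in TensorFlow's "TFRecord and tf.train.Example" guide, each record is stored as a `uint64` length, a `uint32` masked CRC32C of the length, the data bytes, and a `uint32` masked CRC32C of the data (little-endian). In this lecture the data bytes are a serialized `tf.train.Example`; the sketch below uses arbitrary byte strings instead and implements only the framing with the standard library. The helper names are hypothetical and this is an illustration, not a replacement for `tf.io.TFRecordWriter`.

```python
import struct
from typing import BinaryIO, Iterator


def crc32c(data: bytes) -> int:
    """Bitwise CRC-32C (Castagnoli), reflected polynomial 0x82F63B78."""
    crc = 0xFFFFFFFF
    for byte in data:
        crc ^= byte
        for _ in range(8):
            crc = (crc >> 1) ^ (0x82F63B78 if crc & 1 else 0)
    return crc ^ 0xFFFFFFFF


def masked_crc(data: bytes) -> int:
    """Rotate the CRC right by 15 bits and add a constant, as the TFRecord format does."""
    crc = crc32c(data)
    rotated = ((crc >> 15) | (crc << 17)) & 0xFFFFFFFF
    return (rotated + 0xA282EAD8) & 0xFFFFFFFF


def write_record(stream: BinaryIO, payload: bytes) -> None:
    length = struct.pack('<Q', len(payload))              # uint64 length
    stream.write(length)
    stream.write(struct.pack('<I', masked_crc(length)))   # uint32 masked CRC of the length
    stream.write(payload)
    stream.write(struct.pack('<I', masked_crc(payload)))  # uint32 masked CRC of the data


def read_records(stream: BinaryIO) -> Iterator[bytes]:
    while True:
        length_bytes = stream.read(8)
        if not length_bytes:
            return  # clean end of file
        (length_crc,) = struct.unpack('<I', stream.read(4))
        if masked_crc(length_bytes) != length_crc:
            raise ValueError('Corrupted record length')
        (length,) = struct.unpack('<Q', length_bytes)
        payload = stream.read(length)
        (payload_crc,) = struct.unpack('<I', stream.read(4))
        if masked_crc(payload) != payload_crc:
            raise ValueError('Corrupted record payload')
        yield payload
```

Each record thus costs 16 bytes of framing on top of its payload, and the checksums let a reader detect truncated or corrupted files.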

### tf.data.TFRecordDataset w/ multiprocessing

We can serialize the dataset much faster by leveraging multiprocessing: the data is split into chunks and each process writes its own TFRecord file!

In [None]:
    def split_data(
            self,
            data: pd.DataFrame,
            splits=5
    ):
        return np.array_split(data, splits)

    def convert_split_data(
            self,
            split_data: List[pd.DataFrame],
            serialization_path: AnyStr,
            n_processes: int = 4
    ):
        Logger.get_logger(__name__).info(f'''Serializing data...
            Multiprocessing info:
                - n_processes: {n_processes}
                - splits: {len(split_data)}
            ''')

        splits = len(split_data)
        pbar = tqdm(total=splits)

        for split in range(splits):
            # One output file per process: chunk i is always written to file i
            output_files = [f'{serialization_path}_{split}_{proc_idx}' for proc_idx in range(n_processes)]
            proc_split_data = np.array_split(split_data[split], n_processes)

            # Serialize only the chunks whose file does not exist yet (e.g., after an interrupted run)
            pending = [(chunk, output_file) for chunk, output_file in zip(proc_split_data, output_files)
                       if not os.path.isfile(output_file)]

            if pending:
                with mp.Pool(n_processes) as pool:
                    returns = [pool.apply_async(self.serialize_data, args=[chunk, output_file])
                               for chunk, output_file in pending]
                    for r in returns:
                        r.get()  # waits for the worker and re-raises its exceptions

            pbar.set_description(desc='Completed serialization processes')
            pbar.update(1)
        pbar.close()

In [None]:
    def serialize_data(
            self,
            df: pd.DataFrame,
            output_file: AnyStr
    ):
        texts, labels = df['Sentence'].values, df['Label'].values
        texts, labels = self.parse_inputs(texts=texts, labels=labels)

        with tf.io.TFRecordWriter(output_file) as writer:
            for idx, (text, label) in enumerate(zip(texts, labels)):
                feature = InputFeature(token_ids=text,
                                       label_id=label)
                feature_records = InputFeature.get_feature_records(feature=feature)
                tf_example = tf.train.Example(features=tf.train.Features(feature=feature_records))
                writer.write(tf_example.SerializeToString())

In [None]:
    def make_iterator(
            self,
            df: pd.DataFrame,
            suffix: str,
            splits: int = 5,
            n_processes: int = 4,
            batch_size: int = 32,
            shuffle: bool = False,
            prefetch: bool = False,
    ):
        # Serialize only if needed!
        base_dir = os.path.dirname(self.serialization_path)
        basename = os.path.basename(self.serialization_path + f'_{suffix}')
        if not [filename for filename in os.listdir(base_dir)
                if basename.casefold() in filename.casefold()]:
            split_data = self.split_data(data=df, splits=splits)
            self.convert_split_data(split_data=split_data,
                                    serialization_path=os.path.join(base_dir, basename),
                                    n_processes=n_processes)

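        # Note: list_files shuffles the file names by default
        data = tf.data.Dataset.list_files(file_pattern=os.path.join(base_dir, basename + '_*'))
        # Read several shard files concurrently and interleave their records
        data = data.interleave(tf.data.TFRecordDataset, num_parallel_calls=tf.data.AUTOTUNE)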

        if shuffle:
            data = data.shuffle(buffer_size=100)

        data = data.map(lambda record: self.decode_record(record,
                                                          name_to_features=InputFeature.get_mappings()),
                        num_parallel_calls=tf.data.AUTOTUNE)
        data = data.padded_batch(batch_size=batch_size,
                                 padded_shapes=({'token_ids': [None],
                                                 'label_id': []}))

        if prefetch:
            data = data.prefetch(buffer_size=tf.data.AUTOTUNE)

        return iter(data)

Let's try it out!

#### Script

```dataset_loading/tf_data_pipeline_pyrecord_fast.py```
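The core of `convert_split_data` is bookkeeping: split the rows into near-equal chunks (as with `np.array_split`, the first `len % n` chunks get one extra row), give chunk `i` the file `{prefix}_{i}`, and skip chunks whose file already exists so that an interrupted serialization can be resumed. Filtering the list of file names *before* indexing it by process id pairs chunks with the wrong files on a resumed run, so the stdlib-only sketch below pairs first and filters second. The names are hypothetical; each returned `(chunk, path)` pair is what would be handed to a `multiprocessing.Pool` worker.

```python
import os
from typing import List, Sequence, Tuple, TypeVar

T = TypeVar('T')


def array_split(rows: Sequence[T], n_chunks: int) -> List[List[T]]:
    """Split into n_chunks near-equal chunks; the first len % n chunks get one extra row."""
    base, extra = divmod(len(rows), n_chunks)
    chunks, start = [], 0
    for chunk_idx in range(n_chunks):
        size = base + (1 if chunk_idx < extra else 0)
        chunks.append(list(rows[start:start + size]))
        start += size
    return chunks


def plan_pending_shards(
        rows: Sequence[T],
        n_processes: int,
        prefix: str
) -> List[Tuple[List[T], str]]:
    """Pair every chunk with *its own* output file, then drop pairs already on disk."""
    chunks = array_split(rows, n_processes)
    output_files = [f'{prefix}_{proc_idx}' for proc_idx in range(n_processes)]
    return [(chunk, path) for chunk, path in zip(chunks, output_files)
            if not os.path.isfile(path)]
```

If file `_1` already exists, the remaining work is chunks 0, 2 and 3, each still paired with its own file name.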

### Method comparison

We have run all these data loading variants... but which one is **better**?

We inspect:
- Timing
- Memory usage

#### Script

```dataset_loading/show_stats.py```

### [Bonus] tf.data.Dataset w/ checkpointing

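One cool feature of tf.data.Dataset is that the **state** of an iterator can be **saved** and **restored**

$\rightarrow$ Quite useful if a training routine gets **interrupted** and we want to **quickly resume it**

Note: according to the tf.data guide, iterators that rely on external state (e.g., `tf.py_function`, and hence `from_generator`) cannot be checkpointed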
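The checkpoint example runs the iterator, saves, runs it again, restores, and runs it a third time: after `ckpt.restore(...)` the iterator resumes from the saved position, i.e., it yields again the batches consumed after saving. The toy class below (hypothetical, pure Python) mimics those semantics with an explicit position counter. A real `tf.data` checkpoint also stores internal buffers such as shuffle and prefetch buffers, which the tf.data guide warns can make checkpoints large; the sketch ignores that.

```python
from typing import Dict, Iterator, List, Sequence


class CheckpointableBatchIterator:
    """Deterministic batch iterator whose position can be saved and restored."""

    def __init__(self, data: Sequence[int], batch_size: int):
        self.data = data
        self.batch_size = batch_size
        self.position = 0  # index of the next element to emit

    def __iter__(self) -> Iterator[List[int]]:
        return self

    def __next__(self) -> List[int]:
        if self.position >= len(self.data):
            raise StopIteration
        batch = list(self.data[self.position:self.position + self.batch_size])
        self.position += len(batch)
        return batch

    def state_dict(self) -> Dict[str, int]:
        # The whole "checkpoint" is the position of the next element
        return {'position': self.position}

    def load_state_dict(self, state: Dict[str, int]) -> None:
        self.position = state['position']
```

Saving after the first batch and restoring after the third makes the next call return the second batch again.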

In [None]:
    # Loading
    train_df, val_df, test_df = load_ibm2015_dataset(samples_amount=samples_amount)

    # Pre-processing pipeline
    preprocessor = Preprocessor()
    preprocessor.setup(train_df=train_df)

    timing_info = {}
    memory_info = {}

    train_steps = preprocessor.get_steps(data=train_df['Sentence'].values)
    train_iterator = partial(preprocessor.make_iterator,
                             df=train_df,
                             batch_size=batch_size,
                             prefetch=prefetch,
                             shuffle=True)
    train_iterator = train_iterator()

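    # Track both a step counter and the iterator state in the same checkpoint
    ckpt = tf.train.Checkpoint(step=tf.Variable(0), iterator=train_iterator)
    manager = tf.train.CheckpointManager(ckpt,
                                         os.path.join(info_save_dir, 'tf_data_pipeline_slices_ckpt'),
                                         max_to_keep=2)

    # First run: consume some batches, then save the iterator state
    run_iterator(iterator=train_iterator,
                 takes=batches_to_take)

    save_path = manager.save()

    # Second run: consume the next batches
    run_iterator(iterator=train_iterator,
                 takes=batches_to_take)

    # Restore: the iterator goes back to the saved state...
    ckpt.restore(manager.latest_checkpoint)

    # ...so this third run yields again the batches consumed after saving
    run_iterator(iterator=train_iterator,
                 takes=batches_to_take)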

Let's try it out!

#### Script

```dataset_loading/tf_bonus_ckpt.py```

## Torch

Similarly to Tensorflow, Torch has good support for efficiently handling data streams.

In terms of functionality, Torch data handling is on par with tf.data.Dataset

Currently, data handling is supported via:
   - torch ([torch.utils.data](https://pytorch.org/docs/stable/data.html))
   - [torchdata](https://github.com/pytorch/data)

### What are we going to see

- Naive pipeline
- torchdata IterableWrapper
- torchdata TFRecordLoader

### Datasets, DataPipes and DataLoaders

PyTorch defines two main objects for data handling

- Dataset: wraps a dataset and defines how to access it (by index via `__getitem__`/`__len__`, or as a stream via `IterableDataset.__iter__`)
- DataLoader: handles a Dataset object to enable batching, multiprocessing and shuffling
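The split of responsibilities is: the Dataset answers "what is example `i`?" (`__len__` and `__getitem__` for a map-style dataset), while the DataLoader decides which indexes to fetch and in which order, groups them into batches, and merges each batch with a `collate_fn` (optionally in worker processes when `num_workers > 0`). The single-process, pure-Python sketch below mirrors that protocol with hypothetical `ListDataset`, `simple_loader` and `pad_collate` names; it is not PyTorch's implementation.

```python
import random
from typing import Callable, Iterator, List, Optional, Sequence, Tuple


class ListDataset:
    """Map-style dataset: random access through __len__ and __getitem__."""

    def __init__(self, texts: Sequence[List[int]], labels: Sequence[int]):
        self.texts = texts
        self.labels = labels

    def __len__(self) -> int:
        return len(self.labels)

    def __getitem__(self, idx: int) -> Tuple[List[int], int]:
        return self.texts[idx], self.labels[idx]


def simple_loader(
        dataset,
        batch_size: int,
        shuffle: bool = False,
        collate_fn: Optional[Callable] = None,
        seed: int = 0
) -> Iterator:
    """Tiny stand-in for DataLoader: choose indexes, fetch samples, collate them."""
    indexes = list(range(len(dataset)))
    if shuffle:
        random.Random(seed).shuffle(indexes)
    for start in range(0, len(indexes), batch_size):
        samples = [dataset[i] for i in indexes[start:start + batch_size]]
        yield collate_fn(samples) if collate_fn else samples


def pad_collate(samples: List[Tuple[List[int], int]]) -> Tuple[List[List[int]], List[int]]:
    """Right-pad the texts of a batch with 0 and gather the labels."""
    max_len = max(len(text) for text, _ in samples)
    texts = [list(text) + [0] * (max_len - len(text)) for text, _ in samples]
    return texts, [label for _, label in samples]
```

The Dataset never knows about batches; swapping `pad_collate` for another function changes the batch format without touching the dataset.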

In [None]:
# taken from https://pytorch.org/tutorials/beginner/basics/data_tutorial.html
import os
import pandas as pd
from torch.utils.data import Dataset
from torchvision.io import read_image

class CustomImageDataset(Dataset):
    def __init__(self, annotations_file, img_dir, transform=None, target_transform=None):
        self.img_labels = pd.read_csv(annotations_file)
        self.img_dir = img_dir
        self.transform = transform
        self.target_transform = target_transform

    def __len__(self):
        return len(self.img_labels)

    def __getitem__(self, idx):
        img_path = os.path.join(self.img_dir, self.img_labels.iloc[idx, 0])
        image = read_image(img_path)
        label = self.img_labels.iloc[idx, 1]
        if self.transform:
            image = self.transform(image)
        if self.target_transform:
            label = self.target_transform(label)
        return image, label

In [3]:
# taken from https://pytorch.org/tutorials/beginner/basics/data_tutorial.html
from torch.utils.data import DataLoader

# training_data and test_data are Dataset instances (e.g., CustomImageDataset) created beforehand
train_dataloader = DataLoader(training_data, batch_size=64, shuffle=True)
test_dataloader = DataLoader(test_data, batch_size=64, shuffle=True)


As an emerging alternative to torch.utils.data.Dataset, ```torchdata``` offers DataPipes

$\rightarrow$ a DataPipe is like a tf.data.Dataset

- We can define it from generators, iterators, files, etc.
- We can still define our own custom DataPipe, just like a torch.utils.data.Dataset

### Naive pipeline

In [None]:
    def make_iterator(
            self,
            df: pd.DataFrame,
            batch_size: int = 32,
            shuffle: bool = False
    ) -> Iterator:
        texts, labels = self.parse_inputs(df=df)

        assert len(texts) == len(labels), f'Inconsistent number of texts and labels'

        num_batches = self.get_steps(data=texts, batch_size=batch_size)
        for batch_idx in range(num_batches):
            if shuffle:
                # Sampling *with* replacement: some examples may be repeated within an epoch
                batch_indexes = np.random.randint(low=0, high=len(texts), size=batch_size)
            else:
                start_index = batch_idx * batch_size
                end_index = min(start_index + batch_size, len(texts))
                batch_indexes = np.arange(start_index, end_index)

            assert len(batch_indexes) <= batch_size

            batch_texts = texts[batch_indexes].tolist()
            text_max_length = max(len(t) for t in batch_texts)

            # Right-pad with 0 (a list, not a lazy map object)
            batch_texts = [t + [0] * (text_max_length - len(t)) for t in batch_texts]

            yield batch_texts, labels[batch_indexes]

In [None]:
    def __init__(
            self
    ):
        self.tokenizer = get_tokenizer(tokenizer='basic_english')
        self.vocab = None

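    def setup(
            self,
            train_df: pd.DataFrame
    ):
        texts = train_df['Sentence'].values
        texts = map(lambda t: self.tokenizer(self.preprocess_text(t)), texts)
        # Specials come first: '<PAD>' gets index 0 (the padding value used below), '<UNK>' index 1
        self.vocab = build_vocab_from_iterator(iterator=texts, specials=['<PAD>', '<UNK>'])
        # Map out-of-vocabulary tokens (e.g., from val/test) to '<UNK>' instead of raising an error
        self.vocab.set_default_index(self.vocab['<UNK>'])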

    def preprocess_text(
            self,
            text: str
    ) -> str:
        text = text.lower()
        text = text.strip()
        return text

    def parse_inputs(
            self,
            df: pd.DataFrame,
    ) -> [np.ndarray, np.ndarray]:
        texts = df['Sentence'].values
        labels = df['Label'].values

        texts = list(map(lambda t: self.preprocess_text(t), texts))
        texts = list(map(lambda t: self.vocab(self.tokenizer(t)), texts))
        return np.array(texts, dtype=object), labels

    def get_steps(
            self,
            data: np.ndarray,
            batch_size: int = 32
    ) -> int:
        num_batches = int(np.ceil(len(data) / batch_size))
        return num_batches

Let's try it out!

#### Script

```dataset_loading/torch_naive_pipeline.py```

### torchdata IterableWrapper

In [None]:
    def make_iterator(
            self,
            df: pd.DataFrame,
            batch_size: int = 32,
            num_workers: int = 4,
            shuffle: bool = False
    ) -> Iterator:
        data_generator = partial(self.light_iterator, df=df)
        data = IterableWrapper(data_generator(), deepcopy=False)

        if shuffle:
            data = data.shuffle(buffer_size=100)

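        # Required for parallel processing: each DataLoader worker gets its own copy of the
        # datapipe, and sharding_filter makes every worker keep a disjoint subset of the elements
        data = data.sharding_filter()

        data = data.map(fn=self.parse_inputs)
        data = DataLoader(data,
                          shuffle=shuffle,  # enables (or disables) the shuffle datapipe defined above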
                          batch_size=batch_size,
                          num_workers=num_workers,
                          collate_fn=self.batch_data)
        return iter(data)

In [None]:
    def parse_inputs(
            self,
            input_data: Tuple[str, int]
    ) -> [List[int], int]:
        text, label = input_data
        text = self.preprocess_text(text=text)
        tokens = self.vocab(self.tokenizer(text))
        return tokens, label
    
    def light_iterator(
            self,
            df: pd.DataFrame,
    ) -> Iterator:
        texts, labels = df['Sentence'].values, df['Label'].values

        assert len(texts) == len(labels), f'Inconsistent number of texts and labels'

        for (text, label) in zip(texts, labels):
            yield text, label
            
    def batch_data(
            self,
            input_batch
    ):
        texts, labels = [], []
        for item in input_batch:
            texts.append(torch.tensor(item[0], dtype=torch.int32))
            labels.append(item[1])

        texts = pad_sequence(texts, batch_first=True, padding_value=0)
        labels = torch.tensor(labels, dtype=torch.int32)
        return texts, labels

Let's try it out!

#### Script

```dataset_loading/torch_datapipe_pipeline.py```
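Why is `sharding_filter()` needed? With `num_workers > 0`, every DataLoader worker iterates over its own copy of the datapipe, so without sharding each example would be yielded once per worker. `sharding_filter` makes worker `k` (out of `n`) keep only the elements whose position modulo `n` equals `k`. The hypothetical generator below is a plain-Python sketch of that round-robin rule, not torchdata's code.

```python
from typing import Iterable, Iterator, TypeVar

T = TypeVar('T')


def sharding_filter(stream: Iterable[T], num_instances: int, instance_id: int) -> Iterator[T]:
    """Keep every num_instances-th element, starting at position instance_id."""
    for idx, item in enumerate(stream):
        if idx % num_instances == instance_id:
            yield item
```

With 4 workers over `range(10)`, worker 0 gets `[0, 4, 8]`, worker 3 gets `[3, 7]`, and together the workers cover every element exactly once.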

### torchdata TFRecordLoader

A super cool feature of torchdata is that we can directly load TFRecord files!

In [None]:
    def batch_data(
            self,
            input_batch
    ):
        texts, labels = [], []
        for item in input_batch:
            texts.append(item['token_ids'])
            labels.append(item['label_id'])

        texts = pad_sequence(texts, batch_first=True, padding_value=0)
        labels = torch.tensor(labels, dtype=torch.int32)
        return {'token_ids': texts,
                'label_ids': labels}
    
    def make_iterator(
            self,
            suffix: str,
            batch_size: int = 32,
            num_workers: int = 4,
            shuffle: bool = False
    ) -> Iterator:
        base_dir = os.path.dirname(self.serialization_path)
        basename = os.path.basename(self.serialization_path + f'_{suffix}')
        data = FileLister(root=base_dir,
                          masks=basename + '_*')
        data = FileOpener(data, mode='b')
        data = TFRecordLoader(data)

        if shuffle:
            data = data.shuffle(buffer_size=100)

        data = data.sharding_filter()
        data = DataLoader(data,
                          shuffle=shuffle,  # enables (or disables) the shuffle datapipe defined above
                          batch_size=batch_size,
                          num_workers=num_workers,
                          collate_fn=self.batch_data)
        return iter(data)

Let's try it out!

#### Script

```dataset_loading/torch_datapipe_record_pipeline.py```

## Takeaways

### Tensorflow

- Offers quite **a lot of solutions** for data handling via a unified functionality: ```tf.data.Dataset```

- Tensorflow graph mode is **tricky** and defining a proper tensorflow-like pipeline is non-trivial (**more on this later!**)

- We can define a simpler pipeline via **serialization** (*my preferred solution*)
    - Define your pythonic data pipeline
    - Serialize it
    - Load it efficiently via tf.data.Dataset

- A serialized pipeline also has the advantage of running a time-consuming pre-processing pipeline **just once**! (*useful for multiple concurrent training runs*)

- tf.data.Dataset pipeline is very powerful and flexible but it can be tricky to find the right **pipeline ordering**
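To make the ordering point concrete, here is a minimal sketch of one common `tf.data` ordering for an in-memory dataset: cache, then shuffle, then batch, then prefetch. The helper `make_dataset` and the toy tensors are hypothetical and not part of the lecture scripts; other orderings can be valid depending on where the expensive work happens.

```python
import tensorflow as tf


def make_dataset(features, labels, batch_size=4, shuffle_buffer=100, seed=42):
    """Hypothetical helper: one common ordering for an in-memory tf.data pipeline."""
    ds = tf.data.Dataset.from_tensor_slices((features, labels))
    # Cache *before* shuffling: otherwise the first shuffled order would be frozen in the cache
    ds = ds.cache()
    # Shuffle single samples *before* batching: after batch() only whole batches are permuted
    ds = ds.shuffle(buffer_size=shuffle_buffer, seed=seed, reshuffle_each_iteration=True)
    ds = ds.batch(batch_size)
    # Prefetch last, so the next batch is prepared while the current one is consumed
    ds = ds.prefetch(tf.data.AUTOTUNE)
    return ds


features = tf.reshape(tf.range(20), [10, 2])  # 10 samples with 2 features each
labels = tf.range(10) % 2
batches = list(make_dataset(features, labels))  # 3 batches: 4 + 4 + 2 samples
```

Shuffling never breaks the pairing between a sample and its label, since `from_tensor_slices` keeps each `(features, label)` tuple together.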

### Torch

- Powerful like Tensorflow

- Currently we need ```torch``` and ```torchdata```

- Compared to Tensorflow, pipeline design is **much easier** (*personal opinion*)

- Some datapipes are harder to grasp than their Tensorflow counterparts (*personal opinion*)

- Supports compatibility with tensorflow serialization! (*super cool*)

# Modeling

## What are we going to cover?

- [**TF, Torch**] Training example
- [**TF, Torch**] deterministic behaviour
- [**TF**] tf.function
- [**TF, Torch**] dual optimizers
- [**TF, Torch**] broadcasting
- [**TF, Torch**] einsum notation
- [**TF, Torch**] tf.gather

### Training example

Let's train a simple LSTM-based model on our data pipeline

#### Tensorflow

In [None]:
    # Loading
    train_df, val_df, test_df = load_ibm2015_dataset(samples_amount=samples_amount)

    # Pre-processing pipeline
    preprocessor = Preprocessor()
    preprocessor.setup(train_df=train_df)

    train_steps = preprocessor.get_steps(data=train_df['Sentence'].values,
                                         batch_size=batch_size)
    train_iterator = partial(preprocessor.make_iterator,
                             df=train_df,
                             batch_size=batch_size,
                             prefetch=prefetch,
                             shuffle=True)

    model = MyModel(vocab_size=preprocessor.tokenizer.vocabulary_size() + 1)
    trainer = TFTrainer(epochs=epochs)
    training_time, memory_usage = trainer.run(model=model,
                                              train_data_iterator=train_iterator,
                                              steps=train_steps)

In [None]:
class M_LSTM(tf.keras.Model):

    def __init__(
            self,
            embedding_dimension,
            vocab_size,
            lstm_weights,
            answer_units,
            l2_regularization=0.,
            **kwargs
    ):
        super(M_LSTM, self).__init__(**kwargs)
        self.input_embedding = tf.keras.layers.Embedding(input_dim=vocab_size,
                                                         output_dim=embedding_dimension,
                                                         mask_zero=True,
                                                         name='input_embedding')
        # LSTM blocks
        self.lstm_block = tf.keras.layers.Bidirectional(tf.keras.layers.LSTM(lstm_weights))
        self.final_block = tf.keras.layers.Dense(units=answer_units,
                                                 kernel_regularizer=tf.keras.regularizers.l2(l2_regularization))

    def call(
            self,
            input_ids,
            training=False
    ):
        # [bs, N, d']
        input_emb = self.input_embedding(input_ids,
                                         training=training)

        # [bs, 2 * lstm_weights] (forward and backward states concatenated)
        encoded_inputs = self.lstm_block(input_emb,
                                         training=training)

        # [bs, d'']
        answer = self.final_block(encoded_inputs,
                                  training=training)
        return answer

In [None]:
class TFModelWrapper:

    @abc.abstractmethod
    def loss_op(
            self,
            x,
            targets,
            training=False
    ):
        pass

    @abc.abstractmethod
    def train_op(
            self,
            x,
            y
    ):
        pass

    @abc.abstractmethod
    def batch_fit(
            self,
            x,
            y
    ):
        pass

In [None]:
class MyModel(TFModelWrapper):

    def __init__(
            self,
            vocab_size: int,
            lstm_weights: int = 64,
            answer_units: int = 64,
            l2_regularization: float = 0.
    ):
        self.model = M_LSTM(embedding_dimension=50,
                            vocab_size=vocab_size,
                            lstm_weights=lstm_weights,
                            answer_units=answer_units,
                            l2_regularization=l2_regularization)
        self.optimizer = tf.keras.optimizers.Adam()
        
    def loss_op(
            self,
            x,
            targets,
            training=False
    ):
        logits = self.model(x,
                            training=training)

        # Cross entropy
        ce = tf.nn.sparse_softmax_cross_entropy_with_logits(labels=targets,
                                                            logits=logits)
        total_loss = tf.reduce_mean(ce)

        loss_info = dict()
        loss_info['CE'] = total_loss

        # L2 regularization
        if self.model.losses:
            additional_losses = tf.reduce_sum(self.model.losses)
            total_loss += additional_losses
            loss_info['L2'] = additional_losses
            
        return total_loss, loss_info

In [None]:
    def train_op(
            self,
            x,
            y
    ):
        with tf.GradientTape() as tape:
            loss, loss_info = self.loss_op(x=x,
                                           targets=y,
                                           training=True)
        grads = tape.gradient(loss, self.model.trainable_variables)
        return loss, loss_info, grads

    def batch_fit(
            self,
            x,
            y
    ):
        loss, loss_info, grads = self.train_op(x, y)
        self.optimizer.apply_gradients(zip(grads, self.model.trainable_variables))
        return loss, loss_info

Let's try it out!

#### Script

```modeling/tf_training_gen_example.py```

#### Torch

In [None]:
    # Loading
    train_df, val_df, test_df = load_ibm2015_dataset(samples_amount=samples_amount)

    # Pre-processing pipeline
    preprocessor = Preprocessor()
    preprocessor.setup(train_df=train_df)

    train_steps = preprocessor.get_steps(data=train_df['Sentence'].values,
                                         batch_size=batch_size)
    train_iterator = partial(preprocessor.make_iterator,
                             df=train_df,
                             batch_size=batch_size,
                             num_workers=num_workers,
                             shuffle=True)

    model = MyModel(vocab_size=len(preprocessor.vocab) + 1)
    trainer = ThTrainer(epochs=epochs)
    training_time, memory_usage = trainer.run(model=model,
                                              train_data_iterator=train_iterator,
                                              steps=train_steps)

In [None]:
class M_LSTM(th.nn.Module):

    def __init__(
            self,
            embedding_dimension,
            vocab_size,
            lstm_weights,
            answer_units,
    ):
        super(M_LSTM, self).__init__()

        self.input_embedding = th.nn.Embedding(num_embeddings=vocab_size,
                                               embedding_dim=embedding_dimension)

        # LSTM blocks
        self.lstm_block = th.nn.LSTM(input_size=embedding_dimension,
                                     hidden_size=lstm_weights,
                                     num_layers=1,
                                     batch_first=True,
                                     bidirectional=True)
        self.final_block = th.nn.Linear(in_features=lstm_weights * 2,
                                        out_features=answer_units)
        self.final_activation = th.nn.ReLU()

    def forward(
            self,
            input_ids
    ):
        # [bs, N, d']
        input_emb = self.input_embedding(input_ids)

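        # h_n: [2, bs, lstm_weights] (last hidden state of each direction)
        _, (h_n, c_n) = self.lstm_block(input_emb)
        # [bs, 2, lstm_weights] -> [bs, 2 * lstm_weights]
        encoded_inputs = th.permute(h_n, [1, 0, 2])
        encoded_inputs = encoded_inputs.reshape(encoded_inputs.shape[0], -1)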

        # [bs, d'']
        answer = self.final_block(encoded_inputs)
        answer = self.final_activation(answer)
        return answer


In [None]:
class THModelWrapper:

    def __init__(
            self,
    ):
        self.model = None

    @abc.abstractmethod
    def build_model(
            self,
            *args,
            **kwargs
    ):
        pass

    @abc.abstractmethod
    def loss_op(
            self,
            x,
            targets
    ):
        pass

    @abc.abstractmethod
    def train_op(
            self,
            x,
            y
    ):
        pass

    @abc.abstractmethod
    def batch_fit(
            self,
            x,
            y
    ):
        pass

In [None]:
class MyModel(THModelWrapper):

    def __init__(
            self,
            vocab_size: int,
            lstm_weights: int = 64,
            answer_units: int = 64,
            l2_regularization: float = 0.
    ):
        super().__init__()
        self.build_model(vocab_size=vocab_size,
                         lstm_weights=lstm_weights,
                         answer_units=answer_units)
        self.optimizer = th.optim.Adam(params=self.model.parameters(),
                                       weight_decay=l2_regularization)
        self.criterion = th.nn.CrossEntropyLoss(reduction='mean')

    def build_model(
            self,
            vocab_size: int,
            lstm_weights: int = 64,
            answer_units: int = 64,
    ):
        self.model = M_LSTM(embedding_dimension=50,
                            vocab_size=vocab_size,
                            lstm_weights=lstm_weights,
                            answer_units=answer_units)

    def loss_op(
            self,
            x,
            targets
    ):
        logits = self.model(x)

        # Cross entropy
        ce = self.criterion(logits, targets.long())
        total_loss = ce

        loss_info = dict()
        loss_info['CE'] = ce

        return total_loss, loss_info

In [None]:
    def train_op(
            self,
            x,
            y
    ):
        self.optimizer.zero_grad()

        loss, loss_info = self.loss_op(x=x,
                                       targets=y)
        loss.backward()

        self.optimizer.step()
        return loss, loss_info

    def batch_fit(
            self,
            x,
            y
    ):
        return self.train_op(x, y)

Let's try it out!

#### Script

```modeling/torch_training_datapipe_example.py```

### Deterministic Behaviour

Both Tensorflow and Torch allow us to set a manual seed and to enable deterministic operations

#### Note

Depending on your Tensorflow version, some layers **may not support deterministic operations**! $\rightarrow$ you'll get a runtime error

#### Tensorflow

In [None]:
# Seeding
fix_seed(seed=random_seed)
tf_fix_seed(seed=random_seed)

# Loading
train_df, val_df, test_df = load_ibm2015_dataset(samples_amount=samples_amount)

...

In [None]:
def fix_seed(
        seed: int
):
    Logger.get_logger(__name__).info(f'Fixing seed to: {seed}')
    random.seed(seed)
    np.random.seed(seed)
    

def tf_fix_seed(
        seed: int
):
    tf.random.set_seed(seed)
    tf.config.experimental.enable_op_determinism()
    os.environ['TF_DETERMINISTIC_OPS'] = '1'

#### Torch

In [None]:
def th_fix_seed(
        seed: int
):
    th.manual_seed(seed)
    # On CUDA, some ops additionally require the CUBLAS_WORKSPACE_CONFIG environment variable
    th.use_deterministic_algorithms(True)
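
As a quick sanity check, a sketch like the following verifies that re-seeding reproduces both the parameter initialization and the random inputs. The helper `seeded_forward` is hypothetical and runs on CPU only; on GPU, extra settings may be needed, so treat this as an illustration rather than a full reproducibility recipe.

```python
import torch as th


def th_fix_seed(seed: int):
    th.manual_seed(seed)
    th.use_deterministic_algorithms(True)


def seeded_forward(seed: int) -> th.Tensor:
    """Hypothetical helper: seed, build a tiny layer, and run it on random inputs."""
    th_fix_seed(seed)
    layer = th.nn.Linear(in_features=4, out_features=2)  # weights drawn from the seeded RNG
    x = th.randn(3, 4)                                    # inputs drawn from the same RNG
    with th.no_grad():
        return layer(x)


out_a = seeded_forward(seed=0)
out_b = seeded_forward(seed=0)  # same seed: bit-identical output expected
out_c = seeded_forward(seed=1)  # different seed: different output
```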

### tf.function

When a function is wrapped in ```tf.function```, Tensorflow builds a **computation graph**: a data structure containing the operations required to execute the function.

We can wrap costly Tensorflow functions either with the ```@tf.function``` decorator or by calling ```tf.function(func)```

### Computation graph
- [**tracing**] the Python code is executed **only once** per new input signature (shapes, dtypes) $\rightarrow$ the computation graph is **built**
- new inputs flow through the computation graph $\rightarrow$ the computation graph is **executed**
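
A small sketch makes tracing visible. The function `scaled_sum` and the list `traced_shapes` are hypothetical: a plain Python side effect runs only while the graph is being traced, so it records one entry per traced input signature rather than one per call.

```python
import tensorflow as tf

traced_shapes = []  # Python list, filled only while Tensorflow traces the function


@tf.function
def scaled_sum(x):
    traced_shapes.append(x.shape)  # Python side effect: runs at tracing time only
    return tf.reduce_sum(x) * 2.0


r1 = scaled_sum(tf.constant([1.0, 2.0]))       # first call: traces and builds the graph
r2 = scaled_sum(tf.constant([3.0, 4.0]))       # same shape/dtype: reuses the graph
r3 = scaled_sum(tf.constant([1.0, 2.0, 3.0]))  # new shape: triggers a retrace
```

Three calls, but only two traces. Python scalars passed as arguments behave similarly: each new value creates a new trace, which is why passing tensors is usually preferable.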

In [None]:
    # We set reduce_retracing=True since input sequences (token_ids) have variable length
    @tf.function(reduce_retracing=True)
    def batch_fit(
            self,
            x,
            y
    ):
        loss, loss_info, grads = self.train_op(x, y)
        self.optimizer.apply_gradients(zip(grads, self.model.trainable_variables))
        return loss, loss_info

#### Tricky behaviours with computational graph (*know your enemy!*)

In Tensorflow, the computational graph is built during tracing

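This means that, once the graph is traced, we **cannot change its structure**: we can only play with its pins (**inputs, outputs**). Any plain Python value read during tracing (e.g., a float attribute) is frozen into the graph as a constant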

#### Example: coefficient update

Suppose we have a model with a custom loss function

The value of the loss is regulated by a scalar ```coefficient```

In [None]:
class ExampleModel:

    def __init__(
            self
    ):
        self.coefficient = 1.0

    @tf.function
    def compute_overlap_constraint(
            self,
            assignment_matrix
    ):
        K = assignment_matrix.shape[-1]

        # [bs, K, K]
        root_intensities = tf.matmul(assignment_matrix, assignment_matrix, transpose_a=True)
        root_intensities = root_intensities / stable_norm(root_intensities, axis=[1, 2])[:, None, None]

        # [K, K]
        eye_matrix = tf.eye(K)
        eye_matrix = eye_matrix / stable_norm(eye_matrix)

        # [bs, K, K]
        penalty = root_intensities - eye_matrix[None, :]
        penalty = stable_norm(penalty, axis=[1, 2])
        return tf.reduce_mean(penalty) * self.coefficient

In [None]:
def compute_constraint(
        assignment_matrix
):
    pairwise_diff = model.compute_overlap_constraint(assignment_matrix=assignment_matrix[np.newaxis, :, :])
    return pairwise_diff.numpy()

def simulate_coefficient_annealing(
        model,
        assignment_matrix: np.ndarray,
        iterations: int = 100
):
    coefficient_values = np.linspace(0, 1.0, iterations)[::-1]
    logger.info('Simulating coefficient annealing...')
    values = []
    with tqdm(total=iterations) as pbar:
        for it in range(iterations):
            constraint = compute_constraint(assignment_matrix=assignment_matrix)
            values.append(constraint)

            # Update coefficient
            model.coefficient = coefficient_values[it]

            pbar.update(1)
            pbar.set_description(desc=f'Simulating coefficient = {model.coefficient}')

    logger.info(f'Constraint values: {values}')
    plot_coefficients(values=values)

In [None]:
    @tf.function
    def compute_overlap_constraint_with_coefficient(
            self,
            assignment_matrix,
            coefficient
    ):
        K = assignment_matrix.shape[-1]

        # [bs, K, K]
        root_intensities = tf.matmul(assignment_matrix, assignment_matrix, transpose_a=True)
        root_intensities = root_intensities / stable_norm(root_intensities, axis=[1, 2])[:, None, None]

        # [K, K]
        eye_matrix = tf.eye(K)
        eye_matrix = eye_matrix / stable_norm(eye_matrix)

        # [bs, K, K]
        penalty = root_intensities - eye_matrix[None, :]
        penalty = stable_norm(penalty, axis=[1, 2])
        return tf.reduce_mean(penalty) * tf.cast(coefficient, penalty.dtype)

In [None]:
def compute_constraint_with_coefficient(
        assignment_matrix,
        coefficient
):
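    pairwise_diff = model.compute_overlap_constraint_with_coefficient(
        assignment_matrix=assignment_matrix[np.newaxis, :, :],
        # Passing a tensor (not a Python float) avoids a retrace for every new coefficient value
        coefficient=tf.constant(coefficient, dtype=tf.float32))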
    return pairwise_diff.numpy()

def simulate_coefficient_annealing_with_coefficient(
        model,
        assignment_matrix: np.ndarray,
        iterations: int = 100
):
    coefficient_values = np.linspace(0, 1.0, iterations)[::-1]
    logger.info('Simulating coefficient annealing...')
    values = []
    with tqdm(total=iterations) as pbar:
        for it in range(iterations):
            constraint = compute_constraint_with_coefficient(assignment_matrix=assignment_matrix,
                                                             coefficient=model.coefficient)
            values.append(constraint)

            # Update coefficient
            model.coefficient = coefficient_values[it]

            pbar.update(1)
            pbar.set_description(desc=f'Simulating coefficient = {model.coefficient}')

    logger.info(f'Constraint values: {values}')
    plot_coefficients(values=values)

Let's try it out!

#### Script

```modeling/tf_function_graph_mode_example.py```
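
Another option, not used in the lecture script, is to store the coefficient in a `tf.Variable`. The graph reads the variable's current value at every call, so updating it with `assign` needs neither an extra argument nor retracing. The class below is a hypothetical, stripped-down sketch of that idea.

```python
import tensorflow as tf


class VariableCoefficientModel:
    """Hypothetical sketch: the loss coefficient lives in a tf.Variable."""

    def __init__(self):
        # Non-trainable: updated by hand (e.g., an annealing schedule), not by the optimizer
        self.coefficient = tf.Variable(1.0, trainable=False, dtype=tf.float32)

    @tf.function
    def scaled_penalty(self, penalty):
        # The variable is read inside the graph, so every call sees its current value
        return tf.reduce_mean(penalty) * self.coefficient


model = VariableCoefficientModel()
penalty = tf.constant([2.0, 4.0])
before = float(model.scaled_penalty(penalty))  # mean 3.0 * coefficient 1.0
model.coefficient.assign(0.5)                  # in-place update, no retracing
after = float(model.scaled_penalty(penalty))   # mean 3.0 * coefficient 0.5
```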

### Dual Optimizers

In many cases, we may need **multiple optimizers**

- Different learning rates for different parameters
- Multi-step training (GANs, Dual Lagrangian method, etc...)

#### Tensorflow

In [None]:
class MyModel(TFModelWrapper):

    def __init__(
            self,
            vocab_size: int,
            lstm_weights: int = 64,
            answer_units: int = 64,
            l2_regularization: float = 0.
    ):
        self.model = M_LSTM(embedding_dimension=50,
                            vocab_size=vocab_size,
                            lstm_weights=lstm_weights,
                            answer_units=answer_units,
                            l2_regularization=l2_regularization)
        self.optimizer_a = tf.keras.optimizers.Adam()
        self.optimizer_b = tf.keras.optimizers.Adam(learning_rate=1e-02)

In [None]:
    def train_op(
            self,
            x,
            y
    ):
        # We set persistent=True to track multiple gradients within a single forward pass
        # Without persistent=True, the second tape.gradient(...) fails
        with tf.GradientTape(persistent=True) as tape:
            loss, loss_info = self.loss_op(x=x,
                                           targets=y,
                                           training=True)
        # Assumes the last two trainable variables are the kernel and bias of final_block (Dense)
        grads_a = tape.gradient(loss, self.model.trainable_variables[:-2])
        grads_b = tape.gradient(loss, self.model.trainable_variables[-2:])
        return loss, loss_info, grads_a, grads_b
    
    # We set reduce_retracing=True since input sequences (token_ids) have variable length
    @tf.function(reduce_retracing=True)
    def batch_fit(
            self,
            x,
            y
    ):
        loss, loss_info, grads_a, grads_b = self.train_op(x, y)
        self.optimizer_a.apply_gradients(zip(grads_a, self.model.trainable_variables[:-2]))
        self.optimizer_b.apply_gradients(zip(grads_b, self.model.trainable_variables[-2:]))
        return loss, loss_info

Let's try it out!

#### Script

```modeling/tf_dual_optimizers_example.py```
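
A persistent tape is one option. A possible alternative, not taken from the lecture script, is to call `tape.gradient` once on all trainable variables and split the resulting list, so the tape can stay non-persistent. The toy `Sequential` model below is hypothetical; as in the lecture code, it assumes the last two trainable variables (kernel and bias of the final Dense layer) form the group with the higher learning rate.

```python
import tensorflow as tf

# Hypothetical toy model: the last two trainable variables are the kernel and bias of the head
model = tf.keras.Sequential([tf.keras.layers.Dense(4, activation='relu'),
                             tf.keras.layers.Dense(2)])
model.build(input_shape=(None, 3))
optimizer_a = tf.keras.optimizers.Adam(learning_rate=1e-3)  # body
optimizer_b = tf.keras.optimizers.Adam(learning_rate=1e-2)  # head


def train_step(x, y):
    variables = model.trainable_variables
    with tf.GradientTape() as tape:  # non-persistent: gradient() is called only once
        logits = model(x, training=True)
        ce = tf.nn.sparse_softmax_cross_entropy_with_logits(labels=y, logits=logits)
        loss = tf.reduce_mean(ce)
    grads = tape.gradient(loss, variables)  # one gradient per variable, same order
    optimizer_a.apply_gradients(zip(grads[:-2], variables[:-2]))
    optimizer_b.apply_gradients(zip(grads[-2:], variables[-2:]))
    return loss


x = tf.random.normal((5, 3), seed=0)
y = tf.constant([0, 1, 0, 1, 1])
head_before = [v.numpy().copy() for v in model.trainable_variables[-2:]]
loss = train_step(x, y)
```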

#### Torch

In [None]:
class MyModel(THModelWrapper):

    def __init__(
            self,
            vocab_size: int,
            lstm_weights: int = 64,
            answer_units: int = 64,
            l2_regularization: float = 0.
    ):
        super().__init__()
        self.build_model(vocab_size=vocab_size,
                         lstm_weights=lstm_weights,
                         answer_units=answer_units)

        clf_layer_parameters = list(self.model.final_block.named_parameters())
        clf_layer_parameter_names = [f'final_block.{item[0]}' for item in clf_layer_parameters]
        clf_layer_parameters = [item[1] for item in clf_layer_parameters]
        other_parameters = [v for k, v in self.model.named_parameters() if k not in clf_layer_parameter_names]

        self.optimizer = th.optim.Adam(params=[{'params': clf_layer_parameters, 'lr': 1e-02},
                                               {'params': other_parameters, 'lr': 1e-03}],
                                       weight_decay=l2_regularization)
        self.criterion = th.nn.CrossEntropyLoss(reduction='mean')

Let's try it out!

#### Script

```modeling/torch_different_lrs_example.py```

In [None]:
class MyModel(THModelWrapper):

    def __init__(
            self,
            vocab_size: int,
            lstm_weights: int = 64,
            answer_units: int = 64,
            l2_regularization: float = 0.
    ):
        super().__init__()
        self.build_model(vocab_size=vocab_size,
                         lstm_weights=lstm_weights,
                         answer_units=answer_units)

        clf_layer_parameters = list(self.model.final_block.named_parameters())
        clf_layer_parameter_names = [f'final_block.{item[0]}' for item in clf_layer_parameters]
        self.clf_layer_parameters = [item[1] for item in clf_layer_parameters]
        self.other_parameters = [v for k, v in self.model.named_parameters() if k not in clf_layer_parameter_names]

        self.optimizer_a = th.optim.Adam(params=self.clf_layer_parameters)
        self.optimizer_b = th.optim.Adam(params=self.other_parameters, weight_decay=l2_regularization)
        self.criterion = th.nn.CrossEntropyLoss(reduction='mean')
        
    def train_op(
            self,
            x,
            y
    ):
        self.optimizer_a.zero_grad()
        self.optimizer_b.zero_grad()

        loss, loss_info = self.loss_op(x=x,
                                       targets=y)

        # A single backward pass fills .grad for all parameters; each optimizer then updates its own subset
        loss.backward()
        self.optimizer_a.step()
        self.optimizer_b.step()

        return loss, loss_info

Let's try it out!

#### Script

```modeling/torch_dual_optimizers_example.py```

### Broadcasting

When working with tensors, broadcasting is the **101** of both Tensorflow and Torch

We really need to understand broadcasting since
- It **eases** our scripting life
- It may be **tricky** to spot some **unwanted behaviours** caused by broadcasting (*a.k.a. bugs/features*)
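
A minimal NumPy sketch (NumPy follows the same broadcasting rules as Tensorflow and Torch; the arrays are made up for illustration) shows both sides: the pairwise-difference trick with explicit singleton axes, and a silent shape bug.

```python
import numpy as np

# Intended use: pairwise L1 distances via explicit singleton axes
# [K, 1, N] - [1, K, N] -> [K, K, N], then sum over N -> [K, K]
rows = np.array([[1.0, 0.0],
                 [0.0, 1.0],
                 [1.0, 0.0]])  # K=3, N=2
pairwise_l1 = np.abs(rows[:, None, :] - rows[None, :, :]).sum(axis=-1)

# Pitfall: [3] - [3, 1] silently broadcasts to [3, 3] instead of raising an error
scores = np.array([0.2, 0.5, 0.9])         # shape [3]
targets = np.array([[0.0], [1.0], [1.0]])  # shape [3, 1], e.g. after an unwanted keepdims
wrong = scores - targets
# Fix: make the shapes agree explicitly before the element-wise op
right = scores - targets[:, 0]
```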

#### Tensorflow

In [None]:
@tf.function
def ptk_overlap_constraint(pooling_matrix):
    # pooling_matrix is [bs, N, K]
    K = pooling_matrix.shape[-1]

    # [bs, K, N]
    pooling_matrix = tf.transpose(pooling_matrix, [0, 2, 1])

    # [bs, K, 1, N] - [bs, 1, K, N]
    # [bs, K, K]
    penalty = tf.reduce_sum(tf.abs(pooling_matrix[:, :, None, :] - pooling_matrix[:, None, :, :]), axis=-1)
    penalty = tf.nn.relu(1.0 - penalty)

    diag_mask = tf.ones_like(penalty) - tf.eye(penalty.shape[-1], dtype=tf.float32)[None, :, :]
    penalty *= diag_mask

    # [bs,]
    denominator = K ** 2 - K
    denominator = tf.maximum(tf.cast(denominator, tf.float32), 1.0)

    penalty = tf.reduce_sum(penalty, axis=(-1, -2)) / denominator
    return penalty

Let's try it out!

#### Script

```modeling/tf_broadcasting_example.py```

#### Torch

In [None]:
def ptk_overlap_constraint(
        pooling_matrix
):
    pooling_matrix = th.tensor(pooling_matrix)

    # pooling_matrix is [bs, N, K]
    K = pooling_matrix.shape[-1]

    # [bs, K, N]
    pooling_matrix = th.permute(pooling_matrix, [0, 2, 1])

    # [bs, K, 1, N] - [bs, 1, K, N]
    # [bs, K, K]
    penalty = th.sum(th.abs(pooling_matrix[:, :, None, :] - pooling_matrix[:, None, :, :]), dim=-1)
    penalty = th.relu(1.0 - penalty)

    diag_mask = th.ones_like(penalty) - th.eye(penalty.shape[-1], dtype=th.float32)[None, :, :]
    penalty *= diag_mask

    # [bs,]
    denominator = K ** 2 - K
    denominator = th.tensor(np.maximum(denominator, 1.0))

    penalty = th.sum(penalty, dim=(-1, -2)) / denominator
    return penalty

Let's try it out!

#### Script

```modeling/torch_broadcasting_example.py```

### Einsum notation

The Einstein summation convention is the **ultimate generalization of products** such as matrix multiplication to multiple dimensions. $\rightarrow$ [source](https://obilaniu6266h16.wordpress.com/2016/02/04/einstein-summation-in-numpy/)

#### [A simple example](https://rockt.github.io/2018/04/30/einsum)

Let's say we want to multiply matrix $A \in \mathbb{R}^{I \times K}$ with $B \in \mathbb{R}^{K \times J}$ and then sum each column of the product

We can write

$$
    c_j = \sum_i \sum_k A_{ik}B_{kj} = A_{ik}B_{kj}
$$

where $c_j$ is the $j$-th element of $c \in \mathbb{R}^{J}$, obtained by multiplying the row vectors $A_{i:}$ with the column vector $B_{:j}$ and summing everything up

The above equation can be written via Einstein notation as follows

$$
    ik,kj \rightarrow j
$$
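
The equation above can be checked numerically. In this NumPy sketch (random matrices, arbitrary sizes), the einsum string, explicit loops over the summed indices $i$ and $k$, and a matmul followed by a column-wise sum all give the same vector $c$.

```python
import numpy as np

rng = np.random.default_rng(0)
I, K, J = 2, 3, 4
A = rng.standard_normal((I, K))
B = rng.standard_normal((K, J))

# i and k do not appear after '->', so they are summed out; j is kept
c_einsum = np.einsum('ik,kj->j', A, B)

# Same computation with explicit loops over the summed indices
c_loops = np.zeros(J)
for j in range(J):
    for i in range(I):
        for k in range(K):
            c_loops[j] += A[i, k] * B[k, j]

# Same computation with a matmul followed by a sum over rows (axis 0)
c_matmul = (A @ B).sum(axis=0)
```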

#### Einsum is all you need

Actually, this notation is very powerful since we can define **many operations** (*you name it!*)

- Matrix transpose: ```ij->ji```
- Sum: ```ij->```
- Sum over axis: ```ij->j``` (axis=0), ```ij->i``` (axis=1)
- Matrix-vector multiplication: ```ik,k->i```
- Matrix-matrix multiplication: ```ik,kj->ij```
- Batch matrix multiplication: ```ijk,ikl->ijl```
- Dot product: ```i,i->```
- Matrix dot product: ```ij,ij->```
- Hadamard product: ```ij,ij->ij```
- Outer product: ```i,j->ij```
- ...
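
Each entry of the list can be checked against its conventional counterpart. The sketch below uses NumPy's `np.einsum`, which accepts the same subscript strings as `tf.einsum` and `th.einsum` for these cases; the toy arrays are arbitrary.

```python
import numpy as np

x = np.arange(6.0).reshape(2, 3)
y = np.arange(6.0, 12.0).reshape(2, 3)
v = np.arange(3.0)
w = np.arange(2.0)
batch = np.arange(24.0).reshape(2, 3, 4)
batch_T = batch.transpose(0, 2, 1)  # [2, 4, 3]

# spec -> (einsum result, conventional counterpart)
checks = {
    'ij->ji': (np.einsum('ij->ji', x), x.T),
    'ij->': (np.einsum('ij->', x), x.sum()),
    'ij->j': (np.einsum('ij->j', x), x.sum(axis=0)),
    'ij->i': (np.einsum('ij->i', x), x.sum(axis=1)),
    'ik,k->i': (np.einsum('ik,k->i', x, v), x @ v),
    'ik,kj->ij': (np.einsum('ik,kj->ij', x, x.T), x @ x.T),
    'ijk,ikl->ijl': (np.einsum('ijk,ikl->ijl', batch, batch_T), batch @ batch_T),
    'i,i->': (np.einsum('i,i->', v, v), v @ v),
    'ij,ij->': (np.einsum('ij,ij->', x, y), (x * y).sum()),
    'ij,ij->ij': (np.einsum('ij,ij->ij', x, y), x * y),
    'i,j->ij': (np.einsum('i,j->ij', w, v), np.outer(w, v)),
}

for spec, (ein, ref) in checks.items():
    assert np.allclose(ein, ref), spec
```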

#### Tensorflow

In [None]:
@tf.function
def test_transpose(x):
    x_T = tf.transpose(x, [0, 2, 1])
    ein_T = tf.einsum('ijk->ikj', x)
    return x_T, ein_T


@tf.function
def test_sum(x):
    sum_x = tf.reduce_sum(x)
    ein_sum = tf.einsum('ijk->', x)
    return sum_x, ein_sum


@tf.function
def test_axis_sum(x):
    sum_x = tf.reduce_sum(x, axis=-1)
    ein_sum = tf.einsum('ijk->ij', x)
    return sum_x, ein_sum


@tf.function
def test_mm(x):
    mm_x = tf.matmul(x, x, transpose_a=True)
    ein_mm = tf.einsum('ijk,ikl->ijl', tf.einsum('ijk->ikj', x), x)
    return mm_x, ein_mm

Let's try it out!

#### Script

```modeling/tf_einsum_example.py```

#### Torch

In [None]:
def test_transpose(x):
    x = th.tensor(x)
    x_T = th.permute(x, [0, 2, 1])
    ein_T = th.einsum('ijk->ikj', x)
    return x_T, ein_T


def test_sum(x):
    x = th.tensor(x)
    sum_x = th.sum(x)
    ein_sum = th.einsum('ijk->', x)
    return sum_x, ein_sum


def test_axis_sum(x):
    x = th.tensor(x)
    sum_x = th.sum(x, dim=-1)
    ein_sum = th.einsum('ijk->ij', x)
    return sum_x, ein_sum


def test_mm(x):
    x = th.tensor(x)
    mm_x = th.matmul(th.permute(x, [0, 2, 1]), x)
    ein_mm = th.einsum('ijk,ikl->ijl', th.einsum('ijk->ikj', x), x)
    return mm_x, ein_mm

Let's try it out!

#### Script

```modeling/torch_einsum_example.py```

### tf.gather

tf.gather is an important operation since it allows us to perform **multi-dimensional tensor indexing**

#### Tensorflow

In [None]:
def supervision_loss(
        prob_dist,
        positive_indexes,
        negative_indexes,
        mask_indexes,
        supervision_margin=0.5
):
    padding_amount = positive_indexes.shape[-1]

    # Masking...

    # Split each similarity score for a target into a separate sample
    # prob_dist shape: [batch_size, memory_max_length]
    # positive_indexes shape: [batch_size, padding_amount]
    # gather (batch_dims=1) shape: [batch_size, padding_amount]
    # pos_scores shape (after reshape): [batch_size * padding_amount, 1]
    pos_scores = tf.gather(prob_dist, positive_indexes, batch_dims=1)
    pos_scores = tf.reshape(pos_scores, [-1, 1])

    # Repeat similarity scores for non-target memories for each positive score
    # prob_dist shape: [batch_size, memory_max_length]
    # negative_indexes shape: [batch_size, padding_amount]
    # gather: [batch_size, padding_amount] -> tile: [batch_size, padding_amount * padding_amount]
    # neg_scores shape (after reshape): [batch_size * padding_amount, padding_amount]
    neg_scores = tf.gather(prob_dist, negative_indexes, batch_dims=1)
    neg_scores = tf.tile(neg_scores, multiples=[1, padding_amount])
    neg_scores = tf.reshape(neg_scores, [-1, padding_amount])

    # Compare each single positive score with all corresponding negative scores
    # [batch_size * padding_amount, padding_amount]
    # [batch_size, padding_amount]
    # [batch_size, 1]
    # Samples without supervision are ignored by applying a zero mask (mask_res)
    hop_supervision_loss = tf.maximum(0., supervision_margin - pos_scores + neg_scores)
    hop_supervision_loss = hop_supervision_loss * tf.cast(mask_res, dtype=hop_supervision_loss.dtype)
    hop_supervision_loss = tf.reshape(hop_supervision_loss, [-1, padding_amount, padding_amount])

    hop_supervision_loss = tf.reduce_sum(hop_supervision_loss, axis=[1, 2])
    
    # Masking...
    
    return hop_supervision_loss

In [None]:
def test_gather_nd_dim0():
    params = tf.reshape(tf.range(15), [5, 3])
    indexes = tf.reshape(tf.constant([[0, 0], [1, 1], [2, 2],
                                      [0, 0], [0, 1], [4, 2]]), [2, -1, 2])
    res = tf.gather_nd(params, indices=indexes)
    print(f'Params: {os.linesep}{params}')
    print(f'Indexes: {os.linesep}{indexes}')
    print(f'Gather: {os.linesep}{res}')

Let's try it out!

#### Script

```modeling/tf_gather_example.py```

#### Torch

In [None]:
def test_gather_dim0():
    params = th.arange(15).reshape(5, 3)
    indexes = th.tensor([0, 1, 2,
                         0, 0, 4]).reshape(2, -1)
    res = th.gather(params, dim=0, index=indexes)
    print(f'Params: {os.linesep}{params}')
    print(f'Indexes: {os.linesep}{indexes}')
    print(f'Gather: {os.linesep}{res}')


def test_gather_dim1():
    params = th.arange(15).reshape(5, 3)
    indexes = th.tensor([0, 1, 2,
                         0, 0, 2]).reshape(2, -1)
    res = th.gather(params, dim=1, index=indexes)
    print(f'Params: {os.linesep}{params}')
    print(f'Indexes: {os.linesep}{indexes}')
    print(f'Gather: {os.linesep}{res}')

Let's try it out!

#### Script

```modeling/torch_gather_example.py```
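
For intuition, both kinds of lookup can be reproduced in NumPy (a sketch with made-up values). `np.take_along_axis(..., axis=1)` matches `tf.gather(..., batch_dims=1)` on 2-D inputs as used in `supervision_loss`, and `th.gather(..., dim=1)` when the index has as many rows as the params. Fancy indexing with coordinate pairs reproduces the `tf.gather_nd` call in `test_gather_nd_dim0`.

```python
import numpy as np

# prob_dist: [batch_size, memory_max_length]
prob_dist = np.array([[0.1, 0.6, 0.3],
                      [0.5, 0.2, 0.3]])
# positive_indexes: [batch_size, padding_amount]
positive_indexes = np.array([[1, 2],
                             [0, 0]])

# Per-row lookup: out[b, p] = prob_dist[b, positive_indexes[b, p]]
pos_loop = np.array([[prob_dist[b, idx] for idx in row]
                     for b, row in enumerate(positive_indexes)])
pos_take = np.take_along_axis(prob_dist, positive_indexes, axis=1)

# gather_nd-style lookup: each innermost pair is a full (row, col) coordinate
params = np.arange(15).reshape(5, 3)
coords = np.array([[[0, 0], [1, 1], [2, 2]],
                   [[0, 0], [0, 1], [4, 2]]])  # [2, 3, 2]
nd = params[coords[..., 0], coords[..., 1]]   # [2, 3]
```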

## Concluding Remarks

- Data pipelines are a powerful tool for efficient model training and evaluation

- Model design can raise many implementation challenges when aiming for efficient code execution

#### There's a lot left to cover

#### Tensorflow

- Sampling
- Distributed training
- Tensorflow probability
- Tensorflow addons
- Transformers

#### Torch

- Sampling
- Distributed training
- PyTorch Lightning
- Transformers

<center>
<div>
<img src="Images/Lecture-1/jojo_tbc.gif" width="600" alt='JOJO_tbc'/>
</div>
</center>

## 次回 (Jikai!): Next time

Knowing a library's functionalities is just the **tip of the iceberg**!

We need to learn **coding best practices**

- Debugging
- Typing and type-checking
- Model definition, training, evaluation
- Logging
- Unit tests
- Controlled environments (e.g., Docker)

# Any questions?

<center>
<div>
<img src="Images/Lecture-1/jojo-arrivederci.gif" width="1200" alt='JOJO_arrivederci'/>
</div>
</center>