# ML Recipes - From File

On this page, we will show you how to solve your own ML tasks on top of a file dataset.

> It is recommended to go through some basic recipes ([`models`](../models.ipynb), [`losses`](../losses.ipynb), [`metrics`](../metrics.ipynb), [`from_numpy`](./from_numpy.ipynb)) first, but if you just want to utilize `carefree-learn` as soon as possible, this page is pretty self-contained as well!

We'll use the famous iris dataset to illustrate the basic concepts:



In [1]:
import os

data_path = os.path.join("data", "iris.data")
with open(data_path, "r") as f:
    print(f.readlines()[:3])

['5.1,3.5,1.4,0.2,Iris-setosa\n', '4.9,3.0,1.4,0.2,Iris-setosa\n', '4.7,3.2,1.3,0.2,Iris-setosa\n']


# Table of Contents

- [Data Processing](#Data-Processing)
  - [Out of the Loop](#Out-of-the-Loop)
  - [Utilize the Integrated `IMLData` System](#Utilize-the-Integrated-IMLData-System)
    - [Example](#Example)
    - [Explanations](#Explanations)
      - [General](#General)
      - [`build_with`](#build_with)
      - [`preprocess`](#preprocess)
      - [`dumps` / `loads`](#dumps-/-loads)
- [Processing Labels](#Processing-Labels)
- [Optional Callbacks](#Optional-Callbacks)
  - [`get_num_samples`](#get_num_samples)
  - [`fetch_batch`](#fetch_batch)
  - [`postprocess_batch`](#postprocess_batch)
  - [`postprocess_results`](#postprocess_results)

# Preparations

In [2]:
import torch
import cflearn

import numpy as np
import torch.nn as nn

from torch import Tensor
from typing import Dict
from typing import Tuple

np.random.seed(142857)
torch.manual_seed(142857)

<torch._C.Generator at 0x2b69910e3f0>

# Data Processing

The main difficulty of dealing with a file dataset lies in the data processing procedure, so we will mainly focus on this part.

> For the 'common' parts of handling ML tasks (e.g. building custom models, losses, metrics), please refer to the basic recipes ([`models`](../models.ipynb), [`losses`](../losses.ipynb), [`metrics`](../metrics.ipynb), [`from_numpy`](./from_numpy.ipynb)) for more details.

## Out of the Loop

The most intuitive way is to process the file dataset 'out of the loop', that is, to process it 'somewhere else' before we start using `carefree-learn`. The drawback is that the final solution can hardly be 'out of the box'. One of the powerful features of `carefree-learn` is that we can save everything into a single `zip` file, which makes reproduction / deployment very easy. But if we process the dataset 'out of the loop', we have to save / load the data processing logic ourselves, which makes the solution more difficult to manage.
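
To make this trade-off concrete, here is a minimal sketch of an 'out of the loop' workflow. It assumes a comma-separated file whose last column is the label, as in the iris data above. The helper `read_csv_dataset` and the file name `label_dictionary.json` are hypothetical and not part of `carefree-learn`. Notice that the label mapping has to be saved and restored by hand, separately from any model file:

```python
import json
import os
import tempfile

import numpy as np


def read_csv_dataset(file_path, label_dictionary=None):
    """Hypothetical helper: parse `feature,...,label` lines into numpy arrays."""
    rows, labels = [], []
    with open(file_path, "r") as f:
        for line in f:
            line = line.strip()
            # skip empty lines
            if not line:
                continue
            *features, label = line.split(",")
            rows.append(features)
            labels.append(label)
    # build the str -> int mapping only if it is not given
    if label_dictionary is None:
        label_dictionary = {k: i for i, k in enumerate(sorted(set(labels)))}
    x = np.array(rows, np.float32)
    y = np.array([[label_dictionary[label]] for label in labels], int)
    return x, y, label_dictionary


with tempfile.TemporaryDirectory() as folder:
    # a tiny stand-in for `iris.data`
    demo_path = os.path.join(folder, "demo.data")
    with open(demo_path, "w") as f:
        f.write("5.1,3.5,1.4,0.2,Iris-setosa\n7.0,3.2,4.7,1.4,Iris-versicolor\n")
    x, y, label_dictionary = read_csv_dataset(demo_path)
    # the mapping lives outside of the model, so we must persist it ourselves
    mapping_path = os.path.join(folder, "label_dictionary.json")
    with open(mapping_path, "w") as f:
        json.dump(label_dictionary, f)
    with open(mapping_path, "r") as f:
        restored = json.load(f)

print(x.shape, y.ravel().tolist(), restored)
```

Every consumer of the trained model would need both the model file and this extra JSON file, which is exactly the bookkeeping that the integrated approach avoids.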

## Utilize the Integrated `IMLData` System

To make things easier, `carefree-learn` introduces the `IMLData` system. By following a few protocols, we can integrate the data processing logic into `carefree-learn`, so that it is saved into the same `zip` file.

### Example

Here's a minimal example where we can integrate the file processing procedure with this system:

In [3]:
from cflearn import IMLData
from cflearn import IMLDataProcessor
from cflearn import IMLPreProcessedData
from cflearn import register_ml_data
from cflearn import register_ml_data_processor


@register_ml_data_processor("my_fancy_processor", allow_duplicate=True)
class MyFancyProcessor(IMLDataProcessor):
    label_dictionary: Dict[str, int]
    
    def build_with(self, x_train: str) -> None:
        all_labels = set()
        with open(x_train, "r") as f:
            for line in f:
                line = line.strip()
                # skip empty line
                if not line:
                    continue
                # last element of each line is the label
                label = line.split(",")[-1]
                all_labels.add(label)
        # create a mapping that maps the original str labels to int labels
        self.label_dictionary = {}
        for i, label in enumerate(sorted(all_labels)):
            self.label_dictionary[label] = i

    def _read(self, file_path: str) -> Tuple[np.ndarray, np.ndarray]:
        x = []
        y = []
        with open(file_path, "r") as f:
            for line in f:
                line = line.strip()
                # skip empty line
                if not line:
                    continue
                line = line.split(",")
                # last element of each line is the label
                label = self.label_dictionary[line.pop()]
                x.append(line)
                # here we append [label] to ensure that the final `y` is a 2d array
                y.append([label])
        return np.array(x, np.float32), np.array(y, int)

    def preprocess(self, x_train, x_valid) -> IMLPreProcessedData:
        x_train, y_train = self._read(x_train)
        if x_valid is None:
            x_valid = y_valid = None
        else:
            x_valid, y_valid = self._read(x_valid)
        return IMLPreProcessedData(
            x_train,
            y_train,
            x_valid,
            y_valid,
            num_classes=len(self.label_dictionary),
        )

    def dumps(self):
        return {
            "label_dictionary": self.label_dictionary,
        }

    def loads(self, dumped) -> None:
        self.label_dictionary = dumped["label_dictionary"]


@register_ml_data("my_fancy_data", allow_duplicate=True)
class MyFancyData(IMLData):
    processor_type = "my_fancy_processor"

Although it looks like a lot of code, it should be pretty straightforward to understand. We'll dive into the details in the [Explanations](#Explanations) section; for now, let's just try it out:

In [4]:
m = cflearn.api.fit_ml(
    MyFancyData(data_path),
    is_classification=True,
)

                                Internal Default Configurations Used by `carefree-learn`                                
------------------------------------------------------------------------------------------------------------------------
                             train_samples   |   150
                             valid_samples   |   None
                         max_snapshot_file   |   25
                                 input_dim   |   4
                                output_dim   |   3
                                 loss_name   |   focal
                                 workplace   |   _logs\2022-08-19_10-37-03-356143
                             monitor_names   |   ['mean_std', 'plateau']
                      additional_callbacks   |   ['_log_metrics_msg', '_inject_loader_name']
                   log_metrics_msg_verbose   |   True
                              metric_names   |   ['acc', 'auc']
----------------------------------------------------------------------------

Seems that everything works like a charm!

As mentioned before, the main advantage of utilizing the `IMLData` system is that we can save the data processing procedure into the same `zip` file that we use to save our models. For example, by running:

In [5]:
m.save("./test")

<cflearn.api.ml.pipeline.MLPipeline at 0x2b695948220>

This will generate a `test.zip` file which can be loaded:

In [6]:
m2 = cflearn.api.load("./test")

We can use it to make predictions and see if the data processing procedure is preserved:

In [7]:
# the `make_inference_data` API helps us make an `IMLData` instance that is suitable to pass into the `predict` method
idata = m2.make_inference_data(data_path)
# specifying `return_classes=True` can force `predict` to return classes instead of the 'raw' logits
predictions = m2.predict(idata, return_classes=True)[cflearn.PREDICTIONS_KEY]
# calculate the accuracy
print((idata.train_data.y == predictions).mean())

0.9866666666666667


As we can see, the data processing procedure is perfectly preserved!

### Explanations

To get started, we can first write down these two parts:

```python
# The name, `xxx_processor`, can be an arbitrary name. Just make sure it is 'unique' so it will not collide with others.
@register_ml_data_processor("xxx_processor", allow_duplicate=True)
class Processor(IMLDataProcessor):
    ...

# The name, `xxx_data`, can be an arbitrary name. Just make sure it is 'unique' so it will not collide with others.
@register_ml_data("xxx_data")
class Data(IMLData):
    # make sure that the `processor_type` matches the name you used in the `register_ml_data_processor` above
    processor_type = "xxx_processor"
```

These will register a new `IMLDataProcessor`, and a new `IMLData` that uses it, into `carefree-learn`. After that, we can use them by constructing the training data with the new `IMLData`:

```python
data = Data(x, y)
```

A validation dataset is supported as well:

```python
data = Data(x_train, y_train, x_valid, y_valid)
```

With the `fit_ml` API, we can utilize the `data` easily:

```python
m = cflearn.api.fit_ml(
    data,
    ...
)
```

So the key question is: how do we define the details of `IMLDataProcessor`? We'll walk through each of its abstract methods in the following sections.

#### General

In order to reduce boilerplate code, `carefree-learn` allows you to 'choose' the arguments that you want in your implementations. For example, the `build_with` defined in `IMLDataProcessor` is:

```python
@abstractmethod
def build_with(
    self,
    config: Dict[str, Any],
    x_train: Union[np.ndarray, str],
    y_train: Optional[Union[np.ndarray, str]],
    x_valid: Optional[Union[np.ndarray, str]],
    y_valid: Optional[Union[np.ndarray, str]],
) -> None:
    pass
```

But in the `MyFancyProcessor`, we actually wrote:

```python
def build_with(self, x_train: str) -> None:
    ...
```

As you can see, we only 'chose' `x_train` for our implementations. In fact, you can 'choose' any number of arguments based on your actual requirements, without having to write down all five arguments defined in the `IMLDataProcessor` interface!

> In the following sections, we will introduce all the arguments defined in the interface, but keep in mind that you don't need to write down all of them in your own implementations, as you can choose what you need!
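
One way such argument 'choosing' can work in general is to inspect the signature of the user's method and pass only the keyword arguments it declares. The sketch below illustrates the idea with the standard `inspect` module. The helper `call_with_chosen_args` is hypothetical, and this is not necessarily how `carefree-learn` implements it internally:

```python
import inspect


def call_with_chosen_args(fn, **available):
    """Call `fn` with only the keyword arguments that it actually declares."""
    accepted = inspect.signature(fn).parameters
    chosen = {k: v for k, v in available.items() if k in accepted}
    return fn(**chosen)


# a user implementation that only 'chooses' `x_train`
def build_with(x_train):
    return f"built with {x_train}"


# the caller always offers all five arguments of the interface
result = call_with_chosen_args(
    build_with,
    config={},
    x_train="iris.data",
    y_train=None,
    x_valid=None,
    y_valid=None,
)
print(result)
```

With a dispatcher like this, the argument names in your implementation matter: a misspelled name would simply never be filled in, so it is worth keeping them identical to the names in the interface.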

#### `build_with`

```python
@abstractmethod
def build_with(
    self,
    config: Dict[str, Any],
    x_train: Union[np.ndarray, str],
    y_train: Optional[Union[np.ndarray, str]],
    x_valid: Optional[Union[np.ndarray, str]],
    y_valid: Optional[Union[np.ndarray, str]],
) -> None:
    pass
```

- `config`: configurations specified by the corresponding `IMLData`.
  - It will be defined in the `processor_build_config` property, as explained below.
- `x_train`: training data, could be `str` when we need to handle file datasets.
- `y_train`: training labels, could be `None` if `x_train` is a file or not provided.
  - It is common that labels are not provided at inference time. 
- `x_valid`: validation data, could be `str` when we need to handle file datasets, could be `None` if not provided.
- `y_valid`: validation labels, could be `None` if not provided.

This method will only be called when we are instantiating an `IMLData` instance. Here's the pseudocode of the `__init__` process:

```python
class IMLData:
    def __init__(
        self,
        x_train,
        y_train,
        x_valid,
        y_valid,
        ...,
    ):
        ...
        # here, we should define `processor_build_config` to pass some configs to our processor
        kw = dict(
            config=self.processor_build_config,
            x_train=x_train,
            y_train=y_train,
            x_valid=x_valid,
            y_valid=y_valid,
        )
        # `build_with` will be called here
        processor.build_with(**kw)
    
    @property
    def processor_build_config(self) -> Dict[str, Any]:
        ...
        
```

So if our processor needs to be configured, we need to pass the configurations to our own `IMLData` instance, and then expose them in the `processor_build_config` property:

In [8]:
@register_ml_data_processor("foo_processor", allow_duplicate=True)
class FooProcessor(IMLDataProcessor):
    def build_with(self, config, x_train):
        print("> x_train   ", x_train)
        print("> config.foo", config["foo"])
    
    def preprocess(self):
        pass
    
    def dumps(self):
        pass
    
    def loads(self):
        pass

@register_ml_data("foo_data", allow_duplicate=True)
class FooData(IMLData):
    processor_type = "foo_processor"
    
    def __init__(self, x_train, foo):
        # the extra assignments should take place before the `super` call, because
        # `processor_build_config` will be used in the `super` call
        self.foo = foo
        super().__init__(x_train)
    
    @property
    def processor_build_config(self):
        return dict(
            foo=self.foo,
        )

data = FooData("foo.train", 1.2345)

> x_train    foo.train
> config.foo 1.2345


#### `preprocess`

```python
class IMLPreProcessedData(NamedTuple):
    x_train: np.ndarray
    y_train: Optional[np.ndarray] = None
    x_valid: Optional[np.ndarray] = None
    y_valid: Optional[np.ndarray] = None
    # if input_dim is not specified, `x_train.shape[-1]` will be used
    input_dim: Optional[int] = None
    num_history: Optional[int] = None
    num_classes: Optional[int] = None
    is_classification: Optional[bool] = None

@abstractmethod
def preprocess(
    self,
    config: Dict[str, Any],
    x_train: Union[np.ndarray, str],
    y_train: Optional[Union[np.ndarray, str]],
    x_valid: Optional[Union[np.ndarray, str]],
    y_valid: Optional[Union[np.ndarray, str]],
    *,
    for_inference: bool,
) -> IMLPreProcessedData:
    pass
```

- `config`: configurations specified by the corresponding `IMLData`.
  - It will be defined in the `processor_preprocess_config` property, as explained below.
- `x_train`: original training data, could be `str` when we need to handle file datasets.
- `y_train`: original training labels, could be `None` if `x_train` is a file or not provided.
- `x_valid`: original validation data, could be `None` if not provided.
- `y_valid`: original validation labels, could be `None` if not provided.

The `preprocess` method should return an `IMLPreProcessedData` namedtuple:
- `x_train`: preprocessed training features.
- `y_train`: preprocessed training labels, could be `None` if not provided.
  - It is common that labels are not provided at inference time.
- `x_valid`: preprocessed validation features, could be `None` if not provided.
- `y_valid`: preprocessed validation labels, could be `None` if not provided.
- `input_dim`: input feature dim that the model will receive.
  - If not provided, `x_train.shape[-1]` will be used.
  - If `encoder` is used, this setting will not represent the final input dim that your model will receive, because the `encoder` might 'expand' the dimension with some encoding methods.
- `num_history`: number of history, useful in time series tasks.
  - If not provided, we will use the default value defined in the pipeline.
- `num_classes`: number of classes, will be used as `output_dim` if `is_classification` is True & `output_dim` is not specified.
  - In the [Example](#Example) section above, we returned the `num_classes` in the `MyFancyProcessor` and it is used as the `output_dim`.
  - If not provided, we will use the default value defined in the pipeline.
- `is_classification`: whether current task is a classification task.
  - If not provided, we will use the default value defined in the pipeline.
  
This method, as its name suggests, will be called before `IMLData` constructs its datasets and dataloaders.
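
The sketch below illustrates the shape of such a return value without importing `carefree-learn`. `PreProcessed` is a stand-in that mirrors only a few fields of `IMLPreProcessedData`, and `preprocess_lines` with its `has_labels` flag is a hypothetical helper. It shows how the labels can be left as `None` when the raw data carries no label column, which is a common situation at inference time:

```python
from typing import Dict, List, NamedTuple, Optional

import numpy as np


class PreProcessed(NamedTuple):
    """Stand-in that mirrors a few fields of `IMLPreProcessedData`."""

    x_train: np.ndarray
    y_train: Optional[np.ndarray] = None
    num_classes: Optional[int] = None


def preprocess_lines(
    lines: List[str],
    label_dictionary: Dict[str, int],
    has_labels: bool,
) -> PreProcessed:
    x, y = [], []
    for line in lines:
        elements = line.strip().split(",")
        if has_labels:
            # the last element is the label, wrapped in a list to keep `y` 2d
            y.append([label_dictionary[elements.pop()]])
        x.append(elements)
    return PreProcessed(
        np.array(x, np.float32),
        np.array(y, int) if has_labels else None,
        num_classes=len(label_dictionary),
    )


label_dictionary = {"Iris-setosa": 0, "Iris-versicolor": 1}
train = preprocess_lines(["5.1,3.5,1.4,0.2,Iris-setosa"], label_dictionary, True)
infer = preprocess_lines(["4.9,3.0,1.4,0.2"], label_dictionary, False)
print(train.x_train.shape, train.y_train.tolist(), infer.y_train)
```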

#### `dumps` / `loads`

```python
@abstractmethod
def dumps(self) -> Any:
    pass

@abstractmethod
def loads(self, dumped: Any) -> None:
    pass
```

- `dumps`: return an object that holds the necessary information for the `loads` method.
- `loads`: set up the processor with the object (`dumped`) returned by the `dumps` method.

These two methods are the key parts of the serialization process. When we save / load our pipeline via `m.save` / `cflearn.api.load`, these methods will be called respectively at the proper places.

Although almost anything can be handled by `carefree-learn`, following these best practices can make your processor more light-weight and performant:

- Return a JSON-serializable object in the `dumps` method.
- Turn small `np.ndarray` into a python `list`.
- Save large `np.ndarray` to a certain path, and then dump the path instead of the `np.ndarray` itself.
  - This practice applies to other large objects as well.

> For the last suggestion, we should keep in mind that this situation can hardly happen if our processor is defined properly. Because in most cases, a processor should only hold the necessary information for processing the data, so it hardly needs to contain large `np.ndarray`s / objects!
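
As an illustration of the first two suggestions, here is a minimal, self-contained sketch. `MeanStdState` is a hypothetical class that only mimics the `build_with` / `dumps` / `loads` trio and does not inherit from `IMLDataProcessor`. It stores small per-feature statistics, dumps them as plain lists, and survives a round trip through `json`:

```python
import json

import numpy as np


class MeanStdState:
    """Hypothetical processor state: per-feature mean / std of the training data."""

    def __init__(self):
        self.mean = None
        self.std = None

    def build_with(self, x_train):
        self.mean = x_train.mean(axis=0)
        self.std = x_train.std(axis=0)

    def dumps(self):
        # small arrays become plain lists, so the result is JSON-serializable
        return {"mean": self.mean.tolist(), "std": self.std.tolist()}

    def loads(self, dumped):
        self.mean = np.array(dumped["mean"], np.float32)
        self.std = np.array(dumped["std"], np.float32)


state = MeanStdState()
state.build_with(np.array([[1.0, 2.0], [3.0, 6.0]], np.float32))
# `json.dumps` would raise a `TypeError` if we returned raw `np.ndarray`s
serialized = json.dumps(state.dumps())

restored = MeanStdState()
restored.loads(json.loads(serialized))
print(restored.mean, restored.std)
```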

# Processing Labels

For now, our model returns numerical class labels. This is already usable, but not perfect, because we usually want it to return the 'original' labels. This can be achieved by implementing the `postprocess_results` callback:

In [9]:
@register_ml_data_processor("my_fancy_label_processor", allow_duplicate=True)
class MyFancyLabelProcessor(MyFancyProcessor):
    def postprocess_results(self, forward, return_classes):
        # we only want to handle the situation when `return_classes` is specified
        if not return_classes:
            return forward
        # create a reverse dictionary to map int labels to str labels
        rev_dict = {v: k for k, v in self.label_dictionary.items()}
        y = forward[cflearn.PREDICTIONS_KEY]
        y = y.ravel().tolist()
        # make sure that the final y is a 2d array
        y = [[rev_dict[label]] for label in y]
        y = np.array(y)
        forward[cflearn.PREDICTIONS_KEY] = y
        return forward

@register_ml_data("my_fancy_label_data", allow_duplicate=True)
class MyFancyLabelData(IMLData):
    processor_type = "my_fancy_label_processor"
    
m = cflearn.api.fit_ml(
    MyFancyLabelData(data_path),
    is_classification=True,
)

                                Internal Default Configurations Used by `carefree-learn`                                
------------------------------------------------------------------------------------------------------------------------
                             train_samples   |   150
                             valid_samples   |   None
                         max_snapshot_file   |   25
                                 input_dim   |   4
                                output_dim   |   3
                                 loss_name   |   focal
                                 workplace   |   _logs\2022-08-19_10-37-04-286146
                             monitor_names   |   ['mean_std', 'plateau']
                      additional_callbacks   |   ['_log_metrics_msg', '_inject_loader_name']
                   log_metrics_msg_verbose   |   True
                              metric_names   |   ['acc', 'auc']
----------------------------------------------------------------------------

Let's make an inference again:

In [10]:
idata = m.make_inference_data(data_path)
p1 = m.predict(idata, return_classes=True)[cflearn.PREDICTIONS_KEY]
print(p1[:3])

[['Iris-setosa']
 ['Iris-setosa']
 ['Iris-setosa']]


Great! Now our model will return the 'original' labels as expected!

> More details of `postprocess_results` will be covered in the [Optional Callbacks](#postprocess_results) section.

Let's try serializations as well:

In [11]:
m.save("./test")
m2 = cflearn.api.load("./test")
idata = m2.make_inference_data(data_path)
p2 = m2.predict(idata, return_classes=True)[cflearn.PREDICTIONS_KEY]
print("all same:", np.all(p1 == p2))

all same: True


As we can see, the data processing procedure is perfectly preserved!

# Optional Callbacks

Besides the processing methods above, the `IMLData` system also provides several useful callbacks that enable full control of the data flow. We will introduce these callbacks in the following sections.

## `get_num_samples`

```python
def get_num_samples(self, x: np.ndarray) -> Optional[int]:
    return None
```

This method can override how the number of samples is calculated. `len(x)` will be used if `None` is returned.
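
As a purely illustrative sketch, suppose that each sample is a sliding window of `num_history` consecutive rows, so that the number of samples is smaller than `len(x)`. The class `WindowedSamples` below is hypothetical and stands in for a processor; only the signature of `get_num_samples` is taken from the interface above:

```python
from typing import Optional

import numpy as np


class WindowedSamples:
    """Hypothetical example: each sample is a window of `num_history` rows."""

    def __init__(self, num_history: int):
        self.num_history = num_history

    def get_num_samples(self, x: np.ndarray) -> Optional[int]:
        # every valid window start counts as one sample
        return max(len(x) - self.num_history + 1, 0)


x = np.zeros([10, 4], np.float32)
num_samples = WindowedSamples(num_history=3).get_num_samples(x)
print(num_samples)
```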

## `fetch_batch`

```python
class IMLBatch(NamedTuple):
    input: np.ndarray
    labels: Optional[np.ndarray]
    others: Optional[np_dict_type] = None

def fetch_batch(
    self,
    x: np.ndarray,
    y: Optional[np.ndarray],
    indices: Union[int, List[int], np.ndarray],
) -> IMLBatch:
    return IMLBatch(x[indices], None if y is None else y[indices])
```

This method defines how the batch will be fetched. The `others` field allows you to inject some additional data to your batch, for example:

In [12]:
from cflearn import IMLBatch

@register_ml_data_processor("inject_foo_processor", allow_duplicate=True)
class InjectFooProcessor(MyFancyProcessor):
    def fetch_batch(self, x, y, indices):
        foo = np.full([len(indices), 1], 1.234)
        return IMLBatch(
            x[indices],
            None if y is None else y[indices],
            others={"foo": foo},
        )

@register_ml_data("inject_foo_data", allow_duplicate=True)
class InjectFooData(IMLData):
    processor_type = "inject_foo_processor"

@cflearn.register_ml_module("inject_foo_model", allow_duplicate=True)
class InjectFooModel(nn.Module):
    def __init__(self, in_dim, out_dim):
        super().__init__()
        self.linear = nn.Linear(in_dim, out_dim)
    
    def forward(self, batch):
        print("> foo")
        print(batch["foo"][:3])
        return self.linear(batch[cflearn.INPUT_KEY])

m = cflearn.api.fit_ml(
    InjectFooData(data_path),
    core_name="inject_foo_model",
    is_classification=True,
    # debug
    debug=True,
)

                                Internal Default Configurations Used by `carefree-learn`                                
------------------------------------------------------------------------------------------------------------------------
                             train_samples   |   150
                             valid_samples   |   None
                         max_snapshot_file   |   25
                                 input_dim   |   4
                                output_dim   |   3
                                 loss_name   |   focal
                                 workplace   |   _logs\2022-08-19_10-37-05-191141
                             monitor_names   |   ['mean_std', 'plateau']
                      additional_callbacks   |   ['_log_metrics_msg', '_inject_loader_name']
                   log_metrics_msg_verbose   |   True
                              metric_names   |   ['acc', 'auc']
----------------------------------------------------------------------------
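
One detail worth noting: the interface types `indices` as `Union[int, List[int], np.ndarray]`, while the example above calls `len(indices)`, which would raise a `TypeError` for a plain `int`. This page does not say whether an `int` is ever passed in practice, so the following is only a defensive sketch. The helper `make_foo` is hypothetical, and it normalizes `indices` with `np.atleast_1d` before counting:

```python
import numpy as np


def make_foo(indices, value=1.234):
    """Build one `foo` row per fetched sample, whatever the type of `indices`."""
    # `np.atleast_1d` turns an int into a 1-element array and turns
    # lists / 1d arrays into 1d arrays, so `len` is always defined
    num_samples = len(np.atleast_1d(indices))
    return np.full([num_samples, 1], value)


x = np.arange(20).reshape(5, 4)
for indices in (2, [0, 3], np.array([1, 2, 4])):
    # plain numpy indexing already accepts all three kinds of `indices`
    print(type(indices).__name__, x[indices].shape, make_foo(indices).shape)
```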

## `postprocess_batch`

```python
# changes can happen inplace
def postprocess_batch(self, batch: np_dict_type) -> np_dict_type:
    return batch
```

This method allows you to postprocess the batch.
> In most cases, specifying `fetch_batch` is already enough.
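
For illustration, here is a minimal sketch of an in-place postprocess on a plain dictionary of arrays. The key strings `"input"` and `"labels"` are placeholders chosen for this sketch (in real code the input key should be looked up via `cflearn.INPUT_KEY`, as in the model above), and the clipping range is an arbitrary assumption:

```python
import numpy as np


def postprocess_batch(batch):
    # changes happen in place: clip extreme values and unify the dtype
    batch["input"] = np.clip(batch["input"], -3.0, 3.0).astype(np.float32)
    return batch


batch = {
    "input": np.array([[-5.0, 0.5], [7.0, 1.0]]),
    "labels": np.array([[0], [1]]),
}
out = postprocess_batch(batch)
print(out["input"])
```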

## `postprocess_results`

```python
# changes can happen inplace
def postprocess_results(
    self,
    forward: np_dict_type,
    *,
    return_classes: bool,
    binary_threshold: float,
    return_probabilities: bool,
) -> np_dict_type:
    return forward
```

This method allows you to postprocess the inference results.
> Notice that:
> - the arguments of this method are 'optional'.
> - this will only affect the inference methods (e.g. `predict`) and will not affect the training process.

- `forward`: the 'raw' inference results.
- `return_classes`: whether we need to return class labels instead of the 'raw' results (e.g. logits).
- `binary_threshold`: threshold used in binary classification tasks.
- `return_probabilities`: whether we need to return the probability predictions.

Although there are some flags in this method (`return_classes`, etc.), they are just telling you what kinds of data `forward` will hold, and do not require you to handle them because they have already been handled. For example:

In [13]:
@register_ml_data_processor("inspect_processor", allow_duplicate=True)
class InspectProcessor(MyFancyProcessor):    
    def postprocess_results(self, forward, *, return_classes, binary_threshold, return_probabilities):
        print("> return_classes", return_classes)
        print("> binary_threshold", binary_threshold)
        print("> return_probabilities", return_probabilities)
        print(forward[cflearn.PREDICTIONS_KEY][:3])
        return forward

@register_ml_data("inspect_data", allow_duplicate=True)
class InspectData(IMLData):
    processor_type = "inspect_processor"


m = cflearn.api.fit_ml(
    InspectData(data_path),
    is_classification=True,
)

                                Internal Default Configurations Used by `carefree-learn`                                
------------------------------------------------------------------------------------------------------------------------
                             train_samples   |   150
                             valid_samples   |   None
                         max_snapshot_file   |   25
                                 input_dim   |   4
                                output_dim   |   3
                                 loss_name   |   focal
                                 workplace   |   _logs\2022-08-19_10-37-05-235147
                             monitor_names   |   ['mean_std', 'plateau']
                      additional_callbacks   |   ['_log_metrics_msg', '_inject_loader_name']
                   log_metrics_msg_verbose   |   True
                              metric_names   |   ['acc', 'auc']
----------------------------------------------------------------------------

In [14]:
idata = m.make_inference_data(data_path)
predictions = m.predict(idata)

> return_classes False
> binary_threshold 0.5
> return_probabilities False
[[ 0.07631765 -0.08687203  0.01616419]
 [ 0.07477259 -0.06446083  0.01953134]
 [ 0.07901555 -0.07111384  0.01131298]]


In [15]:
predictions = m.predict(idata, return_classes=True)

> return_classes True
> binary_threshold 0.5
> return_probabilities False
[[0]
 [0]
 [0]]


In [16]:
predictions = m.predict(idata, return_probabilities=True)

> return_classes False
> binary_threshold 0.5
> return_probabilities True
[[0.3582881  0.30434066 0.33737123]
 [0.3550781  0.3089268  0.33599508]
 [0.35776448 0.30789092 0.33434463]]


As shown above, by the time `postprocess_results` is called, `forward` has already been processed according to those flags, so we can focus on the 'real' postprocessing procedure in the `postprocess_results` method.
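
The three printed outputs also appear to be related in the usual way: applying a softmax to the raw logits reproduces the printed probabilities, and taking the argmax reproduces the printed classes. The check below uses only the numbers printed on this page. That the flags correspond to softmax / argmax is an inference from these numbers, not something taken from the `carefree-learn` source:

```python
import numpy as np

# logits printed by `m.predict(idata)` above
logits = np.array(
    [
        [0.07631765, -0.08687203, 0.01616419],
        [0.07477259, -0.06446083, 0.01953134],
        [0.07901555, -0.07111384, 0.01131298],
    ]
)

# numerically stable softmax over the class dimension
exp = np.exp(logits - logits.max(axis=1, keepdims=True))
probabilities = exp / exp.sum(axis=1, keepdims=True)
# argmax over the class dimension, kept as a 2d array
classes = probabilities.argmax(axis=1)[..., None]

print(probabilities.round(4))
print(classes.ravel().tolist())
```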