In [1]:
from zntrack import config

In [2]:
# Tell ZnTrack the file name of this notebook.
# NOTE(review): presumably required so Nodes defined inside the notebook can be
# used by ZnTrack -- confirm against the ZnTrack documentation.
config.nb_name = "Workflow.ipynb"

In [3]:
import pathlib

import kaggle
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
from tensorflow import keras
from tensorflow.keras import layers
from tensorflow.keras.utils import to_categorical
from zntrack import Node, NodeConfig, dvc, nodify, utils, zn
from zntrack.core import ZnTrackOption

2022-04-26 10:11:56.013829: W tensorflow/stream_executor/platform/default/dso_loader.cc:64] Could not load dynamic library 'libcudart.so.11.0'; dlerror: libcudart.so.11.0: cannot open shared object file: No such file or directory
2022-04-26 10:11:56.013899: I tensorflow/stream_executor/cuda/cudart_stub.cc:29] Ignore above cudart dlerror if you do not have a GPU set up on your machine.


## Download the dataset using the `kaggle` package.
The dataset can be found at https://www.kaggle.com/datamunge/sign-language-mnist

1. Define the function and configure the DVC options via `@nodify`
2. Call the function without arguments to add it to the DVC graph

You might have to set up your Kaggle account first, or download the dataset manually into `dataset/` and skip this Node (by not calling `download_kaggle()`). If needed, the snippet below stores your Kaggle credentials in `~/.kaggle/kaggle.json`:

```python
import pathlib
import json
import getpass

kaggle_dir = pathlib.Path.home() / ".kaggle"
kaggle_dir.mkdir(exist_ok=True, parents=True)
kaggle_file = kaggle_dir / "kaggle.json"

username = input()
key = getpass.getpass()
_ = kaggle_file.write_text(json.dumps({"username": username,"key":key}))
```

In [4]:
@nodify(outs="dataset", params={"dataset": "datamunge/sign-language-mnist"})
def download_kaggle(cfg: NodeConfig):
    """Fetch the configured kaggle dataset and unzip it into the output path.

    Args:
        cfg: Node configuration passed in by ``@nodify``. ``cfg.params.dataset``
            is handed to kaggle as the dataset to download and ``cfg.outs`` as
            the directory to download into.
    """
    dataset_name = cfg.params.dataset
    target_dir = cfg.outs
    kaggle.api.dataset_download_files(dataset=dataset_name, path=target_dir, unzip=True)


# Calling the nodified function without arguments adds it to the DVC graph.
download_kaggle()

Submit issues to https://github.com/zincware/ZnTrack.


NodeConfig(params={'dataset': 'datamunge/sign-language-mnist'}, outs='dataset', outs_no_cache=None, outs_persist=None, outs_persist_no_cache=None, metrics=None, metrics_no_cache=None, deps=None, plots=None, plots_no_cache=None)

## Data Preprocessing
We use the class-based API for better structure and to split the individual steps into methods.
Additionally, we add a method `plot_image` which allows us to inspect an arbitrary sample of the dataset.

In [5]:
# zntrack: break
class DataPreprocessor(Node):
    """Turn the raw kaggle CSV into arrays ready for training.

    * features are divided by 255 and reshaped to ``(-1, 28, 28, 1)``
    * labels are one-hot encoded
    """

    # inputs: dataset directory (DVC dependency) and dataset name (parameter)
    data: pathlib.Path = dvc.deps(pathlib.Path("dataset"))
    dataset = zn.params("sign_mnist_train")
    # results stored as Node outputs
    features: np.ndarray = zn.outs()
    labels: np.ndarray = zn.outs()

    def run(self):
        """Primary Node Method: read the CSV and preprocess labels and features."""
        csv_file = (self.data / self.dataset / self.dataset).with_suffix(".csv")
        table = pd.read_csv(csv_file).values

        # the first column is used as label, all remaining columns as features
        self.labels = to_categorical(table[:, 0])
        self.features = table[:, 1:]

        self.normalize_and_scale_data()

    def normalize_and_scale_data(self):
        """Divide the features by 255 and reshape them to ``(-1, 28, 28, 1)``."""
        scaled = self.features / 255
        self.features = scaled.reshape((-1, 28, 28, 1))

    def plot_image(self, index):
        """Show the feature image at ``index`` with its label index as title."""
        plt.imshow(self.features[index])
        # argmax recovers the class index from the one-hot encoded label
        label_index = self.labels[index].argmax()
        plt.title(f"Label {label_index}")
        plt.show()


# Instantiate the Node with its default parameters (training dataset) and
# write it to the DVC graph.
DataPreprocessor().write_graph()



## Custom ZnTrackOption + Training Model

For the model we define a custom ZnTrackOption called `TFModel` which allows us to serialize a TensorFlow model.
We use the dvc_option `--outs` and the zn_type `RESULTS`. These should be the preferred values for most custom serializations.

We can override the `get_filename`, `save` and `get_data_from_files` methods as shown below to save and load a TensorFlow model.

In [6]:
# zntrack: break
class TFModel(ZnTrackOption):
    """Custom ZnTrack option that saves / loads a Keras model as a DVC output."""

    dvc_option = "outs"
    zn_type = utils.ZnTypes.RESULTS

    def get_filename(self, instance) -> pathlib.Path:
        """Return the path ``nodes/<node_name>/model`` for the given instance."""
        return pathlib.Path("nodes") / instance.node_name / "model"

    def save(self, instance):
        """Save the model held by ``instance`` to the path from ``get_filename``."""
        tf_model = self.__get__(instance, self.owner)
        tf_model.save(self.get_filename(instance))

    def get_data_from_files(self, instance):
        """Load and return the model stored at the path from ``get_filename``."""
        return keras.models.load_model(self.get_filename(instance))

With this custom option type we can define `model = TFModel()` and use it like the other `zn.<options>`, except that we assign it a TensorFlow model.
Note: You can also register a custom `znjson` de/serializer and use `zn.outs` instead.

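In this simple example only the number of epochs is exposed through the constructor; `filters` and `dense` are also tracked as `zn.params`, with default values. For a more advanced Node you would try to capture all relevant parameters, such as layer types, number of neurons, ... as `zn.params`.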

In [7]:
class MLModel(Node):
    """Build and train a small convolutional classifier on the training data.

    The network consists of one Conv2D + MaxPooling2D block per entry in
    ``filters``, a Flatten layer, one Dense layer per entry in ``dense`` and a
    final softmax layer. The trained model, the per-epoch training history and
    the metrics of the last epoch are stored as Node outputs.
    """

    # dependencies
    train_data: DataPreprocessor = zn.deps(DataPreprocessor)
    # outputs
    training_history = zn.plots()
    metrics = zn.metrics()
    # custom model output
    model = TFModel()
    # parameter
    epochs = zn.params()
    filters = zn.params([4])
    dense = zn.params([4])

    def __init__(self, epochs: int = 3, **kwargs):
        """Create the Node.

        Args:
            epochs: number of training epochs passed to ``model.fit``.
            **kwargs: forwarded unchanged to ``Node.__init__``.
        """
        super().__init__(**kwargs)
        self.epochs = epochs

        # plain attribute, not declared as ``zn.params``
        self.optimizer = "adam"

    def run(self):
        """Primary Node Method"""
        self.build_model()
        self.train_model()

    def train_model(self):
        """Compile and fit the model, then store training history and metrics."""
        self.model.compile(
            optimizer=self.optimizer,
            loss="categorical_crossentropy",
            metrics=["accuracy"],
        )

        # ``summary()`` prints the table itself and returns None; wrapping it in
        # ``print`` would emit an additional stray "None" line.
        self.model.summary()

        history = self.model.fit(
            self.train_data.features,
            self.train_data.labels,
            validation_split=0.3,
            epochs=self.epochs,
            batch_size=64,
        )
        self.training_history = pd.DataFrame(history.history)
        self.training_history.index.name = "epoch"
        # use the last values for model metrics
        self.metrics = dict(self.training_history.iloc[-1])

    def build_model(self):
        """Build the model using the Keras functional API and store it in ``self.model``."""

        inputs = keras.Input(shape=(28, 28, 1))
        cargo = inputs
        # one convolution + pooling block per configured filter count
        for filters in self.filters:
            cargo = layers.Conv2D(
                filters=filters, kernel_size=(3, 3), padding="same", activation="relu"
            )(cargo)
            cargo = layers.MaxPooling2D((2, 2))(cargo)

        cargo = layers.Flatten()(cargo)

        # one fully connected layer per configured unit count
        for dense in self.dense:
            cargo = layers.Dense(dense, activation="relu")(cargo)

        # NOTE(review): 25 output units are hard-coded; this presumably has to match
        # the width of the one-hot labels from DataPreprocessor -- confirm.
        output = layers.Dense(25, activation="softmax")(cargo)

        self.model = keras.Model(inputs=inputs, outputs=output)

# Instantiate the Node with the default number of epochs and write it to the
# DVC graph.
MLModel().write_graph()



## Process Test Data

We haven't processed our test data yet. We can use the same `DataPreprocessor` Node that we defined previously but give it a different name and pass the test dataset as parameter instead.

In [8]:
# Reuse the preprocessing Node for the test split: a different dataset
# parameter and a distinct Node name keep it separate from the training Node.
DataPreprocessor(dataset="sign_mnist_test", name="data_preprocess_test").write_graph()



## Evaluate the Model
We define an additional Node to evaluate the model against the test data. Here we use the `DataPreprocessor` as dependency.
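Because we gave the test data Node a custom name, we cannot pass the bare `DataPreprocessor` class; we must reference the named Node instead, e.g. via `DataPreprocessor.load(name=<nodename>)` or, as in the cell below, `DataPreprocessor[<nodename>]`.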

In [9]:
# zntrack: break
class EvaluateModel(Node):
    """Evaluate the trained model against the preprocessed test data.

    Stores loss and accuracy as metrics, and the actual vs. predicted class
    index of every test sample as plot data for the ``confusion`` template.
    """

    # dependencies
    ml_model: keras.Model = zn.deps(MLModel @ "model")
    test_data: DataPreprocessor = zn.deps()
    # metrics
    metrics = zn.metrics()
    confusion_matrix = zn.plots(template="confusion", x="predicted", y="actual")

    def run(self):
        """Primary Node Method"""
        features = self.test_data.features
        labels = self.test_data.labels

        loss, accuracy = self.ml_model.evaluate(features, labels)
        self.metrics = {"loss": loss, "accuracy": accuracy}

        prediction = self.ml_model.predict(features)

        # Reduce one-hot labels and predicted class scores to class indices
        # for all samples at once instead of looping over rows in Python.
        self.confusion_matrix = pd.DataFrame(
            {
                "actual": np.argmax(labels, axis=1),
                "predicted": np.argmax(prediction, axis=1),
            }
        )

# Use the separately named test-data Node as dependency and write the
# evaluation Node to the DVC graph.
EvaluateModel(
    test_data=DataPreprocessor["data_preprocess_test"]
).write_graph()



# The Graph
We can have a brief look at the generated DAG

In [10]:
# Shell escape: let DVC print the pipeline graph as ASCII art.
!dvc dag

                +-----------------+                    
                | download_kaggle |                    
                +-----------------+                    
                 ***            ***                    
               **                  **                  
             **                      **                
+------------------+                   **              
| DataPreprocessor |                    *              
+------------------+                    *              
          *                             *              
          *                             *              
          *                             *              
    +---------+             +----------------------+   
    | MLModel |             | data_preprocess_test |   
    +---------+**           +----------------------+   
                 ***            ***                    
                    **        **                       
                      **    ** 