# MindAlpha Tutorial

In this tutorial, we show how to use MindAlpha in the production environment.

Before you proceed, please make sure you have uploaded the demo dataset to your own s3 bucket. See the **Prepare Data** section in [MindAlpha Getting Started](mindalpha-getting-started.ipynb) for instructions. In the rest of the article, we assume the demo dataset has been uploaded to ``s3://{YOUR_S3_BUCKET}/{YOUR_S3_PATH}/demo/data/``. You should replace ``{YOUR_S3_BUCKET}`` and ``{YOUR_S3_PATH}`` with actual values before executing code cells containing these placeholders.

The ``schema`` directory contains configuration files for ``ma.EmbeddingSumConcat`` operators and must also be uploaded to s3. In the rest of the article, we assume the ``schema`` directory has been uploaded to ``s3://{YOUR_S3_BUCKET}/{YOUR_S3_PATH}/demo/schema/``.

If you have not uploaded these files yet, you can open a terminal by selecting the ``File`` -> ``New`` -> ``Terminal`` menu item and run Bash commands similar to the following to upload them to your own s3 bucket.

```text
aws s3 cp --recursive ${PWD}/data/ s3://{YOUR_S3_BUCKET}/{YOUR_S3_PATH}/demo/data/
aws s3 cp --recursive ${PWD}/schema/ s3://{YOUR_S3_BUCKET}/{YOUR_S3_PATH}/demo/schema/
```

```python
S3_ROOT_DIR = 's3://{YOUR_S3_BUCKET}/{YOUR_S3_PATH}/'
```
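All the remaining cells build their paths by appending relative paths such as ``'demo/schema/'`` to ``S3_ROOT_DIR``, so it can help to fail fast if the placeholders are still there. The following is a minimal sketch; ``check_s3_root`` is a hypothetical helper, not part of MindAlpha.

```python
import re


def check_s3_root(s3_root_dir: str) -> str:
    """Hypothetical helper: validate S3_ROOT_DIR before it is used to build paths."""
    # Reject leftover placeholder tokens such as {YOUR_S3_BUCKET}.
    leftovers = re.findall(r'\{[A-Z0-9_]+\}', s3_root_dir)
    if leftovers:
        raise ValueError('replace placeholders first: %s' % ', '.join(leftovers))
    if not s3_root_dir.startswith('s3://'):
        raise ValueError('expected an s3:// URL, got %r' % s3_root_dir)
    # Relative paths are appended directly, so the root must end with a slash.
    if not s3_root_dir.endswith('/'):
        raise ValueError('S3_ROOT_DIR must end with "/"')
    return s3_root_dir
```

Calling ``check_s3_root(S3_ROOT_DIR)`` right after the cell above raises ``ValueError`` until both placeholders have been replaced.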

## Define the Model

Let's define our neural network model as the following ``DemoModule`` class. This is the same ``DemoModule`` class defined in [MindAlpha Getting Started](mindalpha-getting-started.ipynb).

```python
import torch
import mindalpha as ma

class DemoModule(torch.nn.Module):
    def __init__(self):
        super().__init__()
        self._embedding_size = 16
        self._schema_dir = S3_ROOT_DIR + 'demo/schema/'
        self._column_name_path = self._schema_dir + 'column_name_demo.txt'
        self._combine_schema_path = self._schema_dir + 'combine_schema_demo.txt'
        self._sparse = ma.EmbeddingSumConcat(self._embedding_size, self._column_name_path, self._combine_schema_path)
        self._sparse.updater = ma.FTRLTensorUpdater()
        self._sparse.initializer = ma.NormalTensorInitializer(var=0.01)
        self._dense = torch.nn.Sequential(
            ma.nn.Normalization(self._sparse.feature_count * self._embedding_size),
            torch.nn.Linear(self._sparse.feature_count * self._embedding_size, 1024),
            torch.nn.ReLU(),
            torch.nn.Linear(1024, 512),
            torch.nn.ReLU(),
            torch.nn.Linear(512, 1),
        )

    def forward(self, x):
        x = self._sparse(x)
        x = self._dense(x)
        return torch.sigmoid(x)
```
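The input width of the dense tower is ``feature_count * embedding_size``, so the size of the first ``Linear`` layer grows with the number of features declared in the schema files. The sketch below only does the arithmetic for the three ``torch.nn.Linear`` layers (weight ``in * out`` plus bias ``out``); ``feature_count`` is a hypothetical input here, and the parameters of ``ma.nn.Normalization`` are deliberately left out.

```python
def dense_linear_param_count(feature_count: int, embedding_size: int = 16) -> int:
    """Count weights and biases of the three Linear layers in DemoModule._dense.

    feature_count is hypothetical here; in DemoModule it comes from
    self._sparse.feature_count, i.e. from the schema files.
    """
    layer_sizes = [feature_count * embedding_size, 1024, 512, 1]
    total = 0
    for in_features, out_features in zip(layer_sizes, layer_sizes[1:]):
        # A Linear layer has an (out x in) weight matrix plus an out-sized bias.
        total += in_features * out_features + out_features
    return total


print(dense_linear_param_count(10))  # 690177
```

With 10 features the first layer alone has 164864 parameters; each additional feature adds ``16 * 1024`` weights to it.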

## Define the train function

Now we define the ``train()`` function, which wraps all the logic in one function so that the model is easy to develop in Jupyter locally and to submit to Airflow later. It basically combines the training and evaluation steps into a single function.

Most of the code fragments have been shown in [MindAlpha Getting Started](mindalpha-getting-started.ipynb).

``model_in_path`` specifies where to load a previously trained model from. With ``model_in_path`` and ``model_out_path``, we can train the model incrementally. If ``model_in_path`` is ``None``, the model is randomly initialized and trained from scratch.

``model_export_path``, ``model_version`` and ``experiment_name`` are used for model exporting. An exported model can be loaded by MindAlpha Serving for online prediction. For the exported model to be found by MindAlpha Serving, we need to set ``consul_host``, ``consul_port`` and ``consul_endpoint_prefix`` and call the ``PyTorchModel.publish()`` method.

``max_sparse_feature_age`` limits how long sparse features are kept. If the embedding vector of a sparse feature is not updated for more than ``max_sparse_feature_age`` periods, the feature is eventually evicted from the model. Because the age is counted in training periods, its value should be adjusted to match the model training frequency. For example, for daily training data, ``max_sparse_feature_age == 15`` means 15 days, whereas for hourly training data, ``max_sparse_feature_age == 15 * 24`` means 15 days.
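The conversion in the example above can be written down as a minimal sketch: divide the desired retention window by the training interval. ``periods_for_retention`` is a hypothetical helper, not a MindAlpha API.

```python
import datetime


def periods_for_retention(retention: datetime.timedelta,
                          schedule_interval: datetime.timedelta) -> int:
    """Hypothetical helper: turn a retention window into a max_sparse_feature_age value."""
    if schedule_interval <= datetime.timedelta(0):
        raise ValueError('schedule_interval must be positive')
    # timedelta // timedelta yields an int: the number of whole training periods.
    return retention // schedule_interval


fifteen_days = datetime.timedelta(days=15)
print(periods_for_retention(fifteen_days, datetime.timedelta(days=1)))   # 15
print(periods_for_retention(fifteen_days, datetime.timedelta(hours=1)))  # 360
```

The same helper shows that the 5-minute interval used later in this tutorial would need a value of 4320 to keep features for 15 days.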

We use Python's ``with`` statement to ensure that the ``stop()`` method of ``spark_session`` is invoked when training finishes, even if an exception is raised.
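The ``with`` statement calls the object's ``__exit__`` method however the block ends, and pyspark's ``SparkSession.__exit__`` calls ``stop()``. Assuming ``ma.spark.get_session()`` returns a regular ``SparkSession``, the sketch below illustrates that guarantee with a hypothetical stand-in class ``FakeSession`` so that it runs without Spark.

```python
class FakeSession:
    """Hypothetical stand-in for a SparkSession, used only for illustration."""

    def __init__(self):
        self.stopped = False

    def stop(self):
        self.stopped = True

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        # Always stop; returning False lets the original exception propagate.
        self.stop()
        return False


session = FakeSession()
try:
    with session:
        raise RuntimeError('training failed')
except RuntimeError:
    pass
print(session.stopped)  # True
```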

```python
def train(local=True,
          batch_size=100,
          worker_count=1,
          server_count=1,
          worker_cpu=1,
          server_cpu=1,
          worker_memory='5G',
          server_memory='5G',
          coordinator_memory='5G',
          module_class=None,
          model_in_path=None,
          model_out_path=None,
          model_export_path=None,
          model_version=None,
          experiment_name=None,
          input_label_column_index=0,
          delimiter='\t',
          train_dataset_path=None,
          test_dataset_path=None,
          is_catchup=True,
          consul_host=None,
          consul_port=None,
          consul_endpoint_prefix=None,
          max_sparse_feature_age=15,
          metric_update_interval=10,
         ):
    # Import the submodule explicitly; a bare `import pyspark` does not load pyspark.ml.
    import pyspark.ml.evaluation
    import mindalpha as ma
    if module_class is None:
        module_class = DemoModule
    print('local: %s' % local)
    print('batch_size: %d' % batch_size)
    print('worker_count: %d' % worker_count)
    print('server_count: %d' % server_count)
    print('worker_cpu: %d' % worker_cpu)
    print('server_cpu: %d' % server_cpu)
    print('worker_memory: %s' % worker_memory)
    print('server_memory: %s' % server_memory)
    print('coordinator_memory: %s' % coordinator_memory)
    print('module_class: %s' % module_class)
    print('model_in_path: %s' % model_in_path)
    print('model_out_path: %s' % model_out_path)
    print('model_export_path: %s' % model_export_path)
    print('model_version: %s' % model_version)
    print('experiment_name: %s' % experiment_name)
    print('input_label_column_index: %d' % input_label_column_index)
    print('delimiter: %r' % delimiter)
    print('train_dataset_path: %s' % train_dataset_path)
    print('test_dataset_path: %s' % test_dataset_path)
    print('is_catchup: %s' % is_catchup)
    print('consul_host: %s' % consul_host)
    print('consul_port: %s' % consul_port)
    print('consul_endpoint_prefix: %s' % consul_endpoint_prefix)
    print('max_sparse_feature_age: %d' % max_sparse_feature_age)
    print('metric_update_interval: %d' % metric_update_interval)
    module = module_class()
    estimator = ma.PyTorchEstimator(module=module,
                                    worker_count=worker_count,
                                    server_count=server_count,
                                    model_in_path=model_in_path,
                                    model_out_path=model_out_path,
                                    model_export_path=model_export_path,
                                    model_version=model_version,
                                    experiment_name=experiment_name,
                                    input_label_column_index=input_label_column_index,
                                    consul_host=consul_host,
                                    consul_port=consul_port,
                                    consul_endpoint_prefix=consul_endpoint_prefix,
                                    max_sparse_feature_age=max_sparse_feature_age,
                                    metric_update_interval=metric_update_interval,
                                   )
    spark_session = ma.spark.get_session(local=local,
                                         batch_size=batch_size,
                                         worker_count=estimator.worker_count,
                                         server_count=estimator.server_count,
                                         worker_cpu=worker_cpu,
                                         server_cpu=server_cpu,
                                         worker_memory=worker_memory,
                                         server_memory=server_memory,
                                         coordinator_memory=coordinator_memory,
                                        )
    # Leaving the with block stops the Spark session, even on failure.
    with spark_session:
        train_dataset = ma.input.read_s3_csv(spark_session, train_dataset_path, delimiter=delimiter,
                                             shuffle=True, num_workers=estimator.worker_count)
        model = estimator.fit(train_dataset)
        if test_dataset_path is not None:
            test_dataset = ma.input.read_s3_csv(spark_session, test_dataset_path, delimiter=delimiter)
            result = model.transform(test_dataset)
            evaluator = pyspark.ml.evaluation.BinaryClassificationEvaluator()
            test_auc = evaluator.evaluate(result)
            print('test_auc: %g' % test_auc)
        # Only online (non-catchup) runs publish the model to MindAlpha Serving.
        if not is_catchup and model.consul_endpoint_prefix is not None:
            model.publish()
```

We can call the ``train()`` function with example paths to test our model. Later, if you change the class definition of the model, you can call ``train()`` again to test it, which makes it convenient to develop the model in Jupyter interactively.

```python
model_out_path = S3_ROOT_DIR + 'demo/output/dev/model_out/'
train_dataset_path = S3_ROOT_DIR + 'demo/data/train/day_0_0.001_train.csv'
test_dataset_path = S3_ROOT_DIR + 'demo/data/test/day_0_0.001_test.csv'
train(model_out_path=model_out_path,
      train_dataset_path=train_dataset_path,
      test_dataset_path=test_dataset_path)
```

## Schedule model training

To schedule model training, let's define the following variables.

The first group of variables identify our experiment. ``business_name`` is the name of the machine learning application in your organization. ``experiment_name`` specifies a name for the iteration of the model. ``job_name`` specifies a name for the machine learning task to distinguish it from other tasks in the Airflow DAG (such as data preprocessing tasks).

The second group of variables is related to Airflow. ``owner`` specifies the Airflow DAG owner. ``schedule_interval`` specifies how often the model is trained. ``backfill_start_date`` and ``backfill_end_date`` specify the start and end of the model backfill process. Despite their names, which follow Airflow terminology, they are Python ``datetime.datetime`` objects (timezone-aware, in UTC here) rather than dates.

The demo dataset contains 24 days of data. The rest of the document treats each day of data as one 5-minute ``schedule_interval`` so that the demo finishes quickly. We compute ``backfill_start_date`` and ``backfill_end_date`` from the current time: the current time is first rounded down to a multiple of ``schedule_interval``, the backfill process starts 3 schedule intervals (15 minutes) before that boundary, and it ends 2 schedule intervals (10 minutes) after it. ``online_start_date`` is one schedule interval after ``backfill_end_date``, that is, 15 minutes after the boundary. In real applications, ``backfill_start_date`` and ``backfill_end_date`` may be specified directly instead of computed, and you may want an ``'@hourly'`` or ``'@daily'`` ``schedule_interval``.
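To see how this schedule maps onto the demo data, the sketch below lists the execution dates of the backfill window together with the ``data_part_index`` that ``execute()`` derives from each. It assumes, as is usual in Airflow, that a run is created for every ``schedule_interval`` step from ``start_date`` up to and including ``end_date``. ``list_runs`` is a hypothetical helper, and the fixed ``base`` time stands in for the rounded-down current time.

```python
import datetime


def list_runs(start, end, interval):
    """Hypothetical helper: (execution_date, data_part_index) pairs from start to end inclusive."""
    runs = []
    current = start
    while current <= end:
        # Same formula as execute(): number of intervals since backfill_start_date.
        index = int(round((current - start).total_seconds() / interval.total_seconds()))
        runs.append((current, index))
        current += interval
    return runs


interval = datetime.timedelta(minutes=5)
base = datetime.datetime(2021, 6, 1, 12, 0, tzinfo=datetime.timezone.utc)  # a 5-minute boundary
start = base - interval * 3
end = start + interval * 5
online_start = end + interval
for execution_date, index in list_runs(start, end, interval):
    print(execution_date.strftime('%H:%M'), index)  # 11:45 0 ... 12:10 5
print('first online run:', online_start.strftime('%H:%M'))  # 12:15
```

Under that assumption, the backfill trains on data parts 0 to 5 and the first online run, one interval later, continues with part 6, so there is no gap.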

**Note**

1. It's important to specify a **future** time for ``backfill_end_date``, otherwise there may be gaps between the end of the backfill process and the start of the first online task, and the ``model_in_path`` of the first online task will be incorrect.
2. ``backfill_start_date``, ``backfill_end_date`` and ``online_start_date`` must be timezone-aware ``datetime.datetime``s. Non-UTC timezones can be used, but Airflow always passes a timezone-aware ``datetime.datetime`` in UTC for the ``execution_date`` parameter of the ``execute()`` function mentioned later.
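The standard-library sketch below shows why the notes above insist on timezone-aware values: an aware value in another timezone denotes the same instant as its UTC equivalent, while a naive value cannot even be ordered against an aware one. The ``+08:00`` offset is an arbitrary example.

```python
import datetime

utc = datetime.timezone.utc
plus8 = datetime.timezone(datetime.timedelta(hours=8))

# The same instant expressed in two timezones.
start_utc = datetime.datetime(2021, 6, 1, 4, 0, tzinfo=utc)
start_plus8 = datetime.datetime(2021, 6, 1, 12, 0, tzinfo=plus8)
print(start_utc == start_plus8)     # True: aware datetimes compare by instant
print(start_plus8.astimezone(utc))  # 2021-06-01 04:00:00+00:00

# A naive datetime has no timezone, so ordering it against an aware one fails.
naive = datetime.datetime(2021, 6, 1, 4, 0)
try:
    naive < start_utc
except TypeError as error:
    print('TypeError:', error)

# Prefer an aware "now"; datetime.datetime.utcnow() returns a naive value.
now = datetime.datetime.now(utc)
print(now.utcoffset())  # 0:00:00
```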

```python
import datetime
business_name = 'jupyter_doc'
experiment_name = 'ma_tutorial'
job_name = 'train'
owner = 'admin'
schedule_interval = datetime.timedelta(minutes=5)
# Use a timezone-aware "now": datetime.datetime.utcnow() returns a naive datetime,
# and calling .timestamp() on it would interpret it as local time.
utc_now = datetime.datetime.now(datetime.timezone.utc).timestamp()
# Round the current time down to a multiple of schedule_interval.
backfill_timestamp = int(utc_now / schedule_interval.total_seconds()) * schedule_interval.total_seconds()
backfill_start_date = datetime.datetime.fromtimestamp(backfill_timestamp, tz=datetime.timezone.utc) - schedule_interval * 3
backfill_end_date = backfill_start_date + schedule_interval * 5
online_start_date = backfill_end_date + schedule_interval
print(f'backfill_start_date: {backfill_start_date}')
print(f'backfill_end_date  : {backfill_end_date}')
print(f'online_start_date  : {online_start_date}')
```

For the trained model to be loaded by MindAlpha Serving for online prediction, the following variables should be set properly. See the documentation of MindAlpha Serving for more information.

```python
consul_host = None
consul_port = None
consul_endpoint_prefix = None

# consul_host = 'consul-host.example.com'
# consul_port = 8500
# consul_endpoint_prefix = 'demo/mindalpha-models'
```

Next, we define the ``execute()`` function to be invoked by Airflow every ``schedule_interval``.

``execute()`` uses ``execution_date`` to compute the arguments of ``train()`` and then calls ``train()`` with the computed arguments to train a model for ``execution_date``. Be careful to set ``model_in_path`` to ``None`` for the first model, otherwise ``train()`` will fail.

The demo code here first computes ``data_part_index`` and then uses it to compute dataset paths. In real applications, we can usually compute dataset paths directly from ``execution_date``.
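The path logic of ``execute()`` can be checked without Airflow or Spark. The sketch below factors it into a hypothetical pure function ``compute_paths`` (not part of MindAlpha) that takes the root directory, the backfill start and the interval as arguments instead of reading notebook globals. This makes it easy to verify that the first run gets ``model_in_path = None`` and that each later run loads the previous run's output. The bucket name is a placeholder.

```python
import datetime


def compute_paths(root, execution_date, backfill_start_date, schedule_interval):
    """Hypothetical pure version of the path logic in execute()."""
    output_dir = root + 'demo/output/model_out/'
    version_format = '%Y%m%d%H%M'
    index = int(round((execution_date - backfill_start_date).total_seconds()
                      / schedule_interval.total_seconds()))
    version = execution_date.strftime(version_format)
    model_in_path = None
    if index > 0:
        # Incremental training: start from the previous interval's model.
        previous_version = (execution_date - schedule_interval).strftime(version_format)
        model_in_path = output_dir + '%s/' % previous_version
    return {
        'train_dataset_path': root + 'demo/data/train/day_%d_0.001_train.csv' % index,
        'test_dataset_path': root + 'demo/data/test/day_%d_0.001_test.csv' % index,
        'model_in_path': model_in_path,
        'model_out_path': output_dir + '%s/' % version,
        'model_export_path': root + 'demo/output/model_export/%s/' % version,
    }


root = 's3://example-bucket/example-path/'  # hypothetical root directory
interval = datetime.timedelta(minutes=5)
start = datetime.datetime(2021, 6, 1, 11, 45, tzinfo=datetime.timezone.utc)
first = compute_paths(root, start, start, interval)
second = compute_paths(root, start + interval, start, interval)
print(first['model_in_path'])                              # None
print(second['model_in_path'] == first['model_out_path'])  # True
```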

The ``is_catchup`` parameter will be ``True`` for backfill invocation and ``False`` for online invocation.
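Combined with the last two lines of ``train()``, this means a model is published to MindAlpha Serving only by online runs, and only when ``consul_endpoint_prefix`` is set. The sketch restates that condition as a hypothetical helper so that its truth table is easy to read; it assumes ``model.consul_endpoint_prefix`` equals the value passed to ``PyTorchEstimator``.

```python
def should_publish(is_catchup, consul_endpoint_prefix):
    """Hypothetical restatement of the publish condition at the end of train()."""
    return not is_catchup and consul_endpoint_prefix is not None


for is_catchup in (True, False):
    for prefix in (None, 'demo/mindalpha-models'):
        print(is_catchup, prefix, should_publish(is_catchup, prefix))
```

Only the combination ``is_catchup=False`` with a non-``None`` prefix prints ``True``.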

```python
def execute(execution_date, is_catchup, **kwargs):
    import datetime
    print('Train model for %s' % execution_date.isoformat())
    train_dataset_dir = S3_ROOT_DIR + 'demo/data/train/'
    test_dataset_dir = S3_ROOT_DIR + 'demo/data/test/'
    output_dir = S3_ROOT_DIR + 'demo/output/model_out/'
    export_dir = S3_ROOT_DIR + 'demo/output/model_export/'
    data_part_index = int(round((execution_date - backfill_start_date).total_seconds() / schedule_interval.total_seconds()))
    model_version_format = '%Y%m%d%H%M'
    model_version = execution_date.strftime(model_version_format)
    train_dataset_path = train_dataset_dir + 'day_%d_0.001_train.csv' % data_part_index
    test_dataset_path = test_dataset_dir + 'day_%d_0.001_test.csv' % data_part_index
    model_in_path = None
    if data_part_index > 0:
        previous_execution_date = execution_date - schedule_interval
        previous_model_version = previous_execution_date.strftime(model_version_format)
        model_in_path = output_dir + '%s/' % previous_model_version
    model_out_path = output_dir + '%s/' % model_version
    model_export_path = export_dir + '%s/' % model_version
    train(local=False,
          batch_size=100,
          worker_count=10,
          server_count=10,
          module_class=DemoModule,
          model_in_path=model_in_path,
          model_out_path=model_out_path,
          model_export_path=model_export_path,
          model_version=model_version,
          experiment_name=experiment_name,
          train_dataset_path=train_dataset_path,
          test_dataset_path=test_dataset_path,
          is_catchup=is_catchup,
          consul_host=consul_host,
          consul_port=consul_port,
          consul_endpoint_prefix=consul_endpoint_prefix,
         )
```

Now, we can create an ``ma.experiment.Experiment`` object and call its ``submit_backfill()`` method to have Airflow schedule model training on historical data.

```python
experiment = ma.experiment.Experiment(business_name=business_name,
                                      experiment_name=experiment_name,
                                      job_name=job_name,
                                      owner=owner,
                                      schedule_interval=schedule_interval,
                                      func=execute,
                                      start_date=backfill_start_date,
                                      end_date=backfill_end_date,
                                      extra_dag_conf={'depends_on_past': True})
experiment.submit_backfill()
```

To schedule model training on real-time data, we can create another ``ma.experiment.Experiment`` object that starts at ``online_start_date`` and has no end date, and call its ``submit_online()`` method.

```python
experiment = ma.experiment.Experiment(business_name=business_name,
                                      experiment_name=experiment_name,
                                      job_name=job_name,
                                      owner=owner,
                                      schedule_interval=schedule_interval,
                                      func=execute,
                                      start_date=online_start_date,
                                      extra_dag_conf={'depends_on_past': True})
experiment.submit_online()
```

## Summary

We illustrated how to use MindAlpha in the production environment. We defined the ``train()`` function, which can be used to develop models interactively in a Jupyter notebook and is called by ``execute()`` to train models incrementally.