# MetaSpore Getting Started

MetaSpore is a machine learning platform that provides a one-stop solution for data preprocessing, model training, and online prediction.

This article briefly introduces the basic MetaSpore API.

## Prepare Data

We use the publicly available dataset [Terabyte Click Logs](https://labs.criteo.com/2013/12/download-terabyte-click-logs-2/) published by CriteoLabs as our demo dataset.

We sample the dataset at a rate of 0.001 so that the demo finishes quickly. More information about the demo dataset can be found in [MetaSpore Demo Dataset](https://ks3-cn-beijing.ksyuncs.com/dmetasoul-bucket/demo/criteo/index.html).
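
The idea of sampling at a fixed rate can be pictured with a minimal sketch. It is not the script used to build the demo dataset: the function name ``sample_lines``, the fixed seed, the made-up records, and the choice of independent per-line sampling are all assumptions made for illustration.

```python
import random

def sample_lines(lines, rate, seed=0):
    """Keep each line independently with probability `rate`."""
    rng = random.Random(seed)  # fixed seed (an assumption) so the sketch is reproducible
    return [line for line in lines if rng.random() < rate]

# Hypothetical stand-in for a large click log: one TAB-separated record per line.
records = [f"0\t{i}\tfeature_{i}" for i in range(100_000)]
sampled = sample_lines(records, rate=0.001)
print(len(sampled))  # roughly 100 records, about 0.1% of the input
```

Because every line is kept or dropped independently, the sampled size varies around ``rate * len(records)`` rather than matching it exactly.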

Execute the following cell to download the demo dataset into the working directory. The data files take up about 2.1 GiB of disk space, and the download may take several minutes. If the download fails, refer to [MetaSpore Demo Dataset](https://ks3-cn-beijing.ksyuncs.com/dmetasoul-bucket/demo/criteo/index.html) and download the dataset manually.

In [1]:
import metaspore
metaspore.demo.download_dataset()

MetaSpore demo dataset already downloaded




You can check the downloaded dataset by executing the following cell.

In [2]:
!ls -l ${PWD}/data/

total 8
drwxrwxr-x 2 ec2-user ec2-user 4096 Jul  6 03:32 test
drwxrwxr-x 2 ec2-user ec2-user 4096 Jul  6 03:30 train


(Optional) To upload the dataset to your own S3 bucket:

1. Fill ``{YOUR_S3_BUCKET}`` and ``{YOUR_S3_PATH}`` with your preferred values in the following cell.
2. Uncomment the cell by removing the leading ``#`` character.
3. Execute the cell.

In [3]:
#!aws s3 cp --recursive ${PWD}/data/ s3://{YOUR_S3_BUCKET}/{YOUR_S3_PATH}/demo/data/

Alternatively, you can open a terminal by selecting the ``File`` -> ``New`` -> ``Terminal`` menu item and execute Bash commands in it.

You can check the uploaded dataset in your S3 bucket by uncommenting and executing the following cell.

In [4]:
#!aws s3 ls s3://{YOUR_S3_BUCKET}/{YOUR_S3_PATH}/demo/data/

The ``schema`` directory contains configuration files and must also be uploaded to S3 so that the model can be trained in a cluster environment.

In [5]:
#!aws s3 cp --recursive ${PWD}/schema/ s3://{YOUR_S3_BUCKET}/{YOUR_S3_PATH}/demo/schema/

In the rest of the article, we assume the demo dataset and schemas have been uploaded to ``ROOT_DIR``.

In [6]:
# ROOT_DIR = 's3://{YOUR_S3_BUCKET}/{YOUR_S3_PATH}/demo'
ROOT_DIR = '.'

## Define the Model

We define our neural network model by subclassing ``torch.nn.Module``, just as with ordinary PyTorch models. The following ``DemoModule`` class provides an example.

Compared to ordinary PyTorch models, the notable difference is the ``_sparse`` layer, created by instantiating ``ms.EmbeddingSumConcat``, which takes an embedding size and the paths of two text files. ``ms.EmbeddingSumConcat`` makes it possible to define large-scale sparse models in PyTorch, which is a distinguishing feature of MetaSpore.

The ``_schema_dir`` field points to the schema directory under ``ROOT_DIR``. When ``ROOT_DIR`` is an S3 path, the ``DemoModule`` class can be used in a cluster environment.

In [7]:
import torch
import metaspore as ms

class DemoModule(torch.nn.Module):
    def __init__(self):
        super().__init__()
        self._embedding_size = 16
        self._schema_dir = ROOT_DIR + '/schema/'
        self._column_name_path = self._schema_dir + 'column_name_demo.txt'
        self._combine_schema_path = self._schema_dir + 'combine_schema_demo.txt'
        self._sparse = ms.EmbeddingSumConcat(self._embedding_size, self._column_name_path, self._combine_schema_path)
        self._sparse.updater = ms.FTRLTensorUpdater()
        self._sparse.initializer = ms.NormalTensorInitializer(var=0.01)
        self._dense = torch.nn.Sequential(
            ms.nn.Normalization(self._sparse.feature_count * self._embedding_size),
            torch.nn.Linear(self._sparse.feature_count * self._embedding_size, 1024),
            torch.nn.ReLU(),
            torch.nn.Linear(1024, 512),
            torch.nn.ReLU(),
            torch.nn.Linear(512, 1),
        )

    def forward(self, x):
        x = self._sparse(x)
        x = self._dense(x)
        return torch.sigmoid(x)
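
To make the role of the ``_sparse`` layer more concrete, here is a conceptual sketch in plain Python. It assumes, as the class name and the ``feature_count * embedding_size`` input width of the dense layers suggest, that embeddings are summed within each feature group and the per-group sums are concatenated. It is not MetaSpore's implementation and says nothing about how MetaSpore stores or updates embeddings; the helper names, toy vocabularies, and embedding size of 4 are hypothetical.

```python
import random

EMBEDDING_SIZE = 4  # the demo model uses 16; a smaller value keeps the sketch readable

def make_table(vocab, size, seed):
    """Build a toy embedding table: one random vector of length `size` per key."""
    rng = random.Random(seed)
    return {key: [rng.gauss(0.0, 0.1) for _ in range(size)] for key in vocab}

def embedding_sum_concat(sample, tables, size):
    """Sum the embeddings of each feature group's keys, then concatenate the sums."""
    output = []
    for name, table in tables.items():
        summed = [0.0] * size
        for key in sample.get(name, []):
            summed = [s + v for s, v in zip(summed, table[key])]
        output.extend(summed)
    return output

# Two hypothetical feature groups with tiny vocabularies.
tables = {
    "categorical_feature_1": make_table(["a", "b", "c"], EMBEDDING_SIZE, seed=1),
    "categorical_feature_2": make_table(["x", "y"], EMBEDDING_SIZE, seed=2),
}
sample = {"categorical_feature_1": ["a", "c"], "categorical_feature_2": ["y"]}
vector = embedding_sum_concat(sample, tables, EMBEDDING_SIZE)
print(len(vector))  # 8 = 2 feature groups * embedding size 4
```

The output width depends only on the number of feature groups and the embedding size, not on how many keys each group contains, which is why a fixed-size dense network can follow the sparse layer.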

Instantiate the ``DemoModule`` class to define our PyTorch model.

In [8]:
module = DemoModule()

loaded combine schema from combine schema file './schema/combine_schema_demo.txt'
[2024-07-06 03:34:31.240] [info] [local_filesys.cpp:116] Opening local file ./schema/combine_schema_demo.txt with mode r

integer_feature_1
[2024-07-06 03:34:31.240] [info] add expr bkdr_hash(integer_feature_1, StringBKDRHashFunctionOption::name=integer_feature_1)
integer_feature_2
[2024-07-06 03:34:31.240] [info] add expr bkdr_hash(integer_feature_2, StringBKDRHashFunctionOption::name=integer_feature_2)
integer_feature_3
[2024-07-06 03:34:31.240] [info] add expr bkdr_hash(integer_feature_3, StringBKDRHashFunctionOption::name=integer_feature_3)
integer_feature_4
[2024-07-06 03:34:31.240] [info] add expr bkdr_hash(integer_feature_4, StringBKDRHashFunctionOption::name=integer_feature_4)
integer_feature_5
[2024-07-06 03:34:31.240] [info] add expr bkdr_hash(integer_feature_5, StringBKDRHashFunctionOption::name=integer_feature_5)
integer_feature_6
[2024-07-06 03:34:31.240] [info] add expr bkdr_h

## Train the Model

To train our model, we first create a ``ms.PyTorchEstimator``, passing in several arguments, including our PyTorch model ``module`` and the number of workers and servers.

``model_out_path`` specifies where to store the trained model.

``input_label_column_index`` specifies the column index of the label column in the dataset, which is ``0`` for the demo dataset.

In [9]:
model_out_path = ROOT_DIR + '/output/dev/model_out/'
estimator = ms.PyTorchEstimator(module=module,
                                worker_count=1,
                                server_count=1,
                                model_out_path=model_out_path,
                                experiment_name='0.1',
                                input_label_column_index=0)

Next, we create a Spark session by calling ``ms.spark.get_session()`` and load the training dataset by calling ``ms.input.read_s3_csv()``.

``delimiter`` specifies the column delimiter of the dataset, which is the TAB character ``'\t'`` for the demo dataset.

We also need to pass column names because the CSV files do not contain headers.

In [10]:
column_names = []
with open(f'{ROOT_DIR}/schema/column_name_demo.txt', 'r') as f:
    for line in f:
        column_names.append(line.split(' ')[1].strip())
print(column_names)

['label', 'integer_feature_1', 'integer_feature_2', 'integer_feature_3', 'integer_feature_4', 'integer_feature_5', 'integer_feature_6', 'integer_feature_7', 'integer_feature_8', 'integer_feature_9', 'integer_feature_10', 'integer_feature_11', 'integer_feature_12', 'integer_feature_13', 'categorical_feature_1', 'categorical_feature_2', 'categorical_feature_3', 'categorical_feature_4', 'categorical_feature_5', 'categorical_feature_6', 'categorical_feature_7', 'categorical_feature_8', 'categorical_feature_9', 'categorical_feature_10', 'categorical_feature_11', 'categorical_feature_12', 'categorical_feature_13', 'categorical_feature_14', 'categorical_feature_15', 'categorical_feature_16', 'categorical_feature_17', 'categorical_feature_18', 'categorical_feature_19', 'categorical_feature_20', 'categorical_feature_21', 'categorical_feature_22', 'categorical_feature_23', 'categorical_feature_24', 'categorical_feature_25', 'categorical_feature_26']
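
The parsing step above implies that each line of the column-name file holds an index followed by a name, since the code takes the second space-separated field. That format is an inference from the code, not a documented specification. Under that assumption, a slightly more defensive variant that tolerates blank lines and repeated whitespace could look like the following; the in-memory sample file and the variable names are made up.

```python
import io

# Hypothetical excerpt of a column-name file: "<index> <name>" per line (assumed format).
sample_file = io.StringIO("0 label\n1 integer_feature_1\n\n2  integer_feature_2\n")

column_names_sketch = []
for line in sample_file:
    fields = line.split()      # split on any run of whitespace
    if len(fields) < 2:
        continue               # skip blank or malformed lines
    column_names_sketch.append(fields[1])

print(column_names_sketch)  # ['label', 'integer_feature_1', 'integer_feature_2']
```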


In [11]:
train_dataset_path = ROOT_DIR + '/data/train/day_0_0.001_train.csv'

spark_session = ms.spark.get_session(local=True,
                                     batch_size=100,
                                     worker_count=estimator.worker_count,
                                     server_count=estimator.server_count,
                                     log_level='INFO',
                                     spark_confs={'spark.eventLog.enabled':'true'})
train_dataset = ms.input.read_s3_csv(spark_session, train_dataset_path, delimiter='\t', column_names=column_names)

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/07/06 03:34:32 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
24/07/06 03:34:34 INFO SharedState: Setting hive.metastore.warehouse.dir ('null') to the value of spark.sql.warehouse.dir.
24/07/06 03:34:34 INFO SharedState: Warehouse path is 'file:/home/ec2-user/SageMaker/MetaSpore/tutorials/spark-warehouse'.
24/07/06 03:34:34 INFO InMemoryFileIndex: It took 41 ms to list leaf files for 1 paths.


ignore shuffle
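
What the TAB delimiter, the explicit column names, and a label column index of ``0`` mean for a single record can be sketched with Python's standard ``csv`` module. This is independent of ``ms.input.read_s3_csv()``, which loads the data through Spark; the rows and the shortened column list below are made up for illustration.

```python
import csv
import io

# Made-up TAB-separated rows without a header line; the label is column 0.
raw = "1\t5\tabc\n0\t\txyz\n"
names = ["label", "integer_feature_1", "categorical_feature_1"]  # shortened, hypothetical
LABEL_COLUMN_INDEX = 0

rows = []
for fields in csv.reader(io.StringIO(raw), delimiter="\t"):
    record = dict(zip(names, fields))        # attach names, since the file has no header
    label = int(fields[LABEL_COLUMN_INDEX])  # pick the label by position
    rows.append((label, record))

print(rows[0])
```

Empty fields, such as the missing integer feature in the second row, come through as empty strings, so any real pipeline has to decide how to treat missing values.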


Finally, we call the ``fit()`` method of ``ms.PyTorchEstimator`` to train our model. This may take several minutes, and you can follow the progress in the output of the cell. The trained model is saved to ``model_out_path`` and returned in the ``model`` variable.

In [12]:
train_dataset = train_dataset.limit(100)

In [None]:
model = estimator.fit(train_dataset)

[2024-07-06 03:34:35.701] [info] PS job with coordinator address 172.16.14.249:47275 started.
[2024-07-06 03:34:35.701] [info] PSRunner::RunPS: pid: 31787, tid: 5449, thread: 0x7f10ef598700
[2024-07-06 03:34:35.701] [info] PSRunner::RunPSCoordinator: pid: 31787, tid: 5449, thread: 0x7f10ef598700
[2024-07-06 03:34:35.702] [info] ActorProcess::Receiving: Coordinator pid: 31787, tid: 5454, thread: 0x7f10e7795700


24/07/06 03:34:35 INFO SparkContext: Starting job: collect at /home/ec2-user/anaconda3/envs/metaspore/lib/python3.8/site-packages/metaspore/agent.py:308
24/07/06 03:34:35 INFO SparkContext: Starting job: collect at /home/ec2-user/anaconda3/envs/metaspore/lib/python3.8/site-packages/metaspore/agent.py:291
24/07/06 03:34:35 INFO DAGScheduler: Got job 0 (collect at /home/ec2-user/anaconda3/envs/metaspore/lib/python3.8/site-packages/metaspore/agent.py:308) with 1 output partitions
24/07/06 03:34:35 INFO DAGScheduler: Final stage: ResultStage 0 (collect at /home/ec2-user/anaconda3/envs/metaspore/lib/python3.8/site-packages/metaspore/agent.py:308)
24/07/06 03:34:35 INFO DAGScheduler: Parents of final stage: List()
24/07/06 03:34:35 INFO DAGScheduler: Missing parents: List()
24/07/06 03:34:35 INFO DAGScheduler: Submitting ResultStage 0 (PythonRDD[3] at collect at /home/ec2-user/anaconda3/envs/metaspore/lib/python3.8/site-packages/metaspore/agent.py:308), which has no missing parents
24/07/06 

[2024-07-06 03:34:37.551] [info] C[0]:9: The coordinator has connected to 1 servers and 1 workers.
PS Coordinator node C[0]:9 is ready.


24/07/06 03:34:37 INFO DAGScheduler: Missing parents: List()
24/07/06 03:34:37 INFO DAGScheduler: Submitting ResultStage 2 (PythonRDD[5] at RDD at PythonRDD.scala:53), which has no missing parents
24/07/06 03:34:37 INFO BlockManagerInfo: Removed broadcast_0_piece0 on ip-172-16-14-249.us-west-2.compute.internal:39421 in memory (size: 4.9 KiB, free: 2.8 GiB)
24/07/06 03:34:37 INFO MemoryStore: Block broadcast_3 stored as values in memory (estimated size 5.4 KiB, free 2.8 GiB)
24/07/06 03:34:37 INFO MemoryStore: Block broadcast_3_piece0 stored as bytes in memory (estimated size 3.3 KiB, free 2.8 GiB)
24/07/06 03:34:37 INFO BlockManagerInfo: Added broadcast_3_piece0 in memory on ip-172-16-14-249.us-west-2.compute.internal:39421 (size: 3.3 KiB, free: 2.8 GiB)
24/07/06 03:34:37 INFO SparkContext: Created broadcast 3 from broadcast at DAGScheduler.scala:1585
24/07/06 03:34:37 INFO DAGScheduler: Submitting 1 missing tasks from ResultStage 2 (PythonRDD[5] at RDD at PythonRDD.scala:53) (first 15

## Evaluate the Model

To evaluate our model, we use the ``ms.input.read_s3_csv()`` function again to load the test dataset, passing in the column delimiter ``'\t'``.

In [None]:
test_dataset_path = ROOT_DIR + '/data/test/day_0_0.001_test.csv'
test_dataset = ms.input.read_s3_csv(spark_session, test_dataset_path, delimiter='\t', column_names=column_names)

Next, we call the ``model.transform()`` method to transform the test dataset, which will add a column named ``rawPrediction`` to the test dataset representing the predicted labels. For ease of integration with Spark MLlib, ``model.transform()`` will also add a column named ``label`` to the test dataset representing the actual labels.

Like the training process, this will take several minutes and you can see the progress by looking at the output of the cell. The transformed test dataset is stored in the ``result`` variable.

In [None]:
result = model.transform(test_dataset)

``result`` is a normal PySpark DataFrame and can be inspected with the usual DataFrame methods.

In [None]:
result.show(5)

Finally, we use ``pyspark.ml.evaluation.BinaryClassificationEvaluator`` to compute the test AUC.

In [None]:
import pyspark.ml.evaluation  # import the submodule explicitly; a bare `import pyspark` may not load it
evaluator = pyspark.ml.evaluation.BinaryClassificationEvaluator()
test_auc = evaluator.evaluate(result)
print('test_auc: %g' % test_auc)
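
By default, ``BinaryClassificationEvaluator`` reads the ``rawPrediction`` and ``label`` columns and reports the area under the ROC curve. As a rough illustration of what that number means, the sketch below computes AUC as the fraction of (positive, negative) pairs in which the positive example receives the higher score, with ties counted as half. Spark derives the metric from the ROC curve rather than by enumerating pairs, so treat this as an explanation of the quantity and not of Spark's method; the function name, labels, and scores are made up.

```python
def pairwise_auc(labels, scores):
    """AUC as the fraction of (positive, negative) pairs ranked correctly; ties count 0.5."""
    positives = [s for y, s in zip(labels, scores) if y == 1]
    negatives = [s for y, s in zip(labels, scores) if y == 0]
    if not positives or not negatives:
        raise ValueError("AUC needs at least one positive and one negative example")
    wins = 0.0
    for p in positives:
        for n in negatives:
            if p > n:
                wins += 1.0
            elif p == n:
                wins += 0.5
    return wins / (len(positives) * len(negatives))

labels = [1, 0, 1, 0, 0]
scores = [0.9, 0.3, 0.4, 0.5, 0.1]
print(pairwise_auc(labels, scores))  # 5 of 6 pairs are ordered correctly
```

A value of 0.5 corresponds to random ranking and 1.0 to a perfect ranking. The quadratic loop is fine for a handful of rows but is not meant for datasets of realistic size.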

When all computations are done, we should call the ``stop()`` method of ``spark_session`` to make sure all the resources are released.

In [None]:
spark_session.stop()

## Summary

We illustrated how to train and evaluate a neural network model in MetaSpore. Users familiar with PyTorch and Spark MLlib should be able to get started easily, which is a design goal of MetaSpore.