# Using VineyardRuntime for Efficient Intermediate Data Management in Data Processing Pipelines

## Overview
Today's Big Data/AI applications often have to be implemented as end-to-end pipelines. Take the data flow of a risk-control job shown in the figure below as an example: first, order-related data is exported from a database; next, a graph computing engine processes the raw data to build a user-product relationship graph and uses graph algorithms to screen out potential fraud rings hidden in it; then, machine learning algorithms perform fraud attribution on these candidate rings to produce more accurate results; finally, the results are reviewed manually before business action is taken.

![Workflow](./static/workflow.png)

In such a scenario, we often encounter the following problems:
1. Differences between development and production environments make the development and debugging of data workflows complex and inefficient:

    Data scientists develop data operations in Python on their own machines, but to run them in production they must translate that code into YAML files for Kubernetes-based workflow engines such as Argo or Tekton, an environment many of them are not familiar with. This greatly reduces development and deployment efficiency and introduces risk from discrepancies between the development and production environments.

2. Intermediate data exchange requires introducing additional distributed storage, which adds development, infrastructure, and operations and maintenance costs:

    Subtasks of an end-to-end pipeline usually exchange data through distributed file systems or object storage systems (e.g., HDFS, S3, OSS). This requires extensive data format conversion and adaptation work and leads to redundant I/O. Moreover, because intermediate data is short-lived, paying for a distributed storage system just to hold it adds avoidable cost.

3. Data processing in large-scale Kubernetes clusters is inefficient:

    When an existing distributed file system is used to process data in a large-scale Kubernetes cluster, the scheduler has little awareness of where data is read and written, so it does not take data location into account when placing tasks. As a result, data exchanged between nodes is pulled over the network again and again, which increases I/O consumption and reduces overall efficiency.

![workflow with vineyard](./static/workflow_with_vineyard.png)

To address these problems in Big Data/AI data pipelines, we combine Vineyard's data sharing mechanism with Fluid's data orchestration capabilities:
1. Fluid's Python SDK makes it easy to orchestrate data flows, giving data scientists who are skilled in Python a simple way to build and submit workflows centered on dataset operations. The same code manages the data flow both in the development environment and in the production environment in the cloud.
2. Vineyard makes data sharing between the tasks of an end-to-end workflow more efficient by sharing data with zero copy through memory mapping. Avoiding the extra I/O overhead of round trips through external storage is the key to this efficiency gain.
3. With Fluid's data affinity scheduling, the Pod scheduling policy takes into account which nodes the data has been written to, which reduces the network overhead of data migration and improves end-to-end performance.
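To make the zero-copy idea in point 2 concrete, here is a minimal sketch that uses Python's standard `multiprocessing.shared_memory` module instead of Vineyard: a producer writes integers into a named shared-memory segment, and a consumer attaches to it by name and reads the values through a memory view without copying the bytes. Vineyard's own mechanism (memory-mapping objects held by its shared-memory store into client processes) is more elaborate; this sketch only illustrates the principle, and both handles live in one process for brevity.

```python
import struct
from multiprocessing import shared_memory

values = [10, 20, 30, 40]
fmt = f"{len(values)}q"           # native-endian 64-bit integers
nbytes = struct.calcsize(fmt)     # 4 * 8 bytes

# Producer: create a named shared-memory segment and write the values into it.
producer = shared_memory.SharedMemory(create=True, size=nbytes)
struct.pack_into(fmt, producer.buf, 0, *values)

# Consumer: attach to the same segment by name. The OS maps the same physical
# pages into this handle, so no data is copied.
consumer = shared_memory.SharedMemory(name=producer.name)
raw = consumer.buf[:nbytes]       # slicing a memoryview does not copy
view = raw.cast("q")              # reinterpret the bytes as int64 in place
seen_before = list(view)

# A write through the producer's mapping is immediately visible to the consumer.
struct.pack_into("q", producer.buf, 0, 99)
seen_after = view[0]
print(seen_before, seen_after)

# Release the derived views before closing the mappings, then free the segment.
view.release()
raw.release()
consumer.close()
producer.close()
producer.unlink()
```

The consumer sees the producer's later write without any explicit transfer, which is the property that lets pipeline steps skip serialization and storage round trips.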

## Code Example

In the following example, we use VineyardRuntime and DataFlow in Fluid to show how to manage intermediate data efficiently in a data processing pipeline. DataFlow is Fluid's built-in data flow orchestration capability: it lets you chain multiple data operations in a pipeline for simple sequential orchestration. For more advanced workflow orchestration, VineyardRuntime can also be integrated with workflow engines such as Argo Workflows.
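As a mental model of this chaining, the toy runner below (hypothetical, not Fluid's implementation) assumes that each step starts only after the previous one has completed successfully, which matches the sequential completion times in the run log under "3. Define the Fluid DataFlow". It names the steps `<run_id>-stepN`, mirroring the DataProcess names in that log.

```python
from typing import Callable, List


def run_chain(run_id: str, steps: List[Callable[[], None]]) -> List[str]:
    """Run steps strictly in order and stop at the first failure.

    Toy model of a chained data flow (hypothetical, not Fluid's code):
    step i+1 starts only after step i has completed successfully.
    """
    completed = []
    for i, step in enumerate(steps, start=1):
        step_name = f"{run_id}-step{i}"
        try:
            step()
        except Exception as exc:
            # Once one step fails, the remaining steps are never started.
            raise RuntimeError(f"{step_name} failed; remaining steps skipped") from exc
        completed.append(step_name)
        print(f"{step_name} completed")
    return completed


order = []
names = run_chain("demo", [lambda: order.append("preprocess"),
                           lambda: order.append("train"),
                           lambda: order.append("test")])
```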

### 1. Dataset Preparation

```python
%pip install oss2 numpy pandas
```

```python
import os

# Credentials for the OSS bucket that will hold the sample dataset
os.environ["OSS_ACCESS_KEY_ID"] = "<YOUR_ACCESS_KEY>"
os.environ["OSS_ACCESS_KEY_SECRET"] = "<YOUR_ACCESS_SECRET>"
```

```python
import io

import numpy as np
import oss2
import pandas as pd
from oss2.credentials import EnvironmentVariableCredentialsProvider

# Generate a synthetic housing dataset: 600,000 rows of random integer features
num_rows = 600 * 1000
df = pd.DataFrame({
    'Id': np.random.randint(1, 100000, num_rows),
    'MSSubClass': np.random.randint(20, 201, size=num_rows),
    'LotFrontage': np.random.randint(50, 151, size=num_rows),
    'LotArea': np.random.randint(5000, 20001, size=num_rows),
    'OverallQual': np.random.randint(1, 11, size=num_rows),
    'OverallCond': np.random.randint(1, 11, size=num_rows),
    'YearBuilt': np.random.randint(1900, 2022, size=num_rows),
    'YearRemodAdd': np.random.randint(1900, 2022, size=num_rows),
    'MasVnrArea': np.random.randint(0, 1001, size=num_rows),
    'BsmtFinSF1': np.random.randint(0, 2001, size=num_rows),
    'BsmtFinSF2': np.random.randint(0, 1001, size=num_rows),
    'BsmtUnfSF': np.random.randint(0, 2001, size=num_rows),
    'TotalBsmtSF': np.random.randint(0, 3001, size=num_rows),
    '1stFlrSF': np.random.randint(500, 4001, size=num_rows),
    '2ndFlrSF': np.random.randint(0, 2001, size=num_rows),
    'LowQualFinSF': np.random.randint(0, 201, size=num_rows),
    'GrLivArea': np.random.randint(600, 5001, size=num_rows),
    'BsmtFullBath': np.random.randint(0, 4, size=num_rows),
    'BsmtHalfBath': np.random.randint(0, 3, size=num_rows),
    'FullBath': np.random.randint(0, 5, size=num_rows),
    'HalfBath': np.random.randint(0, 3, size=num_rows),
    'BedroomAbvGr': np.random.randint(0, 11, size=num_rows),
    'KitchenAbvGr': np.random.randint(0, 4, size=num_rows),
    'TotRmsAbvGrd': np.random.randint(0, 16, size=num_rows),
    'Fireplaces': np.random.randint(0, 4, size=num_rows),
    'GarageYrBlt': np.random.randint(1900, 2022, size=num_rows),
    'GarageCars': np.random.randint(0, 5, num_rows),
    'GarageArea': np.random.randint(0, 1001, num_rows),
    'WoodDeckSF': np.random.randint(0, 501, num_rows),
    'OpenPorchSF': np.random.randint(0, 301, num_rows),
    'EnclosedPorch': np.random.randint(0, 201, num_rows),
    '3SsnPorch': np.random.randint(0, 101, num_rows),
    'ScreenPorch': np.random.randint(0, 201, num_rows),
    'PoolArea': np.random.randint(0, 301, num_rows),
    'MiscVal': np.random.randint(0, 5001, num_rows),
    'TotalRooms': np.random.randint(2, 11, num_rows),
    "GarageAge": np.random.randint(1, 31, num_rows),
    "RemodAge": np.random.randint(1, 31, num_rows),
    "HouseAge": np.random.randint(1, 31, num_rows),
    "TotalBath": np.random.randint(1, 5, num_rows),
    "TotalPorchSF": np.random.randint(1, 1001, num_rows),
    "TotalSF": np.random.randint(1000, 6001, num_rows),
    "TotalArea": np.random.randint(1000, 6001, num_rows),
    'MoSold': np.random.randint(1, 13, num_rows),
    'YrSold': np.random.randint(2006, 2022, num_rows),
    'SalePrice': np.random.randint(50000, 800001, num_rows),
})

print("DataFrame initialized.")

# Authenticate with the OSS_ACCESS_KEY_ID and OSS_ACCESS_KEY_SECRET environment variables set above.
auth = oss2.ProviderAuth(EnvironmentVariableCredentialsProvider())
# Replace <OSS_ENDPOINT> and <OSS_BUCKET_NAME> with your OSS endpoint and bucket name.
bucket = oss2.Bucket(auth, '<OSS_ENDPOINT>', '<OSS_BUCKET_NAME>')

# Pickle the DataFrame in memory and upload it as df.pkl; the pipeline later reads it from /data/df.pkl.
bytes_buffer = io.BytesIO()
df.to_pickle(bytes_buffer)
bucket.put_object("df.pkl", bytes_buffer.getvalue())
```

### 2. Create Fluid Dataset and VineyardRuntime

```python
# Set the fluidsdk logger level to DEBUG for detailed messages
import logging
import sys
logger = logging.getLogger("fluidsdk")
stream_handler = logging.StreamHandler(sys.stdout)
stream_handler.setFormatter(logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s'))
logger.addHandler(stream_handler)
logger.setLevel(logging.DEBUG)
```

```python
import fluid

from fluid import constants
from fluid import models

# Connect to Fluid using the default kubeconfig file and create a Fluid client instance
client_config = fluid.ClientConfig()
fluid_client = fluid.FluidClient(client_config)

# Create a Dataset named "vineyard" in the default namespace
fluid_client.create_dataset(
    dataset_name="vineyard",
)

# Get the vineyard Dataset
dataset = fluid_client.get_dataset(dataset_name="vineyard")

# Configure a VineyardRuntime and bind the vineyard Dataset to it:
# 2 replicas and a 30 GiB cache kept in memory ("MEM")
dataset.bind_runtime(
    runtime_type=constants.VINEYARD_RUNTIME_KIND,
    replicas=2,
    cache_capacity_GiB=30,
    cache_medium="MEM",
    wait=True
)
```

```text
2024-03-08 12:08:26,145 - fluidsdk - DEBUG - Dataset "default/vineyard" created
```

In the above code snippet:
- **Creating the Fluid client:** The code connects to Fluid using the default kubeconfig file and creates a Fluid client instance.
- **Creating and configuring the vineyard Dataset and runtime:** The code creates a Dataset named `vineyard`, retrieves it, and binds it to a VineyardRuntime, setting the number of replicas (2) and the memory cache size (30 GiB).
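To get a feel for how much of that cache this example needs, here is a rough sizing sketch. It assumes every column is stored as an 8-byte int64 (the default integer dtype of `np.random.randint` on 64-bit Linux) and ignores index and metadata overhead, so treat the numbers as a lower bound; the helper names `keep_fraction` and `footprint_gib` are illustrative only.

```python
GIB = 1024 ** 3


def keep_fraction(low: int, high: int, threshold: int) -> float:
    """Fraction of integers drawn uniformly from [low, high] that are <= threshold."""
    return (threshold - low + 1) / (high - low + 1)


def footprint_gib(rows: float, columns: int, bytes_per_value: int = 8) -> float:
    """Raw payload of a dense numeric table in GiB, ignoring index and metadata."""
    return rows * columns * bytes_per_value / GIB


num_rows, num_columns = 600 * 1000, 46          # shape of the synthetic DataFrame
raw_gib = footprint_gib(num_rows, num_columns)

# GrLivArea comes from np.random.randint(600, 5001), i.e. 600..5000 inclusive,
# and preprocess() drops rows with GrLivArea > 4800.
kept_rows = num_rows * keep_fraction(600, 5000, 4800)
# X_train + X_test + y_train + y_test together hold all 46 columns of the kept rows.
intermediate_gib = footprint_gib(kept_rows, num_columns)

print(f"raw DataFrame:      {raw_gib:.3f} GiB")
print(f"expected rows kept: {kept_rows:,.0f}")
print(f"intermediate data:  {intermediate_gib:.3f} GiB (cache: 30 GiB)")
```

Under these assumptions the intermediate objects stay in the low hundreds of MiB, so `cache_capacity_GiB` could be scaled down for this demo or kept large for real workloads.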

### 3. Define the Fluid DataFlow

```python
from kubernetes.client import models as k8s_models
from fluid.utils import processor as processor_utils


# Define the task template and mount the OSS volume
def create_processor(process_func, packages_to_install, pip_index_url):
    # Volume backed by the pre-created PVC "pvc-oss" (see the note below)
    data_volume = k8s_models.V1Volume(
        name="data",
        persistent_volume_claim=k8s_models.V1PersistentVolumeClaimVolumeSource(
            claim_name="pvc-oss"
        ),
    )
    # Mount that volume at /data inside the task container
    data_volume_mount = k8s_models.V1VolumeMount(
        name="data",
        mount_path="/data",
    )

    debug_mode = True  # Enable debug mode for verbose output
    processor = processor_utils.make_processor_from_func(
        process_func,
        packages_to_install=packages_to_install,
        pip_index_url=pip_index_url,
        volumes=[data_volume],
        volume_mounts=[data_volume_mount],
        debug_mode=debug_mode,
    )

    return processor
```

In the above code snippet:
- **Creating a task template:** The `create_processor` function wraps a Python function into a task template. It parses the source code of the given function object and uses that code as the startup command of a container. The container's Python version defaults to 3.10 (see the signature of `processor_utils.make_processor_from_func` to change it), and `create_processor` passes in the PyPI dependencies and package index needed to run the function. The container also mounts the OSS data source at `/data`.

> Note: Before mounting the OSS data source, you need to create a PersistentVolumeClaim (PVC) named `pvc-oss` in the cluster and bind it to an OSS PersistentVolume (PV). The PV must point to the OSS bucket that `df.pkl` was uploaded to in the Dataset Preparation step.
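The general idea of turning a function into a container command can be sketched as follows. The helper `build_container_command` is hypothetical, and Fluid's `make_processor_from_func` may work differently (for example, it also installs the requested packages before running the code). The sketch extracts the function's source with `inspect.getsource`, appends a call to it, and runs the result with `python -c`. Because only the function's source is shipped, every module it uses must be imported inside the function body, which is presumably why `preprocess`, `train`, and `test` each do their own imports.

```python
import inspect
import subprocess
import sys
import textwrap


def build_container_command(func, python="python"):
    """Hypothetical helper: ship only func's source code and call it."""
    source = textwrap.dedent(inspect.getsource(func))
    script = f"{source}\n{func.__name__}()\n"
    return [python, "-c", script]


def hello():
    import platform  # imported inside the function, as in preprocess/train/test
    print("hello from", platform.python_implementation())


# Locally, run the generated command with the current interpreter to check it works.
command = build_container_command(hello, python=sys.executable)
result = subprocess.run(command, capture_output=True, text=True, check=True)
print(result.stdout.strip())
```

Note that `inspect.getsource` needs the function to be defined in a file or notebook cell whose source is available; functions created with `exec` or in some REPLs cannot be extracted this way.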

```python
# Define the three pipeline steps as self-contained functions (all imports inside the function body)

# Step 1: read the raw data from the OSS mount, split it, and store the splits in Vineyard
def preprocess():
    from sklearn.model_selection import train_test_split

    import pandas as pd
    import vineyard

    df = pd.read_pickle('/data/df.pkl')

    # Preprocess: drop rows with GrLivArea > 4800
    df = df.drop(df[(df['GrLivArea'] > 4800)].index)
    X = df.drop('SalePrice', axis=1)  # Features
    y = df['SalePrice']  # Target variable

    del df

    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2)

    del X, y

    # persist=True makes the objects visible across the Vineyard cluster to the later steps
    vineyard.put(X_train, name="x_train", persist=True)
    vineyard.put(X_test, name="x_test", persist=True)
    vineyard.put(y_train, name="y_train", persist=True)
    vineyard.put(y_test, name="y_test", persist=True)


# Step 2: fetch the training split from Vineyard, fit a model, and save it to the OSS mount
def train():
    from sklearn.linear_model import LinearRegression

    import joblib
    import pandas as pd
    import vineyard

    # fetch=True migrates the object to the local Vineyard instance if it lives on another node
    x_train_data = vineyard.get(name="x_train", fetch=True)
    y_train_data = vineyard.get(name="y_train", fetch=True)

    model = LinearRegression()
    model.fit(x_train_data, y_train_data)

    joblib.dump(model, '/data/model.pkl')


# Step 3: evaluate the model on the test split and append the MSE to /data/output.txt
def test():
    from sklearn.linear_model import LinearRegression
    from sklearn.metrics import mean_squared_error

    import vineyard
    import joblib
    import pandas as pd

    x_test_data = vineyard.get(name="x_test", fetch=True)
    y_test_data = vineyard.get(name="y_test", fetch=True)

    model = joblib.load("/data/model.pkl")
    y_pred = model.predict(x_test_data)

    err = mean_squared_error(y_test_data, y_pred)

    with open('/data/output.txt', 'a') as f:
        f.write(f"{err}\n")  # one result per line across repeated runs


packages_to_install = ["numpy", "pandas", "pyarrow", "requests", "vineyard", "scikit-learn==1.4.0", "joblib==1.3.2"]
# PyPI mirror used inside the task containers; replace it with one reachable from your cluster
pip_index_url = "https://pypi.tuna.tsinghua.edu.cn/simple"

preprocess_processor = create_processor(preprocess, packages_to_install, pip_index_url)
train_processor = create_processor(train, packages_to_install, pip_index_url)
test_processor = create_processor(test, packages_to_install, pip_index_url)
```

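The above code defines the three steps of the data processing pipeline: data preprocessing, model training, and model testing. Each step's Python function is passed to `create_processor` and wrapped into a processor. The steps exchange the train/test splits through Vineyard: `preprocess` stores them with `vineyard.put(..., persist=True)`, and `train` and `test` read them back by name with `vineyard.get(..., fetch=True)`. The trained model is passed from `train` to `test` as a file on the `/data` mount.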
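The steps are coupled only through these object names and file paths, so a typo or a reordered step breaks the pipeline at run time. The hypothetical checker below models each step as the set of names it reads and writes (taken from the functions above) and reports any input that no earlier step provides. It is a local sanity check, not part of the Fluid or Vineyard APIs.

```python
# (step, reads, writes): "vineyard:" names go through vineyard.put/get,
# "oss:" paths go through the /data mount.
STEPS = [
    ("preprocess", {"oss:/data/df.pkl"},
     {"vineyard:x_train", "vineyard:x_test", "vineyard:y_train", "vineyard:y_test"}),
    ("train", {"vineyard:x_train", "vineyard:y_train"}, {"oss:/data/model.pkl"}),
    ("test", {"vineyard:x_test", "vineyard:y_test", "oss:/data/model.pkl"},
     {"oss:/data/output.txt"}),
]


def missing_inputs(steps, external=frozenset({"oss:/data/df.pkl"})):
    """Return {step: sorted inputs that no earlier step or external source provides}."""
    available = set(external)
    problems = {}
    for name, reads, writes in steps:
        missing = reads - available
        if missing:
            problems[name] = sorted(missing)
        available |= writes  # later steps can read what this step wrote
    return problems


print(missing_inputs(STEPS))
print(missing_inputs([STEPS[1], STEPS[0], STEPS[2]]))  # train moved before preprocess
```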

```python
# Task workflow for the linear regression model: data preprocessing -> model training -> model testing
# The mount path "/var/run" is the default path for vineyard configuration files
flow = (
    dataset.process(processor=preprocess_processor, dataset_mountpath="/var/run")
    .process(processor=train_processor, dataset_mountpath="/var/run")
    .process(processor=test_processor, dataset_mountpath="/var/run")
)
```

```python
# Submit the data processing workflow for the linear regression model and wait for it to complete
run = flow.run(run_id="linear-regression-with-vineyard")
run.wait()
```

```text
2024-03-08 12:13:09,983 - fluidsdk - INFO - DataProcess linear-regression-with-vineyard-step1 completed
2024-03-08 12:15:26,417 - fluidsdk - INFO - DataProcess linear-regression-with-vineyard-step2 completed
2024-03-08 12:17:39,682 - fluidsdk - INFO - DataProcess linear-regression-with-vineyard-step3 completed
```


### 4. Clean Up

```python
# Clean up all resources
dataset.clean_up(wait=True)
```