# CloudEdge DataEngineer (Inference Stage)

**Inference Scenarios**

| scenarios | reference app | framework | model/dataset |
| ---- | ---- | ---- | ---- |
| batch-inference-workflow | [scenarios/job-pipeline](https://github.com/peiniliu/inference/tree/dev/vision/classification_and_detection/scenarios/job-pipeline) | tensorflow | resnet/dummy |

## Architecture

Make sure to set these environment variables in your session with the proper values. All of them are mandatory except:
- `DOCKER_REGISTRY`: if you plan to push the images to a private registry
- `DOCKER_TAG`: if you don't want to leave the default `latest` tag
- `DOCKER_REGISTRY_USERNAME`: if your private registry requires authentication
- `DOCKER_REGISTRY_PASSWORD`: if your private registry requires authentication
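
A quick way to catch a missing variable before any client is created is to check the session up front. The sketch below uses only the standard library. The list of mandatory names is an assumption taken from the debug cell in this notebook (`LOCAL_DEPLOY` is left out because it acts as an optional flag), and the helper `missing_env_vars` is hypothetical, not part of Scanflow.

```python
import os

# Assumed mandatory variables, as set in the debug cell of this notebook.
MANDATORY_VARS = (
    "WORKDIR",
    "REACTIVE_MIGRATION_DATAENGINEER_APP_DIR",
    "SCANFLOW_SERVER_URI",
    "SCANFLOW_TRACKER_URI",
    "MLFLOW_S3_ENDPOINT_URL",
    "AWS_ACCESS_KEY_ID",
    "AWS_SECRET_ACCESS_KEY",
)


def missing_env_vars(environ=None, required=MANDATORY_VARS):
    """Return the required variable names that are unset or empty."""
    environ = os.environ if environ is None else environ
    return [name for name in required if not environ.get(name)]


if __name__ == "__main__":
    missing = missing_env_vars()
    if missing:
        print("Missing environment variables:", ", ".join(missing))
    else:
        print("All mandatory environment variables are set.")
```

Passing a plain dictionary instead of `os.environ` makes the check easy to try without touching the real session.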

In [None]:
# Only for debugging purposes. Don't leave these values enabled in the repository!
%env WORKDIR=/root/cloudskin/data-connector
%env REACTIVE_MIGRATION_DATAENGINEER_APP_DIR=examples/cloudedge-reactive-migration/dataengineer
%env SCANFLOW_SERVER_URI=http://10.0.26.8:32002
%env SCANFLOW_TRACKER_URI=http://10.0.26.8:32002
%env MLFLOW_S3_ENDPOINT_URL=http://10.0.26.8:32000
# MinIO API endpoint, not console!
%env AWS_ACCESS_KEY_ID=admin
%env AWS_SECRET_ACCESS_KEY=scanflow123
%env DOCKER_REGISTRY=registry.gitlab.bsc.es/datacentric-computing/cloudskin-project/cloudskin-registry
# If you use invalid characters for a tag, Scanflow will replace them with '-'
%env DOCKER_TAG=feat/reactive-migration
%env DOCKER_REGISTRY_USERNAME=cloudskin-scanflow-builds
%env DOCKER_REGISTRY_PASSWORD=sEPN9vEtSjCFodUvVa1M
# Deployment cells only run when this is set, so CI pipelines do not deploy anything
%env LOCAL_DEPLOY=1

In [2]:
import sys
import os
sys.path.insert(0,'../..')

from scanflow.client import ScanflowClient
from scanflow.client import ScanflowTrackerClient
from scanflow.client import ScanflowDeployerClient

In [None]:
from scanflow.tools import env
print(env.get_env("SCANFLOW_SERVER_URI"))
print(env.get_env("SCANFLOW_TRACKER_URI"))
print(env.get_env("MLFLOW_S3_ENDPOINT_URL"))
print(env.get_env("AWS_ACCESS_KEY_ID"))
print(env.get_env("AWS_SECRET_ACCESS_KEY"))
print(env.get_env("DOCKER_REGISTRY"))
print(env.get_env("DOCKER_TAG"))

In [None]:
# App folder - Must point to the folder that contains the 'dataengineer' and 'datascience' folders
# for cloudedge-reactive-migration, located in examples/cloudedge-reactive-migration
app_dir = os.path.join(env.get_env('WORKDIR'), env.get_env('REACTIVE_MIGRATION_DATAENGINEER_APP_DIR'))
print(app_dir)
app_name = "cloudedge-reactive-migration"
team_name = "dataengineer"

# Initialize the Scanflow Client
client = ScanflowClient(
    # If "SCANFLOW_SERVER_URI" is defined, you don't need to pass the server URI here
    registry=env.get_env("DOCKER_REGISTRY"),
    verbose=True)

## Batch-inference-graph for prediction

### Predictor
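
The predictor defined in the next cell is a two-node graph in which `data-retrieval` must finish before `qos-upload` starts. As a rough illustration of how a dependee/depender edge fixes the execution order, here is a minimal sketch using the standard library's `graphlib` (Python 3.9+). It is not how Scanflow or Argo schedule the steps internally, and `execution_order` is a hypothetical helper.

```python
from graphlib import TopologicalSorter


def execution_order(nodes, edges):
    """Order nodes so that every dependee runs before its depender.

    `edges` is a list of (dependee, depender) pairs.
    """
    # Register every node first so isolated nodes also appear in the result.
    sorter = TopologicalSorter({name: set() for name in nodes})
    for dependee, depender in edges:
        sorter.add(depender, dependee)  # the depender waits for the dependee
    return list(sorter.static_order())


order = execution_order(
    nodes=["data-retrieval", "qos-upload"],
    edges=[("data-retrieval", "qos-upload")],
)
print(order)
```

A cyclic set of edges makes `static_order()` raise `graphlib.CycleError`, which is a cheap sanity check to run before adding further executors and dependencies.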

In [5]:
# Predictor stages
# - Executor 1: Data retrieval from Prometheus
# - Executor 2: Data pre-processing + QoS Predictor
executor_1 = client.ScanflowExecutor(
    name="data-retrieval",
    mainfile="data-retrieval.py",
    dockerfile="Dockerfile_data_retrieval_no_buildkit",
    parameters={
        'app_name': app_name,
        'team_name': team_name,
        #'promcsv_config': "/app/data-retrieval/promql_queries.json" # Config file already included in the Docker image
        'promcsv_config': "/workflow/promql_queries.json" # Config file for debug purposes, manually included in the workflow PVC
    }
)

executor_2 = client.ScanflowExecutor(
    name="qos-upload",
    mainfile="qos-upload.py",
    dockerfile="Dockerfile_qos_upload_no_buildkit",
    parameters={
        'name': "QoS preprocessing and upload",
        'app_name': app_name,
        'team_name': team_name,
        'csv_path': "/workflow/migration_experiment", # We expect each experiment run to store results at /workflow/migration_experiment/run_at_${execution_timestamp} folder
        'csv_sep': ";"
    }
)

# Stage dependencies
# TODO: define them once other stages have been developed
dependency_1 = client.ScanflowDependency(
    dependee='data-retrieval',
    depender='qos-upload'
)

# Predictor workflow: batch-inference-reactive-graph
# TODO: add missing executors and dependencies
workflow_1 = client.ScanflowWorkflow(
    name="batch-inference-reactive-graph",
    nodes=[executor_1, executor_2],
    edges=[dependency_1],
    type="batch",
    cron="*/5 * * * *",
    output_dir="/workflow",
    image_pull_secrets=["cloudskin-registry"] # Required for Workflow templates
)

### Planner

In [6]:
trigger = client.ScanflowAgentSensor_IntervalTrigger(minutes=5)
sensor = client.ScanflowAgentSensor(
    name="reactive_watch_qos",
    isCustom=True,
    func_name="reactive_watch_qos",
    trigger=trigger,
    kwargs={
        'frequency': 300
    }
)
planner = client.ScanflowAgent(
    name="planner",
    dockerfile="Dockerfile_scanflow_planner",
    template="planner",
    sensors=[sensor],
    image_pull_secret="cloudskin-registry" # Required when deploying to Kubernetes cluster (created during deployment)
)

### Compose the Scanflow Application

In [7]:
app = client.ScanflowApplication(
    app_name=app_name,
    app_dir=app_dir,
    team_name=team_name,
    workflows=[workflow_1],
    agents=[planner]
)

### DEBUG: show application config

In [8]:
#app.to_dict()

### Build the Scanflow Application
- This step builds the Docker images for all the Scanflow executors and uploads them to the container registry (currently hardcoded in the `scanflow` module)
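
The `DOCKER_TAG` value used in this notebook (`feat/reactive-migration`) contains a `/`, which is not allowed in a Docker tag, and the comment next to it says Scanflow replaces invalid characters with `-`. The sketch below shows that kind of replacement. It is an assumption about the behaviour, not Scanflow's actual code, and `sanitize_docker_tag` is a hypothetical name. It relies on the Docker rule that a tag may contain only ASCII letters, digits, underscores, periods and dashes.

```python
import re

# Anything outside the characters Docker allows in a tag.
_INVALID_TAG_CHARS = re.compile(r"[^A-Za-z0-9_.-]")


def sanitize_docker_tag(tag):
    """Replace characters that are not allowed in a Docker tag with '-'."""
    return _INVALID_TAG_CHARS.sub("-", tag)


print(sanitize_docker_tag("feat/reactive-migration"))  # feat-reactive-migration
```

The sketch does not handle the other tag rules (a tag may not start with a period or a dash and is limited to 128 characters).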

In [None]:
# Define the Scanflow Tracker Port (32766)
build_app = client.build_ScanflowApplication(
    app=app,
    trackerPort=32766,
    image_pull_secret="cloudskin-registry" # Required when deploying to Kubernetes (created during deployment)
)

### DEBUG: show built application config

In [10]:
#build_app.to_dict()

### Create a ScanflowDeployerClient

This client creates the required environment for Scanflow to run the pipelines in a Kubernetes cluster based on the built application. It can:

- Create an environment for the Scanflow application within its own namespace
- Deploy a local Scanflow Tracker
- Run the application as an Argo Workflow
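
The deployment cells in this notebook are gated on `LOCAL_DEPLOY`. Assuming `env.get_env` returns the raw string value (its implementation is not shown here), a plain truthiness check would treat `LOCAL_DEPLOY=0` as enabled, because any non-empty string is truthy in Python. A stricter parse could look like the following sketch, where `env_flag` is a hypothetical helper.

```python
import os

_TRUE_VALUES = {"1", "true", "yes", "on"}


def env_flag(name, environ=None):
    """Interpret an environment variable as a boolean flag."""
    environ = os.environ if environ is None else environ
    return environ.get(name, "").strip().lower() in _TRUE_VALUES


print(env_flag("LOCAL_DEPLOY", {"LOCAL_DEPLOY": "1"}))  # True
print(env_flag("LOCAL_DEPLOY", {"LOCAL_DEPLOY": "0"}))  # False
print(bool("0"))  # True: a plain truthiness check treats "0" as set
```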

In [None]:
# Initialize the deployer client
if env.get_env("LOCAL_DEPLOY"):
    deployer_client = ScanflowDeployerClient(
        user_type="local",
        deployer="argo",
        k8s_config_file="/root/.kube/config"
    )

### Deploy the ScanflowEnvironment
This creates:
- A namespace for the application
- A Deployment for the local scanflow tracker
- A Deployment for all the agents (in this case there's only the planner)
  - The planner image does not currently include the `scanflow` module, so the module must be copied into the planner's PVC, where the container expects it at the `/scanflow/scanflow/scanflow` path

Go to your Kubernetes cluster and check that both the tracker and planner pods are Running without errors in the `scanflow-cloudedge-reactive-migration-dataengineer` namespace.
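
The namespace follows the `scanflow-{app_name}-{team_name}` pattern used in the environment cell. As a small sketch, the helpers below (hypothetical names, not part of Scanflow) rebuild that name and print the matching `kubectl get pods` command, which you can paste into a terminal that has access to the cluster.

```python
import shlex


def scanflow_namespace(app_name, team_name):
    """Compose the namespace name the same way the environment cell does."""
    return f"scanflow-{app_name}-{team_name}"


def get_pods_command(namespace):
    """Build a `kubectl get pods` command scoped to one namespace."""
    return ["kubectl", "get", "pods", "--namespace", namespace]


namespace = scanflow_namespace("cloudedge-reactive-migration", "dataengineer")
print(shlex.join(get_pods_command(namespace)))
```

Keeping the command as a list makes it straightforward to hand to `subprocess.run` later if you want the notebook to run the check itself.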

In [12]:
# Compose a custom ScanflowEnvironment
from scanflow.deployer.env import ScanflowEnvironment
data_eng_env = ScanflowEnvironment()
data_eng_env.namespace=f"scanflow-{build_app.app_name}-{build_app.team_name}"
# TRACKER STORAGE MUST BE ALREADY DEPLOYED IN ITS OWN NAMESPACE (i.e: "scanflow-server")
# - "scanflow" db must already exist in postgresql
# - "scanflow" bucket must already exist in MinIO 
data_eng_env.tracker_config.TRACKER_STORAGE = "postgresql://postgres:scanflow123@postgresql.scanflow-server/scanflow"
data_eng_env.tracker_config.TRACKER_ARTIFACT = f"s3://scanflow/{data_eng_env.namespace}"
# CLIENT CONFIG: REPLACE WITH CURRENTLY DEPLOYED SERVICES IN "scanflow-server" namespace
#data_eng_env.client_config.SCANFLOW_TRACKER_LOCAL_URI = f"http://scanflow-tracker.{data_eng_env.namespace}"
data_eng_env.client_config.SCANFLOW_TRACKER_LOCAL_URI = "http://scanflow-server-tracker-service.scanflow-server"
data_eng_env.client_config.SCANFLOW_TRACKER_URI = "http://scanflow-server-tracker-service.scanflow-server"
data_eng_env.client_config.SCANFLOW_SERVER_URI = "http://scanflow-server-tracker-service.scanflow-server"
# MINIO MUST BE ALREADY DEPLOYED IN ITS OWN NAMESPACE (i.e: "scanflow-server")
data_eng_env.secret.AWS_ACCESS_KEY_ID = "admin"
data_eng_env.secret.AWS_SECRET_ACCESS_KEY = "scanflow123"
data_eng_env.secret.MLFLOW_S3_ENDPOINT_URL = "http://minio.scanflow-server:9000"
data_eng_env.secret.AWS_ENDPOINT_URL = "http://minio.scanflow-server:9000"
# NEW: configure image pull secret
data_eng_env.image_pull_secret.name = "cloudskin-registry"
data_eng_env.image_pull_secret.registry = env.get_env("DOCKER_REGISTRY")
data_eng_env.image_pull_secret.username = env.get_env("DOCKER_REGISTRY_USERNAME")
data_eng_env.image_pull_secret.password = env.get_env("DOCKER_REGISTRY_PASSWORD")
data_eng_env.image_pull_secret.email = "cloudskin-project@bsc.es"

In [None]:
# Create the application environment
if env.get_env("LOCAL_DEPLOY"):
    await deployer_client.create_environment(
        app=build_app,
        scanflowEnv=data_eng_env
    )

    # TODO: Retrieve the PV name of the PVC and copy `scanflow` module there

## Run Workflow to test
This composes an Argo CronWorkflow for the application and submits it to the Argo Workflows engine:
- Prerequisite: Argo Workflows must be set to use the `default` service account when no `serviceAccount` is provided in the template
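
The workflow in this notebook is scheduled with the cron expression `*/5 * * * *`, which fires at every minute divisible by 5. To make that concrete, the sketch below lists the next few run times after a given moment. It is a simplification that only understands the `*/N` minute pattern, it is not a general cron parser, and the start time in the example is arbitrary.

```python
from datetime import datetime, timedelta


def next_runs(now, step_minutes=5, count=3):
    """Return the next `count` times matching a `*/step_minutes` minute field.

    Only handles the `*/N * * * *` pattern used in this notebook.
    """
    # Start from the next whole minute strictly after `now`.
    candidate = now.replace(second=0, microsecond=0) + timedelta(minutes=1)
    runs = []
    while len(runs) < count:
        if candidate.minute % step_minutes == 0:
            runs.append(candidate)
        candidate += timedelta(minutes=1)
    return runs


for run in next_runs(datetime(2024, 1, 1, 12, 3, 20)):
    print(run.isoformat())
```

Starting at 12:03:20, the listed runs fall at 12:05, 12:10 and 12:15, the same 5-minute spacing as the interval configured for the planner's sensor.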

In [None]:
if env.get_env("LOCAL_DEPLOY"):
    await deployer_client.run_app(app=build_app)
    # DEBUG - TODO: if using external config files, automate their copy inside the workflow PVC instead of doing it manually
    # - Copy Promcsv config file so it is available within the container in the /workflow/promql_queries.json path

## Clean-up

### Remove Scanflow application
This will delete the target Scanflow application:
- Remove its Argo Workflow object
- Remove its PVC and related PV (created during Argo Workflow execution)

In [None]:
#await deployer_client.delete_app(app=build_app)

### Remove Scanflow environment

In [None]:
#await deployer_client.clean_environment(app=build_app, scanflow_env=data_eng_env)