In [1]:
import importlib.util
import os
import sys
import warnings
from pathlib import Path
from subprocess import call

import deepchem as dc
import torch
import xgboost as xgb
from deepchem.data import DiskDataset, NumpyDataset
from dflow import Step, Workflow, upload_artifact
from dflow.python import OP, OPIO, Artifact, OPIOSign, PythonOPTemplate, upload_packages

Instructions for updating:
experimental_relax_shapes is deprecated, use reduce_retracing instead


In [2]:
# Upload the three CSV splits (train, test, valid — in that order) so the
# workflow steps can consume them as input artifacts.
tr_dataset, te_dataset, val_dataset = (
    upload_artifact(csv_name)
    for csv_name in ("trainBBBP.csv", "testBBBP.csv", "validBBBP.csv")
)

In [3]:
# S0
class SetupAndLoad(OP):
    """Featurize the train/test/valid CSV files into DeepChem datasets.

    Inputs (artifacts): ``train_file``, ``test_file``, ``valid_file``.
    Outputs (parameters): ``train_dataset``, ``test_dataset``, ``valid_dataset``.

    NOTE(review): the outputs are declared as plain ``DiskDataset`` parameters
    rather than artifacts. Presumably the on-disk data behind each dataset is
    not carried along to the next step's container — confirm, and consider
    switching to ``Artifact(Path)`` outputs together with the step wiring.
    """

    def __init__(self):
        pass

    @classmethod
    def get_input_sign(cls):
        """Declare one file artifact per CSV split."""
        return OPIOSign({
            key: Artifact(Path)
            for key in ("train_file", "test_file", "valid_file")
        })

    @classmethod
    def get_output_sign(cls):
        """Declare one ``DiskDataset`` output per split."""
        return OPIOSign({
            key: DiskDataset
            for key in ("train_dataset", "test_dataset", "valid_dataset")
        })

    @OP.exec_sign_check
    def execute(self, op_in: OPIO) -> OPIO:
        """Load each CSV with a circular-fingerprint featurizer.

        The ``SMILES`` column is the featurized field and ``targets`` is the
        single task column.
        """
        # Resolve the artifact inputs to plain string paths first.
        csv_paths = [
            str(op_in[key])
            for key in ("train_file", "test_file", "valid_file")
        ]

        csv_loader = dc.data.CSVLoader(
            tasks=['targets'],
            feature_field='SMILES',
            featurizer=dc.feat.CircularFingerprint(size=256),
        )

        # Featurize in train, test, valid order.
        train_dataset, test_dataset, valid_dataset = (
            csv_loader.create_dataset(csv_path) for csv_path in csv_paths
        )
        print(train_dataset,str(train_dataset))

        return OPIO({
            "train_dataset": train_dataset,
            "test_dataset": test_dataset,
            "valid_dataset": valid_dataset,
        })


In [4]:
# S1
class TransformData(OP):
    """Apply the normalization transformer to every split and persist the results.

    Inputs (parameters): ``train_dataset``, ``test_dataset``, ``valid_dataset``.
    Outputs (artifacts): ``transformed_train``, ``transformed_test``,
    ``transformed_valid`` — each a directory holding the transformed dataset.
    """

    def __init__(self):
        pass

    @classmethod
    def get_input_sign(cls):
        """Declare one ``DiskDataset`` parameter per split."""
        return OPIOSign({
            "train_dataset": DiskDataset,
            "test_dataset": DiskDataset,
            "valid_dataset": DiskDataset,
        })

    @classmethod
    def get_output_sign(cls):
        """Declare one directory artifact per transformed split."""
        return OPIOSign({
            "transformed_train": Artifact(Path),
            "transformed_test": Artifact(Path),
            "transformed_valid": Artifact(Path),
        })

    @OP.exec_sign_check
    def execute(self, op_in: OPIO) -> OPIO:
        """Transform each split and return the directory it was written to.

        Returns:
            OPIO mapping each output name to the ``Path`` of the directory that
            contains the corresponding transformed dataset.
        """
        train_dataset = op_in["train_dataset"]

        # Output name -> source split. Keeping the pairing in one place avoids
        # mixing up the test and validation splits.
        splits = {
            "transformed_train": train_dataset,
            "transformed_test": op_in["test_dataset"],
            "transformed_valid": op_in["valid_dataset"],
        }

        # The transformer statistics are fitted on the training split only.
        # NOTE(review): transform_y=True normalizes the labels; for a
        # classification target this looks unintended — confirm.
        transformers = [
            dc.trans.NormalizationTransformer(transform_y=True, dataset=train_dataset)
        ]

        outputs = {}
        last_index = len(transformers) - 1
        for output_name, dataset in splits.items():
            out_dir = Path(output_name)
            # Chain every transformer; only the final result is written to the
            # named output directory so it can be exported as an artifact.
            for index, transformer in enumerate(transformers):
                target_dir = str(out_dir) if index == last_index else None
                dataset = transformer.transform(dataset, out_dir=target_dir)
            # The output signature declares Path artifacts, so return the
            # directory rather than the in-memory dataset object.
            outputs[output_name] = out_dir

        return OPIO(outputs)


In [5]:
# S2
class TrainModel(OP):
    """Train an XGBoost classifier (wrapped in a DeepChem GBDTModel) and save it.

    Input (artifact): ``transformed_train`` — directory of the training dataset.
    Output (artifact): ``model`` — directory the fitted model was saved to.
    """

    def __init__(self):
        pass

    @classmethod
    def get_input_sign(cls):
        """Declare the training-dataset directory artifact."""
        return OPIOSign({
            "transformed_train": Artifact(Path),
        })

    @classmethod
    def get_output_sign(cls):
        """Declare the saved-model directory artifact."""
        return OPIOSign({
            "model": Artifact(Path),
        })

    @OP.exec_sign_check
    def execute(self, op_in: OPIO) -> OPIO:
        """Fit the model on the training split and persist it to disk.

        Returns:
            OPIO with ``model`` set to the ``Path`` of the model directory.
        """
        # The input arrives as a path, so the dataset has to be reopened from
        # disk before it can be passed to fit().
        # Assumes the artifact path is the dataset directory itself — TODO confirm.
        train_dataset = DiskDataset(data_dir=str(op_in["transformed_train"]))

        # Build exactly one estimator, choosing the GPU variant when available.
        # NOTE(review): the learning rate differs between the GPU (0.05) and
        # CPU (0.1) configurations — confirm this is intentional.
        if torch.cuda.is_available():
            xgb_model = xgb.XGBClassifier(
                n_estimators=100, learning_rate=0.05,
                tree_method='gpu_hist'
            )
        else:
            xgb_model = xgb.XGBClassifier(n_estimators=100, learning_rate=0.1)

        # Train inside a known directory so the saved model can be exported.
        model_dir = Path("model")
        dc_model = dc.models.GBDTModel(
            xgb_model, model_dir=str(model_dir), mode="classification"
        )
        dc_model.fit(train_dataset)
        dc_model.save()

        # The output signature declares a Path artifact, not a model object.
        return OPIO({
            "model": model_dir,
        })

In [6]:
# S3
class EvaluateModel(OP):
    """Score a saved model on the test split using ROC-AUC.

    Inputs (artifacts): ``model`` — saved-model directory;
    ``transformed_test`` — directory of the test dataset.
    Output (parameter): ``evaluation_metrics`` — the ROC-AUC score as a float.
    """

    def __init__(self):
        pass

    @classmethod
    def get_input_sign(cls):
        """Declare the model and test-dataset directory artifacts."""
        return OPIOSign({
            "model": Artifact(Path),
            "transformed_test": Artifact(Path),
        })

    @classmethod
    def get_output_sign(cls):
        """Declare the single float output."""
        return OPIOSign({
            "evaluation_metrics": float,  # AUC score as a float
        })

    @OP.exec_sign_check
    def execute(self, op_in: OPIO) -> OPIO:
        """Reload the model and dataset from disk and compute ROC-AUC.

        Returns:
            OPIO with ``evaluation_metrics`` set to the ROC-AUC score.
        """
        # Both inputs arrive as paths; rebuild the objects they point to.
        # Assumes each artifact path is the saved directory itself — TODO confirm.
        test_dataset = DiskDataset(data_dir=str(op_in["transformed_test"]))

        # The placeholder estimator is replaced by the saved one on reload().
        model = dc.models.GBDTModel(
            xgb.XGBClassifier(), model_dir=str(op_in["model"])
        )
        model.reload()

        metric = dc.metrics.Metric(dc.metrics.roc_auc_score)
        evaluation_metrics = model.evaluate(test_dataset, [metric])

        # Coerce to a built-in float so the value matches the declared output
        # type regardless of the numeric type evaluate() returns.
        auc_score = float(evaluation_metrics["roc_auc_score"])

        return OPIO({
            "evaluation_metrics": auc_score,
        })


In [7]:
# Local packages to upload alongside an OP. The rdkit directory is resolved
# from the running interpreter instead of a hard-coded, machine-specific path.
# NOTE(review): deepchem, xgboost, pandas, pytz and dateutil were previously
# listed here but commented out; presumably the container image already
# provides them — confirm.
_rdkit_spec = importlib.util.find_spec("rdkit")
if _rdkit_spec is not None and _rdkit_spec.submodule_search_locations:
    package_list = [str(location) for location in _rdkit_spec.submodule_search_locations]
else:
    # Keep the notebook runnable, but make the missing package visible.
    warnings.warn("rdkit is not installed locally; package_list is empty")
    package_list = []

In [8]:
# Container image shared by every step of the pipeline.
OP_IMAGE = "starliu714/python:0.3"

# Step 0: featurize the uploaded CSV splits.
step0 = Step(
    name="setup-and-load",
    template=PythonOPTemplate(SetupAndLoad, image=OP_IMAGE),
    parameters={},
    artifacts={
        "train_file": tr_dataset,
        "test_file": te_dataset,
        "valid_file": val_dataset,
    },
)

# Step 1: transform the datasets produced by step 0 (passed as parameters).
step1 = Step(
    name="transform-data",
    template=PythonOPTemplate(TransformData, image=OP_IMAGE),
    parameters={
        "train_dataset": step0.outputs.parameters["train_dataset"],
        "test_dataset": step0.outputs.parameters["test_dataset"],
        "valid_dataset": step0.outputs.parameters["valid_dataset"],
    },
    artifacts={},
)

# Step 2: train the model on the transformed training split from step 1.
step2 = Step(
    name="train-model",
    template=PythonOPTemplate(TrainModel, image=OP_IMAGE),
    parameters={},
    artifacts={
        "transformed_train": step1.outputs.artifacts["transformed_train"],
    },
)

# Step 3: evaluate the model from step 2 on the transformed test split from
# step 1. This is the only step that uploads the local packages in package_list.
step3 = Step(
    name="evaluate-model",
    template=PythonOPTemplate(
        EvaluateModel,
        image=OP_IMAGE,
        python_packages=package_list,
    ),
    parameters={},
    artifacts={
        "model": step2.outputs.artifacts["model"],
        "transformed_test": step1.outputs.artifacts["transformed_test"],
    },
)

In [9]:
# Assemble the workflow by adding the steps one at a time, in pipeline order,
# then submit it.
wf = Workflow(name="xgboost")
for pipeline_step in (step0, step1, step2, step3):
    wf.add(pipeline_step)
wf.submit()

Workflow has been submitted (ID: xgboost-n7vjj, UID: fef7d557-937e-4ce1-979d-dd93c8a03879)
Workflow link: https://127.0.0.1:2746/workflows/argo/xgboost-n7vjj


{'metadata': {'name': 'xgboost-n7vjj', 'generateName': 'xgboost-', 'namespace': 'argo', 'uid': 'fef7d557-937e-4ce1-979d-dd93c8a03879', 'resourceVersion': '230824', 'generation': 1, 'creationTimestamp': '2024-01-14T06:58:08Z', 'labels': {'workflows.argoproj.io/creator': 'system-serviceaccount-argo-argo-server'}, 'managedFields': [{'manager': 'argo', 'operation': 'Update', 'apiVersion': 'argoproj.io/v1alpha1', 'time': '2024-01-14T06:58:08Z', 'fieldsType': 'FieldsV1', 'fieldsV1': {'f:metadata': {'f:generateName': {}, 'f:labels': {'.': {}, 'f:workflows.argoproj.io/creator': {}}}, 'f:spec': {}, 'f:status': {}}}]}, 'spec': {'templates': [{'name': 'xgboost-steps', 'inputs': {}, 'outputs': {}, 'metadata': {}, 'steps': [[{'name': 'setup-and-load', 'template': 'setupandload-jswr2', 'arguments': {'artifacts': [{'name': 'train_file', 'path': '/tmp/inputs/artifacts/train_file', 's3': {'key': 'upload/b27be36a-23d5-4016-bd27-9562b1f1322a/tmpk41alq57.tgz'}}, {'name': 'test_file', 'path': '/tmp/inputs/