In [None]:
import json
import os
from datetime import datetime, timedelta
from uuid import uuid4

from rime_sdk import Client

# Setup Experiment

Enter the cluster URL and its API token below. Set the agent ID only if you are using an agent other than the default.

In [None]:
# Connection settings. Each value is read from an environment variable when
# that variable is set, so the API key does not have to be saved inside the
# notebook; otherwise replace the empty-string fallback with your own value.
API_TOKEN = os.environ.get("RIME_API_TOKEN", "")  # API key for the cluster
CLUSTER_URL = os.environ.get("RIME_CLUSTER_URL", "")  # dedicated domain of the RIME service (eg: rime.stable.rbst.io)
AGENT_ID = os.environ.get("RIME_AGENT_ID", "")  # only needed when using an agent that is not the default


In [None]:
# Build the SDK client from the cluster URL and API token configured above.
client = Client(
    CLUSTER_URL,
    API_TOKEN,
)

Create a project and register a model in it.

In [None]:
# Create the project, then register a model under a name made unique by a
# random UUID suffix.
project = client.create_project(
    "Delta Lake on S3",
    "Delta lake on S3 test",
    "MODEL_TASK_BINARY_CLASSIFICATION",
)
model_name = f"My model {uuid4()}"
model_id = project.register_model(model_name)

Register the reference and evaluation datasets, together with the model's predictions on each, from Delta tables on S3 to set up the test. This test uses the internal agent on autotest, whose IAM role has permission to access the S3 bucket.

In [None]:
def _delta_table_source(path):
    """Return the ``connection_info`` entry pointing at the Delta table at ``path``."""
    return {"data_file": {"path": path, "data_type": "DATA_TYPE_DELTA_TABLE"}}


def _register_delta_dataset(project, model_id, split_name, data_path, preds_path):
    """Register one dataset plus the model's predictions on it.

    Args:
        project: Project object on which the dataset and predictions are registered.
        model_id: Id of the registered model the predictions belong to.
        split_name: Label placed in the dataset name (e.g. ``"reference"``).
        data_path: Path of the Delta table holding the data.
        preds_path: Path of the Delta table holding the predictions.

    Returns:
        The value returned by ``project.register_dataset`` (the dataset id).
    """
    data_config = {
        "connection_info": _delta_table_source(data_path),
        "data_params": {"label_col": "is_fraud", "timestamp_col": "timestamp"},
    }
    # The dataset name is prefixed with the current time (presumably so that
    # repeated runs do not collide on name -- confirm against the service).
    dataset_id = project.register_dataset(
        name=f"{datetime.now()} {split_name} dataset",
        data_config=data_config,
    )
    pred_config = {
        "connection_info": _delta_table_source(preds_path),
        "pred_params": {"pred_col": "0"},
    }
    project.register_predictions(dataset_id, model_id, pred_config)
    return dataset_id


# Locations of the data and prediction Delta tables for each split.
ref_delta_table_name = "s3a://rime-datasets/delta_lake/test_data/data/binary_classification/ref"
ref_preds_delta_table_name = "s3a://rime-datasets/delta_lake/test_data/models/binary_classification/preds/ref"
eval_data_delta_table_name = "s3a://rime-datasets/delta_lake/test_data/data/binary_classification/eval"
eval_preds_delta_table_name = "s3a://rime-datasets/delta_lake/test_data/models/binary_classification/preds/eval"

# Reference split first, then evaluation split (same order as before).
ref_data_id = _register_delta_dataset(
    project, model_id, "reference", ref_delta_table_name, ref_preds_delta_table_name
)
eval_data_id = _register_delta_dataset(
    project, model_id, "evaluation", eval_data_delta_table_name, eval_preds_delta_table_name
)

print(f"Project id: {project.project_id}")
print(f"Reference Data id: {ref_data_id}")
print(f"Evaluation Data id: {eval_data_id}")
print(f"Model id: {model_id}")

Define the config and start the stress test.

In [None]:
# Test categories requested for this run.
test_categories = [
    "TEST_CATEGORY_TYPE_MODEL_PERFORMANCE",
    "TEST_CATEGORY_TYPE_SUBSET_PERFORMANCE",
]

# Stress-test configuration: ties together the registered datasets and model
# with the run name, the categories above, and the run-time options.
st_config = {
    "data_info": {
        "ref_dataset_id": ref_data_id,
        "eval_dataset_id": eval_data_id,
    },
    "run_name": "Test Example",
    "model_id": model_id,
    "categories": test_categories,
    "run_time_info": {"explicit_errors": True},
}

In [None]:
# Launch the stress test. An empty AGENT_ID is normalized to None so that
# "no agent chosen" is passed explicitly instead of as an empty-string id.
stress_job = client.start_stress_test(
    test_run_config=st_config,
    project_id=project.project_id,
    agent_id=AGENT_ID or None,
)

# Fetch the job status (called with wait_until_finish=True) and raise if the
# job reports failure; the status payload is included to aid debugging.
status_dict = stress_job.get_status(verbose=True, wait_until_finish=True)
if status_dict['status'] == 'JOB_STATUS_FAILED':
    raise RuntimeError(f"job failed: {status_dict}")

In [None]:
# Clean up: delete the project created earlier in this notebook.
project_id_to_delete = project.project_id
client.delete_project(project_id_to_delete, force=True)