# Set-Up Variables

In [None]:
# Encoded workflow token; it is decoded in the next cell into the API
# authorization token and the workflow settings. The `# @param` annotation
# exposes this variable as a Colab form field.
token = ""  # @param {type:"string"}


In [None]:
from relevanceai import Client

from relevanceai.utils import decode_workflow_token

# Decode the workflow token into its configuration mapping.
config = decode_workflow_token(token)

# NOTE: `token` is rebound here from the encoded workflow token to the
# authorization token stored in the config; `Client(token=token)` uses it.
token = config["authorizationToken"]
dataset_id = config["dataset_id"]  # dataset the workflows below are applied to
text_field = config["text_field"]  # field holding the input text
model_name = config["model_name"]  # model loaded by the GPU workflow


In [None]:
# Authenticate with the decoded authorization token and open a handle to the
# dataset that the workflows below operate on.
client = Client(token=token)
dataset = client.Dataset(dataset_id)


In [None]:
from typing import List, Dict, Any
from relevanceai.utils.doc_utils import DocUtils

# Type aliases used in the workflow signatures below.
document = Dict[str, Any]  # a single document: field name -> value
documents = List[document]  # a batch of documents


# Any CPU Workflow

In [None]:
class CPUWorkflow(DocUtils):
    """
    Example workflow whose transform is pure-Python CPU work.

    Safe to run with several update threads in ``bulk_apply``.
    """

    def __init__(self, text_field: str):
        # Source field holding the text, and the derived field for the words.
        self.text_field = text_field
        self.new_field = f"{text_field}_split"

    def run(self, documents: documents) -> documents:
        """
        Tokenize ``text_field`` on single spaces for a batch of documents.

        Each document gets ``new_field`` set to the list produced by
        ``str.split(" ")`` on its text; the input list is returned.
        """
        texts = self.get_field_across_documents(
            field=self.text_field,
            docs=documents,
        )
        self.set_field_across_documents(
            field=self.new_field,
            values=[text.split(" ") for text in texts],
            docs=documents,
        )
        return documents


cpu_workflow = CPUWorkflow(text_field=text_field)

# CPU-only transforms may be applied by several update threads at once.
dataset.bulk_apply(
    bulk_func=cpu_workflow.run,
    update_workers=8,  # sets how many threads run for updating, can be >1 since update is on cpu
    push_workers=8,  # presumably the number of threads pushing results back; verify in bulk_apply docs
    multithreaded_update=True,  # allows multiple update workers
)


# Light GPU Workflow

In [None]:
from sentence_transformers import SentenceTransformer


class GPUWorkflow(DocUtils):
    """
    An example workflow that runs on the GPU.

    Encodes ``text_field`` of each document with a SentenceTransformer model
    and stores the embedding in the field ``f"{model_name}_vector_"``.

    CANNOT be multithreaded when transforming: run it through ``bulk_apply``
    with ``update_workers=1`` and ``multithreaded_update=False``.
    """

    def __init__(self, model_name: str, text_field: str):
        self.text_field = text_field
        # Load the model once; it is reused for every batch passed to `run`.
        self.model = SentenceTransformer(model_name)
        # NOTE(review): the model name is used verbatim in the field name, so
        # names containing "/" or "." end up in it; confirm the dataset
        # accepts such field names.
        self.vector_field = f"{model_name}_vector_"

    def run(self, documents: documents) -> documents:
        """
        Add a vector field to each document.

        The vector is created by ``model_name`` from ``text_field`` and is
        written to ``vector_field``.

        Args:
            documents: Batch of documents containing ``text_field``.

        Returns:
            The same list of documents, with ``vector_field`` set.
        """
        values = self.get_field_across_documents(
            field=self.text_field,
            docs=documents,
        )
        # Calling the model directly runs the torch `forward` pass, which
        # expects tokenized feature tensors rather than raw strings; `encode`
        # tokenizes and batches the texts. It returns a NumPy array by
        # default, so convert it to plain lists of floats for storage.
        vectors = self.model.encode(values).tolist()
        self.set_field_across_documents(
            field=self.vector_field,
            values=vectors,
            docs=documents,
        )
        return documents


gpu_workflow = GPUWorkflow(model_name=model_name, text_field=text_field)

# GPU tasks must use a single, non-multithreaded update worker; pushing the
# results can still be spread over several workers.
dataset.bulk_apply(
    bulk_func=gpu_workflow.run,
    update_workers=1,  # must be =1 for gpu tasks
    push_workers=8,
    multithreaded_update=False,  # must be set false
)


# Heavy GPU Workflow

For computationally intensive processes, Hugging Face models initialise certain parameters on the first pass through the model. This initialisation time depends on the size of the first batch, so keeping the first batch as small as possible reduces it.

In [None]:
# Stuff your workflow needs for initialisation. A bare annotation
# (`kwargs: dict`) binds nothing, so `**kwargs` would raise NameError; for the
# example GPUWorkflow above this is the model and the field to encode.
kwargs: dict = {"model_name": model_name, "text_field": text_field}

gpu_workflow = GPUWorkflow(**kwargs)
# Keep the first batch tiny so the model's one-off initialisation on its
# first pass is as cheap as possible.
warmup_batch_size = 1

dataset.bulk_apply(
    bulk_func=gpu_workflow.run,
    update_workers=1,  # must be =1 for gpu tasks
    push_workers=8,
    multithreaded_update=False,  # must be set false
    warmup_batch_size=warmup_batch_size,  # batch size for the first model pass through
)


# Upload to Different Dataset/Region

The nature of the upload can be controlled directly through `PullTransformPush`. If `func` is `None`, the documents are simply re-uploaded to the new dataset. Otherwise, `func` can be any function that transforms a list of documents. The same `bulk_apply` rules apply depending on whether `func` runs on the GPU or the CPU; the corresponding worker settings can be passed in as `kwargs`.

In [None]:
from relevanceai import Client

from relevanceai.operations_new.ops_run import PullTransformPush


# Fill these in before running: credentials for the source and destination
# projects and the dataset ids to copy from and to. They are only annotated
# here, so referencing them unassigned raises NameError.
old_token: str
new_token: str
old_dataset_id: str
new_dataset_id: str

old_client = Client(token=old_token)
new_client = Client(token=new_token)

# Use the ids declared for each side of the copy; the set-up cell's shared
# `dataset_id` would point both sides at the same dataset id.
old_dataset = old_client.Dataset(old_dataset_id)
new_dataset = new_client.Dataset(new_dataset_id)

# With `func=None` the documents are pulled from `old_dataset` and re-uploaded
# to `new_dataset` unchanged; pass a function over a list of documents to
# transform them in between.
ptp = PullTransformPush(
    pull_dataset=old_dataset,
    push_dataset=new_dataset,
    func=None,
)
ptp.run()