In [1]:
import os

username = 'dylan'

# Magnet configuration. Every per-user resource name (stream, category, KV
# bucket, session, object store) is derived from `username`, so changing that
# single value re-targets all of them together.
config = {
    "host": "research.prismadic.ai",
    "credentials": None,
    "domain": None,
    "stream_name": username,
    "category": f"{username}_default",
    "kv_name": username,
    "session": username,
    "os_name": username,
    "index": {
        "milvus_uri": "127.0.0.1",
        "milvus_port": 19530,
        # Credentials are taken from the MILVUS_USER / MILVUS_PASSWORD
        # environment variables when they are set, so real secrets never have
        # to be committed with the notebook. The "test" fallbacks are the
        # values this notebook previously hard-coded.
        "milvus_user": os.environ.get("MILVUS_USER", "test"),
        "milvus_password": os.environ.get("MILVUS_PASSWORD", "test"),
        # NOTE(review): presumably must equal the embedding width produced by
        # `model` below — confirm before switching models.
        "dimension": 1024,
        "model": "BAAI/bge-large-en-v1.5",
        "name": "test",
        "options": {
            'metric_type': 'COSINE',
            'index_type': 'HNSW',
            'params': {
                "efConstruction": 40,
                "M": 48
            }
        }
    }
}

In [None]:
from magnet.base import Magnet
from magnet.utils.data_classes import Status
from magnet.utils.globals import _f

# Status-update hook: relays each update to the `_f` helper.
def status_callback(status: Status):
    """Forward a status update's ``type`` and ``content`` to ``_f``.

    Args:
        status: The update to report; only its ``type`` and ``content``
            attributes are read.
    """
    kind, message = status.type, status.content
    _f(kind, message)

# Build the Magnet instance from the `config` dict and the status callback.
# The name `magnet` is reused by later cells.
magnet = Magnet(config, status_callback)
# Await the instance's align() coroutine before any jobs are submitted.
# NOTE(review): presumably this sets up the resources named in `config` — confirm.
await magnet.align()

In [None]:
import os
from magnet.utils.data_classes import AcquireParams, ProcessParams, TrainParams

async def _run_job(magnet, role, params):
    """Queue one job for `role`, then run a worker of that same role.

    Returns whatever `magnet.charge.excite` returned for the queued job.
    """
    job = await magnet.charge.excite(role, params)
    await magnet.resonator.worker(role=role)
    return job


async def main(magnet, sample_path="~/VSCode/Rivitt/sample.csv"):
    """Run the four-step pipeline: local acquire, object-store acquire, process, train.

    Each step queues a job through `magnet.charge.excite` and then runs a
    worker for that role through `magnet.resonator.worker` before the next
    step is built, because later steps read IDs/params from earlier jobs.

    Args:
        magnet: Object exposing the `charge.excite(role, params)` and
            `resonator.worker(role=...)` coroutines.
        sample_path: Path of the local CSV to acquire; a leading `~` is
            expanded. Defaults to the path the notebook was written against.

    Returns:
        None.
    """
    model_name = "cmamba"  # Same model is used for processing and training

    # Step 1: Local Acquisition and Upload
    acquire_params_local = AcquireParams(
        resource_id="example",  # Resource ID for identification
        data_source="local",    # Method to acquire, e.g., "local"
        location=os.path.expanduser(sample_path),  # Path to the file
        acquisition_options={}  # Any additional options (can be expanded as needed)
    )
    raw_data_job = await _run_job(magnet, "acquire", acquire_params_local)

    # Step 2: Object Store Acquisition
    # The first job's ID is passed as the location of the second acquisition.
    acquire_params_object_store = AcquireParams(
        resource_id=raw_data_job.params.resource_id,
        data_source="object_store",
        location=raw_data_job._id,
        acquisition_options={}
    )
    pipeline_acquire_job = await _run_job(magnet, "acquire", acquire_params_object_store)

    # Step 3: Process the Downloaded File
    # NOTE(review): "input_length" is 500 here but 250 in the training options
    # below — confirm the mismatch is intentional.
    process_params = ProcessParams(
        resource_id=raw_data_job.params.resource_id,  # Same resource ID as the local acquisition
        location=pipeline_acquire_job.params.location,  # Use the location from the object store acquisition job
        data_source="local",                   # The data is local after acquisition
        model=model_name,                      # Specify the model to use for processing
        processing_options={
            "chunk_size": 5000,
            "timestamp_column": "timestampRecorded",
            "features_to_match": ["Water_Flow", "flowRate", "FlowRate"],
            "input_length": 500
        }
    )
    pipeline_process_job = await _run_job(magnet, "process", process_params)

    # Step 4: Training Job
    train_params = TrainParams(
        resource_id=pipeline_process_job._id,  # Use the ID from the processing job
        data_source="stream",                  # Data source for training (e.g., streaming data)
        model=model_name,                      # Specify the model to use for training
        training_options={
            "batch_size": 18,
            "learning_rate": 0.0005,
            "input_length": 250,
            "num_epochs": 5,
            "d_model": 128,          # Dimension of the model
            "n_layer": 4,            # Number of C-Mamba blocks
            "seq_len": 250,          # Length of input sequence (look-back window)
            "num_channels": 3,       # Number of numerical channels in your data
            "patch_len": 32,         # Length of each patch
            "stride": 8,             # Stride for patching
            "forecast_len": 250,     # Number of future time steps to predict
            "d_state": 16,           # Dimension of SSM state
            "expand": 2,             # Expansion factor for inner dimension
            "dt_rank": 'auto',       # Rank for delta projection, 'auto' sets it to d_model/16
            "d_conv": 4,             # Kernel size for temporal convolution
            "pad_multiple": 25,      # Padding to ensure sequence length is divisible by this
            "conv_bias": True,       # Whether to use bias in convolution
            "bias": False,           # Whether to use bias in linear layers
            "sigma": 0.5,            # Standard deviation for channel mixup
            "reduction_ratio": 4,    # Reduction ratio for channel attention
            "verbose": False
        }
    )
    # The training job's handle is not needed afterwards, so it is not kept.
    await _run_job(magnet, "train", train_params)

# Run the workflow on the `magnet` instance created in an earlier cell.
# Top-level `await` works here because the notebook kernel runs cells inside
# an event loop; this line is not valid in a plain Python script.
await main(magnet)