In [172]:
# export ZENML_CONFIG_PATH=$PWD/services/zenml

In [173]:
# Categorical feature columns of the raw student dataset. They are passed to
# fit_encoders_and_scalers as `categorial_cols`: these get one-hot encoded and
# every other feature column gets standard-scaled. They are used as column keys,
# so keep each name exactly as spelled (including "Nacionality").
cols = list(
    (
        "Marital status",
        "Application mode",
        "Application order",
        "Course",
        "Daytime/evening attendance",
        "Previous qualification",
        "Nacionality",
        "Mother's qualification",
        "Father's qualification",
        "Mother's occupation",
        "Father's occupation",
        "Displaced",
        "Educational special needs",
        "Debtor",
        "Tuition fees up to date",
        "Gender",
        "Scholarship holder",
    )
)

In [204]:
from http import client
import os
from typing import Annotated, Any, Optional, Tuple

import numpy as np
import pandas as pd
import torch
from sklearn.preprocessing import LabelEncoder, OneHotEncoder, StandardScaler
from torch.utils.data import DataLoader, SubsetRandomSampler, TensorDataset

from zenml import ArtifactConfig, pipeline, step
from zenml import save_artifact, load_artifact

from zenml.artifacts.external_artifact import ExternalArtifact
# from zenml import ExternalArtifact

from zenml.integrations import SklearnIntegration, PytorchIntegration, PandasIntegration

import pandas as pd

from zenml.client import Client


@step(enable_cache=False)
def training_data_loader(
    train_data_path: str, target_name: str
) -> Tuple[
    Annotated[pd.DataFrame, ArtifactConfig(name="features", tags=["data_preparation"])],
    Annotated[pd.DataFrame, ArtifactConfig(name="target", tags=["data_preparation"])],
]:
    """Read the training CSV and split it into features and target.

    Args:
        train_data_path: Path to the training CSV; its first column becomes the index.
        target_name: Name of the label column.

    Returns:
        ``(features, target)``: every column except ``target_name``, and
        ``target_name`` alone as a one-column DataFrame.
    """
    frame = pd.read_csv(train_data_path, index_col=0)
    # pop() removes the label column from `frame` in place and hands it back,
    # leaving `frame` holding only the feature columns.
    labels = frame.pop(target_name).to_frame()
    return frame, labels


@step(enable_cache=False)
def test_data_loader(
    test_data_path: str,
) -> Annotated[
    pd.DataFrame, ArtifactConfig(name="features", tags=["data_preparation"])
]:
    """Read the test CSV (first column as index) and return it as the feature frame.

    NOTE(review): this output uses the same artifact name ("features") as the
    training features, so a by-name lookup of "features" presumably resolves to
    whichever of the two was produced last — consider a distinct name.
    """
    return pd.read_csv(test_data_path, index_col=0)


@step(enable_cache=False)
def fit_encoders_and_scalers(
    data: pd.DataFrame, target: pd.Series, categorial_cols: list
) -> Tuple[
    Annotated[
        OneHotEncoder, ArtifactConfig(name="onehot_encoder", tags=["preprocessers"])
    ],
    Annotated[
        StandardScaler, ArtifactConfig(name="std_scaler", tags=["preprocessers"])
    ],
    Annotated[
        LabelEncoder,
        ArtifactConfig(name="label_encoder", tags=["preprocessers"]),
    ],
]:
    """Fit the preprocessing transformers on the training data.

    The categorical columns get a one-hot encoder (first level dropped, unknown
    levels ignored at transform time). All remaining columns get a standard
    scaler, and the target gets a label encoder. The categorical column list is
    also saved as the ``categorial_cols`` artifact so later pipelines can reuse it.

    Args:
        data: Training features.
        target: Training labels, as a Series or a single-column DataFrame
            (``training_data_loader`` emits the latter).
        categorial_cols: Names of the columns in ``data`` to one-hot encode.

    Returns:
        ``(one_hot_encoder, scaler, label_encoder)``, all fitted.

    Raises:
        ValueError: if ``target`` is a DataFrame with more than one column.
    """
    SklearnIntegration.activate()

    one_hot_encoder = OneHotEncoder(
        sparse_output=False, drop="first", handle_unknown="ignore"
    )
    one_hot_encoder.fit(data[categorial_cols])

    scaler = StandardScaler()
    scaler.fit(data.drop(columns=categorial_cols))

    # LabelEncoder expects 1-D labels. A one-column frame would only be accepted
    # through sklearn's warning-emitting reshape, so unwrap it explicitly; more
    # than one column is a caller error.
    if isinstance(target, pd.DataFrame):
        if target.shape[1] != 1:
            raise ValueError(
                f"target must have exactly one column, got {target.shape[1]}"
            )
        target = target.iloc[:, 0]

    label_encoder = LabelEncoder()
    label_encoder.fit(target)

    save_artifact(categorial_cols, name="categorial_cols")

    return one_hot_encoder, scaler, label_encoder


@step(enable_cache=False)
def preprocess_data(
    data: pd.DataFrame,
    categorial_cols: list,
    one_hot_encoder: OneHotEncoder,
    scaler: StandardScaler,
) -> Annotated[
    pd.DataFrame,
    ArtifactConfig(name="input_features", tags=["data_preparation", "inference"]),
]:
    """Transform raw features with already-fitted transformers.

    The categorical columns are one-hot encoded and all other columns are scaled.
    The result places the encoded block first, followed by the scaled block.

    Args:
        data: Raw feature frame containing ``categorial_cols`` plus the numeric columns.
        categorial_cols: Columns to pass through ``one_hot_encoder``.
        one_hot_encoder: Fitted encoder; expected to return a dense array.
        scaler: Fitted scaler for the non-categorical columns.

    Returns:
        A DataFrame with the same row index as ``data``. Columns are the encoder's
        output feature names followed by the scaled columns' original names.
    """
    data_cat = data[categorial_cols]
    data_cat_encoded = one_hot_encoder.transform(data_cat)

    data_num = data.drop(columns=categorial_cols)
    data_num_scaled = scaler.transform(data_num)

    # Keep the source row index and name every output column, so processed rows
    # can be matched back to input rows (e.g. test ids when scoring). The
    # downstream consumers in this notebook only read `.values` / `len()`.
    feature_names = list(one_hot_encoder.get_feature_names_out()) + list(
        data_num.columns
    )
    return pd.DataFrame(
        np.concatenate([data_cat_encoded, data_num_scaled], axis=1),
        index=data.index,
        columns=feature_names,
    )


@step(enable_cache=False)
def preprocess_target(
    target: pd.Series,
    label_encoder: LabelEncoder,
) -> Annotated[
    np.ndarray, ArtifactConfig(name="input_target", tags=["data_preparation"])
]:
    """Encode target labels with an already-fitted LabelEncoder.

    Args:
        target: Labels, as a Series or a single-column DataFrame.
        label_encoder: Encoder fitted on the training labels.

    Returns:
        1-D array of encoded labels, one per row of ``target``.

    Raises:
        ValueError: if ``target`` is a DataFrame with more than one column.
    """
    # LabelEncoder works on 1-D input; unwrap a one-column frame explicitly
    # instead of relying on sklearn's warning-emitting reshape.
    if isinstance(target, pd.DataFrame):
        if target.shape[1] != 1:
            raise ValueError(
                f"target must have exactly one column, got {target.shape[1]}"
            )
        target = target.iloc[:, 0]
    return label_encoder.transform(target)


@step(enable_cache=False)
def split_data(
    processed_train_data: pd.DataFrame, test_size: float = 0.2, seed: int = 42
) -> Tuple[
    Annotated[
        np.ndarray, ArtifactConfig(name="train_indeces", tags=["data_preparation"])
    ],
    Annotated[
        np.ndarray, ArtifactConfig(name="validation_indeces", tags=["data_preparation"])
    ],
]:
    """Randomly split row positions into train and validation index arrays.

    Args:
        processed_train_data: Frame whose rows are split; only its length is used.
        test_size: Fraction of rows assigned to validation, in ``[0, 1)``.
            The validation count is ``int(n_rows * test_size)`` (rounded down).
        seed: Seed for the permutation; the same seed always gives the same split.

    Returns:
        ``(train_indices, validation_indices)``: disjoint positional indices that
        together cover every row.

    Raises:
        ValueError: if ``test_size`` is outside ``[0, 1)``.
    """
    if not 0.0 <= test_size < 1.0:
        raise ValueError(f"test_size must be in [0, 1), got {test_size}")

    # A private RandomState gives exactly the same permutation as
    # np.random.seed(seed) + np.random.permutation(...), but does not reseed
    # NumPy's global generator for the rest of the process.
    rng = np.random.RandomState(seed)
    n_rows = len(processed_train_data)
    indices = rng.permutation(n_rows)
    n_val = int(n_rows * test_size)
    return indices[n_val:], indices[:n_val]


@step(enable_cache=False)
def create_train_torch_dataloader(
    processed_train_data: pd.DataFrame,
    target_encoded: np.ndarray,
    train_idx: np.ndarray,
    val_idx: np.ndarray,
    batch_size: int = 128,
    save_torch_loaders: bool = True,
    output_dir: str = "data/processed",
) -> Tuple[
    Annotated[
        DataLoader, ArtifactConfig(name="train_dataloader", tags=["data_preparation"])
    ],
    Annotated[
        DataLoader,
        ArtifactConfig(name="validation_dataloader", tags=["data_preparation"]),
    ],
]:
    """Build train/validation DataLoaders over one shared TensorDataset.

    Both loaders draw from the same dataset of ``(features, label)`` pairs. Each
    one uses a SubsetRandomSampler restricted to its own positional indices.

    Args:
        processed_train_data: Preprocessed features; values are cast to float32.
        target_encoded: Encoded labels aligned with the rows; cast to int64.
        train_idx: Row positions for the training loader.
        val_idx: Row positions for the validation loader.
        batch_size: Rows per batch for both loaders.
        save_torch_loaders: If True, also ``torch.save`` both loaders into ``output_dir``.
        output_dir: Directory for ``train_loader.pt`` / ``val_loader.pt``; created
            if missing. A relative path resolves against the process working
            directory. NOTE(review): the training pipeline reads its CSV from
            "../data/raw", so the default "data/processed" is probably not the
            sibling directory it looks like — confirm the intended cwd.

    Returns:
        ``(train_loader, val_loader)``.
    """
    X = torch.tensor(processed_train_data.values, dtype=torch.float32)
    y = torch.tensor(target_encoded, dtype=torch.int64)

    processed_dataset = TensorDataset(X, y)
    train_sampler = SubsetRandomSampler(train_idx)
    val_sampler = SubsetRandomSampler(val_idx)

    train_loader = DataLoader(
        processed_dataset, batch_size=batch_size, sampler=train_sampler
    )
    val_loader = DataLoader(
        processed_dataset, batch_size=batch_size, sampler=val_sampler
    )

    if save_torch_loaders:
        # torch.save does not create missing directories.
        os.makedirs(output_dir, exist_ok=True)
        torch.save(train_loader, os.path.join(output_dir, "train_loader.pt"))
        torch.save(val_loader, os.path.join(output_dir, "val_loader.pt"))

    return train_loader, val_loader


@step(enable_cache=False)
def create_test_torch_dataloader(
    test_data: pd.DataFrame,
    batch_size: int = 128,
    save_torch_loaders: bool = True,
    output_dir: str = "data/processed",
) -> Annotated[
    DataLoader,
    ArtifactConfig(name="test_dataloader", tags=["data_preparation"]),
]:
    """Wrap preprocessed test features in a sequential (unshuffled) DataLoader.

    Args:
        test_data: Preprocessed test features; values are cast to float32.
        batch_size: Rows per batch.
        save_torch_loaders: If True, also ``torch.save`` the loader as
            ``test_loader.pt`` inside ``output_dir``.
        output_dir: Directory for the saved loader; created if missing.
            A relative path resolves against the process working directory.

    Returns:
        A DataLoader over a TensorDataset whose items are 1-tuples ``(features,)``.
    """
    X_test = torch.tensor(test_data.values, dtype=torch.float32)
    test_dataset = TensorDataset(X_test)
    test_loader = DataLoader(test_dataset, batch_size=batch_size)

    if save_torch_loaders:
        # torch.save does not create missing directories.
        os.makedirs(output_dir, exist_ok=True)
        torch.save(test_loader, os.path.join(output_dir, "test_loader.pt"))

    return test_loader


@step(enable_cache=False)
def tensor_converter(
    data: pd.DataFrame,
) -> Annotated[
    torch.Tensor, ArtifactConfig(name="tensor_converter", tags=["data_preparation"])
]:
    """Copy a DataFrame's values into a new float32 tensor of the same shape.

    Args:
        data: Frame to convert; its values are cast to ``torch.float32``.

    Returns:
        A ``torch.float32`` tensor with one row per row of ``data``.
    """
    as_array = data.to_numpy()
    return torch.tensor(as_array, dtype=torch.float32)


@pipeline(enable_cache=False)
def inference_pipeline(
    data_name: str = "inference_data"
) -> Annotated[
    torch.Tensor,
    ArtifactConfig(name="features", tags=["inference"], is_deployment_artifact=True),
]:
    """Preprocess a saved raw-feature artifact with the fitted transformers and tensorize it.

    Args:
        data_name: Name of the artifact holding the raw feature DataFrame to
            score. It is looked up by name only, with no explicit version.

    Returns:
        The ``tensor_converter`` output for the preprocessed features.
    """
    PytorchIntegration.activate()
    PandasIntegration.activate()

    client = Client()

    # Use the artifact names that fit_encoders_and_scalers actually produces
    # ("onehot_encoder", "std_scaler"). The names previously used here
    # ("one_hot_encoder", "scaler") are never written by this notebook's code,
    # so they could only resolve to stale transformers from older runs.
    one_hot_encoder = client.get_artifact_version("onehot_encoder")
    scaler = client.get_artifact_version("std_scaler")
    categorial_cols = client.get_artifact_version("categorial_cols")

    # Using latest data for inference
    data = client.get_artifact_version(data_name)

    processed_data = preprocess_data(data, categorial_cols, one_hot_encoder, scaler)

    tensor_data = tensor_converter(data=processed_data)

    return tensor_data


@pipeline(enable_cache=False)
def preprocessing_pipeline(
    data: pd.DataFrame, target: pd.DataFrame, categorial_cols: list
) -> Tuple[
    Annotated[pd.DataFrame, ArtifactConfig(name="features", tags=["data_preparation"])],
    Annotated[np.ndarray, ArtifactConfig(name="target", tags=["data_preparation"])],
]:
    """Fit the transformers on ``data``/``target``, then apply them to both.

    Args:
        data: Raw training features.
        target: Raw training labels.
        categorial_cols: Columns of ``data`` to one-hot encode.

    Returns:
        ``(processed_features, encoded_target)``: the outputs of
        ``preprocess_data`` and ``preprocess_target``.
    """
    encoder, std_scaler, target_encoder = fit_encoders_and_scalers(
        data, target, categorial_cols
    )
    # The tuple is evaluated left to right, so the features step is still
    # invoked before the target step.
    return (
        preprocess_data(data, categorial_cols, encoder, std_scaler),
        preprocess_target(target, target_encoder),
    )


@pipeline(enable_cache=False)
def training_pipeline(
    train_data_path: str = "../data/raw/train.csv",
    categorial_cols: Optional[list] = None,
) -> Tuple[
    Annotated[
        DataLoader, ArtifactConfig(name="train_dataloader", tags=["data_preparation"])
    ],
    Annotated[
        DataLoader,
        ArtifactConfig(name="validation_dataloader", tags=["data_preparation"]),
    ],
]:
    """Load the training CSV, preprocess it and build train/validation DataLoaders.

    Args:
        train_data_path: Path to the training CSV; its label column is "Target".
        categorial_cols: Columns to one-hot encode. If None, the list saved under
            the "categorial_cols" artifact by a previous run is used instead.

    Returns:
        ``(train_loader, val_loader)`` from ``create_train_torch_dataloader``.
    """
    data, target = training_data_loader(train_data_path, target_name="Target")

    if categorial_cols is None:
        # The "categorial_cols" artifact is only written by fit_encoders_and_scalers,
        # which runs later in this same pipeline. On a store with no previous
        # run it therefore does not exist yet, so pass the list explicitly.
        categorial_cols = Client().get_artifact_version("categorial_cols")

    processed_data, target_encoded = preprocessing_pipeline(
        data, target, categorial_cols
    )

    train_idx, val_idx = split_data(processed_data)

    train_loader, val_loader = create_train_torch_dataloader(
        processed_data, target_encoded, train_idx, val_idx
    )

    return train_loader, val_loader

In [None]:
# Load the raw test features, plus the training features and their "Target" label.
test_df = test_data_loader(test_data_path="../data/raw/test.csv")
data, target = training_data_loader(
    train_data_path="../data/raw/train.csv", target_name="Target"
)

In [None]:
# Fit the transformers on the training data. In return order:
# a = one-hot encoder (categorical cols), b = standard scaler (other cols),
# c = label encoder (target).
a, b, c = fit_encoders_and_scalers(data, target, cols)

In [None]:
# Apply the fitted transformers to the train features/target and the test features.
pr_data = preprocess_data(data=data, categorial_cols=cols, one_hot_encoder=a, scaler=b)
pr_target = preprocess_target(target=target, label_encoder=c)

pr_test = preprocess_data(data=test_df, categorial_cols=cols, one_hot_encoder=a, scaler=b)

In [None]:
# Positional train/validation row indices (default 20% validation, seed 42).
train_idx, val_idx = split_data(processed_train_data=pr_data)

In [None]:
# Build train/validation loaders over the preprocessed training rows.
# save_torch_loaders defaults to True, so the loaders are also written to disk.
create_train_torch_dataloader(pr_data, pr_target, train_idx, val_idx)

In [None]:
# Build the test loader from the preprocessed *test* features (pr_test).
# Passing pr_data here would wrap the training features instead.
create_test_torch_dataloader(test_data=pr_test)

In [184]:
# NOTE(review): counter that the inference cell increments but never reads;
# looks like a leftover from versioning artifact names — use it or remove it.
i = 1

In [205]:
# Save the raw test features under a fixed artifact name, then run the inference
# pipeline, which looks that name up. (The unused `i += 1` counter bump was
# removed: it changed kernel state on every re-run without any effect.)
test_df_name = "some_naming"

save_artifact(test_df, name=test_df_name)

inference_pipeline(test_df_name)

[1;35mInitiating a new run for the pipeline: [0m[1;36minference_pipeline[1;35m.[0m
[1;35mExecuting a new run.[0m
[1;35mCaching is disabled by default for [0m[1;36minference_pipeline[1;35m.[0m
[1;35mUsing user: [0m[1;36mdefault[1;35m[0m
[1;35mUsing stack: [0m[1;36mmy_local_stack[1;35m[0m
[1;35m  artifact_store: [0m[1;36mmy_artifact_store[1;35m[0m
[1;35m  orchestrator: [0m[1;36mdefault[1;35m[0m
[1;35mDashboard URL for Pipeline Run: [0m[34mhttp://127.0.0.1:8237/runs/319912f5-e80f-487f-bfc7-cb697a82fcfb[1;35m[0m
[1;35mCaching [0m[1;36mdisabled[1;35m explicitly for [0m[1;36mpreprocess_data[1;35m.[0m
[1;35mStep [0m[1;36mpreprocess_data[1;35m has started.[0m
[1;35mStep [0m[1;36mpreprocess_data[1;35m has finished in [0m[1;36m1.236s[1;35m.[0m
[1;35mStep [0m[1;36mpreprocess_data[1;35m completed successfully.[0m
[1;35mCaching [0m[1;36mdisabled[1;35m explicitly for [0m[1;36mtensor_converter[1;35m.[0m
[1;35mStep [0m[1;36mtens

PipelineRunResponse(body=PipelineRunResponseBody(created=datetime.datetime(2024, 9, 22, 19, 7, 46, 425734), updated=datetime.datetime(2024, 9, 22, 19, 7, 48, 612162), user=UserResponse(body=UserResponseBody(created=datetime.datetime(2024, 9, 21, 11, 52, 50, 841353), updated=datetime.datetime(2024, 9, 21, 11, 58, 57, 137570), active=True, activation_token=None, full_name='', email_opted_in=False, is_service_account=False, is_admin=True), metadata=None, resources=None, id=UUID('7a4bc18b-1221-41db-a94c-3adbacc8f1b6'), permission_denied=False, name='default'), status=<ExecutionStatus.COMPLETED: 'completed'>, stack=StackResponse(body=StackResponseBody(created=datetime.datetime(2024, 9, 21, 23, 15, 36, 927808), updated=datetime.datetime(2024, 9, 21, 23, 15, 36, 927810), user=UserResponse(body=UserResponseBody(created=datetime.datetime(2024, 9, 21, 11, 52, 50, 841353), updated=datetime.datetime(2024, 9, 21, 11, 58, 57, 137570), active=True, activation_token=None, full_name='', email_opted_in=

In [None]:
# Run the standalone preprocessing pipeline on the in-memory training frames.
preprocessing_pipeline(data=data, target=target, categorial_cols=cols)

In [203]:
# Run the full training pipeline on the default training CSV.
training_pipeline(train_data_path="../data/raw/train.csv")

<zenml.steps.entrypoint_function_utils.StepArtifact object at 0x15dbc0dd0>
[1;35mInitiating a new run for the pipeline: [0m[1;36mtraining_pipeline[1;35m.[0m
[1;35mExecuting a new run.[0m
[1;35mCaching is disabled by default for [0m[1;36mtraining_pipeline[1;35m.[0m
[1;35mUsing user: [0m[1;36mdefault[1;35m[0m
[1;35mUsing stack: [0m[1;36mmy_local_stack[1;35m[0m
[1;35m  artifact_store: [0m[1;36mmy_artifact_store[1;35m[0m
[1;35m  orchestrator: [0m[1;36mdefault[1;35m[0m
[1;35mDashboard URL for Pipeline Run: [0m[34mhttp://127.0.0.1:8237/runs/08a7396a-e2da-406d-9481-bd7730997846[1;35m[0m
[1;35mCaching [0m[1;36mdisabled[1;35m explicitly for [0m[1;36mtraining_data_loader[1;35m.[0m
[1;35mStep [0m[1;36mtraining_data_loader[1;35m has started.[0m
[1;35mStep [0m[1;36mtraining_data_loader[1;35m has finished in [0m[1;36m0.915s[1;35m.[0m
[1;35mStep [0m[1;36mtraining_data_loader[1;35m completed successfully.[0m
[1;35mCaching [0m[1;36mdisab

PipelineRunResponse(body=PipelineRunResponseBody(created=datetime.datetime(2024, 9, 22, 19, 5, 17, 549768), updated=datetime.datetime(2024, 9, 22, 19, 5, 24, 359409), user=UserResponse(body=UserResponseBody(created=datetime.datetime(2024, 9, 21, 11, 52, 50, 841353), updated=datetime.datetime(2024, 9, 21, 11, 58, 57, 137570), active=True, activation_token=None, full_name='', email_opted_in=False, is_service_account=False, is_admin=True), metadata=None, resources=None, id=UUID('7a4bc18b-1221-41db-a94c-3adbacc8f1b6'), permission_denied=False, name='default'), status=<ExecutionStatus.COMPLETED: 'completed'>, stack=StackResponse(body=StackResponseBody(created=datetime.datetime(2024, 9, 21, 23, 15, 36, 927808), updated=datetime.datetime(2024, 9, 21, 23, 15, 36, 927810), user=UserResponse(body=UserResponseBody(created=datetime.datetime(2024, 9, 21, 11, 52, 50, 841353), updated=datetime.datetime(2024, 9, 21, 11, 58, 57, 137570), active=True, activation_token=None, full_name='', email_opted_in=

In [None]:
import torch.utils
import torch.utils.data
from typing_extensions import Annotated
import pandas as pd
from sklearn.datasets import load_iris

from zenml import pipeline, step, ArtifactConfig

import torch


# Using Annotated to name our dataset
@step(enable_cache=False)
def iris_data_loader() -> (
    Annotated[
        pd.DataFrame,
        # Add `ArtifactConfig` to control more properties of your artifact
        # NOTE(review): with a fixed explicit `version`, every run tries to
        # register the same version of "iris_dataset"; confirm ZenML accepts
        # that on re-runs, otherwise drop `version` and let ZenML number versions.
        ArtifactConfig(name="iris_dataset", version="raw_20s23__", tags=["asdas"]),
    ]
):
    """Load the iris dataset as pandas dataframe.

    This step is named ``iris_data_loader`` rather than ``training_data_loader``.
    The old name replaced the CSV ``training_data_loader`` step defined earlier in
    this notebook, which ``training_pipeline`` calls with a path and
    ``target_name``; those calls broke once this cell had run.
    """
    iris = load_iris(as_frame=True)
    return iris.get("frame")


@pipeline(enable_cache=False)
def feature_engineering_pipeline():
    """One-step pipeline that runs the iris loader."""
    iris_data_loader()


# NOTE(review): inside a Jupyter kernel __name__ is "__main__", so running this
# cell always runs the pipeline.
if __name__ == "__main__":
    feature_engineering_pipeline()
