In [17]:
import sys
import os
from typing import Tuple
import pandas as pd 
import mlflow as mf

## Project locations.
## The defaults below are the original author's paths. Instead of editing this
## cell, you can set the MLOPS2_PROJECT_ROOT environment variable to the root of
## your own checkout before starting the kernel.
def resolve_project_root(environ=None, default="/home/aamir07/mlops2_with_dagster"):
    """Return the project root directory as a string without a trailing slash.

    Args:
        environ: Mapping to read ``MLOPS2_PROJECT_ROOT`` from. Defaults to
            ``os.environ`` when None.
        default: Root used when the variable is unset or empty.

    Returns:
        The configured root with any trailing ``/`` removed.
    """
    env = os.environ if environ is None else environ
    # An empty value is treated the same as "not set".
    root = env.get("MLOPS2_PROJECT_ROOT") or default
    return root.rstrip("/")


PROJECT_ROOT = resolve_project_root()
# Make the project package importable from the notebook.
sys.path.append(f"{PROJECT_ROOT}/")
# NOTE: the "mlflow_artfacts" spelling is a runtime directory name and is kept as-is.
ARTIFACT_PATH = f"{PROJECT_ROOT}/artifacts/mlflow_artfacts"
LOGGER_FOLDER_PATH = f"{PROJECT_ROOT}/notebooks"

from mlops2_with_dagster import encoder_pipeline, features_pipeline
from pathlib import Path
from joblib import dump, load
from hamilton import driver, base
from mlops2_with_dagster.utils import get_project_dir, printse
from sklearn.preprocessing import (
    StandardScaler,
    LabelEncoder
)

from ortho import BaseKernel
from ortho.utils import Logger
from ortho.ortho.decorators import task
from ortho.ortho.callbacks import MlFlowCallBack, LoggerCallBack


# Notebook-level logger obtained from the ortho `Logger` helper imported above.
logger = Logger().logger()

   

        
        

In [18]:
# Look up the project directory through the package helper and echo it so the
# reader can confirm which checkout the notebook is running against.
project = "mlops2_with_dagster"
project_dir = get_project_dir(project)
printse("project_dir: {}".format(project_dir))

>>>> project_dir: /home/aamir07/mlops2_with_dagster


In [19]:
# File locations used by the notebook, all relative to `project_dir`.
train_data: Path = project_dir.joinpath("data", "train.csv")
test_data: Path = project_dir.joinpath("data", "test.csv")
encoder_file: Path = project_dir.joinpath("warehouse", "encoders.joblib")
# `data` is the file actually read below; it is the same CSV as `train_data`.
data: Path = project_dir.joinpath("data", "train.csv")

In [20]:
# parameters
import dagstermill

# Build a dagstermill context; the 'datatype' value looks like a placeholder
# op config -- TODO confirm what the op actually expects.
context = dagstermill.get_context(op_config={'datatype': 'xxxx'})

# NOTE(review): joblib's `load` unpickles the file, so only point
# `encoder_file` at artifacts you trust.
encoders = load(encoder_file)

# Raw training frame read from the CSV at `data`.
df = pd.read_csv(data)


2023-09-15 00:15:05,034 - INFO - Context impl SQLiteImpl.
2023-09-15 00:15:05,039 - INFO - Will assume non-transactional DDL.
2023-09-15 00:15:05,082 - INFO - Running stamp_revision  -> ec80dd91891a
2023-09-15 00:15:05,085 - DEBUG - new branch insert ec80dd91891a
2023-09-15 00:15:05,151 - INFO - Context impl SQLiteImpl.
2023-09-15 00:15:05,152 - INFO - Will assume non-transactional DDL.
2023-09-15 00:15:05,205 - INFO - Running stamp_revision  -> ec80dd91891a
2023-09-15 00:15:05,207 - DEBUG - new branch insert ec80dd91891a


In [21]:
class Transforms(BaseKernel):
    """Kernel that runs the Hamilton encoder/feature pipelines on a data frame.

    The constructor forwards its arguments to ``BaseKernel`` and stores the
    column configuration handed to the Hamilton driver. ``run`` is a thin
    entry point that delegates to ``transform``.
    """

    def __init__(self, callbacks, experiment_name, run_name, load_from_artifact=False):
        """Set up the kernel and the Hamilton driver configuration.

        Args:
            callbacks: Callback objects forwarded to ``BaseKernel``.
            experiment_name: Experiment name forwarded to ``BaseKernel``.
            run_name: Run name forwarded to ``BaseKernel``.
            load_from_artifact: Flag forwarded to ``BaseKernel`` (default False).
        """
        super().__init__(callbacks, experiment_name, run_name, load_from_artifact)

        self.index_col = 'passengerid'
        self.target_col = "survived"
        self.cat_cols = ["sex", "cabin", "embarked"]
        # Configuration dict passed to the Hamilton driver in `transform`.
        self.config = {
            'index_column': self.index_col,
            'target_column': self.target_col,
            'categorical_columns': self.cat_cols
        }

    @task(build_on_previous_run=True, end_mlflow_run=False)
    def transform(self, df_train, encoders=None):
        """Execute the Hamilton pipelines and return the transformed features.

        Args:
            df_train: Frame passed to the driver as the ``df`` input.
            encoders: Optional mapping whose ``'encoders'`` entry is a dict of
                extra driver inputs. When None, no extra inputs are supplied
                and the driver decides whether that is sufficient.

        Returns:
            A ``(data, payload)`` tuple: ``data`` maps ``"transformed_data"``
            to the driver output for ``final_imputed_features``; ``payload``
            maps ``"artifact_path"`` to ``ARTIFACT_PATH``.
        """
        # The signature advertises `encoders=None`, so guard the lookup instead
        # of failing with "'NoneType' object is not subscriptable".
        encoder_inputs = {} if encoders is None else encoders['encoders']

        transform_dr = driver.Driver(self.config, encoder_pipeline, features_pipeline)
        ddf = dict(df=df_train, **encoder_inputs)
        output_nodes = ['final_imputed_features']

        output = transform_dr.execute(output_nodes, inputs=ddf)

        data = {"transformed_data": output}
        payload = {"artifact_path": ARTIFACT_PATH}

        return data, payload

    def run(self, *args, **kwargs):
        """Run the kernel; all arguments are passed straight to ``transform``."""
        return self.transform(*args, **kwargs)
        
        
    

In [23]:
import dagstermill

# Callbacks registered for the "transform" kernel name.
transform_callbacks = [
    LoggerCallBack(log_folder=LOGGER_FOLDER_PATH, kernel_names=["transform"]),
    MlFlowCallBack(kernel_names=["transform"]),
]

transformer = Transforms(
    callbacks=transform_callbacks,
    experiment_name="Mlflow_with_Dagster",
    run_name="BadamBhum",
)

# mf.end_run()  # Uncomment only if a previous error left an MLflow run active.

# `run` returns a (data, payload) pair; only the transformed frame is handed
# back to Dagster as this notebook's "transformed_data" output.
run_output = transformer.run(df_train=df, encoders=encoders)
dagstermill.yield_result(
    run_output[0]["transformed_data"],
    output_name="transformed_data",
)

2023-09-15 00:15:47,669 - DEBUG - Resetting dropped connection: 127.0.0.1


2023-09-15 00:15:47,813 - DEBUG - http://127.0.0.1:5002 "POST /api/2.0/mlflow/runs/update HTTP/1.1" 200 424
2023-09-15 00:15:47,844 - DEBUG - Resetting dropped connection: 127.0.0.1
2023-09-15 00:15:47,873 - DEBUG - http://127.0.0.1:5002 "GET /api/2.0/mlflow/experiments/get-by-name?experiment_name=Mlflow_with_Dagster HTTP/1.1" 200 241
2023-09-15 00:15:47,882 - DEBUG - Resetting dropped connection: 127.0.0.1
2023-09-15 00:15:47,906 - DEBUG - http://127.0.0.1:5002 "GET /api/2.0/mlflow/runs/get?run_uuid=a05f805113ff42a3b54371547a26d49b&run_id=a05f805113ff42a3b54371547a26d49b HTTP/1.1" 200 1257
2023-09-15 00:15:47,913 - DEBUG - Resetting dropped connection: 127.0.0.1
2023-09-15 00:15:48,172 - DEBUG - http://127.0.0.1:5002 "POST /api/2.0/mlflow/runs/update HTTP/1.1" 200 423
2023-09-15 00:15:48,187 - DEBUG - Resetting dropped connection: 127.0.0.1
2023-09-15 00:15:48,280 - DEBUG - http://127.0.0.1:5002 "GET /api/2.0/mlflow/runs/get?run_uuid=a05f805113ff42a3b54371547a26d49b&run_id=a05f805113f

<class 'pandas.core.frame.DataFrame'>


2023-09-15 00:15:48,687 - DEBUG - Computing sex.
2023-09-15 00:15:48,691 - DEBUG - Computing sexencoder.
2023-09-15 00:15:48,693 - DEBUG - Computing sex_category.
2023-09-15 00:15:48,721 - DEBUG - Computing embarked.
2023-09-15 00:15:48,726 - DEBUG - Computing embarkedencoder.
2023-09-15 00:15:48,732 - DEBUG - Computing embarked_category.
2023-09-15 00:15:48,761 - DEBUG - Computing sibsp.
2023-09-15 00:15:48,764 - DEBUG - Computing parch.
2023-09-15 00:15:48,767 - DEBUG - Computing family.
2023-09-15 00:15:48,775 - DEBUG - Computing engineered_features.
2023-09-15 00:15:48,781 - DEBUG - Succeed in sending telemetry consisting of [b'{"api_key": "phc_mZg8bkn3yvMxqvZKRlMlxjekFU5DFDdcdAsijJ2EH5e", "event": "os_hamilton_run_start", "properties": {"os_type": "posix", "os_version": "Linux-5.15.90.1-microsoft-standard-WSL2-x86_64-with-glibc2.35", "python_version": "3.10.11/CPython", "distinct_id": "f41b7308-274c-4333-ab85-eca6fe911802", "hamilton_version": [1, 28, 0], "telemetry_version": "0.0

Unnamed: 0_level_0,pclass,age,fare,cabin_category,sex_category,embarked_category,family
passengerid,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1
0,1,0.00,27.14,2,1,2,2
1,3,0.00,13.35,8,1,2,0
2,3,0.33,71.29,8,1,2,3
3,3,19.00,13.04,8,1,2,0
4,3,25.00,7.76,8,1,2,0
...,...,...,...,...,...,...,...
99995,2,62.00,14.86,3,0,0,0
99996,2,66.00,11.15,8,1,2,0
99997,3,37.00,9.95,8,1,2,0
99998,3,51.00,30.92,8,1,2,1
