In [1]:
from azureml.core import Workspace, Dataset
from sklearn.model_selection import train_test_split
import numpy as np

In [2]:
ws = Workspace.from_config()
ds = Dataset.get_by_name(workspace=ws, name="wine-quality")
df = ds.to_pandas_dataframe()
df.shape

(4898, 12)

In [3]:
df["quality"] = np.where(df["quality"]==6, 1, 0)
df_train, df_test = train_test_split(df, stratify= df["quality"], random_state=9)

In [4]:
df_train["quality"].mean(), df_test["quality"].mean()

(0.4486795534985026, 0.4489795918367347)

In [5]:
Dataset.Tabular.register_pandas_dataframe(df_train, 
                                          name="wine-quality-train",
                                         target = (ws.get_default_datastore(), "wine-quality"))
Dataset.Tabular.register_pandas_dataframe(df_test, 
                                          name="wine-quality-test",
                                         target = (ws.get_default_datastore(), "wine-quality"))

Validating arguments.
Arguments validated.
Successfully obtained datastore reference and path.
Uploading file to wine-quality/518c7cb8-79f2-471e-ace0-7ad5cb2e1f19/
Successfully uploaded file to datastore.
Creating and registering a new dataset.
Successfully created and registered a new dataset.
Validating arguments.
Arguments validated.
Successfully obtained datastore reference and path.
Uploading file to wine-quality/d6b73973-b536-43b0-9d2c-66774afd0ffc/
Successfully uploaded file to datastore.
Creating and registering a new dataset.
Successfully created and registered a new dataset.


{
  "source": [
    "('workspaceblobstore', 'wine-quality/d6b73973-b536-43b0-9d2c-66774afd0ffc/')"
  ],
  "definition": [
    "GetDatastoreFiles",
    "ReadParquetFile",
    "DropColumns"
  ],
  "registration": {
    "id": "4f4a6fbb-87ef-4334-9c38-df884709ea94",
    "name": "wine-quality-test",
    "version": 2,
    "workspace": "Workspace.create(name='learn-mlops-ws', subscription_id='fcd1fe46-718c-472d-9814-211fa6d32599', resource_group='learn-mlops-rg')"
  }
}

In [7]:
%%writefile training_pipeline/prep.py
from azureml.core import Run, Workspace, Dataset, Model
from sklearn.preprocessing import MinMaxScaler
from sklearn.model_selection import train_test_split
from sklearn.metrics import recall_score, precision_score, f1_score
import numpy as np
import pandas as pd
import argparse
import pickle

run = Run.get_context()
parser = argparse.ArgumentParser()
parser.add_argument("--train-ds-name", dest="train_ds_name", type=str)
parser.add_argument("--test-ds-name", dest="test_ds_name", type=str)
parser.add_argument("--train-out-folder", dest="train_out_folder", type=str)
parser.add_argument("--test-out-folder", dest="test_out_folder", type=str)

args=parser.parse_args()


FEATURES = ['fixed acidity', 'volatile acidity', 
            'citric acid', 'residual sugar',
            'chlorides', 'free sulfur dioxide', 
            'total sulfur dioxide', 'density',
            'pH', 'sulphates', 'alcohol']
LABEL = "quality"
WS = run.experiment.workspace

def read_data():
    train_df = Dataset.get_by_name(workspace=WS, name=args.train_ds_name).to_pandas_dataframe()
    test_df = Dataset.get_by_name(workspace=WS, name=args.test_ds_name).to_pandas_dataframe()
    return train_df, test_df

def save_as_pickle(path, obj):
    with open(path, "wb") as f:
        pickle.dump(obj, f)

def prepare_data():
    df_train, df_test = read_data()
    scaler = MinMaxScaler()
    df_train[FEATURES] = scaler.fit_transform(df_train[FEATURES])
    df_test[FEATURES] = scaler.transform(df_test[FEATURES])
    train_save_path = os.path.join(args.train_out_folder, "wine-quality-train-prepped.csv")
    df_train.to_csv(train_save_path)
    test_save_path = os.path.join(args.test_out_folder, "wine-quality-test-prepped.csv")
    df_test.to_csv(test_save_path)

    os.makedirs("outputs", exist_ok=True)
    model_path = os.path.join("outputs", "scaler.pkl")
    save_as_pickle(path=model_path, obj=scaler)
    model = Model.register(workspace=WS, model_name="wine-quality-scaler", 
                   model_path=model_path,
                   description="lr model for wine quality",
                   tags = {"dataset": "wine_train"}
                  )

prepare_data()
run.complete()

Overwriting training_pipeline/prep.py


In [18]:
%%writefile training_pipeline/train.py
from azureml.core import Run, Workspace, Dataset, Model
from azureml.data import OutputFileDatasetConfig
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import accuracy_score, recall_score, precision_score
import os
import argparse
import numpy as np
import pandas as pd
import pickle
parser = argparse.ArgumentParser()
parser.add_argument("--train-data", type=str, dest="train_data")
parser.add_argument("--test-data", type=str, dest="test_data")
args = parser.parse_args()

run = Run.get_context()
WS = run.experiment.workspace

FEATURES = ['fixed acidity', 'volatile acidity', 
            'citric acid', 'residual sugar',
            'chlorides', 'free sulfur dioxide', 
            'total sulfur dioxide', 'density',
            'pH', 'sulphates', 'alcohol']
LABEL = "quality"
def save_as_pickle(path, obj):
    with open(path, "wb") as f:
        pickle.dump(obj, f)

def create_dataframe(path):
    files = [f for f in os.listdir(path) if f.endswith(".csv")]
    files = [os.path.join(path, f) for f in files]
    print("files found : {files}")
    df = pd.DataFrame()
    for f in files:
        df = pd.concat([df, pd.read_csv(f)])
    return df
    
def train():
    df_train = create_dataframe(args.train_data)
    df_test = create_dataframe(args.test_data)

    lr = LogisticRegression()
    
    lr.fit(df_train[FEATURES], df_train[LABEL])
    train_pred = lr.predict(df_train[FEATURES])
    train_pred_class = np.where(train_pred>0.5, 1,0)
    accuracy = accuracy_score(df_train[LABEL], train_pred_class)
    recall = recall_score(df_train[LABEL], train_pred_class)
    precision = precision_score(df_train[LABEL], train_pred_class)
    train_metrics = {"accurracy": accuracy,
                     "recall":recall,
                     "precision": precision}
    
    
    test_pred = lr.predict(df_test[FEATURES])
    test_pred_class = np.where(test_pred>0.5, 1,0)
    accuracy = accuracy_score(df_test[LABEL], test_pred_class)
    recall = recall_score(df_test[LABEL], test_pred_class)
    precision = precision_score(df_test[LABEL], test_pred_class)
    test_metrics = {"accurracy": accuracy,
                     "recall":recall,
                     "precision": precision}
    
    run.log_table("train_metrics", train_metrics)
    run.log_table("test_metrics", test_metrics)
    
    os.makedirs("outputs", exist_ok=True)
    model_path = os.path.join("outputs", "model.pkl")
    save_as_pickle(path=model_path, obj=lr)
    model = Model.register(workspace=WS, model_name="wine-quality-lr", 
                   model_path=model_path,
                   description="lr model for wine quality",
                   tags = {"dataset": "wine_train"}
                  )
train()
run.complete()

Overwriting training_pipeline/train.py


In [19]:
from azureml.data import OutputFileDatasetConfig
from azureml.pipeline.steps import PythonScriptStep
from azureml.pipeline.core import Pipeline
from azureml.core import Workspace, Experiment, RunConfiguration
from azureml.core.environment import CondaDependencies
run_config = RunConfiguration()
run_config.environment.python.conda_dependencies = CondaDependencies.create(python_version="3.8",
                                                                            pip_packages=["numpy", "pandas",
                                                                                          "scikit-learn", "azureml-core",
                                                                                          "azureml-defaults", "azureml-pipeline"])
ws = Workspace.from_config()
train_prepped_data = OutputFileDatasetConfig("train_prepped")
test_prepped_data = OutputFileDatasetConfig("test_prepped")

step1 = PythonScriptStep(name="prepare-data",
                         source_directory="training_pipeline",
                         script_name="prep.py",
                         compute_target="aml-cluster",
                         arguments = ["--train-ds-name", "wine-quality-train",
                                      "--test-ds-name", "wine-quality-test",
                                      "--train-out-folder", train_prepped_data,
                                      "--test-out-folder", test_prepped_data],
                        runconfig=run_config,
                        allow_reuse=False)
step2 = PythonScriptStep(name="train-model",
                        source_directory="training_pipeline", 
                        script_name="train.py",
                        compute_target="aml-cluster",
                        arguments=["--train-data", train_prepped_data.as_input(),
                                   "--test-data", test_prepped_data.as_input()],
                        runconfig=run_config)

pipeline_steps = Pipeline(workspace=ws, steps=[step1, step2])
pipeline_steps.validate()

Step prepare-data is ready to be created [8e30a1c3]
Step train-model is ready to be created [0f24d71a]


[]

In [20]:
experiment = Experiment(name="wine-quality-training", workspace=ws)
run = experiment.submit(pipeline_steps)
run.wait_for_completion(show_output=True)

Created step prepare-data [8e30a1c3][f7acbb57-ea62-4a3f-b009-6e624cd62ffb], (This step will run and generate new outputs)
Created step train-model [0f24d71a][4aa6192b-8777-4e3b-ad86-5ce1ee9ac3e0], (This step will run and generate new outputs)
Submitted PipelineRun 6cf3e98e-e1bd-4e68-990c-40def5719d16
Link to Azure Machine Learning Portal: https://ml.azure.com/runs/6cf3e98e-e1bd-4e68-990c-40def5719d16?wsid=/subscriptions/fcd1fe46-718c-472d-9814-211fa6d32599/resourcegroups/learn-mlops-rg/workspaces/learn-mlops-ws&tid=219401eb-53de-4dfb-8561-29e630707cb7
PipelineRunId: 6cf3e98e-e1bd-4e68-990c-40def5719d16
Link to Azure Machine Learning Portal: https://ml.azure.com/runs/6cf3e98e-e1bd-4e68-990c-40def5719d16?wsid=/subscriptions/fcd1fe46-718c-472d-9814-211fa6d32599/resourcegroups/learn-mlops-rg/workspaces/learn-mlops-ws&tid=219401eb-53de-4dfb-8561-29e630707cb7
PipelineRun Status: NotStarted
PipelineRun Status: Running


StepRunId: 391e9f3e-8f93-4ce3-ac30-8b04478d580e
Link to Azure Machine Lea

ExperimentExecutionException: ExperimentExecutionException:
	Message: The output streaming for the run interrupted.
But the run is still executing on the compute target. 
Details for canceling the run can be found here: https://aka.ms/aml-docs-cancel-run
	InnerException None
	ErrorResponse 
{
    "error": {
        "message": "The output streaming for the run interrupted.\nBut the run is still executing on the compute target. \nDetails for canceling the run can be found here: https://aka.ms/aml-docs-cancel-run"
    }
}

# Publish pipeline

In [35]:
%%writefile training_pipeline/scoring.py

import json
import pickle
import numpy as np
import os

def init():
    global model, scaler
    model_path = os.path.join(os.getenv("AZUREML_MODEL_DIR"), "wine-quality-lr/1/model.pkl")
    model = pickle.load(model_path)
    
    scaler_path = os.path.join(os.getenv("AZUREML_MODEL_DIR"), "wine-quality-scaler/15/scaler.pkl")
    scaler = pickle.load(scaler_path)
    
    
def run(raw_data):
    data = np.array(json.loads(raw_data)["data"])
    prepped = scaler.transform(data)
    predictions = model.predict(prepped)
    return predictions.tolist()

    

Overwriting training_pipeline/scoring.py


In [36]:
from azureml.core import Environment
from azureml.core.environment import CondaDependencies
service_env = Environment(name="service-env")
service_env.python.conda_dependencies = CondaDependencies.create(python_version="3.8",
                                                                            pip_packages=["numpy", "pandas",
                                                                                          "scikit-learn", "azureml-core",
                                                                                          "azureml-defaults", "azureml-pipeline"])

In [37]:
from azureml.core.model import InferenceConfig

model_inference_config = InferenceConfig(source_directory="training_pipeline",
                                        entry_script="scoring.py",
                                        environment=service_env)

In [38]:
from azureml.core.webservice import AciWebservice
from azureml.core.model import Model

aci_config = AciWebservice.deploy_configuration(cpu_cores = 1, memory_gb = 1)
model1 = Model(ws, "wine-quality-lr")
model2 = Model(ws, "wine-quality-scaler")
service = Model.deploy(ws, "wine-quality-aci", [model1, model2], model_inference_config, aci_config)
service.wait_for_deployment(show_output = True)
print(service.state)

Tips: You can try get_logs(): https://aka.ms/debugimage#dockerlog or local deployment: https://aka.ms/debugimage#debug-locally to debug if deployment takes longer than 10 minutes.
Running
2022-01-25 12:44:30+00:00 Creating Container Registry if not exists.
2022-01-25 12:44:32+00:00 Use the existing image.
2022-01-25 12:44:34+00:00 Submitting deployment to compute..
2022-01-25 12:44:44+00:00 Checking the status of deployment wine-quality-aci..
2022-01-25 12:46:04+00:00 Checking the status of inference endpoint wine-quality-aci.
Failed


Service deployment polling reached non-successful terminal state, current service state: Failed
Operation ID: 3c18d40a-7273-4183-ab1e-f3c083dd4f0d
More information can be found using '.get_logs()'
Error:
{
  "code": "AciDeploymentFailed",
  "statusCode": 400,
  "message": "Aci Deployment failed with exception: Error in entry script, TypeError: file must have 'read' and 'readline' attributes, please run print(service.get_logs()) to get details.",
  "details": [
    {
      "code": "CrashLoopBackOff",
      "message": "Error in entry script, TypeError: file must have 'read' and 'readline' attributes, please run print(service.get_logs()) to get details."
    }
  ]
}



WebserviceException: WebserviceException:
	Message: Service deployment polling reached non-successful terminal state, current service state: Failed
Operation ID: 3c18d40a-7273-4183-ab1e-f3c083dd4f0d
More information can be found using '.get_logs()'
Error:
{
  "code": "AciDeploymentFailed",
  "statusCode": 400,
  "message": "Aci Deployment failed with exception: Error in entry script, TypeError: file must have 'read' and 'readline' attributes, please run print(service.get_logs()) to get details.",
  "details": [
    {
      "code": "CrashLoopBackOff",
      "message": "Error in entry script, TypeError: file must have 'read' and 'readline' attributes, please run print(service.get_logs()) to get details."
    }
  ]
}
	InnerException None
	ErrorResponse 
{
    "error": {
        "message": "Service deployment polling reached non-successful terminal state, current service state: Failed\nOperation ID: 3c18d40a-7273-4183-ab1e-f3c083dd4f0d\nMore information can be found using '.get_logs()'\nError:\n{\n  \"code\": \"AciDeploymentFailed\",\n  \"statusCode\": 400,\n  \"message\": \"Aci Deployment failed with exception: Error in entry script, TypeError: file must have 'read' and 'readline' attributes, please run print(service.get_logs()) to get details.\",\n  \"details\": [\n    {\n      \"code\": \"CrashLoopBackOff\",\n      \"message\": \"Error in entry script, TypeError: file must have 'read' and 'readline' attributes, please run print(service.get_logs()) to get details.\"\n    }\n  ]\n}"
    }
}