In [3]:
#https://github.com/Azure/MachineLearningNotebooks/tree/master/how-to-use-azureml
#Importing libraries
import azureml.core
from azureml.core import Workspace, Dataset
from azureml.core import Experiment
#from azureml.widgets import RunDetails
from azureml.core import Run
from azureml.core import Experiment, ScriptRunConfig, Environment
from azureml.core.runconfig import DockerConfiguration
import mlflow

import matplotlib.pyplot as plt
#import seaborn as sns
import pandas as pd

## Subir los datos a Azure Blob

In [None]:
from azureml.core import Workspace

# Path (relative to this notebook) of the CSV file to upload.
airlines_delay = '../airlines_delay/airlines_delay.csv'

# Load the workspace from its config file and take its default datastore.
ws = Workspace.from_config()
default_store = ws.get_default_datastore()

# Upload the file under the 'airlines' target path, overwriting any existing
# copy and showing progress while the transfer runs.
default_store.upload_files(
    [airlines_delay],
    target_path='airlines',
    overwrite=True,
    show_progress=True,
)

print("Upload calls completed.")

## Leer el dataset y dividirlo en 3

In [2]:
from azureml.core import Dataset, Datastore
from azureml.core import Workspace


# Re-attach to the workspace and fetch the dataset registered as 'AirlinesDelay'.
ws = Workspace.from_config()
dataset = Dataset.get_by_name(workspace=ws, name='AirlinesDelay')

# Load the dataset into a pandas DataFrame.
airlines_df = dataset.to_pandas_dataframe()


## Setup compute

In [35]:
from azureml.core.compute import ComputeTarget, ComputeInstance
from azureml.core.compute_target import ComputeTargetException

# Name of the compute target to look up or create.
compute_name = "prueba-DS"
ws = Workspace.from_config()

# Reuse the compute target when it already exists; otherwise provision it.
try:
    aml_compute = ComputeTarget(workspace=ws, name=compute_name)
except ComputeTargetException:
    # Lookup failed: create a compute instance with public SSH access disabled.
    compute_config = ComputeInstance.provisioning_configuration(
        vm_size='Standard_DS11_v2',
        ssh_public_access=False,
    )
    aml_compute = ComputeTarget.create(ws, compute_name, compute_config)
else:
    print('Existe!')

# Wait on the compute target in both cases, printing its status output.
aml_compute.wait_for_completion(show_output=True)


Creating...........................................
Running


## Definir el ambiente de trabajo

In [75]:
from azureml.core.runconfig import RunConfiguration
from azureml.core.conda_dependencies import CondaDependencies
from azureml.core import Experiment, ScriptRunConfig, Environment
# Build the experiment's Python environment from its conda specification file.
experiment_env = Environment.from_conda_specification(
    name="experiment_env",
    file_path='./env/environment.yml',
)

# Register the environment in the workspace, then read the registered
# definition back by name.
experiment_env.register(workspace=ws)
registered_env = Environment.get(workspace=ws, name='experiment_env')

# Run configuration handed to the pipeline steps: it targets the compute
# created above and uses the registered environment.
pipeline_run_config = RunConfiguration()
pipeline_run_config.target = aml_compute
pipeline_run_config.environment = registered_env

The best practice is to use a separate folder for each step's script and its dependent files, and to specify that folder as the step's source_directory. This reduces the size of the snapshot created for the step (only that folder is snapshotted). Because a change to any file in the source_directory triggers a re-upload of the snapshot, keeping folders separate also preserves step reuse when nothing in a step's own source_directory has changed.

In [None]:
# preprocesamiento
#1. eliminar columna que no importa
#2. cambiar variables categoricas en numericas
#3. Estandarizar variables numericas
#4. split resultados

## Pipeline Steps

In [11]:
from azureml.pipeline.core import PipelineData
from azureml.data import OutputFileDatasetConfig
from azureml.pipeline.steps import PythonScriptStep
import mlflow


# Link the workspace with MLflow by pointing MLflow at the workspace tracking URI.
mlflow.set_tracking_uri(ws.get_mlflow_tracking_uri())

# Fetch the registered dataset with Dataset.get_by_name, as the dataset-loading
# cell does. ws.datasets.get(...) returns None when 'AirlinesDelay' is not
# registered, which would only surface later as an AttributeError on
# .as_named_input(); get_by_name fails at the lookup instead.
airlines_data = Dataset.get_by_name(ws, name='AirlinesDelay')

# Output of the cleaning step, consumed by the split step.
# Legacy alternative, kept for reference:
# https://learn.microsoft.com/en-us/python/api/azureml-pipeline-core/azureml.pipeline.core.pipelinedata?view=azure-ml-py
# clean_airlines_data = PipelineData("clean_airlines_data", datastore=default_store, is_directory=True).as_dataset()
clean_airlines_data = OutputFileDatasetConfig('cleaned_data')

# Step 1: run scripts/data_clean.py with the raw dataset as a named input and
# the cleaned-data location passed through --output_cleanse.
clean_step = PythonScriptStep(
    name="Clean airlines data",
    script_name="scripts/data_clean.py",
    arguments=["--output_cleanse", clean_airlines_data],
    inputs=[airlines_data.as_named_input('raw_data')],
    outputs=[clean_airlines_data],
    compute_target=aml_compute,
    runconfig=pipeline_run_config,
    source_directory='./',
    allow_reuse=True
)

# Output locations for the train, test and validation splits.
output_split_train = OutputFileDatasetConfig("output_split_train")
output_split_test = OutputFileDatasetConfig("output_split_test")
output_split_validation = OutputFileDatasetConfig("output_split_validation")

# Step 2: run scripts/train_test_split.py on the cleaned data; each split
# location is passed to the script through its own command-line flag.
train_test_split_step = PythonScriptStep(
    name="split data train test",
    source_directory='./',
    script_name="scripts/train_test_split.py",
    arguments=[
        "--input_data", clean_airlines_data.as_input(name='Clean_data'),
        "--output_train_data", output_split_train,
        "--output_test_data", output_split_test,
        "--output_val_data", output_split_validation,
    ],
    compute_target=aml_compute,
    runconfig=pipeline_run_config,
    allow_reuse=True,
)

# Default datastore of the workspace; backs the PipelineData outputs below.
datastore = ws.get_default_datastore()

# Outputs of the transform step: transformed train/validation data and the scaler.
transformed_data_train = OutputFileDatasetConfig('transformed_data_train')
transformed_data_val = OutputFileDatasetConfig('transformed_data_val')
scaler = PipelineData("scaler", datastore=datastore)

# Step 3: run scripts/data_transform.py on the train and validation splits.
transform_step = PythonScriptStep(
    name="transform airlines data",
    source_directory='./',
    script_name="scripts/data_transform.py",
    arguments=[
        '--input_data_train', output_split_train.as_input(name='Train_data'),
        '--input_data_val', output_split_validation.as_input(name='Val_data'),
        # NOTE(review): the test input and test output are passed as None;
        # confirm how the script and the SDK handle these two values.
        '--input_data_test', None,
        "--output_transform_train", transformed_data_train,
        "--output_transform_val", transformed_data_val,
        "--output_transform_test", None,
        "--output_scaler", scaler,
    ],
    outputs=[scaler],
    compute_target=aml_compute,
    runconfig=pipeline_run_config,
    allow_reuse=True,
)

# Location where the training script writes the model.
step_output = PipelineData("model", datastore=datastore)

# Step 4: run scripts/train_model.py on the transformed train/validation data.
# compute_target is now passed explicitly, matching the clean, split and
# transform steps; previously this step relied only on the target stored in
# pipeline_run_config.
train_step = PythonScriptStep(
    name='Training model',
    script_name='scripts/train_model.py',
    arguments=[
        "--input_data_train", transformed_data_train.as_input(name='train_Data'),
        "--input_data_val", transformed_data_val.as_input(name="val_data"),
        "--output_model", step_output,
    ],
    outputs=[step_output],
    compute_target=aml_compute,
    runconfig=pipeline_run_config,
    source_directory='./',
    allow_reuse=True,
)

# Step 5: run scripts/val_model.py with the trained model and the fitted
# scaler as inputs. compute_target is now passed explicitly, matching the
# clean, split and transform steps; previously this step relied only on the
# target stored in pipeline_run_config.
validation_step = PythonScriptStep(
    name='Validation model',
    script_name='scripts/val_model.py',
    arguments=[
        "--model_out", step_output.as_input(input_name='model_output'),
        "--input_scaler", scaler.as_input(input_name='scaler'),
    ],
    inputs=[step_output, scaler],
    compute_target=aml_compute,
    runconfig=pipeline_run_config,
    source_directory='./',
    allow_reuse=True,
)

print("Done.")


Done.


In [None]:
from azureml.core import Experiment
from azureml.pipeline.core import Pipeline
#from azureml.widgets import RunDetails

# Assemble the training pipeline from the five steps defined earlier.
pipeline_steps = [
    clean_step,
    train_test_split_step,
    transform_step,
    train_step,
    validation_step,
]
pipeline = Pipeline(workspace=ws, steps=pipeline_steps)
print("Pipeline is built.")

# Submit the pipeline under the 'exp-Airlines' experiment, passing
# regenerate_outputs=True with the submission.
experiment = Experiment(workspace=ws, name='exp-Airlines')
pipeline_run = experiment.submit(pipeline, regenerate_outputs=True)
print("Pipeline submitted for execution.")

# Optional notebook widget: RunDetails(pipeline_run).show()
# Block until the run finishes, streaming its output into the cell.
pipeline_run.wait_for_completion(show_output=True)

## inference step

In [19]:
from azureml.pipeline.steps import PythonScriptStep
from azureml.core import Dataset, Datastore
from azureml.data import OutputFileDatasetConfig
from azureml.core import Workspace
import mlflow
# Link the workspace with MLflow by pointing MLflow at the workspace tracking URI.
ws = Workspace.from_config()
mlflow.set_tracking_uri(ws.get_mlflow_tracking_uri())

# Dataset to score and the output location for its preprocessed form.
dataset_test = Dataset.get_by_name(workspace=ws, name='data_to_predict')
processed_data = OutputFileDatasetConfig('proccesed_data')

# Step: run scripts_inference/tracking.py with the raw dataset as named input.
# It declares no outputs; passing --output_data is left disabled:
#   arguments=["--output_data", processed_data],
tracking_step = PythonScriptStep(
    name='traking',
    source_directory='./',
    script_name="scripts_inference/tracking.py",
    inputs=[dataset_test.as_named_input('raw_data')],
    compute_target=aml_compute,
    runconfig=pipeline_run_config,
    allow_reuse=True,
)

# Step: run scripts_inference/preprocess_data.py on the raw dataset and write
# the result to the processed_data output passed via --output_data.
preprocess_step = PythonScriptStep(
    name='preprocessing',
    source_directory='./',
    script_name="scripts_inference/preprocess_data.py",
    arguments=["--output_data", processed_data],
    inputs=[dataset_test.as_named_input('raw_data')],
    outputs=[processed_data],
    compute_target=aml_compute,
    runconfig=pipeline_run_config,
    allow_reuse=True,
)

# Step: run scripts_inference/inference.py on the preprocessed data. The data
# is wired in through the --input_data argument rather than an explicit
# `inputs` list (inputs=[processed_data] is left disabled).
inference_step = PythonScriptStep(
    name='inference',
    source_directory='./',
    script_name='scripts_inference/inference.py',
    arguments=[
        '--input_data', processed_data.as_input(name='processed_data'),
    ],
    compute_target=aml_compute,
    runconfig=pipeline_run_config,
    allow_reuse=True,
)


In [None]:
from azureml.core import Experiment
from azureml.pipeline.core import Pipeline
#from azureml.widgets import RunDetails

# Assemble the inference pipeline: tracking, preprocessing, then inference.
pipeline_steps_inference = [
    tracking_step,
    preprocess_step,
    inference_step,
]
pipeline_inference = Pipeline(workspace=ws, steps=pipeline_steps_inference)
print("Pipeline is built.")

# Submit the pipeline under the 'exp-Airlines_inference' experiment, passing
# regenerate_outputs=True with the submission.
experiment = Experiment(workspace=ws, name='exp-Airlines_inference')
pipeline_run = experiment.submit(pipeline_inference, regenerate_outputs=True)
print("Pipeline submitted for execution.")

# Optional notebook widget: RunDetails(pipeline_run).show()
# Block until the run finishes, streaming its output into the cell.
pipeline_run.wait_for_completion(show_output=True)

In [None]:
# Shell escape: runs setup.py with whichever `python` is first on PATH.
# NOTE(review): that interpreter may differ from this kernel's; confirm.
!python setup.py

In [None]:
from azureml.core import Experiment
from azureml.pipeline.core import Pipeline
#from azureml.widgets import RunDetails

# Assemble the deployment pipeline: preprocessing followed by inference
# (the tracking step is not part of this pipeline).
pipeline_steps_deployment = [
    preprocess_step,
    inference_step,
]
pipeline_deployment = Pipeline(workspace=ws, steps=pipeline_steps_deployment)
print("Pipeline is built.")

# Submit the pipeline under the 'deployment_pipeline' experiment, passing
# regenerate_outputs=True with the submission.
experiment = Experiment(workspace=ws, name='deployment_pipeline')
deployenent_pipeline_run = experiment.submit(
    pipeline_deployment,
    regenerate_outputs=True,
)
print("Pipeline submitted for execution.")

# Optional notebook widget: RunDetails(pipeline_run).show()
# Block until the run finishes, streaming its output into the cell.
deployenent_pipeline_run.wait_for_completion(show_output=True)

In [149]:
from azureml.pipeline.core import PipelineEndpoint
# Publish the pipeline behind the deployment run.
published_pipeline = deployenent_pipeline_run.publish_pipeline(
    name="My_Published_Pipeline",
    description="My Published Pipeline Description",
    version="1.0",
)
# Optional, currently disabled: publish it behind a named pipeline endpoint.
# endpoint = PipelineEndpoint.publish(workspace=ws, name="my-pipeline-endpoint", description='my very first pipeline', pipeline=published_pipeline)


In [150]:
from azureml.core.authentication import InteractiveLoginAuthentication

# Create an interactive-login authentication object and ask it for the HTTP
# authentication header; the header is sent with the REST request made later
# in this notebook.
interactive_auth = InteractiveLoginAuthentication()
auth_header = interactive_auth.get_authentication_header()
print('Authentication header ready.')

Authentication header ready.


In [151]:
from azureml.pipeline.core import PublishedPipeline
import requests

# Trigger the published pipeline through its REST endpoint.
# - timeout: requests has no default timeout, so without one a stalled
#   connection would block the cell indefinitely.
# - raise_for_status: surface an HTTP error here, instead of as a confusing
#   KeyError when the next cell reads "Id" from the response body.
response = requests.post(
    published_pipeline.endpoint,
    headers=auth_header,
    json={"ExperimentName": "deployment_pipeline"},
    timeout=60,
)
response.raise_for_status()

In [152]:
# Parse the JSON body of the REST response and keep its "Id" field.
run_id = response.json()["Id"]
# Bare expression: the notebook displays the value as the cell output.
run_id

'ae78320d-c53a-474f-89d1-95f96faf04f6'

In [None]:
from azureml.pipeline.core.run import PipelineRun


# Wrap the run started through the REST call: look up the
# 'deployment_pipeline' experiment in the workspace and bind the run by id.
published_pipeline_run = PipelineRun(
    experiment=ws.experiments['deployment_pipeline'],
    run_id=run_id,
)

# Block until the run completes, streaming its output into the cell.
published_pipeline_run.wait_for_completion(show_output=True)

In [None]:
from azureml.pipeline.core.schedule import ScheduleRecurrence, Schedule

# Define the recurrence: trigger a run every 10 minutes.
recurrence = ScheduleRecurrence(frequency="Minute", interval=10)

# Define the schedule.
experiment_name = "schedule-deploy"
experiment = Experiment(ws, experiment_name)

# Schedule the pipeline published earlier in this notebook. The id is taken
# from `published_pipeline` instead of a hard-coded GUID, so the cell keeps
# working after a kernel restart or in another workspace. The description
# now matches the 10-minute recurrence (it previously said "daily").
schedule = Schedule.create(
    workspace=ws,
    name="my-schedule",
    pipeline_id=published_pipeline.id,
    experiment_name=experiment_name,
    recurrence=recurrence,
    description="Runs my pipeline every 10 minutes",
)

# Trigger the pipeline on the scheduled date and time
#schedule.submit(experiment=experiment)





In [156]:
# Disable the schedule created in the previous cell.
schedule.disable()