In [None]:
import numpy as np
from azureml.core import Workspace, Dataset, Datastore
from azureml.core import ScriptRunConfig, RunConfiguration, Experiment
from azureml.train.estimator import Estimator
from azureml.data.data_reference import DataReference
from azureml.core import Environment
#from azureml.data.dataset_consumption_config import DatasetConsumptionConfig

In [None]:
# Connect to the Azure ML workspace; `ws` is used below to look up datastores,
# the input dataset and the experiment.
ws = Workspace.from_config()

In [None]:
# Look up the two datastores by name in the workspace.
def_blob_store = Datastore(workspace=ws, name="workspaceblobstore")
def_file_store = Datastore(workspace=ws, name="workspacefilestore")

In [None]:
# Build the run environment from the conda specification file next to this notebook.
localenv = Environment.from_conda_specification(
    name="localenv",
    file_path='./environment.yml',
)
# Display the resulting environment definition as the cell output.
localenv

# Testing clean.py

In [None]:
# Registered input dataset and the experiment under which the test runs are recorded.
dataset = Dataset.get_by_name(ws, name='annonces_ds')
experiment = Experiment(workspace=ws, name="CleanTest")

# Run clean.py on the local compute target with the named dataset as its input.
clean_est = Estimator(
    source_directory='./pipeline_steps',
    entry_script='clean.py',
    script_params={
        "--input": 'annonces_ds',
        "--output": 'cleantest',
    },
    inputs=[dataset.as_named_input('annonces_ds')],
    compute_target='local',
    environment_definition=localenv,
)

# Submit and block until the run finishes, streaming its logs into the cell.
run = experiment.submit(clean_est)
run.wait_for_completion(show_output=True)

In [None]:
run.download_file('outputs/cleantest', output_file_path='./cleantmp.csv')
!head ./cleantmp.csv

In [None]:
# Push the downloaded clean output to the blob datastore and keep the returned reference.
# NOTE(review): upload_files may treat target_path as a destination folder rather
# than a file name — TODO confirm the resulting blob path.
clean_ref = def_blob_store.upload_files(
    files=['./cleantmp.csv'],
    target_path='tmp/clean.csv',
    overwrite=True,
)
# Name and access mode under which a script run would receive this reference.
clean_ref.data_reference_name = "clean_data"
clean_ref.mode = 'download'
clean_ref

In [None]:
# Wrap the uploaded clean output as a tabular dataset (not registered in the workspace).
datastore_paths = [
    (def_blob_store, 'tmp/clean.csv'),
]
clean_ds = Dataset.Tabular.from_delimited_files(path=datastore_paths)

# Testing split.py

In [None]:
# Run split.py locally on the cleaned data.
# The input named 'clean_data' is the tabular dataset built from the clean
# step's uploaded output (`clean_ds`), not the raw `dataset`.
# NOTE(review): assumes split.py reads its input as a named dataset — confirm.
split_est = Estimator(
    source_directory='./pipeline_steps',
    entry_script='split.py',
    script_params={
        "--dataset": "clean_data",
        "--train": "train_ds",
        "--valid": "valid_ds",
        "--trainsize": 400,
        "--validsize": 100,
    },
    inputs=[clean_ds.as_named_input('clean_data')],
    compute_target='local',
    environment_definition=localenv,
)

# Submit and block until the run finishes, streaming its logs into the cell.
run = experiment.submit(split_est)
run.wait_for_completion(show_output=True)

In [None]:
run.download_file('outputs/train_ds', output_file_path='./train_dstmp.csv')
!head ./cleantmp.csv
run.download_file('outputs/valid_ds', output_file_path='./valid_dstmp.csv')
!head ./cleantmp.csv

In [None]:
# Upload the training split and configure the reference handed to train.py.
train_ref = def_blob_store.upload_files(
    files=['./train_dstmp.csv'],
    target_path='tmp/train.csv',
    overwrite=True,
)
train_ref.data_reference_name = "train_data"
train_ref.mode = 'download'

# Same for the validation split, consumed later by eval.py.
valid_ref = def_blob_store.upload_files(
    files=['./valid_dstmp.csv'],
    target_path='tmp/valid.csv',
    overwrite=True,
)
valid_ref.data_reference_name = "valid_data"
valid_ref.mode = 'download'

In [None]:
# Tabular dataset over the uploaded training split.
datastore_paths = [
    (def_blob_store, 'tmp/train.csv'),
]
train_ds = Dataset.Tabular.from_delimited_files(path=datastore_paths)

# Tabular dataset over the uploaded validation split (reuses the path variable).
datastore_paths = [
    (def_blob_store, 'tmp/valid.csv'),
]
valid_ds = Dataset.Tabular.from_delimited_files(path=datastore_paths)

# Testing train.py

In [None]:
# Run train.py locally with the uploaded training split as a data reference.
# The estimator has its own name so it does not overwrite `split_est` from
# the split test.
train_est = Estimator(
    source_directory='./pipeline_steps',
    entry_script='train.py',
    script_params={
        "--dataset": "train_data",
        "--model": "model.pkl",
    },
    inputs=[train_ref],
    compute_target='local',
    environment_definition=localenv,
)

# Submit and block until the run finishes, streaming its logs into the cell.
run = experiment.submit(train_est)
run.wait_for_completion(show_output=True)

In [None]:
# Register the model file written by the training run under a test name and tag.
model = run.register_model(
    model_name='test_model',
    model_path='outputs/model.pkl',
    tags={'test': 'test'},
)

# Testing eval.py

In [None]:
# Run eval.py locally with the uploaded validation split as a data reference.
# Unlike the other steps, the source directory here is the notebook folder and
# the entry script path is given relative to it.
eval_est = Estimator(
    source_directory='.',
    entry_script='./pipeline_steps/eval.py',
    script_params={
        "--dataset": "valid_data",
        "--model": "model.pkl",
    },
    inputs=[valid_ref],
    compute_target='local',
    environment_definition=localenv,
)

# Submit and block until the run finishes, streaming its logs into the cell.
run = experiment.submit(eval_est)
run.wait_for_completion(show_output=True)