# Data Manipulation using SDK V1

__Make sure to select Kernel: Python 3.8 - AzureML__

In [27]:
import azureml.core
from azureml.core import Workspace, Dataset,Datastore, Experiment

# Reference the Azure ML workspace via Workspace.from_config().
# `ws` is the workspace handle reused by every cell below.
ws = Workspace.from_config()

## Create a Data Asset

### Download Data file

In [28]:
# Download training dataset (diabetes.csv) into the local ./data folder

import requests
import os

# Records whether the download succeeded; set to False if anything below raises.
dataset_created = True
try:
    # Create the ./data directory if it is not present yet.
    # exist_ok=True replaces the separate "exists?" check and its race window.
    os.makedirs("./data", exist_ok=True)

    url = "https://raw.githubusercontent.com/MicrosoftDocs/mslearn-introduction-to-machine-learning/main/Data/ml-basics/diabetes.csv"
    # Retrieve the data from the URL; the timeout keeps a stalled connection
    # from hanging the notebook indefinitely.
    r = requests.get(url, timeout=60)
    # Fail on HTTP errors (404, 500, ...) instead of silently saving the
    # error page as diabetes.csv.
    r.raise_for_status()

    # Write the raw response body to disk.
    with open("./data/diabetes.csv", 'wb') as f:
        f.write(r.content)

except Exception as e:
    # Best-effort download: report the problem and record the failure in the flag.
    print(e)
    dataset_created = False


### Register Data Asset as table

In [29]:
import os

from azureml.core import Dataset
from azureml.data.datapath import DataPath

# Source/target locations and registration settings for the diabetes table.
default_ds = ws.get_default_datastore()
src_dir = "./data"
tgt_dir = "/data_tables"
file_format = "csv"
dataset_name = "diabetes_table"
data_file_mask = f"{tgt_dir}/diabetes.csv"

# Stays True unless the upload/registration attempt below raises.
dataset_registered = True

if dataset_name in ws.datasets:
    # A data asset with this name is already in the workspace: nothing to do.
    print('Dataset already registered.')
else:
    try:
        # Upload the local folder to the target folder on the default datastore.
        upload_target = DataPath(default_ds, tgt_dir)
        Dataset.File.upload_directory(src_dir=src_dir, target=upload_target)

        # Create a tabular dataset from the CSV path on the datastore
        # (this may take a short while).
        tab_data_set = Dataset.Tabular.from_delimited_files(
            path=(default_ds, data_file_mask)
        )

        # Register the tabular dataset under dataset_name, tagged with the file format.
        registration_tags = {'format': file_format.upper()}
        tab_data_set = tab_data_set.register(
            workspace=ws,
            name=dataset_name,
            description=dataset_name,
            tags=registration_tags,
            create_new_version=True,
        )
        print('Dataset registered.')
    except Exception as e:
        # Report the failure and record it in the flag instead of stopping the notebook.
        print(e)
        dataset_registered = False

Dataset already registered.


### Read a Data Asset

In [30]:
from azureml.core import Workspace, Dataset

# Look up the data asset registered as 'diabetes_table' in workspace `ws`
# and load it into a pandas DataFrame.
dataset = Dataset.get_by_name(ws, name='diabetes_table')
df = dataset.to_pandas_dataframe()
# Bare last expression: the notebook renders the DataFrame as the cell output.
df

Unnamed: 0,PatientID,Pregnancies,PlasmaGlucose,DiastolicBloodPressure,TricepsThickness,SerumInsulin,BMI,DiabetesPedigree,Age,Diabetic
0,1354778,0,171,80,34,23,43.509726,1.213191,21,0
1,1147438,8,92,93,47,36,21.240576,0.158365,23,0
2,1640031,7,115,47,52,35,41.511523,0.079019,23,0
3,1883350,9,103,78,25,304,29.582192,1.282870,43,1
4,1424119,1,85,59,27,35,42.604536,0.549542,22,0
...,...,...,...,...,...,...,...,...,...,...
14995,1490300,10,65,60,46,177,33.512468,0.148327,41,1
14996,1744410,2,73,66,27,168,30.132636,0.862252,38,1
14997,1742742,0,93,89,43,57,18.690683,0.427049,24,0
14998,1099353,0,132,98,18,161,19.791645,0.302257,23,0


### Register Data Asset as Folder

In [31]:
from azureml.core import Workspace, Datastore, Dataset
from azureml.data.datapath import DataPath

# Look up the "ezmylake" datastore the same way the later cells do (Datastore.get).
ds = Datastore.get(ws, "ezmylake")

# Create the input dataset: upload the local ../data/ folder to ./data/ on the
# datastore. Plain string literals are used; the original f-strings had no placeholders.
Dataset.File.upload_directory(src_dir="../data/",
                            target=DataPath(ds, "./data/")
                            )

# Build a file dataset over everything matching data/* on the datastore and
# register it (as a new version) under the name "test_dataset_files".
data = Dataset.File.from_files(path=(ds, "data/*"))
folder_dataset = data.register(workspace=ws, name="test_dataset_files",create_new_version=True)

# Bare last expression: display the list of file paths contained in the dataset.
folder_dataset.to_path()

Validating arguments.
Arguments validated.
Uploading file to ./data/
Uploading an estimated of 4 files
Target already exists. Skipping upload for data/.amlignore
Target already exists. Skipping upload for data/.amlignore.amltmp
Target already exists. Skipping upload for data/diabetes.csv
Target already exists. Skipping upload for data/diabetes_2.csv
Uploaded 0 files
Creating new dataset
Resolving access token for scope "https://storage.azure.com/.default" using identity of type "MANAGED".
Getting data access token with Assigned Identity (client_id=clientid) and endpoint type based on configuration
Resolving access token for scope "https://storage.azure.com/.default" using identity of type "MANAGED".
Getting data access token with Assigned Identity (client_id=clientid) and endpoint type based on configuration
Resolving access token for scope "https://storage.azure.com/.default" using identity of type "MANAGED".
Getting data access token with Assigned Identity (client_id=clientid) and en

['/.amlignore',
 '/.amlignore.amltmp',
 '/NYCTripSmall.parquet',
 '/daily-bike-share.csv',
 '/daily-bike-share.json',
 '/daily-bike-share_corrupted.csv',
 '/diabetes.csv',
 '/diabetes_2.csv',
 '/edc_eventcal_APR_sample500.json',
 '/eventcal_data_20200101_20211231.json',
 '/form_text1.png',
 '/test.csv',
 '/test.jsonl',
 '/test_json.json',
 '/test_json_array.json',
 '/test_json_array_empty.json',
 '/test_json_array_nested.json',
 '/test_json_empty.json']

### Filter files based on file extension

In [32]:
# Filter only on CSV files: keep the entries of folder_dataset whose 'Extension'
# file metadata ends with 'csv'.
# NOTE: file_metadata() and filter() are reported as experimental methods in the
# cell output, so this call may change between SDK releases.
test_res=folder_dataset.filter(folder_dataset.file_metadata('Extension').ends_with('csv'))

# Show selected csv files
print(test_res.to_path())


Method file_metadata: This is an experimental method, and may change at any time. Please see https://aka.ms/azuremlexperimental for more information.
Method filter: This is an experimental method, and may change at any time. Please see https://aka.ms/azuremlexperimental for more information.


Resolving access token for scope "https://storage.azure.com/.default" using identity of type "MANAGED".
Getting data access token with Assigned Identity (client_id=clientid) and endpoint type based on configuration
['/daily-bike-share.csv', '/daily-bike-share_corrupted.csv', '/diabetes.csv', '/diabetes_2.csv', '/test.csv']


### Read data files from blob storage

In [33]:
from azureml.core import Dataset

# NOTE: despite its name, default_ds is the "ezmylake" datastore here, not the
# workspace default (the ws.get_default_datastore() alternative is commented out).
# `Datastore` comes from the import in the first cell.
default_ds = Datastore(ws, "ezmylake") #ws.get_default_datastore()
# Create a tabular dataset from a single CSV file on that datastore.
dataset = Dataset.Tabular.from_delimited_files(path = [(default_ds, 'data/daily-bike-share.csv')])

# preview the first 3 rows of the dataset
dataset.take(3).to_pandas_dataframe()

Resolving access token for scope "https://storage.azure.com/.default" using identity of type "MANAGED".
Getting data access token with Assigned Identity (client_id=clientid) and endpoint type based on configuration
Resolving access token for scope "https://storage.azure.com/.default" using identity of type "MANAGED".
Getting data access token with Assigned Identity (client_id=clientid) and endpoint type based on configuration
Resolving access token for scope "https://storage.azure.com/.default" using identity of type "MANAGED".
Getting data access token with Assigned Identity (client_id=clientid) and endpoint type based on configuration
Resolving access token for scope "https://storage.azure.com/.default" using identity of type "MANAGED".
Getting data access token with Assigned Identity (client_id=clientid) and endpoint type based on configuration
Resolving access token for scope "https://storage.azure.com/.default" using identity of type "MANAGED".
Getting data access token with Assig

Unnamed: 0,day,mnth,year,season,holiday,weekday,workingday,weathersit,temp,atemp,hum,windspeed,rentals
0,1,1,2011,1,0,6,0,2,0.344167,0.363625,0.805833,0.160446,331
1,2,1,2011,1,0,0,0,2,0.363478,0.353739,0.696087,0.248539,131
2,3,1,2011,1,0,1,1,1,0.196364,0.189405,0.437273,0.248309,120


__Read multiple files as files__

In [34]:
# Get the "ezmylake" datastore; `datastore` is also reused by later cells.
datastore = Datastore.get(ws, 'ezmylake')

#Create a file dataset from the path on the datastore (this may take a short while)
file_data_set = Dataset.File.from_files(path=(datastore, 'data/*.csv'))
# Download the files matched by the wildcard; `res` is indexed like a list of
# local file paths by the next cell (res[0]).
res=file_data_set.download()
print(res)
# Get the files in the dataset
for file_path in file_data_set.to_path():
    print(file_path)

Resolving access token for scope "https://storage.azure.com/.default" using identity of type "MANAGED".
Getting data access token with Assigned Identity (client_id=clientid) and endpoint type based on configuration
Resolving access token for scope "https://storage.azure.com/.default" using identity of type "MANAGED".
Getting data access token with Assigned Identity (client_id=clientid) and endpoint type based on configuration
Resolving access token for scope "https://storage.azure.com/.default" using identity of type "MANAGED".
Getting data access token with Assigned Identity (client_id=clientid) and endpoint type based on configuration
Resolving access token for scope "https://storage.azure.com/.default" using identity of type "MANAGED".
Getting data access token with Assigned Identity (client_id=clientid) and endpoint type based on configuration
Resolving access token for scope "https://storage.azure.com/.default" using identity of type "MANAGED".
Getting data access token with Assig

In [35]:
import pandas as pd

#Read files
# NOTE(review): res[0] is whichever file download() listed first in the previous
# cell; the 731-row output suggests it is daily-bike-share.csv, but this depends
# on the order of the returned list -- confirm before relying on it.
df = pd.read_csv(res[0])
print(len(df))
# Bare last expression: the notebook renders the DataFrame as the cell output.
df

731


Unnamed: 0,day,mnth,year,season,holiday,weekday,workingday,weathersit,temp,atemp,hum,windspeed,rentals
0,1,1,2011,1,0,6,0,2,0.344167,0.363625,0.805833,0.160446,331
1,2,1,2011,1,0,0,0,2,0.363478,0.353739,0.696087,0.248539,131
2,3,1,2011,1,0,1,1,1,0.196364,0.189405,0.437273,0.248309,120
3,4,1,2011,1,0,2,1,1,0.200000,0.212122,0.590435,0.160296,108
4,5,1,2011,1,0,3,1,1,0.226957,0.229270,0.436957,0.186900,82
...,...,...,...,...,...,...,...,...,...,...,...,...,...
726,27,12,2012,1,0,4,1,2,0.254167,0.226642,0.652917,0.350133,247
727,28,12,2012,1,0,5,1,2,0.253333,0.255046,0.590000,0.155471,644
728,29,12,2012,1,0,6,0,2,0.253333,0.242400,0.752917,0.124383,159
729,30,12,2012,1,0,0,0,1,0.255833,0.231700,0.483333,0.350754,364


__Read multiple files as tables__

In [36]:
from azureml.core import Dataset

# Get the default datastore
# NOTE(review): default_ds is not used below; the dataset is read from `datastore`,
# which was defined in an earlier cell as Datastore.get(ws, 'ezmylake').
default_ds = ws.get_default_datastore()

#Create a tabular dataset from every file matching the wildcard on the datastore (this may take a short while)
tab_data_set = Dataset.Tabular.from_delimited_files(path=(datastore, 'data/daily-bike-share*.csv'))
#tab_data_set.get_all()
# Load the whole dataset (not only the first rows) as a Pandas dataframe,
# print its row count and display it.
df2 = tab_data_set.to_pandas_dataframe()
print(len(df2))
df2

Resolving access token for scope "https://storage.azure.com/.default" using identity of type "MANAGED".
Getting data access token with Assigned Identity (client_id=clientid) and endpoint type based on configuration
Resolving access token for scope "https://storage.azure.com/.default" using identity of type "MANAGED".
Getting data access token with Assigned Identity (client_id=clientid) and endpoint type based on configuration
Resolving access token for scope "https://storage.azure.com/.default" using identity of type "MANAGED".
Getting data access token with Assigned Identity (client_id=clientid) and endpoint type based on configuration
Resolving access token for scope "https://storage.azure.com/.default" using identity of type "MANAGED".
Getting data access token with Assigned Identity (client_id=clientid) and endpoint type based on configuration
Resolving access token for scope "https://storage.azure.com/.default" using identity of type "MANAGED".
Getting data access token with Assig

Unnamed: 0,day,mnth,year,season,holiday,weekday,workingday,weathersit,temp,atemp,hum,windspeed,rentals
0,1.0,1.0,2011.0,1.0,0.0,6.0,0.0,2.0,0.344167,0.363625,0.805833,0.160446,331.0
1,2.0,1.0,2011.0,1.0,0.0,0.0,0.0,2.0,0.363478,0.353739,0.696087,0.248539,131.0
2,3.0,1.0,2011.0,1.0,0.0,1.0,1.0,1.0,0.196364,0.189405,0.437273,0.248309,120.0
3,4.0,1.0,2011.0,1.0,0.0,2.0,1.0,1.0,0.200000,0.212122,0.590435,0.160296,108.0
4,5.0,1.0,2011.0,1.0,0.0,3.0,1.0,1.0,0.226957,0.229270,0.436957,0.186900,82.0
...,...,...,...,...,...,...,...,...,...,...,...,...,...
1458,28.0,12.0,2012.0,1.0,0.0,5.0,1.0,2.0,0.253333,0.255046,0.590000,0.155471,644.0
1459,29.0,12.0,2012.0,1.0,0.0,6.0,0.0,2.0,0.253333,0.242400,0.752917,0.124383,159.0
1460,30.0,12.0,2012.0,1.0,0.0,0.0,0.0,1.0,0.255833,0.231700,0.483333,0.350754,364.0
1461,31.0,12.0,2012.0,1.0,0.0,1.0,1.0,2.0,0.215833,0.223487,0.577500,0.154846,439.0


## Submit a training job and pass data assets as input

In [37]:
%%writefile dummy_train.py

# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License.
#
# Dummy training script submitted by the notebook through ScriptRunConfig.
# It reads the dataset attached to the run as the named input 'input', logs a few
# sample metrics and a plot, and writes CSV extracts into the output folder.
#
# Arguments:
#   --input_ds   value passed for the input dataset (stored as args.input_data)
#   --output_ds  folder where the CSV extracts are written (stored as args.save_folder)
import sys
import os
import argparse
import pandas as pd
from azureml.core import Run
import matplotlib.pyplot as plt

# Get the experiment run context
run = Run.get_context()

print("*********************************************************")
print("Hello Azure Machine Learning!")

parser = argparse.ArgumentParser("process")
parser.add_argument("--input_ds", type=str, dest='input_data', help="input scored data")
parser.add_argument("--output_ds", type=str, dest='save_folder', help="output directory for GW data")
args = parser.parse_args()

# The parsed values live under their `dest` names (input_data / save_folder);
# args.input_ds and args.output_ds do not exist.
print("Argument 1 bis: %s" % args.input_data)
print("Argument 2: %s" % args.save_folder)

# Make sure the output folder exists before listing it or writing into it.
os.makedirs(args.save_folder, exist_ok=True)
print(f'list dir:\n{os.listdir(args.save_folder)}')

print("Loading Data...")
#used with input parameter: dataset_raw_data
# Tabular dataset attached to the run under the name 'input'.
diabetes = run.input_datasets['input'].to_pandas_dataframe()

#used with input parameter: file_data_set
#diabetes = pd.read_csv(args.input_data)

#used with input parameter: dataset_files
#diabetes = pd.read_csv(os.path.join(args.input_data,"daily-bike-share.csv"))

#-----------------------#

# INSERT your model training code here

#-----------------------#

#logging: sample values demonstrating each logging call of the run object
run.log("accuracy", 0.95)
run.log_list("accuracies", [0.6, 0.7, 0.87])
run.log_row("Y over X", x=1, y=0.4)
run.log_table("Y over X table", {"x":[1, 2, 3], "y":[0.6, 0.7, 0.89]})

#run.log_image("ROC", path)
# Build a small demo figure and attach it to the run as an image.
plt.figure(figsize=(6, 3))
plt.title("test log image", fontsize=14)  # stray ")" removed from the title text
plt.plot([0.6, 0.7, 0.87], "b-", label="Accuracy", lw=4, alpha=0.5)
plt.plot([0.1, 0.2, 0.5], "r--", label="Loss", lw=4, alpha=0.5)
plt.legend(fontsize=12)
plt.grid(True)
run.log_image("Loss v.s. Accuracy", plot=plt)


print(f"df len:{len(diabetes)}")

# Write the first 10 rows, the first 100 rows and the full table to the output folder.
diabetes.head(10).to_csv(os.path.join(args.save_folder, 'output2_10.csv'))
diabetes.head(100).to_csv(os.path.join(args.save_folder, 'output2_100.csv'))
diabetes.to_csv(os.path.join(args.save_folder, 'output2_all.csv'))

Overwriting dummy_train.py


In [38]:
%%writefile env.yml

# Conda specification for the job environment; the notebook loads it with
# Environment.from_conda_specification(file_path="env.yml").
name: simple_environment
dependencies:
  # The python interpreter version.
  # Currently Azure ML only supports 3.5.2 and later.
- python=3.8.3
  # Installed through conda; not repeated in the pip section below so that
  # each package has a single installer.
- scikit-learn
- pandas
- pip
- pip:
  - azureml-defaults
  - azureml-mlflow
  - numpy
  - joblib
  - matplotlib
  - seaborn

Overwriting env.yml


In [39]:
from azureml.core import Environment, Workspace

# Get or create the registered environment.
name = "experiment_env"
file_path = "env.yml"

# Defaults reported when the lookup/registration below fails.
environment_registered = True
registered_env = None

try:
    # Names of the environments known to the workspace.
    env_names = Environment.list(workspace=ws)
    current_env = name
    if current_env not in env_names:
        # Not registered yet: build it from the conda file and register it.
        env = Environment.from_conda_specification(name=name, file_path=file_path)
        registered_env = env.register(workspace=ws)
    else:
        # Already registered: reuse the existing environment.
        registered_env = Environment.get(ws, current_env)
except Exception as e:
    # Report the failure and record it in the flag instead of stopping the notebook.
    print(e)
    environment_registered = False

print(registered_env)

Environment(Name: experiment_env,
Version: 8)


In [40]:
from azureml.core import Dataset

# Get the "ezmylake" datastore (this is a named datastore, not the workspace default).
datastore = Datastore.get(ws, 'ezmylake')

#Create a tabular dataset from every CSV file under data/ on the datastore (this may take a short while)
tab_data_set = Dataset.Tabular.from_delimited_files(path=(datastore, 'data/*.csv'))

# Display the first 20 rows as a Pandas dataframe
#tab_data_set.take(20).to_pandas_dataframe().head()

Resolving access token for scope "https://storage.azure.com/.default" using identity of type "MANAGED".
Getting data access token with Assigned Identity (client_id=clientid) and endpoint type based on configuration
Resolving access token for scope "https://storage.azure.com/.default" using identity of type "MANAGED".
Getting data access token with Assigned Identity (client_id=clientid) and endpoint type based on configuration
Resolving access token for scope "https://storage.azure.com/.default" using identity of type "MANAGED".
Getting data access token with Assigned Identity (client_id=clientid) and endpoint type based on configuration


In [42]:
from azureml.core import ScriptRunConfig
from azureml.data.datapath import DataPath
from azureml.data import OutputFileDatasetConfig

from azureml.core.compute import ComputeTarget, AmlCompute, ComputeInstance
# Get compute instance to run a test.
# compute_target (the "re-compute-cl" target) is looked up as well but is not used
# by the job below; pass it as compute_target=... in ScriptRunConfig to use it instead.
ci_instance = ComputeInstance(workspace=ws, name="re-compute-inst")
compute_target = ComputeTarget(workspace=ws, name="re-compute-cl")

experiment = Experiment(workspace = ws, name = 'test-ds-pipeline') #regenerate_outputs=True

datastore = Datastore.get(ws, 'ezmylake')

# ---- Inputs ----
# Once you've created a named input, you can choose its access mode:
# as_mount() or as_download().
#   - If your script processes all the files in your dataset and the disk on your
#     compute resource is large enough for the dataset, the download access mode is
#     the better choice. It avoids the overhead of streaming the data at runtime.
#   - If your script accesses a subset of the dataset or it's too large for your
#     compute, use the mount access mode.

#Tabular: input_dataset.as_named_input('input')
# A single lookup is enough: the original fetched the same name twice in a row and
# the second result simply overwrote the first.
# Alternative lookup: ws.datasets.get("diabetes.csv")
input_dataset = Dataset.get_by_name(ws, 'diabetes.csv')

#Files: input_file_dataset.as_named_input('input').as_mount()
# Alternative (reference a registered folder dataset instead):
#   input_file_dataset = ws.datasets.get("test file dataset")
# That assignment was immediately overwritten by the line below, so only the
# effective one is kept. input_file_dataset is not passed to the job below.
input_file_dataset = Dataset.File.from_files(path=(datastore, 'data/daily-bike-share.csv')) #Reference a specific file in a folder

# ---- Outputs ----
# Ref: https://learn.microsoft.com/en-us/azure/machine-learning/how-to-move-data-in-out-of-pipelines?view=azureml-api-1
# OutputFileDatasetConfig supports writing data to blob storage, fileshare,
# adlsgen1, or adlsgen2. It supports both mount mode and upload mode.
#   - In mount mode, files written to the mounted directory are permanently stored
#     when the file is closed.
#   - In upload mode, files written to the output directory are uploaded at the end
#     of the job. If the job fails or is canceled, the output directory won't be
#     uploaded.
output = OutputFileDatasetConfig(destination=(datastore, 'sample/outputdataset2')) #Output to blob storage

#prepared_output_ds = (OutputFileDatasetConfig(destination=(datastore, 'outputdataset'))
#                      .register_on_complete(name='prepared_fashion_ds')
#                     ) #Output to blob storage and register data asset
#step1_output_data = (OutputFileDatasetConfig(name="processed_data_mount", 
#                        destination=(datastore, "sample/outputdataset2/{run-id}/{output-name}")).as_mount()
#                    )

#Job config: run dummy_train.py from the current folder on the compute instance,
# with the tabular dataset attached as the named input 'input' and the output
# location passed as --output_ds.
src = ScriptRunConfig(source_directory='./',
                      script='dummy_train.py',
                      arguments=["--input_ds", input_dataset.as_named_input('input'), 
                                 "--output_ds",output],
                      compute_target=ci_instance,
                      environment=registered_env)

# Submit the run configuration for your training run and stream its logs until it finishes
run = experiment.submit(src)
run.wait_for_completion(show_output=True)

Resolving access token for scope "https://storage.azure.com/.default" using identity of type "MANAGED".
Getting data access token with Assigned Identity (client_id=clientid) and endpoint type based on configuration
RunId: test-ds-pipeline_1695261658_10fd6429
Web View: https://ml.azure.com/runs/test-ds-pipeline_1695261658_10fd6429?wsid=/subscriptions/e0d7a68e-191f-4f51-83ce-d93995cd5c09/resourcegroups/my_ml_tests/workspaces/myworkspace&tid=16b3c013-d300-468d-ac64-7eda0820b6d3

Streaming user_logs/std_log.txt

*********************************************************
Hello Azure Machine Learning!
Argument 1 bis: b06fa5c4-9802-46d5-a96a-56662060b41c
Argument 2: /mnt/azureml/cr/j/ba3696e69e3745c3aed7ac8170972ab6/cap/data-capability/wd/output_43061f8a
list dir:
['output.csv', 'output10.csv', 'output100.csv', 'output2_10.csv', 'output2_100.csv', 'output2_all.csv', 'output3_10.csv', 'output3_100.csv', 'output3_all.csv', 'output_all.csv', 'test-ds-pipeline_1690808620_35f01228', 'test-ds-pipeli

{'runId': 'test-ds-pipeline_1695261658_10fd6429',
 'target': 're-compute-inst',
 'status': 'Completed',
 'startTimeUtc': '2023-09-21T02:01:13.458824Z',
 'endTimeUtc': '2023-09-21T02:01:47.664842Z',
 'services': {},
 'properties': {'_azureml.ComputeTargetType': 'amlcdsi',
  'ContentSnapshotId': '5b4ac3fd-1073-4039-9d33-ea5bc5b0a870',
  'ProcessInfoFile': 'azureml-logs/process_info.json',
  'ProcessStatusFile': 'azureml-logs/process_status.json'},
 'inputDatasets': [{'dataset': {'id': 'b06fa5c4-9802-46d5-a96a-56662060b41c'}, 'consumptionDetails': {'type': 'RunInput', 'inputName': 'input', 'mechanism': 'Direct'}}],
 'outputDatasets': [{'identifier': {'savedId': '2167dfed-a5c7-4445-a98b-030c6f5bff63'},
   'outputType': 'RunOutput',
   'outputDetails': {'outputName': 'output_43061f8a'},
   'dataset': {
     "source": [
       "('ezmylake', 'sample/outputdataset2')"
     ],
     "definition": [
       "GetDatastoreFiles"
     ],
     "registration": {
       "id": "2167dfed-a5c7-4445-a98b-03