## Reading from and Writing to Azure SQL Database using Azure Machine Learning

The purpose of this notebook is to demonstrate how to read from and write to Azure SQL Database from Azure Machine Learning, using the pipeline approach for the write.  This assumes the following:
* You have configured an Azure SQL Database Datastore named `expense_reports`.
* You want to use the `workspaceblobstore` Azure Blob Storage Datastore.  You can, of course, create your own and substitute it.
* You have configured an Azure Data Factory.

First up, let's load the Python libraries we need and obtain the workspace from our Azure ML config file.

In [None]:
import azureml.core
from azureml.core import Workspace, Datastore, Dataset
from azureml.data.datapath import DataPath
from azureml.data import DataType
from azureml.pipeline.steps import DataTransferStep
import pandas as pd

ws = Workspace.from_config()

Retrieve the expenses and Blob Storage datastores from Azure Machine Learning.

In [None]:
expenses_datastore = Datastore.get(ws, datastore_name="expense_reports")
blob_datastore = Datastore.get(ws, datastore_name="workspaceblobstore")

Query our data.  This hits the `expenses_datastore` Datastore and pulls back expense reports dated before 2017.  We specify a data type for each output column through the `set_column_types` argument of `from_sql_query`.  That call returns a `TabularDataset`, and `to_pandas_dataframe()` turns it into a Pandas DataFrame, which makes it easy to work with.

In [None]:
query = DataPath(expenses_datastore, """
SELECT
    er.EmployeeID,
    CONCAT(e.FirstName, ' ', e.LastName) AS EmployeeName,
    ec.ExpenseCategoryID,
    ec.ExpenseCategory,
    er.ExpenseDate,
    YEAR(er.ExpenseDate) AS ExpenseYear,
    -- Cast DECIMAL to FLOAT: Azure ML's DataType offers to_float() but no decimal type
    CAST(er.Amount AS FLOAT) AS Amount
FROM dbo.ExpenseReport er
    INNER JOIN dbo.ExpenseCategory ec
        ON er.ExpenseCategoryID = ec.ExpenseCategoryID
    INNER JOIN dbo.Employee e
        ON e.EmployeeID = er.EmployeeID
WHERE
    er.ExpenseDate < '2017-01-01'""")

data_types = {
    'EmployeeID': DataType.to_long(),
    'EmployeeName': DataType.to_string(),
    'ExpenseCategoryID': DataType.to_long(),
    'ExpenseCategory': DataType.to_string(),
    'ExpenseDate': DataType.to_datetime('%Y-%m-%d'),
    'ExpenseYear': DataType.to_long(),
    'Amount': DataType.to_float()
}

expense_reports = Dataset.Tabular.from_sql_query(query, set_column_types=data_types).to_pandas_dataframe()
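The `set_column_types` mapping tells Azure ML how to type each column before the data reaches pandas. As a rough, Azure-free illustration of the same idea, the sketch below coerces a small hand-made DataFrame of raw strings into declared types with plain pandas. The sample rows, the `COLUMN_TYPES` mapping, and the `apply_column_types` helper are all hypothetical; this is not how Azure ML performs the conversion internally.

```python
import pandas as pd

# Hypothetical raw rows standing in for untyped query output (made-up values).
raw = pd.DataFrame({
    "EmployeeID": ["1", "2"],
    "ExpenseDate": ["2016-03-14", "2016-11-02"],
    "ExpenseYear": ["2016", "2016"],
    "Amount": ["12.50", "310.00"],
})

# Hypothetical mapping that mirrors DataType.to_long / to_datetime / to_float.
COLUMN_TYPES = {
    "EmployeeID": "long",
    "ExpenseDate": "datetime",
    "ExpenseYear": "long",
    "Amount": "float",
}

def apply_column_types(df, column_types, date_format="%Y-%m-%d"):
    """Return a copy of df with each listed column converted to its declared type."""
    out = df.copy()
    for column, kind in column_types.items():
        if kind == "long":
            out[column] = out[column].astype("int64")
        elif kind == "float":
            out[column] = out[column].astype("float64")
        elif kind == "datetime":
            out[column] = pd.to_datetime(out[column], format=date_format)
        elif kind == "string":
            out[column] = out[column].astype(str)
        else:
            raise ValueError(f"Unsupported type {kind!r} for column {column!r}")
    return out

typed = apply_column_types(raw, COLUMN_TYPES)
print(typed.dtypes)
```

Without the conversion every column would stay as text, and arithmetic on `Amount` or model fitting on `ExpenseYear` would not behave as expected.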

This gives you a feel for what the data looks like.  It's fairly straightforward, tabular data.  Almost like I generated it for a demo or something.

In [None]:
expense_reports.head()

Load the `RandomForestRegressor` from scikit-learn and fit it to predict `Amount` from `ExpenseCategoryID` and `ExpenseYear`.  It's a simple model, but it works well enough for this demo.

In [None]:
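from sklearn.ensemble import RandomForestRegressor
# Fit Amount as a function of category and year; random_state makes the forest reproducible
reg = RandomForestRegressor(random_state=0)
model = reg.fit(expense_reports[["ExpenseCategoryID", "ExpenseYear"]], expense_reports[["Amount"]].values.ravel())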
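"Works pretty well" is easier to judge with a number. A minimal sketch, assuming scikit-learn is installed: hold out part of the data and compute the mean absolute error on it. The toy data below is synthetic, made up for illustration, so the resulting error says nothing about the real expense data; in the notebook you would split `expense_reports` instead.

```python
import numpy as np
import pandas as pd
from sklearn.ensemble import RandomForestRegressor
from sklearn.metrics import mean_absolute_error
from sklearn.model_selection import train_test_split

# Synthetic stand-in for expense_reports (illustrative values only).
rng = np.random.default_rng(0)
n = 500
df = pd.DataFrame({
    "ExpenseCategoryID": rng.integers(1, 6, size=n),
    "ExpenseYear": rng.integers(2011, 2017, size=n),
})
# Amount depends on category and year, plus Gaussian noise.
df["Amount"] = (
    df["ExpenseCategoryID"] * 40.0
    + (df["ExpenseYear"] - 2011) * 5.0
    + rng.normal(0, 10, size=n)
)

features = ["ExpenseCategoryID", "ExpenseYear"]
X_train, X_test, y_train, y_test = train_test_split(
    df[features], df["Amount"], test_size=0.25, random_state=0
)

# Fixing random_state makes the forest reproducible between runs.
reg = RandomForestRegressor(n_estimators=100, random_state=0)
reg.fit(X_train, y_train)

mae = mean_absolute_error(y_test, reg.predict(X_test))
print(f"Held-out MAE: {mae:.2f}")
```

Comparing this figure against a naive baseline (for example, always predicting the training mean) gives a more concrete sense of whether the model earns its keep.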

Here's the data that we'll use for predictions: expense reports from 2017 onward.

In [None]:
query = DataPath(expenses_datastore, """
SELECT
    er.EmployeeID,
    CONCAT(e.FirstName, ' ', e.LastName) AS EmployeeName,
    ec.ExpenseCategoryID,
    ec.ExpenseCategory,
    er.ExpenseDate,
    YEAR(er.ExpenseDate) AS ExpenseYear,
    -- Cast DECIMAL to FLOAT: Azure ML's DataType offers to_float() but no decimal type
    CAST(er.Amount AS FLOAT) AS Amount
FROM dbo.ExpenseReport er
    INNER JOIN dbo.ExpenseCategory ec
        ON er.ExpenseCategoryID = ec.ExpenseCategoryID
    INNER JOIN dbo.Employee e
        ON e.EmployeeID = er.EmployeeID
WHERE
    er.ExpenseDate >= '2017-01-01'""")

data_types = {
    'EmployeeID': DataType.to_long(),
    'EmployeeName': DataType.to_string(),
    'ExpenseCategoryID': DataType.to_long(),
    'ExpenseCategory': DataType.to_string(),
    'ExpenseDate': DataType.to_datetime('%Y-%m-%d'),
    'ExpenseYear': DataType.to_long(),
    'Amount': DataType.to_float()
}

expense_reports_to_predict = Dataset.Tabular.from_sql_query(query, set_column_types=data_types).to_pandas_dataframe()
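The two queries are identical except for the comparison in the `WHERE` clause. One way to avoid maintaining two copies is a small query builder. The sketch below is hypothetical (`build_expense_query` and `ALLOWED_OPERATORS` are not part of any Azure ML API); it whitelists the operator and validates the cutoff as a date so that arbitrary text cannot be spliced into the SQL.

```python
from datetime import date

ALLOWED_OPERATORS = {"<", ">="}

BASE_QUERY = """
SELECT
    er.EmployeeID,
    CONCAT(e.FirstName, ' ', e.LastName) AS EmployeeName,
    ec.ExpenseCategoryID,
    ec.ExpenseCategory,
    er.ExpenseDate,
    YEAR(er.ExpenseDate) AS ExpenseYear,
    CAST(er.Amount AS FLOAT) AS Amount
FROM dbo.ExpenseReport er
    INNER JOIN dbo.ExpenseCategory ec
        ON er.ExpenseCategoryID = ec.ExpenseCategoryID
    INNER JOIN dbo.Employee e
        ON e.EmployeeID = er.EmployeeID
WHERE
    er.ExpenseDate {op} '{cutoff}'"""

def build_expense_query(op, cutoff):
    """Return the expense query filtered on ExpenseDate <op> cutoff."""
    if op not in ALLOWED_OPERATORS:
        raise ValueError(f"Unsupported operator: {op!r}")
    # fromisoformat rejects anything that is not a YYYY-MM-DD date.
    cutoff_date = date.fromisoformat(cutoff)
    return BASE_QUERY.format(op=op, cutoff=cutoff_date.isoformat())

train_sql = build_expense_query("<", "2017-01-01")
predict_sql = build_expense_query(">=", "2017-01-01")
```

In the notebook, `DataPath(expenses_datastore, train_sql)` and `DataPath(expenses_datastore, predict_sql)` would then stand in for the two literal queries, and the identical `data_types` dictionary could be defined once and shared.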

Now that we have our data, generate predictions.  Then, concatenate the `PredictedAmount` column onto the expense reports DataFrame so that we can see the inputs as well as the prediction.

In [None]:
pred = pd.DataFrame({"PredictedAmount" : model.predict(expense_reports_to_predict[["ExpenseCategoryID", "ExpenseYear"]]) })
output_data_set = pd.concat([expense_reports_to_predict, pred], axis=1)
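`pd.concat(..., axis=1)` lines rows up by index label, not by position. The cell above works as long as `expense_reports_to_predict` keeps a default 0-based index, but it can silently misalign if that DataFrame is ever filtered or re-indexed first. The sketch below uses a small hypothetical frame to show the failure mode and a positional alternative, `DataFrame.assign`.

```python
import numpy as np
import pandas as pd

# Hypothetical inputs whose index is NOT 0..n-1 (e.g. after filtering rows).
inputs = pd.DataFrame(
    {"ExpenseCategoryID": [1, 2, 3], "ExpenseYear": [2017, 2017, 2018]},
    index=[10, 11, 12],
)
preds = np.array([55.0, 80.0, 120.0])

# concat aligns on the index: labels 0..2 and 10..12 never match,
# so the result has six rows, half of them NaN in each block.
misaligned = pd.concat([inputs, pd.DataFrame({"PredictedAmount": preds})], axis=1)

# assign() attaches the array positionally, keeping one prediction per input row.
aligned = inputs.assign(PredictedAmount=preds)
print(aligned)
```

Calling `reset_index(drop=True)` on the inputs before the concat is another way to get the same positional pairing.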

Here's a brief view of the resulting outputs.

In [None]:
output_data_set.head()

Now I want to write the results to Azure SQL Database.  The catch is that the Azure ML SDK offers no direct way to perform that write.  The best available option (as of March 2021) is to write the data to Azure Blob Storage and then transfer it to Azure SQL Database.

To get the data into Azure Blob Storage, I first need to save it locally and then upload it.  I'll call the output `predictions.csv`.

In [None]:
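import os
# Create the local folder if needed (no error if it already exists)
os.makedirs('data', exist_ok=True)
local_path = os.path.join('data', 'predictions.csv')
# index=False keeps the pandas row index out of the CSV
output_data_set.to_csv(local_path, index=False)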
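The `index=False` argument matters for the later transfer into SQL. With the default, pandas writes the row labels as an extra, unnamed first column, which presumably would not match the columns of the destination table. The sketch below uses a hypothetical two-row frame and in-memory buffers to show the difference.

```python
import io

import pandas as pd

df = pd.DataFrame({"EmployeeID": [1, 2], "PredictedAmount": [55.0, 80.0]})

with_index = io.StringIO()
df.to_csv(with_index)  # default index=True also writes the row labels

without_index = io.StringIO()
df.to_csv(without_index, index=False)

# Read both CSVs back and compare the column headers.
cols_with_index = pd.read_csv(io.StringIO(with_index.getvalue())).columns.tolist()
cols_without_index = pd.read_csv(io.StringIO(without_index.getvalue())).columns.tolist()
print(cols_with_index)
print(cols_without_index)
```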

Now that the data is saved locally, we can upload it to Azure Blob Storage.  This is where I first use the `blob_datastore` Datastore.  I'm going to write it to `ExpenseReportPrediction/predictions.csv`.  If you go digging into the storage account Azure Machine Learning uses, you can find this folder.

In [None]:
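# overwrite=True replaces predictions.csv from an earlier run instead of skipping it
blob_datastore.upload(src_dir='data', target_path='ExpenseReportPrediction', overwrite=True, show_progress=True)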
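`upload` copies every file under `src_dir`, so anything else sitting in `data/` would also land under `ExpenseReportPrediction/`, where the transfer step reads all files. The hypothetical helper below only previews that mapping locally and never calls Azure. It assumes relative paths under `src_dir` are preserved beneath `target_path`.

```python
import os
import posixpath
import tempfile

def preview_upload_targets(src_dir, target_path):
    """List (local_file, blob_path) pairs for every file under src_dir."""
    pairs = []
    for root, _dirs, files in os.walk(src_dir):
        for name in files:
            local_file = os.path.join(root, name)
            rel = os.path.relpath(local_file, src_dir)
            # Blob paths use forward slashes regardless of the local OS.
            blob_path = posixpath.join(target_path, *rel.split(os.sep))
            pairs.append((local_file, blob_path))
    return sorted(pairs, key=lambda pair: pair[1])

# Build a throwaway data/ folder containing the real output plus a stray file.
with tempfile.TemporaryDirectory() as tmp:
    data_dir = os.path.join(tmp, "data")
    os.makedirs(os.path.join(data_dir, "scratch"))
    for rel in ("predictions.csv", os.path.join("scratch", "old.csv")):
        with open(os.path.join(data_dir, rel), "w") as f:
            f.write("x\n")
    blob_paths = [blob for _, blob in preview_upload_targets(data_dir, "ExpenseReportPrediction")]

print(blob_paths)
```

Keeping `data/` limited to the prediction output, or uploading a single file with the datastore's `upload_files` method, avoids sending stray files to the transfer.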

Next, we import a few more classes in order to build out a pipeline.  Technically, we could have made training and inference steps in this pipeline as well, and that's what I'd do on a production project.

In [None]:
from azureml.core import Workspace, Experiment
from azureml.pipeline.core import Pipeline
from azureml.data.data_reference import DataReference
from azureml.pipeline.steps import DataTransferStep
from azureml.data.sql_data_reference import SqlDataReference
from azureml.core.compute import ComputeTarget, DataFactoryCompute

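We need to bring in Azure Data Factory, which performs the actual copy as the pipeline's compute target.  To do that, we specify where the data factory lives: its resource group (`rg`) and its name (`adf`).  We then attach it to the workspace as a compute target named `amlcompute-adf`.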

In [None]:
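rg = '<Resource Group>'        # resource group that contains the data factory
adf = '<Data Factory>'         # name of the existing data factory
adfcompute = 'amlcompute-adf'  # name the data factory will have as an Azure ML compute target

# Attach the existing data factory to the workspace and wait for the attach to finish
adfconfig = DataFactoryCompute.attach_configuration(resource_group=rg, factory_name=adf, resource_id=None)
adf_compute = ComputeTarget.attach(workspace=ws, name=adfcompute, attach_configuration=adfconfig)
adf_compute.wait_for_completion()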
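Running `ComputeTarget.attach` again with the same name will likely fail on a second pass through the notebook, because that compute name is already taken. A common get-or-attach pattern looks up the existing target first and attaches only when it is missing. To keep Azure out of the picture, the sketch takes the lookup and attach operations as functions. The `get_or_attach` helper, the `NotFound` exception, and the fake registry are hypothetical stand-ins for illustration.

```python
class NotFound(Exception):
    """Stand-in for azureml's ComputeTargetException in this sketch."""

def get_or_attach(name, lookup, attach):
    """Return the existing target called `name`, attaching it only if missing."""
    try:
        return lookup(name)
    except NotFound:
        return attach(name)

# Fake registry standing in for the workspace's compute targets.
registry = {}
attach_calls = []

def fake_lookup(name):
    if name not in registry:
        raise NotFound(name)
    return registry[name]

def fake_attach(name):
    attach_calls.append(name)
    registry[name] = f"compute:{name}"
    return registry[name]

# In the notebook this might be wired up as follows (not run here):
#   lookup = lambda n: ComputeTarget(workspace=ws, name=n)
#   attach = lambda n: ComputeTarget.attach(workspace=ws, name=n, attach_configuration=adfconfig)
# catching azureml's ComputeTargetException instead of NotFound.
first = get_or_attach("amlcompute-adf", fake_lookup, fake_attach)
second = get_or_attach("amlcompute-adf", fake_lookup, fake_attach)
```

The second call finds the target that the first call attached, so the attach runs only once.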

Create a `DataReference` that points at Azure Blob Storage.  We'll read all of the files from `ExpenseReportPrediction/`.

In [None]:
prediction_blob_ref = DataReference(
    datastore=blob_datastore,
    data_reference_name="prediction_blob_ref",
    path_on_datastore="ExpenseReportPrediction/",
    mode="mount",
    path_on_compute=None,
    overwrite=False
)

Bring in Azure SQL Database as a `SqlDataReference`.  We will write to the `ExpenseReportPrediction` table in Azure SQL Database.  Note that this table must already exist before you execute the pipeline!

In [None]:
prediction_sql_ref = SqlDataReference(
    datastore=expenses_datastore,
    data_reference_name="prediction_sql_ref",
    sql_table="ExpenseReportPrediction",
    sql_query=None,
    sql_stored_procedure=None,
    sql_stored_procedure_params=None
)
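Because the destination table must exist first, it can help to derive a starting `CREATE TABLE` statement from the DataFrame being written. The helper below is a hypothetical sketch. Its dtype-to-SQL mapping (`BIGINT`, `FLOAT`, `DATETIME2`, `NVARCHAR(200)`) is an assumption to adjust to the real data, not something this notebook specifies.

```python
import pandas as pd

def sql_type_for(dtype):
    """Map a pandas dtype to an assumed Azure SQL column type."""
    if pd.api.types.is_integer_dtype(dtype):
        return "BIGINT"
    if pd.api.types.is_float_dtype(dtype):
        return "FLOAT"
    if pd.api.types.is_datetime64_any_dtype(dtype):
        return "DATETIME2"
    return "NVARCHAR(200)"  # fallback for text and anything else

def create_table_ddl(df, table):
    """Build a CREATE TABLE statement with one nullable column per DataFrame column."""
    columns = ",\n".join(
        f"    [{name}] {sql_type_for(dtype)} NULL" for name, dtype in df.dtypes.items()
    )
    return f"CREATE TABLE {table} (\n{columns}\n);"

# Hypothetical one-row sample shaped like output_data_set.
sample = pd.DataFrame({
    "EmployeeID": pd.Series([1], dtype="int64"),
    "EmployeeName": ["Jane Doe"],
    "ExpenseDate": pd.to_datetime(["2017-02-01"]),
    "Amount": [12.5],
    "PredictedAmount": [14.0],
})
ddl = create_table_ddl(sample, "dbo.ExpenseReportPrediction")
print(ddl)
```

In the notebook, passing `output_data_set` instead of `sample` would produce a statement to review and run against the database before submitting the pipeline.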

This `DataTransferStep` copies our data from Azure Blob Storage into Azure SQL Database.  We set `allow_reuse=False` so that we can re-run the step with new data (but the same code) and actually get results.  If `allow_reuse=True`, re-running this returns a completed status but, after the first run, does nothing new.

In [None]:
transfer_blob_to_sql = DataTransferStep(
    name="transfer_blob_to_sql",
    source_data_reference=prediction_blob_ref,
    destination_data_reference=prediction_sql_ref,
    compute_target=adf_compute,
    allow_reuse=False,
    destination_reference_type=None
)
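To see why a re-run with `allow_reuse=True` can do nothing, picture reuse as a cache keyed on the step's definition. The toy model below is a deliberate simplification, not Azure ML's actual reuse logic. Its key covers the step name, source path, and destination table but not the data sitting at that path, which is consistent with the behavior described above: new data under the same path still hits the cache. `ToyStepRunner` is hypothetical.

```python
import hashlib
import json

class ToyStepRunner:
    """Toy model of step reuse: results cached by a hash of the step definition."""

    def __init__(self):
        self.cache = {}
        self.transfers = 0

    def run(self, step, data, allow_reuse):
        # The cache key is built from the definition only, never from `data`.
        key = hashlib.sha256(json.dumps(step, sort_keys=True).encode()).hexdigest()
        if allow_reuse and key in self.cache:
            return self.cache[key]  # reported as complete, nothing copied
        self.transfers += 1
        result = f"copied {len(data)} rows"
        self.cache[key] = result
        return result

step = {
    "name": "transfer_blob_to_sql",
    "source": "ExpenseReportPrediction/",
    "destination": "ExpenseReportPrediction",
}

reuse = ToyStepRunner()
reuse.run(step, data=[1, 2, 3], allow_reuse=True)
second_reuse = reuse.run(step, data=[1, 2, 3, 4], allow_reuse=True)

no_reuse = ToyStepRunner()
no_reuse.run(step, data=[1, 2, 3], allow_reuse=False)
second_no_reuse = no_reuse.run(step, data=[1, 2, 3, 4], allow_reuse=False)
```

With reuse on, the second run returns the stale "3 rows" result; with reuse off, it copies the new data.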

And here's the `Pipeline` which will do the work.  It has one step:  `transfer_blob_to_sql`.

In [None]:
datatransfer_pipeline = Pipeline(workspace=ws, 
    steps=[transfer_blob_to_sql], 
    description='Transfer blob data to sql')

We execute the pipeline in the context of an `Experiment`.  It takes a little while to execute and gives us a summary of what happened.  The net result here is that we inserted the data we wanted into `dbo.ExpenseReportPrediction`.

In [None]:
exp = Experiment(workspace=ws, name="DataTransfer_BlobtoSQL")

exp_pipelinerun = exp.submit(datatransfer_pipeline)

exp_pipelinerun.wait_for_completion()