### Accessing data lake data from container

In [0]:
# Connection settings for the Azure Data Lake Storage Gen2 account.
# NOTE(review): these look like placeholder values; a real account key should
# come from a secret store rather than being committed in the notebook.
storage_account_name = "storage_account_name"
storage_account_key = "storage_account_key"
lake_container = "lake_container"

# abfss:// URI of the input CSV inside the lake container.
file_location = f"abfss://{lake_container}@{storage_account_name}.dfs.core.windows.net/input/housing.csv"

# Register the account key on the Spark session so abfss:// paths on this
# storage account can be read and written.
spark.conf.set(
    f"fs.azure.account.key.{storage_account_name}.dfs.core.windows.net",
    storage_account_key,
)

In [0]:
# Load the housing CSV from the data lake into the Spark DataFrame `housing`.
# The first row is treated as the header and column types are inferred.
housing = (
    spark.read
    .format("csv")
    .options(inferSchema="true", header="true", delimiter=",")
    .load(file_location)
)
 

In [0]:
# Show the column names of the loaded Spark DataFrame.
print(housing.columns)

In [0]:
import numpy as np
from sklearn.base import BaseEstimator, TransformerMixin

# Positional column indices used by CombinedAttributesAdder.transform to pick
# columns out of the 2-D array it receives.
# NOTE(review): presumably these match the order of the numeric columns in
# housing.csv (total_rooms, total_bedrooms, population, households) - confirm.
rooms_ix, bedrooms_ix, population_ix, households_ix = 3, 4, 5, 6

class CombinedAttributesAdder(BaseEstimator, TransformerMixin):
    """Stateless transformer that appends ratio features to a 2-D array.

    Columns are selected by the module-level positional indices ``rooms_ix``,
    ``bedrooms_ix``, ``population_ix`` and ``households_ix``.

    Parameters
    ----------
    add_bedrooms_per_room : bool, default True
        When true, a third ratio column (bedrooms / rooms) is appended after
        the two per-household ratios.
    """

    def __init__(self, add_bedrooms_per_room=True):
        self.add_bedrooms_per_room = add_bedrooms_per_room

    def fit(self, X, y=None):
        """Nothing is learned from the data; return the transformer itself."""
        return self

    def transform(self, X):
        """Return ``X`` with the ratio columns appended on the right.

        Appended columns, in order: rooms / households,
        population / households and, if ``add_bedrooms_per_room`` is set,
        bedrooms / rooms.
        """
        households = X[:, households_ix]
        rooms = X[:, rooms_ix]

        columns = [X, rooms / households, X[:, population_ix] / households]
        if self.add_bedrooms_per_room:
            columns.append(X[:, bedrooms_ix] / rooms)

        # np.c_ accepts a tuple index, equivalent to np.c_[X, col1, col2, ...]
        return np.c_[tuple(columns)]

In [0]:
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler
from sklearn.impute import SimpleImputer
from sklearn.preprocessing import OrdinalEncoder
from sklearn.preprocessing import OneHotEncoder

# Preprocessing applied to the numeric columns, in order:
#   1. fill missing values with the column median
#   2. append the ratio features from CombinedAttributesAdder
#   3. standardise every column
numeric_steps = [
    ('imputer', SimpleImputer(strategy="median")),
    ('attribs_adder', CombinedAttributesAdder()),
    ('std_scaler', StandardScaler()),
]
num_pipeline = Pipeline(steps=numeric_steps)

In [0]:
# Column roles for the housing dataset.
cat_attribs = ["ocean_proximity"]
target_col = "median_house_value"

# Numeric feature columns: every column that is neither categorical nor the
# target. The target must be excluded here, otherwise it is passed to the model
# as an input feature (label leakage) and the evaluation metrics are meaningless.
# NOTE(review): CombinedAttributesAdder indexes columns 3-6 of this list by
# position; confirm the target column does not precede them in housing.columns.
non_feature_cols = set(cat_attribs) | {target_col}
num_attribs = [col for col in housing.columns if col not in non_feature_cols]

In [0]:
from sklearn.compose import ColumnTransformer

# Route each group of columns to its own preprocessing: the numeric pipeline for
# num_attribs and one-hot encoding for cat_attribs.
column_transformers = [
    ("numerical", num_pipeline, num_attribs),
    ("categorical", OneHotEncoder(), cat_attribs),
]
full_pipeline = ColumnTransformer(transformers=column_transformers)

In [0]:
from sklearn.model_selection import train_test_split

# Collect the Spark DataFrame to pandas exactly once and derive both the
# feature matrix and the target from that single frame. Collecting twice runs
# two Spark jobs and relies on both returning rows in the same order for the
# features and targets to stay aligned.
housing_pd = housing.toPandas()

# Preprocessed feature matrix produced by the column transformer.
housing_data = full_pipeline.fit_transform(housing_pd)

# 80/20 split with a fixed seed; the target is kept as a single-column DataFrame.
X_train, X_test, y_train, y_test = train_test_split(
    housing_data,
    housing_pd[[target_col]],
    random_state=0,
    test_size=0.2,
)

In [0]:
from sklearn.linear_model import LinearRegression

# Fit an ordinary least-squares model on the training split; fit() returns the
# estimator itself, so construction and fitting can be chained.
lin_reg = LinearRegression().fit(X_train, y_train)
lin_reg

In [0]:
from sklearn.metrics import mean_squared_error
# Evaluate on the held-out split: predict, compute the MSE, and take its square
# root to report the error in the target's own units.
y_predict = lin_reg.predict(X_test)
lin_mse = mean_squared_error(y_true=y_test, y_pred=y_predict)
lin_rmse = np.sqrt(lin_mse)
lin_rmse

In [0]:
import joblib
# Serialise the fitted model to a local pickle file so it can be registered.
joblib.dump(value=lin_reg, filename='sklearn_regression_model.pkl')

In [0]:
# Loading workspace

from azureml.core import Workspace
# Look up the existing Azure ML workspace by name, subscription and resource group.
ws = Workspace.get(
    name="workspace",
    subscription_id='subscription_id',
    resource_group='resource_group',
)

###  Registering a model

In [0]:
import sklearn

from azureml.core import Model

In [0]:
# Upload the local pickle and register it as a model in the workspace.
model = Model.register(
    workspace=ws,
    # Name of the registered model in the workspace.
    model_name='Lin_reg_model',
    # Local file to upload and register as a model.
    model_path='./sklearn_regression_model.pkl',
    description='Linear Regression model for housing dataset.',
    tags={'area': 'housing', 'type': 'regression'},
)

# Report the name and version of the registered model.
for label, value in (('Name:', model.name), ('Version:', model.version)):
    print(label, value)

### Fetch a model

In [0]:
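# To reuse an already-registered model version instead of registering a new
# one, uncomment the line below:
# model = Model(ws, "Lin_reg_model", version=3)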

### Dependency

In [0]:
from azureml.core import Environment
from azureml.core.conda_dependencies import CondaDependencies


# Environment for the scoring service: a pinned pip from conda plus the pip
# packages listed below.
conda_requirements = ['pip==22.2.2']
pip_requirements = [
    'azureml-defaults',
    'inference-schema[numpy-support]',
    'joblib==0.17.0',
    'pandas==1.1.5',
    'scikit-learn==0.23.2',
]

environment = Environment('housing_env')
environment.python.conda_dependencies = CondaDependencies.create(
    conda_packages=conda_requirements,
    pip_packages=pip_requirements,
)

### Deploying model

In [0]:
from azureml.core.model import InferenceConfig
from azureml.core.webservice import AciWebservice
from azureml.core.webservice import Webservice
from azureml.core.model import Model
from azureml.core.environment import Environment


# Entry script used by the web service, paired with the environment it runs in.
script_file_name = "/dbfs/scripts/score.py"
inference_config = InferenceConfig(
    entry_script=script_file_name,
    environment=environment,
)

# Container instance sizing for the deployment: 1 CPU core, 1 GB of memory.
aciconfig = AciWebservice.deploy_configuration(
    cpu_cores=1,
    memory_gb=1,
    description='Housing_data_service',
)

aci_service_name = 'housingmodel-1'
print(aci_service_name)

# Deploy the registered model as an ACI web service, wait for the deployment to
# finish (showing its output), then print the resulting service state.
aci_service = Model.deploy(
    ws,
    aci_service_name,
    [model],
    inference_config=inference_config,
    deployment_config=aciconfig,
)
aci_service.wait_for_deployment(show_output=True)
print(aci_service.state)

### Saving result into data lake

In [0]:
from sklearn.metrics import mean_squared_error
from sklearn.metrics import mean_absolute_error

# Error metrics of the linear model on the held-out split.
y_pred = lin_reg.predict(X_test)

mse = mean_squared_error(y_test, y_pred)
# Derive RMSE from the MSE instead of passing `squared=False`: that keyword is
# deprecated and later removed in newer scikit-learn releases, and this matches
# how the RMSE is computed earlier in the notebook.
rmse = np.sqrt(mse)
mae = mean_absolute_error(y_test, y_pred)

In [0]:
import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql import Row

# Reuse the active Spark session (or create one if none exists).
spark = SparkSession.builder.getOrCreate()

# One-row table holding the evaluation metrics, built in pandas first and then
# converted to a Spark DataFrame so it can be written to the data lake.
metric_columns = {"MSE": [mse], "MAE": [mae], "RMSE": [rmse]}
pd_data = pd.DataFrame(metric_columns)
data = spark.createDataFrame(pd_data)

In [0]:
# Destination in the lake container for the metrics table.
filename = f"abfss://{lake_container}@{storage_account_name}.dfs.core.windows.net/output/result.csv"

# Write the metrics as CSV with a header row, replacing any previous output.
result_writer = (
    data.write
    .mode("overwrite")
    .format("com.databricks.spark.csv")
    .option("header", "true")
)
result_writer.csv(filename)