### 1. Install dependencies

In [2]:
#!pip install scikit-learn pandas
!pip install -r requirements.txt



In [1]:
# 2. Imports
import os
import mlflow
import mlflow.sklearn
from sklearn.linear_model import ElasticNet
from sklearn.metrics import mean_squared_error, mean_absolute_error, r2_score
from sklearn.model_selection import train_test_split
import numpy as np
from mlflow.models import infer_signature

# DATA LOAD PHASE 
## CDF load 
### from https://archive.ics.uci.edu/ml/machine-learning-databases/wine-quality/winequality-red.csv 
### and WRITES TO s3:my_bucket/raw-data/winequality-red.csv)

#### look at CDF > Project AIPeritivos > GetRawDataFromSource

# FEATURISATION PHASE 
## read from s3:my_bucket/raw-data/winequality-red.csv 
## and write a iceberg table winequality-red-table

### a) original version: 
* ### read from http, in a cell

In [3]:
# 3. Load data: (re-import every dependencies of this cell, to better export in script)
import pandas as pd
url = "https://archive.ics.uci.edu/ml/machine-learning-databases/wine-quality/winequality-red.csv"
df = pd.read_csv(url, sep=';')
df.head()

# once CDF writes into s3:my_bucket/raw-data/winequality-red.csv, 
    # a cell reads the csv and write a iceberg table (SAMPLE FEATURISATION)
    # a cell reads from iceberg table and split data... (TRAIN PHASE)

Unnamed: 0,fixed acidity,volatile acidity,citric acid,residual sugar,chlorides,free sulfur dioxide,total sulfur dioxide,density,pH,sulphates,alcohol,quality
0,7.4,0.7,0.0,1.9,0.076,11.0,34.0,0.9978,3.51,0.56,9.4,5
1,7.8,0.88,0.0,2.6,0.098,25.0,67.0,0.9968,3.2,0.68,9.8,5
2,7.8,0.76,0.04,2.3,0.092,15.0,54.0,0.997,3.26,0.65,9.8,5
3,11.2,0.28,0.56,1.9,0.075,17.0,60.0,0.998,3.16,0.58,9.8,6
4,7.4,0.7,0.0,1.9,0.076,11.0,34.0,0.9978,3.51,0.56,9.4,5


### b) refactored version: 
* ### read from s3 bucket where CDF has loaded from http
    * ### check if file on s3 has been modified recently (1 hour) otherwise does not featurise 

In [8]:
import boto3
import os
import pandas as pd
from io import StringIO
from datetime import datetime, timezone, timedelta


aws_access_key_id = os.environ["AWS_ACCESS_KEY"]
aws_secret_access_key = os.environ["AWS_SECRET_ACCESS_KEY"]

print(f">>>> Read AWS credentials to access rawdata bucket")

# S3 configuration
# bucket = 'gab-bucket-removeme'
# key = 'rawdata/winequality-red.csv'  # Update with the actual key in your bucket

bucket = os.environ["S3_RAWDATA_BUCKET"]
key = os.environ["S3_RAWDATA_KEY"]  # Update with the actual key in your bucket

print(f">>>> Reading rawdata csv from s3 bucket ")
print(f">>>>>>>> Bucket: {bucket} ....")
print(f">>>>>>>> Bucket Key: {key} ....")


# Create S3 client with credentials
s3 = boto3.client(
    's3',
    aws_access_key_id=aws_access_key_id,
    aws_secret_access_key=aws_secret_access_key
)

# Get object metadata
head = s3.head_object(Bucket=bucket, Key=key)
last_modified = head['LastModified']

# Check if object was modified within the last hour
now = datetime.now(timezone.utc)
if (now - last_modified) <= timedelta(hours=1):
    # Read object from S3
    response = s3.get_object(Bucket=bucket, Key=key)
    body = response['Body'].read().decode('utf-8')

    # Load CSV into DataFrame
    df = pd.read_csv(StringIO(body), sep=';')

else:
    print("Object is older than 1 hour. Skipping.")

# Display data
df.head()

>>>> Read AWS credentials to access rawdata bucket
>>>> Reading rawdata csv from s3 bucket 
>>>>>>>> Bucket: gab-bucket-removeme ....
>>>>>>>> Bucket Key: rawdata/winequality-red.csv ....


Unnamed: 0,fixed acidity,volatile acidity,citric acid,residual sugar,chlorides,free sulfur dioxide,total sulfur dioxide,density,pH,sulphates,alcohol,quality
0,7.4,0.7,0.0,1.9,0.076,11.0,34.0,0.9978,3.51,0.56,9.4,5
1,7.8,0.88,0.0,2.6,0.098,25.0,67.0,0.9968,3.2,0.68,9.8,5
2,7.8,0.76,0.04,2.3,0.092,15.0,54.0,0.997,3.26,0.65,9.8,5
3,11.2,0.28,0.56,1.9,0.075,17.0,60.0,0.998,3.16,0.58,9.8,6
4,7.4,0.7,0.0,1.9,0.076,11.0,34.0,0.9978,3.51,0.56,9.4,5


In [6]:
# Write rawdata csv into a Iceberg table, as a feature set
## table features.winequality has been already created by admin as a parquet / iceberg table
import cml.data_v1 as cmldata

CONNECTION_NAME = "aiperitivo-hive" # jdbc:hive2://hs2-gab-aiperitivo-hive.dw-emerging-cdp-env.dp5i-5vkq.cloudera.site/default;transportMode=http;httpPath=cliservice;socketTimeout=60;ssl=true;retries=3;

conn = cmldata.get_connection(CONNECTION_NAME)

## Sample Usage to get pandas data frame
EXAMPLE_SQL_QUERY = "show databases"
dataframe = conn.get_pandas_dataframe(EXAMPLE_SQL_QUERY)
print(dataframe)

# Closing the connection
conn.close()


## Other Usage Notes:

## Alternate Sample Usage to provide different credentials as optional parameters
#conn = cmldata.get_connection(
#    CONNECTION_NAME, {"USERNAME": "someuser", "PASSWORD": "somepassword"}
#)

## Alternate Sample Usage to get DB API Connection interface
#db_conn = conn.get_base_connection()

## Alternate Sample Usage to get DB API Cursor interface
#db_cursor = conn.get_cursor()
#db_cursor.execute(EXAMPLE_SQL_QUERY)
#for row in db_cursor:
#  print(row)

              database_name
0                   default
1                 diptidash
2                emerging01
3       emerging01_airlines
4   emerging01_airlines_raw
5                emerging02
6                emerging03
7                emerging04
8                emerging05
9       emerging05_airlines
10  emerging05_airlines_raw
11               emerging06
12               emerging07
13      emerging07_airlines
14  emerging07_airlines_raw
15               emerging08
16      emerging08_airlines
17  emerging08_airlines_raw
18               emerging09
19      emerging09_airlines
20               emerging10
21               emerging11
22      emerging11_airlines
23  emerging11_airlines_raw
24               emerging12
25               emerging13
26               emerging14
27               emerging15
28               emerging16
29               emerging17
30               emerging18
31               emerging19
32               emerging20
33               emerging21
34               eme

<impala.hiveserver2.HiveServer2Connection at 0x7f7883600b50>

# TRAIN PHASE
## read feature set 
## split data, set experiment and run, train, eval, etc

In [10]:
import mlflow
import mlflow.sklearn
from sklearn.linear_model import ElasticNet
from sklearn.metrics import mean_squared_error, mean_absolute_error, r2_score
from sklearn.model_selection import train_test_split
import numpy as np
from mlflow.models import infer_signature

# 4. Split data
X = df.drop("quality", axis=1)
y = df["quality"]

X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)
X_test.head()

Unnamed: 0,fixed acidity,volatile acidity,citric acid,residual sugar,chlorides,free sulfur dioxide,total sulfur dioxide,density,pH,sulphates,alcohol
803,7.7,0.56,0.08,2.5,0.114,14.0,46.0,0.9971,3.24,0.66,9.6
124,7.8,0.5,0.17,1.6,0.082,21.0,102.0,0.996,3.39,0.48,9.5
350,10.7,0.67,0.22,2.7,0.107,17.0,34.0,1.0004,3.28,0.98,9.9
682,8.5,0.46,0.31,2.25,0.078,32.0,58.0,0.998,3.33,0.54,9.8
1326,6.7,0.46,0.24,1.7,0.077,18.0,34.0,0.9948,3.39,0.6,10.6


## OPTIONAL: convert X_test to json and write to a file

In [53]:
# Convert DataFrame to JSON string (records format is usually good for row-wise data)
xTest_all_json = X_test.to_json(orient='records')

# Write JSON string to a file
with open("sample_inputs/X_test_all.json", "w") as f:
    f.write(xTest_all_json)

In [54]:
# Convert the first row (record) of the DataFrame to JSON
xTest_first_record_json = X_test.iloc[0:1].to_json(orient='records')

# Write that single-record JSON string to a file
with open("sample_inputs/X_test_first_record.json", "w") as f:
    f.write(xTest_first_record_json)

# SKLEARN: SETUP EXPERIMENT AND START RUN 

In [19]:
# 5. Set up experiment and start run
mlflow.set_experiment("wine_quality_experiment")
with mlflow.start_run(run_name="elasticnet_wine-2141"):

    # Log hyperparameters
    alpha = 0.5
    l1_ratio = 0.5
    mlflow.log_param("alpha", alpha)
    mlflow.log_param("l1_ratio", l1_ratio)

    # Train model
    model = ElasticNet(alpha=alpha, l1_ratio=l1_ratio, random_state=42)
    model.fit(X_train, y_train)

    # Evaluate and log metrics
    preds = model.predict(X_test)
    mse = mean_squared_error(y_test, preds)
    rmse = np.sqrt(mse)
    mae = mean_absolute_error(y_test, preds)
    r2 = r2_score(y_test, preds)


    mlflow.log_metric("rmse", rmse)
    mlflow.log_metric("mae", mae)
    mlflow.log_metric("r2", r2)

    # Set a tag that we can use to remind ourselves what this run was for
    mlflow.set_tag("Training Info", "Basic model for wine quality")

    # Infer the model signature
    signature = infer_signature(X_train, model.predict(X_train))
    
    # 6. Log and register model
    model_info = mlflow.sklearn.log_model(
        sk_model=model,
        artifact_path="elasticnet_model",
        signature=signature,
        input_example=X_test,
        registered_model_name="ElasticNetWineModel"
    )


  "dataframe_split": {
    "columns": [
      "fixed acidity",
      "volatile acidity",
      "citric acid",
      "residual sugar",
      "chlorides",
      "free sulfur dioxide",
      "total sulfur dioxide",
      "density",
      "pH",
      "sulphates",
      "alcohol"
    ],
    "data": [
      [
        7.7,
        0.56,
        0.08,
        2.5,
        0.114,
        14.0,
        46.0,
        0.9971,
        3.24,
        0.66,
        9.6
      ],
      [
        7.8,
        0.5,
        0.17,
        1.6,
        0.082,
        21.0,
        102.0,
        0.996,
        3.39,
        0.48,
        9.5
      ],
      [
        10.7,
        0.67,
        0.22,
        2.7,
        0.107,
        17.0,
        34.0,
        1.0004,
        3.28,
        0.98,
        9.9
      ],
      [
        8.5,
        0.46,
        0.31,
        2.25,
        0.078,
        32.0,
        58.0,
        0.998,
        3.33,
        0.54,
        9.8
      ],
      [
        6.7,
  

In [21]:
print(f"Run completed with RMSE: {rmse:.4f}, R^2: {r2:.4f}")

Run completed with RMSE: 0.7628, R^2: 0.1096


## Load our SKLEARN saved model as a Python Function

Although we can load our model back as a native scikit-learn format with `mlflow.sklearn.load_model()`, below we are loading the model as a generic Python Function, which is how this model would be loaded for online model serving. We can still use the `pyfunc` representation for batch use cases, though, as is shown below.

In [35]:
loaded_model = mlflow.pyfunc.load_model(model_info.model_uri)

## Use our SKLEARN model to predict the wine quality on a Pandas DataFrame

In [37]:
# Predict on a Pandas DataFrame.
import pandas as pd
predicted_quality = loaded_model.predict(pd.DataFrame(X_test))

#print(predicted_quality)

# Convert X_test validation feature data to a Pandas DataFrame
result = pd.DataFrame(X_test)

# Add the actual quality result to the DataFrame
result["y_test_quality"] = y_test

# Add the model predictions to the DataFrame
result["predicted_quality"] = predicted_quality

result[:4]

Unnamed: 0,fixed acidity,volatile acidity,citric acid,residual sugar,chlorides,free sulfur dioxide,total sulfur dioxide,density,pH,sulphates,alcohol,y_test_quality,predicted_quality
803,7.7,0.56,0.08,2.5,0.114,14.0,46.0,0.9971,3.24,0.66,9.6,6,5.548497
124,7.8,0.5,0.17,1.6,0.082,21.0,102.0,0.996,3.39,0.48,9.5,5,5.290737
350,10.7,0.67,0.22,2.7,0.107,17.0,34.0,1.0004,3.28,0.98,9.9,6,5.648928
682,8.5,0.46,0.31,2.25,0.078,32.0,58.0,0.998,3.33,0.54,9.8,5,5.590718


# Convert SKLEARN to ONNX and log model

In [46]:

import mlflow.onnx
from skl2onnx import convert_sklearn
from skl2onnx.common.data_types import FloatTensorType

# Assuming you have `model` trained and X_test available

# Define the initial type for ONNX conversion based on input shape
initial_type = [('input', FloatTensorType([None, X_test.shape[1]]))]

# Convert the sklearn model to ONNX
onnx_model = convert_sklearn(model, initial_types=initial_type)

# Start an MLflow run
with mlflow.start_run(run_name="elasticnet_wine-2141-ONNX"):
    # Log the ONNX model with MLflow ONNX flavor and register it
    onnx_model_info = mlflow.onnx.log_model(
        onnx_model=onnx_model,
        artifact_path="elasticnet_model_onnx",
        input_example=X_test,
        registered_model_name="ElasticNetWineModel_ONNX"
    )


  "dataframe_split": {
    "columns": [
      "fixed acidity",
      "volatile acidity",
      "citric acid",
      "residual sugar",
      "chlorides",
      "free sulfur dioxide",
      "total sulfur dioxide",
      "density",
      "pH",
      "sulphates",
      "alcohol"
    ],
    "data": [
      [
        7.7,
        0.56,
        0.08,
        2.5,
        0.114,
        14.0,
        46.0,
        0.9971,
        3.24,
        0.66,
        9.6
      ],
      [
        7.8,
        0.5,
        0.17,
        1.6,
        0.082,
        21.0,
        102.0,
        0.996,
        3.39,
        0.48,
        9.5
      ],
      [
        10.7,
        0.67,
        0.22,
        2.7,
        0.107,
        17.0,
        34.0,
        1.0004,
        3.28,
        0.98,
        9.9
      ],
      [
        8.5,
        0.46,
        0.31,
        2.25,
        0.078,
        32.0,
        58.0,
        0.998,
        3.33,
        0.54,
        9.8
      ],
      [
        6.7,
  

### ONCE LOGGED, look at Experiment / Run, then publish into model registry

## Load our ONNX saved model as a Python Function

Although we can load our model back as a native scikit-learn format with `mlflow.sklearn.load_model()`, below we are loading the model as a generic Python Function, which is how this model would be loaded for online model serving. We can still use the `pyfunc` representation for batch use cases, though, as is shown below.

In [47]:
onnx_loaded_model = mlflow.pyfunc.load_model(onnx_model_info.model_uri)
print(onnx_loaded_model.metadata.signature)

## Use our ONNX model to predict the wine quality on a Pandas DataFrame

In [48]:
# Predict on a Pandas DataFrame.
import pandas as pd
onnx_predicted_quality = onnx_loaded_model.predict(pd.DataFrame(X_test))

#print(predicted_quality)

# Convert X_test validation feature data to a Pandas DataFrame
result = pd.DataFrame(X_test)

# Add the actual quality result to the DataFrame
result["y_test_quality"] = y_test

# Add the model predictions to the DataFrame
result["predicted_quality"] = onnx_predicted_quality

result[:4]

Unnamed: 0,fixed acidity,volatile acidity,citric acid,residual sugar,chlorides,free sulfur dioxide,total sulfur dioxide,density,pH,sulphates,alcohol,y_test_quality,predicted_quality
803,7.7,0.56,0.08,2.5,0.114,14.0,46.0,0.9971,3.24,0.66,9.6,6,
124,7.8,0.5,0.17,1.6,0.082,21.0,102.0,0.996,3.39,0.48,9.5,5,5.659277
350,10.7,0.67,0.22,2.7,0.107,17.0,34.0,1.0004,3.28,0.98,9.9,6,
682,8.5,0.46,0.31,2.25,0.078,32.0,58.0,0.998,3.33,0.54,9.8,5,


In [56]:
# Predict on a FIRST ROW OF A Pandas DataFrame.
import pandas as pd
onnx_predicted_quality_1 = onnx_loaded_model.predict(X_test.iloc[0:1])

#print(predicted_quality)

# Convert X_test validation feature data to a Pandas DataFrame
result = pd.DataFrame(X_test.iloc[0:1])

# Add the actual quality result to the DataFrame
result["y_test_quality"] = y_test.iloc[0:1]

# Add the model predictions to the DataFrame
result["predicted_quality"] = onnx_predicted_quality_1

result

Unnamed: 0,fixed acidity,volatile acidity,citric acid,residual sugar,chlorides,free sulfur dioxide,total sulfur dioxide,density,pH,sulphates,alcohol,y_test_quality,predicted_quality
803,7.7,0.56,0.08,2.5,0.114,14.0,46.0,0.9971,3.24,0.66,9.6,6,


### NOW GO TO REGISTRY, DEPLOY ONNX MODEL TO MODEL INFERENCE, AND USE THE PYTHON CLIENT onnx-elasticnet_wine_quality_client.py