In [None]:
#Create an MLFlow experiment, or reuse the existing one with the same name
import mlflow

experiment_name='astro_ml_dev'
tags={'project': 'astro_ml', 'model':'RidgeCV'}

try:
    experiment_id = mlflow.create_experiment(name=experiment_name, tags=tags)
except (mlflow.exceptions.MlflowException, mlflow.exceptions.RestException) as e:
    # Only a name collision is recoverable. Any other failure must surface here,
    # otherwise experiment_id stays undefined and later cells die with a NameError.
    if "already exists" not in str(e):
        raise
    experiment = mlflow.search_experiments(filter_string="name = '"+experiment_name+"'")
    # If the lookup comes back empty, re-raise the original MLflow error instead
    # of failing with an opaque IndexError on experiment[0].
    if not experiment:
        raise
    experiment_id = experiment[0].experiment_id


#Export the S3 credentials that MLFlow needs for artifact writing.
# NOTE(review): credentials are hardcoded here; they look like local MinIO
# defaults - confirm, and prefer loading real secrets from outside the notebook.
import os

os.environ.update({
    'AWS_ACCESS_KEY_ID': 'minioadmin',
    'AWS_SECRET_ACCESS_KEY': 'minioadmin',
})


In [None]:
#Fetch the California housing dataset and stage it on S3 as a CSV file
from s3fs import S3FileSystem
from sklearn.datasets import fetch_california_housing

# as_frame=True exposes the dataset as a DataFrame through the `.frame` attribute.
housing_bunch = fetch_california_housing(download_if_missing=True, as_frame=True)
housing_df = housing_bunch.frame

fs = S3FileSystem(
    key='minioadmin',
    secret='minioadmin',
    client_kwargs={'endpoint_url': "http://localhost:9000/"},
)

with fs.open('s3://data/housing_df.csv', 'wb') as csv_file:
    # The handle is binary, so render the CSV text (no index column) and encode it.
    csv_text = housing_df.to_csv(index=False)
    csv_file.write(csv_text.encode())

In [None]:
#Round-trip check: load the CSV we just uploaded back from S3
from s3fs import S3FileSystem
import pandas as pd

fs = S3FileSystem(
    key='minioadmin',
    secret='minioadmin',
    client_kwargs={'endpoint_url': "http://localhost:9000/"},
)

# housing_df is rebound to the copy parsed from S3; later cells work from this frame.
with fs.open('s3://data/housing_df.csv', 'rb') as csv_file:
    housing_df = pd.read_csv(csv_file)

In [None]:
#Feature engineering: standardise the features and log the fitted scaler to MLFlow
import mlflow
import pandas as pd  # imported here so the cell does not rely on an earlier cell's import
from sklearn.preprocessing import StandardScaler

# Enable MLFlow's scikit-learn autologging before any estimator is fitted.
mlflow.sklearn.autolog()

# Split the frame into the feature matrix and the regression target.
target = 'MedHouseVal'
X = housing_df.drop(target, axis=1)
y = housing_df[target]

scaler = StandardScaler()

with mlflow.start_run(experiment_id=experiment_id, run_name='Scaler') as run:
    # fit_transform returns a bare array, so rebuild the frame with the original
    # column labels AND the original row index. Without index=, X would get a new
    # RangeIndex and index-aligned operations against y could misalign rows.
    X = pd.DataFrame(scaler.fit_transform(X), columns=X.columns, index=X.index)
    mlflow.sklearn.log_model(scaler, artifact_path='scaler')
    # Log one metric per feature: the mean the scaler learned for that column.
    mlflow.log_metrics(pd.Series(scaler.mean_, index=X.columns).to_dict())

In [None]:
#Fit a RidgeCV regressor inside its own MLFlow run

import mlflow
import numpy as np
import pandas as pd
from sklearn.linear_model import RidgeCV
from sklearn.model_selection import cross_validate

mlflow.sklearn.autolog()

target = 'MedHouseVal'

# 30 candidate regularisation strengths, log-spaced between 1e-3 and 1e1.
model = RidgeCV(alphas=np.logspace(start=-3, stop=1, num=30))

with mlflow.start_run(experiment_id=experiment_id, run_name='RidgeCV') as run:
    reg = model.fit(X, y)
    # Keep the run id: the scoring cell builds its model URI from it.
    run_id = run.info.run_id

In [None]:
#Score the feature matrix with the model logged by the training run

import mlflow

# Build the runs:/ URI from the training run id and load the model as a pyfunc.
# Presumably the 'model' artifact path is the one autologging wrote - confirm.
logged_model = f'runs:/{run_id}/model'
loaded_model = mlflow.pyfunc.load_model(logged_model)

# Predict first, then attach predictions and the true target to X in place;
# the export cell writes X out with both columns.
predictions = loaded_model.predict(X)
X['pred'] = predictions
X[target] = y

In [None]:
#Upload the scored frame (features, predictions, target) to S3 as CSV
from s3fs import S3FileSystem

fs = S3FileSystem(
    key='minioadmin',
    secret='minioadmin',
    client_kwargs={'endpoint_url': "http://localhost:9000/"},
)

with fs.open('s3://data/housing_pred.csv', 'wb') as csv_file:
    # The handle is binary, so render the CSV text (no index column) and encode it.
    csv_text = X.to_csv(index=False)
    csv_file.write(csv_text.encode())