In [1]:
import json
import warnings

import numpy as np
import pandas as pd
import requests
from sklearn.ensemble import RandomForestRegressor

import mlflow
from mlflow.models import infer_signature

warnings.filterwarnings("ignore")

In [2]:
DOW_MODEL_NAME_PREFIX = "DOW_model_"
MME_MODEL_NAME = "MME_DOW_model"

In [3]:
def create_weekly_dataset(n_dates, n_observations_per_date):
  rng = pd.date_range(start="today", periods=n_dates, freq="D")
  df = pd.DataFrame(
      np.random.randn(n_dates * n_observations_per_date, 4),
      columns=["x1", "x2", "x3", "y"],
      index=np.tile(rng, n_observations_per_date),
  )
  df["dow"] = df.index.dayofweek
  return df


df = create_weekly_dataset(n_dates=30, n_observations_per_date=500)
print(df.shape)
df.head()

(15000, 5)


Unnamed: 0,x1,x2,x3,y,dow
2025-04-15 12:17:44.475420,-0.214841,-0.2989,0.634389,1.177039,1
2025-04-16 12:17:44.475420,0.818592,0.090341,-0.83252,-1.279138,2
2025-04-17 12:17:44.475420,2.849018,0.029121,0.030814,-2.264813,3
2025-04-18 12:17:44.475420,0.997985,0.418582,-0.198927,0.014201,4
2025-04-19 12:17:44.475420,0.503126,0.030147,0.358636,-0.685085,5


In [4]:
for dow in df["dow"].unique():
  # Create dataset corresponding to a single day of the week
  X = df.loc[df["dow"] == dow]
  X.pop("dow")  # Remove DOW as a predictor column
  y = X.pop("y")

  # Fit our DOW model
  model = RandomForestRegressor().fit(X, y)

  # Infer signature of the model
  signature = infer_signature(X, model.predict(X))

  with mlflow.start_run():
      model_path = f"model_{dow}"

      # Log and register our DOW model with signature
      mlflow.sklearn.log_model(
          model,
          model_path,
          signature=signature,
          registered_model_name=f"{DOW_MODEL_NAME_PREFIX}{dow}",
      )
      mlflow.set_tag("dow", dow)

The git executable must be specified in one of the following ways:
    - be included in your $PATH
    - be set via $GIT_PYTHON_GIT_EXECUTABLE
    - explicitly set via git.refresh(<full-path-to-git-executable>)

All git commands will error until this is rectified.

This initial message can be silenced or aggravated in the future by setting the
$GIT_PYTHON_REFRESH environment variable. Use one of the following values:
    - quiet|q|silence|s|silent|none|n|0: for no message or exception
    - error|e|exception|raise|r|2: for a raised exception

Example:
    export GIT_PYTHON_REFRESH=quiet

Successfully registered model 'DOW_model_1'.
Created version '1' of model 'DOW_model_1'.
Successfully registered model 'DOW_model_2'.
Created version '1' of model 'DOW_model_2'.
Successfully registered model 'DOW_model_3'.
Created version '1' of model 'DOW_model_3'.
Successfully registered model 'DOW_model_4'.
Created version '1' of model 'DOW_model_4'.
Successfully registered model 'DOW_model_5'.
Crea

In [7]:
# Load Tuesday's model
tuesday_dow = 1
model_name = f"{DOW_MODEL_NAME_PREFIX}{tuesday_dow}"
model_uri = f"models:/{model_name}/latest"
model = mlflow.sklearn.load_model(model_uri)

# Perform inference using our training data for Tuesday
predictor_columns = [column for column in df.columns if column not in {"y", "dow"}]
head_of_training_data = df.loc[df["dow"] == tuesday_dow, predictor_columns].head()
tuesday_fitted_values = model.predict(head_of_training_data)
print(tuesday_fitted_values)

[ 0.78367474  0.85385597 -0.51975899 -1.06918999 -0.76458862]


In [8]:
class DOWModel(mlflow.pyfunc.PythonModel):
  def __init__(self, model_uris):
      self.model_uris = model_uris
      self.models = {}

  @staticmethod
  def _model_uri_to_dow(model_uri: str) -> int:
      return int(model_uri.split("/")[-2].split("_")[-1])

  def load_context(self, context):
      self.models = {
          self._model_uri_to_dow(model_uri): mlflow.sklearn.load_model(model_uri)
          for model_uri in self.model_uris
      }

  def predict(self, context, model_input, params):
      # Parse the dow parameter
      dow = params.get("dow")
      if dow is None:
          raise ValueError("DOW param is not passed.")

      # Get the model associated with the dow parameter
      model = self.models.get(dow)
      if model is None:
          raise ValueError(f"Model {dow} version was not found: {self.models.keys()}.")

      # Perform inference
      return model.predict(model_input)
  
  



In [9]:
head_of_training_data

Unnamed: 0,x1,x2,x3
2025-04-15 12:17:44.475420,-0.214841,-0.2989,0.634389
2025-04-22 12:17:44.475420,0.0537,-0.292374,-0.137657
2025-04-29 12:17:44.475420,0.740965,1.546126,0.457648
2025-05-06 12:17:44.475420,-1.524318,-0.638831,-0.795944
2025-05-13 12:17:44.475420,0.809163,0.63521,-0.90875


In [11]:
# Instantiate our DOW MME
model_uris = [f"models:/{DOW_MODEL_NAME_PREFIX}{i}/latest" for i in df["dow"].unique()]
dow_model = DOWModel(model_uris)
dow_model.load_context(None)
print("Model URIs:")
print(model_uris)

# Perform inference using our training data for Tuesday
params = {"dow": 1}
mme_tuesday_fitted_values = dow_model.predict(None, head_of_training_data, params=params)
assert all(tuesday_fitted_values == mme_tuesday_fitted_values)

print("Tuesday fitted values:")
print(mme_tuesday_fitted_values)

Model URIs:
['models:/DOW_model_1/latest', 'models:/DOW_model_2/latest', 'models:/DOW_model_3/latest', 'models:/DOW_model_4/latest', 'models:/DOW_model_5/latest', 'models:/DOW_model_6/latest', 'models:/DOW_model_0/latest']
Tuesday fitted values:
[ 0.78367474  0.85385597 -0.51975899 -1.06918999 -0.76458862]


In [12]:
with mlflow.start_run():
  # Instantiate the custom pyfunc model
  model = DOWModel(model_uris)
  model.load_context(None)
  model_path = "MME_model_path"

  signature = infer_signature(
      model_input=head_of_training_data,
      model_output=tuesday_fitted_values,
      params=params,
  )
  print(signature)

  # Log the model to the experiment
  mlflow.pyfunc.log_model(
      model_path,
      python_model=model,
      signature=signature,
      pip_requirements=["scikit-learn=1.3.2"],
      registered_model_name=MME_MODEL_NAME,  # also register the model for easy access
  )

  # Set some relevant information about our model
  # (Assuming model has a property 'models' that can be counted)
  mlflow.log_param("num_models", len(model.models))

inputs: 
  ['x1': double (required), 'x2': double (required), 'x3': double (required)]
outputs: 
  [Tensor('float64', (-1,))]
params: 
  ['dow': long (default: 1)]



Successfully registered model 'MME_DOW_model'.
Created version '1' of model 'MME_DOW_model'.


In [19]:
model_uris[0]

'models:/DOW_model_1/latest'

In [None]:
PORT = 5051
print(
  f"""Run the below command in a new window. You must be in the same repo as your mlruns directory and have mlflow installed...
  mlflow models serve -m "models:/{DOW_model_1}/latest" --env-manager local -p {PORT}"""
)

Run the below command in a new window. You must be in the same repo as your mlruns directory and have mlflow installed...
  mlflow models serve -m "models:/MME_DOW_model/latest" --env-manager local -p 5051


In [21]:
from mlflow.tracking import MlflowClient

client = MlflowClient()

# List all registered models
for model in client.list_registered_models():
    print(f"Model Name: {model.name}")
    print(f"Latest Versions: {model.latest_versions}")
    print("-" * 40)

AttributeError: 'MlflowClient' object has no attribute 'list_registered_models'

In [18]:
def score_model(pdf, params):
  headers = {"Content-Type": "application/json"}
  url = f"http://127.0.0.1:{PORT}/invocations"
  ds_dict = {"dataframe_split": pdf, "params": params}
  data_json = json.dumps(ds_dict, allow_nan=True)

  response = requests.request(method="POST", headers=headers, url=url, data=data_json)
  response.raise_for_status()

  return response.json()


print("Inference on dow model 1 (Tuesday):")
inference_df = head_of_training_data.reset_index(drop=True).to_dict(orient="split")
print(score_model(inference_df, params={"dow": 1}))

Inference on dow model 1 (Tuesday):


ConnectionError: HTTPConnectionPool(host='127.0.0.1', port=5051): Max retries exceeded with url: /invocations (Caused by NewConnectionError('<urllib3.connection.HTTPConnection object at 0x7f6e660e7fd0>: Failed to establish a new connection: [Errno 111] Connection refused'))