In [17]:
import os
import itertools
from datetime import datetime

import boto3
import mlflow
import numpy as np
from dotenv import load_dotenv
import tensorflow.keras as keras

In [2]:
s3 = boto3.client(
    "s3",
    endpoint_url="http://127.0.0.1:9000",
    aws_access_key_id="minio",
    aws_secret_access_key="minio123"
)

In [5]:
load_dotenv()

os.getenv("J_MLFLOW_SERVER_URI")

'http://localhost:5000'

In [6]:
def get_or_create_experiment(experiment_name):
    """
    Retrieve the ID of an existing MLflow experiment or create a new one if it doesn't exist.

    This function checks if an experiment with the given name exists within MLflow.
    If it does, the function returns its ID. If not, it creates a new experiment
    with the provided name and returns its ID.

    Taken from mlflow.org

    Parameters:
    - experiment_name (str): Name of the MLflow experiment.

    Returns:
    - str: ID of the existing or newly created MLflow experiment.
    """

    if experiment := mlflow.get_experiment_by_name(experiment_name):
        return experiment.experiment_id
    else:
        return mlflow.create_experiment(experiment_name)


In [7]:
def get_latest_data_path(
        s3_client: boto3.client,
        bucket_name: str,
        base_folder: str = 'preprocessing'
) -> tuple[str, str]:
    """
    Find the latest timestamp folder and NPZ file in the specified bucket/folder
    Returns tuple of (full_path, filename)
    """
    response = s3_client.list_objects_v2(
        Bucket=bucket_name,
        Prefix=f"{base_folder}/",
        Delimiter='/'
    )

    timestamps = []
    for prefix in response.get('CommonPrefixes', []):
        folder_name = prefix['Prefix'].strip('/')
        try:
            timestamp = folder_name.replace(f"{base_folder}/", '')
            timestamps.append(timestamp)
        except ValueError:
            continue

    if not timestamps:
        raise ValueError("No timestamp folders found")

    latest_timestamp = sorted(timestamps)[-1]
    latest_folder = f"{base_folder}/{latest_timestamp}"

    response = s3_client.list_objects_v2(
        Bucket=bucket_name,
        Prefix=latest_folder
    )

    npz_files = [
        obj['Key'] for obj in response.get('Contents', [])
        if obj['Key'].endswith('.npz')
    ]

    if not npz_files:
        raise ValueError(f"No NPZ files found in {latest_folder}")

    latest_file = npz_files[0]
    return latest_file, latest_file.split('/')[-1]

In [14]:
def preprocess_and_store():
    timestamp = datetime.now().strftime("%Y%m%d-%H%M%S")

    (X_train, y_train), (X_test, y_test) = keras.datasets.mnist.load_data()
    X_train = X_train.astype('float32') / 255.0
    X_test = X_test.astype('float32') / 255.0
    X_train = np.expand_dims(X_train, axis=-1)
    X_test = np.expand_dims(X_test, axis=-1)

    local_path = f"/tmp/mnist_processed_{timestamp}.npz"
    np.savez_compressed(local_path,
                        X_train=X_train, y_train=y_train,
                        X_test=X_test, y_test=y_test)

    bucket_name = "mnist-data"
    object_path = f"preprocessing/{timestamp}/mnist_processed.npz"

    try:
        s3.head_bucket(Bucket=bucket_name)
    except:
        print(f"Bucket: {bucket_name} does not exist, creating one now!")
        s3.create_bucket(Bucket=bucket_name)

    s3.upload_file(local_path, bucket_name, object_path)

    os.remove(local_path)
    print(f"Preprocessed data stored to MinIO: {object_path}")

In [29]:
def train_mnist():
    
    bucket_name="mnist-data"
    base_folder="preprocessing"
    s3_path, filename = get_latest_data_path(s3, bucket_name=bucket_name,
                                             base_folder=base_folder)
    local_path = "/tmp"
    local_file = f"{local_path}/{filename}"
    s3.download_file(bucket_name, s3_path, local_file)

    data = np.load(local_file)
    X_train, y_train = data['X_train'], data['y_train']
    X_test, y_test = data['X_test'], data['y_test']

    y_train = keras.utils.to_categorical(y_train, 10)
    y_test = keras.utils.to_categorical(y_test, 10)

    mlflow.set_tracking_uri(os.getenv("J_MLFLOW_SERVER_URI"))
    experiment_id=get_or_create_experiment("MNIST_Hyperparameter_Search_autolog")
    mlflow.set_experiment(experiment_id=experiment_id)

    best_accuracy = 0
    best_model = None
    best_params = {}

    HYPERPARAM_GRID = {
        'epochs': [1, 2]
    }

    keys, values = zip(*HYPERPARAM_GRID.items())
    param_combinations = [dict(zip(keys, v)) for v in
                          itertools.product(*values)]
    
    mlflow.autolog()
    with mlflow.start_run(run_name="mnist-hyperparameter-tuning-parent"):
        for params in param_combinations:
            with mlflow.start_run(nested=True):
                model = keras.Sequential([
                    keras.layers.Conv2D(32, (3, 3), activation='relu', input_shape=(28, 28, 1)),
                    keras.layers.MaxPooling2D((2, 2)),
                    keras.layers.Flatten(),
                    keras.layers.Dense(128, activation='relu'),
                    keras.layers.Dense(10, activation='softmax')
                ])
        
                optimizer = keras.optimizers.Adam(learning_rate=0.001)
                model.compile(optimizer=optimizer, loss='categorical_crossentropy', metrics=['accuracy'])
                history = model.fit(
                    X_train,
                    y_train,
                    epochs=params['epochs'],
                    validation_data=(X_test, y_test),
                )
    
                val_acc = history.history['val_accuracy'][-1]
                train_acc = history.history['accuracy'][-1]
    
                if val_acc > best_accuracy:
                    best_accuracy = val_acc
                    best_model = model
                    best_params = params
                    
            if best_model is not None:
                artifact_path = "mnist_model_autolog"
                mlflow.tensorflow.log_model(model, artifact_path)
        
                model_uri = mlflow.get_artifact_uri(artifact_path)
                print("Model stored at ", model_uri)

In [15]:
preprocess_and_store()

Preprocessed data stored to MinIO: preprocessing/20250205-103140/mnist_processed.npz


In [20]:
os.environ["MLFLOW_S3_ENDPOINT_URL"] = os.getenv("J_MLFLOW_S3_ENDPOINT_URL")
os.getenv("MLFLOW_S3_ENDPOINT_URL")

'http://localhost:9000'

In [30]:
train_mnist()

2025/02/05 10:48:11 INFO mlflow.bedrock: Enabled auto-tracing for Bedrock. Note that MLflow can only trace boto3 service clients that are created after this call. If you have already created one, please recreate the client by calling `boto3.client`.
2025/02/05 10:48:11 INFO mlflow.tracking.fluent: Autologging successfully enabled for boto3.
2025/02/05 10:48:11 INFO mlflow.tracking.fluent: Autologging successfully enabled for keras.
2025/02/05 10:48:11 INFO mlflow.tracking.fluent: Autologging successfully enabled for sklearn.
2025/02/05 10:48:11 INFO mlflow.tracking.fluent: Autologging successfully enabled for tensorflow.
  super().__init__(activity_regularizer=activity_regularizer, **kwargs)


[1m1870/1875[0m [32m━━━━━━━━━━━━━━━━━━━[0m[37m━[0m [1m0s[0m 7ms/step - accuracy: 0.9153 - loss: 0.2849 



[1m1875/1875[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m15s[0m 7ms/step - accuracy: 0.9154 - loss: 0.2845 - val_accuracy: 0.9798 - val_loss: 0.0609




[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 46ms/step




🏃 View run overjoyed-turtle-772 at: http://localhost:5000/#/experiments/2/runs/d5aee92cb6cc46dab7679df2378d4679
🧪 View experiment at: http://localhost:5000/#/experiments/2




Model stored at  s3://mlflow/2/c8f6b2065d824ccf96f9aa3bdf8098bf/artifacts/mnist_model_autolog


Epoch 1/2
[1m1869/1875[0m [32m━━━━━━━━━━━━━━━━━━━[0m[37m━[0m [1m0s[0m 7ms/step - accuracy: 0.9134 - loss: 0.2888 



[1m1875/1875[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m14s[0m 7ms/step - accuracy: 0.9135 - loss: 0.2883 - val_accuracy: 0.9803 - val_loss: 0.0628
Epoch 2/2
[1m1869/1875[0m [32m━━━━━━━━━━━━━━━━━━━[0m[37m━[0m [1m0s[0m 7ms/step - accuracy: 0.9843 - loss: 0.0508 



[1m1875/1875[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m14s[0m 8ms/step - accuracy: 0.9843 - loss: 0.0508 - val_accuracy: 0.9846 - val_loss: 0.0462
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 42ms/step




🏃 View run learned-foal-225 at: http://localhost:5000/#/experiments/2/runs/a20e2f61b9df46d5a5ef83cf9c5afc54
🧪 View experiment at: http://localhost:5000/#/experiments/2




Model stored at  s3://mlflow/2/c8f6b2065d824ccf96f9aa3bdf8098bf/artifacts/mnist_model_autolog
🏃 View run mnist-hyperparameter-tuning-parent at: http://localhost:5000/#/experiments/2/runs/c8f6b2065d824ccf96f9aa3bdf8098bf
🧪 View experiment at: http://localhost:5000/#/experiments/2
