# Flow Predictor with MLflow Integration

This notebook implements MLflow tracking for the flow predictor model.

In [8]:
# Import section - add MLflow imports
import keras
from keras.models import Model
from keras import backend as K
from keras.layers import (
    Input, 
    concatenate, 
    Conv2D,
    MaxPooling2D,
    Conv2DTranspose,
    ZeroPadding2D
)
from tensorflow.keras.layers import Input, Conv2D, MaxPooling2D, Reshape, Dense, Flatten

import vtk
from vtm_data import VTK_data
import numpy as np
import matplotlib.pyplot as plt
from pathlib import Path
import os
import mlflow
import mlflow.keras

# Create experiment
mlflow.set_experiment("Flow_Predictor_Training")

# Enable MLflow autologging
mlflow.keras.autolog(
    log_models=True,
    log_model_signatures=True,
    log_input_examples=True,
    registered_model_name="flow_predictor"
)

os.environ['CUDA_VISIBLE_DEVICES'] = '-1'



In [9]:
# training params
batch_size = 32
epochs = 100 # number of times through training set

In [10]:
# load dataset
class VTK_data:
    def __init__(self, base_path, split_ratio=0.8):
        self.base_path = Path(base_path)
        self.data = []
        self.geometries = []
        self.steady_flows = []
        self.split_ratio = split_ratio
        self.split_line = 0
        
    def load_data(self):
        for dirpath, dirnames, filenames in os.walk(self.base_path):
            for filename in filenames:
                if filename.endswith('.vtm'):
                    full_path = Path(dirpath) / filename
                    try:
                        data = self._load_single_file(full_path)
                        if data is not None:
                            print(f"\nProcessing file: {filename}")
                            print(f"Full path: {full_path}")
                            
                            if 'geometry' in filename:
                                print(f"Classified as geometry file")
                                self.geometries.append(data)
                            elif 'cylinder2d_iT' in filename:
                                print(f"Classified as flow file")
                                self.steady_flows.append(data)
                            else:
                                print(f"Skipping file: {filename}")
                    except Exception as e:
                        print(f"Error loading {full_path}: {str(e)}")
        
        print("\nLoaded data summary:")
        print(f"Number of geometry files: {len(self.geometries)}")
        if self.geometries:
            print(f"Shape of first geometry: {np.array(self.geometries[0]).shape}")
        
        print(f"Number of flow files: {len(self.steady_flows)}")
        if self.steady_flows:
            print(f"Shape of first flow: {np.array(self.steady_flows[0]).shape}")
        
        total_samples = len(self.geometries)
        self.split_line = int(total_samples * self.split_ratio)
        
        return self.geometries, self.steady_flows
    
    def _load_single_file(self, file_path):
        reader = vtk.vtkXMLMultiBlockDataReader()
        reader.SetFileName(str(file_path))
        reader.Update()
        
        if reader.GetErrorCode() != 0:
            raise RuntimeError(f"Error reading file")
        
        data = reader.GetOutput()
        if data is None:
            raise RuntimeError("No data read from file")
            
        data_iterator = data.NewIterator()
        img_data = data_iterator.GetCurrentDataObject()
        
        if img_data is None:
            raise RuntimeError("No image data found in file")
        
        if hasattr(img_data, 'GetProducerPort'):
            producer = img_data.GetProducerPort()
            if producer:
                producer.Update()
        elif hasattr(img_data, 'GetSource'):
            source = img_data.GetSource()
            if source:
                source.Update()
                
        point_data = img_data.GetPointData()
        array_data = point_data.GetArray(0)
        array_data = vtk.util.numpy_support.vtk_to_numpy(array_data)
        
        return array_data

# Create instance and load data
base_directory = "/mnt/data/cfd-ml-examples/sumulation"
dataset = VTK_data(base_directory)

# Load the data
geometries, steady_flows = dataset.load_data()


Processing file: geometry_iT0000000.vtm
Full path: /mnt/data/cfd-ml-examples/sumulation/simulation_data/runlog_01483/vtkData/geometry_iT0000000.vtm
Classified as geometry file

Processing file: cuboid_iT0000000.vtm
Full path: /mnt/data/cfd-ml-examples/sumulation/simulation_data/runlog_01483/vtkData/cuboid_iT0000000.vtm
Skipping file: cuboid_iT0000000.vtm

Processing file: rank_iT0000000.vtm
Full path: /mnt/data/cfd-ml-examples/sumulation/simulation_data/runlog_01483/vtkData/rank_iT0000000.vtm
Skipping file: rank_iT0000000.vtm

Processing file: cylinder2d_iT0032530.vtm
Full path: /mnt/data/cfd-ml-examples/sumulation/simulation_data/runlog_01483/vtkData/data/cylinder2d_iT0032530.vtm
Classified as flow file

Processing file: geometry_iT0000000.vtm
Full path: /mnt/data/cfd-ml-examples/sumulation/simulation_data/runlog_01484/vtkData/geometry_iT0000000.vtm
Classified as geometry file

Processing file: cuboid_iT0000000.vtm
Full path: /mnt/data/cfd-ml-examples/sumulation/simulation_data/runlo

In [11]:
# get train and test split
train_geometries = dataset.geometries[0:dataset.split_line]
train_steady_flows = dataset.steady_flows[0:dataset.split_line]
test_geometries = dataset.geometries[dataset.split_line:-1]
test_steady_flows = dataset.steady_flows[dataset.split_line:-1]

print(f"Training set size: {len(train_geometries)} samples")
print(f"Test set size: {len(test_geometries)} samples")

Training set size: 1600 samples
Test set size: 399 samples


In [12]:
# reshape into single np array
train_geometries = np.stack(train_geometries, axis=0)
train_steady_flows = np.stack(train_steady_flows, axis=0)
test_geometries = np.stack(test_geometries, axis=0)
test_steady_flows = np.stack(test_steady_flows, axis=0)

# print dataset values
print('geometry shape:', train_geometries.shape[1:])
print('steady flow shape:', train_steady_flows.shape[1:])
print(train_geometries.shape[0], ' train samples')
print(test_geometries.shape[0], ' test samples')

geometry shape: (9812,)
steady flow shape: (9812, 2)
1600  train samples
399  test samples


In [13]:
# Define and create model
def create_flow_predictor_model():
    inputs = Input(shape=(9812,))
    reshaped = Reshape((44, 223, 1))(inputs)
    
    # Encoder Path
    conv1 = Conv2D(32, (3, 3), activation='relu', padding='same')(reshaped)
    conv1 = Conv2D(32, (3, 3), activation='relu', padding='same')(conv1)
    pool1 = MaxPooling2D(pool_size=(2, 2))(conv1)
    
    conv2 = Conv2D(64, (3, 3), activation='relu', padding='same')(pool1)
    conv2 = Conv2D(64, (3, 3), activation='relu', padding='same')(conv2)
    pool2 = MaxPooling2D(pool_size=(2, 2))(conv2)
    
    conv3 = Conv2D(128, (3, 3), activation='relu', padding='same')(pool2)
    conv3 = Conv2D(128, (3, 3), activation='relu', padding='same')(conv3)
    pool3 = MaxPooling2D(pool_size=(2, 2))(conv3)
    
    conv4 = Conv2D(256, (3, 3), activation='relu', padding='same')(pool3)
    conv4 = Conv2D(256, (3, 3), activation='relu', padding='same')(conv4)
    pool4 = MaxPooling2D(pool_size=(2, 2))(conv4)
    
    # Bridge
    conv5 = Conv2D(512, (3, 3), activation='relu', padding='same')(pool4)
    conv5 = Conv2D(512, (3, 3), activation='relu', padding='same')(conv5)
    
    # Decoder Path
    up6 = concatenate([ZeroPadding2D(((1,0),(1,0)))(Conv2DTranspose(256, (2, 2), strides=(2, 2), padding='same')(conv5)), conv4], axis=3)
    conv6 = Conv2D(256, (3, 3), activation='relu', padding='same')(up6)
    conv6 = Conv2D(256, (3, 3), activation='relu', padding='same')(conv6)
    
    up7 = concatenate([ZeroPadding2D(((1,0),(1,0)))(Conv2DTranspose(128, (2, 2), strides=(2, 2), padding='same')(conv6)), conv3], axis=3)
    conv7 = Conv2D(128, (3, 3), activation='relu', padding='same')(up7)
    conv7 = Conv2D(128, (3, 3), activation='relu', padding='same')(conv7)
    
    up8 = concatenate([ZeroPadding2D(((0,0),(1,0)))(Conv2DTranspose(64, (2, 2), strides=(2, 2), padding='same')(conv7)), conv2], axis=3)
    conv8 = Conv2D(64, (3, 3), activation='relu', padding='same')(up8)
    conv8 = Conv2D(64, (3, 3), activation='relu', padding='same')(conv8)
    
    up9 = concatenate([ZeroPadding2D(((0,0),(1,0)))(Conv2DTranspose(32, (2, 2), strides=(2, 2), padding='same')(conv8)), conv1], axis=3)
    conv9 = Conv2D(32, (3, 3), activation='relu', padding='same')(up9)
    conv9 = Conv2D(32, (3, 3), activation='relu', padding='same')(conv9)
    
    conv10 = Conv2D(2, (1, 1), activation='linear')(conv9)
    final_output = Reshape((9812, 2))(conv10)
    
    model = Model(inputs=inputs, outputs=final_output)
    model.compile(
        loss='mse',
        optimizer=keras.optimizers.Adam(learning_rate=1e-4),
        metrics=['MSE']
    )
    return model

# Create model
model = create_flow_predictor_model()
model.summary()

In [14]:
print("Starting MLflow run for model training...")

# Train model with MLflow tracking
with mlflow.start_run(run_name="flow_predictor_training") as run:
    # Log parameters
    mlflow.log_param("batch_size", batch_size)
    mlflow.log_param("epochs", epochs)
    mlflow.log_param("learning_rate", 1e-4)
    mlflow.log_param("train_samples", len(train_geometries))
    mlflow.log_param("test_samples", len(test_geometries))
    
    # Train model - MLflow autolog will capture metrics
    history = model.fit(
        train_geometries, 
        train_steady_flows,
        batch_size=batch_size,
        epochs=epochs,
        verbose=1,
        validation_data=(test_geometries, test_steady_flows)
    )
    
    # Evaluate on test set
    test_loss = model.evaluate(test_geometries, test_steady_flows, verbose=0)
    mlflow.log_metric("test_mse", test_loss[0])
    print('\nAverage Mean Squared Error:', test_loss[0])
    
    # Generate and log sample predictions visualization
    print("\nGenerating prediction visualizations...")
    predicted_steady_flow = model.predict(test_geometries[:3], batch_size=batch_size)
    
    for i in range(3):
        pred_reshaped = predicted_steady_flow[i].reshape(44, 223, 2)
        true_reshaped = test_steady_flows[i].reshape(44, 223, 2)
        geom_reshaped = test_geometries[i].reshape(44, 223)
        
        plt.figure(figsize=(15, 5))
        velocity_image = np.concatenate([
            pred_reshaped[:,:,0],
            true_reshaped[:,:,0],
            geom_reshaped/10.0
        ], axis=1)
        
        plt.imshow(velocity_image)
        plt.title(f'Sample {i+1}: Predicted vs True Flow vs Geometry')
        plt.colorbar()
        
        # Save and log figure
        plt.savefig(f'prediction_sample_{i}.png')
        mlflow.log_artifact(f'prediction_sample_{i}.png')
        plt.close()
    
    # Register the model if it's the best one so far
    print("\nChecking if current model is best performing...")
    client = mlflow.tracking.MlflowClient()
    runs = client.search_runs(
        experiment_ids=[run.info.experiment_id],
        order_by=["metrics.test_mse ASC"]
    )
    
    if run.info.run_id == runs[0].info.run_id:
        print("New best model found! Registering model...")
        mlflow.keras.log_model(
            model,
            "model",
            registered_model_name="flow_predictor",
            signature=mlflow.models.infer_signature(
                train_geometries[:2],
                model.predict(train_geometries[:2])
            )
        )
        print("Model registered successfully!")
    else:
        print("Not best model - skipping registration")

print('\nTraining and logging complete! Check MLflow UI for detailed metrics and artifacts.')

Starting MLflow run for model training...


Epoch 1/3
[1m50/50[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m83s[0m 2s/step - MSE: 0.0188 - loss: 0.0188 - val_MSE: 0.0088 - val_loss: 0.0088
Epoch 2/3
[1m50/50[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m80s[0m 2s/step - MSE: 0.0083 - loss: 0.0083 - val_MSE: 0.0037 - val_loss: 0.0037
Epoch 3/3
[1m50/50[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m79s[0m 2s/step - MSE: 0.0032 - loss: 0.0032 - val_MSE: 0.0017 - val_loss: 0.0017


Registered model 'flow_predictor' already exists. Creating a new version of this model...
2025/01/15 18:15:57 INFO mlflow.store.model_registry.abstract_store: Waiting up to 300 seconds for model version to finish creation. Model name: flow_predictor, version 4
Created version '4' of model 'flow_predictor'.



Average Mean Squared Error: 0.0017221253365278244

Generating prediction visualizations...
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 207ms/step

Checking if current model is best performing...
Not best model - skipping registration
🏃 View run flow_predictor_training at: http://127.0.0.1:8768/#/experiments/1210/runs/9fac3bc6a266469a921e0e96f19a69e9
🧪 View experiment at: http://127.0.0.1:8768/#/experiments/1210

Training and logging complete! Check MLflow UI for detailed metrics and artifacts.
