In [1]:
from smartsim import Experiment
from smartredis import Client
import torch
import io
from torch.jit import script
from typing import List
import numpy as np

# Implements:
# Liang, F., Shi, R., & Mo, Q. (2016). 
# A split-and-merge approach for singular value decomposition of large-scale matrices. 
# Statistics and its interface, 9(4), 453. 10.4310/SII.2016.v9.n4.a5

# Set up the execution of the foamSmartSimMapField application 
# as a SmartSim Experiment. 
exp = Experiment("foam-smartsim-svd", launcher="local")

db = exp.create_database(port=8000,       # database port
                         interface="lo")  # network interface to use
exp.start(db)

# Connect the python client to the smartredis database
client = Client(address=db.get_address()[0], cluster=False)

# Set the field name to analyze with SVD
field_name = "p"

# MPI parallel run settings for foamSmartSimMapFields - run_command can be "slurm" on a cluster.
num_mpi_ranks = 4
run_settings_parallel = exp.create_run_settings(exe="foamSmartSimSvd", 
                                                exe_args=f"-case cavity -fieldName {field_name} -parallel", 
                                                run_command="mpirun", 
                                                run_args={"np": f"{num_mpi_ranks}"})

openfoam_svd_model = exp.create_model(name="foamSmartSimSvd", run_settings=run_settings_parallel)

# Create the torch.svd as a RedisAI script
def calc_svd(input_tensor):
    """
    Applies the SVD (Singular Value Decomposition) function to the input tensor
    using the TorchScript API.
    """
    return torch.svd(input_tensor) 
    
client.set_function("calc_svd", calc_svd)


try:
     # Runs foamSmartSimSvd to send fields to smartRedis
     exp.start(openfoam_svd_model, block=True)

     torch.set_default_dtype(torch.double)
    
     for rank in range(num_mpi_ranks):
         client.run_script("calc_svd", "calc_svd", 
                           [f"fieldName_{field_name}-MPIrank_{rank}"],     
                           [f"U_{rank}", f"D_{rank}", f"V_{rank}"])
        
     Utilde = []
     Y = []
     X = [] 
     
     for rank in range(num_mpi_ranks):
         tensor_name = f"fieldName_{field_name}-MPIrank_{rank}"
         Xi = torch.tensor(client.get_tensor(tensor_name)) 
         X.append(Xi)
         #print(f"Xi.shape {Xi.shape}")
         Ui = torch.tensor(client.get_tensor(f"U_{rank}"))
         #print(f"Ui.shape {Ui.shape}")
         Di = torch.tensor(client.get_tensor(f"D_{rank}"))
         #print(f"Di.shape {Di.shape}")
         Vi = torch.tensor(client.get_tensor(f"V_{rank}"))
         #print(f"Vi.shape {Vi.shape}")
         Utilde.append(Ui)
         Y.append(torch.matmul(Di.diag(),Vi.t()))

     Y = torch.vstack(Y)
     #print (f"Y.shape {Y.shape}")
     Uy,Dy,Vy = torch.svd(Y)
     #print (f"Uy.shape {Uy.shape}")

     # Initialize an empty tensor for UtildeUy with dimensions m x n
     n_U = Uy.shape[1]
     m_U = sum([Ui.shape[0] for Ui in Utilde])
     U = torch.empty((m_U, n_U))
    
     # Perform the block-wise multiplication
     for i, Ui in enumerate(Utilde):
         start_row = i * n_U
         end_row = start_row + n_U     
         U[i * Ui.shape[0]:(i + 1) * Ui.shape[0], :] = Ui @ Uy[start_row:end_row, :]
     
     # Test SVD reconstruction
     X = torch.vstack(X)
     Xrec = U @ Dy.diag() @ Vy.t()
     print(f"|X - Xrec|_2 = {torch.norm(X - Xrec, p=2) / torch.norm(X, p=2)}")
         
except Exception as e:
    print("Caught an exception: ", str(e))
    
finally:
    exp.stop(db)


15:06:06 argo SmartSim[63000] INFO foamSmartSimSvd(63064): Completed
|X - Xrec|_2 = 7.838459659276447e-15
15:06:09 argo SmartSim[63000] INFO Stopping model orchestrator_0 with job name orchestrator_0-CYR8XH2EZZCG
