In [1]:
# The RF em_model requires a large amount of memory during model simulation.
# How can this be improved?

In [1]:
# Benchmark variant to run; each value enables exactly one `if method == N:` cell below:
#   0 = serial loop, 1 = multiprocessing.Pool, 2 = Pool + Manager dict,
#   3 = joblib Parallel, 4 = Pool + shared_memory, 5 = ThreadPoolExecutor
method = 4
# Number of workers handed to the pools/executors; the benchmark also builds ncpu*2 optimization cases
ncpu = 128

# Packages and functions

In [2]:
import glob, os, sys, toml, pickle, time, multiprocessing
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import psutil
from sklearn.preprocessing import OneHotEncoder
from multiprocessing import Pool, cpu_count, shared_memory
import warnings
warnings.filterwarnings("ignore")

In [3]:
###################################################################################################
# optimization
from pymoo.algorithms.soo.nonconvex.ga import GA
from pymoo.optimize import minimize
from pymoo.core.problem import Problem
from pymoo.operators.sampling.rnd import FloatRandomSampling
from pymoo.operators.crossover.sbx import SimulatedBinaryCrossover
from pymoo.operators.mutation.pm import PolynomialMutation
from pymoo.termination import get_termination

from concurrent.futures import ThreadPoolExecutor, as_completed
from multiprocessing import Pool

class MyProblem(Problem):
    """Single-objective, unconstrained pymoo problem that maximizes an emulator's prediction.

    The decision variables are bounded by ``xlb_mean`` (lower) and ``xub_mean``
    (upper); the number of variables is ``len(xlb_mean)``. ``em_model`` only
    needs to provide a ``predict(x)`` method.
    """

    def __init__(self, xlb_mean, xub_mean, em_model):
        n_variables = len(xlb_mean)
        super().__init__(
            n_var=n_variables,
            n_obj=1,
            n_constr=0,
            xl=xlb_mean,
            xu=xub_mean,
            elementwise_evaluation=False,
        )
        self.em_model = em_model

    def _evaluate(self, x, out, *args, **kwargs):
        # The optimizer minimizes F, so the prediction is negated to maximize it.
        predictions = self.em_model.predict(x)
        out["F"] = -predictions

def run_ga_optimization(em_model, xlb_mean, xub_mean, num_runs=10, pop_size=100, num_generations=100, times=1):
    """Run independent GA optimizations of the emulator output and keep the best ones.

    The GA is started ``times * num_runs`` times on a ``MyProblem`` built from
    the given bounds and emulator. Each start contributes its best solution;
    the ``num_runs`` solutions with the largest emulator output are returned.

    Parameters
    ----------
    em_model : object with a ``predict`` method, used as the objective.
    xlb_mean, xub_mean : lower and upper bounds of the decision variables.
    num_runs : number of solutions to return.
    pop_size : GA population size.
    num_generations : number of generations after which each GA run stops.
    times : multiplier on ``num_runs`` giving the number of GA starts.

    Returns
    -------
    (top_solutions, top_outputs) : numpy arrays ordered by descending output.
    """
    print(f'Run GA {times * num_runs} times and extract the best {num_runs} solutions')

    solutions = []
    outputs = []
    for _ in range(times * num_runs):
        res = minimize(
            MyProblem(xlb_mean, xub_mean, em_model),
            GA(
                pop_size=pop_size,
                sampling=FloatRandomSampling(),
                crossover=SimulatedBinaryCrossover(prob=0.9, eta=15),
                mutation=PolynomialMutation(eta=20),
                eliminate_duplicates=True,
            ),
            termination=get_termination("n_gen", num_generations),
            verbose=False,
        )
        solutions.append(res.X)
        # The problem minimizes the negated prediction, so flip the sign back.
        outputs.append(np.squeeze(-res.F))

    solutions = np.array(solutions)
    outputs = np.array(outputs)

    # Indices of the runs ordered from largest to smallest output, cut to num_runs.
    best_first = np.argsort(outputs)[::-1][:num_runs]
    return solutions[best_first], outputs[best_first]


### Internal optimization parallelization reduces some memory use but is very slow
# # Define optimization_task at the top level
# def optimization_task(args):
#     em_model, xlb_mean, xub_mean, pop_size, num_generations = args

#     problem = MyProblem(xlb_mean, xub_mean, em_model)
#     algorithm = GA(
#         pop_size=pop_size,
#         sampling=FloatRandomSampling(),
#         crossover=SimulatedBinaryCrossover(prob=0.9, eta=15),
#         mutation=PolynomialMutation(eta=20),
#         eliminate_duplicates=True,
#     )
#     res = minimize(problem, algorithm, termination=get_termination("n_gen", num_generations), verbose=False)
#     return res.X, -res.F  # Return optimized features and the negative result for maximization


# # Update run_ga_optimization to use the top-level function
# def run_ga_optimization(em_model, xlb_mean, xub_mean, num_runs=10, pop_size=100, num_generations=100, times=1):
#     ga_all_solutions = []
#     ga_all_outputs = []

#     print(f'Run GA {times * num_runs} times and extract the best {num_runs} solutions')

#     # Use multiprocessing.Pool for parallel optimization tasks
#     with Pool() as pool:
#         results = pool.map(
#             optimization_task,
#             [
#                 (em_model, xlb_mean, xub_mean, pop_size, num_generations)
#                 for _ in range(times * num_runs)
#             ],
#         )

#     # Collect results
#     for optimized_features, max_output in results:
#         ga_all_solutions.append(optimized_features)
#         ga_all_outputs.append(max_output)

#     # Convert to numpy arrays for easier sorting
#     ga_all_solutions = np.array(ga_all_solutions)
#     ga_all_outputs = np.array(ga_all_outputs)

#     # Sort the results based on the max_output (in descending order)
#     sorted_indices = np.argsort(ga_all_outputs)[::-1]  # Sort in descending order

#     # Select the top `num_runs` solutions
#     top_indices = sorted_indices[:num_runs]

#     top_solutions = ga_all_solutions[top_indices]
#     top_outputs = ga_all_outputs[top_indices]

#     return top_solutions, top_outputs


# Load an RF emulator model

In [4]:
em_model_file = '/glade/campaign/cgd/tss/people/guoqiang/CTSM_CAMELS_proj/Calib_HH_emulator/LSE_allbasin/RF_emulator_for_iter6.pkl'
# NOTE(review): pickle.load can execute arbitrary code; only load emulator files from a trusted source.
if not os.path.isfile(em_model_file):
    # Fail here with a clear message instead of a NameError on `em_model` in a later cell.
    raise FileNotFoundError(f'RF emulator file not found: {em_model_file}')
with open(em_model_file, 'rb') as file:
    em_model = pickle.load(file)

In [5]:
# Lower- and upper-bound parameter tables, both stored as gzip-compressed CSV
file_param_lb = '/glade/campaign/cgd/tss/people/guoqiang/CTSM_CAMELS_proj/Calib_HH_emulator/LSE_allbasin/camels_627basin_ctsm_all_param_lb.gz'
file_param_ub = '/glade/campaign/cgd/tss/people/guoqiang/CTSM_CAMELS_proj/Calib_HH_emulator/LSE_allbasin/camels_627basin_ctsm_all_param_ub.gz'
df_param_lb, df_param_ub = (
    pd.read_csv(bounds_file, compression='gzip')
    for bounds_file in (file_param_lb, file_param_ub)
)

In [6]:
# Attribute table; a plain string is enough because the path has no placeholders
file_camels_attribute = '/glade/campaign/cgd/tss/people/guoqiang/CTSM_CAMELS_proj/Calib_HH_emulator/LSE_allbasin/camels_627basin_attribute.pkl'
# NOTE(review): the file has a .pkl extension but is parsed as CSV here; confirm the on-disk
# format (pd.read_pickle would be needed if it is a real pickle).
df_att = pd.read_csv(file_camels_attribute)

In [7]:
# Table listing the attributes; the 'att_Xie2021' column is used as the row mask
infile_attr_foruse = '/glade/u/home/guoqiang/CTSM_repos/CTSM_calibration/data/camels_attributes_table_TrainModel.csv'
df_att_foruse = pd.read_csv(infile_attr_foruse)
_selected_rows = df_att_foruse[df_att_foruse['att_Xie2021'].values]
# Names of the attributes in the selected rows
useattrs = list(_selected_rows['Attribute_text'].values)

In [8]:
# One-hot encoding for categorical attributes
# NOTE: this cell rebinds df_att, so re-running it without re-running the loading cell
# raises a KeyError for columns that have already been encoded and dropped.

df_att = df_att[useattrs]  # Keep only the selected attributes
inputnames = list(df_att.columns)  # Column names, updated as columns get encoded
for att in useattrs:
    if df_att[att].dtype != "object":  # Only object-dtype (categorical) columns are encoded
        continue
    print('Convert', att, 'to one-hot encoding')

    # `sparse` was renamed to `sparse_output` in scikit-learn 1.2 and removed in 1.4;
    # try the current keyword first and fall back for older releases.
    try:
        enc = OneHotEncoder(sparse_output=False)
    except TypeError:
        enc = OneHotEncoder(sparse=False)
    encoded_values = enc.fit_transform(df_att[[att]])

    # One new column per category, named "<attribute>_<category>"
    encnames = [att + "_" + str(cat) for cat in enc.categories_[0]]
    df_enc = pd.DataFrame(encoded_values, columns=encnames, index=df_att.index)

    # Replace the categorical column with its encoded columns (appended at the end)
    df_att = pd.concat([df_att.drop(columns=[att]), df_enc], axis=1)

    # Keep inputnames in the same order as the dataframe columns
    inputnames = [i for i in inputnames if i != att] + encnames


Convert dom_land_cover to one-hot encoding


In [9]:
# Inputs for the em_model optimization: one row per case, made of the parameter
# bounds followed by that row's attributes (identical in the lower and upper bound).
n_cases = ncpu * 2
xlb_mean = np.array([
    np.hstack([df_param_lb.iloc[i].values, df_att.iloc[i].values])
    for i in range(n_cases)
])
xub_mean = np.array([
    np.hstack([df_param_ub.iloc[i].values, df_att.iloc[i].values])
    for i in range(n_cases)
])


# em_model optimization
Peak mem = 3.06 GB (almost the same as after loading the variables)

In [10]:
if method == 0:
    # Serial run: one GA optimization per case, timed as a whole
    start_time = time.time()

    for lower_bounds, upper_bounds in zip(xlb_mean, xub_mean):
        a, b = run_ga_optimization(em_model, lower_bounds, upper_bounds)
    end_time = time.time()
    print("Time cost without Manager:", end_time - start_time, "seconds")

## Version 1: direct multiprocessing
cpu=10, peak mem=28GB
time = 93 s

In [11]:
if method == 1:
    # Process pool; em_model is passed as an argument of every task
    start_time = time.time()

    tasks = [
        (em_model, lower_bounds, upper_bounds)
        for lower_bounds, upper_bounds in zip(xlb_mean, xub_mean)
    ]
    with multiprocessing.Pool(ncpu) as pool:
        results = pool.starmap(run_ga_optimization, tasks)

    end_time = time.time()
    print("Time cost without Manager:", end_time - start_time, "seconds")

Run GA 10 times and extract the best 10 solutions
Run GA 10 times and extract the best 10 solutions
Run GA 10 times and extract the best 10 solutions
Run GA 10 times and extract the best 10 solutions
Run GA 10 times and extract the best 10 solutions
Run GA 10 times and extract the best 10 solutions
Run GA 10 times and extract the best 10 solutions
Run GA 10 times and extract the best 10 solutions
Run GA 10 times and extract the best 10 solutions
Run GA 10 times and extract the best 10 solutions
Run GA 10 times and extract the best 10 solutions
Run GA 10 times and extract the best 10 solutions
Run GA 10 times and extract the best 10 solutions
Run GA 10 times and extract the best 10 solutions
Run GA 10 times and extract the best 10 solutions
Run GA 10 times and extract the best 10 solutions
Run GA 10 times and extract the best 10 solutions
Run GA 10 times and extract the best 10 solutions
Run GA 10 times and extract the best 10 solutions
Run GA 10 times and extract the best 10 solutions


In [13]:
# NOTE(review): undocumented scratch arithmetic; the division by 3600 suggests a
# seconds-to-hours conversion — confirm what 97000, 72 and 292 stand for.
97000/72*292/3600

109.27469135802468

In [14]:
# Same factors as the previous cell in a different order (gives the same value).
# NOTE(review): meaning of the numbers is not documented — confirm.
292/72*97000/3600

109.27469135802468

## Version 2: multiprocessing.Manager
Much larger memory peak: ~75 GB  
Time = 69 s

In [15]:
if method == 2:
    def optimization_task(i, xlb, xub, shared_em_model):
        """Run the GA for one case, fetching the model from the manager-backed dict.

        ``i`` is the case index (not used in the body); ``xlb``/``xub`` are the
        bounds of that case; ``shared_em_model`` is the manager dict holding the
        model under the key 'model'.
        """
        em_model = shared_em_model['model']
        return run_ga_optimization(em_model, xlb, xub)

    # Using the manager as a context manager shuts its server process down when
    # the benchmark is done; previously the manager was never shut down.
    with multiprocessing.Manager() as manager:
        shared_em_model = manager.dict()
        shared_em_model['model'] = em_model  # Share the em_model

        # Only the pool execution is timed, not the manager setup
        start_time = time.time()

        with multiprocessing.Pool(ncpu) as pool:
            results = pool.starmap(
                optimization_task,
                [(i, xlb_mean[i], xub_mean[i], shared_em_model) for i in range(xub_mean.shape[0])]
            )

        end_time = time.time()
        print("Time cost with Manager:", end_time - start_time, "seconds")

## Version 3: joblib
Mem peak=27GB  
Time=96 sec

In [16]:
from joblib import Parallel, delayed

if method == 3:
    # joblib variant: one delayed GA call per case, run on ncpu workers
    start_time = time.time()
    parallel = Parallel(n_jobs=ncpu)
    results = parallel(
        delayed(run_ga_optimization)(em_model, lower_bounds, upper_bounds)
        for lower_bounds, upper_bounds in zip(xlb_mean, xub_mean)
    )
    end_time = time.time()
    print("Time cost with Joblib:", end_time - start_time, "seconds")


## Version 4: shared memory
Peak mem = 55 GB  
Time = 35 s

In [17]:
import multiprocessing
import numpy as np
import time
from multiprocessing import shared_memory
import pickle

if method == 4:
    def optimization_task(i, shared_name, shared_size, xlb, xub):
        """Run the GA for one case with a model unpickled from shared memory.

        ``i`` is the case index (not used in the body); ``shared_name`` and
        ``shared_size`` identify the shared-memory segment and the number of
        bytes of pickled model stored in it; ``xlb``/``xub`` are the bounds of
        the case. Each task unpickles its own copy of the model.
        """
        shm = shared_memory.SharedMemory(name=shared_name)
        try:
            # Unpickle straight from the shared buffer instead of first copying it
            # into a bytes object; the view must be released before close().
            with shm.buf[:shared_size] as view:
                em_model = pickle.loads(view)
        finally:
            # Detach this worker's handle; the parent remains responsible for unlink().
            shm.close()
        return run_ga_optimization(em_model, xlb, xub)

    # Serialize the model and create shared memory
    serialized_model = pickle.dumps(em_model)
    shared_size = len(serialized_model)
    shm = shared_memory.SharedMemory(create=True, size=shared_size)

    try:
        # Copy inside the try block so the segment is removed even if the copy fails
        shm.buf[:shared_size] = serialized_model

        # Only the pool execution is timed
        start_time = time.time()
        with multiprocessing.Pool(ncpu) as pool:
            results = pool.starmap(
                optimization_task,
                [(i, shm.name, shared_size, xlb_mean[i], xub_mean[i]) for i in range(xub_mean.shape[0])]
            )
        end_time = time.time()
        print("Time cost with SharedMemory:", end_time - start_time, "seconds")
    finally:
        # Clean up shared memory
        shm.close()
        shm.unlink()

Run GA 10 times and extract the best 10 solutions
Run GA 10 times and extract the best 10 solutions
Run GA 10 times and extract the best 10 solutions
Run GA 10 times and extract the best 10 solutions
Run GA 10 times and extract the best 10 solutions
Run GA 10 times and extract the best 10 solutionsRun GA 10 times and extract the best 10 solutions

Run GA 10 times and extract the best 10 solutions
Run GA 10 times and extract the best 10 solutionsRun GA 10 times and extract the best 10 solutions

Run GA 10 times and extract the best 10 solutions
Run GA 10 times and extract the best 10 solutions
Run GA 10 times and extract the best 10 solutions
Run GA 10 times and extract the best 10 solutions
Run GA 10 times and extract the best 10 solutions
Run GA 10 times and extract the best 10 solutions
Run GA 10 times and extract the best 10 solutions
Run GA 10 times and extract the best 10 solutions
Run GA 10 times and extract the best 10 solutions
Run GA 10 times and extract the best 10 solutions


## Version 5: concurrent.futures threads
Mem = 3.1 GB  
No real parallelism, so no speed improvement

In [18]:
from concurrent.futures import ThreadPoolExecutor
import time

if method == 5:
    def optimization_task(i):
        """Run the GA for case ``i`` using the notebook-level model and bound arrays."""
        return run_ga_optimization(em_model, xlb_mean[i], xub_mean[i])

    # Thread pool: all tasks use the same em_model object, nothing is pickled
    start_time = time.time()
    case_indices = range(xub_mean.shape[0])
    with ThreadPoolExecutor(max_workers=ncpu) as executor:
        results = [outcome for outcome in executor.map(optimization_task, case_indices)]
    end_time = time.time()
    print("Time cost with Threading:", end_time - start_time, "seconds")
