# MULTIPROCESSING VESICAL SATURATION PRESSURE CALCULATIONS 

- This notebook shows how to run hundreds or thousands of melt inclusion saturation pressure calculations. It returns nans and error messages for conversion failures. The example and data are from DeVitre et al. (in review). Please ask about citing the paper if you use this notebook. 

### CRUCIAL NOTE : DO NOT USE THIS NOTEBOOK ON THE ENKI SERVER, YOU NEED TO INSTALL THERMOENGINE LOCALLY!!! 
- This method using multiprocessing, so it is CPU and memory intensive. YOU HAVE TO DO THIS LOCALLY. See section "Running a container image locally" at https://gitlab.com/ENKI-portal/ThermoEngine. If you ignore this message and run this on ENKI, the authors will not be held responsible. 

## Install needed dependencies

In [1]:
## Uncomment and run this if you haven't installed Thermobar
# %pip install Thermobar

## Import needed packages

In [2]:
import VESIcal as v
import pandas as pd
import numpy as np
import os

import multiprocessing as mp
import time
import sys
import warnings
import io
from tqdm.notebook import tqdm
import gc

import Thermobar as pt


  from VESIcal.models import magmasat


### Set working functions - no need to edit

In [2]:
# #################### don't touch unless you know what you're doing ############

# This function is to check your variable settings
def check_cpu_and_chunk(df, num_cores, chunk_size,model):
    # ANSI escape codes for colors
    RED_BACKGROUND = '\033[48;5;224m'
    YELLOW_BACKGROUND = '\033[48;5;223m'
    RESET_FORMAT = '\033[0m'

    messages = []
    messages.append(f"Number of processors available: {mp.cpu_count()}")
    messages.append(f"Number of processors selected: {num_cores}")
    messages.append(f"Chunk size: {chunk_size}")
    
    # Check if df is a DataFrame
    if not isinstance(df, pd.DataFrame):
        messages.append(f"{RED_BACKGROUND}TypeERROR: df must be a pandas DataFrame{RESET_FORMAT}")

    # Check if num_cores is an integer
    if not isinstance(num_cores, int):
        messages.append(f"{RED_BACKGROUND}TypeERROR: num_cores must be an integer{RESET_FORMAT}")

    # Check if chunk_size is an integer
    if not isinstance(chunk_size, int):
        messages.append(f"{RED_BACKGROUND}TypeERROR: chunk_size must be an integer{RESET_FORMAT}")
    if model not in ['MagmaSat','Dixon','MooreWater','Liu','IaconoMarziano','ShishkinaIdealMixing','AllisonCarbon']:
        messages.append(f"{RED_BACKGROUND}ERROR: Selected solubility model does not exist. See https://vesical.readthedocs.io/en/latest/models.html {RESET_FORMAT}")
    with warnings.catch_warnings(record=True) as w:
        warnings.simplefilter("always")

        if chunk_size >= 200:
            messages.append(f"{YELLOW_BACKGROUND}WARNING: chunk_size seems very large, notebook may freeze."
                            f" If unsure set chunk_size=mp.cpu_count().{RESET_FORMAT}")

        if num_cores < mp.cpu_count():
            messages.append(f"{YELLOW_BACKGROUND}WARNING: Fewer cores selected than max available. This is inefficient."
                            f" If unsure set num_cores=mp.cpu_count().{RESET_FORMAT}")

        for warning in w:
            messages.append(f"{RED_BACKGROUND}Warning: {warning.message}{RESET_FORMAT}")
    
    for message in messages:
        print (message)
    return 

# # ########## Multiprocessing functions ###########

#### Worker function to do Sat P calculations ##
def worker_function(queue, i, row, model):
    sys.stdout = io.StringIO()
    sub = pd.DataFrame([row])
    sub.index = [i]  
    sub_batchfile = v.BatchFile(filename=None, dataframe=sub, label=sub.index)
    
    try:
        sub_satP = sub_batchfile.calculate_saturation_pressure(model=model, temperature="Temp")
    except Exception as e:
        print(f"An error occurred: {e}")
        sub_satP = sub_batchfile.data.copy()
        sub_satP['SaturationP_bars_VESIcal'] = np.nan
        sub_satP['XH2O_fl_VESIcal'] = np.nan
        sub_satP['XCO2_fl_VESIcal'] = np.nan
        sub_satP['FluidMass_grams_VESIcal'] = np.nan
        sub_satP['FluidSystem_wt_VESIcal'] = np.nan
        sub_satP['Model'] = model
        sub_satP['Warnings'] = 'Could not converge'
    
    queue.put({'i': i, **sub_satP.to_dict(orient='records')[0]})
    gc.collect()


#### Chunk processing function ##
def process_chunk(chunk, num_cores, chunk_index, num_chunks, model):
    manager = mp.Manager()
    queue = manager.Queue()
    
    rows = chunk.to_dict('index')
    original_indices = list(rows.keys())  # Store the original order of indices
    pool = mp.Pool(num_cores)
    
    args = [(queue, i, row, model) for i, row in rows.items()]
    
    results = [pool.apply_async(worker_function, arg) for arg in args]
    
    pool.close()
    
    df_step = pd.DataFrame()
    
    processed = 0
    total = len(rows)
    
    with tqdm(total=total, desc=f"Processing chunk {chunk_index+1}/{num_chunks}") as pbar:
        while processed < total:
            result = queue.get()
            processed += 1
            pbar.update(1)
            
            i = result.pop('i')
            df_step = pd.concat([df_step, pd.DataFrame(result, index=[i])])
            
            # Clear memory incrementally
            gc.collect()
    
    pool.join()
    
    df_step = df_step.loc[original_indices]
    
    return df_step

#### Multiprocessing main function ##

def multiprocess_satP(df, num_cores, chunk_size, model='MagmaSat'):
    num_chunks = (len(df) + chunk_size - 1) // chunk_size
    
    final_df = pd.DataFrame()
    original_indices = []
    
    with tqdm(total=num_chunks, desc="Overall Progress") as total_pbar:
        for chunk_index in range(num_chunks):
            start_index = chunk_index * chunk_size
            end_index = min((chunk_index + 1) * chunk_size, len(df))
            chunk = df.iloc[start_index:end_index]
            original_indices.extend(chunk.index)
            chunk_result = process_chunk(chunk, num_cores, chunk_index, num_chunks, model)
            
            final_df = pd.concat([final_df, chunk_result])
            total_pbar.update(1)
    
    final_df = final_df.reindex(original_indices)
    display(final_df.head())
    
    return final_df

## Import data for calcs
- This data is a melt inclusion compilation from DeVitre et al., (In Review). 

In [3]:
out=pt.import_excel("MI_compilation_XH2O.xlsx", sheet_name="XH2O_forThermobar")

# This subdivdes outputs into a dataframe for all inputs (my_input), ols, and liqs
my_input=out['my_input']
myOls=out['Ols']
myLiquids1=out['Liqs']

## Lets check the outputs have loaded right, this compilation only has liquids, if you need minerals check 
# display(myOls.head()) 
display(myLiquids1.head())

Unnamed: 0,SiO2_Liq,TiO2_Liq,Al2O3_Liq,FeOt_Liq,MnO_Liq,MgO_Liq,CaO_Liq,Na2O_Liq,K2O_Liq,Cr2O3_Liq,P2O5_Liq,H2O_Liq,Fe3Fet_Liq,NiO_Liq,CoO_Liq,CO2_Liq,Sample_ID_Liq
0,49.5551,1.999709,13.4501,13.569085,0.233964,6.163391,11.1423,2.462,0.247106,0.021049,0.13795,0.0679,0.0,0.0,0.0,0.002328,Bali_2018_Holohraun_1
1,48.8914,1.941204,13.473,14.000915,0.236328,6.45614,11.45715,2.29605,0.205395,0.014379,0.226211,0.0772,0.0,0.017466,0.0,0.019493,Bali_2018_Holohraun_2
2,49.3044,1.824232,13.68435,13.305956,0.25005,6.560064,11.39425,2.5154,0.213113,0.013414,0.18338,0.16405,0.0,0.024897,0.0,0.025766,Bali_2018_Holohraun_3
3,49.944,1.834,13.32,12.576778,0.207592,6.395452,11.2252,2.56,0.278156,0.01364,0.158312,0.176,0.0,0.0,0.0,0.026488,Bali_2018_Holohraun_4
4,50.4,1.92,14.01,11.289602,0.22,6.92,11.69,2.38,0.24,0.00126,0.24,0.456,0.0,0.028665,0.0,0.027455,Bali_2018_Holohraun_5


## Now let's calculate temperatures using Thermobar
- Here we use the Thermobar implementation of liquid-only thermometer of Putirka 2008 eq 22 using Beattie DMg to calculate equilibrium olivine compositions as we don't have minerald data for the whole compilation (see Thermobar documentation) and Wieser et al., 2022 http://www.jvolcanica.org/ojs/index.php/volcanica/article/view/161.
- Note, since this thermometer requires pressures, you'll want to calculate a first-pass temperature (here we originally used CaO liquid-only) and a first pass saturation pressure. 

In [4]:
## Calculate temperatures
T_put_2008=pt.calculate_liq_only_temp(liq_comps=myLiquids1,equationT="T_Put2008_eq22_BeattDMg", 
                           P=my_input['SaturationP_bars_VESIcal_TCaO'].values/1000,H2O_Liq=my_input['H2O_Liq'])

# Convert to vesical format and use a Fe3Fet you want
df_Tput2008=pt.convert_to_vesical(liq_comps=myLiquids1, 
                              T1=T_put_2008,unit='Kelvin', Fe3Fet_Liq=0.15)

## Convert to VESIcal batchfile and show data
VESIcal_out = v.BatchFile(filename='Sample_ID', dataframe=df_Tput2008, label='Sample_ID')
VESIcal_out.data

Unnamed: 0,SiO2,TiO2,Al2O3,MnO,MgO,CaO,Na2O,K2O,Cr2O3,P2O5,H2O,NiO,CoO,CO2,Temp,FeO,Fe2O3
Bali_2018_Holohraun_1,49.555100,1.999709,13.450100,0.233964,6.163391,11.142300,2.462000,0.247106,0.021049,0.137950,0.067900,0.000000,0.0,0.002328,1162.437015,11.533722,2.261514
Bali_2018_Holohraun_2,48.891400,1.941204,13.473000,0.236328,6.456140,11.457150,2.296050,0.205395,0.014379,0.226211,0.077200,0.017466,0.0,0.019493,1171.706599,11.900778,2.333486
Bali_2018_Holohraun_3,49.304400,1.824232,13.684350,0.250050,6.560064,11.394250,2.515400,0.213113,0.013414,0.183380,0.164050,0.024897,0.0,0.025766,1171.782337,11.310063,2.217659
Bali_2018_Holohraun_4,49.944000,1.834000,13.320000,0.207592,6.395452,11.225200,2.560000,0.278156,0.013640,0.158312,0.176000,0.000000,0.0,0.026488,1166.581210,10.690261,2.096129
Bali_2018_Holohraun_5,50.400000,1.920000,14.010000,0.220000,6.920000,11.690000,2.380000,0.240000,0.001260,0.240000,0.456000,0.028665,0.0,0.027455,1165.092044,9.596162,1.881600
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
Rasmussen_2017_Antarctica_73,42.225269,3.944228,13.036595,0.186592,7.843792,13.043543,3.361625,1.457995,0.000000,0.774156,1.129474,0.000000,0.0,0.403899,1203.535575,10.216901,2.003314
Rasmussen_2017_Antarctica_74,42.131724,3.744558,12.509280,0.187327,8.643803,12.773917,3.285656,1.441129,0.000000,0.752281,1.312280,0.000000,0.0,0.546552,1223.940706,10.288579,2.017368
Rasmussen_2017_Antarctica_75,42.524741,3.722019,12.403091,0.211467,8.693978,13.024586,3.228596,1.389925,0.000000,0.726732,1.004717,0.000000,0.0,0.396855,1226.042286,10.303566,2.020307
Rasmussen_2017_Antarctica_76,42.243044,3.795631,12.814479,0.202805,9.052649,12.713077,3.338326,1.471329,0.000000,0.771454,0.610403,0.000000,0.0,0.246959,1238.346800,10.344999,2.028431


## STOP! BEFORE YOU CONTINUE READ THIS.

DO NOT USE THIS NOTEBOOK ON THE ENKI SERVER, YOU NEED TO INSTALL THERMOENGINE LOCALLY!!! 
- This method using multiprocessing, so it is CPU and memory intensive. YOU HAVE TO DO THIS LOCALLY. See section "Running a container image locally" at https://gitlab.com/ENKI-portal/ThermoEngine. If you ignore this message and run this on ENKI, the authors will not be held responsible. 

## Now we can go ahead and run VESIcal saturation pressure calculations 

### First set your variables

In [5]:
# Set input data (must be a dataframe)
mp_df_in = VESIcal_out.data[0:80]  # We test this on a small size because the full compilation is >4000 MI

# Set the number of logical processors you want to use (how many cores does your computer have?)
num_cores = mp.cpu_count()  # leave this as is if you don't know the number you want to use.

# Set chunk size fed to the processors
chunk_size = num_cores*5  # if you don't know what to do use chunk_size=num_cores. Don't use less.

#Set solubility model you want to use, recommend MagmaSat
model = 'MagmaSat' 

# This line check you set your variable correctly, it will throw messages for each error/warning
# FIX THEM. Errors will break the code, warnings are performance related.
check_cpu_and_chunk(df=mp_df_in, num_cores=num_cores, chunk_size=chunk_size,model=model)

Number of processors available: 8
Number of processors selected: 8
Chunk size: 40


### Now run the calculations

In [6]:
final_df = multiprocess_satP(df=mp_df_in, num_cores=num_cores, chunk_size=chunk_size, model=model)

Overall Progress:   0%|          | 0/2 [00:00<?, ?it/s]

Processing chunk 1/2:   0%|          | 0/40 [00:00<?, ?it/s]

Processing chunk 2/2:   0%|          | 0/40 [00:00<?, ?it/s]

Unnamed: 0,SiO2,TiO2,Al2O3,MnO,MgO,CaO,Na2O,K2O,Cr2O3,P2O5,...,FeO,Fe2O3,Temp,SaturationP_bars_VESIcal,XH2O_fl_VESIcal,XCO2_fl_VESIcal,FluidMass_grams_VESIcal,FluidSystem_wt_VESIcal,Model,Warnings
Bali_2018_Holohraun_1,49.5551,1.999709,13.4501,0.233964,6.163391,11.1423,2.462,0.247106,0.021049,0.13795,...,11.533722,2.261514,1162.437015,30,0.031593,0.968407,0.000556,0.000556,MagmaSat,
Bali_2018_Holohraun_2,48.8914,1.941204,13.473,0.236328,6.45614,11.45715,2.29605,0.205395,0.014379,0.226211,...,11.900778,2.333486,1171.706599,320,0.004356,0.995644,3.5e-05,3.5e-05,MagmaSat,
Bali_2018_Holohraun_3,49.3044,1.824232,13.68435,0.25005,6.560064,11.39425,2.5154,0.213113,0.013414,0.18338,...,11.310063,2.217659,1171.782337,420,0.014479,0.985521,0.000476,0.000476,MagmaSat,
Bali_2018_Holohraun_4,49.944,1.834,13.32,0.207592,6.395452,11.2252,2.56,0.278156,0.01364,0.158312,...,10.690261,2.096129,1166.58121,440,0.015395,0.984605,0.0001,0.0001,MagmaSat,
Bali_2018_Holohraun_5,50.4,1.92,14.01,0.22,6.92,11.69,2.38,0.24,0.00126,0.24,...,9.596162,1.8816,1165.092044,480,0.086968,0.913032,0.000423,0.000423,MagmaSat,


In [8]:
#################################### END OF VERSION ################################
## BELOW IS TESTING ###