In [3]:
import numpy as np
import h5py as h5
import matplotlib.pyplot as plt
import astropy.units as u
from typing import Union, Dict, List, Tuple, Optional
from dataclasses import dataclass
from pathlib import Path

import os
import glob
from collections import defaultdict

In [7]:
class COMPASDataProcessor:
    """Class to process COMPAS HDF5 files and extract DCO properties"""
    
    def __init__(self, solar_metallicity: float = 0.0142):
        self.solar_metallicity = solar_metallicity
        
    def analytical_star_forming_mass_per_binary_using_kroupa_imf(
        self, m1_min: float, m1_max: float, m2_min: float, 
        fbin: float = 1., imf_mass_bounds: List[float] = [0.01, 0.08, 0.5, 200]
    ) -> float:
        """
        Analytical computation of the mass of stars formed per binary star formed
        using the Kroupa IMF.
        
        Parameters
        ----------
        m1_min, m1_max : float
            Primary mass range [Msun]
        m2_min : float  
            Minimum secondary mass [Msun]
        fbin : float
            Binary fraction
        imf_mass_bounds : list
            IMF mass boundaries [Msun]
            
        Returns
        -------
        float
            Mass represented by each binary [Msun]
        """
        m1, m2, m3, m4 = imf_mass_bounds
        
        if m1_min < m3:
            raise ValueError(f"This analytical derivation requires IMF break m3 < m1_min ({m3} !< {m1_min})")
        
        alpha = (-(m4**(-1.3) - m3**(-1.3))/1.3 - 
                (m3**(-0.3) - m2**(-0.3))/(m3*0.3) + 
                (m2**0.7 - m1**0.7)/(m2*m3*0.7))**(-1)
        
        # Average mass of stars
        m_avg = alpha * (-(m4**(-0.3) - m3**(-0.3))/0.3 + 
                        (m3**0.7 - m2**0.7)/(m3*0.7) + 
                        (m2**1.7 - m1**1.7)/(m2*m3*1.7))
        
        # Fraction of binaries that COMPAS simulates
        fint = (-alpha / 1.3 * (m1_max**(-1.3) - m1_min**(-1.3)) + 
                alpha * m2_min / 2.3 * (m1_max**(-2.3) - m1_min**(-2.3)))
        
        # Mass represented by each binary
        m_rep = (1/fint) * m_avg * (1.5 + (1-fbin)/fbin)
        
        return m_rep
    
    def get_dco_mask(self, 
                     fdata: h5.File, 
                     dco_type: str = 'BBH', 
                     pessimistic: bool = True, 
                     merges_hubble: bool = True, 
                     no_RLOF_post_CE: bool = True
                    ) -> np.ndarray:
        """
        Create mask for Double Compact Objects of specified type.
        
        Parameters
        ----------
        fdata : h5py.File
            COMPAS HDF5 file
        dco_type : str
            Type of DCO: 'BBH', 'BNS', 'NSBH'
        pessimistic : bool
            pessimistic CE: True, False
        merges_hubble : bool
            mask merging in a Hubble time: True, False
        no_RLOF_post_CE : bool
            mask systems with RLOF immediately after CE (assume these are stellar mergers): True, False
        
        Returns
        -------
        dco_mask : np.ndarray
            Boolean mask for DCOs
        """
        stellar_type1 = fdata['BSE_Double_Compact_Objects']['Stellar_Type(1)'][()]
        stellar_type2 = fdata['BSE_Double_Compact_Objects']['Stellar_Type(2)'][()]

        
        # Pessimistic CE mask        
        if pessimistic==True:
            optimistic_ce = fdata['BSE_Common_Envelopes']['Optimistic_CE'][()]
            pessimistic_ce_mask = np.in1d(
                fdata['BSE_Double_Compact_Objects']['SEED'][()], 
                fdata['BSE_Common_Envelopes']['SEED'][()][optimistic_ce == 0]
            )
        else: pessimistic_ce_mask = np.repeat(True, len(stellar_type2))
        
        
        if merges_hubble == True: merges_hubble_mask = (fdata['BSE_Double_Compact_Objects']['Merges_Hubble_Time'][()]==1)
        else: merges_hubble_mask = np.repeat(True, len(stellar_type2))   
        
        if no_RLOF_post_CE==True:
            rlof_post_ce = fdata['BSE_Common_Envelopes']["Immediate_RLOF>CE"][()]
            no_rlof_post_ce_mask = np.in1d(
                fdata['BSE_Double_Compact_Objects']['SEED'][()], 
                fdata['BSE_Common_Envelopes']['SEED'][()][rlof_post_ce == 0]
            )
        else: no_rlof_post_ce_mask = np.repeat(True, len(stellar_type2))
        
    
            
          
        
        # Define stellar type mappings
        type_map = {'NS': 13, 'BH': 14}
        
        if dco_type == 'BBH':
            type_mask = (stellar_type1 == type_map['BH']) & (stellar_type2 == type_map['BH'])
        elif dco_type == 'BNS':
            type_mask = (stellar_type1 == type_map['NS']) & (stellar_type2 == type_map['NS'])
        elif dco_type == 'NSBH':
            type_mask = ((stellar_type1 == type_map['NS']) & (stellar_type2 == type_map['BH'])) | \
                       ((stellar_type1 == type_map['BH']) & (stellar_type2 == type_map['NS']))
        else:
            raise ValueError(f"Unknown DCO type: {dco_type}")

        dco_mask = type_mask & (merges_hubble_mask == True) & (pessimistic_ce_mask == True) & (no_rlof_post_ce_mask == True)

            
        return dco_mask
    
    def get_primary_secondary(self, m1: np.ndarray, m2: np.ndarray) -> Tuple[np.ndarray, np.ndarray]:
        """
        Return (primary, secondary) where primary >= secondary element-wise.
        
        Parameters
        ----------
        m1, m2 : np.ndarray
            Component masses
            
        Returns
        -------
        primary, secondary : np.ndarray
            Ordered masses
        """
        primary = np.maximum(m1, m2)
        secondary = np.minimum(m1, m2)
        return primary, secondary
    
    def chirp_mass(self, m1: np.ndarray, m2: np.ndarray) -> np.ndarray:
        """
        Compute chirp mass from component masses.
        
        Parameters
        ----------
        m1, m2 : np.ndarray
            Component masses [Msun]
            
        Returns
        -------
        np.ndarray
            Chirp masses [Msun]
        """
        return (m1 * m2)**(3/5) / (m1 + m2)**(1/5)
    
    def process_compas_file(self, file_path: str,
                            dco_type: str = 'BBH', 
                            pessimistic: bool = True,
                            merges_hubble: bool = True, 
                            no_RLOF_post_CE: bool = True
                           ) -> DCOParameters:
        """
        Process a COMPAS HDF5 file and extract DCO parameters.
        
        Parameters
        ----------
        file_path : str
            Path to COMPAS HDF5 file
        dco_type : str
            Type of DCO to extract
        pessimistic : bool
            pessimistic CE: True, False
        merges_hubble : bool
            mask merging in a Hubble time: True, False
        no_RLOF_post_CE : bool
            mask systems with RLOF immediately after CE (assume these are stellar mergers): True, False
        
        Returns
        -------
        DCOParameters
            Container with all DCO properties
        """
        with h5.File(file_path, 'r') as fdata:
            # Get simulation parameters
            initial_mass_min = fdata['Run_Details']['initial-mass-min'][()][0]
            initial_mass_max = fdata['Run_Details']['initial-mass-max'][()][0] 
            minimum_secondary_mass = fdata['Run_Details']['minimum-secondary-mass'][()][0]
            
            # Calculate mass representation
            m_rep_per_binary = self.analytical_star_forming_mass_per_binary_using_kroupa_imf(
                m1_min=initial_mass_min, 
                m1_max=initial_mass_max,
                m2_min=minimum_secondary_mass, 
                fbin=1.0
            )
            
            n_binaries = len(fdata['BSE_System_Parameters']['SEED'][()])
            total_mass_evolved = n_binaries * m_rep_per_binary
            
            # Get DCO mask
            dco_mask = self.get_dco_mask(fdata, dco_type, pessimistic, merges_hubble, no_RLOF_post_CE)
            n_dcos = np.sum(dco_mask)
            
            if n_dcos == 0:
                print(f"Warning: No {dco_type} systems found in {file_path}")
                return None
            
            # Get system parameters for DCOs
            mask_sys_dcos = np.in1d(
                fdata['BSE_System_Parameters']['SEED'][()], 
                fdata['BSE_Double_Compact_Objects']['SEED'][()][dco_mask]
            )
            
            # Extract properties
            metallicities = fdata['BSE_System_Parameters']['Metallicity@ZAMS(1)'][()][mask_sys_dcos]
            mixture_weights = fdata['BSE_Double_Compact_Objects']['mixture_weight'][()][dco_mask]
            formation_efficiencies = mixture_weights / total_mass_evolved
            
            delay_times = (fdata['BSE_Double_Compact_Objects']['Coalescence_Time'][()] + 
                          fdata['BSE_Double_Compact_Objects']['Time'][()])[dco_mask]
            
            # Masses
            dco_masses_1 = fdata['BSE_Double_Compact_Objects']['Mass(1)'][()][dco_mask]
            dco_masses_2 = fdata['BSE_Double_Compact_Objects']['Mass(2)'][()][dco_mask]
            primary_masses, secondary_masses = self.get_primary_secondary(dco_masses_1, dco_masses_2)
            chirp_masses = self.chirp_mass(dco_masses_1, dco_masses_2)

            
        return DCOParameters(
            metallicities=metallicities,
            delay_times=delay_times,
            formation_efficiencies=formation_efficiencies,
            dco_masses_1=dco_masses_1,
            dco_masses_2=dco_masses_2,
            primary_masses=primary_masses,
            secondary_masses=secondary_masses,
            chirp_masses=chirp_masses,
            mixture_weights=mixture_weights,
            total_mass_evolved=total_mass_evolved,
            n_systems=n_dcos
        )

In [8]:
@dataclass
class DCOParameters:
    """Container for Double Compact Object parameters"""
    metallicities: np.ndarray
    delay_times: np.ndarray
    formation_efficiencies: np.ndarray
    dco_masses_1: np.ndarray
    dco_masses_2: np.ndarray
    primary_masses: np.ndarray
    secondary_masses: np.ndarray
    chirp_masses: np.ndarray
    mixture_weights: np.ndarray
    total_mass_evolved: float
    n_systems: int

        
        
        
            

        

def build_growl_catalog(base_path='/Volumes/GROWL/GROWL_bps'):
    """
    Build a dictionary structure for GROWL catalog with authors and their datasets.
    
    Structure:
    {
        'author_name': {
            'datasets': ['dataset1', 'dataset2', ...],
            'file_name': 'COMPAS_Output_Weighted.h5',
            'paths': {
                'dataset1': '/Volumes/GROWL/GROWL_bps/Boesky24/alpha0_1beta0_25/',
                'dataset2': '/Volumes/GROWL/GROWL_bps/Boesky24/alpha0_1beta0_5/'
            }
            'labels':{'dataset1': r'$\alpha 0.1 \ \beta=0.25$',
                      'dataset2': r'$\alpha 0.1 \ \beta=0.5$'
            
            }
        }
    }
    """
    catalog = {}
    
    if not os.path.exists(base_path):
        print(f"Base path {base_path} does not exist")
        return catalog
    
    # Get all author directories
    author_dirs = [d for d in os.listdir(base_path) 
                  if os.path.isdir(os.path.join(base_path, d)) and not d.startswith('.')]
    
    for author in author_dirs:
        author_path = os.path.join(base_path, author)
        
        # Get all dataset directories for this author
        dataset_dirs = [d for d in os.listdir(author_path) 
                       if os.path.isdir(os.path.join(author_path, d)) and not d.startswith('.')]
        
        if not dataset_dirs:
            continue
            
        # Find the common HDF5 file name by checking the first dataset
        first_dataset_path = os.path.join(author_path, dataset_dirs[0])
        h5_files = glob.glob(os.path.join(first_dataset_path, '*.h5'))
        
        if not h5_files:
            print(f"Warning: No HDF5 files found in {first_dataset_path}")
            continue
            
        # Assume the first HDF5 file is the standard one
        file_name = os.path.basename(h5_files[0])
        
        # Build paths dictionary
        paths = {}
        for dataset in dataset_dirs:
            dataset_path = os.path.join(author_path, dataset)
            # Verify the HDF5 file exists in this dataset
            expected_file = os.path.join(dataset_path, file_name)
            if os.path.exists(expected_file):
                paths[dataset] = dataset_path + '/'
            else:
                print(f"Warning: {expected_file} not found")
        
        catalog[author] = {
            'datasets': sorted(dataset_dirs),
            'file_name': file_name,
            'paths': paths
        }
    
    return catalog


def process_multiple_models(
    catalog: Dict, 
    author: str, 
    datasets: List[str], 
    dco_type: str = 'BBH',
    pessimistic: bool = True,
    merges_hubble: bool = True,
    no_RLOF_post_CE: bool = True
) -> Dict[str, DCOParameters]:
    """
    Process multiple COMPAS models for comparison.
    
    Parameters
    ----------
    catalog : dict
        GROWL catalog dictionary
    author : str
        Author name
    datasets : list
        List of dataset names to process
    dco_type : str
        Type of DCO to extract : 'BBH', 'BHNS', 'BNS'
    pessimistic: bool
        Assuming Pessimistic Common Envelope CE : True (Pessimistic) or False (Optimistic CE)
    merges_hubble : bool
        mask merging in a Hubble time: True, False
    no_RLOF_post_CE : bool
        mask systems with RLOF immediately after CE (assume these are stellar mergers): True, False

    Returns
    -------
    dict
        Dictionary with dataset names as keys and DCOParameters as values
    """
    processor = COMPASDataProcessor()
    results = {}
    
    for dataset in datasets:
        try:
            file_path = catalog[author]['paths'][dataset] + catalog[author]['file_name']
            print(f"Processing {author}/{dataset}...")
            
            data = processor.process_compas_file(file_path, dco_type, pessimistic, merges_hubble, no_RLOF_post_CE)
            if data is not None:
                results[dataset] = data
                print(f"  Found {data.n_systems} {dco_type} systems")
            else:
                print(f"  No {dco_type} systems found")
                
        except Exception as e:
            print(f"Error processing {author}/{dataset}: {e}")
            continue
    
    return results


In [9]:
# Assuming you have the GROWL catalog from the previous artifact
growl_catalog = build_growl_catalog()

# Process multiple Boesky24 models
datasets_to_process = ['alpha0_1beta0_25'] #, 'alpha0_1beta0_5', 'alpha0_1beta0_75', 'alpha0_5beta0_25', 'alpha0_5beta0_5', 'alpha0_5beta0_75', 'alpha10_beta0_5'\
#                       , 'alpha10_beta0_75', 'alpha2_beta0_5', 'alpha2beta0_25']  

boesky_data = process_multiple_models(growl_catalog, 'Boesky24', datasets_to_process, 
                                      dco_type='BBH', pessimistic= True, merges_hubble=True, no_RLOF_post_CE=True)






Processing Boesky24/alpha0_1beta0_25...
  Found 1649874 BBH systems


In [None]:
######
# New directory structure approach:
BASE_DIR = '/Volumes/GROWL/GROWL_bps_compact'
# Create separate entries for each dataset
output_files = create_hdf5_from_boesky_data(boesky_data, BASE_DIR, create_separate_files=True)