In [1]:
# original by Emille Ishida, 31/08/2025 for CRP8

import pandas as pd
import numpy as np
import glob
from copy import deepcopy
from astropy.coordinates import SkyCoord
from astropy import units as u

In [2]:
dirname = '/media3/CRP8/TDE/data/COIN2025_ClusterData/csv/extragalactic/'

In [3]:
flist = glob.glob(dirname + '*.csv')

In [4]:
# directory to store big CSVs
dir_out = '/media3/CRP8/TDE/data/COIN2025_ClusterData/csv_100k/extragalactic/'

In [5]:
# limit of ZTF sky coverage
dec_lim = -30

In [6]:
def get_catname(fname, dir_input):
    """Generate minimalist catalog name.
    
    Parameters
    ----------
    fname: str
        complete path to csv file.
    dir_input: str
        input directory.
    
    Returns
    -------
    catname: str
        minimalist catalog name.
    """

    cat_name = deepcopy(fname.replace(dir_input, ''))
    cat_name = cat_name.replace('.fit.csv','')
    
    if '.csv' in cat_name:
        cat_name = cat_name.replace('.csv', '')

    return cat_name
    

In [7]:
def identify_ra_dec(columns, good_ra, good_dec, bad_ra, bad_dec):
    """Identify the columns containing RA, DEC information.

    Parameters
    ----------
    columns: list of str
        list of all columns of a data frame.
    good_ra: list of str
        list of acceptable strings for RA.
    good_dec: list of str
        list of acceptable strings for DEC.
    bad_ra: list of str
        list of unnaceptable strings containing RA.
    bad_dec: list of str
        list of unnaceptable strings containing DE.

    Returns
    -------
    ra_name: str or None
        Keyword identifying RA. Returns None if information not available.
    dec_name: str or None
        Keyword identifying DEC. Returns None if information not available.
    """
    # initiatlize variables
    ra_name = []
    dec_name = []
    ra_flag = False
    dec_flag = False

    for name in data.keys():
        if name in good_ra:
            ra_name.append(name)
            ra_flag = True
        elif name in good_dec:
            dec_name.append(name)
            dec_flag = True
        elif not ra_flag and name not in bad_ra and 'ra' in name or 'RA' in name:
            ra_name.append(name)
        elif not dec_flag and name not in bad_dec and 'dec' in name or 'DEC' in name or 'DE' in name:
            dec_name.append(name)

    # check if the list is longer than it should be due to position of good keys
    if len(ra_name) > 1 or len(dec_name) > 1:
        for name in good_ra:
            if name in ra_name:
                ra_name = [name]
        for name in  good_dec:
            if name in dec_name:
                dec_name = [name]

    if len(ra_name) == 1 and len(dec_name) == 1:
        return ra_name[0], dec_name[0]

    else:
        return None, None



In [8]:
def remove_south(data, dec_lim=-30):
    """Remove objects that are too south given dec limit.

    Parameters
    ----------
    data: pd.DataFrame
        Data containing RA DEC info, it must have column names: ['ra','dec'].
    dec_lim: float
        Limiting value of DEC. Default is -30 (corresponding to ZTF coverage).

    Returns
    -------
    data_use: pd.DataFrame
        Data Frame containing only elements respecting given threshold.
    """

    # initialize list
    ra_list = []
    dec_list = []
    row_flag = []
    
    for j in range(data.shape[0]):
        if data['dec'].iloc[j] in ['deg'] or set(str(data['dec'].iloc[j])) ==  set('-'):
            row_flag.append(False)
            continue
        elif ' ' in str(data['dec'].iloc[j]):
            c = SkyCoord(data['ra'].iloc[j] + ' ' + data['dec'].iloc[j], unit=(u.hourangle, u.deg))
            if c.dec.deg >= dec_lim:
                ra_list.append(c.ra.deg)
                dec_list.append(c.dec.deg)
                row_flag.append(True)
            else:
               row_flag.append(False)
        else:
            if float(data['dec'].iloc[j]) >= dec_lim:
                row_flag.append(True)
            else:
                row_flag.append(False)

    row_flag = np.array(row_flag)
    data_use = deepcopy(data[row_flag])

    # only substitute if conversion was necessary
    if len(ra_list) > 0:
        data_use['ra'] = ra_list
        data_use['dec'] = dec_list

    return data_use

In [21]:
def process_cat(data, dic_columns, dec_lim=-30):
    """
    Identify RA DEC columns and remove objects not fulfilling DEC limit.

    Parameters
    ----------
    data: pd.DataFrame
        original content of csv file.
    dec_lim: float
        Limiting value of DEC. Default is -30 (corresponding to ZTF coverage).
    dic_columns: dictionary
        Identifies good and bad names for RA, DEC. Keywords must contain 
        ['good_ra', 'good_dec','bad_ra', 'bad_dec'].    

    Returns
    -------
    data_use: pd.DataFrame or None
        Data with RA DEC in degrees and within DEC limits.
        If unable to identify RA DEC columns, returns None
    """
   
    # identify columns with RA, DEC info
    ra_name, dec_name = identify_ra_dec(data.keys(), 
                                       dic_columns['good_ra'],
                                       dic_columns['good_dec'], 
                                       dic_columns['bad_ra'],
                                       dic_columns['bad_dec'])
    
    if ra_name is not None:
        # rename columns to homogeneize nomenclature
        data.rename(columns={ra_name:'ra', dec_name:'dec'}, inplace=True)

        # remove things in the south
        data_use = remove_south(data, dec_lim)

        if data_use.shape[0] == 0:
            data_use = -1
    else:
        data_use = None

    return data_use


In [10]:
def process_big_file(data, dic_columns, partition_size=95000):
    """
    Process files with more than 100000 lines (limit of Fink xmatch).

    Parameters
    ----------
    data: pd.DataFrame
        Full data from csv file
    dic_columns: dictionary
        Identifies good and bad names for RA, DEC. Keywords must contain 
        ['good_ra', 'good_dec','bad_ra', 'bad_dec']
    partition_size: int
        Maximum number of lines to be kept in one partition. Default is 95k.

    Returns
    -------
    data_batches: dict 
        keys are the partition indexes and values pd.DataFrame with processed data.
    """
    # identify columns with RA, DEC info
    ra_name, dec_name = identify_ra_dec(data.keys(), dic_columns['good_ra'], 
                                            dic_columns['good_dec'], dic_columns['bad_ra'], 
                                            dic_columns['bad_dec'])

    if ra_name is not None and dec_name is not None:

        # determine the number of partitions
        size_part = data.shape[0] // partition_size + 1
        print('size_part=', size_part)

        data_batches = {}

        for j in range(size_part):
           # slice data
            data_part  = deepcopy(data.iloc[j*partition_size: (j + 1)*partition_size])
            print('data_part.shape=', data_part.shape)

            # define catalog name
            cat_name = get_catname(fname, dir_input) 
        
            # create identifier: catalog name + position in original file
            if j < size_part - 1:
                 data_part['cat_id'] = [cat_name + str('_') + str(item) 
                                       for item in range(j*partition_size,(j + 1)*partition_size)]
            else:
                data_part['cat_id'] = [cat_name + str('_') + str(item) 
                                       for item in range(j*partition_size,j*partition_size + data_part.shape[0])]

            # create column with catalog name
            data_part['catalog'] = cat_name

            # process catalog
            data_use = process_cat(data_part, dic_columns, dec_lim=dec_lim)
            store in dictionary
            data_batches[j] = data_use
            
    else:
        data_batches = None

    return data_batches
    

In [23]:
rec_ini = 0
list_to_pandas = []
skiped_catalogs = []
processed_catalogs = []
cat_end = None

dir_input = '/media3/CRP8/TDE/data/COIN2025_ClusterData/csv/extragalactic/'

dic_columns = {}
dic_columns['good_ra'] = ['RA','ra', '_RA', 'RAJ2000', 'RA_ICRS', '_RA_icrs']
dic_columns['good_dec'] = ['DEC','dec','_DE', 'DEJ2000', 'DE_ICRS', '_DE_icrs']
dic_columns['bad_ra'] = ['ratio', 'galcen_radius', 'Separation', 'KRON_RADIUS_F850LP','KRON_RADIUS_F475W']
dic_columns['bad_dec'] = ['DELTA_J2000_F850LP', 'DELTA_J2000_F475W']

# partition size for big files
partition_size = 95000

# flag to identify if big file was processed
big_file_done = False

for fname in flist:
    # read data
    data = pd.read_csv(fname)

    # check if one file is too big
    if data.shape[0] >= partition_size:
        # process parts of the data
        data_batches = process_big_file(data, dic_columns, partition_size=partition_size)

        if data_batches is not None:
            # save big parts to file
            for part in sorted(list(data_batches.keys()))[:-1]:
                cat_name = data_batches[part].iloc[0]['catalog']
                data_batches[part].to_csv(dir_out + 'batch_' + cat_name + '_' + str(part*partition_size) + \
                                          '_' + str((part + 1)*partition_size) + '.csv', index=False)
                print('Saving big file, part:', part + 1, ' file size:', data_batches[part].shape[0])

            # identify last part
            part = sorted(list(data_batches.keys()))[-1]
            if data_batches[part].shape[0] + rec_ini < partition_size:
                # update count
                rec_ini = rec_ini + data_batches[part].shape[0]

                # add to list of previously processed data
                list_to_pandas.append(data_batches[part].shape[0])

            else:
                # save to file
                cat_name = data_batches[part].iloc[0]['catalog']
                data_batches[part].to_csv(dir_out + 'batch_' + cat_name + '_' + str(part*partition_size) + \
                                          '_' + str(part*partition_size + data_batches[part].shape[0]) + '.csv', index=False)
                print('Saving big file, part:', part, ' file size:', data_batches[part].shape[0])

        else:
            print(cat_name, 'skipped!')
            skiped_catalogs.append(fname)
            print('Could not recognize RA DEC for big file.')

        big_file_done = True
            
    # if the new file will be too big, save current records
    elif not big_file_done and data.shape[0] + rec_ini > partition_size or fname == flist[-1]:
        data_batch = pd.concat(list_to_pandas, ignore_index=True)
        data_batch.to_csv(dir_out + 'batch_' + cat_name + '_' + str(rec_ini - data_use.shape[0]) + '_' + str(rec_ini) + '.csv', 
                                            index=False)   
        print('Limit of objects achieved, file size:', data_batch.shape[0])
        if fname != flist[-1]:
            list_to_pandas = []
        rec_ini = 0

    elif data.shape[0] + rec_ini < partition_size and not big_file_done: 
        # get minimalist catalog name
        cat_name = get_catname(fname, dir_input)
        
        # create identifier: catalog name + position in original file
        data['cat_id'] = [cat_name + str('_') + str(item) for item in range(data.shape[0])]

        data_use = process_cat(data, dic_columns, dec_lim=dec_lim)

        if data_use is not None and isinstance(data_use, pd.DataFrame):            
            # update count
            rec_ini = rec_ini + data_use.shape[0]
            
            # append to already processed data
            list_to_pandas.append(data_use[['cat_id','ra', 'dec']])
       
        else:
            print(cat_name, 'skipped!')

            if data_use != -1:
                skiped_catalogs.append(fname)

    big_file_done = False

NGC3379_F850LP_F475W.cat.r.absMag.color.apcor.fGC skipped!
NGC4278_F850LP_F475W.cat.r.absMag.color.apcor.GC skipped!
woodley2010a skipped!
NGC4278_F850LP_F475W.cat.r.absMag.color.apcor.fGC skipped!
woodley2010b skipped!
maybhate09 skipped!
Strader12UCDs skipped!
goudfrooij06 skipped!
goudrooij12 skipped!
Degraaf skipped!
NGC3607_sources skipped!
 Kontizas90 skipped!
NGC4594_F625W_F435W.cat.r.absMag.color.apcor.GC skipped!
NGC4472_F850LP_F475W.cat.r.absMag.color.apcor.fGC skipped!
Limit of objects achieved, file size: 94720
NGC4472_F850LP_F475W.cat.r.absMag.color.apcor.GC skipped!
NGC4594_F625W_F435W.cat.r.absMag.color.apcor.fGC skipped!
NGC3607_Gcc skipped!
su2022 skipped!
NGC7331_Gcc skipped!
Could not recognize RA DEC for big file.
barmby_m31_phot skipped!
ACSFCS skipped!
barmby_m31_spec skipped!
NGC3379_F850LP_F475W.cat.r.absMag.color.apcor.GC skipped!
chattopadhyay2009 skipped!
bassino skipped!
Limit of objects achieved, file size: 31572


In [24]:
len(skiped_catalogs)

13

In [37]:
for fname in skiped_catalogs:
    

['/media3/CRP8/TDE/data/COIN2025_ClusterData/csv/extragalactic/NGC3379_F850LP_F475W.cat.r.absMag.color.apcor.fGC.csv',
 '/media3/CRP8/TDE/data/COIN2025_ClusterData/csv/extragalactic/NGC4278_F850LP_F475W.cat.r.absMag.color.apcor.GC.csv',
 '/media3/CRP8/TDE/data/COIN2025_ClusterData/csv/extragalactic/NGC4278_F850LP_F475W.cat.r.absMag.color.apcor.fGC.csv',
 '/media3/CRP8/TDE/data/COIN2025_ClusterData/csv/extragalactic/Strader12UCDs.fit.csv',
 '/media3/CRP8/TDE/data/COIN2025_ClusterData/csv/extragalactic/NGC4594_F625W_F435W.cat.r.absMag.color.apcor.GC.csv',
 '/media3/CRP8/TDE/data/COIN2025_ClusterData/csv/extragalactic/NGC4472_F850LP_F475W.cat.r.absMag.color.apcor.fGC.csv',
 '/media3/CRP8/TDE/data/COIN2025_ClusterData/csv/extragalactic/NGC4472_F850LP_F475W.cat.r.absMag.color.apcor.GC.csv',
 '/media3/CRP8/TDE/data/COIN2025_ClusterData/csv/extragalactic/NGC4594_F625W_F435W.cat.r.absMag.color.apcor.fGC.csv',
 '/media3/CRP8/TDE/data/COIN2025_ClusterData/csv/extragalactic/su2022.fit.csv',
 '/me