# Ingesting larger catalogs: Pan-STARRS DR1 Mean/Thin Objects

This notebook follows up on *insert_example*; be sure to go through that one first.

Here we show one way to speed up the catalog ingestion step through the use of multiprocessing. In particular, we will import some of the files from a subset (and compilation) of the PS1 DR1 mean and thin object tables. The entire subsample contains 1.92 billion sources, selected by requiring nDetections > 2.

Reference: https://panstarrs.stsci.edu/

The files used in this test can be downloaded from:

https://desycloud.desy.de/index.php/s/stCkA6uJ8ayKvjI

In [None]:
import CatalogPusher
import pandas as pd
import concurrent.futures
from healpy import ang2pix

import importlib
importlib.reload(CatalogPusher)

# Build the pusher object and point it to the raw files.
ps1_data_dir = '../testdata/PS1DR1_test/'
ps1p = CatalogPusher.CatalogPusher(
    catalog_name='ps1_test',      # short name of the catalog
    data_source=ps1_data_dir,     # where to find the data (other options are possible)
    file_type='*.csv.gz',         # glob filter; the column-definition file in data_source does not match it
)


# Define the reader for the raw files. Column names are taken from the
# first line of a separate headings file and handed to pd.read_csv via
# `names=` (presumably because the raw files have no header row -- confirm).
headfile = ps1_data_dir + 'column_headings.csv'
with open(headfile, 'r') as header:
    catcols = list(map(str.strip, header.readline().split(',')))

ps1p.assign_file_reader(
    reader_func=pd.read_csv,      # callable used to read the raw files
    read_chunks=True,             # whether or not the reader processes each file in smaller chunks
    # Every remaining keyword argument is passed directly to reader_func.
    names=catcols,
    chunksize=50000,
    engine='c',
)


# Define the modifier. This time the HEALPix grid is finer (nside = 2**12).
hp_nside12 = 2**12


def ps1_modifier(srcdict):
    """Add a GeoJSON position and a HEALPix pixel id to one source document.

    Parameters
    ----------
    srcdict : dict
        One catalog row. It must provide 'raMean' and 'decMean'
        (assumed to be in degrees, with raMean in [0, 360) -- TODO confirm).

    Returns
    -------
    dict
        The same dict, modified in place, with two added keys:
        'pos'      -- GeoJSON Point [lon, dec], where lon is raMean shifted
                      by -360 when raMean >= 180.
        'hpxid_12' -- int pixel index from healpy's ang2pix at
                      nside=hp_nside12 with nest=True.
    """
    ra_deg = srcdict['raMean']
    dec_deg = srcdict['decMean']

    # GeoJSON longitudes must lie in [-180, 180], so RA values from 180
    # upward are moved down by one full turn.
    if ra_deg < 180.:
        lon = ra_deg
    else:
        lon = ra_deg - 360.
    srcdict['pos'] = {'type': 'Point', 'coordinates': [lon, dec_deg]}

    # The pixel index uses the original (unshifted) RA. With lonlat=True,
    # healpy reads the angles as longitude/latitude in degrees. int()
    # converts the returned value to a plain Python int (presumably for
    # clean serialization into the database).
    pixel = ang2pix(hp_nside12, ra_deg, dec_deg, lonlat=True, nest=True)
    srcdict['hpxid_12'] = int(pixel)
    return srcdict


ps1p.assign_dict_modifier(ps1_modifier)


# Wrap the file-pushing call in a module-level function so that it can be
# dispatched to worker processes to speed up the catalog ingestion.
def pushfiles(filerange):
    """Push one group of raw files into the 'srcs' collection via ps1p.

    Parameters
    ----------
    filerange :
        One element of ``ps1p.file_groups(...)``, forwarded unchanged as
        push_to_db's ``filerange`` argument.

    Returns None; the result of push_to_db is not propagated.
    """
    push_options = dict(
        coll_name='srcs',
        index_on=['hpxid_12'],
        filerange=filerange,
        # Several workers write into the same collection, so none of them
        # should overwrite it (presumably the reason for False -- confirm).
        overwrite_coll=False,
        dry=False,
    )
    ps1p.push_to_db(**push_options)
    

# Each job runs pushfiles on one subgroup of all the files.
# The __main__ guard stops worker processes from re-launching the pool when
# they re-import this module. That happens under the "spawn" start method,
# e.g. when this notebook is exported and run as a script. Inside the
# notebook, __name__ is '__main__', so behavior there is unchanged.
# NOTE(review): under "spawn" (the default on macOS/Windows), workers cannot
# see functions defined interactively in a notebook; moving pushfiles into
# an importable module may be required on those platforms.
if __name__ == '__main__':
    file_groups = ps1p.file_groups(group_size=4)
    with concurrent.futures.ProcessPoolExecutor(max_workers=2) as executor:
        # Executor.map re-raises a worker's exception only when that result
        # is retrieved from the iterator. Draining it here makes a failed
        # file group abort loudly instead of being silently discarded.
        for _ in executor.map(pushfiles, file_groups):
            pass