# Converting Green+19 to AXS

In [48]:
%matplotlib inline
import axs
import os
from glob import glob

import pandas as pd
import numpy as np
import matplotlib
import matplotlib.pyplot as plt
import astropy.coordinates as coord
import astropy.units as u
from multiprocessing import Pool

import h5py
import pyspark.sql.functions as sparkfunc

In [3]:
h5py.__version__

'2.9.0'

In [4]:
def spark_start(project_path, metastore=None, local_dir=None):
    from pyspark.sql import SparkSession

    warehouse_location = project_path #os.path.join(project_path, 'spark-warehouse')

    if local_dir is None:
        local_dir = os.path.join(project_path, 'spark-tmp')

    spark = ( 
            SparkSession.builder
            .appName("LSD2")
            .config("spark.sql.warehouse.dir", warehouse_location)
            .config('spark.master', "local[6]")
            .config('spark.driver.memory', '8G') # 128
            .config('spark.local.dir', local_dir)
            .config('spark.memory.offHeap.enabled', 'true')
            .config('spark.memory.offHeap.size', '4G') # 256
            .config("spark.sql.execution.arrow.enabled", "true")
            .config("spark.driver.maxResultSize", "6G")
            .config("spark.driver.extraJavaOptions", f"-Dderby.system.home={metastore}")
            .enableHiveSupport()
            .getOrCreate()
                    )   

    return spark

spark_session = spark_start("/epyc/data/", local_dir="/epyc/users/ecbellm")

#catalog = axs.AxsCatalog(spark_session)

In [5]:
spark_session

In [30]:
filename = '/data/epyc/data/bayestar/green_19_stellar_params/3424177.h5'

In [12]:
def recast_uint(df):
    for column, dtype in zip(df.columns, df.dtypes):
        if(dtype == np.uint16):
            df[column] = df[column].astype(np.int16)
        elif(dtype == np.uint32):
            df[column] = df[column].astype(np.int32)
        elif(dtype == np.uint64):
            df[column] = df[column].astype(np.int64)

In [49]:
def make_parquet(filename, clobber=False):
    if os.path.exists(filename.replace('h5','parquet')) and not clobber:
        print(f'{filename} already converted')
        return
    
    print(f'Converting {filename}...')
    f = h5py.File(filename,'r')
    percentiles_cols = ['dm','E', 'Mr', 'FeH']
    percentiles = [16,50,84]
    groups = {'percentiles': percentiles_cols,'metadata':['obj_id','l','b'],'gaia':['gaia_id'], 'chisq':['0']}

    dfd = []
    for group, cols in groups.items():
        dfg = []
    
        for node, data in f[group].items():
            if group not in ['percentiles','chisq']:
                dfgi = pd.DataFrame(data[()])[cols]
            else:
                if group == 'chisq':
                    dfgi = pd.DataFrame(data[()],columns=['chisq'])
                if group == 'percentiles':
                    # TODO: this ordering makes the columns line up by percentiles rather than the model quantity, which is annoying
                    dfgi = pd.DataFrame({f'{col}_{p}':data[col][:,i] for i,p in enumerate(percentiles) 
                                    for col in percentiles_cols}, 
                                    index=np.arange(len(data[()])))

            dfg.append(dfgi)
        dfg = pd.concat(dfg)
        dfd.append(dfg)
    df = pd.concat(dfd, axis=1)
    recast_uint(df)

    sc = coord.SkyCoord(frame="galactic",l=df['l'],b=df['b'],unit=u.degree)
    sc = sc.transform_to(coord.ICRS)
    df['ra'] = sc.ra
    df['dec'] = sc.dec
    df.to_parquet(filename.replace('h5','parquet'))
    return

In [34]:
base_dir = '/data/epyc/data/bayestar/green_19_stellar_params/'
input_h5_files = glob(base_dir+'*.h5')

In [50]:
with Pool(8) as p:
    p.map(make_parquet, input_h5_files)

/data/epyc/data/bayestar/green_19_stellar_params/3424177.h5 already converted
Converting /data/epyc/data/bayestar/green_19_stellar_params/3424196.h5...
Converting /data/epyc/data/bayestar/green_19_stellar_params/3424213.h5...
Converting /data/epyc/data/bayestar/green_19_stellar_params/3424181.h5...
Converting /data/epyc/data/bayestar/green_19_stellar_params/3424217.h5...
Converting /data/epyc/data/bayestar/green_19_stellar_params/3424200.h5...
Converting /data/epyc/data/bayestar/green_19_stellar_params/3424178.h5...
Converting /data/epyc/data/bayestar/green_19_stellar_params/3424222.h5...
Converting /data/epyc/data/bayestar/green_19_stellar_params/3424226.h5...
Converting /data/epyc/data/bayestar/green_19_stellar_params/3424197.h5...
Converting /data/epyc/data/bayestar/green_19_stellar_params/3424192.h5...
Converting /data/epyc/data/bayestar/green_19_stellar_params/3424214.h5...
Converting /data/epyc/data/bayestar/green_19_stellar_params/3424201.h5...
Converting /data/epyc/data/bayesta

In [40]:
# conversion from single pandas dataframa
#sdf = spark_session.createDataFrame(df)

In [52]:
# read a directory of files
sdf = spark_session.read.parquet('/epyc/data/bayestar/green_19_stellar_params/parquet/')

In [38]:
catalog = axs.AxsCatalog(spark_session)

In [62]:
catalog.drop_table('green19_stellar_params')


'Table or view not found: green19_stellar_params;'


In [63]:
catalog.save_axs_table( sdf, 'green19_stellar_params', repartition=True, calculate_zone=True)

In [64]:
catalog.list_tables().keys()

dict_keys(['sdss_zoned1am_hd', 'gaia_zoned1am_hd', 'gaia_dr2_1am_dup', 'sdss_zoned1am_700lc', 'gaia_dr2_700lc2', 'allwise_1am_dup', 'gaia_sdss_wise_1asec', 'gaia_sdss_3asec', 'ztf_1am_old', 'ztf_exposures', 'ztf_1am_lc', 'ztf_1am', 'ztf_1am_test', 'sesar_rrlyrae', 'ztf_fid1_sdss_stars_lt20_2asec', 'ztf_fid2_sdss_stars_lt20_2asec', 'ztf_fid3_sdss_stars_lt20_2asec', 'sdss_dr9_qso_s82', 'faraway_training_dataset', 'faraway_labeled_training_dataset', 'ztf_nobs100', 'jnk_ztf_test', 'jnk_ztf_test2', 'ztf1000', 'ztf10', 'ztf_dec18', 'asassn_variability_catalog', 'ztf10_assasn', 'ztf10_assasn_cesium', 'ztf10_assasn_best', 'ztf10_assasn_cesium_best', 'unwise_v1', 'cklein_flare', 'unwise_v2', 'cesium_speedtest_ztfsample', 'paula_listcvs', 'nemec_rrlyrae_metalicity', 'dambis_rrlyrae_metalicity', 'sdss_500b_28e_10800z', 'gaia_500b_28e_10800z', 'allwise_500b_28e_10800z', 'ztf_500b_28e_10800z', 'ztf_mar19_all', 'ztf_dr1_s82_qso', 'green19_stellar_params'])

In [65]:
%%time
g19 = catalog.load("green19_stellar_params")

CPU times: user 8.69 ms, sys: 9.7 ms, total: 18.4 ms
Wall time: 129 ms


In [66]:
g19.columns

['dm_16',
 'E_16',
 'Mr_16',
 'FeH_16',
 'dm_50',
 'E_50',
 'Mr_50',
 'FeH_50',
 'dm_84',
 'E_84',
 'Mr_84',
 'FeH_84',
 'obj_id',
 'l',
 'b',
 'gaia_id',
 'chisq',
 'ra',
 'dec',
 '__index_level_0__',
 'zone',
 'dup']

In [67]:
%%time
ztf = catalog.load("ztf_mar19_all")

CPU times: user 4.63 ms, sys: 6.05 ms, total: 10.7 ms
Wall time: 1.55 s


In [68]:
ztf.columns

['matchid',
 'ra',
 'dec',
 'nobs_avail',
 'combined_matchids',
 'mjd',
 'programid',
 'filterid',
 'mag',
 'magerr',
 'psfmag',
 'psfmagerr',
 'psfflux',
 'psffluxerr',
 'chi',
 'catflags',
 'sharp',
 'xpos',
 'ypos',
 'zone',
 'dup']

In [69]:
testra = 287.9869104
testdec = 13.0748496
d = 10/3600.


In [71]:
%%time
g19_match = ztf.region(ra1=testra-d, ra2=testra+d, dec1=testdec-d, dec2=testdec+d).crossmatch(g19).select("matchid","ra","dec",'dm_16',
 'E_16',
 'Mr_16',
 'FeH_16',
 'dm_50',
 'E_50',
 'Mr_50',
 'FeH_50',
 'dm_84',
 'E_84',
 'Mr_84',
 'FeH_84').toPandas()

CPU times: user 28.4 ms, sys: 13.1 ms, total: 41.5 ms
Wall time: 22.4 s


In [72]:
g19_match

Unnamed: 0,matchid,ra,dec,dm_16,E_16,Mr_16,FeH_16,dm_50,E_50,Mr_50,FeH_50,dm_84,E_84,Mr_84,FeH_84
0,10539322169560,287.985648,13.076047,11.757848,1.268043,2.95,-0.624185,12.931591,1.423063,3.95,-0.4,14.213696,1.5358,5.774185,-0.2
1,10539322144433,287.985927,13.075574,11.255817,1.014585,3.15,-0.6,13.029734,1.568482,4.35,-0.375,14.455096,1.734339,8.074185,-0.175815
2,10539322095877,287.986183,13.076572,12.025167,1.498015,1.95,-0.7,13.333874,1.615024,3.05,-0.45,14.796666,1.712202,4.274185,-0.2
3,10539322095931,287.986859,13.074891,7.737913,0.082347,11.475815,-0.15,7.996738,0.223051,11.95,0.1,8.255173,0.340853,12.55,0.25
