In [2]:
%matplotlib inline
import axs
import os
from glob import glob

import pandas as pd
import numpy as np
import matplotlib
import matplotlib.pyplot as plt
from astropy.io import ascii

import pandas as pd

from multiprocessing import Pool

import h5py
import pyspark.sql.functions as sparkfunc

In [3]:
from pyspark.sql import SparkSession

# Builder options, applied one by one in the order listed.
_spark_options = [
    ('spark.master', "local[10]"),  # 20 when running the full crossmatch, 10 when doing validation.
    ('spark.driver.memory', '80G'),
    ("spark.sql.execution.arrow.enabled", "true"),
    ("spark.ui.proxyBase", "https://epyc.astro.washington.edu/jupyter/user/ecbellm/proxy/4040"),
    ("spark.port.maxRetries", "128"),
]

_builder = SparkSession.builder
for _option_name, _option_value in _spark_options:
    _builder = _builder.config(_option_name, _option_value)

# Hive support is enabled before the session is created (or an existing one reused).
spark_session = _builder.enableHiveSupport().getOrCreate()
spark_session

In [4]:
# Wrap the Spark session in an AXS catalog; `catalog` is used below to list,
# save and load tables.
catalog = axs.AxsCatalog(spark_session)

In [5]:
# Show the names of the tables currently known to the AXS catalog.
catalog.list_tables().keys()

dict_keys(['gaia_dr2_1am_dup', 'allwise_1am_dup', 'unwise_v1', 'cklein_flare', 'unwise_v2', 'catalina_variables_n', 'gaia', 'gaia_dr2_1am_dup_ssd', 'ps1', 'green19_stellar_params', 'rosat_2rxs', 'rosat_2rxs_z4am_b2am', 'ztf_dr3_2rxs_obj', 'wtf_ztf_dr3', 'ztf_wds_kjb_dr3', 'karenws_cut_wtf_fits_r_band', 'karenws_cut_wtf_fits', 'ztf_aug2020_2rxs_obj', 'skymapper_dr2', 'test_skymapper', 'skymapper_dr2_ver2', 'skymapper_dr2_ver3', 'ztf_rrlyr', 'gaia_source_edr3', 'gaia_edr3_distances', 'rrlyrae_sample_andy', 'stevengs_test_small_df', 'ztf5', 'feh_rrlyr_ab_020620', 'kepler_rrlyrae', 'ztf_kepler_rrlyrae', 'ps_uband', 'ps_uband_ver2', 'debug_match_a', 'debug_match_b', 'debug_match_c', 'ztf_rrlyr_grid_50', 'igaps_halpha_emitters', 'wtf_aug2020_asymmetric_2', 'wtf_aug2020_asymmetric_3', 'wtf_aug2020_dip_candidates', 'ztf_rr_lyrae_preprocessed_and_gridded_09_01_2021', 'ztf_rr_lyrae_phi_31_fits_09_01_2021', 'stevengs_test_sources', 'ddf_sources_bigger', 'ztf_dr7', 'jrad_zg98m', 'jrad_zgd98m', 'jr

## EpochPhotometry table

In [6]:
# Read a single EpochPhotometry file and let Spark infer the column types;
# the resulting schema is reused for the full directory read further down.
sample_epoch_photometry_csv = '/astro/users/ecbellm/epyc/data/gaia_dr3/csv/EpochPhotometry/EpochPhotometry_334120-334488.csv.gz'
df_test = spark_session.read.csv(
    sample_epoch_photometry_csv,
    comment='#',
    header=True,
    inferSchema=True,
)

In [7]:
# Inspect the schema inferred from the sample file (it is passed as the
# explicit schema when the whole directory is read).
df_test.schema

StructType(List(StructField(solution_id,LongType,true),StructField(source_id,LongType,true),StructField(n_transits,IntegerType,true),StructField(transit_id,StringType,true),StructField(g_transit_time,StringType,true),StructField(g_transit_flux,StringType,true),StructField(g_transit_flux_error,StringType,true),StructField(g_transit_flux_over_error,StringType,true),StructField(g_transit_mag,StringType,true),StructField(g_transit_n_obs,StringType,true),StructField(bp_obs_time,StringType,true),StructField(bp_flux,StringType,true),StructField(bp_flux_error,StringType,true),StructField(bp_flux_over_error,StringType,true),StructField(bp_mag,StringType,true),StructField(rp_obs_time,StringType,true),StructField(rp_flux,StringType,true),StructField(rp_flux_error,StringType,true),StructField(rp_flux_over_error,StringType,true),StructField(rp_mag,StringType,true),StructField(photometry_flag_noisy_data,StringType,true),StructField(photometry_flag_sm_unavailable,StringType,true),StructField(photomet

In [8]:
# Number of rows in the single sample file.
df_test.count()

5923

In [9]:
# Peek at the first row of the sample file.
df_test.head()

Row(solution_id=375316653866487564, source_id=2938950841100157312, n_transits=60, transit_id='[18069250989630604,20081003806606854,22706725654656144,22720558963773548,22724651294581075,22734392168432477,22738484470404101,22748225281340799,22752317554869743,22762058306168531,22766150555186951,22775891253532520,22779983479351130,22793816333653734,23070468755723338,23084301631399163,23094042324111851,23098134573266186,23107875318800777,23111967591679096,23121708389511302,23125800686900040,23135541544507083,23139633868366200,23149374786921610,23163208124363181,26314633632444684,27783881400427822,27793622225285707,29386832587710924,31651570605794034,34979530952843565,34989271794598041,34993364056208153,35449796425331542,37824872828933516,39317405762494973,40892894143226347,40902634976212633,47140366378383240,52624731989657084,52634472786186730,52998194239816451,55717668152453020,56422960814004007,58632557413066012,61710446709187739,61714538987704888,64875711831369571,64879804161516032,64893

In [10]:
# Read every file in the EpochPhotometry directory, applying the schema taken
# from the sample file rather than inferring it again.
epoch_photometry_dir = '/astro/users/ecbellm/epyc/data/gaia_dr3/csv/EpochPhotometry/'
df = spark_session.read.csv(
    epoch_photometry_dir,
    comment='#',
    header=True,
    schema=df_test.schema,
)

In [11]:
# to load into AXS we need ra and dec, so join to our export from GaiaSource

In [12]:
# Load the source_id / ra / dec export that supplies the coordinates needed
# for the AXS table.
df_radec = spark_session.read.parquet('/epyc/data/gaia_dr3/source_id_ra_dec.parquet')

In [13]:
# Peek at the first row of the coordinate lookup.
df_radec.head()

Row(source_id=4116923378115693568, ra=265.4280847078738, dec=-22.989304805079296)

Reference on distinguishing columns with duplicated names after a Spark join: https://stackoverflow.com/questions/33778664/spark-dataframe-distinguish-columns-with-duplicated-name

In [14]:
# Rename the join key so it does not share a name with the photometry
# table's own source_id column once the two frames are joined.
radec_key = sparkfunc.col('source_id').alias('source_id_radec')
df_radec_alias = df_radec.select(radec_key, 'ra', 'dec')


In [15]:
# Confirm the key column now appears under its new name.
df_radec_alias.head()

Row(source_id_radec=4116923378115693568, ra=265.4280847078738, dec=-22.989304805079296)

In [16]:
# need the select to avoid duplicating the source_id column
# Try the join on the small sample frame first. The explicit select keeps
# ra/dec plus the photometry columns, so source_id is not duplicated.
sample_joined = df_radec_alias.join(
    df_test,
    df_test.source_id == df_radec_alias.source_id_radec,
    'inner',
)
sample_photometry_cols = [sparkfunc.col(name) for name in df_test.columns]
sample_joined.select('ra', 'dec', *sample_photometry_cols).head()

Row(ra=97.85384312124881, dec=-20.03419535391924, solution_id=375316653866487564, source_id=2939116940375297024, n_transits=99, transit_id='[18069211898991820,20067139392508152,20071231676797118,22762024014082232,22789690678863451,22793783016617388,22803523926250521,22807616250241838,22817357125271836,22835282585375715,22845023414530258,22849115703269149,22858856507913074,22862948793369007,22872689561056619,22876781838386026,22886522631763632,22890614921020545,22900355715577666,22904447989761251,22914188759938851,22918281034122384,22928021809804883,22932114087789454,22941854876841177,22945947157971429,22955687957508788,22959780244281203,22973613355881588,22983354192643153,22987446498283724,22997187361521715,23011020569989968,23015112901457733,23024853824333257,23028946170212785,23038687129663398,23052520493576450,26346394106771081,27793661776909935,29396597124260268,31675168942076589,31679261225705862,35007179786331314,35016920655087406,35021012931896099,35404208379683260,3540830063118

In [17]:
%%time

df_joined = (df_radec_alias.join(df,df.source_id == df_radec_alias.source_id_radec, 'inner')\
    .select('ra','dec',*(sparkfunc.col(x) for x in df.columns)))

catalog.save_axs_table(df_joined, "gaia_dr3_epoch_photometry", calculate_zone=True)

CPU times: user 355 ms, sys: 166 ms, total: 520 ms
Wall time: 30min 21s


## test Epoch Photometry

In [5]:
# Load the gaia_dr3_source table from the AXS catalog.
# NOTE(review): `gaia` is not referenced by any of the cells visible in this
# section -- confirm it is still needed.
gaia = catalog.load("gaia_dr3_source")

In [18]:
# Load the epoch-photometry table back from the catalog to check what was saved.
lc = catalog.load("gaia_dr3_epoch_photometry")

In [19]:
# List the column names of the loaded table.
lc.columns

['ra',
 'dec',
 'solution_id',
 'source_id',
 'n_transits',
 'transit_id',
 'g_transit_time',
 'g_transit_flux',
 'g_transit_flux_error',
 'g_transit_flux_over_error',
 'g_transit_mag',
 'g_transit_n_obs',
 'bp_obs_time',
 'bp_flux',
 'bp_flux_error',
 'bp_flux_over_error',
 'bp_mag',
 'rp_obs_time',
 'rp_flux',
 'rp_flux_error',
 'rp_flux_over_error',
 'rp_mag',
 'photometry_flag_noisy_data',
 'photometry_flag_sm_unavailable',
 'photometry_flag_af1_unavailable',
 'photometry_flag_af2_unavailable',
 'photometry_flag_af3_unavailable',
 'photometry_flag_af4_unavailable',
 'photometry_flag_af5_unavailable',
 'photometry_flag_af6_unavailable',
 'photometry_flag_af7_unavailable',
 'photometry_flag_af8_unavailable',
 'photometry_flag_af9_unavailable',
 'photometry_flag_bp_unavailable',
 'photometry_flag_rp_unavailable',
 'photometry_flag_sm_reject',
 'photometry_flag_af1_reject',
 'photometry_flag_af2_reject',
 'photometry_flag_af3_reject',
 'photometry_flag_af4_reject',
 'photometry_flag_af

In [20]:
# Peek at the first row of the loaded table.
lc.head()

Row(ra=2.0565882573506062, dec=-84.93676073039339, solution_id=375316653866487564, source_id=4617405884073871232, n_transits=37, transit_id='[19023800545502203,21012676142219326,22142039190304468,22146131471422734,25291129739830653,25295222024749871,27376218606036741,28799658487346649,28803750789305270,30637318898603009,32723891858711929,32727984111522817,34317716073391112,34327456878061424,36524606930568025,36528699213526186,38293445848089923,41898887111786934,42953557787604256,45107268871959727,46157859281106269,51528001433912830,53711325563613579,53715417895729010,55314895755155033,55318988022399774,57415290233320681,57419382546556838,60690237597959306,60694329887859548,62765616179364114,63770686777738787,65914690437420663,65924431260972617,69036077643876126,70721943294325214,70726035567846633]', g_transit_time='[1740.879479222647,1776.8459085513907,1797.2693452173266,1797.3433585330386,1854.2189312391615,1854.2929113177918,1891.9277570967747,1917.6709520767274,1917.7449622869608,Na

In [21]:
# Total number of rows in the loaded table.
lc.count()

13715388