In [1]:
import axs
import numpy as np

import os
from pyspark.sql.functions import size as spark_size
from pyspark.sql.types import ArrayType, DoubleType
from pyspark.sql.functions import pandas_udf

import pandas as pd

import cesium
from cesium.time_series import TimeSeries
from cesium.featurize import featurize_single_ts, featurize_time_series


In [2]:
def spark_start(project_path, metastore=None):
    """Create (or fetch the existing) SparkSession used by this notebook.

    Parameters
    ----------
    project_path : str
        Base directory. ``<project_path>/spark-warehouse`` is used as
        ``spark.sql.warehouse.dir`` and ``<project_path>/spark-tmp`` as
        ``spark.local.dir`` scratch space.
    metastore : str or None, optional
        Directory passed to the driver JVM as ``-Dderby.system.home``
        (presumably where Hive's embedded Derby metastore, and thus the AXS
        catalog, lives — confirm). When ``None`` the option is not set at all,
        so Derby falls back to its own default location (presumably the
        driver's working directory).

        NOTE(review): this function used to format ``None`` straight into the
        option, producing ``-Dderby.system.home=None`` (a directory literally
        named ``None`` relative to the working directory). If an existing
        metastore was created there, pass that path explicitly to keep using it.

    Returns
    -------
    pyspark.sql.SparkSession
        Session running on ``local[4]`` with Hive support enabled.
    """
    from pyspark.sql import SparkSession

    warehouse_location = os.path.join(project_path, 'spark-warehouse')
    local_dir = os.path.join(project_path, 'spark-tmp')

    builder = (
        SparkSession.builder
        .appName("LSD2")
        .config("spark.sql.warehouse.dir", warehouse_location)
        .config('spark.master', "local[4]")
        # The trailing numbers on the two memory settings look like alternative
        # sizes for a larger machine — confirm before relying on them.
        .config('spark.driver.memory', '6G')  # 128
        .config('spark.local.dir', local_dir)
        .config('spark.memory.offHeap.enabled', 'true')
        .config('spark.memory.offHeap.size', '4G')  # 256
        # NOTE(review): on Spark 3.x this key is deprecated in favour of
        # spark.sql.execution.arrow.pyspark.enabled — confirm the Spark version.
        .config("spark.sql.execution.arrow.enabled", "true")
        .config("spark.driver.maxResultSize", "6G")
    )

    # Only pin the Derby home when a location was actually supplied; otherwise
    # the literal string "None" would be used as a directory name.
    if metastore is not None:
        builder = builder.config("spark.driver.extraJavaOptions",
                                 f"-Dderby.system.home={metastore}")

    return builder.enableHiveSupport().getOrCreate()

# Project directory that holds spark-warehouse/ and spark-tmp/. Set the
# LSD2_PROJECT_PATH environment variable to run somewhere else; the default
# keeps the original location.
PROJECT_PATH = os.environ.get("LSD2_PROJECT_PATH", "/epyc/users/ctslater")
spark_session = spark_start(PROJECT_PATH)

catalog = axs.AxsCatalog(spark_session)

In [3]:
# ZTF table from the AXS catalog; its `mjd` / `psfflux` columns are used below.
ZTF_TABLE = "ztf_1am_lc"
ztf = catalog.load(ZTF_TABLE)

In [4]:
# SESAR RR Lyrae table from the AXS catalog, crossmatched against ZTF below.
SESAR_TABLE = "sesar_rrlyrae"
sesar_axs = catalog.load(SESAR_TABLE)

In [None]:
%%time
matched = sesar_axs.crossmatch(ztf).drop("axsdist")
matched_filtered = (matched.select("ra", "dec", "matchid", "Per", "weightedmeanmag", "filterid", "mjd", "psfflux")
                    .where((spark_size(matched['mjd']) > 5) &
                           ((matched['S3ab'] > 0.8) | (matched['S3c'] > 0.8))
                            ))

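Cesium in the Python layer
==========================

Featurize a few crossmatched light curves directly on the driver (pulled with `head()`) to sanity-check cesium's output before running it inside Spark.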

In [19]:
%%time
results = matched_filtered.head(10)
result = results[0]

features_to_use = ["amplitude",
                   "percent_beyond_1_std",
                   "maximum",
                   "max_slope",
                   "median",
                   "median_absolute_deviation",
                   "percent_close_to_median",
                   "minimum",
                   "skew",
                   "std",
                   "weighted_average"]

ls_features = ["freq1_amplitude1",
                "freq1_amplitude2",
                "freq1_amplitude3",
                "freq1_amplitude4",
                "freq1_freq",
                "freq1_lambda",
                "freq1_rel_phase2",
                "freq1_rel_phase3",
                "freq1_rel_phase4",
                "freq1_signif",
                "freq2_amplitude1",
                "freq2_amplitude2",
                "freq2_amplitude3",
                "freq2_amplitude4",
                "freq2_freq",
                "freq2_rel_phase2",
                "freq2_rel_phase3",
                "freq2_rel_phase4"]

ts = TimeSeries(t=np.array(result['mjd']), m=np.array(result['psfflux']))
feat_out = featurize_single_ts(ts, features_to_use + ls_features)

CPU times: user 1.36 s, sys: 145 ms, total: 1.5 s
Wall time: 51.6 ms


In [21]:
# Period implied by cesium's `freq1_freq`, to compare with the catalog `Per`.
ls_period = 1 / feat_out['freq1_freq']
ls_period

channel
0    0.574923
dtype: float64

In [22]:
# Catalog period for the same light curve (pyspark Row attribute access).
result.Per

0.57479179

In [20]:
# Display the full cesium feature vector for `result` (indexed by feature, channel).
feat_out

feature                    channel
amplitude                  0            9396.832794
percent_beyond_1_std       0               0.307190
maximum                    0           19601.496094
max_slope                  0          825072.546836
median                     0           12735.896484
median_absolute_deviation  0            2143.480469
percent_close_to_median    0               0.431373
minimum                    0             807.830505
skew                       0              -0.375430
std                        0            3359.907709
weighted_average           0           13263.989591
freq1_amplitude1           0            3022.199473
freq1_amplitude2           0            1063.568925
freq1_amplitude3           0             230.753765
freq1_amplitude4           0              16.869063
freq1_freq                 0               1.739364
freq1_lambda               0               7.425334
freq1_rel_phase2           0              -0.952610
freq1_rel_phase3           0 

In [23]:
%%time

ztf.where(spark_size(matched['mjd']) > 10).count()

CPU times: user 92.3 ms, sys: 36.8 ms, total: 129 ms
Wall time: 13min 13s


109850351

In [33]:
%%time
ts = TimeSeries(t=np.array(result['mjd']), m=np.array(result['psfflux']))
feat_out = featurize_single_ts(ts, features_to_use + ls_features)

CPU times: user 284 ms, sys: 1.58 s, total: 1.87 s
Wall time: 74.2 ms


In [32]:
%%time
feat_out2 = featurize_time_series([np.array(res['mjd']) for res in results[:20]],
                                  [np.array(res['psfflux']) for res in results[:20]],
                                  features_to_use=features_to_use + ls_features
                                  )

  slopes = np.diff(x) / np.diff(t)


CPU times: user 1min 2s, sys: 3min 25s, total: 4min 27s
Wall time: 3.37 s


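Cesium UDF
==========

Run the same cesium featurization inside Spark as a pandas UDF that returns one feature array per light curve.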

In [12]:

# Summary-statistic features requested from cesium inside the UDF.
features_to_use = ["amplitude", "percent_beyond_1_std", "maximum", "max_slope",
                   "median", "median_absolute_deviation",
                   "percent_close_to_median", "minimum", "skew", "std",
                   "weighted_average"]

# Periodic-fit features for cesium's first two frequencies (`ls_` presumably
# = Lomb-Scargle). freq2 has no `lambda` / `signif` entries.
ls_features = (
    [f"freq1_amplitude{k}" for k in range(1, 5)]
    + ["freq1_freq", "freq1_lambda"]
    + [f"freq1_rel_phase{k}" for k in range(2, 5)]
    + ["freq1_signif"]
    + [f"freq2_amplitude{k}" for k in range(1, 5)]
    + ["freq2_freq"]
    + [f"freq2_rel_phase{k}" for k in range(2, 5)]
)
    
# Light curves whose time baseline (max(mjd) - min(mjd)) is below this value
# are not sent to cesium. cesium was observed to raise for baselines < 0.06;
# 0.10 leaves a margin. Same units as `mjd` (presumably days).
MIN_BASELINE = 0.10


def featurize_udf(mjd, psfflux):
    """Compute cesium features for every light curve in a pandas_udf batch.

    Parameters
    ----------
    mjd, psfflux : pandas.Series
        Batch columns handed over by Spark; each element is one light curve's
        sequence of observation times / fluxes (presumably array<double> —
        confirm against the schema).

    Returns
    -------
    pandas.Series
        One 1-D array per input row with ``len(features_to_use) +
        len(ls_features)`` values: the flattened ``featurize_time_series``
        output for ``features_to_use + ls_features``. Rows with fewer than two
        epochs or a baseline below ``MIN_BASELINE`` get an all-zero vector.
    """
    # Loop-invariant: the requested feature list is the same for every row.
    all_features = features_to_use + ls_features
    feat_outs = []

    for row_mjd, row_psfflux in zip(mjd, psfflux):
        mjd_arr = np.array(row_mjd)

        # cesium throws an error when the first-to-last baseline is too short.
        # Empty rows are caught by the size check first, because np.max/np.min
        # raise on empty arrays.
        if mjd_arr.size < 2 or np.max(mjd_arr) - np.min(mjd_arr) < MIN_BASELINE:
            # NOTE(review): zeros are indistinguishable from genuine
            # zero-valued features downstream; NaN may be a safer placeholder.
            feat_outs.append(np.zeros(len(all_features)))
        else:
            feat_out = featurize_time_series(mjd_arr, np.array(row_psfflux),
                                             features_to_use=all_features)
            feat_outs.append(feat_out.values.flatten())

    return pd.Series(feat_outs)

# Wrap featurize_udf as a pandas UDF (default function type) that yields one
# array<double> of feature values per row.
feature_array_type = ArrayType(DoubleType())
featurize_lc = pandas_udf(featurize_udf, feature_array_type)


In [14]:
%%time

feature_results = matched_filtered.select(
    featurize_lc(matched_filtered['mjd'], matched_filtered['psfflux'])).head(1000)


CPU times: user 50.9 ms, sys: 14 ms, total: 65 ms
Wall time: 5min 46s


In [15]:
# First UDF output row; compare its values with the driver-side `feat_out`.
feature_results[0]

Row(featurize_udf(mjd, psfflux)=[9396.832794189453, 0.30718954248366015, 19601.49609375, 825072.5468358505, 12735.896484375, 2143.48046875, 0.43137254901960786, 807.8305053710938, -0.37543006309128435, 3359.907708598801, 13263.989590912863, 3022.1994726892235, 1063.5689251811777, 230.7537648132592, 16.869063248696087, 1.7393636957614456, 7.425334084418403, -0.9526102393556031, -1.6430480738036535, 2.4733237980405356, 7.547383578188932, 892.550621721922, 59.02211695616729, 3.508887093596339, 2.409814231998434, 1.3538175947376925, 1.8663721976359344, 1.857010582095423, 3.1269629685411493])