# Science modules in Fink: an example

A science module contains necessary routines and classes to process the data, and add values. Typically, you will receive alerts in input, and output the same alerts with additional information. Input alert information contains position, flux, telescope properties, ... You can find what information is available in an alert [here](https://zwickytransientfacility.github.io/ztf-avro-alert/), or check the current [Fink added values](https://fink-broker.readthedocs.io/en/latest/science/added_values/).

In this simple example, we explore a simple science module that takes magnitudes contained in each alert, and computes the change in magnitude between the last two measurements.

In [2]:
# utility from fink-science
from fink_science.utilities import concat_col

## Loading the data

Fink receives data as Avro. However, the internal processing makes use of Parquet files. We provide here alert data as Parquet: it contains original alert data from ZTF and some added values from Fink:

In [3]:
# Load the data into a Spark DataFrame
df = spark.read.format('parquet').load('sample.parquet')

22/01/21 13:52:51 WARN Utils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.debug.maxToStringFields' in SparkEnv.conf.


You can check what's in the data

In [4]:
df.printSchema()

root
 |-- candid: long (nullable = true)
 |-- schemavsn: string (nullable = true)
 |-- publisher: string (nullable = true)
 |-- objectId: string (nullable = true)
 |-- candidate: struct (nullable = true)
 |    |-- jd: double (nullable = true)
 |    |-- fid: integer (nullable = true)
 |    |-- pid: long (nullable = true)
 |    |-- diffmaglim: float (nullable = true)
 |    |-- pdiffimfilename: string (nullable = true)
 |    |-- programpi: string (nullable = true)
 |    |-- programid: integer (nullable = true)
 |    |-- candid: long (nullable = true)
 |    |-- isdiffpos: string (nullable = true)
 |    |-- tblid: long (nullable = true)
 |    |-- nid: integer (nullable = true)
 |    |-- rcid: integer (nullable = true)
 |    |-- field: integer (nullable = true)
 |    |-- xpos: float (nullable = true)
 |    |-- ypos: float (nullable = true)
 |    |-- ra: double (nullable = true)
 |    |-- dec: double (nullable = true)
 |    |-- magpsf: float (nullable = true)
 |    |-- sigmapsf: float (nullab

## Calling the science module

First, you need to concatenate historical + current measurements for the quantities of interest. Here, we only need `magpsf`. Hence we create a new column to the DataFrame called `cmagpsf` (for _concatenated_ `magpsf`):

In [5]:
# Required alert columns
what = ['magpsf', 'jd', 'sigmapsf', 'fid']

# Use for creating temp name
prefix = 'c'
what_prefix = [prefix + i for i in what]

# Concatenate historical + current measurements
for colname in what:
    df = concat_col(df, colname, prefix=prefix)

what_prefix

['cmagpsf', 'cjd', 'csigmapsf', 'cfid']

In [15]:
import light_curve as lc
import numpy as np
import pandas as pd
from fink_science.ztf.utilities import fix_nans

In [9]:
def extract_row(frame, index=0, dtype='float64'):
    return np.array(frame[index][0], dtype=dtype)


def npsave(*args, **kwargs):
    args = [*args]
    args[0] = f"samples/{args[0]}"
    if not args[0].endswith('.npy'):
        args[0] += '.npy'
    return np.save(*args, **kwargs)


def npload(*args, **kwargs):
    args = [*args]
    args[0] = f"samples/{args[0]}"
    if not args[0].endswith('.npy'):
        args[0] += '.npy'
    args[0] = open(args[0], 'rb')
    return np.load(*args, **kwargs)


In [10]:
import os


def create_sample(n=0):
    cmagpsf = extract_row(df.select('cmagpsf').take(n+1), n)
    cjd = extract_row(df.select('cjd').take(n+1), n)
    csigmapsf = extract_row(df.select('csigmapsf').take(n+1), n)

    if not os.path.exists(f'samples/{n}'):
        os.mkdir(f'samples/{n}')

    npsave(f"{n}/cmagpsf", cmagpsf)
    npsave(f"{n}/cjd", cjd)
    npsave(f"{n}/csigmapsf", csigmapsf)

    return cmagpsf, cjd, csigmapsf


def load_sample(n=0):
    cmagpsf = npload(f"{n}/cmagpsf")
    cjd = npload(f"{n}/cjd")
    csigmapsf = npload(f"{n}/csigmapsf")

    return cmagpsf, cjd, csigmapsf

In [11]:
create_sample(0)
create_sample(1)
create_sample(2)
create_sample(3)
create_sample(4)
create_sample(5)

print(os.listdir("samples"))

                                                                                

['test.npy', '0', '1', '4', '3', '2', '5']


In [12]:
def extract(magpsf, jd, sigmapsf):
    fix_nans(cmagpsf)
    fix_nans(csigmapsf)

    extractor = lc.Extractor(
        lc.Amplitude(),
        lc.BeyondNStd(nstd=1),
        lc.LinearFit(),
        lc.Mean(),
        lc.Median(),
        lc.StandardDeviation(),
        lc.Cusum(),
        lc.ExcessVariance(),
        lc.MeanVariance(),
        lc.Kurtosis(),
        lc.MaximumSlope(),
        lc.Skew(),
        lc.WeightedMean(),
        lc.Eta(),
        lc.AndersonDarlingNormal(),
        lc.ReducedChi2(),
        lc.InterPercentileRange(quantile=0.1),
        #lc.MagnitudePercentageRatio(),
        lc.MedianBufferRangePercentage(quantile=0.1),
        lc.PercentDifferenceMagnitudePercentile(quantile=0.1),
        lc.MedianAbsoluteDeviation(),
        lc.PercentAmplitude(),
        lc.EtaE(),
        lc.LinearTrend(),
        lc.StetsonK(),
        lc.WeightedMean(),
        #lc.Bins(),
        #lc.OtsuSplit(),
    )

    result = extractor(cjd, cmagpsf, csigmapsf)
    print('\n'.join("{} = {:.2f}".format(name, value) for name, value in zip(extractor.names, result)))  # DEBUG
    return result

In [16]:
cmagpsf, cjd, csigmapsf = load_sample(1)

describe = lambda v: print(type(v), len(v), np.mean(v), np.std(v))

assert cmagpsf.shape == cjd.shape == csigmapsf.shape, 'Mismatched shapes'

describe(cmagpsf)
describe(cjd)
describe(csigmapsf)

# separator for result
print("\n--------------\n")

res = extract(cmagpsf, cjd, csigmapsf)

<class 'numpy.ndarray'> 34 nan nan
<class 'numpy.ndarray'> 34 2459526.124098582 7.321780152225849
<class 'numpy.ndarray'> 34 nan nan

--------------

amplitude = 1.52
beyond_1_std = 0.47
linear_fit_slope = -0.07
linear_fit_slope_sigma = 0.00
linear_fit_reduced_chi2 = 98.81
mean = 18.47
median = 18.22
standard_deviation = 0.97
cusum = 0.32
excess_variance = 0.00
mean_variance = 0.05
kurtosis = -1.26
maximum_slope = 123.08
skew = -0.17
weighted_mean = 17.55
eta = 1.32
anderson_darling_normal = 1.25
chi2 = 112.38
inter_percentile_range_10 = 2.52
median_buffer_range_percentage_10 = 0.00
percent_difference_magnitude_percentile_10 = 0.14
median_absolute_deviation = 0.85
percent_amplitude = 1.56
eta_e = 591.79
linear_trend = -0.07
linear_trend_sigma = 0.02
linear_trend_noise = 0.86
stetson_K = 0.84
weighted_mean = 17.55


In [31]:
%load_ext autoreload

The autoreload extension is already loaded. To reload it, use:
  %reload_ext autoreload


In [56]:
%autoreload 2
%aimport processor
importlib.reload(processor)

names = processor.names
print(names)

ztf_base_col = 'ztf_TEST'
df_change = df.withColumn(ztf_base_col, processor.extract_features_ztf(*what_prefix))
for index, name in enumerate(names):
    df_change = df_change.withColumn(name, df_change[ztf_base_col][index])
df_change = df_change.drop(ztf_base_col)
df_change.select(['objectId', *names]).show()

['amplitude', 'beyond_1_std', 'linear_fit_slope', 'linear_fit_slope_sigma', 'linear_fit_reduced_chi2', 'mean', 'median', 'standard_deviation', 'cusum', 'excess_variance', 'mean_variance', 'kurtosis', 'maximum_slope', 'skew', 'weighted_mean', 'eta', 'anderson_darling_normal', 'chi2', 'inter_percentile_range_10', 'median_buffer_range_percentage_10', 'percent_difference_magnitude_percentile_10', 'median_absolute_deviation', 'percent_amplitude', 'eta_e', 'linear_trend', 'linear_trend_sigma', 'linear_trend_noise', 'stetson_K', 'weighted_mean']


[Stage 50:>                                                         (0 + 1) / 1]

+------------+--------------------+-------------------+--------------------+----------------------+-----------------------+------------------+------------------+--------------------+-------------------+--------------------+--------------------+-------------------+------------------+--------------------+------------------+-------------------+-----------------------+--------------------+-------------------------+---------------------------------+------------------------------------------+-------------------------+-------------------+------------------+--------------------+--------------------+-------------------+------------------+------------------+
|    objectId|           amplitude|       beyond_1_std|    linear_fit_slope|linear_fit_slope_sigma|linear_fit_reduced_chi2|              mean|            median|  standard_deviation|              cusum|     excess_variance|       mean_variance|           kurtosis|     maximum_slope|                skew|     weighted_mean|                eta|

                                                                                

We can also quickly check some statistics on this new column:

In [6]:
df_change.select('deltamag').describe().show()

+-------+-------------------+
|summary|           deltamag|
+-------+-------------------+
|  count|                176|
|   mean|0.09352213686162775|
| stddev| 0.9564824046920042|
|    min| -2.828317642211914|
|    max| 3.4397459030151367|
+-------+-------------------+

