Feb 6, 2024

This is my test development notebook.

#### Imports

In [221]:
import pandas as pd
import lcanalyzer.models as models
import random
import pandas.testing as pdt
import pytest

import numpy as np
from functools import reduce
import time


#### Params

In [111]:
# Define the bands names
bands = 'ugrizy'
mag_col = 'psfMag'
time_col = 'expMidptMJD'

#### Data loading

In [91]:
lc_datasets = {}
lc_datasets['lsst'] = pd.read_pickle('data/lsst_RRLyr.pkl')
lc_datasets['lsst'].info()


<class 'pandas.core.frame.DataFrame'>
RangeIndex: 11177 entries, 0 to 11176
Data columns (total 12 columns):
 #   Column       Non-Null Count  Dtype  
---  ------       --------------  -----  
 0   band         11177 non-null  object 
 1   ccdVisitId   11177 non-null  int64  
 2   coord_ra     11177 non-null  float64
 3   coord_dec    11177 non-null  float64
 4   objectId     11177 non-null  int64  
 5   psfFlux      11177 non-null  float64
 6   psfFluxErr   11177 non-null  float64
 7   psfMag       10944 non-null  float64
 8   ccdVisitId2  11177 non-null  int64  
 9   band2        11177 non-null  object 
 10  expMidptMJD  11177 non-null  float64
 11  zeroPoint    11177 non-null  float32
dtypes: float32(1), float64(6), int64(3), object(2)
memory usage: 1004.3+ KB


#### Data inspection

In [92]:
lc_datasets['lsst'].head()

Unnamed: 0,band,ccdVisitId,coord_ra,coord_dec,objectId,psfFlux,psfFluxErr,psfMag,ccdVisitId2,band2,expMidptMJD,zeroPoint
0,y,1032263018,62.462569,-44.11336,1251384969897480052,-515.183603,1697.21849,,1032263018,y,61100.069706,30.602301
1,y,1033987172,62.462569,-44.11336,1251384969897480052,3151.738459,1686.955775,22.653625,1033987172,y,61102.068464,30.6061
2,u,675163080,62.462569,-44.11336,1251384969897480052,183.449123,209.242045,25.741211,675163080,u,60582.247144,30.469101
3,y,443055067,62.462569,-44.11336,1251384969897480052,-704.848327,1624.400086,,443055067,y,60215.203585,30.612801
4,u,466722002,62.462569,-44.11336,1251384969897480052,382.472233,278.92667,24.9435,466722002,u,60261.078221,30.461201


#### Selecting light curves for a single object

In [93]:
### Pick an object
obj_id = lc_datasets['lsst']['objectId'].unique()[4]


In [94]:
### Get all the observations for this obj_id for each band
# Create an empty dict
lc = {}

# For each band create a bool array that indicates
# that this observation belongs to a certain object and is made in a
# certain band
for b in bands:
    filt_band_obj = (lc_datasets['lsst']['objectId'] == obj_id) & (
        lc_datasets['lsst']['band'] == b
    )
    # Select the observations and store in the dict 'lc'
    lc[b] = lc_datasets['lsst'][filt_band_obj]


#### Trying the model.py functions

In [95]:
models.max_mag(lc['g'], mag_col)

19.183367224358136

#### Test development

In [96]:
test_input = pd.DataFrame(data=[[1, 5, 3], [7, 8, 9], [3, 4, 1]], columns=list("abc"))
test_output = 7
models.max_mag(test_input, "a") == test_output

True

In [97]:
### Get maximum values for all bands
def calc_stat(lc, bands, mag_col):
    # Define an empty dictionary where we will store the results
    stat = {}
    # For each band get the maximum value and store it in the dictionary
    for b in bands:
        stat[b + "_max"] = models.max_mag(lc[b], mag_col)
    return stat


In [98]:
df1 = pd.DataFrame(data=[[1, 5, 3], [7, 8, 9], [3, 4, 1]], columns=list("abc"))
df2 = pd.DataFrame(data=[[7, 3, 2], [8, 4, 2], [5, 6, 4]], columns=list("abc"))
df3 = pd.DataFrame(data=[[2, 6, 3], [1, 3, 6], [8, 9, 1]], columns=list("abc"))
test_input = {"df1": df1, "df2": df2, "df3": df3}
test_output = {"df1_max": 8, "df12_max": 6, "df3_max": 8}
test_output == calc_stat(test_input, ["df1", "df2", "df3"], "b")

False

In [99]:
random.seed(1)
print(random.sample(range(0, 100), 10))
random.seed(1)
print(random.sample(range(0, 100), 10))
random.seed(1)
print(random.sample(range(0, 100), 10))


[17, 72, 97, 8, 32, 15, 63, 57, 60, 83]
[17, 72, 97, 8, 32, 15, 63, 57, 60, 83]
[17, 72, 97, 8, 32, 15, 63, 57, 60, 83]


In [100]:
def calc_stats(lc, bands, mag_col):
    # Calculate max, mean and min values for all bands of a light curve
    stats = {}
    for b in bands:
        stat = {}
        stat["max"] = models.max_mag(lc[b], mag_col)
        stat["mean"] = models.mean_mag(lc[b], mag_col)
        stat["min"] = models.min_mag(lc[b], mag_col)
        stats[b] = stat
    return pd.DataFrame.from_records(stats)


In [101]:
test_cols = list("abc")
test_dict = {}
test_dict["df0"] = pd.DataFrame(
    data=[[8, 8, 0], 
          [0, 1, 1], 
          [2, 3, 1], 
          [7, 9, 7]], columns=test_cols
)
test_dict["df1"] = pd.DataFrame(
    data=[[3, 8, 2], 
          [3, 8, 0], 
          [3, 9, 8], 
          [8, 2, 5]], columns=test_cols
)
test_dict["df2"] = pd.DataFrame(
    data=[[8, 4, 3], 
          [7, 6, 3], 
          [4, 2, 9], 
          [6, 4, 0]], columns=test_cols
)


In [102]:
test_output = pd.DataFrame(data=[[9,9,6],[5.25,6.75,4.],[1,2,2]],columns=['df0','df1','df2'],index=['max','mean','min'])

In [104]:
test_output

Unnamed: 0,df0,df1,df2
max,9.0,9.0,6.0
mean,5.25,6.75,4.0
min,1.0,2.0,2.0


In [105]:
calc_stats(test_dict, test_dict.keys(), 'b')

Unnamed: 0,df0,df1,df2
max,9.0,9.0,6.0
mean,5.25,6.75,4.0
min,1.0,2.0,2.0


In [106]:
#assert calc_stats(test_dict, test_dict.keys(), 'b') == test_output

In [107]:
pdt.assert_frame_equal(calc_stats(test_dict, test_dict.keys(), 'b'),
                              test_output,
                             check_exact=False,
                             atol=0.01)

In [64]:
# Procedural style factorial function.

def factorial(n):
    """Calculate the factorial of a given number.

    :param int n: The factorial to calculate
    :return: The resultant factorial
    """
    if n < 0:
        raise ValueError('Only use non-negative integers.')

    factorial = 1
    for i in range(1, n + 1): # iterate from 1 to n
        # save intermediate value to use in the next iteration
        factorial = factorial * i

    return factorial


In [65]:
# Functional style factorial function

def factorial(n):
    """Calculate the factorial of a given number.

    :param int n: The factorial to calculate
    :return: The resultant factorial
    """
    if n < 0:
        raise ValueError('Only use non-negative integers.')

    if n == 0 or n == 1:
        return 1 # exit from recursion, prevents infinite loops
    else:
        return  n * factorial(n-1) # recursive call to the same function


In [73]:
def add_one(x):
    return x + 1

def say_hello(name):
    print('Hello', name)

def append_item_1(a_list, item):
    a_list += [item]
    return a_list

def append_item_2(a_list, item):
    result = a_list + [item]
    return result

def add_two(x):
    return add_one(add_one(x))


In [72]:
c1=[0,0,1]
c2=[0,0,1]
b1 = append_item_1(c1,1)
b2 = append_item_2(c2,1)
print(c1)
print(b1)
print(c2)
print(b2)

[0, 0, 1, 1]
[0, 0, 1, 1]
[0, 0, 1]
[0, 0, 1, 1]


In [75]:
b = add_one(2)
print(b)

3


In [76]:
c=add_two(2)
print(c)

4


In [79]:
#This is a simple mapping that takes a list of names and returns 
#a list of the lengths of those names using the built-in function len():

name_lengths = map(len, ["Mary", "Isla", "Sam"])
print(list(name_lengths))


[4, 4, 3]


In [80]:
#This is a mapping that squares every number in the passed collection using anonymous, 
#inlined lambda expression (a simple one-line mathematical expression representing a function):

squares = map(lambda x: x * x, [0, 1, 2, 3, 4])
print(list(squares))


[0, 1, 4, 9, 16]


In [109]:
lc_datasets['lsst'].head()

Unnamed: 0,band,ccdVisitId,coord_ra,coord_dec,objectId,psfFlux,psfFluxErr,psfMag,ccdVisitId2,band2,expMidptMJD,zeroPoint
0,y,1032263018,62.462569,-44.11336,1251384969897480052,-515.183603,1697.21849,,1032263018,y,61100.069706,30.602301
1,y,1033987172,62.462569,-44.11336,1251384969897480052,3151.738459,1686.955775,22.653625,1033987172,y,61102.068464,30.6061
2,u,675163080,62.462569,-44.11336,1251384969897480052,183.449123,209.242045,25.741211,675163080,u,60582.247144,30.469101
3,y,443055067,62.462569,-44.11336,1251384969897480052,-704.848327,1624.400086,,443055067,y,60215.203585,30.612801
4,u,466722002,62.462569,-44.11336,1251384969897480052,382.472233,278.92667,24.9435,466722002,u,60261.078221,30.461201


In [118]:
# Create an empty list where we will be storing our light curves
lcs = []

#For each observed object
for obj_id in lc_datasets["lsst"]["objectId"].unique():
    # Create an empty dict for the light curves of this object
    lc = {}
    lc['objectId'] = obj_id
    for b in bands:
        filt_band_obj = (lc_datasets["lsst"]["objectId"] == obj_id) & (
            lc_datasets["lsst"]["band"] == b
        )
        # The observations in each band are converted to lists and stored as dict elements
        lc[b+'_'+mag_col] = np.array(lc_datasets["lsst"][filt_band_obj][mag_col])
        lc[b+'_'+time_col] = np.array(lc_datasets["lsst"][filt_band_obj][time_col])
    lcs.append(lc)

# Turn the list of dicts into a DataFrame    
lcs = pd.DataFrame.from_records(lcs)


In [121]:
# we replace the NaN values with zeros, 
# we can use pd.map function, that act similarly to the classic Python map,
                                                                                                                                                                                                                                                            b = 'u'
lcs[b+'_'+mag_col+'_cleaned'] = lcs[b+'_'+mag_col].map(lambda l: np.where(np.isnan(l),0,l))


In [123]:
# we discard NaN values entirely. this works only for pd.Series.

b = "u"

# Create column names variables for better readability
mcol = b + "_" + mag_col
tcol = b + "_" + time_col
mcol_cl = mcol + "_cleaned"
tcol_cl = tcol + "_cleaned"

# The new cleaned columns, `mcol_cl` and `tcol_cl`, contain the result of applying
# a lambda function to each row (`axis=1` argument). The lambda function returns a tuple
# of two numpy arrays, filtered according to the mask that is `False` for the elements that
# are NaNs and `True` to all other elements.

lcs[[mcol_cl, tcol_cl]] = lcs.apply(
    lambda l: (
        l[mcol][~np.isnan(l[mcol])],
        l[tcol][~np.isnan(l[mcol])],
    ),
    axis=1,
    result_type="expand",
)




In [125]:
integers = range(5)
double_ints = [2 * i for i in integers]

print(double_ints)

[0, 2, 4, 6, 8]


In [126]:
integers = range(5)
double_ints = map(lambda i: 2 * i, integers)
print(list(double_ints))

[0, 2, 4, 6, 8]


In [132]:
integers = range(5)
double_even_ints = [2 * i for i in integers if i % 2 == 0]
print(double_even_ints)

[0, 4, 8]


In [131]:
double_even_int_set = {2 * i for i in integers if i % 2 == 0}
print(double_even_int_set)

double_even_int_dict = {i: 2 * i for i in integers if i % 2 == 0}
print(double_even_int_dict)

{0, 8, 4}
{0: 0, 2: 4, 4: 8}


In [133]:

doubles_generator = (2 * i for i in integers)
for x in doubles_generator:
   print(x)

0
2
4
6
8


In [137]:
sequence = [1, 2, 3, 4, 2]

def product(a, b):
    return a * b

print(reduce(product, sequence))

# The same reduction using a lambda function
print(reduce((lambda a, b: a * b), sequence))


48
48


In [139]:
sequence = [1, 2, 3, 4, 2, 3]

def add(a, b):
    return a + b

print(reduce(add, sequence))

# The same reduction using a lambda function
print(reduce((lambda a, b: a + b), sequence))



15
15


In [140]:
# function that calculates the sum of the squares of the values in a list using the MapReduce approach.

def sum_of_squares(sequence):
    squares = [x * x for x in sequence]  # use list comprehension for mapping
    return reduce(lambda a, b: a + b, squares)

In [141]:
print(sum_of_squares([0]))
print(sum_of_squares([1]))
print(sum_of_squares([1, 2, 3]))
print(sum_of_squares([-1]))
print(sum_of_squares([-1, -2, -3]))

0
1
14
1
14


In [146]:
def sum_of_squares(sequence):
    integers = [int(x) for x in sequence if x[0] != '#']
    squares = [x * x for x in integers]
    return reduce(lambda a, b: a + b, squares)

In [147]:
print(sum_of_squares(['1', '2', '3']))
print(sum_of_squares(['-1', '-2', '-3']))
print(sum_of_squares(['1', '2', '#100', '3']))

14
14
14


In [148]:
def with_logging(func):

    """A decorator which adds logging to a function."""
    def inner(*args, **kwargs):
        print("Before function call")
        result = func(*args, **kwargs)
        print("After function call")
        return result

    return inner


def add_one(n):
    print("Adding one")
    return n + 1

# Redefine function add_one by wrapping it within with_logging function
add_one = with_logging(add_one)

# Another way to redefine a function - using a decorator
@with_logging
def add_two(n):
    print("Adding two")
    return n + 2

print(add_one(1))
print(add_two(1))



Before function call
Adding one
After function call
2
Before function call
Adding two
After function call
3


In [150]:

def profile(func):
    def inner(*args, **kwargs):
        start = time.process_time_ns()
        result = func(*args, **kwargs)
        stop = time.process_time_ns()

        print("Took {0} seconds".format((stop - start) / 1e9))
        return result

    return inner

@profile
def measure_me(n):
    total = 0
    for i in range(n):
        total += i * i

    return total

print(measure_me(1000000))


Took 0.212073 seconds
333332833333500000


In [151]:
my_list = [1, 2, 3]
my_dict = {1: '1', 2: '2', 3: '3'}
my_set = {1, 2, 3}

print(type(my_list))
print(type(my_dict))
print(type(my_set))


<class 'list'>
<class 'dict'>
<class 'set'>


In [172]:
class Variable:
    def __init__(self, obj_id):
        self.obj_id = obj_id
        self.lc = {
                   'mjd': np.array([]),
                   'mag': np.array([])
                  }


In [173]:
star_obs = Variable(obj_id)
print(star_obs.obj_id)

1405624461041897445


In [189]:
class Variable:
    """A Variable class"""
    def __init__(self, obj_id):
        self.obj_id = obj_id
        self.lc = {
                   'mjd': np.array([]),
                   'mag': np.array([])
                  }
    
    def __str__(self):
      return str(self.obj_id)

    def __len__(self):
      return len(self.lc["mjd"])

    
    def convert_to_array(self,data):
       # Check if the data is iterable and convert it into np.array, otherwise raise an exception
       if not isinstance(data, np.ndarray):
           if isinstance(data, (list,tuple,pd.Series)):
               data = np.array(data)
           elif isinstance(data, (int, float)):
               data = np.array([data])
           else:
               raise ValueError("The data type of the input is incorrect!")
       return data

    def compare_len(self,arrs):
       # Check that all arrays are of the same length
       lens = [len(arr) for arr in arrs]
       if len(set(lens)) > 1: # set() turns an iterable into a set of unique values.
       # If the values are all the same, the set will contain only one element
           raise ValueError("Passed timestamps and mags or mag_errs arrays have different lengths!")
       return
       
    def add_observations(self, mjds, mags, mag_errs=None):
       """
       Adds observations to the light curve.

       Args:
         mjds: A vector of Modified Julian Dates (x values).
         mags: A vector of luminosities (y values).
         mag_errs: A vector of magnitude errors.
       """
       self.lc["mjd"] = self.convert_to_array(mjds)
       self.lc["mag"] = self.convert_to_array(mags)
       if mag_errs is not None:
           self.lc["mag_errs"] = self.convert_to_array(mag_errs)
       self.compare_len(self.lc.values())
       return

    @property
    def mean_mag(self):
        return np.mean(self.lc['mag'])



In [191]:
obj_id = lc_datasets['lsst']['objectId'].unique()[7]
b = 'g'
filt_band_obj = (lc_datasets['lsst']['objectId'] == obj_id) & (
        lc_datasets['lsst']['band'] == b
    )
obj_obs = lc_datasets['lsst'][filt_band_obj]

star = Variable(obj_id)
print(star)

star.add_observations(obj_obs[time_col],obj_obs[mag_col])
print(star.lc)

mean_mag = star.mean_mag
print(mean_mag)







1405624461041897445
{'mjd': array([60559.2973682, 59791.3473572, 60559.2978172, 61017.0665232,
       60281.1630512, 59840.2103322, 60560.2654012, 61298.2853162,
       60610.1669992, 60881.3838612, 61269.3333562, 59841.2948172,
       59840.2107802, 60555.2837972, 60880.4114602, 61109.0454832,
       59841.3286042, 60962.2788762, 60260.1801602, 60145.4057402,
       60610.1014732, 60962.2669892, 61329.2779102, 60588.1944462,
       61329.2769632, 59791.3582372, 59840.2367152, 60993.0741872,
       60880.3849662, 60993.0622822, 59958.1258722, 61109.0305462,
       60286.0827232, 60962.2656102, 60962.1967542, 60962.1836422,
       59840.2284672, 60610.1253832, 60588.2046822, 61356.1581872,
       60610.1010252, 60962.3061112, 61084.1131352, 60530.3755112]), 'mag': array([17.0270149 , 17.9296522 , 17.02132289, 17.35962392, 17.64018619,
       18.37622276, 17.42335279, 18.38669221, 18.38521324, 18.38482772,
       18.38450526, 17.25162572, 18.37937516, 17.88716136, 18.25681606,
       18.

In [217]:
class Lightcurve:
    """Class Lightcurve"""

    def __init__(self, mjds=None, mags=None, mag_errs=None):
        self.lc = {}
        if mjds is not None:
            self.add_observations(mjds, mags, mag_errs)

    def add_observations(self, mjds, mags, mag_errs=None):
        self.lc["mjds"] = self.convert_to_array(mjds)
        self.lc["mags"] = self.convert_to_array(mags)
        if mag_errs is not None:
            self.lc["mag_errs"] = self.convert_to_array(mag_errs)
        self.compare_len(self.lc.values())
        return self.lc

    def convert_to_array(self, data):
        if not isinstance(data, np.ndarray):
            if isinstance(data, (list, tuple, pd.Series)):
                data = np.array(data)
            elif isinstance(data, (int, float)):
                data = np.array([data])
            else:
                raise ValueError("The data type of the input is incorrect!")
        return data

    def compare_len(self, arrs):
        lens = [len(arr) for arr in arrs]
        if len(set(lens)) > 1:
            raise ValueError(
                "Passed timestamps and mags or mag_errs arrays have different lengths!"
            )
        return

    @property
    def mean_mag(self):
        return np.mean(self.lc["mags"])

    def __len__(self):
        return len(self.lc["mjds"])

In [218]:
star = Variable(obj_id)
star.add_lc(band = b,mjds = obj_obs[time_col], mags = obj_obs[mag_col])


{'g': <__main__.Lightcurve at 0x11eb090d0>}

In [219]:
star.mband_lc['g'].mean_mag

18.03180312045771

In [216]:
class RRLyrae(Variable):
    """A class for RR Lyrae stars."""
    def __init__(self, obj_id):
        super().__init__(obj_id)
        self.period = None

    def period_determination(self, period_range=(0.1,3)):
        """A function to determine the period"""
        self.period = 0.3
        return

rr_lyrae = RRLyrae(obj_id)
rr_lyrae.period_determination()
print(rr_lyrae.mband_lc)
print(rr_lyrae.period)


{}
0.3
