In [None]:
import numpy as np
import matplotlib.pyplot as plt
import sys
import pickle

if './SelfCalGroupFinder/py/' not in sys.path:
    sys.path.append('./SelfCalGroupFinder/py/')
from pyutils import *
from dataloc import *
import groupcatalog as gc
from nnanalysis import *
from redshift_guesser import SimpleRedshiftGuesser, PhotometricRedshiftGuesser

%load_ext autoreload
%autoreload 2

In [2]:
# Sanity check of is_quiescent_BGS_smart on three hand-built galaxies.
# The two colors sit 0.1 on either side of the global red/blue color cut.
blue_color = GLOBAL_RED_COLOR_CUT - 0.1
red_color = GLOBAL_RED_COLOR_CUT + 0.1

blue_dn = -1
red_dn = 3

# One entry per galaxy in each positional array.
# NOTE(review): the meaning of the first argument (7, 8, 9) is not visible here --
# presumably a luminosity/mass-like quantity; confirm against pyutils.
first_arg = np.array([7, 8, 9])
dn_values = np.array([red_dn, np.nan, blue_dn])
colors = np.array([blue_color, blue_color, red_color])
results = is_quiescent_BGS_smart(first_arg, dn_values, colors)

# Only the first galaxy (red_dn, despite its blue color) is expected to be quiescent.
# The second has a NaN dn with a blue color; the third has blue_dn with a red color.
for idx, expected in enumerate((True, False, False)):
    assert results[idx] == expected

In [None]:
# Basic test of multiple versions of SimpleRedshiftGuesser
# Ensure it handles arrays of inputs and gives a reasonable answer for a couple obvious cases

# Target (lost galaxies) properties
t_app_mag = np.array([19.0, 18.0, 12.0])
t_pobs = np.array([0.5, 0.5, 0.5])
t_q = np.array([True, True, False])

# Neighbor properties
nn_z = np.array([0.1, 0.2, 0.3])
nn_dist = np.array([250.0, 3.0, 30.0])
nn_q = np.array([True, True, False])

# Every version must agree on these obvious cases, so the identical checks are
# run once per version instead of being copy-pasted.
for ver in ('5.0', '4.0', '2.0'):
    simple = SimpleRedshiftGuesser(None, None, ver=ver)
    z, nn_used = simple.choose_redshift(nn_z, nn_dist, t_pobs, t_app_mag, t_q, nn_q)

    # Target 0: neighbor is far away (250.0), so it should not be used.
    assert not nn_used[0], f"ver {ver}: nn_used[0]={nn_used[0]}"
    # Target 1: neighbor is very close (3.0), so its redshift is adopted as-is.
    assert nn_used[1], f"ver {ver}: nn_used[1]={nn_used[1]}"
    # Target 2: very bright target (12.0); neighbor not used and a low z is expected.
    assert not nn_used[2], f"ver {ver}: nn_used[2]={nn_used[2]}"
    assert z[0] > 0.0, f"ver {ver}: z[0]={z[0]}"
    # Exact equality is intended: the value should be copied straight from nn_z[1].
    assert z[1] == 0.2, f"ver {ver}: z[1]={z[1]}"
    assert z[2] < 0.1, f"ver {ver}: z[2]={z[2]}"


In [None]:
# Spot-check NNAnalyzer_cic.get_score on three target/neighbor pairs, using the
# analyzer restored from the SV3 neighbor-analysis bins file.
target_prob_obs = None #[0.5]
target_app_mag = [19.0, 14.0, 18.0]
target_quiescent = [1.0, 0.0, 1.0]
neighbor_z = [0.1, 0.35, 0.05]
neighbor_ang_dist = [30.0, 120.0, 2.0]
nn_quiescent = [1.0, 0.0, 1.0]

nna = NNAnalyzer_cic.from_results_file(NEIGHBOR_ANALYSIS_SV3_BINS_FILE)
score_args = (
    target_prob_obs,
    target_app_mag,
    target_quiescent,
    neighbor_z,
    neighbor_ang_dist,
    nn_quiescent,
)
score_b = nna.get_score(*score_args)
print(score_b)

# Loose bounds only: pair 0 must score above 0.01, pair 1 (neighbor at 120.0)
# below 0.1, and pair 2 (neighbor at 2.0) above 0.4.
score_checks = [
    (0, lambda s: s > 0.01),
    (1, lambda s: s < 0.1),
    (2, lambda s: s > 0.4),
]
for idx, passes in score_checks:
    assert passes(score_b[idx]), score_b[idx]


In [None]:
# Basic tests for PhotometricRedshiftGuesser

# Target (lost galaxies) properties, one entry per target
t_app_mag = np.array([19.0, 18.0, 12.0])
t_pobs = np.array([0.5, 0.5, 0.5])
t_q = np.array([True, True, False])
t_zphot = np.array([0.105, 0.230, 0.011])

# Neighbor properties as 2 x 3 arrays
# (presumably one row per neighbor and one column per target -- confirm against choose_redshift)
nn_z = np.array([
    [0.1, 0.2, 0.3],
    [0.2, 0.3, 0.1],
])
nn_dist = np.array([
    [250.0, 3.0, 30.0],
    [260.0, 40.0, 40.0],
])
nn_q = np.array([
    [True, True, False],
    [False, False, False],
])

scorer = PhotometricRedshiftGuesser.from_files(IAN_MXXL_LOST_APP_TO_Z_FILE, NEIGHBOR_ANALYSIS_SV3_BINS_FILE)
z, nn_used = scorer.choose_redshift(nn_z, nn_dist, t_zphot, t_pobs, t_app_mag, t_q, nn_q)

print(z)
print(nn_used)

# TODO more tests

# nn_used is expected to be NaN where no neighbor was chosen and 0 where the
# first neighbor was chosen.
assert np.isnan(nn_used[0]), "nn_used[0] should be NaN"
assert nn_used[1] == 0, "nn_used[1] should be 0"
assert np.isnan(nn_used[2]), "nn_used[2] should be NaN"

# Every target must come back with a redshift; target 1 takes nn_z[0, 1] exactly.
assert not np.isnan(z).any(), "z should not have any NaNs"
assert z[0] > 0.0, "z[0] should be greater than 0.0"
assert z[1] == 0.2, "z[1] should be 0.2"
assert z[2] < 0.1, "z[2] should be less than 0.1"

### CIC Binning

In [None]:
# 2D test of my N-dimensional CIC binning function
data_2d = np.array([
    [0.0, 0.0], # Test corner case
    [0.0, -0.5], # Test left edge case
    [3.5, 3.5], # Test middle case
    [2, 5.9],
    [0.5, 40.0], # Test right edge case
    [-7.0, -3.0], # Extra edge case
])
first_dim  = np.linspace(0, 5, 6)
second_dim  = np.linspace(0, 6, 7)

bin_counts = cic_binning(data_2d, [first_dim, second_dim])
print(bin_counts)

assert np.shape(bin_counts) == (6, 7), np.shape(bin_counts)
# Use isclose for the total: fractional shares (e.g. 0.1 + 0.9) need not add up
# exactly in floating point.
assert np.isclose(np.sum(bin_counts), len(data_2d)), np.sum(bin_counts)

# Expected weight per bin, keyed by (first_dim index, second_dim index).
expected_bins = {
    (0, 0): 3.0,   # corner point plus the two points that lie below the grid
    (2, 5): 0.1,   # [2, 5.9] split along the second axis
    (2, 6): 0.9,
    (3, 3): 0.25,  # [3.5, 3.5] shared equally by its four surrounding bins
    (3, 4): 0.25,
    (4, 3): 0.25,
    (4, 4): 0.25,
    (0, 6): 0.5,   # [0.5, 40.0]: above the grid on the second axis, split on the first
    (1, 6): 0.5,
}
for idx, expected in expected_bins.items():
    assert np.isclose(bin_counts[idx], expected), (idx, bin_counts[idx])

In [None]:
# 3D test of CIC binning
data_3d = np.array([
    [0.0, 0.0, 0.0],  # Test corner case
    [1.5, 1.5, 1.5],  # Test middle case
    [10.0, 10.0, -10.0],  #  edge case
    [0.0, -1.0, 1.6]
])
first_dim_3d = np.linspace(0, 3, 4)
second_dim_3d = np.linspace(0, 4, 5)
third_dim_3d = np.linspace(0, 2, 3)

# Perform CIC binning
bin_counts_3d = cic_binning(data_3d, [first_dim_3d, second_dim_3d, third_dim_3d])
print(bin_counts_3d)

# Assertions to verify the binning results
assert np.shape(bin_counts_3d) == (4, 5, 3), np.shape(bin_counts_3d)
assert np.isclose(np.sum(bin_counts_3d), len(data_3d)), np.sum(bin_counts_3d)

# Expected weight per bin, keyed by (first, second, third) grid index.
expected_bins_3d = {
    (0, 0, 0): 1.0,  # corner point
    (3, 4, 0): 1.0,  # [10, 10, -10] lies outside the grid on every axis
    (0, 0, 1): 0.4,  # [0, -1, 1.6] split along the third axis only
    (0, 0, 2): 0.6,
}
# [1.5, 1.5, 1.5] is shared equally by the eight bins surrounding it.
expected_bins_3d.update(
    {(i, j, k): 1/8 for i in (1, 2) for j in (1, 2) for k in (1, 2)}
)
for idx, expected in expected_bins_3d.items():
    assert np.isclose(bin_counts_3d[idx], expected), (idx, bin_counts_3d[idx])

In [None]:
# Test CIC binning with weights
# Both points sit exactly on grid nodes, so each weight should land whole in one bin.
data_2d = np.array([
    [0.0, 0.0],
    [0.0, 1.0],
])
first_dim  = np.linspace(0, 1, 2)
second_dim  = np.linspace(0, 1, 2)

bin_counts = cic_binning(data_2d, [first_dim, second_dim], weights=[0.66, 2.99])
print(bin_counts)

assert np.shape(bin_counts) == (2, 2), np.shape(bin_counts)
assert np.isclose(bin_counts[0,0], 0.66), bin_counts[0,0]
# Report the bin actually being checked if this fails.
assert np.isclose(bin_counts[0,1], 2.99), bin_counts[0,1]

In [None]:
# Test CIC binning with repeats
# One point on the (0, 0) node and three identical points on the (1, 1) node.
data_2d = np.array([
    [0.0, 0.0],
    [1.0, 1.0],
    [1.0, 1.0],
    [1.0, 1.0],
])
first_dim  = np.linspace(0, 1, 2)
second_dim  = np.linspace(0, 1, 2)

bin_counts = cic_binning(data_2d, [first_dim, second_dim])
print(bin_counts)

assert np.shape(bin_counts) == (2, 2), np.shape(bin_counts)

# Repeated points must accumulate: expected grid, checked bin by bin in row-major order.
expected_counts = np.array([
    [1.0, 0.0],
    [0.0, 3.0],
])
for idx, expected in np.ndenumerate(expected_counts):
    assert np.isclose(bin_counts[idx], expected), bin_counts[idx]

In [None]:
# Test CIC binning with negative dimensional ranges
data_2d = np.array([
    [-1.0, -1.2],  # slightly below the grid on the second axis
    [0.5, 1.0],    # halfway between two nodes on the first axis
])
first_dim  = np.linspace(-1, 1, 3)
second_dim  = np.linspace(-1, 1, 3)

bin_counts = cic_binning(data_2d, [first_dim, second_dim])
print(bin_counts)

assert np.shape(bin_counts) == (3, 3), np.shape(bin_counts)
# Each failure message reports the quantity that was actually checked.
assert np.isclose(np.sum(bin_counts), 2.0), np.sum(bin_counts)
assert np.isclose(bin_counts[0,0], 1.0), bin_counts[0,0]
assert np.isclose(bin_counts[1,2], 0.5), bin_counts[1,2]
assert np.isclose(bin_counts[2,2], 0.5), bin_counts[2,2]

In [None]:
# Stress test of CIC binning
# 1,000,000 rows x 5 columns (5M values in total) of uniform random data in [0, 1).
# A seeded generator keeps the printed output reproducible across runs.
N_STRESS_ROWS = 1_000_000
N_STRESS_DIMS = 5
rng_stress = np.random.default_rng(42)
data_stress = rng_stress.random((N_STRESS_ROWS, N_STRESS_DIMS))
dim_stress = np.linspace(0, 1, 11)

# Perform CIC binning with the same 11-node grid on every axis
bin_counts_stress = cic_binning(data_stress, [dim_stress] * N_STRESS_DIMS)

# Assertions to verify the binning results
assert np.shape(bin_counts_stress) == (11,) * N_STRESS_DIMS, np.shape(bin_counts_stress)
assert np.isclose(np.sum(bin_counts_stress), len(data_stress)), np.sum(bin_counts_stress)

# Collapse the first three axes and show the remaining 11 x 11 marginal.
with np.printoptions(precision=1, suppress=True, linewidth=100):
    print(bin_counts_stress.sum(axis=(0, 1, 2)))

### Other

In [None]:
# TODO not sure what this was
# Disabled legacy cell: the code below is wrapped in a string literal, so none of it runs.
# As written it would:
#   1. unpickle (app_mag_bins, the_map) from 'SimpleRedshiftGuesserMap.pkl',
#   2. plot a 20-bin histogram of the_map at indexes 10, 11, 30 and 45,
#   3. rebuild the map from synthetic magnitudes/redshifts with build_app_mag_to_z_map_new.
# NOTE(review): pickle.load can execute arbitrary code; only re-enable this with a trusted file.
# NOTE(review): as the last expression of the cell, the bare string is presumably echoed
# as the cell output -- delete the cell or convert it to comments if it is not needed.
"""
with open('SimpleRedshiftGuesserMap.pkl', 'rb') as f:    
    app_mag_bins, the_map = pickle.load(f)

print(the_map.keys())

indexes = [10,11,30,45]
# histogram of the map at those indexes
for i in indexes:
    plt.hist(the_map[i], bins = 20)
    plt.title(f'app mag ~ {app_mag_bins[i-1]}')
    plt.show()

test_mags = np.linspace(12.0, 20.0, 10000)
test_z = np.linspace(0.0, 0.5, 10000) * np.random.rand(10000)
app_mag_bins, the_map = build_app_mag_to_z_map_new(test_mags, test_z)

print(the_map)
"""