In [1]:
#imports
import numpy as np
import statistics
from statistics import mode
import pandas as pd
import time

In [28]:
#cartesian coordinates
def dist1(star_i, star_j):
    return np.arccos(np.dot(star_i, star_j))

In [3]:
# Create Feature Set
def create_feature_set(catalog, FOV):
    feature_set = [] # [(id(0), id(1)), theta < FOV]
    for i in range(len(catalog)):
        for j in range(i+1, len(catalog)):
            theta = dist1(catalog['position'][i], catalog['position'][j])
            if theta <= FOV:
                feature_set.append((catalog['id'][i], catalog['id'][j], theta))
    return feature_set

In [4]:
def initialise(stars, ids, num_poles):
    init = []
    for i in range(len(stars)):
        init.append((stars[i], ids[i]))
    init = np.array(list(init), dtype=[('position', object), ('id', object)])
    init = init[:num_poles]
    combos = []
    for i in range(len(stars)-1, 0, -1):
        for j in range(i-1, -1, -1):
            combos.append([init[i], init[j]])
    return combos

In [5]:
#Create neighbour set of first pole using angular distances
def create_distance_set(stars, pole):
    distance = [(star, pole, dist1(star, pole)) for star in stars if not np.array_equal(star, pole)]
    return np.array(distance, dtype=[('star', object), ('pole', object), ('distance', object)])

In [6]:
def create_pair_list_simple(feature_set, distance, eps_pi):
    pairs = []
    distances = []
    for i in range(len(feature_set)):
        if np.abs(feature_set[i][2] - distance) <= eps_pi:
            pairs.append((feature_set[i][0], feature_set[i][1]))
        distances.append(((feature_set[i][0], feature_set[i][1]), feature_set[i][2], distance, np.abs(feature_set[i][2] - distance)))
    distances = np.array(distances, dtype = [("feature ids", object), ("ground truth", object), ("calculated", object), ("distance", object)])
    distances = np.sort(distances, order='distance')
    return pairs

In [7]:
def create_pair_list_long(feature_set, distances, eps_pi):
    pairs = []
    for j, distance in enumerate(distances):
        for i in range(len(feature_set)):
            if np.abs(feature_set[i][2] - distance['distance']) <= eps_pi:
                pairs.append((j, (feature_set[i][0], feature_set[i][1])))
                #if j >= N:
                #    pairs.append((j+1, (feature_set[i][0], feature_set[i][1])))
                #else:
                #    pairs.append((j, (feature_set[i][0], feature_set[i][1])))
    return np.array(pairs, dtype=[('index', object), ('feature ids', object)])

In [8]:
#pole is one that occurs most in list. one vote per pole per star

def find_pole_in_pairs(pair_list, gt):
    index_counts = list(np.empty([len(stars),1]))
    counts = []
    for pair in pair_list:
        for star_id in pair['feature ids']:
            if star_id not in index_counts[pair['index']]:
                index_counts[int(pair['index'])] = np.append(index_counts[int(pair['index'])], star_id)
                counts.append(star_id)
    print("ground truth in counts", counts.count(gt))
    print("max counts", counts.count(mode(counts)))
    return mode(counts), counts


In [9]:
# neighbours are all stars paired with pole

def create_neighbour_set(pairs, pole):
    pole_neighbours = []
    for pair in pairs:
        if pair['feature ids'][0] == pole:
            pole_neighbours.append((pair['index'], pair['feature ids'][1]))
        elif pair['feature ids'][1] == pole:
            pole_neighbours.append((pair['index'], pair['feature ids'][0]))
    pole_neighbours = np.array(pole_neighbours, dtype=[('index', object), ('id', object)])
    return pole_neighbours

def create_neighbour_id_set(pairs, pole):
    return [ID for pair in pairs for ID in pair if pole in pair and ID != pole]

In [10]:
def intersection(set_1, set_2):    
    intersection = [(star['index'], star['id']) for star in set_1 if star in set_2]
    return intersection

In [11]:
#returns intersection list if u and w true, else returns 0

def verification_phase(pole_i, neighbours_i, pole_j, neighbours_j, th):
    u = (pole_i['id'] in neighbours_j['id']) and (pole_j['id'] in neighbours_i['id'])
    if not u:
        return 'not u'
    V = intersection(neighbours_i, neighbours_j)
    w = len(V) >= th
    if not w:
        return 'not w'
    position = np.where(neighbours_i['id'] == pole_j['id'])
    V.append(neighbours_i[position][0])
    V = np.array(V, dtype = [('index', object), ('id', object)])
    return V

In [30]:
def confirmation_phase(verified, confirmed, features, stars):
    if len(verified) == 0:
        print(confirmed['position'])
        #label_false_stars(confirmed, stars)
        return confirmed
    v = verified[0]
    verified = verified[1:]
    v_pos = stars[v['index']]
    c_pos = confirmed[-1]['position']
    distance = dist1(v_pos.tolist(), c_pos.tolist())
    confirmed_features = [feature for feature in features if confirmed[-1]['id'] in feature]
    pairs = create_pair_list_simple(confirmed_features, distance, eps_pi)
    pairs = [pair for pair in pairs if v['id'] in pair]
    
    if len(pairs) == 0:
        return confirmation_phase(verified, confirmed, features, stars)
    else:
        dtype = confirmed.dtype
        confirmed = list(confirmed)
        confirmed.append((v['id'], v_pos))
        confirmed = np.array(confirmed, dtype=dtype)
        return confirmation_phase(verified, confirmed, features, stars)
    return

In [13]:
#returns pole and neighbour ids
def acceptance_phase(stars, features, candidate_pole, eps_pi, gt):
    distances = create_distance_set(stars, candidate_pole)
    pairs = create_pair_list_long(features, distances, eps_pi)
    pole, counts = find_pole_in_pairs(pairs, gt)
    neighbours = create_neighbour_set(pairs, pole)
    pole = np.array((pole, candidate_pole), dtype=[('id', object), ('position', object)])
    return pole, neighbours, counts

In [14]:
'''def recursive_MPA(stars, candidate_poles, i, j, features, th):
    if i == 0:
        return -1
    pole_i, neighbours_i, counts = acceptance_phase(stars, features, candidate_poles['position'][i], eps_pi, N, candidate_poles['id'][i])
    print("id in catalog", candidate_poles['id'][i] in catalog['id'])
    print("id in pair list", candidate_poles['id'][i] in counts)
    print("GROUND TRUTH POLE FOUND: ", pole_i['id'], candidate_poles['id'][i] == pole_i['id'])
    print("NEIGHBOUR SET LENGTH: ", len(neighbours_i))
    
    pole_j, neighbours_j, counts = acceptance_phase(stars, features, candidate_poles['position'][j], eps_pi, N, candidate_poles['id'][j])
    print("id in catalog", candidate_poles['id'][i] in catalog['id'])
    print("id in pair list", candidate_poles['id'][j] in counts)
    print("GROUND TRUTH POLE FOUND: ", pole_j['id'], candidate_poles['id'][j] == pole_j['id'])
    print("NEIGHBOUR SET LENGTH: ", len(neighbours_j))
    
    verified = verification_phase(pole_i, neighbours_i, pole_j, neighbours_j, th)
    if not isinstance(verified, str):
        confirmed = np.array([(pole_i['id'], pole_i['position'])], dtype=[('id', int), ('position', list)])
        return confirmation_phase(verified, confirmed, features, stars)
    elif verified == 'not u':
        print("not u")
        return recursive_MPA(stars, candidate_poles, i-1, i-2, features, th)
    elif verified == 'not w':
        print("not w")
        return recursive_MPA(stars, candidate_poles, i, j-1, features, th)
    else:
        print(f"error: verified returned wrong type/output... {verified}")'''
        
def recursive_MPA(stars, pole_combinations, i, features):
    if i == len(pole_combinations):
        return -1
    pole_i, neighbours_i, counts = acceptance_phase(stars, features, pole_combinations[i][0]['position'], eps_pi, pole_combinations[i][0]['id'])
    #print("acceptance i ground truth", pole_combinations[i][0]['id'])
    #print("id in catalog", pole_combinations[i][0]['id'] in catalog['id'])
    #print("id in pair list", pole_combinations[i][0]['id'] in counts)
    print("GROUND TRUTH POLE FOUND: ", pole_i['id'], pole_combinations[i][0]['id'] == pole_i['id'])
    #print("NEIGHBOUR SET LENGTH: ", len(neighbours_i))
    
    pole_j, neighbours_j, counts = acceptance_phase(stars, features, pole_combinations[i][1]['position'], eps_pi, pole_combinations[i][1]['id'])
    #print("acceptance j ground truth", pole_combinations[i][1]['id'])
    #print("id in catalog", pole_combinations[i][1]['id'] in catalog['id'])
    #print("id in pair list", pole_combinations[i][1]['id'] in counts)
    print("GROUND TRUTH POLE FOUND: ", pole_j['id'], pole_combinations[i][1]['id'] == pole_j['id'])
    #print("NEIGHBOUR SET LENGTH: ", len(neighbours_i))
    
    verified = verification_phase(pole_i, neighbours_i, pole_j, neighbours_j, th)
    if not isinstance(verified, str):
        confirmed = np.array([(pole_i['id'], pole_i['position'])], dtype=[('id', object), ('position', object)])
        return confirmation_phase(verified, confirmed, features, stars)
    else:
        return recursive_MPA(stars, pole_combinations, i+1, features)

In [31]:
def get_data_from_file_arr(file):
    catalog = []
    for i, line in enumerate(file):
        line = list(map(float, line.split()))
        if np.linalg.norm(line) - 1.54 <= 5.5:
            line = line/np.linalg.norm(line)
            catalog.append((i, line))
    return np.array(catalog, dtype=[('id', object), ('position', object)])

def get_data_from_file(stars, ids):
    ids = ids.read().split()
    star_image = []
    ids_image = []
    for i, line in enumerate(stars):
        line = list(map(float, line.split()))
        if np.linalg.norm(line) - 1.54 <= 5.5:
            line = line/np.linalg.norm(line)
            star_image.append(list(line))
            ids_image.append(int(ids[i]))
    return np.array(star_image), ids_image

FOV = np.deg2rad(14)
eps_pi = 10e-5
th = 5

#file = open("HIP_catalog_vectors_with_mag_less_than_6", "r")
#catalog = get_data_from_file_arr(file)
#features = create_feature_set(catalog, FOV)

start = time.time()
#for eps_pi in eps_pis:
print(f'{eps_pi}')
for i in range(1):
    i = 3
    print(i, end='\r')
    stars = open(f"0 2/stars_{i}")
    ids = open(f"0 2/id_{i}")
    stars, ids = get_data_from_file(stars, ids)
    Rp = len(stars)

    pole_combinations = initialise(stars, ids, Rp)
    chain_set = recursive_MPA(stars, pole_combinations, 0, features)

    #chain_set = recursive_MPA(stars, candidate_poles, N, N-1, features, th)

    if not isinstance(chain_set, int):
        print("correctly calculated ids", len([i for i in chain_set['id'] if i in ids]))
        print("ground truth ids", len([i for i in ids if i in catalog['id']]))
    else:
        print("no result")
    get_results(i, ids, chain_set, f'output/{eps_pi}')
end = time.time()
print("\ntime", end-start)


0.0001
ground truth in counts 8
max counts 8
GROUND TRUTH POLE FOUND:  2425 True
ground truth in counts 8
max counts 8
GROUND TRUTH POLE FOUND:  2407 True
[array(array([ 0.11502778, -0.04491449,  0.99234636]), dtype=object)
 array([-0.10986611, -0.03472711,  0.99333955])
 array([0.09083594, 0.02376832, 0.99558219])
 array([ 0.03169557, -0.03475242,  0.99889322])
 array([ 0.0340467 , -0.09032346,  0.99533034])
 array([-0.07762772, -0.02176654,  0.99674478])
 array([-0.06372659,  0.10522019,  0.99240497])]
correctly calculated ids 7
ground truth ids 10


NameError: name 'get_results' is not defined

In [None]:
def label_false_stars(confirmed, stars):
    print(confirmed)
    confirmed_ids = []
    for star in stars:
        for confirmed_star in confirmed['position']:
            print(star == confirmed_star)
            if star == confirmed_star:
                confirmed_ids.append(confirmed_star)
                break
            confirmed_ids.append(-1)
    print(confirmed_ids)
    return confirmed_ids

In [26]:
a = np.array([(1, 2)], dtype=[('id', object), ('position', object)])
print(a)
a = [-0.1098661097413664, -0.03472710675645746, 0.9933395522109364]
b = [0.11502778008127919, -0.04491448755009479, 0.9923463602078084]
print(np.arccos(np.dot(a, b)))

[(1, 2)]
0.22560484240782508


In [None]:
def get_results(i, ids, chain_set, out_dir):
    no_result = isinstance(chain_set, int)
    if no_result:
        # write 0 to id rate
        result_str = '-1, -1\n'
        with open(out_dir, 'a') as f:
            f.write(result_str)
    else:
        id_set = [link for link in chain_set['id'] if link in ids]
        id_rate = len(id_set) / (len(ids) - ids.count(-1))
        error_rate = (len([link for link in chain_set['id'] if not link in ids])) / len(id_set)

        result_str = str(id_rate) + ', ' + str(error_rate) + '\n'
        with open(out_dir, 'a') as f:
            f.write(result_str)
    return
    

graphs x axis is pixel noise, y axis is id rate
same for x axis is mag noise
same for x axis is false stars percentage against true stars
each graph for each FOV
return file with numbers
implement id rate metric
implement id = -1 for unidentified stars
1. id rate = frac{true positive}{total number of stars} > 0.7
2. measure false recognition rate. False positives/total N
3. No result. number of times back out of verification/total times run

TP + FP + NR = number of images in set (1000)

if it's not true positive or no result, it's false positive

get pole star from Angular distance between star and [0,0,1]
eps_pi test from 2e-4 to 6e-4 in steps of 0.8
- 2e-4, 2.8e-4, 3.6e-4, 4.4e-4, 5.2e-4, 6e-4
- 10e-5 -> 6e-4 steps of 2e-5

test on data/5/FOV 14

after all that, set feature set threshold to <= 6 instead of 5.5

In [None]:
print((6e-4 - 10e-5) / 5)
print(f'{6e-4 - 1e-4:.1E}')
print(f'{5e-4 - 1e-4:.1E}')
eps_pis = [6e-4, 5e-4, 4e-4, 3e-4, 2e-4, 1e-4]