In [1]:
import numpy as np
from sklearn.decomposition import PCA

#### Step 1: Read training data from pca_matrix.dat

In [2]:
# Read the entire pca_matrix
def ReadPCAMatrixData(pca_matrix: list, file_name: str = 'pca_matrix.dat'):
    """Append the rows of a digit-matrix text file to ``pca_matrix``.

    Every non-empty line of the file becomes one row. Each character of
    the line is converted to an integer by taking its offset from ``'0'``.

    Args:
        pca_matrix: List that receives the rows; it is extended in place.
        file_name: Path of the file to read. Defaults to 'pca_matrix.dat'.

    Raises:
        OSError: If the file cannot be opened.
    """
    zero = ord('0')
    with open(file = file_name, mode = 'r') as pca_matrix_file:
        for line in pca_matrix_file:
            # Strip only the line terminator. Blindly popping the last
            # character would discard a real digit when the final line of
            # the file is not newline-terminated.
            line = line.rstrip('\r\n')
            if not line:
                # A blank line would otherwise become an empty row and make
                # the matrix ragged.
                continue
            pca_matrix.append([ord(x) - zero for x in line])

In [3]:
# Load every row of the matrix file into a fresh list, then report how many
# rows were read and the length of the first row.
pca_train_data = list()  # filled in place by ReadPCAMatrixData
ReadPCAMatrixData(pca_train_data)
print('size of training data: ', len(pca_train_data))
print('dimension: ', len(pca_train_data[0]))

size of training data:  150
dimension:  32768


#### Step 2: PCA Training Process

Create a PCA object, fit it on the training data, and save the fitted object:

In [4]:
# Code to save pca
import pickle

def save_obj(obj, file_name):
    """Pickle ``obj`` into ``<file_name>.pkl`` with the highest protocol.

    Args:
        obj: Any picklable object.
        file_name: Output path without the '.pkl' extension.
    """
    target_path = file_name + '.pkl'
    with open(target_path, 'wb') as sink:
        pickle.dump(obj, sink, protocol = pickle.HIGHEST_PROTOCOL)

def load_obj(file_name):
    """Return the object unpickled from ``<file_name>.pkl``.

    Args:
        file_name: Input path without the '.pkl' extension.
    """
    source_path = file_name + '.pkl'
    with open(source_path, 'rb') as stream:
        # NOTE: unpickling can execute arbitrary code; only load files that
        # were produced by a trusted source.
        restored = pickle.load(stream)
    return restored

In [5]:
def PCAProcess(pca_train: list, file_name: str = 'pca'):
    """Fit a PCA model on the training rows and pickle it with ``save_obj``.

    Args:
        pca_train: Training samples, one row per sample.
        file_name: Base name handed to ``save_obj`` for the fitted model.
            Defaults to 'pca'.

    Raises:
        ValueError: If ``pca_train`` does not form a 2-D matrix.
    """
    np_pca_train = np.array(pca_train)
    if np_pca_train.ndim != 2:
        raise ValueError('pca_train must be a 2-D matrix (samples x features)')

    # The number of components cannot exceed min(n_samples, n_features).
    # Using the sample count alone fails once samples outnumber features.
    pca = PCA(n_components = min(np_pca_train.shape))

    # Fit with training data
    pca.fit(np_pca_train)

    # Save pca
    save_obj(pca, file_name)

In [6]:
# Fit PCA on the training matrix; PCAProcess pickles the fitted model via save_obj.
PCAProcess(pca_train_data)

#### Step 3: Read testing data and perform rasterization

In [7]:
def ReadPCATestData(file_name):
    """Read a text file of whitespace-separated numbers, one row per line.

    Blank or whitespace-only lines are skipped so that every returned row
    holds at least one value.

    Args:
        file_name: Path of the text file to read.

    Returns:
        A list of rows, each a list of floats parsed from one line.

    Raises:
        OSError: If the file cannot be opened.
        ValueError: If a token cannot be parsed as a float.
    """
    test_data = []
    with open(file = file_name, mode = 'r') as file:
        for line in file:
            fields = line.split()
            if not fields:
                # An empty row would crash any later indexing of the point's
                # coordinates, so drop it here.
                continue
            test_data.append([float(x) for x in fields])

    return test_data

In [8]:
from math import floor

def Standardize(points):
    """Placeholder: currently performs no work and returns ``None``.

    ``points`` is accepted but neither read nor modified.
    """
    return None

def Rasterize(
    points, N: int,
    x_min: float, x_max: float,
    y_min: float, y_max: float,
    z_min: float, z_max: float):
    """Mark the voxels of an N x N x N grid that contain at least one point.

    The grid spans [x_min, x_max] x [y_min, y_max] x [z_min, z_max]. Points
    lying exactly on an upper bound are assigned to the last voxel along
    that axis; points outside the box are ignored.

    Args:
        points: Iterable of points; indices 0, 1 and 2 of each point are
            read as its x, y and z coordinates.
        N: Number of voxels along each axis.
        x_min, x_max, y_min, y_max, z_min, z_max: Bounds of the grid.

    Returns:
        A flat list of N**3 zeros and ones. Voxel (i_x, i_y, i_z) is stored
        at position ``i_x + N * i_y + N * N * i_z``.
    """
    def _cell(value, lo, hi):
        # Voxel index of `value` along one axis, or None when out of bounds.
        if not (lo <= value <= hi):
            return None
        # value == hi maps to N, one past the last voxel; clamp it back.
        return min(floor(N * (value - lo) / (hi - lo)), N - 1)

    bool_arr = [0] * (N ** 3)
    for point in points:
        i_x = _cell(point[0], x_min, x_max)
        i_y = _cell(point[1], y_min, y_max)
        i_z = _cell(point[2], z_min, z_max)
        if i_x is None or i_y is None or i_z is None:
            # Outside the grid: a raw index here would either raise or
            # silently wrap around and mark an unrelated voxel.
            continue
        bool_arr[i_x + N * i_y + N * N * i_z] = 1

    return bool_arr


In [10]:
# Read one sample point cloud, rasterize it on a 32^3 grid spanning
# [-0.6, 0.6] along every axis, and wrap the flat occupancy vector in a
# one-row array.
point_test_data = ReadPCATestData('E:\\SRTP\\part\\000\\points.txt')
pca_test_data = np.array(
    [Rasterize(point_test_data, 32, -0.6, 0.6, -0.6, 0.6, -0.6, 0.6)]
)

In [None]:
# # Split data into training data and testing data
# # parameter ratio is the proportion of data for testing
# def SplitPCAMatrixData(pca_matrix: list, ratio):
#     size_for_train = int(len(pca_matrix) * (1 - ratio))
#     return pca_matrix[:size_for_train], pca_matrix[size_for_train:]

# pca_train, pca_test = SplitPCAMatrixData(pca_matrix, 0.05)

# print('size of training data:', len(pca_train))
# print('size of testing data:', len(pca_test))

#### Step 4: Load `pca` and perform transformation on testing data

In [None]:
# Restore the fitted PCA model from 'pca.pkl'. load_obj unpickles the file, so
# only load a file you produced yourself.
pca = load_obj('pca')

In [None]:
# Project the training rows and the test row(s) with the loaded PCA model,
# then print how many vectors each projection holds.
pca_train_result = pca.transform(pca_train_data)
pca_test_result = pca.transform(pca_test_data)
print(len(pca_train_result), len(pca_test_result), sep = '\n')

#### Step 5: Find the most similar model

Select a model from the testing set and find the most similar model in the training set.

Method 1: Iterate over all the possibilities

In [None]:
import heapq

def FindMostSimilarModel1(train_result: list, test_result: list, index = 0):
    """Return the index of the training vector closest to one test vector.

    Distance is the Euclidean norm of the difference. When several training
    vectors are equally close, the first one wins.

    Args:
        train_result: Sequence of training vectors (lists or arrays).
        test_result: Sequence of test vectors (lists or arrays).
        index: Position in ``test_result`` of the query vector.

    Returns:
        The integer position in ``train_result`` of the nearest vector.

    Raises:
        ValueError: If ``train_result`` is empty.
    """
    if len(train_result) == 0:
        raise ValueError('train_result is empty; there is no nearest model')

    # np.asarray lets plain Python lists be subtracted element-wise as well.
    query = np.asarray(test_result[index])
    # Start from infinity so that arbitrarily large distances still compare
    # correctly; a finite sentinel breaks once every distance exceeds it.
    min_dist = float('inf')
    min_index = 0
    for cur_index, row in enumerate(train_result):
        dist = np.linalg.norm(np.asarray(row) - query)
        if dist < min_dist:
            min_dist = dist
            min_index = cur_index
    return min_index

def FindMostSimilarModel(train_result: list, test_result: list, index = 0, n = 1):
    """Find the ``n`` training vectors closest to one test vector.

    Args:
        train_result: Sequence of training vectors (lists or arrays).
        test_result: Sequence of test vectors (lists or arrays).
        index: Position in ``test_result`` of the query vector.
        n: Number of neighbours requested.

    Returns:
        When ``n == 1``: whatever ``FindMostSimilarModel1`` returns (a bare
        index). Otherwise: a list of at most ``n`` ``(distance, index)``
        tuples ordered from nearest to farthest.
    """
    # When n is 1, a direct scan is cheaper than selecting from a heap
    if n == 1:
        return FindMostSimilarModel1(train_result, test_result, index)

    # np.asarray lets plain Python lists be subtracted element-wise as well.
    query = np.asarray(test_result[index])
    scored = (
        (np.linalg.norm(np.asarray(row) - query), cur_index)
        for cur_index, row in enumerate(train_result)
    )
    # nsmallest consumes any iterable and keeps only n candidates, so there
    # is no need to push every tuple onto a heap first.
    return heapq.nsmallest(n, scored)

In [None]:
# Nearest training model to the first test vector (index defaults to 0);
# with n = 1 the result is a bare index rather than (distance, index) tuples.
print(FindMostSimilarModel(pca_train_result, pca_test_result, n = 1))

Method 2: HNSW

In [None]:
import hnswlib

Create the index object:

In [None]:
# The index dimensionality is the length of one PCA-projected vector.
dimension = len(pca_test_result[0])
hnsw_index = hnswlib.Index('l2', dim = dimension)
# Size the index from the data being inserted instead of a hard-coded
# capacity, so the cell keeps working when the training set grows.
hnsw_index.init_index(max_elements = len(pca_train_result))
hnsw_index.add_items(pca_train_result)

Save the index object:

In [None]:
# Persist the index to 'hnsw_index.pkl' through the pickle-based save_obj helper.
save_obj(hnsw_index, 'hnsw_index')

Load the index object:

In [None]:
# Restore the index from 'hnsw_index.pkl'. load_obj unpickles the file, so
# only load a file you produced yourself.
hnsw_index = load_obj('hnsw_index')

In [None]:
# Query the 3 nearest training vectors for every row of pca_test_result.
# NOTE(review): hnswlib documents its 'l2' space as squared L2 — confirm before
# comparing these distances with the np.linalg.norm values from Method 1.
hnsw_index.knn_query(pca_test_result, k = 3)

Method 3: LSH

In [None]:
# from datasketch import WeightedMinHashGenerator, MinHashLSH
# from tqdm import tqdm

# lsh_train = np.array(result_train)
# lsh_test = np.array(result_test)

# mg = WeightedMinHashGenerator(lsh_train.shape[1])
# lsh = MinHashLSH(threshold = 0.5)
# for lsh_index, lsh_value in tqdm(enumerate(lsh_train)):
#     m_hash = mg.minhash(lsh_value)
#     lsh.insert(lsh_index, m_hash)

# lsh_result = list()
# for test_case in lsh_test:
#     lsh_result.append(lsh.query(mg.minhash(test_case)))
# print(lsh_result)

# from datasketch import WeightedMinHashGenerator, MinHashLSHForest
# from tqdm import tqdm

# lsh_train = np.array(result_train)
# lsh_test = np.array(result_test)

# mg = WeightedMinHashGenerator(lsh_train.shape[1])
# forest = MinHashLSHForest()
# for lsh_index, lsh_value in tqdm(enumerate(lsh_train)):
#     m_hash = mg.minhash(lsh_value)
#     forest.add(lsh_index, m_hash)

# forest.index()
# lsh_result = list()
# for test_case in lsh_test:
#     lsh_result.append(forest.query(mg.minhash(test_case), k = 6))
# print(lsh_result)