In [2]:
import sys
import unittest
import numpy as np
import torch
sys.path.append("../../hyperLAI")
sys.path.append("../../../libraries")
from utils.sim_funcs import sim_func_dict
from utils.model_utils import *
from utils.generate_dataset import *
from features.hyperLAIdataset import HyperLoader
from models.fc_model import fc_model
from utils.sim_funcs import sim_func_dict

In [4]:
data_dir = "/scratch/users/patelas/hyperLAI/snp_data/"
chrom = 22
dataset = HyperLoader(data_dir, [0,1,2,3,4,5,6], 22)
chr22_data = load_dataset(data_dir + "ref_final_beagle_phased_1kg_hgdp_sgdp_chr%s_hg19.vcf.gz"%(chrom), 
                                     data_dir + "reference_panel_metadata.tsv", "./", chromosome=chrom, 
                                     verbose=True, filter_admixed=True, filter_missing_coord=True)
# dataset_restricted = HyperLoader(data_dir, [0,1,2], 22)

start reading ...
File read: 317408 SNPs for 3558 individuals
done loading vcf... Shape of vcf file is :  (317408, 3558, 2) Shape of vcf names file is (3558,)
TSV loaded Index(['Sample', 'Population code', 'Population', 'Superpopulation code',
       'Superpopulation', 'Source', 'Latitude', 'Longitude', 'Region',
       'Sample Alias', 'Country', 'Town', 'Single_Ancestry'],
      dtype='object') DF shape is  (3558, 13)
Super population code are ,  ['EUR', 'EAS', 'AMR', 'SAS', 'AFR', 'OCE', 'WAS'] 7
Population code are ,  ['GBR', 'FIN', 'CHS', 'PUR', 'CDX', 'CLM', 'IBS', 'PEL', 'PJL', 'KHV', 'ACB', 'GWD', 'ESN', 'BEB', 'MSL', 'STU', 'ITU', 'CEU', 'YRI', 'CHB', 'JPT', 'LWK', 'ASW', 'MXL', 'TSI', 'GIH', nan] 27
Population names are ,  ['British', 'Finnish', 'Southern Han Chinese', 'Puerto Rican', 'Dai Chinese', 'Colombian', 'Spanish', 'Peruvian', 'Punjabi', 'Kinh Vietnamese', 'African-Caribbean', 'Gambian Mandinka', 'Esan', 'Bengali', 'Mende', 'Sri Lankan', 'Indian Telugu', 'CEPH', 'Yorub

In [7]:
# dataset2 = HyperLoader("/scratch/users/patelas/hyperLAI/snp_data/whole_genome/variance_filtered_500000/", [0,1,2,3,4,5,6], "all")




In [127]:
class TestDataLoader(unittest.TestCase):
    def test_correct_pos(self):
        '''
        Tests that the indices of the data in the HyperLoader class match what they should
        Specifically, in comparison to the indices of load_dataset
        Since each individual has 2 haplotypes, index 12 of the dataloader should correspond to the start of the individual at index 6 from load_dataset
        '''
        #Extract the first and second sets of chromosomes for individual 6 (just first 5 snps)
        chr22_snps_first = chr22_data[0][0:5,6,0]
        chr22_snps_second = chr22_data[0][0:5,6,1]
        #Extract what should be the corresponding sets of SNPs
        dataset_snps_first = dataset[12][0][0:5].numpy()
        dataset_snps_second = dataset[13][0][0:5].numpy()
        #Ensure the SNPs are equal
        self.assertEqual(np.sum(chr22_snps_first == dataset_snps_first), 5,
                        "Order between dataloader and initial data is wrong")
        self.assertEqual(np.sum(chr22_snps_second == dataset_snps_second), 5,
                        "Order between dataloader and initial data is wrong for second haplotype")
        #Ensure the suppop and pop labels match (only for index 12, we will do 13 in the next test)
        self.assertEqual(chr22_data[1][6], dataset[12][2],
                        "Superpopulation labels don't match")
        self.assertEqual(chr22_data[2][6], dataset[12][1],
                        "Population labels don't match")
    def test_correct_repeat(self):
        '''
        Both indices 12 come from the same individual, so the labels should match. 
        Checks this is the case. 
        '''
        #Ensure labels are same for both sets of chromosomes for one individual
        self.assertEqual(dataset[12][1], dataset[13][1],
                        "Superpopulation labels don't match between the same individual")
        self.assertEqual(dataset[12][2], dataset[13][2],
                        "Population labels don't match between the same individual")
        #Do same for another individual
        self.assertEqual(dataset[1256][1], dataset[1257][1],
                        "Superpopulation labels don't match between the same individual")
        self.assertEqual(dataset[1256][2], dataset[1257][2],
                        "Population labels don't match between the same individual")
    def test_restricted(self):
        '''
        Test that restricting to certain continents works
        '''
        #Make sure no unwanted labels are there
        self.assertEqual(np.sum(np.isin(dataset_restricted.suppop_labels, [3,4,5,6])), 0,
                        "Contains labels from individuals that shouldn't be there")
        #Make sure SNP data has same size as label data
        self.assertEqual(dataset_restricted.snps.shape[0], dataset_restricted.suppop_labels.shape[0],
                        "Individuals were not subsetted properly")
        
        

In [129]:
unittest.main(argv=['first-arg-is-ignored'], exit=False)


.......
----------------------------------------------------------------------
Ran 7 tests in 0.009s

OK


<unittest.main.TestProgram at 0x7ff97f9b8978>

In [126]:
model = fc_model(1000, 3, [50,40,30], 20, 
                     [0.1, 0.2, 0.1], 0.01, 1e-3, 1e-2, 0.999)

print(model)
print(model.HypLoss.temperature)
print(model.HypLoss.init_size)

fc_model(
  (fc_layers): ModuleList(
    (0): Sequential(
      (0): Linear(in_features=1000, out_features=50, bias=True)
      (1): ReLU()
      (2): Dropout(p=0.1, inplace=False)
    )
    (1): Sequential(
      (0): Linear(in_features=50, out_features=40, bias=True)
      (1): ReLU()
      (2): Dropout(p=0.2, inplace=False)
    )
    (2): Sequential(
      (0): Linear(in_features=40, out_features=30, bias=True)
      (1): ReLU()
      (2): Dropout(p=0.1, inplace=False)
    )
  )
  (final_layer): Linear(in_features=30, out_features=20, bias=True)
  (HypLoss): HyperbolicLoss()
)
<bound method Module.named_parameters of fc_model(
  (fc_layers): ModuleList(
    (0): Sequential(
      (0): Linear(in_features=1000, out_features=50, bias=True)
      (1): ReLU()
      (2): Dropout(p=0.1, inplace=False)
    )
    (1): Sequential(
      (0): Linear(in_features=50, out_features=40, bias=True)
      (1): ReLU()
      (2): Dropout(p=0.2, inplace=False)
    )
    (2): Sequential(
      (0): Linea

In [32]:
class test_utils(unittest.TestCase):
    '''
    Testing various utilities that are important in the training process
    '''
    def test_triple_ids(self):
        '''
        Testing that for a total of 7 elements,
        generate_triple_ids produces 35 unique elements
        '''
        ids = generate_triple_ids(7)
        self.assertEqual(len(ids), 35)
        self.assertEqual(len(np.unique(ids, axis=0)), 35)
    def test_pairwise_similarities(self):
        '''
        Ensures that the pairwise similarities are calculated correctly
        This also indirectly tests the correctness of the hamming distance function
        '''
        #Create fake "SNP data"
        dummy_data = torch.tensor([[0,0,0,1],[1,0,0,1],[1,1,1,1]])
        #Calculate similarity matrix using function
        sim_mat = make_pairwise_similarities(dummy_data, sim_func_dict["hamming"])
        #Expected result
        exp_result = np.array([[1, 0.75, 0.25], [0.75, 1, 0.5], [0.25, 0.5, 1]])
        #Determine if the two are equal
        self.assertEqual(np.sum(sim_mat.numpy() != exp_result), 0)
    def test_trips_and_sims(self):
        '''
        Tests that the trips_and_sims function works as it should
        '''
        #Fake data
        dummy_data = torch.tensor([[0,0,0,1],[1,0,0,1],[1,1,1,1], [0,0,0,0], [0,1,1,0]])
        #Produce similarity matrix (already tested)
        sim_mat = make_pairwise_similarities(dummy_data, sim_func_dict["hamming"])
        #Expected triples to be produced (already tested)
        trips_exp = generate_triple_ids(5)
        #Produce triples and similarities
        trips, sims = trips_and_sims(dummy_data, sim_func_dict["hamming"])
        #Assert trips are equal
        self.assertEqual(np.sum(np.array(trips_exp) != trips.numpy()), 0)
        #What the last set of three similarities is expected to be
        exp_last_sim = np.array([sim_mat[2,3], sim_mat[2,4], sim_mat[3,4]])
        #Assert sims are equal
        self.assertEqual(np.sum(exp_last_sim != sims.numpy()[-1]), 0)
    def test_train_test_split(self):
        '''
        Makes sure the results of the train-test split are reproducible
        '''
        #Run it twice
        tr1, va1, te1 = train_valid_test(10, 0.8, 0.1)
        tr2, va2, te2 = train_valid_test(10, 0.8, 0.1)
        #Assert valid and test are equal (so train is as well) for both runs
        self.assertEqual(va1, va2)
        self.assertEqual(te1, te2)
    def test_variance(self):
        class DummyLoader():
            def __init__(self, dummy_data):
                self.snps = dummy_data
        fake_snps = np.array([[1,1,1,1,0,0,0,0], [1,0,1,0,1,1,1,1], [1,0,0,1,0,0,0,0], [0,0,0,0,1,1,1,1]]).T
        dummy_data1 = DummyLoader(fake_snps)        
        dummy_data2 = DummyLoader(fake_snps)
        variance_filter(dummy_data1, [0,1,2,3], 2)
        variance_filter(dummy_data2, [0,1,2,3,4,5,6,7], 2)
        self.assertEqual(np.sum(dummy_data1.snps != fake_snps[:,[2,1]]), 0)
        self.assertEqual(np.sum(dummy_data2.snps != fake_snps[:,[3,0]]), 0)
        
        
        
        
        

In [33]:
unittest.main(argv=['first-arg-is-ignored'], exit=False)


.....
----------------------------------------------------------------------
Ran 5 tests in 0.007s

OK


<unittest.main.TestProgram at 0x7f19c3d90cc0>

In [86]:
ids = generate_triple_ids(7)

In [7]:
dummy_data= DummyLoader(np.array([[1,1,1,1,0,0,0,0], [1,0,1,0,1,1,1,1], [1,0,0,1,0,0,0,0], [0,0,0,0,1,1,1,1]]).T)

In [9]:
variance_filter(dummy_data, [0,1,2,3], 2)

AttributeError: 'NoneType' object has no attribute 'snps'

In [5]:
class DummyLoader():
    def __init__(self, dummy_data):
        self.snps = dummy_data

In [2]:
dummy_data.snps

NameError: name 'dummy_data' is not defined