In [1]:
from __future__ import absolute_import, division, print_function, unicode_literals
#import tensorflow as tf
import keras
from keras import regularizers
import numpy as np
import matplotlib.pyplot as plt
from astropy.io import fits
from astropy.table import Table
import numpy as np
import seaborn as sns
import os
import re
import time
# Activate seaborn's default theme for the matplotlib figures drawn later in the notebook.
sns.set()

Using TensorFlow backend.


In [1]:
def get_filenames(path='.', extension=None, pattern=None, identifiers=None, include_path=False):
    """Return the names of the entries in ``path`` that pass every requested filter.

    Parameters
    ----------
    path : str
        Directory to list with ``os.listdir``.
    extension : str, optional
        Keep only names that end with this suffix (e.g. ``'.fits'``).
    pattern : str, optional
        Regular expression; keep only names in which ``re.search`` finds a match.
    identifiers : iterable, optional
        Keep only names that contain ``str(ident)`` for at least one ``ident``.
        Results are grouped by identifier, in the order the identifiers are
        given; a name matching several identifiers is returned once. If the
        value is not iterable a message is printed and this filter is skipped.
    include_path : bool
        When true, each returned name is joined onto ``path``.

    Returns
    -------
    list of str
        The surviving names, in ``os.listdir`` order unless ``identifiers``
        regrouped them.
    """
    # retrieve all filenames from the directory
    filename_list = os.listdir(path)

    # keep all filenames with the proper extension
    if extension is not None:
        # endswith() also behaves correctly for an empty extension, where the
        # slice filename[-0:] would compare the whole name against ''.
        filename_list = [filename for filename in filename_list
                         if filename.endswith(extension)]

    # keep all filenames that match the pattern
    if pattern is not None:
        filename_list = [filename for filename in filename_list if re.search(pattern, filename)]

    # keep all filenames that match the identifiers provided
    if identifiers is not None:
        try:
            ident_strings = [str(ident) for ident in identifiers]
        except TypeError:
            # best-effort: report the problem and leave the list unfiltered
            print(identifiers, 'is not a list, tuple, or otherwise iterable')
        else:
            storage_list = []
            already_kept = set()
            for ident in ident_strings:
                for filename in filename_list:
                    # a file matching two identifiers must not be listed twice
                    if ident in filename and filename not in already_kept:
                        already_kept.add(filename)
                        storage_list.append(filename)
            filename_list = storage_list

    if include_path:
        # os.path.join inserts the separator only when path lacks one
        filename_list = [os.path.join(path, filename) for filename in filename_list]

    return filename_list


def get_filevalues(path, filename_list):
    """Read selected columns from each FITS file and collect them per column.

    For every name in ``filename_list`` the file ``path``/``name`` is opened
    with ``astropy.io.fits`` and the following columns are appended, in file
    order, to the matching output list:

    * ``'FLUX'``     -- ``flux`` column of the ``COADD`` HDU
    * ``'WAVE'``     -- ``loglam`` column of the ``COADD`` HDU
    * ``'CLASS'``    -- ``CLASS`` column of the ``SPALL`` HDU
    * ``'NOISE'``    -- ``SN_MEDIAN_ALL`` column of the ``SPALL`` HDU
    * ``'REDSHIFT'`` -- ``Z`` column of the ``SPALL`` HDU

    Parameters
    ----------
    path : str
        Directory containing the files.
    filename_list : sequence of str
        File names relative to ``path``.

    Returns
    -------
    dict
        Maps the five keys above to lists with one entry per file. All lists
        are empty when ``filename_list`` is empty.
    """
    list_fluxarrays = []
    list_classtype = []
    list_noise = []
    list_wavelength = []
    list_redshift = []

    for filename in filename_list:
        # os.path.join copes with a path that has no trailing separator;
        # the context manager closes the file, so no explicit close() is needed.
        with fits.open(os.path.join(str(path), str(filename)), memmap=False) as hdul:
            data_c = hdul['COADD'].data
            data_s = hdul['SPALL'].data

            list_fluxarrays.append(data_c.field("flux"))
            list_classtype.append(data_s.field('CLASS'))
            list_noise.append(data_s.field('SN_MEDIAN_ALL'))
            list_wavelength.append(data_c.field('loglam'))
            list_redshift.append(data_s.field('Z'))

    # Built after the loop so the result exists even when no file was read.
    return {'FLUX': list_fluxarrays, 'CLASS': list_classtype, 'NOISE': list_noise,
            'WAVE': list_wavelength, 'REDSHIFT': list_redshift}

def _select_by_class(file_values, wanted_class):
    """Return (flux, labels) for the entries of ``file_values`` whose 'CLASS' equals ``wanted_class``."""
    labels = file_values['CLASS']
    flux = file_values['FLUX']
    kept = [i for i in range(len(labels)) if labels[i] == wanted_class]
    return [flux[i] for i in kept], [labels[i] for i in kept]


def sifting_through_data(quasar_flux=None, quasar_label=None, star_flux=None, star_label=None,
                         quasar_path="/Users/matt/Desktop/DESI_Research/DESI_ML/good_quasars/",
                         star_path="/Users/matt/Desktop/DESI_Research/DESI_ML/good_stars/"):
    """Load the quasar and star FITS files and keep only the correctly classified spectra.

    Parameters
    ----------
    quasar_flux, quasar_label, star_flux, star_label
        Ignored. They are kept (now optional) only so existing positional
        calls keep working; the data are always read from disk.
    quasar_path, star_path : str
        Directories searched for ``.fits`` files.

    Returns
    -------
    dict
        ``'QUASAR_FLUX'`` / ``'QUASAR_LABELS'`` hold the entries whose class
        compares equal to ``'QSO'``; ``'STAR_FLUX'`` / ``'STAR_LABELS'`` hold
        those equal to ``'STAR'``. Flux and label lists are filtered by the same
        indices, so they stay aligned.
    """
    # getting the quasar data
    quasardata = get_filenames(quasar_path, extension='.fits')
    quasar_dict = get_filevalues(quasar_path, quasardata)

    # getting the star data
    stardata = get_filenames(star_path, '.fits')
    star_dict = get_filevalues(star_path, stardata)

    # one pass per object type selects flux and labels together
    kept_quasar_flux, kept_quasar_labels = _select_by_class(quasar_dict, 'QSO')
    kept_star_flux, kept_star_labels = _select_by_class(star_dict, 'STAR')

    data = {'STAR_FLUX': kept_star_flux, 'QUASAR_FLUX': kept_quasar_flux,
            'STAR_LABELS': kept_star_labels, 'QUASAR_LABELS': kept_quasar_labels}

    return data
    
def randomizing_data(cut, quasar_flux=None, quasar_labels=None, star_flux=None, star_labels=None):
    """Combine star and quasar spectra, label them numerically and shuffle them together.

    Parameters
    ----------
    cut : int
        Only the first ``cut`` quasars (flux and labels) are used.
    quasar_flux, quasar_labels, star_flux, star_labels : sequence, optional
        Per-object flux arrays and class labels. Any argument left as ``None``
        is looked up as a module-level (notebook) variable of the same name,
        which is what the one-argument call relied on.

    Returns
    -------
    dict
        ``'FLUX'``: list of the flux entries in shuffled order.
        ``'LABELS'``: float array in the same order, 1 for stars and 0 for quasars.

    Raises
    ------
    NameError
        If an input is neither passed nor defined at module level.
    ValueError
        If a flux list and its label list differ in length.
    """
    supplied = {'quasar_flux': quasar_flux, 'quasar_labels': quasar_labels,
                'star_flux': star_flux, 'star_labels': star_labels}
    resolved = {}
    for name, value in supplied.items():
        if value is None:
            if name not in globals():
                raise NameError("randomizing_data needs '%s': pass it as an argument "
                                "or define it before calling" % name)
            value = globals()[name]
        resolved[name] = value

    quasar_flux = list(resolved['quasar_flux'][:cut])
    quasar_labels = resolved['quasar_labels'][:cut]
    star_flux = list(resolved['star_flux'])
    star_labels = resolved['star_labels']

    if len(quasar_flux) != len(quasar_labels) or len(star_flux) != len(star_labels):
        raise ValueError('flux and label sequences must have the same length')

    # numeric targets: stars -> 1, quasars -> 0
    star_labels = np.ones(len(star_labels))
    quasar_labels = np.zeros(len(quasar_labels))

    input_flux = star_flux + quasar_flux
    input_labels = np.concatenate((star_labels, quasar_labels), axis=0)

    # one permutation applied to both keeps every spectrum paired with its label
    permutation = np.random.permutation(len(input_flux))

    # index the list directly so spectra of different lengths need no array conversion
    randomized_flux = [input_flux[i] for i in permutation]
    randomized_labels = input_labels[permutation]

    return {'FLUX': randomized_flux, 'LABELS': randomized_labels}
    

def preprocessing_data(flux_list, label_list, min_length=4550):
    """Drop spectra that are too short and truncate the rest to a common length.

    Parameters
    ----------
    flux_list : sequence of array-like
        One flux array per object.
    label_list : array-like
        One label per object, aligned with ``flux_list``.
    min_length : int, optional
        Spectra with fewer samples are discarded; the others are cut to exactly
        this many samples so they can share one tensor. Defaults to 4550.

    Returns
    -------
    dict
        ``'FLUX'``: list of arrays, each of length ``min_length``.
        ``'LABELS'``: array of the labels belonging to the kept spectra.
    """
    # positions (not lengths) of the spectra that are long enough
    keep_indices = [i for i, spectrum in enumerate(flux_list) if len(spectrum) >= min_length]

    # truncate every kept spectrum to the common length
    fluxlen_same = [np.asarray(flux_list[i])[:min_length] for i in keep_indices]

    # select the labels at the same positions so flux and labels stay paired
    filtered_labels_list = np.asarray(label_list)[np.asarray(keep_indices, dtype=np.intp)]

    data = {'FLUX': fluxlen_same, 'LABELS': filtered_labels_list}

    return data

def creating_input_tensor(samples, height, width, channels, flux_list, label_list):
    """Pack preprocessed spectra into a 4-D array shaped (samples, height, width, channels).

    Parameters
    ----------
    samples, height, width, channels : int
        Dimensions of the output array. ``width`` has to equal the length of
        the spectra returned by ``preprocessing_data``.
    flux_list, label_list
        Passed unchanged to ``preprocessing_data``.

    Returns
    -------
    dict
        ``'IMAGES'``: array initialised to ones in which ``[i, 0, :, 0]`` holds
        the i-th preprocessed spectrum.
        ``'LABELS'``: the first ``samples`` preprocessed labels, so both
        entries describe the same objects.

    Raises
    ------
    IndexError
        If fewer than ``samples`` spectra survive preprocessing.
    """
    # brings in preprocessed data to input into the dimensions of the tensor
    processed_data = preprocessing_data(flux_list, label_list)
    fluxlen_same = processed_data['FLUX']

    # fail early, before allocating the tensor, with a message naming both counts
    if samples > len(fluxlen_same):
        raise IndexError('requested %d samples but only %d spectra are available after preprocessing'
                         % (samples, len(fluxlen_same)))

    # creates input tensor of correct dimensions
    input_tensor = np.ones((samples, height, width, channels))

    # each sample's spectrum fills the width axis of row 0, channel 0
    for i in range(samples):
        input_tensor[i, 0, :, 0] = fluxlen_same[i]

    # trim the labels so they line up one-to-one with the tensor's first axis
    data = {'IMAGES': input_tensor, 'LABELS': processed_data['LABELS'][:samples]}

    return data

# def all_data_processing():
    
#     filename_list = get_filenames(path='.', extension=None, pattern=None, identifiers=None, include_path=False)
        
#     values_dict = get_filevalues(path, filename_list)
    
#     data_dict = combining_star_quasar_data(quasar_flux, quasar_label, star_flux, star_label)
    
#     = randomizing_data(cut)

In [4]:
# List the quasar FITS files, then read their columns into a dictionary.
quasar_dir = "/Users/matt/Desktop/DESI_Research/DESI_ML/good_quasars/"
quasardata = get_filenames(path=quasar_dir, extension='.fits')
quasar_dict = get_filevalues(path=quasar_dir, filename_list=quasardata)

KeyboardInterrupt: 

In [None]:
# List the star FITS files, then read their columns into a dictionary.
star_dir = "/Users/matt/Desktop/DESI_Research/DESI_ML/good_stars/"
stardata = get_filenames(path=star_dir, extension='.fits')
star_dict = get_filevalues(path=star_dir, filename_list=stardata)

In [None]:
# Unfiltered quasar flux arrays and class labels, one entry per file.
init_quasar_flux, init_quasar_labels = quasar_dict['FLUX'], quasar_dict['CLASS']
len(init_quasar_flux)

In [None]:
# Unfiltered star flux arrays and class labels, one entry per file.
init_star_flux, init_star_labels = star_dict['FLUX'], star_dict['CLASS']
len(init_star_flux)

In [None]:
# Positions of the entries whose class label compares equal to the expected type.
qso_positions = [k for k in range(len(init_quasar_labels)) if init_quasar_labels[k] == 'QSO']
star_positions = [k for k in range(len(init_star_labels)) if init_star_labels[k] == 'STAR']

# Select labels and fluxes with the same positions so the two lists stay aligned.
quasar_labels = [init_quasar_labels[k] for k in qso_positions]
quasar_flux = [init_quasar_flux[k] for k in qso_positions]
star_labels = [init_star_labels[k] for k in star_positions]
star_flux = [init_star_flux[k] for k in star_positions]

In [None]:
# Number of quasar spectra kept by the class filter.
len(quasar_flux)

In [None]:
# Keep only the first 3819 quasars; flux and labels are trimmed with the same slice.
quasar_keep = slice(None, 3819)
quasar_flux = quasar_flux[quasar_keep]
quasar_labels = quasar_labels[quasar_keep]
len(quasar_flux)

In [None]:
# Number of star spectra kept by the class filter.
len(star_flux)

In [None]:
# Replace the string classes with numeric targets: stars -> 1.0, quasars -> 0.0.
n_stars, n_quasars = len(star_labels), len(quasar_labels)
star_labels = np.ones(n_stars)
quasar_labels = np.zeros(n_quasars)

In [None]:
# Stars first, then quasars, for both the fluxes and the numeric labels.
input_flux = np.asarray(star_flux + quasar_flux)
input_labels = np.concatenate([star_labels, quasar_labels], axis=0)
print(len(input_flux))

In [None]:
# Largest flux value of each spectrum, in input order.
max_flux = [max(spectrum) for spectrum in input_flux]

In [None]:
# Peak flux of the first spectrum.
print(max_flux[0])

In [None]:
# NOTE(review): no random seed is set in the cells shown here, so the shuffle order
# (and therefore the train/validation/test membership) differs between runs.
permutation = np.random.permutation(len(input_flux)) # creates the same permutation to be done on flux & labels

In [None]:
# Shuffle labels and fluxes with the same permutation so the pairs stay matched.
randomized_labels = input_labels[permutation]

# Fancy indexing needs the array form; the tensor-building function takes a list.
randomized_flux = input_flux[permutation].tolist()

In [None]:
# Number of samples in the first shuffled spectrum.
len(randomized_flux[0])

In [None]:
# Filter and truncate the shuffled spectra; the result is a dict with 'FLUX' and 'LABELS'.
data_preproc = preprocessing_data(randomized_flux, randomized_labels)

In [None]:
# Number of spectra that survived preprocessing.
#len(data_preproc['LABELS'])
print(len(data_preproc['FLUX']))

In [None]:
# Build the (samples, height, width, channels) tensor from the shuffled spectra.
data = creating_input_tensor(samples=7620, height=1, width=4550, channels=1,
                             flux_list=randomized_flux, label_list=randomized_labels)

input_tensor, labels = data['IMAGES'], data['LABELS']


In [None]:
# Shape of the label array returned with the input tensor.
np.shape(labels)

In [None]:
# Split boundaries along the sample axis: [0, 4000) train, [4000, 5810) validation,
# [5810, 7620) test.
train_end, val_end, test_end = 4000, 5810, 7620

train_images, train_labels = input_tensor[:train_end], labels[:train_end]
val_images, val_labels = input_tensor[train_end:val_end], labels[train_end:val_end]
test_images, test_labels = input_tensor[val_end:test_end], labels[val_end:test_end]

# Report the shape of every split, images before labels.
for split in (train_images, train_labels, val_images, val_labels, test_images, test_labels):
    print(np.shape(split))

In [None]:
# L2 regularizer with factor 0.01.
# NOTE(review): none of the layers in the cells shown here receive this object,
# so it appears to have no effect on the model -- confirm whether that is intended.
kernel_regularizer=regularizers.l2(0.01)

In [None]:
# Three Conv2D + MaxPooling2D stages (32, 64, 128 filters) followed by a dense
# classifier with a single sigmoid output.
model = keras.Sequential()

for stage, n_filters in enumerate((32, 64, 128)):
    # only the first layer declares the input shape: (height, width, channels)
    first_layer_args = {'input_shape': (1, 4550, 1)} if stage == 0 else {}
    model.add(keras.layers.Conv2D(n_filters, (1, 16), activation='relu',
                                  data_format='channels_last', **first_layer_args))
    model.add(keras.layers.MaxPooling2D((1, 4), strides=(1, 4)))

model.add(keras.layers.Flatten())
# dropout is currently disabled:
#model.add(keras.layers.Dropout(0.5, seed = 0))
model.add(keras.layers.Dense(units=128, activation='relu'))
model.add(keras.layers.Dense(units=1, activation='sigmoid'))

# plain SGD (momentum left at its default) with binary cross-entropy loss
opt = keras.optimizers.SGD(lr=2e-5)
model.compile(loss='binary_crossentropy', optimizer=opt, metrics=['accuracy'])

In [None]:
# Print the layer-by-layer summary of the compiled model.
model.summary()

In [None]:
# Train for 6 epochs in batches of 64, evaluating on the validation split after each epoch.
history = model.fit(
    x=train_images,
    y=train_labels,
    batch_size=64,
    epochs=6,
    validation_data=(val_images, val_labels),
)

In [None]:
# Evaluate on the held-out test split and report loss and accuracy.
test_loss, test_acc = model.evaluate(x=test_images, y=test_labels)

print('Test loss = ', test_loss, sep='')
print('Test accuracy = ', test_acc, sep='')

In [None]:
# Per-epoch training and validation curves recorded by model.fit.
recorded = history.history
acc, val_acc = recorded['acc'], recorded['val_acc']
loss, val_loss = recorded['loss'], recorded['val_loss']
epochs = range(1, len(acc) + 1)

# One figure per metric: training as blue dots, validation as a blue line.
panels = (
    (acc, val_acc, 'Training acc', 'Validation acc', 'Training and validation accuracy'),
    (loss, val_loss, 'Training loss', 'Validation loss', 'Training and validation loss'),
)
for panel_number, (train_curve, val_curve, train_label, val_label, panel_title) in enumerate(panels):
    if panel_number > 0:
        # the first panel uses the implicit current figure; later ones open a new figure
        plt.figure()
    plt.plot(epochs, train_curve, 'bo', label=train_label)
    plt.plot(epochs, val_curve, 'b', label=val_label)
    plt.title(panel_title)
    plt.legend()
plt.show()

In [None]:
# Save the trained model to 'CNN_acc_6078.h5' (relative path, so it lands in the working directory).
model.save('CNN_acc_6078.h5')