# Exercise 1: Classification based on magnitude

In [1]:
import pandas as pd
import numpy as np
from sklearn.metrics import accuracy_score

# Threshold on the extinction-corrected (u - r) value used by the simple cut below
U_R_THRESHOLD = 2.2

# Read the training catalogue
data = pd.read_csv("data/train.csv")

# Extinction-corrected magnitudes: Petrosian magnitude minus extinction, per band
u = data['petroMag_u'] - data['extinction_u']
r = data['petroMag_r'] - data['extinction_r']

# Store the corrected magnitudes alongside the original columns
data['u'] = u.values
data['r'] = r.values

# Vectorised version of the cut: objects with u - r > threshold get class 0,
# everything else (including rows where the comparison is False because of NaN)
# gets class 1 -- exactly what the former row-by-row loop produced.
# NOTE(review): later cells label class 0 as 'Spirals'; confirm that mapping
# u - r > 2.2 to class 0 matches the encoding of the 'moving' column.
exceeds_cut = (data['u'] - data['r']) > U_R_THRESHOLD
prediction = np.array(~exceeds_cut, dtype=int)

# Plain-list copy of the predictions, kept under its original name
c = prediction.tolist()

# Accuracy of the cut against the reference labels (displayed as the cell output)
correct = np.array(data['moving'])
accuracy_score(correct, prediction)

FileNotFoundError: [Errno 2] File data/train.csv does not exist: 'data/train.csv'

In [None]:
import matplotlib as mpl
import matplotlib.pyplot as plt
import matplotlib.ticker as ticker

%matplotlib inline
# Serif, 14 pt text for every figure in the notebook
font = dict(family='serif', size=14)
mpl.rc('font', **font)

# .mask() blanks out (NaN) the rows whose label is truthy (class 1), so only the
# class-0 objects -- the spirals, per the labelling used here -- remain.
# Reference labels first ...
xx, yy = (data[band].mask(correct.astype('bool')) for band in ('u', 'r'))

# ... then the labels predicted by the magnitude cut
x, y = (data[band].mask(prediction.astype('bool')) for band in ('u', 'r'))

fig, ax = plt.subplots(figsize=(10, 7))
ax.xaxis.set_minor_locator(mpl.ticker.AutoMinorLocator())
ax.yaxis.set_minor_locator(mpl.ticker.AutoMinorLocator())

# Overlay both selections on the u-r plane
ax.plot(xx, yy, 'bo', label="Actual")
ax.plot(x, y, 'r+', label="Predicted")
ax.set_xlabel('u')
ax.set_ylabel('r')
ax.legend()

# Exercise 2: Support Vector Classification

In [None]:
from sklearn import svm
import matplotlib

# Training matrix with one row per object and two columns (u, r),
# plus the reference labels as the target vector
train_in = np.column_stack([u.values, r.values])
train_out = correct

# Linear-kernel support vector classifier; other accepted kernels are
# 'poly', 'rbf', 'sigmoid' and 'precomputed'. fit() returns the estimator itself.
clf = svm.SVC(kernel='linear').fit(train_in, train_out)
clf

In [None]:
# Visual check of the SVM: draw its linear decision boundary on the u-r plane and
# see whether it really separates the two classes.

# Coefficients the linear SVM assigned to the two input features (u and r).
# The boundary w[0]*u + w[1]*r + b = 0 rearranges to r = slope*u - b/w[1].
w = clf.coef_[0]
slope = - w[0] / w[1]

# Evaluate the boundary across the observed range of u
line_x = np.linspace(u.values.min(), u.values.max())
line_y = slope * line_x - clf.intercept_[0] / w[1]

# Legend text with slope and intercept rounded to two decimals
svm_regression = ''.join([
    'r = ',
    str(round(slope, 2)),
    r'$\times$u + ',
    str(round(-1.0 * clf.intercept_[0] / w[1], 2)),
])

fig, ax = plt.subplots(figsize=(10, 7))
ax.xaxis.set_minor_locator(mpl.ticker.AutoMinorLocator())
ax.yaxis.set_minor_locator(mpl.ticker.AutoMinorLocator())

# Training objects, one scatter series per class label
plt.scatter(train_in[train_out == 0, 0], train_in[train_out == 0, 1],
            label='Spirals', alpha=0.3, s=70)
plt.scatter(train_in[train_out == 1, 0], train_in[train_out == 1, 1],
            label='Ellipticals', alpha=0.3, s=70)

# Fitted boundary (black) against the hand-made cut u - r = 2.2 (magenta)
plt.plot(line_x, line_y, 'k-', label="Linear Regression from SVM: " + svm_regression)
plt.plot(line_x, line_x - 2.2, 'm-', label="Linear Regression expected:  r = u - 2.2")

plt.ylim([12, 19])
plt.xlabel('u')
plt.ylabel('r')
plt.legend()
plt.show()

In [None]:
# Classify the very objects the SVM was fitted on ...
train_pred = clf.predict(train_in)

# ... and report the fraction labelled correctly (training accuracy)
accuracy_score(y_true=train_out, y_pred=train_pred)

In [None]:
# Evaluate the fitted SVM on the separate test catalogue
test_data = pd.read_csv("data/test.csv")

# Extinction-corrected magnitudes, computed the same way as for the training set
test_u = test_data['petroMag_u'].sub(test_data['extinction_u'])
test_r = test_data['petroMag_r'].sub(test_data['extinction_r'])

# Keep the corrected magnitudes in the data frame as well
test_data['u'] = test_u.values
test_data['r'] = test_r.values

# Two-column (u, r) input matrix, the SVM's predictions and the reference labels
test_in = np.column_stack([test_u.values, test_r.values])
test_pred = clf.predict(test_in)
test_correct = np.array(test_data['moving'])

# Fraction of test objects classified correctly
accuracy_score(y_true=test_correct, y_pred=test_pred)

# Exercise 3: Using a neural network for more rigorous training

Train the neural network on several colors (differences between the magnitudes in adjacent bands) to check whether the additional colors improve the classification.

In [None]:
# ---- Training sample -------------------------------------------------------
color_data = pd.read_csv("data/train_colors.csv")

# Dereddened colors: difference between the dereddened magnitudes of adjacent bands
u_g = color_data['dered_u'].sub(color_data['dered_g'])
g_r = color_data['dered_g'].sub(color_data['dered_r'])
r_i = color_data['dered_r'].sub(color_data['dered_i'])
i_z = color_data['dered_i'].sub(color_data['dered_z'])

# Attach the four colors to the original data frame
color_data['u_g'] = u_g.values
color_data['g_r'] = g_r.values
color_data['r_i'] = r_i.values
color_data['i_z'] = i_z.values

# Input matrix (one column per color) and the 'spiral' column as the target
train_in_colors = np.column_stack([colour.values for colour in (u_g, g_r, r_i, i_z)])
train_out_colors = np.array(color_data['spiral'])

# ---- Test sample -----------------------------------------------------------
color_test_data = pd.read_csv("data/test_colors.csv")

# Same four dereddened colors for the test objects
tu_g = color_test_data['dered_u'].sub(color_test_data['dered_g'])
tg_r = color_test_data['dered_g'].sub(color_test_data['dered_r'])
tr_i = color_test_data['dered_r'].sub(color_test_data['dered_i'])
ti_z = color_test_data['dered_i'].sub(color_test_data['dered_z'])

# Test matrix and expected output
test_in_colors = np.column_stack([colour.values for colour in (tu_g, tg_r, tr_i, ti_z)])
test_out_colors = np.array(color_test_data['spiral'])

# Sanity check on the dimensions of both samples
print("Size of the training matrix: ", train_in_colors.shape, train_out_colors.shape)
print("Size of the test matrix:     ", test_in_colors.shape, test_out_colors.shape)

In [None]:
from keras.models import Sequential
from keras.layers import Dense
from sklearn.model_selection import StratifiedKFold
from keras.utils import plot_model
import pydot

# Seed NumPy's global random generator so repeated runs start from the same state
seed = 7
np.random.seed(seed)

# Small fully connected network: 4 color inputs -> 8 -> 3 -> 1 sigmoid output
model = Sequential([
    Dense(8, input_dim=4, activation='relu'),
    Dense(3, activation='relu'),
    Dense(1, activation='sigmoid'),
])

# Binary classification set-up: cross-entropy loss, Adam optimizer, accuracy metric
model.compile(loss='binary_crossentropy', optimizer='adam', metrics=['accuracy'])
model.summary()

# Save a diagram of the architecture next to the data files
plot_model(model, to_file='data/model.png', show_shapes=True, show_layer_names=True)

# Train for 150 epochs in batches of 10, holding back 30% of the rows for validation;
# the returned History object is kept for plotting later
model_info = model.fit(
    train_in_colors,
    train_out_colors,
    epochs=150,
    batch_size=10,
    validation_split=0.3,
    shuffle=True,
    verbose=2,
)

# Score the trained network on the full training matrix and print the second
# reported quantity (the compiled metric) as a percentage
scores = model.evaluate(train_in_colors, train_out_colors)
print("\n%s: %.2f%%" % (model.metrics_names[1], scores[1]*100))

In [None]:
# Run the trained network on the test colors. The sigmoid output is a probability,
# so round it to the nearest integer to obtain a 0/1 label.
test_pred_colors = np.around(model.predict(test_in_colors))

# Fraction of test objects classified correctly
accuracy_score(y_true=test_out_colors, y_pred=test_pred_colors)

In [None]:
# Plot Accuracy and/or Loss as a Function of Number of Epoch

def plot_model_history(model_history):
    """Plot training and validation accuracy and loss against the epoch number.

    Draws a 1x2 figure: accuracy on the left, loss on the right, each with a
    'train' and a 'val' curve, then shows it.

    Parameters
    ----------
    model_history : object
        Anything exposing a ``history`` dict of per-epoch lists, such as the
        object returned by Keras ``model.fit``. The dict must contain 'loss'
        and 'val_loss', plus the accuracy series under either the older key
        names ('acc', 'val_acc') or the newer ones ('accuracy', 'val_accuracy').

    Raises
    ------
    KeyError
        If a required series is missing (for example when the model was
        trained without validation data).
    """
    history = model_history.history

    def _series(*candidates):
        # Keras renamed the accuracy keys between releases; accept either spelling.
        for key in candidates:
            if key in history:
                return history[key]
        raise KeyError(candidates[0])

    # (title, y-label, training series, validation series) for each panel
    panels = (
        ('Model Accuracy', 'Accuracy',
         _series('acc', 'accuracy'), _series('val_acc', 'val_accuracy')),
        ('Model Loss', 'Loss',
         _series('loss'), _series('val_loss')),
    )

    fig, axs = plt.subplots(1, 2, figsize=(15, 5))
    for ax, (title, ylabel, train_values, val_values) in zip(axs, panels):
        # Epochs are numbered from 1 on the x axis
        ax.plot(range(1, len(train_values) + 1), train_values)
        ax.plot(range(1, len(val_values) + 1), val_values)
        ax.set_title(title)
        ax.set_ylabel(ylabel)
        ax.set_xlabel('Epoch')
        ax.get_xaxis().set_minor_locator(mpl.ticker.AutoMinorLocator())
        ax.get_yaxis().set_minor_locator(mpl.ticker.AutoMinorLocator())
        ax.legend(['train', 'val'], loc='best')
    plt.show()
    
# Draw the accuracy and loss curves for the training run stored in model_info
plot_model_history(model_info)

In [None]:
# Stratified 10-fold cross-validation: the rows are shuffled once (seeded) and each
# fold in turn serves as the held-out validation sample
kfold = StratifiedKFold(n_splits=10, shuffle=True, random_state=seed)
cvscores = []
for train, test in kfold.split(train_in_colors, train_out_colors):
    # Build a fresh, untrained 4 -> 8 -> 3 -> 1 network for this fold
    model = Sequential([
        Dense(8, input_dim=4, activation='relu'),
        Dense(3, activation='relu'),
        Dense(1, activation='sigmoid'),
    ])
    model.compile(loss='binary_crossentropy', optimizer='adam', metrics=['accuracy'])

    # Train silently on the rows selected for training in this fold
    model.fit(
        train_in_colors[train],
        train_out_colors[train],
        epochs=150,
        batch_size=10,
        verbose=0,
    )

    # Score on the held-out rows and record the metric as a percentage
    scores = model.evaluate(train_in_colors[test], train_out_colors[test], verbose=0)
    print("%s: %.2f%%" % (model.metrics_names[1], scores[1]*100))
    cvscores.append(scores[1] * 100)

# Mean and standard deviation of the per-fold scores
print("%.2f%% (+/- %.2f%%)" % (np.mean(cvscores), np.std(cvscores)))

In [None]:
# Predict on the test colors with the current `model` and round the predicted
# probabilities to the nearest integer to obtain 0/1 labels
test_pred_colors = model.predict(test_in_colors).round()

# Fraction of test objects classified correctly
accuracy_score(y_true=test_out_colors, y_pred=test_pred_colors)

# Exercise 4: Classification using a convolutional neural network

In [None]:
# We are using keras framework with tensorflow as the backend
# Convolutional Neural Networks are multi-layer neural networks that assume the input data to be images.

# Sequential is simply a linear stack of neural network layers, and it's perfect for the type of 
# feed-forward CNN
from keras.models import Sequential

# Import the core layers from Keras which are used in almost any neural network
from keras.layers import Dense, Flatten

# Import CNN layers from Keras that will help us efficiently train on image data
from keras.layers import Conv2D, MaxPooling2D

# Helps to preprocess the images
from keras.preprocessing.image import ImageDataGenerator

In [None]:
# Create your model
model = Sequential()

# First convolutional layer: 32 filters of size 3x3 applied to 64x64 images.
# A full-color image with all 3 RGB channels has a depth of 3, and the input shape
# (64, 64, 3) puts that depth last, so the layout is stated explicitly.
model.add(Conv2D(32, (3, 3), input_shape = (64, 64, 3), activation = 'relu',
                 data_format="channels_last"))

# Max-pooling over 2x2 windows. It must use the same channels-last layout as the
# convolution above; with "channels_first" the layer would treat the image height
# as the channel axis and pool across width and feature maps instead.
model.add(MaxPooling2D(pool_size=(2, 2), data_format="channels_last"))

# Flatten the pooled feature maps into a single vector per image
model.add(Flatten())

# Fully connected head: one hidden layer, then a single sigmoid unit for the
# binary class probability
model.add(Dense(units = 128, activation = 'relu'))
model.add(Dense(units = 1, activation = 'sigmoid'))

# Compile the model: declare the loss function, the optimizer (SGD, Adam, etc.)
# and the metric to report.
model.compile(optimizer = 'adam', loss = 'binary_crossentropy', metrics = ['accuracy'])

In [None]:
# Training images: rescale pixel values by 1/255 and augment with random shear,
# zoom and horizontal flips
train_datagen = ImageDataGenerator(
    rescale=1./255,
    shear_range=0.2,
    zoom_range=0.2,
    horizontal_flip=True,
)

# Test images: rescaling only, no augmentation
test_datagen = ImageDataGenerator(rescale=1./255)

# Stream 64x64 images in batches of 32 from the class sub-folders, with binary labels
training_set = train_datagen.flow_from_directory(
    'data/Training',
    target_size=(64, 64),
    batch_size=32,
    class_mode='binary',
)
test_set = test_datagen.flow_from_directory(
    'data/Testing',
    target_size=(64, 64),
    batch_size=32,
    class_mode='binary',
)

In [None]:
# Train your model.
# steps_per_epoch / validation_steps count BATCHES, not images. The former values
# (8000 and 2000) combined with batch_size = 32 meant 256,000 training images and
# 64,000 validation images per epoch. len(iterator) is the number of batches in
# one pass over the directory, so each epoch now sees every image exactly once.
model.fit_generator(training_set, steps_per_epoch = len(training_set), epochs = 2,
                    validation_data = test_set, validation_steps = len(test_set))