In [4]:
import numpy as np
import pandas as pd

In [5]:
import os

print(os.listdir("."))


['sdss-gs.tgz', '.DS_Store', 'spectra', 'extension.ipynb', 'data.csv', 'final.ipynb', 'project_notebook_updated.ipynb', '.gitattributes', 'explore_data_analysis.ipynb', 'sdss-gs', '.ipynb_checkpoints', 'final_copy.ipynb', '.git']


In [6]:
import tarfile

tgz_path = 'sdss-gs.tgz'

csv_path_in_archive = 'sdss-gs/data.csv'

with tarfile.open(tgz_path, 'r:gz') as tar:
    f = tar.extractfile(csv_path_in_archive)
    df = pd.read_csv(f)

print(df.head())


                 objid    mjd  plate   tile  fiberid   run  rerun  camcol  \
0  1237674649924469005  51612    280    108      187  6793    301       3   
1  1237652943170371605  52262    743    523       38  1739    301       3   
2  1237663787951653251  56326   6374  15286      516  4264    301       3   
3  1237658803103531163  56605   6668  15549      640  3103    301       6   
4  1237678892819808615  56596   6606  15223      492  7781    301       2   

   field         ra  ...  modelMag_i modelMag_z  redshift   stellarmass  \
0     65  170.36352  ...    16.42507   16.06842  0.100474  5.198358e+10   
1    227  348.34067  ...    15.30176   14.93129  0.039402  2.520189e+10   
2     57  113.34011  ...    18.88552   18.46291  0.278570  8.518087e+10   
3     31  153.60405  ...    19.07585   18.92487  0.172246  4.401721e+10   
4    134   31.91200  ...    17.29700   16.91050  0.175692  1.768276e+10   

    w1mag   w2mag   w3mag  w4mag  gz2c_f  gz2c_s  
0  13.423  12.911   9.871  7.613   

In [14]:
from scipy.interpolate import interp1d

# Define a common wavelength grid (example: 1000 points from min to max)
all_wavelengths = []
for objid in df['objid']:
    data = np.loadtxt(f'./spectra/{objid}.csv', delimiter=',', skiprows=1, usecols=(0, 1))
    wavelength = data[:, 0]
    all_wavelengths.extend(wavelength)
common_min, common_max = min(all_wavelengths), max(all_wavelengths)
common_grid = np.linspace(common_min, common_max, 1000)

X_list_interp = []
for objid in df['objid']:
    data = np.loadtxt(f'./spectra/{objid}.csv', delimiter=',', skiprows=1, usecols=(0, 1))
    wavelength, flux = data[:, 0], data[:, 1]
    # Create an interpolation function
    interp_func = interp1d(wavelength, flux, bounds_error=False, fill_value="extrapolate")
    # Interpolate to the common grid
    flux_interp = interp_func(common_grid)
    X_list_interp.append(flux_interp)
X = np.array(X_list_interp)


le = LabelEncoder()
y_encoded = le.fit_transform(df['class'].values)  # e.g. Elliptical -> 0, Spiral -> 1

# Split the dataset into training and testing sets
X_train, X_test, y_train, y_test = train_test_split(
    X, y_encoded, test_size=0.2, stratify=y_encoded, random_state=42
)
print("Dataset shapes:")
print("X_train:", X_train.shape, " | X_test:", X_test.shape)

Dataset shapes:
X_train: (63654, 1000)  | X_test: (15914, 1000)


In [15]:
from sklearn.decomposition import PCA, FastICA, NMF

# Number of components for dimensionality reduction
n_components = 40

# 1. Principal Component Analysis (PCA)
pca = PCA(n_components=n_components, random_state=42)
X_train_pca = pca.fit_transform(X_train)
X_test_pca = pca.transform(X_test)
print("PCA explained variance (50 components):",
      f"{100*pca.explained_variance_ratio_.sum():.2f}%")

# 2. Independent Component Analysis (ICA)
ica = FastICA(n_components=n_components, random_state=42)
X_train_ica = ica.fit_transform(X_train)
X_test_ica = ica.transform(X_test)

# 3. Non-Negative Matrix Factorization (NMF)
# Ensure no negative values (NMF requires non-negative data)
X_train_nmf = np.clip(X_train, a_min=0, a_max=None)
X_test_nmf = np.clip(X_test, a_min=0, a_max=None)
nmf = NMF(n_components=n_components, init='random', random_state=42)
X_train_nmf = nmf.fit_transform(X_train_nmf)
X_test_nmf = nmf.transform(X_test_nmf)

print("Original feature dimension:", X_train.shape[1])
print("Reduced feature dimension:", X_train_pca.shape[1])


PCA explained variance (50 components): 99.99%




Original feature dimension: 1000
Reduced feature dimension: 40




In [11]:
!pip install tensorflow



In [16]:
import tensorflow as tf
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Conv1D, MaxPooling1D, Flatten, Dense

# Define a function to create a CNN model for a given input length
def make_cnn(input_length, num_classes=2):
    model = Sequential([
        Conv1D(filters=16, kernel_size=3, activation='relu', input_shape=(input_length, 1)),
        MaxPooling1D(pool_size=2),
        Conv1D(filters=32, kernel_size=3, activation='relu'),
        MaxPooling1D(pool_size=2),
        Flatten(),
        Dense(64, activation='relu'),
        Dense(num_classes, activation='softmax')
    ])
    model.compile(optimizer='adam', loss='sparse_categorical_crossentropy', metrics=['accuracy'])
    return model

# Prepare data shapes for CNN (add channel dimension)
X_train_full = X_train[..., np.newaxis]   # shape (n_samples, n_wavelengths, 1)
X_test_full = X_test[..., np.newaxis,]
X_train_pca_cnn = X_train_pca[..., np.newaxis]   # shape (n_samples, 50, 1)
X_test_pca_cnn = X_test_pca[..., np.newaxis]
X_train_ica_cnn = X_train_ica[..., np.newaxis]
X_test_ica_cnn = X_test_ica[..., np.newaxis]
X_train_nmf_cnn = X_train_nmf[..., np.newaxis]
X_test_nmf_cnn = X_test_nmf[..., np.newaxis]

# Build CNN models for each input scenario
model_full = make_cnn(input_length=X_train_full.shape[1])
model_pca  = make_cnn(input_length=X_train_pca_cnn.shape[1])
model_ica  = make_cnn(input_length=X_train_ica_cnn.shape[1])
model_nmf  = make_cnn(input_length=X_train_nmf_cnn.shape[1])

  super().__init__(activity_regularizer=activity_regularizer, **kwargs)


In [17]:
# Train the CNN on full spectra
history_full = model_full.fit(X_train_full, y_train, epochs=20, batch_size=32, 
                              validation_split=0.1, verbose=0)
# Evaluate on test set
loss_full, acc_full = model_full.evaluate(X_test_full, y_test, verbose=0)

# Train the CNN on PCA-reduced data
history_pca = model_pca.fit(X_train_pca_cnn, y_train, epochs=20, batch_size=32, 
                            validation_split=0.1, verbose=0)
loss_pca, acc_pca = model_pca.evaluate(X_test_pca_cnn, y_test, verbose=0)

# Train the CNN on ICA-reduced data
history_ica = model_ica.fit(X_train_ica_cnn, y_train, epochs=20, batch_size=32, 
                            validation_split=0.1, verbose=0)
loss_ica, acc_ica = model_ica.evaluate(X_test_ica_cnn, y_test, verbose=0)

# Train the CNN on NMF-reduced data
history_nmf = model_nmf.fit(X_train_nmf_cnn, y_train, epochs=20, batch_size=32, 
                            validation_split=0.1, verbose=0)
loss_nmf, acc_nmf = model_nmf.evaluate(X_test_nmf_cnn, y_test, verbose=0)

print(f"Test Accuracy (Full spectra): {acc_full:.3f}")
print(f"Test Accuracy (PCA 50 comps): {acc_pca:.3f}")
print(f"Test Accuracy (ICA 50 comps): {acc_ica:.3f}")
print(f"Test Accuracy (NMF 50 comps): {acc_nmf:.3f}")


Test Accuracy (Full spectra): 1.000
Test Accuracy (PCA 50 comps): 1.000
Test Accuracy (ICA 50 comps): 1.000
Test Accuracy (NMF 50 comps): 1.000
