In [None]:
import sys
# Environment bootstrap: pick the right location of the project's `imports/`
# directory depending on whether the notebook runs on Google Colab or locally,
# then import the project's path configuration as `custom_paths`.
if "google.colab" in sys.modules:
    print("Colab Detected")
    # TensorFlow is imported only here, to check that a GPU runtime is attached.
    import tensorflow as tf
    gpus = tf.config.list_physical_devices("GPU")
    if not gpus:
        # Abort early: no GPU device is visible to TensorFlow.
        raise RuntimeError("Nessuna GPU trovata.Controlla di aver selezionato il runtime giusto.")
    else:
        print(f"Trovate {len(gpus)} GPU:\n{gpus}")
    
    # Fetch the project, move into it, install its requirements and run the
    # FITS -> CSV conversion script.
    # NOTE(review): these steps use relative paths and are not guarded, so
    # re-running this cell in the same session presumably clones again inside
    # the already-cloned repository -- confirm before re-executing.
    # NOTE(review): fits2csv.py presumably produces the CSV read later through
    # custom_paths.csv_path -- confirm against the script.
    !git clone https://github.com/AtomicDonuts/Progetto_Computings.git
    %cd Progetto_Computings/
    !pip install -r requirements.txt
    !python3 fits_import/fits2csv.py
    
    # The working directory is now the repository root, so `imports/` is a
    # direct child.
    sys.path.append("imports/")
    import custom_variables as custom_paths
else:
    print("Local Machine Detected")
    # Locally the notebook is expected to sit one level below the repository
    # root, hence the `../` prefix.
    sys.path.append("../imports/")
    import custom_variables as custom_paths

In [None]:
from keras.layers import Dense, Input, Concatenate,Flatten, Dropout
from keras.models import Model
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
from sklearn.preprocessing import StandardScaler
from tensorflow.keras.utils import plot_model

In [None]:
# Load the catalogue CSV and keep only the two classes the classifier is
# trained on (AGN and Pulsar).
catalog_raw = pd.read_csv(custom_paths.csv_path)

# `.copy()` makes `df` an independent frame rather than a slice of the raw
# catalogue: a later cell assigns scaled values back into `df`, which on a
# slice can raise SettingWithCopyWarning.
df = catalog_raw[catalog_raw["CLASS_GENERIC"].isin(["AGN", "Pulsar"])].copy()
print(len(df))

In [None]:
def _flux_ts_pairs(kind, count):
    """Return a (count, 2) array of [Flux_<kind>_<i>, Sqrt_TS_<kind>_<i>] column names."""
    return np.array([(f"Flux_{kind}_{i}", f"Sqrt_TS_{kind}_{i}") for i in range(count)])


# Column-name tables: one row per bin, columns = (flux, sqrt(TS)).
flux_band = _flux_ts_pairs("Band", 8)
flux_hist = _flux_ts_pairs("History", 14)

# Flat list of every feature column (band pairs first, then history pairs),
# used for cleaning and normalisation.
norm_cols = np.concatenate((flux_band, flux_hist), axis=0).ravel()

In [None]:
# Remove every row that has +/-inf in at least one feature column.
# np.isinf is False for NaN, so NaN values are not affected by this filter.
rows_with_inf = np.isinf(df[norm_cols]).any(axis=1).to_numpy()
drop = df.index[rows_with_inf]
if len(drop) > 0:
    df = df.drop(index=drop)

In [None]:
# Standardise every feature column and write the scaled values back into `df`.
# NOTE(review): the scaler is fitted on all rows, i.e. before the
# train/validation split that happens inside `fit` -- confirm this is intended.
feature_frame = df[norm_cols]
scaler = StandardScaler().fit(feature_frame)
scaled_data = scaler.transform(feature_frame)
df[norm_cols] = scaled_data

In [None]:
# Build the two network inputs as 3-D arrays of shape (n_sources, n_bins, 2),
# where the last axis holds (flux, sqrt(TS)) for each bin.
#
# The column names are rebuilt here instead of being read from
# `flux_band` / `flux_hist`: this cell overwrites those two names with the
# data arrays, so indexing `df` with them would fail on a second execution.
# The bin counts (8 energy bands, 14 history bins) must match the cell that
# builds `norm_cols`.
band_flux_cols = [f"Flux_Band_{i}" for i in range(8)]
band_ts_cols = [f"Sqrt_TS_Band_{i}" for i in range(8)]
hist_flux_cols = [f"Flux_History_{i}" for i in range(14)]
hist_ts_cols = [f"Sqrt_TS_History_{i}" for i in range(14)]

flux_band = np.dstack((df[band_flux_cols].to_numpy(), df[band_ts_cols].to_numpy()))
flux_hist = np.dstack((df[hist_flux_cols].to_numpy(), df[hist_ts_cols].to_numpy()))
print(f"Flux_Band Size: {flux_band.shape}")
print(f"Flux_History Size: {flux_hist.shape}")

In [None]:
# The label convention is not explained clearly in the paper; judging from its
# output it is assumed to be:
#   label = [prob_AGN, prob_PSR]
#   AGN    -> [1, 0]
#   Pulsar -> [0, 1]

source_class = df["CLASS_GENERIC"].to_numpy()
is_agn = source_class == "AGN"
is_psr = source_class == "Pulsar"

# One-hot encode: column 0 flags AGN, column 1 flags pulsars.
labels = np.column_stack((is_agn, is_psr)).astype(int)

In [None]:
model_name = "Modello_DNN_Paper_Modificato"


def _dense_branch(data, suffix):
    """Build one input branch: Input -> Flatten -> Dense(32) -> Dropout(0.3) -> Dense(16).

    `data` only supplies the per-sample input shape; `suffix` is appended to
    every layer name. Returns the (input tensor, branch output tensor) pair.
    """
    branch_in = Input(shape=data.shape[1:], name=f"Input_{suffix}")
    branch = Flatten(name=f"Flatten_{suffix}")(branch_in)
    branch = Dense(32, activation="relu", name=f"Dense_{suffix}")(branch)
    branch = Dropout(0.3, name=f"Dropout_{suffix}")(branch)
    branch = Dense(16, activation="relu", name=f"Dense2_{suffix}")(branch)
    return branch_in, branch


# Two parallel branches with the same layout: energy-band features and
# flux-history features.
inputs, hidden = _dense_branch(flux_band, "flux_band")
inputs2, hidden2 = _dense_branch(flux_hist, "flux_hist")

# Merge the branches (history first, then band), apply dropout, and reduce to
# the 2-class softmax output.
concat = Concatenate(name="Concatenate")([hidden2, hidden])
concat = Dropout(0.7, name="Dropout_concat")(concat)
final_hidden = Dense(16, activation="relu", name="Dense_concat")(concat)
final_hidden = Dense(4, activation="relu", name="Dense_final")(final_hidden)
outputs = Dense(2, activation="softmax", name="Output")(final_hidden)

model_paper = Model(inputs=[inputs, inputs2], outputs=outputs, name=model_name)
model_paper.compile(loss="categorical_crossentropy", optimizer="adam")
# Override the optimizer's learning rate after compilation.
model_paper.optimizer.learning_rate = 0.01

model_paper.summary()
# Save the architecture diagram next to the notebook, named after the model.
plot_model(model_paper, to_file=f"{model_name}.png", show_shapes=True, show_layer_names=True)

In [None]:
# Train on both inputs at once; half of the samples are held out for validation.
# NOTE(review): Keras documents `validation_split` as taking the last fraction
# of the samples before shuffling -- confirm the row order of `df` does not
# bias the validation half.
history = model_paper.fit(
    x=[flux_band, flux_hist],
    y=labels,
    epochs=150,
    batch_size=64,
    validation_split=0.5,
)

In [None]:
# Training vs. validation loss per epoch. The curves are labelled and the axes
# named so the figure can be read without looking at the code.
fig, ax = plt.subplots()
ax.plot(history.history["loss"], label="Training loss")
ax.plot(history.history["val_loss"], label="Validation loss")
ax.set_xlabel("Epoch")
ax.set_ylabel("Loss")
ax.set_title("Training history")
ax.legend()
plt.show()

In [None]:
# Class probabilities for every source in `df`; these are the same arrays
# passed to `fit`, so both the training and the validation halves are included.
predictions = model_paper.predict(x=[flux_band, flux_hist])

In [None]:
# A source counts as a confident prediction of one class only when that
# class's probability exceeds 0.6 AND the other class's is below 0.1.
prob_agn, prob_psr = predictions[:, 0], predictions[:, 1]
mask_predicted_agn = np.logical_and(prob_agn > 0.6, prob_psr < 0.1)
mask_predicted_psr = np.logical_and(prob_psr > 0.6, prob_agn < 0.1)

# Row 0 = AGN, row 1 = pulsar, for the true and the predicted classes.
mat_h = np.stack((is_agn, is_psr))
mat_v = np.stack((mask_predicted_agn, mask_predicted_psr))

# Broadcast (2, 1, N) & (1, 2, N) -> (2, 2, N): entry [i, j, n] is True when
# source n truly belongs to class i and is predicted as class j. Summing over
# the sources gives the confusion matrix (rows = true, columns = predicted).
mat_vectorized = np.logical_and(mat_h[:, np.newaxis, :], mat_v[np.newaxis, :, :])
confusion_matrix = mat_vectorized.sum(axis=-1)

In [None]:
# Summary metrics computed from the confusion matrix (rows = true class,
# columns = predicted class, index 0 = AGN, index 1 = pulsar). Pulsar is
# treated as the positive class.
TOT = len(predictions)
TN, FP = confusion_matrix[0]
FN, TP = confusion_matrix[1]

# Sources that passed neither confidence cut are in no cell of the matrix.
classified = TN + FP + FN + TP
unclassified = TOT - classified
print(
    f"Data under the cutoff: {unclassified} ie {np.round((unclassified / TOT) * 100, 2)}%"
)

# Guard the 0/0 case (no source passed the cuts): report NaN explicitly
# instead of triggering a NumPy divide warning.
accuracy = (TP + TN) / classified if classified else float("nan")
f1_denominator = TP + 0.5 * (FP + FN)
f1_score = TP / f1_denominator if f1_denominator else float("nan")
print(f"Accuracy: {accuracy}")
print(f"F1 Score: {f1_score}")