In [1]:
import os

import cv2
import numpy as np
import pandas as pd
from sklearn.metrics import mean_squared_error, mean_absolute_error, r2_score
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import MinMaxScaler

# from tensorflow.keras.models import Sequential
# from tensorflow.keras.optimizers import Adam
from keras.layers import Input, Conv2D, Conv1D, Flatten, Dense, Dropout, concatenate
from keras.layers import Input, Conv1D, MaxPooling1D, Flatten, Dense, Dropout, concatenate
from keras.layers import MaxPooling2D  # required by the image branch of the model
from keras.models import Model
from keras.optimizers import Adam

In [2]:
# Input shapes of the two model branches (per sample, without the batch axis)
image_input_shape = (512, 512, 3)  # images are resized to 512x512 with 3 colour channels
# The F10.7 samples are reshaped to (n_samples, n_features, 1) with five feature
# columns, so each sample is a length-5 sequence with a single channel. The
# former value (1, 1) did not match that data and is too short for the
# MaxPooling1D(pool_size=2) layer applied in the F10.7 branch.
f107_input_shape = (5, 1)

# Load one image from disk and prepare it as CNN input
def preprocess_image(file_path, target_size=(512, 512)):
    """Read an image file, resize it, and scale its pixel values to [0, 1].

    Args:
        file_path: Path of the image file, passed to ``cv2.imread``.
        target_size: Size tuple passed to ``cv2.resize``. Defaults to
            ``(512, 512)``, the size that was previously hard-coded.

    Returns:
        The resized image as a float32 NumPy array divided by 255.0.

    Raises:
        ValueError: If ``cv2.imread`` returns ``None`` for ``file_path``
            (for example a missing file or a file that is not an image).
    """
    image = cv2.imread(file_path)
    # cv2.imread signals failure by returning None instead of raising; without
    # this check the failure only surfaces as an opaque error inside cv2.resize.
    if image is None:
        raise ValueError(f"Could not read image file: {file_path!r}")

    resized_image = cv2.resize(image, target_size)
    # float32 halves the memory of the stacked image array compared with the
    # float64 result that dividing the raw integer pixels would produce.
    normalized_image = resized_image.astype(np.float32) / 255.0
    return normalized_image

# Load and preprocess every image in the training directory
directory_path = "../../data/images/0335-010122-010222"

# os.listdir returns entries in arbitrary order; sorting makes the row order of
# X_train_image reproducible across runs and machines.
image_paths = []
for filename in sorted(os.listdir(directory_path)):
    file_path = os.path.join(directory_path, filename)
    # Skip hidden entries (e.g. ".DS_Store", ".ipynb_checkpoints") and
    # sub-directories: neither is an image and both would break image loading.
    if filename.startswith(".") or not os.path.isfile(file_path):
        continue
    image_paths.append(file_path)

# Stack the preprocessed images into one array of shape (n_images, height, width, channels)
X_train_image = np.array([preprocess_image(path) for path in image_paths])

# Load the F10.7 solar flux table
data = pd.read_csv("../../data/F10.7.csv")

# Feature matrix and regression target
# NOTE(review): 'Adjusted Flux' is used both as a feature and as the target, so
# the model can read the answer directly from its input. It is kept here to
# preserve the five-column feature layout the rest of the notebook relies on —
# TODO confirm whether it should be dropped from the features.
feature_columns = ['Julian day', 'Carringtonrotation', 'Observed Flux', 'Adjusted Flux', 'URSI Flux']
X = data[feature_columns].values
y = data['Adjusted Flux'].values  # Adjusted Flux is the target variable

# Split BEFORE scaling so the scaler never sees the test rows. With the same
# random_state and sample count, the split selects the same rows as before.
X_train_raw, X_test_raw, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

# Fit the min/max on the training rows only, then apply that same mapping to
# the test rows (test values may fall slightly outside [0, 1]).
scaler = MinMaxScaler()
X_train_f107 = scaler.fit_transform(X_train_raw)
X_test_f107 = scaler.transform(X_test_raw)
# Kept for backward compatibility with code that reads the fully scaled matrix
X_scaled = scaler.transform(X)

# Add a trailing channel axis for Conv1D: (n_samples, n_features) -> (n_samples, n_features, 1)
X_train_f107 = X_train_f107.reshape((X_train_f107.shape[0], X_train_f107.shape[1], 1))
X_test_f107 = X_test_f107.reshape((X_test_f107.shape[0], X_test_f107.shape[1], 1))

In [3]:
# Report the shape of the image array
print("Image Data:")
print("X_train_image shape:", X_train_image.shape)

# Report the shapes of the F10.7 feature/target splits, one per line
print("\nF10.7 Index Data:")
for shape_label, shaped_array in (
    ("X_train_f107 shape:", X_train_f107),
    ("X_test_f107 shape:", X_test_f107),
    ("y_train shape:", y_train),
    ("y_test shape:", y_test),
):
    print(shape_label, shaped_array.shape)

Image Data:
X_train_image shape: (308, 512, 512, 3)

F10.7 Index Data:
X_train_f107 shape: (16572, 5, 1)
X_test_f107 shape: (4143, 5, 1)
y_train shape: (16572,)
y_test shape: (4143,)


In [None]:
# --- Image branch: two Conv2D + MaxPooling2D stages, then flatten ---
# NOTE(review): assuming Keras' default 'valid' padding and a 512x512 input,
# the flattened map is about 126*126*128 (~2.0M) features, which makes the
# first Dense layer very large — consider more downsampling. TODO confirm.
image_input = Input(shape=image_input_shape)
conv1_img = Conv2D(64, (3, 3), activation='relu')(image_input)
pool1_img = MaxPooling2D((2, 2))(conv1_img)
conv2_img = Conv2D(128, (3, 3), activation='relu')(pool1_img)
pool2_img = MaxPooling2D((2, 2))(conv2_img)
flatten_img = Flatten()(pool2_img)

# --- F10.7 branch: Conv1D -> MaxPooling1D -> Conv1D, then flatten ---
# padding='same' keeps the sequence length unchanged through each convolution.
f107_input = Input(shape=f107_input_shape)
conv1d_1_f107 = Conv1D(64, 3, padding='same', activation='relu')(f107_input)
pooling1d_1_f107 = MaxPooling1D(2)(conv1d_1_f107)
conv1d_2_f107 = Conv1D(128, 3, padding='same', activation='relu')(pooling1d_1_f107)
flatten_f107 = Flatten()(conv1d_2_f107)

# --- Merge both feature vectors and regress a single value ---
concatenated = concatenate([flatten_img, flatten_f107])
dense1 = Dense(units=128, activation='relu')(concatenated)
dropout1 = Dropout(rate=0.5)(dense1)
output = Dense(units=1)(dropout1)  # single linear unit for regression

# Two-input model: the image first, the F10.7 sequence second
model = Model(inputs=[image_input, f107_input], outputs=output)

: 

In [None]:
# Compile the model
model.compile(optimizer=Adam(), loss='mse')  # Mean Squared Error as the regression loss

# The model has two inputs, so every call to fit/evaluate/predict must supply
# BOTH an image array and an F10.7 array with the same number of samples as the
# targets. The images and the F10.7 rows are loaded independently, so only as
# many (image, F10.7, target) triples as the smallest source provides are used.
# NOTE(review): this pairing is purely positional — images follow the
# directory listing while the F10.7 rows come out of a shuffled
# train_test_split, so sample i of one is not known to belong to sample i of
# the other. TODO: align images and F10.7 rows by observation date before
# trusting the metrics printed below.
n_paired = min(len(X_train_image), len(X_train_f107), len(y_train))
if n_paired < 2:
    raise ValueError(
        "Need at least 2 paired (image, F10.7) samples to train and evaluate, got %d" % n_paired
    )

# There are no images for X_test_f107, so the last 20% of the paired samples
# are held out and used for both validation and the final evaluation.
n_holdout = max(1, int(0.2 * n_paired))
n_fit = n_paired - n_holdout

fit_inputs = [X_train_image[:n_fit], X_train_f107[:n_fit]]
fit_targets = y_train[:n_fit]
holdout_inputs = [X_train_image[n_fit:n_paired], X_train_f107[n_fit:n_paired]]
holdout_targets = y_train[n_fit:n_paired]

# Train the model with both image and F10.7 index data
model.fit(
    fit_inputs,
    fit_targets,
    epochs=10,
    batch_size=32,
    validation_data=(holdout_inputs, holdout_targets),
)

# Evaluate on the held-out paired samples
loss = model.evaluate(holdout_inputs, holdout_targets)
print("Test Loss (paired image + F10.7 hold-out):", loss)

# Predictions for the held-out samples; flatten (n, 1) -> (n,) to match the targets
y_pred = model.predict(holdout_inputs).ravel()

# Calculate additional metrics
mse = mean_squared_error(holdout_targets, y_pred)
mae = mean_absolute_error(holdout_targets, y_pred)
r2 = r2_score(holdout_targets, y_pred)
rmse = np.sqrt(mse)

print("Mean Squared Error (MSE):", mse)
print("Mean Absolute Error (MAE):", mae)
print("R-squared (R2):", r2)
print("Root Mean Squared Error (RMSE):", rmse)