In [1]:
import os
import random
import numpy as np
from PIL import Image
import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.ensemble import RandomForestRegressor
from sklearn.model_selection import train_test_split, GridSearchCV
from sklearn.metrics import mean_squared_error, r2_score
from collections import defaultdict
from sklearn.compose import ColumnTransformer
from sklearn.preprocessing import StandardScaler, OneHotEncoder
import xgboost as xgb
from sklearn.model_selection import learning_curve
import matplotlib.pyplot as plt


def convert_img_grayscale(df: pd.DataFrame) -> pd.DataFrame:
    """
    Converts the images specified in the 'img_path' column to grayscale, resizes them to 64x64.
    Returns a DataFrame with 'uuid' and the flattened pixel values.
    
    Parameters:
        df (pd.DataFrame): DataFrame with columns 'uuid' and 'img_path'.
    
    Returns:
        pd.DataFrame: A new DataFrame with 'uuid' and columns for each pixel in the image.
    """
    # List to store uuid and flattened image data
    uuid_list = []
    pixel_values_list = []

    # Loop through each row in the DataFrame
    for _, row in df.iterrows():
        img_path = row['img_path']
        uuid = row['uuid']

        try:
            # Open the image using Pillow
            with Image.open(img_path) as img:
                # Resize the image to 64x64
                img_resized = img.resize((64, 64))

                # Convert the image to grayscale
                img_gray = img_resized.convert("L")  # 'L' mode stands for grayscale

                # img_gray.show()
                # Convert the grayscale image to a numpy array
                img_array = np.array(img_gray)

                # Flatten the 2D array to 1D
                img_flattened = img_array.flatten()

                # Append the results to the lists
                uuid_list.append(uuid)
                pixel_values_list.append(img_flattened)
        except Exception as e:
            print(f"Error processing image {img_path}: {e}")
            # If error occurs, add None for the image
            uuid_list.append(uuid)
            pixel_values_list.append([None] * 4096)  # Assuming a 64x64 image (4096 pixels)

    # Create a new DataFrame with 'uuid' and the flattened image data
    pixel_columns = [f'pixel_{i + 1}' for i in range(4096)]  # 64x64 = 4096 pixels
    result_df = pd.DataFrame(pixel_values_list, columns=pixel_columns)
    result_df['uuid'] = uuid_list  # Add 'uuid' column at the end

    # Reorder columns to have 'uuid' as the first column
    result_df = result_df[['uuid'] + pixel_columns]

    return result_df


def preprocess_img_file(image_folder: str, image_files: list, resize: bool = False) -> pd.DataFrame:
    # Use a defaultdict to group files by prefix
    prefix_dict = defaultdict(list)
    unique_img_dic = {}

    # create a uuid dictionary with uuid as key and it's otolith images as values
    for image_file in image_files:
        # Extract the UUID from the image filename
        uuid = image_file.split('_')[0]  # Get the part before "_"

        prefix_dict[uuid].append(image_file)

    for uuid, images in prefix_dict.items():
        # Get the first image for each prefix
        if images:
            first_image = images[0]

            # Construct the full image path
            if uuid not in unique_img_dic:
                image_path = os.path.join(image_folder, first_image)
                unique_img_dic[uuid] = image_path

    unique_img_df = pd.DataFrame(list(unique_img_dic.items()), columns=['uuid', 'img_path'])

    gs_img = convert_img_grayscale(unique_img_df)

    return gs_img


In [6]:
# Path to your image folder
image_folder = os.path.join("..", "data", "images")

# 1. List all image files in the folder
image_files = [f for f in os.listdir(image_folder) if f.endswith('.jpg') or f.endswith('.png')]

# 2. Randomly select 1000 images from the folder
# sampled_images = random.sample(image_files, 1000)

# 3. read meta data
meta_data = pd.read_csv(os.path.join("..", "data", "metadata.csv"))

# 4.preprocess the images: randomly select the first image of each otolith and convert to grayscale
gs_img = preprocess_img_file(image_folder, image_files)

# 5.join the grayscale dataframe with the metadata by uuid
merged_df = pd.merge(meta_data, gs_img, on='uuid', how='left')

# 6.Separate features (X) and target (y)
pixel_columns = [f'pixel_{i + 1}' for i in range(4096)]
# cat_vars = ['is_male', 'is_female', 'is_unknown', 'is_plaice', 'is_herring']
cat_vars = ['is_plaice', 'is_herring']
# num_vars = ['length', 'weight', 'month'] + pixel_columns
num_vars = pixel_columns

feature_cols = cat_vars + num_vars

X = merged_df[feature_cols]
y = merged_df['age']  # Target variable

# 7.Create a ColumnTransformer
# Numerical Features: We'll scale them using StandardScaler.
# Categorical Features: We'll encode them using OneHotEncoder.
preprocessor = ColumnTransformer(
    transformers=[
        ('num', StandardScaler(), num_vars),  # Standard scaling for numerical features
    ])
# 8.Fit and transform the dateset (Standard scaling for numerical features and One-hot encoding for categorical features)
X = preprocessor.fit_transform(X)



In [7]:
# 9.select first 6000 instances for train and test
X_train = X[:6000]
y_train = y[:6000]

# select remaining 1828 instances for validation
X_val = X[6000:]
y_val = y[6000:]

# 10. Split the dataset into training and testing sets
X_train_sub, X_test, y_train_sub, y_test = train_test_split(X_train, y_train, test_size=0.2, random_state=42)



RandomForestRegressor

In [8]:
# 1. Train a Random Forest Regressor
rf = RandomForestRegressor(n_estimators=50, random_state=42)

rf.fit(X_train_sub, y_train_sub)
y_pred = rf.predict(X_test)
# 11. Evaluate the model
mse = mean_squared_error(y_test, y_pred)
rmse = mse ** 0.5
r2 = r2_score(y_test, y_pred)

print(f"Mean Squared Error (MSE): {mse:.2f}")
print(f'Root Mean Squared Error: {rmse}')
print(f"R-squared (R²): {r2:.2f}")

Mean Squared Error (MSE): 4.79
Root Mean Squared Error: 2.187610340074301
R-squared (R²): 0.52


In [9]:
# 2. Evaluate the model by the remaining 1828 instances
y_val_pred = rf.predict(X_val)
mse = mean_squared_error(y_val, y_val_pred)
rmse = mse ** 0.5
r2 = r2_score(y_val, y_val_pred)

print(f'Mean Squared Error: {mse}')
print(f'Root Mean Squared Error: {rmse}')
print(f"R-squared (R²): {r2:.2f}")

Mean Squared Error: 3.31972932166302
Root Mean Squared Error: 1.8220124372964692
R-squared (R²): 0.28


XGboost

In [10]:
# 1. Train a Random Forest Regressor
dtrain = xgb.DMatrix(X_train_sub, label=y_train_sub)
dtest = xgb.DMatrix(X_test, label=y_test)

params = {
    'objective': 'reg:squarederror',  # Regression objective using squared error
    'max_depth': 6,  # Depth of the trees
    'learning_rate': 0.1,  # Step size (shrinkage)
    'eval_metric': 'rmse'  # Evaluation metric - RMSE (Root Mean Squared Error)
}
xgb_model = xgb.train(params, dtrain, num_boost_round=50)
y_pred = xgb_model.predict(dtest)

mse = mean_squared_error(y_test, y_pred)
rmse = mse ** 0.5
r2 = r2_score(y_test, y_pred)

print(f'Mean Squared Error: {mse}')
print(f'Root Mean Squared Error: {rmse}')
print(f"R-squared (R²): {r2:.2f}")

Mean Squared Error: 4.161688090761637
Root Mean Squared Error: 2.040021590758695
R-squared (R²): 0.58


In [11]:
# 2. Evaluate the model by the remaining 1828 instances
dval = xgb.DMatrix(X_val, label=y_val)
y_val_xgb_pred = xgb_model.predict(dval)

mse = mean_squared_error(y_val, y_val_xgb_pred)
rmse = mse ** 0.5
r2 = r2_score(y_val, y_val_xgb_pred)

print(f'Mean Squared Error: {mse}')
print(f'Root Mean Squared Error: {rmse}')
print(f"R-squared (R²): {r2:.2f}")

Mean Squared Error: 2.981805855634052
Root Mean Squared Error: 1.7267906229864847
R-squared (R²): 0.35


In [12]:
# X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)
# X_train_sub, _, y_train_sub, _ = train_test_split(X_train, y_train, train_size=0.8, random_state=42)
dtrain = xgb.DMatrix(X_train_sub, label=y_train_sub)
dtest = xgb.DMatrix(X_test, label=y_test)

# Instantiate XGBoost regressor
xg_reg = xgb.XGBRegressor(objective='reg:squarederror')

param = {
    'objective': 'reg:squarederror',  # Regression task
    'eval_metric': 'rmse',  # Root Mean Squared Error for evaluation
    'max_depth': 6,  # Maximum depth of the trees
    'learning_rate': 0.1,  # Learning rate
}

# Perform cross-validation to find the best number of boosting rounds
cv_results = xgb.cv(
    params=param,  # Hyperparameters
    dtrain=dtrain,  # Training data
    num_boost_round=100,  # Maximum number of boosting rounds
    nfold=3,  # 5-fold cross-validation
    metrics="rmse",  # Evaluation metric (RMSE)
    early_stopping_rounds=15,  # Stop if no improvement after 10 rounds
    as_pandas=True,  # Return results as a pandas DataFrame
    seed=42  # Random seed for reproducibility
)

# Get the best number of boosting rounds (the best iteration)
best_num_boost_round = cv_results['test-rmse-mean'].idxmin()

# Now train the final model using the best number of boosting rounds
final_model = xgb.train(
    params=param,
    dtrain=dtrain,
    num_boost_round=best_num_boost_round
)

y_pred = final_model.predict(dtest)

mse = mean_squared_error(y_test, y_pred)
rmse = mse ** 0.5
r2 = r2_score(y_test, y_pred)

print(f'Mean Squared Error: {mse}')
print(f'Root Mean Squared Error: {rmse}')
print(f"R-squared (R²): {r2:.2f}")

Mean Squared Error: 3.8582654849831313
Root Mean Squared Error: 1.9642467983894314
R-squared (R²): 0.61


In [None]:
from sklearn.ensemble import GradientBoostingRegressor

# Create and train a Gradient Boosting Regressor
model = GradientBoostingRegressor(n_estimators=100)
model.fit(X_train_sub, y_train_sub)
y_pred = model.predict(X_test)
mse = mean_squared_error(y_test, y_pred)
rmse = mse ** 0.5
r2 = r2_score(y_test, y_pred)

print(f'Mean Squared Error: {mse}')
print(f'Root Mean Squared Error: {rmse}')
print(f"R-squared (R²): {r2:.2f}")

In [None]:
from sklearn.ensemble import StackingRegressor
from sklearn.linear_model import LinearRegression
from sklearn.tree import DecisionTreeRegressor
from sklearn.svm import SVR

# Create base models
base_learners = [
    ('svr', SVR()),
    ('dt', DecisionTreeRegressor())
]

# Create a meta-model
meta_model = LinearRegression()

# Create and train the Stacking Regressor
model = StackingRegressor(estimators=base_learners, final_estimator=meta_model)
model.fit(X_train_sub, y_train_sub)
y_pred = model.predict(X_test)
mse = mean_squared_error(y_test, y_pred)
rmse = mse ** 0.5
r2 = r2_score(y_test, y_pred)
print(f'Mean Squared Error: {mse}')
print(f'Root Mean Squared Error: {rmse}')
print(f"R-squared (R²): {r2:.2f}")

In [None]:
def plot_learning_curve(X_train, X_test, y_train, y_test):
    train_sizes = [0.4, 0.6, 0.8, 1]  # Training sizes from 10% to 100% of the training data
    train_scores = []
    test_scores = []
    cv_results_dic = {}
    for train_size in train_sizes:
        # Create a subset of the training data based on the current train_size
        X_train_sub, _, y_train_sub, _ = train_test_split(X_train, y_train, train_size=train_size, random_state=42)
        dtrain = xgb.DMatrix(X_train_sub, label=y_train_sub)
        dtest = xgb.DMatrix(X_test, label=y_test)

        param = {
            'objective': 'reg:squarederror',  # Regression objective using squared error
            'max_depth': 6,  # Depth of the trees
            'learning_rate': 0.1,  # Step size (shrinkage)
            'eval_metric': 'rmse'  # Evaluation metric - RMSE (Root Mean Squared Error)
        }

        # Step 5: Perform k-fold cross-validation using xgb.cv

        print(f"Best number of boosting rounds: {best_num_boost_round}")
        # Step 8: Train the model with the best boosting rounds
        bst = xgb.train(param, dtrain, num_boost_round=best_num_boost_round)

        # Train error
        train_pred = bst.predict(dtrain)
        train_score = mean_squared_error(y_train_sub, train_pred)

        # Validation error
        val_pred = bst.predict(dtest)
        test_score = mean_squared_error(y_test, val_pred)

        # Append the errors for plotting
        train_scores.append(train_score)
        test_scores.append(test_score)

    return train_scores, test_scores, cv_results_dic


# Call the function to plot the learning curve
train_scores, test_scores, cv_results = plot_learning_curve(X_train, X_test, y_train, y_test)
train_sizes = [0.4, 0.6, 0.8, 1]  # Training sizes from 10% to 100% of the training data

# Plot the learning curve
plt.figure(figsize=(8, 6))
plt.plot(train_sizes, train_scores, label='Training score', color='blue')
plt.plot(train_sizes, test_scores, label='validation score', color='green')
# Set y-axis limits from 0 to 20
plt.ylim(0, 5)
# Show the ticks and their labels on y-axis (you can adjust the tick range based on data)
plt.yticks(np.arange(0, 5, 0.2))  # Show ticks from 0 to 20 with a step of 2

# Labels and title
plt.title('Learning Curve for XGBoost Model')
plt.xlabel('Training Set Size')
plt.ylabel('Mean Squared Error')
plt.legend(loc='best')
plt.grid(True)
plt.show()

CNN- Below is an example of building and training a CNN model for a regression model using TensorFlow and Keras.

In [None]:
import tensorflow as tf
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Conv2D, MaxPooling2D, Flatten, Dense, Dropout
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.callbacks import EarlyStopping

X = merged_df[pixel_columns]
y = merged_df['age'] + merged_df['month']  # Target variable

X_train = X[:6000]
y_train = y[:6000]

# select remaining 1828 instances for final testing
X_val = X[6000:]
y_val = y[6000:]

# Split data into training and testing sets
X_train_sub, X_test, y_train_sub, y_test = train_test_split(X_train, y_train, test_size=0.2, random_state=42)

# Normalize the image data (important for CNNs)
X_train_sub = X_train_sub / 255.0
X_test = X_test / 255.0
X_val = X_val / 255.0

# Build the CNN model for regression
model = Sequential()

# Reshape the data to add the channel dimension (28x28 images with 1 channel for grayscale)
height = 64
width = 64
channels = 1
X_train_sub = X_train_sub.values.reshape(X_train_sub.shape[0], height, width, channels)
X_test = X_test.values.reshape(X_test.shape[0], height, width, channels)
X_val = X_val.values.reshape(X_val.shape[0], height, width, channels)

In [None]:
len(X_val)

In [None]:
from tensorflow.keras.initializers import HeNormal

# Add convolutional layers
model.add(Conv2D(32, (3, 3), activation='relu', kernel_initializer=HeNormal(), input_shape=(height, width, channels)))
model.add(MaxPooling2D((2, 2)))

model.add(Conv2D(64, (3, 3), activation='relu'))
model.add(MaxPooling2D((2, 2)))

model.add(Conv2D(128, (3, 3), activation='relu'))
model.add(MaxPooling2D((2, 2)))

# Flatten the output of the convolutional layers
model.add(Flatten())

# Fully connected (dense) layers
model.add(Dense(128, activation='relu'))
model.add(Dropout(0.5))  # Dropout for regularization

# Output layer for regression: One neuron, no activation function (linear activation)
model.add(Dense(1))

# Compile the model
model.compile(optimizer=Adam(learning_rate=0.001), loss='mean_squared_error')

# Step 4: Set up Early Stopping callback to monitor validation loss
early_stopping = EarlyStopping(monitor='val_loss', patience=15, restore_best_weights=True)

# Train the model
history = model.fit(X_train_sub, y_train_sub, epochs=100, batch_size=64, callbacks=[early_stopping],
                    validation_data=(X_test, y_test))

# Step 7: Get the best epoch from the early stopping
best_epoch = np.argmin(history.history['val_loss']) + 1  # Adding 1 because epochs start from 1
print(f"Best Epoch (based on validation loss): {best_epoch}")

# Evaluate the model
y_pred = history.model.predict(X_test)

# Calculate Mean Squared Error (MSE) and RMSE
mse = mean_squared_error(y_test, y_pred)
rmse = np.sqrt(mse)
r2 = r2_score(y_test, y_pred)

print(f'Mean Squared Error (MSE): {mse}')
print(f'Root Mean Squared Error (RMSE): {rmse}')
print(f"R-squared (R²): {r2:.2f}")

In [None]:
# Evaluate the model
y_pred = history.model.predict(X_test)

# Calculate Mean Squared Error (MSE) and RMSE
mse = mean_squared_error(y_test, y_pred)
rmse = np.sqrt(mse)
r2 = r2_score(y_test, y_pred)

print(f'Mean Squared Error (MSE): {mse}')
print(f'Root Mean Squared Error (RMSE): {rmse}')
print(f"R-squared (R²): {r2:.2f}")

In [None]:
# Evaluate the model
y_val_pred = history.model.predict(X_val)

# Calculate Mean Squared Error (MSE) and RMSE
mse = mean_squared_error(y_val, y_val_pred)
rmse = np.sqrt(mse)
r2 = r2_score(y_val, y_val_pred)

print(f'Mean Squared Error (MSE): {mse}')
print(f'Root Mean Squared Error (RMSE): {rmse}')
print(f"R-squared (R²): {r2:.2f}")

In [None]:
import matplotlib.pyplot as plt

plt.plot(history.history['loss'], label='Training Loss')
plt.plot(history.history['val_loss'], label='Validation Loss')
# Set y-axis limits from 0 to 20
plt.ylim(0, 5)
# Show the ticks and their labels on y-axis (you can adjust the tick range based on data)
plt.yticks(np.arange(0, 5, 0.5))  # Show ticks from 0 to 20 with a step of 2
plt.xlabel('Epochs')
plt.ylabel('Loss')
plt.title('Training and Validation Loss')
plt.legend()
plt.show()