In [None]:
from google.colab import drive

# Attach Google Drive to the Colab filesystem; the dataset paths used in the
# following cells live under this mount point.
DRIVE_MOUNT_POINT = '/content/drive'
drive.mount(DRIVE_MOUNT_POINT)

In [None]:
import pandas as pd
from sklearn.model_selection import train_test_split
from tensorflow.keras.applications import ResNet50
from tensorflow.keras.applications.resnet50 import preprocess_input
from tensorflow.keras.layers import BatchNormalization, Dense, Flatten, Dropout, GlobalAveragePooling2D
from tensorflow.keras.models import Model
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.preprocessing.image import ImageDataGenerator
from tensorflow.keras.regularizers import l2

In [None]:
# Metadata table: one row per pond image (filename stem plus pH / TDS labels).
CSV_PATH = '/content/drive/MyDrive/fishpond_dataset/pond_dataset.csv'
ds = pd.read_csv(CSV_PATH, encoding='unicode_escape')

In [None]:
# Quick look at the first rows of the raw table.
ds.head(n=10)

In [None]:
# The CSV stores image names without an extension; the files on disk are .jpg.
# Only extend names that do not already end in '.jpg' so that re-running this
# cell does not produce 'name.jpg.jpg' (the cell is now idempotent).
needs_ext = ~ds['images'].str.lower().str.endswith('.jpg', na=False)
ds.loc[needs_ext, 'images'] = ds.loc[needs_ext, 'images'] + '.jpg'
ds.head(10)

In [None]:
# Total number of labelled samples in the table.
print(ds.shape[0])

In [None]:
# Split into train (80%) and test (20%); random_state fixes the partition.
train_ds, test_ds = train_test_split(ds, test_size=0.2, random_state=42)

# Later cells assign into these frames (label scaling, filename casting).
# Take explicit copies so those writes never target a view of `ds` and do not
# trigger pandas' SettingWithCopyWarning.
train_ds = train_ds.copy()
test_ds = test_ds.copy()

# Print dataset sizes
print(f"Train set: {len(train_ds)} samples")
print(f"Test set: {len(test_ds)} samples")

In [None]:
import os

base_path = '/content/drive/MyDrive/fishpond_dataset/images'

# Keep only files whose (case-insensitive) extension marks them as images,
# then turn each name into a full path under base_path.
image_suffixes = ('.png', '.jpg', '.jpeg')
image_names = filter(lambda name: name.lower().endswith(image_suffixes), os.listdir(base_path))
image_paths = [os.path.join(base_path, name) for name in image_names]

# Show the first three paths as a sanity check
print(image_paths[:3])

In [None]:
import os

# Cross-check the filenames referenced by the CSV against the image folder.
# Use the full table `ds` (train + test): comparing against `train_ds` alone
# would report every test image as "in folder but NOT in CSV".
csv_filenames = set(ds["images"].astype(str))  # set for fast membership/difference
image_filenames = set(f for f in os.listdir(base_path) if f.lower().endswith(('.png', '.jpg', '.jpeg')))

# Find mismatches
missing_in_folder = csv_filenames - image_filenames  # listed in CSV but no file on disk
missing_in_csv = image_filenames - csv_filenames  # file on disk but no CSV row

# sorted() makes the 5-item preview deterministic across runs
print(f"Images in CSV but NOT in folder: {len(missing_in_folder)} -> {sorted(missing_in_folder)[:5]}")
print(f"Images in folder but NOT in CSV: {len(missing_in_csv)} -> {sorted(missing_in_csv)[:5]}")


In [None]:
from sklearn.preprocessing import StandardScaler
import numpy as np

# Standardize the regression targets (per-column zero mean / unit variance).
LABEL_COLS = ["pH", "TDS"]

# Work on independent copies so the assignments below never write into a
# view of `ds` (avoids pandas' SettingWithCopyWarning).
train_ds = train_ds.copy()
test_ds = test_ds.copy()

# Always read the *raw* labels from `ds` (whose label columns are never
# modified) via the split indices. This keeps the cell idempotent: re-running
# it no longer refits the scaler on already-standardized values, which would
# silently break scaler.inverse_transform in later cells.
scaler = StandardScaler()
scaler.fit(ds.loc[train_ds.index, LABEL_COLS])  # statistics come from the training split only
train_ds[LABEL_COLS] = scaler.transform(ds.loc[train_ds.index, LABEL_COLS])
test_ds[LABEL_COLS] = scaler.transform(ds.loc[test_ds.index, LABEL_COLS])


def pixel_normalization(img):
    """Scale pixel intensities by 1/255.

    For 8-bit image data (values 0..255) this maps the input into [0, 1].
    Note: the generators in this notebook use ResNet50's `preprocess_input`
    instead, so this helper is an alternative preprocessing function.
    """
    normalized = img / 255.0
    return normalized



# Image pipelines. Both splits apply only ResNet50's `preprocess_input`;
# no augmentation transforms are configured.
train_datagen = ImageDataGenerator(preprocessing_function=preprocess_input)
test_datagen = ImageDataGenerator(preprocessing_function=preprocess_input)

# The filename column is joined with `directory` by flow_from_dataframe,
# so make sure it holds plain strings.
train_ds["images"] = train_ds["images"].astype(str)
test_ds["images"] = test_ds["images"].astype(str)

# Settings shared by the train and test iterators
flow_kwargs = dict(
    directory=base_path,       # folder containing the image files
    x_col="images",            # filenames relative to `directory`
    y_col=["pH", "TDS"],       # two regression targets
    target_size=(100, 100),
    batch_size=32,
    class_mode="raw",          # yield the label columns as-is
)

# Training batches are shuffled; test batches keep dataframe order
train_generator = train_datagen.flow_from_dataframe(dataframe=train_ds, shuffle=True, **flow_kwargs)
test_generator = test_datagen.flow_from_dataframe(dataframe=test_ds, shuffle=False, **flow_kwargs)


In [None]:
# Number of trailing ResNet50 layers to fine-tune (0 keeps the backbone fully frozen)
N_UNFROZEN_LAYERS = 20

# Load pretrained ResNet50 backbone (ImageNet weights, no classification head)
base_model = ResNet50(weights="imagenet", include_top=False, input_shape=(100, 100, 3))

# Freeze the whole backbone first
for layer in base_model.layers:
    layer.trainable = False

# Fine-tuning: unfreeze the last N layers, but keep BatchNormalization layers
# frozen. The Keras transfer-learning guide recommends leaving BN layers in
# inference mode during fine-tuning; otherwise their moving statistics are
# updated on the new data and can undo what the pretrained weights learned.
if N_UNFROZEN_LAYERS > 0:
    for layer in base_model.layers[-N_UNFROZEN_LAYERS:]:
        if not isinstance(layer, BatchNormalization):
            layer.trainable = True

# Custom regression head on top of the pooled backbone features
x = GlobalAveragePooling2D()(base_model.output)
x = Dense(512, activation="relu")(x)
x = Dropout(0.30)(x)
x = Dense(256, activation="relu")(x)
x = Dense(2, activation="linear")(x)  # two outputs: (pH, TDS), in standardized units

# Define Model
model = Model(inputs=base_model.input, outputs=x)

# Compile Model: MSE loss with MAE reported as a metric; small LR for fine-tuning
model.compile(optimizer=Adam(learning_rate=0.0001), loss="mse", metrics=["mae"])

# Model Summary
model.summary()


In [None]:

# Fit the network. The test generator doubles as the validation set, and one
# epoch is one full pass over each generator.
fit_options = {
    "validation_data": test_generator,
    "epochs": 10,  # adjust based on performance
    "steps_per_epoch": len(train_generator),
    "validation_steps": len(test_generator),
}
history = model.fit(train_generator, **fit_options)


In [None]:
import numpy as np
import pandas as pd
import os
from tensorflow.keras.preprocessing import image

# Qualitative spot-check: compare predictions with ground truth on up to 10
# test images. random_state makes the selection reproducible.
sample_images = test_ds.sample(n=min(10, len(test_ds)), random_state=42)

# Load & preprocess the images the same way the generators do
# (resize to 100x100, then ResNet50 preprocess_input).
batch = []
for filename in sample_images["images"]:
    img = image.load_img(os.path.join(base_path, filename), target_size=(100, 100))
    batch.append(preprocess_input(image.img_to_array(img)))
batch = np.stack(batch)  # shape: (n_samples, 100, 100, 3)

# One predict call for the whole batch instead of one call per image
predictions_scaled = model.predict(batch, verbose=0)

# test_ds[["pH", "TDS"]] was standardized with `scaler` (StandardScaler), and the
# model was trained on those standardized targets. Map BOTH the stored targets
# and the predictions back to original units so the printed values are comparable.
predicted_values = scaler.inverse_transform(predictions_scaled)
actual_values = scaler.inverse_transform(sample_images[["pH", "TDS"]].to_numpy())

# Print results
for filename, (actual_pH, actual_TDS), (predicted_pH, predicted_TDS) in zip(
    sample_images["images"], actual_values, predicted_values
):
    print(f"Image: {filename}")
    print(f"  Actual pH: {actual_pH:.2f}, Predicted pH: {predicted_pH:.2f}")
    print(f"  Actual TDS: {actual_TDS:.2f}, Predicted TDS: {predicted_TDS:.2f}\n")


In [None]:
import matplotlib.pyplot as plt

# Learning curves: per-epoch training vs. validation loss
fig, ax = plt.subplots()
ax.plot(history.history['loss'], label='Train Loss')
ax.plot(history.history['val_loss'], label='Validation Loss')
ax.set_xlabel('Epochs')
ax.set_ylabel('Loss')
ax.legend()
plt.show()


In [None]:
# model.evaluate returns [loss, mae] (the compiled loss followed by the metric).
train_metrics = model.evaluate(train_generator)
val_metrics = model.evaluate(test_generator)

train_loss, train_mae = train_metrics
val_loss, val_mae = val_metrics

print(f"Train Loss: {train_loss:.2f}, Train MAE: {train_mae:.2f}")
print(f"Validation Loss: {val_loss:.2f}, Validation MAE: {val_mae:.2f}")

#Train Loss: 49.64, Train MAE: 4.18
#Validation Loss: 299.57, Validation MAE: 9.21 (100*100 pixel 10 epoch 20 unfreeze)No DA

#Train Loss: 267.54, Train MAE: 10.28
#Validation Loss: 571.42, Validation MAE: 14.04 (224*224 pixel 10 epoch 20 unfreeze)

#Train Loss: 143.14, Train MAE: 7.61
#Validation Loss: 369.10, Validation MAE: 11.16 (100*100 pixel 10 epoch 20 unfreeze 0.4dropout 0.01 Regularization)

#Train Loss: 854.52, Train MAE: 19.55
#Validation Loss: 1134.72, Validation MAE: 21.49 (100*100 pixel 10 epoch 10 unfreeze 0.4dropout 0.01 Regularization)


#Train Loss: 352.28, Train MAE: 12.42
#Validation Loss: 674.77, Validation MAE: 15.72 (100*100 pixel 10 epoch 10 unfreeze 0.35dropout 0.001 Regularization) from DA

#Train Loss: 1146.32, Train MAE: 22.39
#Validation Loss: 850.52, Validation MAE: 18.74 (100*100 20epoch 0 unfrozen 0.3 dropout 0.001 Regularization 0.001 LR)

#Train Loss: 399.78, Train MAE: 11.50
#Validation Loss: 526.72, Validation MAE: 13.25 (100*100 20epoch 5 unfrozen 0.3 dropout 0.001 Regularization 0.0001 LR)

#Train Loss: 405.81, Train MAE: 12.52
#Validation Loss: 339.85, Validation MAE: 11.14 (100*100 20epoch 20 unfrozen 0.3 dropout 0.001 Regularization 0.0001 LR)

In [None]:
def mean_absolute_percentage_error(y_true, y_pred):
    """Return the mean absolute percentage error (in percent).

    Parameters
    ----------
    y_true : sequence or array of true target values.
    y_pred : sequence or array of predicted values.

    Returns
    -------
    float
        mean(|y_true - y_pred| / |y_true|) * 100 over the entries whose true
        value is non-zero; NaN when no such entry exists.

    Notes
    -----
    * If the inputs differ in length, both are truncated to the shorter one.
    * Entries with y_true == 0 are excluded, because the percentage error is
      undefined there and would otherwise turn the whole result into inf/NaN.
    """
    # Truncate to the common length before comparing
    min_len = min(len(y_true), len(y_pred))
    y_true = np.asarray(y_true[:min_len], dtype=float)
    y_pred = np.asarray(y_pred[:min_len], dtype=float)

    # Skip zero targets to avoid division by zero
    nonzero = y_true != 0
    if not nonzero.any():
        return float("nan")

    relative_error = np.abs((y_true[nonzero] - y_pred[nonzero]) / y_true[nonzero])
    return np.mean(relative_error) * 100

# Predict on the whole test set. test_generator was built with shuffle=False,
# so predictions come back in the generator's (dataframe) order.
y_true_scaled = test_ds[["pH", "TDS"]].values
y_pred_scaled = model.predict(test_generator, steps=len(test_generator), verbose=0)

# A length mismatch means rows of test_ds and predictions are not aligned
# (e.g. the generator skipped rows); fail loudly instead of scoring misaligned pairs.
assert len(y_true_scaled) == len(y_pred_scaled), (
    f"test_ds has {len(y_true_scaled)} rows but the generator produced "
    f"{len(y_pred_scaled)} predictions"
)

# Labels in test_ds (and hence the model outputs) are standardized. MAPE divides
# by the true value, so it must be computed in original units: standardized
# targets are centred on 0 and would blow the percentage error up.
y_true = scaler.inverse_transform(y_true_scaled)
y_pred = scaler.inverse_transform(y_pred_scaled)

# Calculate and print MAPE for each output (pH and TDS)
mape_pH = mean_absolute_percentage_error(y_true[:, 0], y_pred[:, 0])
mape_TDS = mean_absolute_percentage_error(y_true[:, 1], y_pred[:, 1])

print(f"MAPE for pH: {mape_pH:.2f}%")
print(f"MAPE for TDS: {mape_TDS:.2f}%")



In [None]:
from sklearn.metrics import r2_score

# Ground truth (as stored in test_ds) and model outputs for the full test set
y_true = test_ds[["pH", "TDS"]].values
y_pred = model.predict(test_generator, steps=len(test_generator), verbose=0)

# Trim both arrays to their common number of rows
min_samples = min(y_true.shape[0], y_pred.shape[0])
y_true, y_pred = y_true[:min_samples], y_pred[:min_samples]

# One R-squared score per output column: 0 -> pH, 1 -> TDS
r2_pH, r2_TDS = (r2_score(y_true[:, col], y_pred[:, col]) for col in (0, 1))

print(f"R-squared for pH: {r2_pH:.2f}")
print(f"R-squared for TDS: {r2_TDS:.2f}")
