In [None]:
import pandas as pd
import numpy as np
from PIL import Image
import os
import tensorflow as tf
from tensorflow.keras.layers import Dense, GlobalAveragePooling2D,Dropout,BatchNormalization,Input
from tensorflow.keras.models import Model, Sequential
from tensorflow.keras.preprocessing.image import ImageDataGenerator
from tensorflow.keras.optimizers import RMSprop
from sklearn.model_selection import train_test_split
from sklearn.metrics import f1_score
from tensorflow.keras.callbacks import EarlyStopping


In [None]:
# Load the labelled development set: read the ground-truth CSV, pair every
# .jpg in the image folder with its label, decode each image to a
# 256x256 RGB array and split the result into train / hold-out sets.
csv_file_path = '/kaggle/input/flood-data/Kaggle/devset_images_gt.csv'
df = pd.read_csv(csv_file_path)

image_folder_path = '/kaggle/input/flood-data/Kaggle/devset_images/devset_images'
# os.listdir() returns entries in arbitrary order; sorting makes the seeded
# train/test split below pick the same images on every run and machine.
image_files = sorted(
    file for file in os.listdir(image_folder_path)
    if not file.startswith('._') and file.endswith('.jpg')
)

# Build the id -> label lookup once instead of filtering the whole frame for
# every image. keep='first' matches the previous behaviour of taking the
# first matching row when an id appears more than once.
label_by_id = (
    df.drop_duplicates(subset='id', keep='first')
      .set_index('id')['label']
      .to_dict()
)

selected_image_files = []
selected_labels = []

for image_file in image_files:
    image_id = int(image_file.split('.')[0])
    # Images without a ground-truth row are skipped.
    if image_id in label_by_id:
        selected_image_files.append(image_file)
        selected_labels.append(label_by_id[image_id])

X = []
y = []

for image_file, label in zip(selected_image_files, selected_labels):
    image_path = os.path.join(image_folder_path, image_file)
    # Force three channels: a grayscale, palette or RGBA file would otherwise
    # yield an array of a different shape and make np.array(X) ragged.
    # The context manager closes the file handle once the pixels are copied.
    with Image.open(image_path) as raw_image:
        image = np.array(raw_image.convert('RGB').resize((256, 256)))
    X.append(image)
    y.append(label)

X = np.array(X)
y = np.array(y)

# 10% hold-out split; random_state pins the shuffle.
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.1, random_state=42)


In [None]:
# Symbolic model input: 256x256 images with 3 channels.
combined_input = Input(shape=(256, 256, 3))

In [None]:
# EfficientNetB7 branch: ImageNet-pretrained convolutional backbone (no
# classification top) followed by a dense head ending in a single sigmoid unit.
effnet_base = tf.keras.applications.EfficientNetB7(include_top=False, weights='imagenet')

# Freeze every backbone layer so that only the head defined below is trained.
for layer in effnet_base.layers:
    layer.trainable = False

# Head layers, listed in the order they are applied to the backbone features.
effnet_head = [
    GlobalAveragePooling2D(),
    Dense(1028, activation='relu'),
    BatchNormalization(),
    Dropout(0.5),
    Dense(514, activation='sigmoid'),
    BatchNormalization(),
    Dense(256, activation='relu'),
    BatchNormalization(),
]

x = effnet_base(combined_input)
for head_layer in effnet_head:
    x = head_layer(x)

# Binary output.
effnet_output = Dense(1, activation='sigmoid')(x)

# Stand-alone model for this branch (input -> backbone -> head -> output).
effnet_model = Model(inputs=combined_input, outputs=effnet_output)

In [None]:
# ResNet50 branch: ImageNet-pretrained backbone (no classification top),
# fully trainable, followed by a dense head ending in a single sigmoid unit.
resnet_base = tf.keras.applications.ResNet50(weights='imagenet', include_top=False)
for layer in resnet_base.layers:
    layer.trainable = True

# The ImageNet ResNet50 weights expect inputs run through
# resnet50.preprocess_input (RGB -> BGR plus per-channel mean-centring).
# The input tensor carries raw pixel values, so the preprocessing is applied
# inside this branch only; `combined_input` itself is left untouched for any
# other model built on it.
x = tf.keras.applications.resnet50.preprocess_input(combined_input)
x = resnet_base(x)
x = GlobalAveragePooling2D()(x)
x = Dense(256, activation='relu')(x)
x = BatchNormalization()(x)
x = Dropout(0.5)(x)
x = Dense(128, activation='sigmoid')(x)
x = BatchNormalization()(x)
x = Dense(64, activation='relu')(x)
x = BatchNormalization()(x)
resnet_output = Dense(1, activation='sigmoid')(x)  # Output layer
resnet_model = Model(inputs=combined_input, outputs=resnet_output)


In [None]:
from tensorflow.keras.optimizers import RMSprop
from tensorflow.keras.callbacks import LearningRateScheduler
import math
def lr_schedule(epoch):
    """Step-decay learning-rate schedule.

    Starts at 0.001 and halves the rate every 10 epochs. Because the step
    count is computed from ``epoch + 1``, the first halving takes effect at
    the 0-based epoch 9, the second at epoch 19, and so on.

    Args:
        epoch: 0-based epoch index.

    Returns:
        The learning rate to use for that epoch, as a float.
    """
    base_rate, decay_factor, step_size = 0.001, 0.5, 10
    completed_steps = math.floor((epoch + 1) / step_size)
    return base_rate * math.pow(decay_factor, completed_steps)
# Callback that feeds lr_schedule to Keras during training.
lr_scheduler = LearningRateScheduler(schedule=lr_schedule)
# RMSprop optimizer created with a learning rate of 1e-3.
optimizer = RMSprop(learning_rate=1e-3)


In [None]:
# Ensemble: element-wise mean of the two branch outputs, wrapped in one model
# that takes the shared input tensor.
ensemble_branches = [resnet_output, effnet_output]
combined_output = tf.keras.layers.Average()(ensemble_branches)
combined_model = Model(inputs=combined_input, outputs=combined_output)

combined_model.summary()

In [None]:
# Binary classification set-up: cross-entropy loss, accuracy as the metric.
combined_model.compile(
    loss='binary_crossentropy',
    optimizer=optimizer,
    metrics=['accuracy'],
)

In [None]:
from keras.preprocessing.image import ImageDataGenerator

# Augmentation settings for the training images, kept in one mapping so the
# individual knobs are easy to scan and tweak.
augmentation_options = {
    'rotation_range': 40,
    'width_shift_range': 0.2,
    'height_shift_range': 0.2,
    'shear_range': 0.2,
    'zoom_range': 0.2,
    'horizontal_flip': True,
    'fill_mode': 'nearest',
}
datagen = ImageDataGenerator(**augmentation_options)

In [None]:
# Training hyper-parameters: maximum number of epochs and mini-batch size.
epochs, batch_size = 50, 16

In [None]:
# Stop when val_loss has not improved for 15 epochs and roll back to the best
# weights seen.
early_stopping = EarlyStopping(monitor='val_loss', patience=15, restore_best_weights=True)

# Augmented mini-batches drawn from the training split.
train_batches = datagen.flow(X_train, y_train, batch_size=batch_size)
# Whole batches per epoch (any remainder smaller than batch_size is not counted).
batches_per_epoch = len(X_train) // batch_size

# NOTE(review): (X_test, y_test) is used as the validation set here, so early
# stopping / best-weight selection is tuned on it.
history = combined_model.fit(
    train_batches,
    steps_per_epoch=batches_per_epoch,
    epochs=epochs,
    validation_data=(X_test, y_test),
    callbacks=[lr_scheduler, early_stopping],
)

In [None]:
# Score the trained ensemble on the hold-out arrays and report both numbers.
loss, accuracy = combined_model.evaluate(X_test, y_test)

for metric_label, metric_value in (("Loss:", loss), ("Accuracy:", accuracy)):
    print(metric_label, metric_value)

In [None]:
import matplotlib.pyplot as plt

# One figure per tracked quantity, each showing the training curve and the
# validation curve over the epochs recorded in `history`.
curve_specs = (
    ('accuracy', 'val_accuracy', 'Model accuracy', 'Accuracy'),
    ('loss', 'val_loss', 'Model loss', 'Loss'),
)

for train_key, val_key, figure_title, y_axis_label in curve_specs:
    plt.plot(history.history[train_key])
    plt.plot(history.history[val_key])
    plt.title(figure_title)
    plt.ylabel(y_axis_label)
    plt.xlabel('Epoch')
    plt.legend(['Train', 'Validation'], loc='upper left')
    plt.show()

In [None]:
import os
import numpy as np
import pandas as pd
from tensorflow.keras.models import load_model
from tensorflow.keras.preprocessing import image
from PIL import Image


# Run the trained ensemble on the unlabeled test images and write the binary
# predictions to a CSV with one (id, label) row per image.
test_image_folder_path = '/kaggle/input/flood-data/Kaggle/testset_images/testset_images'
test_image_files = [file for file in os.listdir(test_image_folder_path) if not file.startswith('._')]

# The decoded images are kept under their own name: reusing `X_test` would
# overwrite the labelled hold-out array, leaving it out of sync with `y_test`
# if an earlier evaluation cell is re-run.
test_image_ids = []
X_submission = []

for image_file in test_image_files:
    # The numeric file stem is the image id; parse it once and reuse it for
    # the output frame.
    test_image_ids.append(int(image_file.split('.')[0]))
    image_path = os.path.join(test_image_folder_path, image_file)
    # Force three channels so every array is (256, 256, 3), and close the
    # file handle as soon as the pixels are copied. The local is not called
    # `image`, to avoid shadowing a module imported under that name.
    with Image.open(image_path) as raw_image:
        X_submission.append(np.array(raw_image.convert('RGB').resize((256, 256))))

X_submission = np.array(X_submission)

test_predictions = combined_model.predict(X_submission)
# Threshold the sigmoid outputs at 0.5 to get 0/1 labels.
binary_predictions = (test_predictions > 0.5).astype(int).flatten()

test_df = pd.DataFrame({'id': test_image_ids,
                        'label': binary_predictions})

test_predictions_csv_path = '/kaggle/working/test_predictions.csv'
test_df.to_csv(test_predictions_csv_path, index=False)

print(f'Test predictions saved to {test_predictions_csv_path}')
