In [None]:
# Import packages
import pandas as pd
import numpy as np
import rasterio
import earthpy.plot as ep

import matplotlib.pyplot as plt
from matplotlib.colors import from_levels_and_colors
%matplotlib inline

from sklearn.metrics import confusion_matrix, ConfusionMatrixDisplay, classification_report

import keras
from tensorflow.keras.models import load_model

from keras import Sequential
from keras.layers import Conv1D, MaxPooling1D, Dense, Dropout, Flatten, Input, GlobalMaxPooling1D
from keras.callbacks import EarlyStopping
from keras import Model
from keras.utils import to_categorical, plot_model, model_to_dot


# Check if GPU is available
import tensorflow as tf
print('Tensorflow version: ', tf.__version__)
print("Num GPUs Available: ", len(tf.config.experimental.list_physical_devices('GPU')))

TypeError: Unable to convert function return value to a Python type! The signature was
	() -> handle

# **Parameters**

In [None]:
# Parameters
FEATURES = ['b2', 'b3', 'b4', 'b5', 'b6', 'b7', 'b8', 'b8a', 'b11', 'b12', 
                        'con', 'dis', 'hom', 'asm', 'ent', 'mea', 'var', 'cor', 'sa', 'pb',
                        'pg', 'pr', 'pn', 'pcon', 'pdis', 'phom', 'pasm', 'pent', 'pmea','pvar',
                         'pcor', 'psa']

LABEL = ['class']
SPLIT = ['sample']
N_CLASSES = 5
CLASSES = [1, 2, 3, 4, 5]
PALETTE = ['dodgerblue', 'red', 'yellow', 'forestgreen', 'coral']

SAMPLE_PATH = './extract_pts_10m.csv'
IMAGE_PATH = './s2_planet_band_texture_10m_norm.tif'

In [None]:
# Load image
image = rasterio.open(IMAGE_PATH)
bandNum = image.count
height = image.height
width = image.width
crs = image.crs
transform = image.transform
shape = (height, width)

print(bandNum)
print(shape)
print(crs)

In [None]:
## Visualization

image_vis = []
for x in [5, 4, 3]:
  image_vis.append(image.read(x))
image_vis = np.stack(image_vis)

plot_size = (8, 8)
ep.plot_rgb(image_vis, figsize=plot_size, stretch=True)

In [None]:
samples = pd.read_csv(SAMPLE_PATH)
samples = samples.sample(frac = 1) # Shuffle data
samples

In [None]:
# Split into train and test based on column
train = samples[samples['sample'] == 'train']
test = samples[samples['sample'] == 'test']

# Split between features and label
train_features = train[FEATURES]
train_label = train[LABEL]
test_features = test[FEATURES]
test_label = test[LABEL]

# Function to reshape array input
def reshape_input(array):
  shape = array.shape
  return array.reshape(shape[0], shape[1], 1)

# Convert samples dataframe (pandas) to numpy array
train_input = reshape_input(train_features.to_numpy())
test_input = reshape_input(test_features.to_numpy())

# Also make label data to categorical
train_output = to_categorical(train_label.to_numpy(), N_CLASSES+1, int)
test_output = to_categorical(test_label.to_numpy(), N_CLASSES+1, int)

# Show the data shape
print(f'Train features: {train_input.shape}\nTest features: {test_input.shape}\nTrain label: {train_output.shape}\nTest label: {test_output.shape}')

In [None]:
# Make model for our data
# Input shape
train_shape = train_input.shape
input_shape = (train_shape[1], train_shape[2])

# Model parameter
neuron = 128
drop = 0.2
kernel = 2
pool = 2

# Create sequential model
model = Sequential([
    Input(input_shape),

    Conv1D(neuron * 1, kernel, activation='relu', padding='same'),
    Conv1D(neuron * 1, kernel, activation='relu', padding='same'),
    MaxPooling1D(pool),
    Dropout(drop),

    Conv1D(neuron * 2, kernel, activation='relu', padding='same'),
    Conv1D(neuron * 2, kernel, activation='relu', padding='same'),
    MaxPooling1D(pool),
    Dropout(drop),

    GlobalMaxPooling1D(),

    Dense(neuron * 2, activation='relu'),
    Dropout(drop),

    Dense(neuron * 1, activation='relu'),
    Dropout(drop),

    Dense(N_CLASSES + 1, activation='softmax')
])

model.summary()

In [None]:
# Train the model
# Compline the model
model.compile(
    optimizer='Adam',
    loss='CategoricalCrossentropy',
    metrics=['accuracy']
)

# Create callback to stop training if loss not decreasing
stop = EarlyStopping(
    monitor='loss',
    patience=10
)

# Fit the model
result = model.fit(
    x=train_input, y=train_output,
    validation_data=(test_input, test_output),
    batch_size=1024,
    callbacks=[stop],
    epochs= 100,
)

# Save the entire model as a `.keras` zip archive.
model.save('./01_model/CNN_all_s2_planet.keras')

In [None]:
# Save the entire model as a `.keras` zip archive.
model = load_model('./CNN_all_s2_planet.keras')

# Show history
history = pd.DataFrame(result.history)

plt.figure(figsize = (10, 8))
plt.plot(range(len(history['accuracy'].values.tolist())), history['accuracy'].values.tolist(), label = 'Train_Accuracy')
plt.plot(range(len(history['loss'].values.tolist())), history['loss'].values.tolist(), label = 'Train_Loss')
plt.plot(range(len(history['val_accuracy'].values.tolist())), history['val_accuracy'].values.tolist(), label = 'Test_Accuracy')
plt.plot(range(len(history['val_loss'].values.tolist())), history['val_loss'].values.tolist(), label = 'Test_Loss')
plt.xlabel('Epochs')
plt.ylabel('Value')
plt.legend()
plt.show()


In [None]:
# Predict test data
prediction = np.argmax(model.predict(test_input), 1).flatten()
label = np.argmax(test_output, 1).flatten()

# Confusion matrix
cm = confusion_matrix(label, prediction, normalize='true')
cm = ConfusionMatrixDisplay(cm)
cm.plot()

# Classification report
print(classification_report(label, prediction))

In [None]:
from sklearn.metrics import classification_report, accuracy_score

def calculate_permutation_importance(model, X_test, y_test, baseline_accuracy):
    importance_scores = []

    for i in range(X_test.shape[1]):
        # Hoán đổi cột i của X_test
        X_test_permuted = X_test.copy()
        np.random.shuffle(X_test_permuted.iloc[:, i].values)

        # Dự đoán với X_test đã hoán đổi
        permuted_preds = np.argmax(model.predict(test_input), 1).flatten()#model.predict(X_test_permuted)#.round()
        permuted_accuracy = accuracy_score(y_test, permuted_preds)
        # print(permuted_accuracy)

        # Tính toán mức độ suy giảm hiệu suất
        importance_score = baseline_accuracy  - permuted_accuracy
        importance_scores.append(importance_score)

    return np.array(importance_scores)




base_accuracy = accuracy_score(test_label, prediction)
base_accuracy
varImp = calculate_permutation_importance(model, test_features, test_label, base_accuracy)

In [None]:
varImp

In [None]:
[test_features.columns, varImp]

df_varImp = pd.DataFrame({
    'Vars': test_features.columns,
    'Value': varImp
})
df_varImp.to_csv('./accuracy/varImp_cnn_dhn.csv', index=False)

In [None]:
# Predict image using the model
image_input = []

nbands = 32

for x in range(nbands):
  image_input.append(image.read(x + 1))
image_input = reshape_input(np.stack(image_input).reshape(nbands, -1).T)

# Predict
prediction = model.predict(image_input, batch_size=1024*4)
prediction = np.argmax(prediction, 1)
prediction = prediction.reshape(shape[0], shape[1])

# Visualize
cmap, norm = from_levels_and_colors(CLASSES, PALETTE, extend='max')
ep.plot_bands(prediction, cmap=cmap, norm=norm, figsize=plot_size)

In [None]:
# Write the NumPy array to a GeoTIFF file
with rasterio.open(IMAGE_PATH) as src:
  profile = src.profile

profile.update(**{
    'count': 1,
    'dtype': prediction.dtype,
    'nodata': -9999
})

print(profile)

with rasterio.open('./02_classified_out/classed_cat_all_s2_planet_band_texture.tif', 'w', **profile) as dst:
    dst.write(prediction, 1)

print("GeoTIFF file saved successfully!")