In [None]:
import numpy as np
from tensorflow.keras.layers import Conv1D, MaxPooling1D, Flatten, Reshape
from tensorflow.keras.layers import Dense, Dropout
from tensorflow.keras.models import Sequential
import tensorflow as tf
from matplotlib import pyplot as plt
import laspy as lp
import matplotlib
from tensorflow.keras.layers import Dense, Dropout, BatchNormalization
from sklearn.metrics import confusion_matrix
from sklearn.preprocessing import StandardScaler
from sklearn.neighbors import KDTree
import pandas as pd

# 1. Load the data

In [None]:
# Read the feature matrix and the label vector of each split from .npy files
# located in the current working directory.
X_train, y_train = (
    np.load('CNN_FULL_Training_features.npy'),
    np.load('CNN_FULL_targets_train.npy'),
)
X_val, y_val = (
    np.load('CNN_Validation_features.npy'),
    np.load('CNN_Validation_targets.npy'),
)
X_test, y_test = (
    np.load('CNN_test_1tile_features.npy'),
    np.load('CNN_test_1tile_targets.npy'),
)

In [None]:
# Discard the last column of the training and test feature matrices.
# NOTE(review): every execution of this cell removes one more column, so it
# must run exactly once after the arrays are loaded. X_val is not trimmed
# here -- confirm that is intended.
X_train, X_test = X_train[:, :-1], X_test[:, :-1]

# 2. Find the neighbours for each point

In [None]:
# Number of neighbours gathered around every point
k_neighbors = 10


def _neighbours_without_first_hit(index, points, k):
    """Query ``index`` for the k+1 nearest entries of each point and drop column 0.

    The points queried are the same ones the tree was built from, so the first
    hit is presumably the point itself.
    NOTE(review): with exact duplicate points the first hit may be the
    duplicate rather than the point itself -- confirm this is acceptable.
    """
    hits = index.query(points, k=k + 1, return_distance=False)
    return hits[:, 1:]


# One KD-tree per split, built over every column of the feature matrix.
# NOTE(review): the trees index all columns, not only the leading coordinate
# columns used later for cluster cleaning -- confirm that neighbours are meant
# to be searched in the full feature space.
tree = KDTree(X_train)
tree2 = KDTree(X_test)

nearest_neighbors_train = _neighbours_without_first_hit(tree, X_train, k_neighbors)
nearest_neighbors_test = _neighbours_without_first_hit(tree2, X_test, k_neighbors)

# 3. Prepare the data format for the CNN models

In [None]:
# Generate the input data for training the model
def generate_input_array(X, nearest_neighbors):
    """Stack the feature vectors of every point's neighbours into a 4-D array.

    The first three columns of ``X`` are skipped; only columns 3 onward are
    gathered.

    Args:
        X: 2-D array of shape (n_points, n_columns).
        nearest_neighbors: integer array of shape (n_points, k) holding, for
            each point, the row indices in ``X`` of its k neighbours.

    Returns:
        Array of shape (n_points, k, n_columns - 3, 1) where
        ``out[p, i, :, 0]`` is the feature vector of the i-th neighbour of
        point ``p``.

    The neighbourhood size is taken from ``nearest_neighbors`` itself rather
    than from a module-level constant, so the function works for any k and
    does not depend on notebook state.
    """
    features = np.asarray(X)[:, 3:]
    neighbor_idx = np.asarray(nearest_neighbors)
    # Indexing with an (n_points, k) integer array gathers all neighbours in
    # one step and yields (n_points, k, n_features); a trailing singleton axis
    # is appended to match the 4-D layout the rest of the notebook expects.
    return features[neighbor_idx][..., np.newaxis]

# Build the (n_points, k_neighbors, n_features, 1) neighbourhood arrays
input_array_train = generate_input_array(X_train, nearest_neighbors_train)
input_array_test = generate_input_array(X_test, nearest_neighbors_test)

# Standardise the neighbourhood arrays.
# StandardScaler works on 2-D data, so each sample is flattened to one row,
# scaled, and then restored to its original shape.
train_shape = input_array_train.shape
test_shape = input_array_test.shape

scaler = StandardScaler()
# The scaling statistics are learned from the training data only ...
input_array_train = scaler.fit_transform(
    input_array_train.reshape(train_shape[0], -1)
).reshape(train_shape)
# ... and then re-used unchanged for the test data. Calling fit_transform here
# would re-fit the scaler on the test set, so train and test inputs would be
# standardised with different means/variances.
input_array_test = scaler.transform(
    input_array_test.reshape(test_shape[0], -1)
).reshape(test_shape)


# Design the CNN Model

In [None]:
# Define the CNN model: three Conv1D stages that slide along the neighbour
# axis, followed by three fully connected stages and a 7-way softmax output.
# NOTE(review): the input shape hard-codes 8 feature channels per neighbour;
# presumably this equals X_train[:, 3:].shape[1] -- confirm. The arrays
# prepared earlier also carry a trailing singleton axis that this input shape
# does not list.
conv_options = dict(activation='relu', padding='same')

layer_stack = [
    # Stage 1: 128 filters, kernel 3, then halve the neighbour axis
    Conv1D(128, kernel_size=3, input_shape=(k_neighbors, 8), **conv_options),
    BatchNormalization(),
    Conv1D(128, kernel_size=3, **conv_options),
    MaxPooling1D(pool_size=2),
    Dropout(0.3),

    # Stage 2: 64 filters, kernel 2, no pooling
    Conv1D(64, kernel_size=2, **conv_options),
    BatchNormalization(),
    Conv1D(64, kernel_size=2, **conv_options),
    Dropout(0.3),

    # Stage 3: 32 filters, kernel 2, then halve the neighbour axis again
    Conv1D(32, kernel_size=2, **conv_options),
    BatchNormalization(),
    Conv1D(32, kernel_size=2, **conv_options),
    MaxPooling1D(pool_size=2),
    Dropout(0.3),

    Flatten(),
]

# Fully connected head: Dense -> BatchNorm -> Dropout at 128, 64 and 32 units
for units in (128, 64, 32):
    layer_stack.extend([
        Dense(units, activation='relu'),
        BatchNormalization(),
        Dropout(0.3),
    ])

# One probability per class
layer_stack.append(Dense(7, activation='softmax'))

model = Sequential(layer_stack)

model.summary()

# Train and evaluate the model

In [None]:
# Integer class labels are used directly, hence the sparse cross-entropy loss
model.compile(
    optimizer='adam',
    loss='sparse_categorical_crossentropy',
    metrics=['accuracy'],
)

# Train the model
# NOTE(review): no validation data is passed to fit(), although X_val / y_val
# are loaded at the top of the notebook -- confirm whether that is intended.
model.fit(x=input_array_train, y=y_train, batch_size=64, epochs=10)

# Loss and accuracy on the held-out test tile
loss, accuracy = model.evaluate(x=input_array_test, y=y_test)

# Print the accuracy as a percentage
accuracy_percent = accuracy * 100
print("Accuracy: {:.2f}%".format(accuracy_percent))

# Predict

In [None]:
# Predicted class id of each test point = position of the largest model output
y_pred_modified = model.predict(input_array_test).argmax(axis=-1)

# Per-class accuracies, reports and confusion matrix

In [None]:
# Fix the row/column order of the confusion matrix explicitly, so that row i
# is known to belong to class_labels[i]. Without this the printed class id was
# the row position, which is wrong whenever a class id is missing from both
# y_test and the predictions.
class_labels = np.unique(
    np.concatenate([np.ravel(y_test), np.ravel(y_pred_modified)])
)
cm_modified = confusion_matrix(y_test, y_pred_modified, labels=class_labels)

# Per-class accuracy = correctly predicted points / true points of that class.
# A class that is only ever predicted has no true points; report NaN for it
# instead of dividing by zero.
true_counts = cm_modified.sum(axis=1)
class_accuracies_modified = np.divide(
    cm_modified.diagonal(),
    true_counts,
    out=np.full(len(class_labels), np.nan),
    where=true_counts > 0,
)

for label, acc in zip(class_labels, class_accuracies_modified):
    # Class ids are integral (argmax predictions / sparse labels); int() keeps
    # the printed id free of a trailing ".0" if y_test is stored as float.
    print(f"Accuracy for class {int(label)} (Modified architecture): {acc:.2f}")

In [None]:
# Persist the trained network to a single file in the working directory
model_path = 'second_HALF_CNN_10epoch_10neigh_64batch.h5'
model.save(model_path)

In [None]:
from sklearn.metrics import classification_report

# Display name of each class, listed in class-id order.
# NOTE(review): assumes class ids are 0..6 in exactly this order -- confirm
# against the label encoding of the target files.
target_names = ['unchanged', 'new_building', 'demolition', 'new_vegetation', 'vegetation_growth', 'vegetation_loss', 'mobile_objects']

# Pass the class ids explicitly. Without `labels`, scikit-learn infers the
# classes from the data and raises a ValueError when fewer than
# len(target_names) classes occur in y_test / y_pred_modified (easily the
# case for a single test tile).
print(classification_report(
    y_test,
    y_pred_modified,
    labels=np.arange(len(target_names)),
    target_names=target_names,
))

In [None]:
# Get the report in a csv format
# The class ids are passed explicitly so the report still builds (with zero
# support rows) when some of the classes named in target_names do not occur
# in this test tile; without `labels` scikit-learn raises a ValueError then.
report = classification_report(
    y_test,
    y_pred_modified,
    labels=np.arange(len(target_names)),
    target_names=target_names,
    output_dict=True,
)
# One row per class / summary entry, one column per metric
df = pd.DataFrame(report).transpose()
# Save the DataFrame as a CSV file
df.to_csv('BESTclassification_report_cnn-secondhalf.csv')

In [None]:
# Generate confusion matrix plot
from sklearn.metrics import confusion_matrix
import matplotlib.pyplot as plt
import seaborn as sns

# Rows are true classes, columns are predicted classes
cm = confusion_matrix(y_test, y_pred_modified)

# heatmap() draws on the current axes and returns them, so the labels can be
# set on that axes object directly.
heatmap_ax = sns.heatmap(cm, annot=True, cmap="Blues", fmt="d")
heatmap_ax.set_title("Confusion Matrix")
heatmap_ax.set_xlabel("Predicted Labels")
heatmap_ax.set_ylabel("True Labels")

# Save the plot as an image file
plt.savefig('BESTconfusion_matrix-half.png', dpi=300, bbox_inches='tight')

# Cluster cleaning the output

In [None]:
from sklearn.cluster import DBSCAN
from sklearn.neighbors import BallTree


def _small_cluster_mask(points, eps, min_size):
    """Flag the points that belong to DBSCAN clusters smaller than ``min_size``.

    Args:
        points: (n, d) array of coordinates.
        eps: DBSCAN neighbourhood radius, in the units of ``points``.
        min_size: clusters with fewer members than this are flagged.

    Returns:
        Boolean array of length n, True where the point lies in a small
        cluster. Points labelled as noise (-1) by DBSCAN are never flagged.
    """
    mask = np.zeros(len(points), dtype=bool)
    if len(points) == 0:
        # DBSCAN cannot be fitted on an empty array; nothing to flag.
        return mask
    # min_samples=1 makes every point a core point, so clusters are simply the
    # connected components of the eps-neighbourhood graph.
    cluster_ids = DBSCAN(eps=eps, min_samples=1).fit(points).labels_
    clustered = cluster_ids >= 0
    # A single bincount yields the size of every cluster, instead of building
    # one boolean mask per cluster.
    cluster_sizes = np.bincount(cluster_ids[clustered])
    mask[clustered] = cluster_sizes[cluster_ids[clustered]] < min_size
    return mask


# Work on a copy: the cleaning below edits labels in place and must not alter
# the raw predictions that the report / confusion-matrix cells rely on.
targets = y_pred_modified.copy()
xyz = X_test[:, 0:3]  # first three columns are used as point coordinates
xy = X_test[:, 0:2]   # Use only first two dimensions

# Pass 1: inside every predicted class, clusters (eps=2.5 in 3-D) with fewer
# than 20 points are reset to label 0.
for label in np.unique(targets):
    if label == 0:
        # Resetting label-0 points to 0 changes nothing; skip the clustering.
        continue
    member_indices = np.flatnonzero(targets == label)
    is_small = _small_cluster_mask(xyz[member_indices], eps=2.5, min_size=20)
    targets[member_indices[is_small]] = 0

# Pass 2: go through all zero clusters (eps=1.5 in 2-D) and change the ones
# with fewer than 10 points to the label of the closest non-zero point.
zero_indices = np.flatnonzero(targets == 0)  # Indices of zero targets
small_zero_indices = zero_indices[
    _small_cluster_mask(xy[zero_indices], eps=1.5, min_size=10)
]
non_zero_indices = np.flatnonzero(targets != 0)

# The nearest-neighbour search needs at least one query point and at least
# one non-zero point to search among; otherwise there is nothing to relabel.
if small_zero_indices.size and non_zero_indices.size:
    # Create a BallTree for efficient nearest neighbor search
    non_zero_tree = BallTree(xy[non_zero_indices])
    _, nearest = non_zero_tree.query(xy[small_zero_indices], k=1)
    # Replace zero labels with nearest non-zero labels
    targets[small_zero_indices] = targets[non_zero_indices[nearest.ravel()]]

# Print the updated labels
print(targets)


# Save output as las file

In [None]:
# Write the test points and their cleaned labels to a LAS file.
# NOTE(review): lp.header.Header / lp.file.File look like the laspy 1.x API --
# confirm the installed laspy version still provides them.
print_coord = X_test[:, :3]  # first three columns are written as x, y, z
fn = "output_file.las"

# NOTE(review): `vals` and `cmap` are not used by anything visible in this
# notebook; they are kept in case a later cell relies on them.
vals = np.linspace(0, 1, 100)
np.random.shuffle(vals)
cmap = plt.cm.colors.ListedColormap(plt.cm.tab20(vals))

header = lp.header.Header()
header.data_format_id = 2
fp = lp.file.File(fn, mode = 'w', header = header)
try:
    fp.header.scale = [0.01, 0.01, 0.01]
    # Per-axis minimum of the coordinates, computed in one vectorised pass
    fp.header.offset = print_coord.min(axis=0).tolist()
    fp.x = print_coord[:, 0]
    fp.y = print_coord[:, 1]
    fp.z = print_coord[:, 2]
    # The class label of each point is stored in the point-source-id field
    fp.pt_src_id = targets
    #fp.intensity = intensity
finally:
    # Always release the file handle, even if one of the assignments fails
    fp.close()