In [2]:
import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.callbacks import EarlyStopping
import keras_tuner as kt
from tensorflow.keras import Input
from tensorflow.keras import backend as K
import shutil

shutil.rmtree("kt_dir", ignore_errors=True)
K.clear_session()




In [None]:
# name of city - used only in output file name
city = "Kolkata"
# load result from 2.0 training_to_parquet
training_data_file = r""  # "enhanced_formal.parquet"

# load result from 1.0 real data
real_data_file = r"" # "20250519_04_countBuildingsInSquare_mumbaiAll_Filtered_50x50.parquet"

# define name/path where classification report should be saved - no extension -> will be added in the code
classification_rpt = r""

# define name/path where confusion matrix should be saved.
confusion_mtrx = r""

# output geojson file
output_file = r""  # f"NN_chennai_colored_50x50_without_trees_v2.geojson"

#### Train Data Set

In [None]:
# load result from 2.0 training_to_parquet
df = pd.read_parquet(training_data_file)
# df.to_csv("data/origData/results/split_universe_with_median_height.csv") - better use parquet

#drop unused columns, leave only the ones needed for computation
df_adjusted = df.drop(
    columns=["Unnamed: 0", "Unnamed: 0.1", "id", "geometry", "polygon", "tile", "num_pixels_ge_3m"]
)

df_cleaned = df_adjusted.fillna(0)


# Normalize data
df_cleaned["avg_area_norm"] = df_cleaned["avg_area"] / 3000
df_cleaned.loc[df_cleaned["avg_area"] > 3000, "avg_area_norm"] = 1

df_cleaned["max_area_norm"] = df_cleaned["max_area"] / 3000
df_cleaned.loc[df_cleaned["max_area"] > 3000, "max_area_norm"] = 1

df_cleaned["avg_height_norm"] = df_cleaned["avg_height"] / 100
df_cleaned.loc[df_cleaned["avg_height"] > 3000, "avg_height_norm"] = 1

df_cleaned = df_cleaned.drop(columns=["avg_area", "max_area", "avg_height"])



In [None]:
# Separate features (X) and label (y)
X = df_cleaned.drop("class", axis=1)
y = df_cleaned["class"]

y = y.map({"formal": 0, "informal": 1})

# Scale the features
scaler = StandardScaler()
X_scaled = scaler.fit_transform(X)

# Split into training and test sets
X_train, X_test, y_train, y_test = train_test_split(
    X_scaled, y, test_size=0.2, random_state=42
)

input_dim = X_train.shape[1]

def build_model(hp):
    model = Sequential()

    model.add(Input(shape=(input_dim,)))

    # First hidden layer
    model.add(Dense(24, activation="relu", name="dense_0"))

    # Second hidden layers
    model.add(Dense(88, activation="relu", name="dense_1"))

    # Output layer
    model.add(Dense(1, activation='sigmoid'))

    # Compile
    model.compile(
        optimizer=Adam(learning_rate=1e-3),
        loss="binary_crossentropy",
        metrics=["accuracy"],
    )
    return model

# Set up early stopping
earlystopping = EarlyStopping(monitor="val_loss", patience=5, restore_best_weights=True)

# Initialize tuner
tuner = kt.RandomSearch(
    build_model,
    objective='val_accuracy',
    max_trials=10,  # Try 10 different combinations
    executions_per_trial=1,
    directory='kt_dir',
    project_name='text_classifier_tuning'
)

# Run search
tuner.search(
    X_train, y_train,
    epochs=50,
    validation_split=0.1,
    callbacks=[earlystopping],
    verbose=1
)

# Get the best model
model = tuner.get_best_models(num_models=1)[0]

# Evaluate on test set
loss, accuracy = model.evaluate(X_test, y_test)
print(f"Test Accuracy: {accuracy:.4f}")
print(f"Test Loss: {loss:.4f}")
model.summary()

Reloading Tuner from kt_dir\text_classifier_tuning\tuner0.json


  super().__init__(activity_regularizer=activity_regularizer, **kwargs)
  saveable.load_own_variables(weights_store.get(inner_path))


[1m22/22[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 2ms/step - accuracy: 0.8019 - loss: 0.3700  
Test Accuracy: 0.79


In [None]:
# Predict probabilities
y_pred_prob = model.predict(X_test)

# Convert probabilities to binary labels
y_pred = (y_pred_prob > 0.5).astype("int32").flatten()

from sklearn.metrics import confusion_matrix

# Create confusion matrix
cm = confusion_matrix(y_test, y_pred)

# Plot confusion matrix
import matplotlib.pyplot as plt
import seaborn as sns

# Plot heatmap
plt.figure(figsize=(6, 5))
sns.heatmap(
    cm,
    annot=True,
    fmt="d",
    cmap="Blues",
    cbar=False,
    xticklabels=["formal", "informal"],
    yticklabels=["formal", "informal"],
)
plt.xlabel("Predicted Label")
plt.ylabel("True Label")
plt.title("Confusion Matrix Heatmap")

# Save IMAGE to file
plt.tight_layout()
plt.savefig(confusion_mtrx, dpi=300)

plt.show()
plt.close()

In [None]:
from sklearn.metrics import classification_report

# Print the classification report
report = classification_report(y_test, y_pred, target_names=["formal", "informal"])
print(report)

with open(f"{classification_rpt}_{city}.txt", "w") as f:
    f.write(report)

### REAL DATA

In [None]:
# real data
real_df = pd.read_parquet(real_data_file)

In [None]:
#same with training - drop unnecesary columns
real_df_adjusted = real_df.drop(
    columns=["Unnamed: 0", "class", "id", "geometry", "polygon"]
)

real_df_cleaned = real_df_adjusted.fillna(0)

# print(real_df_cleaned.columns)
# print(real_df_cleaned.head(5))

# Normalize data
real_df_cleaned["avg_area_norm"] = real_df_cleaned["avg_area"] / 3000
real_df_cleaned.loc[real_df_cleaned["avg_area"] > 3000, "avg_area_norm"] = 1

real_df_cleaned["max_area_norm"] = real_df_cleaned["max_area"] / 3000
real_df_cleaned.loc[real_df_cleaned["max_area"] > 3000, "max_area_norm"] = 1

real_df_cleaned["avg_height_norm"] = real_df_cleaned["avg_height"] / 100
real_df_cleaned.loc[real_df_cleaned["avg_height"] > 3000, "avg_height_norm"] = 1

real_df_cleaned_adj = real_df_cleaned.drop(columns=["avg_area", "max_area", "avg_height"])


In [77]:
# Align columns with training data
new_df = real_df_cleaned_adj.reindex(
    columns=X.columns, fill_value=0
)  # Ensure same structure

# Scale using the SAME scaler (don’t fit again)
new_scaled = scaler.transform(new_df)

In [78]:
# Step 1: Predict probabilities
pred_probs = model.predict(new_scaled)

# Step 2: Convert to percentages (e.g., 0.82 → 82.0)
percentages = (pred_probs * 100).round(2).flatten()  # Round to 2 decimal places

# Step 3: Predict binary class from probabilities
pred_classes = (pred_probs > 0.5).astype(int)  # Binary outcome

[1m590/590[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 749us/step


In [None]:
predicted_classes = ["formal" if label == 0 else "informal" for label in pred_classes]
real_df["prediction"] = predicted_classes
real_df["confidence (%)"] = percentages

real_data_adjusted_df = real_df.drop(columns=["class"])

In [None]:
real_data_filtered_df = real_data_adjusted_df[
    real_data_adjusted_df["prediction"] == "informal"
]


In [81]:
# Convert WKT to shapely geometries
from shapely import wkt

real_data_filtered_df["polygon"] = real_data_filtered_df["polygon"].apply(wkt.loads)

A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  real_data_filtered_df["polygon"] = real_data_filtered_df["polygon"].apply(wkt.loads)


In [82]:
# Build GeoJSON features
from shapely.geometry import mapping

color_map = {
    "high": "#00cd00", # green
    "mid": "#ff4d4d", # red
    # "low": "#1591EA", # blue
}     

features = []
for _, row in real_data_filtered_df.iterrows():

    confidence = row["confidence (%)"]
    color = (
        color_map["high"] if confidence > 60
        else color_map["mid"]
    )

    
    feature = {
        "type": "Feature",
        "properties": {
            "id": row["id"],
            "confidence": round(row["confidence (%)"], 2),
            "class": row["prediction"],
            "fill": color,
            "stroke": color
        },
        "geometry": mapping(row["polygon"]),
    }
    features.append(feature)

In [83]:
# Wrap in a FeatureCollection
geojson_dict = {"type": "FeatureCollection", "features": features}

In [None]:
# Save to final file
import json

with open(output_file,
    "w",
) as f:
    json.dump(geojson_dict, f, indent=2)