# Classifying Swiss cities based on Aerial Imagery

the full code related to this project can be found on github: https://github.com/Elwin-Freudiger/Swisscities

Aerial imagery is used in a lot of fields. This project attempts to see how features can be extracted from images and used to classify cities in Switzerland.

All of the images used in this project were obtained thanks to the *Federal Office of Topography swisstopo*, their website allows anyone to download a mosaic of aerial images. These images are taken all across Switzerland and the edge of the country. Each image has a ground resolution of 10cm, except over the Alps where the ground resolution is 25cm. These tiles are $1km^2$, For this research, a lower download resolution of 2 meters was chosen.

the link to download the data is: https://www.swisstopo.admin.ch/en/orthoimage-swissimage-10

## Feature extraction:

Based on an image, features can be extracted

In [None]:
import rasterio
import cv2 as cv
import numpy as np
import skimage
import matplotlib.pyplot as plt
from PIL import ImageShow
import geopandas as gpd
import pandas as pd
from shapely.geometry import Point
import contextily as ctx
import xyzservices.providers as xyz
from alive_progress import alive_bar; import time
from concurrent.futures import ThreadPoolExecutor, as_completed
from math import *

In [None]:
with rasterio.open('report/sample2.tiff') as file:
        img_array = file.read([1, 2, 3]).transpose((1, 2, 0))  # Correct dimension order for OpenCV
        metadata = file.meta
        corner = metadata['transform']
        center_x, center_y = corner * (metadata['width'] // 2, metadata['height'] // 2)

ImageShow(img_array)
mean = np.mean(img_array)
print(mean)
stdev = np.std(img_array)
print(stdev)

gray = cv.cvtColor(img_array, cv.COLOR_RGB2GRAY)
gray_dist = cv.calcHist([gray], [0], None, [256], [0,256])                    
red = cv.calcHist([img_array], [0], None, [256], [0, 256])
green = cv.calcHist([img_array], [1], None, [256], [0, 256])
blue = cv.calcHist([img_array], [2], None, [256], [0, 256])
distribution = np.column_stack((gray_dist.flatten(), red.flatten(), green.flatten(), blue.flatten()))
fig, axs = plt.subplots(2,2)
axs[0,0].plot(gray_dist, color='gray')
axs[0,1].plot(red, color = 'red')
axs[1,0].plot(green, color = 'green')
axs[1,1].plot(blue, color='blue')
axs[0,0].set_title('Grayscale distribution')
axs[0,1].set_title('Red distribution')
axs[1,0].set_title('Green distribution')
axs[1,1].set_title('Blue distribution')
fig.tight_layout()
plt.show()


co_matrix = skimage.feature.graycomatrix(gray, [5], [0], levels=256, normed=True)
contrast = skimage.feature.graycoprops(co_matrix, 'contrast')[0, 0]
print(contrast)
correlation = skimage.feature.graycoprops(co_matrix, 'correlation')[0, 0]
print(correlation)
energy = skimage.feature.graycoprops(co_matrix, 'energy')[0, 0]
print(energy)
homogeneity = skimage.feature.graycoprops(co_matrix, 'homogeneity')[0, 0]
print(homogeneity)

## Downloading the images

First, we will look at how the images are downloaded

In [None]:
City_list = ['Zurich', 'Geneva', 'Basel', 'Lausanne', 'Bern', 'Winterthur', 'Luzern', 'St_Gallen', 'Lugano', 'Biel']
Balanced_list = ['Bern', 'Zurich', 'Lugano', 'Lausanne', 'Chur', 'Schwyz', 'Glarus', 'Winterthur', 'Sarnen', 'Nendaz']

def extraction(link, city):
    try:
        response = requests.get(link, timeout=10)
        response.raise_for_status()
        with rasterio.io.MemoryFile(response.content) as file:
            with file.open() as dataset:
                img_array = dataset.read([1, 2, 3]).transpose((1, 2, 0))  # Correct dimension order for OpenCV
                metadata = dataset.meta
                corner = metadata['transform']
                center_x, center_y = corner * (metadata['width'] // 2, metadata['height'] // 2)

                mean = np.mean(img_array)
                stdev = np.std(img_array)

                gray = cv.cvtColor(img_array, cv.COLOR_RGB2GRAY)
                gray_dist = cv.calcHist([gray], [0], None, [256], [0,256])                    
                red = cv.calcHist([img_array], [0], None, [256], [0, 256])
                green = cv.calcHist([img_array], [1], None, [256], [0, 256])
                blue = cv.calcHist([img_array], [2], None, [256], [0, 256])
                distribution = np.column_stack((gray_dist.flatten(), red.flatten(), green.flatten(), blue.flatten()))

                co_matrix = skimage.feature.graycomatrix(gray, [5], [0], levels=256, normed=True)
                contrast = skimage.feature.graycoprops(co_matrix, 'contrast')[0, 0]
                correlation = skimage.feature.graycoprops(co_matrix, 'correlation')[0, 0]
                energy = skimage.feature.graycoprops(co_matrix, 'energy')[0, 0]
                homogeneity = skimage.feature.graycoprops(co_matrix, 'homogeneity')[0, 0]
                
                row = [city, center_x, center_y, mean, stdev, distribution.tolist(), contrast, correlation, energy, homogeneity]
                return row
    except Exception as e:
        print(f"Failed to process {link}: {e}")
        return None
        

def main(list):
    data = []
    for city in list:
        path = f'data/city_link/{city}.csv'
        with open(path, 'r') as link_list:
            links = link_list.read().splitlines()
        with ThreadPoolExecutor() as executor:
            with alive_bar(len(links)) as bar:
                future_images = {executor.submit(extraction, link, city): link for link in links}
                for future in as_completed(future_images):
                    row = future.result()
                    if row is not None:
                        data.append(row)
                    bar()
                    
    return pd.DataFrame(data, columns=['Location', 'East', 'North', 'Mean', 'Stdev', 'Distribution', 'Contrast', 'Correlation', 'Energy', 'Homogeneity'])

In [None]:
df_top10 = main(City_list)
df_balanced = main(Balanced_list) 
df_top10.head()
df_balanced.head()

## Understanding the dataset

In [None]:
def plot(df):
    crs_2056 = "EPSG:2056"

    path = 'report/swissBOUNDARIES3D_1_5_LV95_LN02.gdb'
    swiss_borders = gpd.read_file(path, layer='TLM_HOHEITSGRENZE')
    canton_border = swiss_borders[swiss_borders['OBJEKTART'].isin([0])]

    geometry = [Point(xy) for xy in zip(df['East'], df['North'])]

    gdf_points = gpd.GeoDataFrame(df, geometry=geometry, crs=crs_2056)

    fig, ax = plt.subplots(figsize=(10, 10))
    canton_border['geometry'].plot(ax=ax, color='grey', alpha=0)
    gdf_points.plot(ax=ax, color='red', markersize=1)

    swiss_basemap = xyz.SwissFederalGeoportal.NationalMapColor
    ctx.add_basemap(ax, crs=crs_2056, source=swiss_basemap)
    plt.axis('off')
    plt.show()

In [None]:
plot(df_top10)
print(df_top10.value_counts)

In [None]:
plot(df_balanced)
print(df_balanced.value_counts)

## Adding the additional data

In [None]:
info = ['temperature', 'sunshine', 'rainfall']

def main(df, name):
    for file in info:
        path = f'data/{file}.tiff'
        with rasterio.open(path) as dataset:
            meta = dataset.meta
            matrix = dataset.read(1)
            func = meta['transform']

            def get_value(est, nord):
                x, y = ~func * (est, nord)
                x = floor(x)
                y = floor(y)
                return matrix[y, x]

        get_value_vector = np.vectorize(get_value)

        df[file] = get_value_vector(df['East'], df['North'])
    return df

In [None]:
df_top10_withinfo = main(df_top10, 'top10')
df_top10_withinfo.head()

In [None]:
df_balanced_withinfo = main(df_balanced, 'balanced')
df_balanced_withinfo.head()

## Keras API:

The API can help create great deep learning models

https://keras.io/guides/functional_api/

Our model will have mutiple inputs. 

In [None]:
import seaborn as sns
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import confusion_matrix
import tensorflow as tf
from tensorflow.keras import layers, Model, utils
from tensorflow.keras.layers import BatchNormalization, Conv2D, MaxPooling2D, Activation, Dropout, Dense, Flatten, Input

In [None]:
def predict(df):
    df['Location'] = pd.Categorical(df['Location'])
    df['Location_code'] = df['Location'].cat.codes
    city_names = df['Location'].cat.categories

    numeric_features = ['Mean', 'Stdev', 'Contrast', 'Correlation', 'Energy', 'Homogeneity', 'temperature', 'sunshine', 'rainfall']
    scaler = StandardScaler()
    df[numeric_features] = scaler.fit_transform(df[numeric_features])

    Numeric_train, Numeric_test = train_test_split(df[numeric_features], test_size=0.3, random_state=42)
    y_train, y_test = train_test_split(df['Location_code'], test_size=0.3, random_state=42)
    Image_string_train, Image_string_test = train_test_split(df[['Distribution']], test_size=0.3, random_state=42)

    y_train = np.array(y_train.to_list())
    y_test = np.array(y_test.to_list())

    Numeric_train = Numeric_train.to_numpy()
    Numeric_test = Numeric_test.to_numpy()

    def scale_each_channel(image_str):
        array = np.array(eval(image_str)) 
        scaled_array = np.zeros_like(array, dtype=np.float32)
        for channel in range(array.shape[1]):
            channel_data = array[:, channel]
            scaled_array[:, channel] = (channel_data - np.min(channel_data)) / (np.max(channel_data) - np.min(channel_data))
        return scaled_array

    Image_train = []
    for i in Image_string_train['Distribution'].to_numpy():
        scaled_array = scale_each_channel(i)
        Image_train.append(scaled_array)
    Image_train = np.array(Image_train)

    Image_test = []
    for i in Image_string_test['Distribution'].to_numpy():
        scaled_array = scale_each_channel(i)
        Image_test.append(scaled_array)
    Image_test = np.array(Image_test)

    numeric_input = Input(shape=(9,), name='Numeric')
    image_input = Input(shape=(256, 4), name='Image')

    numeric_dense = Dense(64, activation='relu')(numeric_input)

    image_flatten = Flatten()(image_input)
    image_dense = Dense(128, activation='relu')(image_flatten)
    image_drop = Dropout(0.5)(image_dense)
    image_dense = Dense(128, activation='relu')(image_drop)

    x = layers.concatenate([numeric_dense, image_dense])

    xdense = Dense(64, activation='relu')(x)
    output = Dense(10, activation='softmax')(xdense)

    model = Model(inputs=[numeric_input, image_input], outputs=output)

    model.summary()

    model.compile(optimizer='adam',
                loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=False),
                metrics=['accuracy'])

    model.fit(
        {"Numeric": Numeric_train, "Image": Image_train},
        y_train, epochs=10, batch_size=32)

    test_loss, test_acc = model.evaluate([Numeric_test, Image_test], y_test, verbose=2)
    print('\nTest accuracy:', test_acc)

    y_pred_probs = model.predict([Numeric_test, Image_test])
    y_pred = np.argmax(y_pred_probs, axis=1)

    cm = confusion_matrix(y_test, y_pred)
    plt.figure(figsize=(10, 8))
    sns.heatmap(cm, annot=True, fmt='d', cmap='Blues', xticklabels=city_names, yticklabels=city_names)
    plt.xlabel('Predicted')
    plt.ylabel('Actual')
    plt.title('Confusion Matrix')
    plt.show()

In [None]:
predict(df_top10)

In [None]:
predict(df_top10_withinfo)

In [None]:
predict(df_balanced)

In [None]:
predict(df_balanced_withinfo)