In [80]:
from PIL import Image
import numpy as np
import pickle
import rioxarray

In [81]:
training_file = "super_male_uzemie.tif" #'./nosync.nosync/slope_md.tif'
test_file = "uzemicko.tif" #'./nosync.nosync/slope_whole.tif'
shape_file = "slope_deformations_reference_polygons/reference_data_existing_landslides_polygons.shp" #"Reference data/slope_deformations_reference_polygons/reference_data_existing_landslides_polygons.shp"

In [82]:
im = Image.open(training_file)
slopearray = np.array(im)

Step1: Data extraction

In [83]:
training_data = None
try:
    #avoid extracting data if already extracted
    #training_data.pickle is the extracted data from previous computation
    with open('training_data.pickle', 'rb') as handle:
        training_data = pickle.load(handle)
except:
    training_data = rioxarray.open_rasterio(training_file)
    df = training_data.to_dataframe(name="slope")

    from shapely.geometry import Point
    from shapely.geometry.polygon import Polygon
    import shapefile

    polygons = []
    sf = shapefile.Reader(shape_file)
    shapes = sf.shapes()
    rectangles = []

    for shape in shapes:
        polygons.append(Polygon(shape.points))
        rectangles.append(shape.bbox)

    #here we traverse through each polygon and remember all the points that are landslides
    #and are inside our training_data
    is_landslide = set()
    for i in range(len(polygons)):
        rectangle = rectangles[i]
        low_x, low_y, high_x, high_y = rectangle[0], rectangle[1], rectangle[2], rectangle[3]
        small_df = df.query('y >= @low_y and y <= @high_y and x >= @low_x and x <= @high_x')
        if(len(small_df) > 0):
            for _, y, x in small_df.index:
                if polygons[i].contains(Point(x, y)):
                    is_landslide.add((x, y))

    bools = np.array([1 if (x, y) in is_landslide else 0 for _, y, x in df.index])
    df.insert(loc = 0, column='is_landslide', value=bools)
    training_data = df.reset_index().drop(columns=['band', 'spatial_ref'])

    with open('training_data.pickle', 'wb') as handle:
        pickle.dump(training_data, handle)

#now training data consists of y, x coordinates, 
#whether the given coordinate is a landslide or not, and a slope.
print(training_data)

                   y            x  is_landslide      slope
0      -1.227044e+06 -255576.4999             0   2.306876
1      -1.227044e+06 -255575.4999             0   5.372233
2      -1.227044e+06 -255574.4999             0   6.274973
3      -1.227044e+06 -255573.4999             0   4.717222
4      -1.227044e+06 -255572.4999             0   5.272253
...              ...          ...           ...        ...
518077 -1.227517e+06 -254488.4999             1   2.141630
518078 -1.227517e+06 -254487.4999             1  19.460506
518079 -1.227517e+06 -254486.4999             1  30.893122
518080 -1.227517e+06 -254485.4999             1  14.834254
518081 -1.227517e+06 -254484.4999             1   6.218382

[518082 rows x 4 columns]


Step2: learning our model

In [84]:
def preprocess_numpy_array(coordinates, original):
    return coordinates['is_landslide'].to_numpy().reshape(original.shape[0], original.shape[1])

In [85]:
X_test = list()
Y_test = list()

size_of_square = 100
overlap = 30

np_coords: np.ndarray = preprocess_numpy_array(training_data, slopearray)
for i in range(0, slopearray.shape[0] - size_of_square,overlap):
    for j in range(0, slopearray.shape[1] - size_of_square, overlap):
        current_array: np.ndarray = slopearray[i:i+size_of_square, j:j+size_of_square].flatten()
        X_test.append(current_array)
        Y_test.append(np.mean(np_coords[i:i+size_of_square, j:j+size_of_square]))

In [86]:
from sklearn.linear_model import LinearRegression

x = np.array(X_test)
y = np.array(Y_test)

model = LinearRegression().fit(x, y)

Step3: using our model on the test data

In [87]:
#get coordinates from i, j position in the np.array
def get_coordinates(i, j, coordinates, original):
    top_left = coordinates.loc[i * original.shape[1] + j]
    top_right = coordinates.loc[i * original.shape[1] + j + size_of_square]
    bottom_left = coordinates.loc[(i+size_of_square) * original.shape[1] + j]
    bottom_right = coordinates.loc[(i + size_of_square) * original.shape[1] + j + size_of_square]
    return [top_left, top_right, bottom_left, bottom_right]

In [88]:
whole_slopes = np.array(Image.open(test_file))
result = []

test_data = rioxarray.open_rasterio(test_file)
test_data = test_data.to_dataframe(name="slope").reset_index().drop(columns=['band', 'spatial_ref', 'slope'])

for i in range(0, whole_slopes.shape[0] - size_of_square,overlap):
    for j in range(0, whole_slopes.shape[1] - size_of_square, overlap):
        current_array: np.ndarray = whole_slopes[i:i+size_of_square, j:j+size_of_square].flatten()
        coordinates = get_coordinates(i, j, test_data, whole_slopes)
        coordinates.append(model.predict(np.array([current_array])))
        result.append(coordinates)

result = np.array(result, dtype=object)

Step4: writing our predictions so we can see them in QGIS

In [89]:
MAGIC_CONSTANT = 0.8
import shapefile
w = shapefile.Writer('write_rec/outputik', shapefile.POLYGON)
polygons_to_draw = []
for record in result:
    score = record[4][0] 
    if score > MAGIC_CONSTANT:
        left_top_x, left_top_y = record[0]['x'], record[0]['y']
        right_top_x, right_top_y = record[1]['x'], record[1]['y']
        left_bottom_x, left_bottom_y = record[2]['x'], record[2]['y']
        right_bottom_x, right_bottom_y = record[3]['x'], record[3]['y']
        polygons_to_draw.append([
            [left_bottom_x, left_bottom_y], [left_top_x, left_top_y],
            [right_top_x, right_top_y], [right_bottom_x, right_bottom_y]
        ])

w.field('anotherec')
w.record('polygon')
w.poly(polygons_to_draw)
w.close()