-
Notifications
You must be signed in to change notification settings - Fork 4
/
raster.py
117 lines (96 loc) 路 3.68 KB
/
raster.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
import logging
from pathlib import Path
from typing import List, Tuple
import numpy as np
import numpy.typing as npt
import rasterio as rio
from haversine import haversine
from rasterio.io import DatasetReader
from rasterio.mask import mask
from rasterio.merge import merge
from rasterio.windows import Window
from mapa.utils import path_to_clipped_tiff, path_to_merged_tiff
log = logging.getLogger(__name__)
def clip_tiff_to_bbox(input_tiff: Path, bbox_geometry: dict, bbox_hash: str, cache_dir: Path) -> Path:
log.debug("馃敧 clipping region of interest...")
data = rio.open(input_tiff)
out_img, out_transform = mask(data, shapes=[bbox_geometry], crop=True)
out_meta = data.meta.copy()
out_meta.update(
{
"driver": "GTiff",
"height": out_img.shape[1],
"width": out_img.shape[2],
"transform": out_transform,
"crs": data.crs,
}
)
clipped_tiff = path_to_clipped_tiff(bbox_hash, cache_dir)
with rio.open(clipped_tiff, "w", **out_meta) as file:
file.write(out_img)
return clipped_tiff
def tiff_to_array(tiff: DatasetReader) -> np.ndarray:
array = tiff.read()
# drop higher dimension to get 2-dimensional (x * y) array
return array[0, :, :]
def remove_empty_first_and_last_rows_and_cols(array: npt.ArrayLike) -> np.ndarray:
# remove first and last cols in case of all zero
if not array[:, 0].any():
array = np.delete(array, (0), axis=1)
if not array[:, -1].any():
array = np.delete(array, (-1), axis=1)
# remove first and last rows in case of all zero
if not array[0].any():
array = np.delete(array, (0), axis=0)
if not array[-1].any():
array = np.delete(array, (-1), axis=0)
return array
def cut_array_to_square(array: npt.ArrayLike) -> np.ndarray:
rows, cols = array.shape
if rows > cols:
diff = rows - cols
# remove last n=diff rows
return array[:-diff, :]
elif cols > rows:
diff = cols - rows
return array[:, :-diff]
else:
return array
def _get_coordinate_of_pixel(row: int, col: int, tiff: DatasetReader) -> Tuple[float]:
meta = tiff.meta
window = Window(0, 0, meta["width"], meta["height"])
meta["transform"] = rio.windows.transform(window, tiff.transform)
x, y = rio.transform.xy(meta["transform"], row, col, offset="center")
# swap lat and lon because that is the order expected by haversine
return (y, x)
def determine_elevation_scale(tiff: DatasetReader, model_size: int) -> float:
array = tiff.read()[0, :, :]
_, cols = array.shape
# get lat lon coordinate of top left and top right pixel
top_left_coor = _get_coordinate_of_pixel(0, 0, tiff)
top_right_coor = _get_coordinate_of_pixel(0, cols, tiff)
# get distance in meter between the two coordinates
distance = haversine(top_left_coor, top_right_coor, unit="m")
# to find out what 1 meter in reality corresponds to in the model, we need to divide the model size by the distance
one_meter_in_model = model_size / distance
return one_meter_in_model
def merge_tiffs(tiffs: List[Path], bbox_hash: str, cache_dir: Path) -> Path:
datasets = []
for tiff in tiffs:
data = rio.open(tiff)
datasets.append(data)
mosaic, out_trans = merge(datasets)
out_meta = datasets[0].meta.copy()
out_meta.update(
{
"driver": "GTiff",
"height": mosaic.shape[1],
"width": mosaic.shape[2],
"transform": out_trans,
"crs": data.crs,
}
)
tiff = path_to_merged_tiff(bbox_hash, cache_dir)
with rio.open(tiff, "w", **out_meta) as dest:
dest.write(mosaic)
return tiff