In [1]:
import geopandas as gpd
from shapely.geometry import Point
import whitebox
import rasterio as rio
from rasterio.warp import calculate_default_transform, reproject
from rasterio.enums import Resampling
from skyfield.api import Topos, load, utc
from datetime import datetime, timedelta
import numpy as np, math
from pyproj import Geod, CRS

# Initialize the geodetic calculator with the WGS84 ellipsoid
geod = Geod(ellps='WGS84')

In [2]:
def reproject_raster(input_raster, output_raster, dst_crs, resampling=None):
    """Reproject every band of a raster file into another CRS.

    Args:
        input_raster: Path of the raster to read.
        output_raster: Path of the raster to create. It reuses the source
            metadata, with CRS, transform, width and height replaced by the
            values computed for ``dst_crs``.
        dst_crs: Target coordinate reference system, in any form rasterio
            accepts.
        resampling: Optional ``rasterio.enums.Resampling`` method. When
            omitted, ``Resampling.nearest`` is used (the previously hard-coded
            behaviour). For continuous data such as elevation,
            ``Resampling.bilinear`` is usually a better choice.
    """
    # Resolved at call time so the default stays identical to the old behaviour.
    if resampling is None:
        resampling = Resampling.nearest

    with rio.open(input_raster) as src:
        # Grid (transform and size) that covers the source extent in dst_crs.
        transform, width, height = calculate_default_transform(
            src.crs, dst_crs, src.width, src.height, *src.bounds)

        kwargs = src.meta.copy()
        kwargs.update({
            'crs': dst_crs,
            'transform': transform,
            'width': width,
            'height': height
        })

        with rio.open(output_raster, 'w', **kwargs) as dst:
            # Band indexes are 1-based in rasterio.
            for band_index in range(1, src.count + 1):
                reproject(
                    source=rio.band(src, band_index),
                    destination=rio.band(dst, band_index),
                    src_transform=src.transform,
                    src_crs=src.crs,
                    dst_transform=transform,
                    dst_crs=dst_crs,
                    resampling=resampling)

In [3]:
if __name__ == '__main__':
    # Destination CRS: build a pyproj CRS object from the target EPSG code.
    dst_epsg = 4326
    dst_crs = CRS.from_epsg(dst_epsg)

    # Source DEM and the reprojected copy written next to it.
    input_raster, output_raster = 'DEM/dem_resample.tif', 'DEM/dem_reprojected.tif'

    reproject_raster(input_raster, output_raster, dst_crs)

In [4]:
wbt = whitebox.WhiteboxTools()

# File paths (raw strings, so the Windows backslashes are not read as escapes)
dem_file_path = r'E:\Dati analisi\DEM\dem_reprojected.tif'
output_file_path = r'E:\Dati analisi\DEM\viewshed.tif'
observer_point_shp = r'E:\Dati analisi\DEM\observer_location.shp'

# Observer position (EPSG:4326) and height above the ground.
# NOTE: despite their names, observer_x holds the LATITUDE and observer_y the
# LONGITUDE: the station point below is built as Point(observer_y, observer_x),
# i.e. observer_y is the x/longitude coordinate. Keep that in mind wherever
# these two variables are passed on.
observer_x = 44.6547465    # latitude, degrees
observer_y = 9.3065963     # longitude, degrees
observer_z = 1.75          # observer height above the terrain, metres

# Single-point GeoDataFrame for the observer; shapely's Point takes (x, y).
gdf = gpd.GeoDataFrame(
    {'id': [1], 'geometry': [Point(observer_y, observer_x)], 'Z': [observer_z]},
    crs="EPSG:4326"
)

# The viewshed tool reads its stations from a vector file.
gdf.to_file(observer_point_shp)

try:
    # The WhiteboxTools wrapper reports tool failures through its return code
    # (0 = success) rather than by raising, so the code must be checked before
    # claiming success.
    return_code = wbt.viewshed(
        dem_file_path,
        observer_point_shp,
        output_file_path,
        height=observer_z,
    )
    # None is tolerated in case a wrapper version returns nothing.
    if return_code in (0, None):
        print("Viewshed analysis completed successfully.")
    else:
        print(f"Viewshed analysis failed with return code {return_code}.")
except Exception as e:
    print(f"Error during viewshed analysis: {e}")

.\whitebox_tools.exe --run="Viewshed" --dem='E:\Dati analisi\DEM\dem_reprojected.tif' --stations='E:\Dati analisi\DEM\observer_location.shp' --output='E:\Dati analisi\DEM\viewshed.tif' --height=1.75 -v --compress_rasters=False

****************************
* Welcome to Viewshed      *
* Powered by WhiteboxTools *
* www.whiteboxgeo.com      *
****************************
Reading data...
Locating view stations: 0%
Station 1 of 1
Calculating view angle (Station 1 of 1): 1%
Calculating view angle (Station 1 of 1): 2%
Calculating view angle (Station 1 of 1): 3%
Calculating view angle (Station 1 of 1): 4%
Calculating view angle (Station 1 of 1): 5%
Calculating view angle (Station 1 of 1): 6%
Calculating view angle (Station 1 of 1): 7%
Calculating view angle (Station 1 of 1): 8%
Calculating view angle (Station 1 of 1): 9%
Calculating view angle (Station 1 of 1): 10%
Calculating view angle (Station 1 of 1): 11%
Calculating view angle (Station 1 of 1): 12%
Calculating view angle (Station 1 of 1

In [5]:
# Skyfield timescale, plus the GNSS constellation read from CelesTrak's TLE file
ts = load.timescale()

stations_url = 'http://celestrak.com/NORAD/elements/gnss.txt'
satellites = load.tle_file(stations_url)

[#################################] 100% gnss.txt


In [6]:
# Analysis window: one Skyfield time per minute over [start_time, end_time)
start_time = datetime(2023, 11, 7, 0, 0, 0, tzinfo=utc)
end_time = datetime(2023, 11, 8, 0, 0, 0, tzinfo=utc)

minute_count = int((end_time - start_time).total_seconds() // 60)
times = [
    ts.utc(start_time + timedelta(minutes=offset))
    for offset in range(minute_count)
]

In [7]:
# Read the first band of the DEM together with its affine transform
with rio.open(dem_file_path) as dem_dataset:
    dem_affine_transform = dem_dataset.transform
    dem_array = dem_dataset.read(1)

# Map the observer's geographic position to a (row, col) cell of the DEM.
# rowcol takes (transform, x, y); observer_y is passed as x and observer_x as y.
observer_row, observer_col = rio.transform.rowcol(dem_affine_transform, observer_y, observer_x)

# Observer elevation = terrain elevation at that cell + observer height
ground_elevation = dem_array[observer_row, observer_col]
observer_elevation = ground_elevation + observer_z

In [8]:
# Skyfield ground station for the observer.
# In this notebook observer_x carries the latitude and observer_y the longitude
# (the station point is built as Point(observer_y, observer_x) in EPSG:4326),
# so observer_x must feed latitude_degrees and observer_y longitude_degrees.
# The previous call had them swapped, which placed the observer elsewhere.
observer = Topos(latitude_degrees=observer_x, longitude_degrees=observer_y, elevation_m=observer_elevation)

In [9]:
# Read the viewshed raster (first band) and keep its affine transform
with rio.open(output_file_path) as viewshed_dataset:
    affine_transform = viewshed_dataset.transform
    viewshed_array = viewshed_dataset.read(1)

# (row, col) indices of every cell flagged as visible (value 1)
visible_mask = viewshed_array == 1
visible_pixels = np.argwhere(visible_mask)

In [10]:
def calculate_horizon_angle(observer_x, observer_y, observer_elevation, pixel_x, pixel_y, pixel_elevation, affine, geod):
    """Return the elevation angle, in degrees, from the observer to a raster cell.

    Args:
        observer_x: Observer latitude in degrees. NOTE: in this notebook the
            variable named ``observer_x`` holds the latitude.
        observer_y: Observer longitude in degrees (see note above).
        observer_elevation: Observer elevation, same unit as ``pixel_elevation``.
        pixel_x: Column index of the target cell.
        pixel_y: Row index of the target cell.
        pixel_elevation: Elevation of the target cell.
        affine: Affine transform of the raster the indices refer to.
        geod: Object exposing ``inv(lon1, lat1, lon2, lat2)`` and returning the
            distance as its third value (a ``pyproj.Geod`` in this notebook).

    Returns:
        ``atan2(elevation difference, geodesic distance)`` in degrees; negative
        when the cell lies below the observer.
    """
    # rasterio's xy() takes (rows, cols) and returns (x, y), which for a
    # geographic CRS means (longitude, latitude).
    pixel_lon, pixel_lat = rio.transform.xy(affine, pixel_y, pixel_x, offset='center')

    # Geod.inv expects longitudes before latitudes. The previous call passed
    # latitude first for both points, which silently produced wrong distances.
    _, _, horizontal_distance = geod.inv(observer_y, observer_x, pixel_lon, pixel_lat)

    # Positive when the cell is higher than the observer.
    vertical_distance = pixel_elevation - observer_elevation

    return math.degrees(math.atan2(vertical_distance, horizontal_distance))

In [11]:
%%time 

# Initialize the list to store the number of visible satellites
visible_satellite_counts = []

# Initialize the list to store the number of visible satellites
visible_satellite_counts = []

# Loop through each time and satellite to determine visibility
for time in times:
    visible_satellites = set()  # Use a set to store unique satellite IDs
    
    for sat in satellites:
        difference = sat - observer
        topocentric = difference.at(time)
        alt, az, distance = topocentric.altaz()

        # Skip if the satellite's elevation angle is below the horizon
        if alt.degrees <= 0:
            continue

        # Check visibility for each pixel within the viewshed
        for pixel in visible_pixels:
            row, col = pixel
            pixel_elevation = dem_array[row, col]
            
            # Call the function to get the horizon angle
            horizon_angle = calculate_horizon_angle(
                observer_x, observer_y, observer_elevation, col, row, pixel_elevation, affine_transform, geod
            )

            # Check if the satellite is above the horizon angle for the pixel
            if alt.degrees > horizon_angle:
                visible_satellites.add(sat)  # Add the satellite ID to the set
                # Do not break since we want to check all pixels for visibility
    
    visible_satellite_counts.append(len(visible_satellites))  # Store the count of unique satellites

# Determine the minimum number of visible satellites
min_visible = min(visible_satellite_counts)
print(f"The minimum number of visible GNSS satellites during the day is: {min_visible}")

KeyboardInterrupt: 

## Calcolo raster copertura

In [11]:
import numpy as np
import rasterio as rio
import math
from skyfield.api import Topos, load
from datetime import datetime, timedelta
from pytz import utc
from geographiclib.geodesic import Geodesic

In [14]:
# Input rasters. Raw strings keep the Windows backslashes literal; the previous
# non-raw DEM path relied on '\D' and '\d' being unrecognised escape sequences.
dem_file_path = r'E:\Dati analisi\DEM\dem_reprojected.tif'
viewshed_file_path = r'E:\Dati analisi\DEM\viewshed.tif'

# Load the GNSS satellites and initialise the timescale
stations_url = 'http://celestrak.com/NORAD/elements/gnss.txt'
satellites = load.tle_file(stations_url)
ts = load.timescale()

# Time steps of interest: one per minute over [start_time, end_time)
start_time = datetime(2023, 11, 7, 0, 0, 0, tzinfo=utc)
end_time = datetime(2023, 11, 8, 0, 0, 0, tzinfo=utc)
times = [ts.utc(start_time + timedelta(minutes=i)) for i in range(int((end_time - start_time).total_seconds() // 60))]

geod = Geodesic.WGS84  # geodesic calculator (geographiclib)

# Load the DEM and locate the observer on it.
# observer_x / observer_y / observer_z are expected to exist already (they are
# defined in an earlier cell). rowcol takes (transform, x, y): observer_y is
# passed as x and observer_x as y.
with rio.open(dem_file_path) as dem_dataset:
    dem_array = dem_dataset.read(1)
    dem_affine_transform = dem_dataset.transform
    observer_row, observer_col = rio.transform.rowcol(dem_affine_transform, observer_y, observer_x)
    observer_elevation = dem_array[observer_row, observer_col] + observer_z

# Load the viewshed raster
with rio.open(viewshed_file_path) as viewshed_dataset:
    viewshed_array = viewshed_dataset.read(1)
    viewshed_affine_transform = viewshed_dataset.transform

# (row, col) indices of the visible cells (value 1) in the viewshed
visible_pixels = np.argwhere(viewshed_array == 1)

def calculate_distance_and_horizon_angle(pixel_x, pixel_y, observer_x, observer_y, pixel_elevation, observer_elevation, affine):
    """Return the 3D distance and the elevation angle from the observer to a cell.

    Uses the module-level ``geod`` object (``Geodesic.WGS84`` in this notebook)
    for the geodesic distance.

    Args:
        pixel_x: Column index of the target cell.
        pixel_y: Row index of the target cell.
        observer_x: Observer latitude in degrees. NOTE: in this notebook the
            variable named ``observer_x`` holds the latitude.
        observer_y: Observer longitude in degrees (see note above).
        pixel_elevation: Elevation of the target cell.
        observer_elevation: Elevation of the observer, same unit.
        affine: Affine transform of the raster the indices refer to.

    Returns:
        Tuple ``(distance_3d, horizon_angle_degrees)``: the straight-line
        distance obtained from the horizontal geodesic distance and the
        elevation difference, and ``atan2(vertical, horizontal)`` in degrees.
    """
    # rasterio's xy() takes (rows, cols) and returns (x, y), which for a
    # geographic CRS means (longitude, latitude).
    pixel_lon, pixel_lat = rio.transform.xy(affine, pixel_y, pixel_x, offset='center')

    # Geodesic.Inverse expects (lat1, lon1, lat2, lon2). The previous call
    # passed longitude first for both points, giving wrong distances.
    horizontal_distance = geod.Inverse(observer_x, observer_y, pixel_lat, pixel_lon)['s12']

    # Positive when the cell is higher than the observer.
    vertical_distance = pixel_elevation - observer_elevation

    # Straight-line distance via Pythagoras
    distance_3d = math.sqrt(horizontal_distance ** 2 + vertical_distance ** 2)

    horizon_angle_degrees = math.degrees(math.atan2(vertical_distance, horizontal_distance))

    return distance_3d, horizon_angle_degrees

# Matrix holding, for every pixel, the minimum number of visible satellites
# over all time steps; NaN where the pixel is outside the viewshed.
min_visible_matrix = np.full(dem_array.shape, np.nan)

# Loop over the pixels visible from the observation point
for pixel in visible_pixels:
    row, col = pixel
    pixel_elevation = dem_array[row, col] + observer_z  # terrain + receiver height
    pixel_distance, horizon_angle = calculate_distance_and_horizon_angle(col, row, observer_x, observer_y, pixel_elevation, observer_elevation, dem_affine_transform)
    visible_satellite_counts = []

    # Place a Skyfield observer at the centre of the current pixel.
    # rasterio's xy() returns (x, y) = (longitude, latitude); the previous code
    # unpacked it as (lat, lon) and therefore fed swapped coordinates to Topos.
    observer_lon, observer_lat = rio.transform.xy(dem_affine_transform, row, col, offset='center')
    observer = Topos(latitude_degrees=observer_lat, longitude_degrees=observer_lon, elevation_m=pixel_elevation)

    # For every time step, count the satellites that clear both thresholds
    for time in times:
        visible_satellites = 0
        for sat in satellites:
            alt, az, distance = (sat - observer).at(time).altaz()

            # Above the pixel's horizon angle AND above a 10 degree elevation mask
            if alt.degrees > horizon_angle and alt.degrees > 10:
                visible_satellites += 1

        visible_satellite_counts.append(visible_satellites)

    # Worst case over the whole period (NaN when there were no time steps)
    min_visible = min(visible_satellite_counts) if visible_satellite_counts else np.nan
    min_visible_matrix[row, col] = min_visible

# Ora `min_visible_matrix` contiene il numero minimo di satelliti visibili per ogni pixel nel viewshed

# Write `min_visible_matrix` to a new single-band raster on the viewshed grid
output_min_visible_path = 'path_to_save_min_visible.tif'

# Reuse the viewshed profile, switching to float32 with NaN as nodata
with rio.open(viewshed_file_path) as src:
    profile = src.profile
profile.update(dtype=rio.float32, nodata=np.nan)

with rio.open(output_min_visible_path, 'w', **profile) as dst:
    dst.write(min_visible_matrix, 1)

print("Il file con il numero minimo di satelliti GNSS visibili è stato salvato.")

Il file con il numero minimo di satelliti GNSS visibili è stato salvato.
