In [1]:
import sys
import rasterio
import time
from pvlib import solarposition
sys.path.append('/home/potzschf/repos/')
from helperToolz.helpsters import *
from helperToolz.dicts_and_lists import *
from helperToolz.guzinski import growingSeasonChecker

In [2]:
tile = 'X0070_Y0044'
year = 2019
svf = 1
albedo = 0.2

# File paths for slope and aspect rasters (in degrees)
tau_path = '/data/Aldhani/eoagritwin/et/Auxiliary/ERA5/tiff/low_res/tau_clear/'
slope_path = f'/data/Aldhani/eoagritwin/et/Auxiliary/DEM/Force_Tiles/SLOPE/SLOPE_{tile}.tif'
aspect_path = f'/data/Aldhani/eoagritwin/et/Auxiliary/DEM/Force_Tiles/ASPECT/ASPECT_{tile}.tif'
dem_path = f'/data/Aldhani/eoagritwin/et/Auxiliary/DEM/Force_Tiles/DEM/DEM_{tile}.tif'
lat_path = f'/data/Aldhani/eoagritwin/et/Auxiliary/DEM/Force_Tiles/LAT/Latitude_{tile}.tif'
lon_path = f'/data/Aldhani/eoagritwin/et/Auxiliary/DEM/Force_Tiles/LON/Longitude_{tile}.tif'
acq_times = getFilelist(f'/data/Aldhani/eoagritwin/et/Sentinel3/LST/LST_values/Acq_time/int_format/{year}', '.tif')
# acq_time_list = [file for file in acq_times if not any(substr in file for substr in ['readable', 'maxLST', 'order'])]
acq_time_list = [file for file in acq_times if not any(substr in file for substr in ['readable', 'minVZA', 'order'])]

trash = '/data/Aldhani/eoagritwin/et/Auxiliary/trash/'
stor_dir = f'/data/Aldhani/eoagritwin/et/Auxiliary/DEM/Force_Tiles/INCIDENCE2/maxLST/{year}/{tile}/'
os.makedirs(stor_dir, exist_ok=True)

In [None]:
for file in acq_time_list:
    month = file.split('.tif')[0].split('_')[-1]
    if growingSeasonChecker(int(MONTH_TO_02D[month])):
        # load all data and convert if needed
        with rasterio.open(slope_path) as slope_src:
            slope = slope_src.read(1)  # Read first band
            
        with rasterio.open(aspect_path) as aspect_src:
            aspect = aspect_src.read(1)

        with rasterio.open(lon_path) as lon_src:
            lon = lon_src.read(1)

        with rasterio.open(lat_path) as lat_src:
            lat = lat_src.read(1)

        with rasterio.open(dem_path) as dem_src:
            dem = dem_src.read(1)

        # Replace no data or negative values with nan if needed
        slope = np.where(slope < 0, np.nan, slope)
        aspect = np.where(aspect < 0, np.nan, aspect)

        # Convert degrees to radians for trigonometric calculations
        slope_rad = np.deg2rad(slope)
        aspect_rad = np.deg2rad(aspect)

        # warp S3 dates into tile and read-in
        warped_ds = warp_raster_to_reference(file, reference_path=slope_path, output_path='MEM', resampling='near')
        days = warped_ds.RasterCount

        # warp TAU Clear into tile and read-in
        t_file = f"{tau_path}Tau_clear_{year}_{str(int(f'{MONTH_TO_02D[month]}'))}.tif"
        tau_warped = warp_raster_to_reference(t_file, reference_path=slope_path, output_path='MEM', resampling='near')
        
        #for day in range(days):
        day = 0
        if os.path.exists(f'{stor_dir}INCIDENCE_{tile}_{year}_{month}_{(day+1):02d}.tif'):
            t = time.localtime()
            ti = time.strftime("%H:%M:%S", t)
            # print(f"already exists - next one at {ti}")
        else:
            try:
                
                print(f'Working on tile {tile} on {(day+1):02d}/{month}/{year}')

                time_warp = warped_ds.GetRasterBand(day+1).ReadAsArray()
            
                # Flatten Unix time and convert to datetime
                timestamps_flat = pd.to_datetime(time_warp.ravel(), unit='s', utc=True)

                # Flatten lat/lon and DEM
                lat_flat = lat.ravel()
                lon_flat = lon.ravel()
                dem_flat = dem.ravel()
                # Compute solar position
                solpos = solarposition.get_solarposition(time=timestamps_flat, latitude=lat_flat, longitude=lon_flat, altitude=dem_flat)

                # Convert degrees to radians and reshape to original 2D
                zenith_rad = np.deg2rad(solpos['zenith'].values).reshape(time_warp.shape)
                azimuth_rad = np.deg2rad(solpos['azimuth'].values).reshape(time_warp.shape)

                # --- Clear-sky horizontal irradiance ---
                # get tau at acquisition time
                dat_tau, time_tau = stackReader(tau_warped, bands=True)
                #### get Tau Clear (interpolated from both modelled values that are closest to LST)
                taus = []

                for t in np.unique(time_warp):
                    if t == 0:
                        continue
                    dt = pd.to_datetime(t, unit='s', tz='UTC')
                    dt_hour = dt.floor('h')
                    h_diff = dt_hour.hour % 3
                    if h_diff:
                        dt_floored = dt_hour - pd.Timedelta(hours=dt_hour.hour % 3)
                    else:
                        dt_floored  = dt_hour
                    for band_low, tau_time in enumerate(time_tau): # the two neighbouting ERA5 air temp values
                        if dt_floored == pd.Timestamp(tau_time): # this will get the 3-hourly value before the acquisition
                            
                            minutes = pd.Timestamp(dt).minute

                            band_up = band_low + 1 # this get the 3-hourly value after the acquisition

                            tau_intpol = dat_tau[:,:,band_low] - (dat_tau[:,:,band_low] - dat_tau[:,:,band_up]) * (np.array((minutes + h_diff*60), dtype=np.float32) / 180).reshape(1,1) # add one dimension for broadcasting


                            tau_at_acq = np.where(time_warp == t, tau_intpol, np.nan)
                            print(tau_at_acq.shape)
                            taus.append(tau_at_acq)


                            break

                tau_at_doy = np.nansum(np.dstack(taus), axis=2)
                tau_at_doy[tau_at_doy==0] = np.nan

                ghi_flat = SOLAR_CONST * np.cos(zenith_rad) * tau_at_doy
                ghi_flat = np.maximum(ghi_flat, 0)

                # Elevation correction
                ghi_elev = ghi_flat * (1 + 0.06 * (dem / 1000.0))

                # --- Direct/diffuse split (Erbs-like) ---
                kt = ghi_elev / (SOLAR_CONST * np.cos(zenith_rad) + 1e-6)
                diffuse_fraction = np.where(
                    kt <= 0.22, 1 - 0.09 * kt,
                    np.where(
                        kt <= 0.80,
                        0.9511 - 0.1604*kt + 4.388*kt**2 - 16.638*kt**3 + 12.336*kt**4,
                        0.165
                    )
                )
                diffuse_fraction = np.clip(diffuse_fraction, 0, 1)

                diffuse_flat = ghi_elev * diffuse_fraction
                direct_flat = ghi_elev - diffuse_flat

                if era5_ghi is not None:
                    cloud_factor = era5_ghi / (ghi_elev + 1e-6)
                    cloud_factor = np.clip(cloud_factor, 0, 1.2)  # cap overshoots
                    diffuse_flat *= cloud_factor
                    direct_flat *= cloud_factor
                    ghi_elev = diffuse_flat + direct_flat  # recombine

                # --- Terrain incidence correction ---
                cos_incidence = (
                    np.cos(zenith_rad) * np.cos(slope_rad) +
                    np.sin(zenith_rad) * np.sin(slope_rad) * np.cos(azimuth_rad - aspect_rad)
                )
                cos_incidence = np.maximum(cos_incidence, 0)

                direct_tilted = direct_flat * cos_incidence / np.maximum(np.cos(zenith_rad), 1e-6)
                diffuse_tilted = diffuse_flat * svf * (1 + np.cos(slope_rad)) / 2
                reflected = ghi_elev * albedo * (1 - np.cos(slope_rad)) / 2

                global_irradiance_tilted = direct_tilted + diffuse_tilted + reflected

                
            except Exception as e:
                print(e)
                t = time.localtime()
                ti = time.strftime("%H:%M:%S", t)
                print(f"thrown at {ti}")
                continue
                    

Working on tile X0070_Y0044 on 01/July/2019


In [None]:
def compute_tilted_irradiance_cloudy(
    aod550,
    tcwv, elevation,
    solar_zenith_deg, slope_deg, aspect_deg, solar_azimuth_deg,
    era5_ghi=None,
    era5_air_temp=None,  # New parameter (°C)
    svf=1.0, albedo=0.2
):
    """
    Compute global tilted irradiance (GTI, W/m²)
    under clear-sky or cloudy conditions.
    
    If ERA5_GHI is provided, applies a cloudiness correction
    as described in literature (ratio method).
    """


    # --- Clear-sky horizontal irradiance ---
    aod700 = aod550 * (700 / 550) ** -1.3
    tcwv_cm = tcwv / 10.0
    tau_clear = np.exp(-0.9 * aod700) * np.exp(-0.15 * tcwv_cm)
    ghi_flat = SOLAR_CONST * np.cos(zenith_rad) * tau_clear
    ghi_flat = np.maximum(ghi_flat, 0)

    # Elevation correction
    ghi_elev = ghi_flat * (1 + 0.06 * (elevation / 1000.0))

    # --- Direct/diffuse split (Erbs-like) ---
    kt = ghi_elev / (SOLAR_CONST * np.cos(zenith_rad) + 1e-6)
    if kt <= 0.22:
        diffuse_fraction = 1 - 0.09 * kt
    elif kt <= 0.80:
        diffuse_fraction = 0.9511 - 0.1604*kt + 4.388*kt**2 - 16.638*kt**3 + 12.336*kt**4
    else:
        diffuse_fraction = 0.165
    diffuse_fraction = np.clip(diffuse_fraction, 0, 1)

    diffuse_flat = ghi_elev * diffuse_fraction
    direct_flat = ghi_elev - diffuse_flat

    # --- Cloud correction ---
    if era5_ghi is not None:
        cloud_factor = era5_ghi / (ghi_elev + 1e-6)
        cloud_factor = np.clip(cloud_factor, 0, 1.2)  # cap overshoots
        diffuse_flat *= cloud_factor
        direct_flat *= cloud_factor
        ghi_elev = diffuse_flat + direct_flat  # recombine

    # --- Terrain incidence correction ---
    cos_incidence = (
        np.cos(zenith_rad) * np.cos(slope) +
        np.sin(zenith_rad) * np.sin(slope) * np.cos(azimuth_rad - aspect)
    )
    cos_incidence = np.maximum(cos_incidence, 0)

    direct_tilted = direct_flat * cos_incidence / np.maximum(np.cos(zenith_rad), 1e-6)
    diffuse_tilted = diffuse_flat * svf * (1 + np.cos(slope)) / 2
    reflected = ghi_elev * albedo * (1 - np.cos(slope)) / 2

    return direct_tilted + diffuse_tilted + reflected