In [None]:
import ee
import geemap
import math
# geemap.set_proxy(port=7788)
ee.Initialize()

In [None]:
geom_shp = r"yh.shp"
roi = geemap.shp_to_ee(geom_shp).geometry()
Map = geemap.Map()
Map.centerObject(roi, 9)
Map.addLayer(roi, {'color': '#808080', 'opacity': 0.1}, '延河流域')
buffered_roi = roi.buffer(500)
black_border = ee.Image().paint(buffered_roi, 1, 2)
Map.addLayer(black_border, {'palette': '#000000'}, '黑色边框')
Map

In [None]:

dataset_name = 'ECMWF/ERA5/DAILY' 
variable_name = 'total_precipitation'  
def calculate_daily_erosion_index(image):
    precipitation = image.select([variable_name])
    precipitation_mm = precipitation.multiply(1000)
    masked_precipitation = precipitation_mm.where(precipitation_mm.lt(12), 0)
    erosion_index = masked_precipitation.pow(1.7265)
    return erosion_index

era5_collection = ee.ImageCollection(
    dataset_name).filterDate('1990-07-08', '2020-07-08')

monthly_Erosion_dict = {}


for month in range(1, 13):
    for half in [(1, 15), (16, 31)]:
        half_month_data = era5_collection.filter(ee.Filter.calendarRange(month, month, 'month')) \
                                         .filter(ee.Filter.calendarRange(*half, 'DAY_OF_MONTH')) \
                                         .map(calculate_daily_erosion_index)
        half_month_index = half_month_data.sum().divide(30)
        coefficient = 0.3937 if 5 <= month <= 9 else 0.3101
        key = f"R_{month*2 - 1 + half[0]//16}"
        monthly_Erosion_dict[key] = half_month_index.multiply(coefficient)

year_R_sum = ee.ImageCollection(list(monthly_Erosion_dict.values())).sum()

halfMoon_WR_dict = {}
for key, value in monthly_Erosion_dict.items():
    halfMoon_WR_dict[f"WR_{key[2:]}"] = value.divide(year_R_sum)

halfMoon_WR_dict

In [None]:
def maskL789sr(image):
    # Bit 0 - Fill
    # Bit 1 - Dilated Cloud
    # Bit 2 - Cirrus
    # Bit 3 - Cloud
    # Bit 4 - Cloud Shadow
    qaMask = image.select('QA_PIXEL').bitwiseAnd(int('11111', 2)).eq(0)

    saturationMask = image.select('QA_RADSAT').eq(0)

    opticalBands = image.select('SR_B.').multiply(0.0000275).add(-0.2)
    thermalBands = image.select('ST_B.*').multiply(0.00341802).add(149.0)

    return image.addBands(opticalBands, None, True) \
        .addBands(thermalBands, None, True) \
        .updateMask(qaMask) \
        .updateMask(saturationMask)


def maskS2clouds(image):
    qa = image.select('QA60')
    cloudBitMask = 1 << 10
    cirrusBitMask = 1 << 11
    mask = qa.bitwiseAnd(cloudBitMask).eq(0).And(
        qa.bitwiseAnd(cirrusBitMask).eq(0))
    return image.updateMask(mask).divide(10000) \
        .copyProperties(image, ["system:time_start"])

In [None]:
s2_date = (ee.ImageCollection('COPERNICUS/S2_SR_HARMONIZED')
           .filter(ee.Filter.calendarRange(2019, 2021, 'year'))
           .filter(ee.Filter.lt('CLOUDY_PIXEL_PERCENTAGE', 20))
           .filterBounds(roi)
           .map(maskS2clouds)  
           .map(lambda img: img.normalizedDifference(["B8", "B4"]).rename("NDVI")))  

s2_max = s2_date.max().clip(roi)


s2_me = s2_date.median().clip(roi)

dictPercent_max = s2_max.reduceRegion(
    ee.Reducer.percentile([5, 10, 15, 20, 25, 30, 35, 40,
                          45, 50, 55, 60, 65, 70, 75, 80, 85, 90, 95]),
    roi,  
    500   
)

dictPercent_me = s2_me.reduceRegion(
    ee.Reducer.percentile([5, 10, 15, 20, 25, 30, 35, 40,
                          45, 50, 55, 60, 65, 70, 75, 80, 85, 90, 95]),
    roi,  
    500   
)

In [None]:
NDVI_DICT = {}

def getMODIS_NDVI(month):
    datey = ee.ImageCollection('MODIS/MOD09GA_006_NDVI') \
        .filter(ee.Filter.calendarRange(2019, 2021, 'year'))

    for x, y in [[1, 15], [16, 31]]:
        dataset = datey.filter(ee.Filter.calendarRange(month, month, 'month')) \
            .filter(ee.Filter.calendarRange(x, y, 'DAY_OF_MONTH')).select('NDVI')
        NDVI_DICT.setdefault(
            f"MODIS_NDVI_{month*2-x%2}", dataset.max().clip(roi))
        NDVI_DICT.setdefault(f"MODIS_NDVI_{month*2-x%2}_num", dataset.size())
        
def getS2_NDVI(month):

    datey = (ee.ImageCollection(
        'COPERNICUS/S2_SR_HARMONIZED').filter(ee.Filter.calendarRange(2019, 2021, 'year')))
    for x, y in [[1, 15], [16, 31]]:
        dataset = (datey.filter(ee.Filter.calendarRange(month, month, 'month'))
                   .filter(ee.Filter.calendarRange(x, y, 'DAY_OF_MONTH'))
                   .filter(ee.Filter.lt('CLOUDY_PIXEL_PERCENTAGE', 20))
                   .filterBounds(roi)
                   .map(maskS2clouds)   
                   .map(lambda img: img.normalizedDifference(["B8", "B4"]).rename("NDVI")))  
        NDVI_DICT.setdefault(f"S2_NDVI_{month*2-x%2}", dataset.max().clip(roi))
        NDVI_DICT.setdefault(f"S2_NDVI_{month*2-x%2}_num", dataset.size())

def getl8_NDVI(month):
    datey = ee.ImageCollection('LANDSAT/LC08/C02/T1_L2') \
        .filter(ee.Filter.calendarRange(2019, 2021, 'year'))

    for x, y in [[1, 15], [16, 31]]:
        dataset = (datey.filter(ee.Filter.calendarRange(month, month, 'month'))
                   .filter(ee.Filter.calendarRange(x, y, 'DAY_OF_MONTH'))
                   .filter(ee.Filter.lt('CLOUD_COVER', 20))
                   .filterBounds(roi)
                   .map(maskL789sr)  
                   .map(lambda img: img.normalizedDifference(["SR_B5", "SR_B4"]).rename("NDVI")))  

        NDVI_DICT.setdefault(f"L8_NDVI_{month*2-x%2}", dataset.max().clip(roi))
        NDVI_DICT.setdefault(f"L8_NDVI_{month*2-x%2}_num", dataset.size())

def getl7_NDVI(month):
    datey = ee.ImageCollection('LANDSAT/LE07/C02/T1_L2') \
        .filter(ee.Filter.calendarRange(2019, 2021, 'year'))

    for x, y in [[1, 15], [16, 31]]:
        dataset = (datey.filter(ee.Filter.calendarRange(month, month, 'month'))
                   .filter(ee.Filter.calendarRange(x, y, 'DAY_OF_MONTH'))
                   .filter(ee.Filter.lt('CLOUD_COVER', 20))
                   .filterBounds(roi)
                   .map(maskL789sr)  
                   .map(lambda img: img.normalizedDifference(["SR_B4", "SR_B3"]).rename("NDVI")))  
        NDVI_DICT.setdefault(f"L7_NDVI_{month*2-x%2}", dataset.max().clip(roi))
        NDVI_DICT.setdefault(f"L7_NDVI_{month*2-x%2}_num", dataset.size())


In [None]:
for month in range(1, 13):
    getMODIS_NDVI(month)
    getS2_NDVI(month)
    getl8_NDVI(month)
    getl7_NDVI(month)

In [None]:
def s2mask(num):
    r1 = num + 1
    l1 = num - 1

    if l1 == 0:
        l1 = 24
    if r1 == 25:
        r1 = 1

    s2NDVI_MODIS_1 = NDVI_DICT[f'S2_NDVI_{l1}'].divide(
        NDVI_DICT[f'MODIS_NDVI_{l1}']).multiply(NDVI_DICT[f'MODIS_NDVI_{num}'])
    s2NDVI_MODIS_2 = NDVI_DICT[f'S2_NDVI_{r1}'].divide(
        NDVI_DICT[f'MODIS_NDVI_{r1}']).multiply(NDVI_DICT[f'MODIS_NDVI_{num}'])
    s2NDVI_MODIS = (s2NDVI_MODIS_1.add(s2NDVI_MODIS_2)).divide(2)

    s2NDVI_L7_1 = NDVI_DICT[f'S2_NDVI_{l1}'].divide(
        NDVI_DICT[f'L7_NDVI_{l1}']).multiply(NDVI_DICT[f'L7_NDVI_{num}'])
    s2NDVI_L7_2 = NDVI_DICT[f'S2_NDVI_{r1}'].divide(
        NDVI_DICT[f'L7_NDVI_{r1}']).multiply(NDVI_DICT[f'L7_NDVI_{num}'])
    s2NDVI_L7 = (s2NDVI_L7_1.add(s2NDVI_L7_2)).divide(2)

    s2NDVI_L8_1 = NDVI_DICT[f'S2_NDVI_{l1}'].divide(
        NDVI_DICT[f'L8_NDVI_{l1}']).multiply(NDVI_DICT[f'L8_NDVI_{num}'])
    s2NDVI_L8_2 = NDVI_DICT[f'S2_NDVI_{r1}'].divide(
        NDVI_DICT[f'L8_NDVI_{r1}']).multiply(NDVI_DICT[f'L8_NDVI_{num}'])
    s2NDVI_L8 = (s2NDVI_L8_1.add(s2NDVI_L8_2)).divide(2)

    # 根据不同的条件返回不同的NDVI图像(19个半月L8完全缺失)
    if num in [18, 20]:
        s2NDVI_L7a = NDVI_DICT[f'S2_NDVI_{num}'].unmask(s2NDVI_L7)
        return s2NDVI_L7a.unmask(s2NDVI_MODIS)
    elif num == 19:#(19个半月S2去云后数据不足仍存在大面积异常弃用)
        return s2NDVI_L7.unmask(s2NDVI_MODIS)
    else:
        s2NDVI_L8a = NDVI_DICT[f'S2_NDVI_{num}'].unmask(s2NDVI_L8)
        s2NDVI_L7a = s2NDVI_L8a.unmask(s2NDVI_L7)
        return s2NDVI_L7a.unmask(s2NDVI_MODIS)
    # try:
    #     s2NDVI_L8a = NDVI_DICT[f'S2_NDVI_{num}'].unmask(s2NDVI_L8)
    #     s2NDVI_L7a = s2NDVI_L8a.unmask(s2NDVI_L7)
    #     return s2NDVI_L7a.unmask(s2NDVI_MODIS)
    # except:
    #     try:
    #         s2NDVI_L7a = NDVI_DICT[f'S2_NDVI_{num}'].unmask(s2NDVI_L7)
    #         return s2NDVI_L7a.unmask(s2NDVI_MODIS)
    #     except:
    #         return NDVI_DICT[f'S2_NDVI_{num}'].unmask(s2NDVI_MODIS)

In [None]:
S2_NDVI = {}
for i in range(1, 25):
    S2_NDVI.setdefault(f"S2_NDVI_{i}", s2mask(i))

def getFVC(img):
    p10 = ee.Number(dictPercent_me.get("NDVI_p10"))
    p90 = ee.Number(dictPercent_max.get("NDVI_p90"))
    imgFVC = (img.subtract(p10)).divide(p90.subtract(p10))
    lower_ndvi_mask = img.lt(p10)
    higher_ndvi_mask = img.gt(p90)
    middle_ndvi_mask = ee.Image(1).subtract(
        higher_ndvi_mask).subtract(lower_ndvi_mask)
    imgFVC = (
        (lower_ndvi_mask.multiply(0))
        .add(middle_ndvi_mask.multiply(imgFVC))
        .add(higher_ndvi_mask.multiply(1))
    )
    return imgFVC.rename("FVC").toFloat()

In [None]:
S2_FVC = {}

for k, v in S2_NDVI.items():
    S2_FVC.setdefault(k.replace('NDVI', 'FVC'), getFVC(v))
for k, v in S2_FVC.items():
    S2_NDVI.setdefault(k, v.where(v.lte(0), 0).where(v.gt(1), 1))

In [None]:
ESA = ee.ImageCollection("ESA/WorldCover/v100").first().clip(roi)

In [None]:
def SLR_img(FVC):
    SLR10s = "0.44468*E**(-3.20096*0.9)-0.04099*E**(FVC-FVC*0.9)+0.25"
    SLR20s = "1/(1.17647+0.86242*1.05905**(FVC*100))"
    SLR30s = "1/(1.25+0.78845*1.05968**(FVC*100))"
    SLR10 = FVC.expression(expression=SLR10s, opt_map={
                           'E': ee.Image(ee.Number(math.e)), 'FVC': FVC})
    SLR20 = FVC.expression(expression=SLR20s, opt_map={'FVC': FVC})
    SLR30 = FVC.expression(expression=SLR30s, opt_map={'FVC': FVC})

    SLR = (
        ESA.where(ESA.eq(10), SLR10).where(
            ESA.eq(20), SLR20).where(ESA.eq(30), SLR30)
        .where(ESA.eq(40), 1).where(ESA.eq(50), 0.01).where(ESA.eq(60), 1)
        .where(ESA.eq(70), 0).where(ESA.eq(80), 0).where(ESA.eq(90), 0)
        .where(ESA.eq(95), 0) .where(ESA.eq(100), 0)
    )

    return SLR.toFloat()

In [None]:
halfMoon_SLR_dict = {}
for k, v in S2_FVC.items():
    SLR_value = SLR_img(v)
    SLR_key = k.replace("FVC", "SLR")
    halfMoon_SLR_dict.setdefault(SLR_key, SLR_value)

In [None]:
WR_list = list(halfMoon_WR_dict.values())
SLR_list = list(halfMoon_SLR_dict.values())

In [None]:
halfMoon_B_list = [WR_list[i].multiply(
    SLR_list[i]).rename("B") for i in range(24)]
year_B_mean = ee.ImageCollection(halfMoon_B_list).sum()