In [None]:

# !pip install geemap[all] --quiet

import ee, geemap, os, json, math
from datetime import datetime

# Authenticate this session with Earth Engine and initialise the client
# library. The project string below is a placeholder: replace it with your
# own project identifier before running the notebook.
ee.Authenticate()
ee.Initialize(project='users project_name')

In [None]:
# Study area. REGION_ASSET is a placeholder: replace it with the asset path of
# your own region-of-interest feature collection.
REGION_ASSET = 'users path for assests of study area in earth engine'
region_fc = ee.FeatureCollection(REGION_ASSET)
# Geometry of the whole feature collection; used for clipping, GIF bounds and
# regional statistics in later cells.
REGION = region_fc.geometry()


# Values passed as the `scale` argument of reduceRegion when the regional
# time series are built (one per dataset).
SCALE_NDVI = 250
SCALE_LST  = 1000
SCALE_RAIN = 5000


# Only echoes the configured asset path; it does not verify that the asset
# actually loads.
print("Region_loaded", REGION_ASSET)

Loaded region from asset: projects/debayan2002/assets/east_godavari


In [15]:
# Analysis period. monthly_reduce builds one window per month starting at
# DATE_START; the last window ends at DATE_END.
DATE_START = '2001-01-01'
DATE_END   = '2025-01-01'

# Baseline period used as the reference when anomalies are computed
BASELINE_START = '2001-01-01'
BASELINE_END   = '2011-01-01'
# Local output directory (GIFs are written here); created if missing
OUT_DIR = 'outputs'
os.makedirs(OUT_DIR, exist_ok=True)

# Feature flags
DO_ANOMALY      = True   # build anomaly collections and anomaly GIFs
#DO_SPLIT_GIF    = True
DO_EXPORT_CSV   = True   # export regional monthly time series to Drive
DO_LAG          = True   # export the rainfall-vs-NDVI lag table (needs DO_EXPORT_CSV)
LAG_MONTHS      = 1      # number of months rainfall is shifted forward before the join

In [None]:
# DATASETS
# Raw Earth Engine collections, restricted to the bands used below.
# SummaryQA is kept next to NDVI because ndvi_clean uses it as a quality mask.
MODIS_NDVI = ee.ImageCollection('MODIS/061/MOD13Q1').select(['NDVI','SummaryQA'])
CHIRPS     = ee.ImageCollection('UCSB-CHG/CHIRPS/DAILY').select('precipitation')
MODIS_LST  = ee.ImageCollection('MODIS/061/MOD11A2').select('LST_Day_1km')

def ndvi_clean(i):
    """Return the quality-masked, rescaled NDVI band of one image.

    Pixels are kept only where the 'SummaryQA' band is <= 1. The 'NDVI' band
    is then multiplied by 0.0001 (the scale factor) and the result inherits
    'system:time_start' from the input image.

    Args:
        i: image exposing 'NDVI' and 'SummaryQA' bands.

    Returns:
        The masked, scaled NDVI image as returned by copyProperties.
    """
    quality_ok = i.select('SummaryQA').lte(1)
    masked = i.updateMask(quality_ok)
    ndvi = masked.select('NDVI').multiply(0.0001)  # scale factor
    return ndvi.copyProperties(i, ['system:time_start'])

def lst_scale(i):
    """Return the 'LST_Day_1km' band of one image converted to Kelvin.

    The band is multiplied by 0.02 and the result inherits
    'system:time_start' from the input image.

    Args:
        i: image exposing an 'LST_Day_1km' band.

    Returns:
        The scaled image as returned by copyProperties.
    """
    kelvin = i.select('LST_Day_1km').multiply(0.02)  # to Kelvin
    return kelvin.copyProperties(i, ['system:time_start'])

# Analysis-ready collections: NDVI is quality-masked and rescaled, LST is
# converted to Kelvin, and CHIRPS precipitation is used as-is.
NDVI = MODIS_NDVI.map(ndvi_clean)
LST  = MODIS_LST.map(lst_scale)
RAIN = CHIRPS

# Each .size().getInfo() is a blocking round trip to the server.
print("Collections:", NDVI.size().getInfo(), LST.size().getInfo(), RAIN.size().getInfo())


Collections ready: 586 1172 16283


In [17]:
# Build one composite per month by reducing every image that falls inside that
# month (e.g. mean for NDVI/LST, sum for precipitation).
def monthly_reduce(ic, reducer, start=DATE_START, end=DATE_END):
    """Reduce an image collection to one image per month.

    Consecutive one-month windows are built starting at `start`; only windows
    that end on or before `end` are produced. Each output image is the
    per-pixel reduction of the images inside its window and carries
    'system:time_start' (window start, in millis), 'year' and 'month'.

    Args:
        ic: ee.ImageCollection to reduce.
        reducer: ee.Reducer applied across the images of each window.
        start: first window start (anything ee.Date accepts). The default is
            the value DATE_START had when this function was defined.
        end: upper bound of the last window. The default is the value
            DATE_END had when this function was defined.

    Returns:
        ee.ImageCollection with one image per monthly window.
    """
    start = ee.Date(start)
    end = ee.Date(end)

    # Count windows from calendar fields instead of ee.Date.difference(...,
    # 'month'): that difference is floating-point and based on an average
    # month length, so a month-aligned range such as 2001-01-01..2002-01-01
    # evaluates to slightly less than 12 and silently loses its final month.
    n_months = (end.get('year').subtract(start.get('year')).multiply(12)
                   .add(end.get('month').subtract(start.get('month'))))
    # If `end` falls earlier within its month than `start` does, the last
    # window would run past `end`; drop it (gt() yields 1 or 0).
    overshoot = start.advance(n_months, 'month').millis().gt(end.millis())
    n_months = n_months.subtract(overshoot)
    months = ee.List.sequence(0, n_months.subtract(1))

    def per_m(m):
        m = ee.Number(m)
        d0 = start.advance(m, 'month')
        d1 = d0.advance(1, 'month')
        img = ic.filterDate(d0, d1).reduce(reducer)
        # A reduced image has no timestamp of its own; stamp it with the
        # window start so later cells can label and join on it.
        img = ee.Image(img).set({
            'system:time_start': d0.millis(),
            'year': d0.get('year'),
            'month': d0.get('month')
        })
        return img

    return ee.ImageCollection(months.map(per_m))

# Monthly composites over DATE_START..DATE_END: mean NDVI, total rainfall and
# mean land-surface temperature. Later cells read the resulting bands as
# 'NDVI_mean', 'precipitation_sum' and 'LST_Day_1km_mean'.
NDVI_M = monthly_reduce(NDVI, ee.Reducer.mean())
RAIN_M = monthly_reduce(RAIN, ee.Reducer.sum())
LST_M  = monthly_reduce(LST,  ee.Reducer.mean())

# Each .size().getInfo() is a blocking round trip to the server.
print('Monthly collections:', NDVI_M.size().getInfo(), RAIN_M.size().getInfo(), LST_M.size().getInfo())


Monthly collections: 288 288 288


In [18]:
# Anomaly calculation: departure of every monthly composite from the baseline
# climatology of the same calendar month.
if DO_ANOMALY:
    # Overall baseline-period means (all calendar months pooled). Kept under
    # their original names; they are the reference add_anom falls back to
    # when no baseline collection is supplied.
    NDVI_BASE = NDVI_M.filterDate(BASELINE_START, BASELINE_END).mean()
    RAIN_BASE = RAIN_M.filterDate(BASELINE_START, BASELINE_END).mean()
    LST_BASE  = LST_M .filterDate(BASELINE_START, BASELINE_END).mean()

    def add_anom(ic, base, name, baseline_ic=None):
        """Subtract a reference from every image of `ic`.

        Args:
            ic: ee.ImageCollection of monthly composites.
            base: single reference image, used only when `baseline_ic` is None.
            name: band name given to the anomaly band.
            baseline_ic: optional ee.ImageCollection of baseline-period
                monthly composites. When given, each image is compared with
                the mean of the baseline images from the same calendar month,
                so the seasonal cycle is removed instead of being reported as
                an anomaly.

        Returns:
            ee.ImageCollection of anomaly images carrying the properties of
            the corresponding input images.
        """
        def _anomaly(img):
            img = ee.Image(img)
            if baseline_ic is None:
                reference = base
            else:
                # Same-calendar-month climatology, selected through the
                # images' timestamps.
                month = img.date().get('month')
                reference = (baseline_ic
                             .filter(ee.Filter.calendarRange(month, month, 'month'))
                             .mean())
            return (img.subtract(reference)
                       .rename(name)
                       .copyProperties(img, img.propertyNames()))
        return ic.map(_anomaly)

    NDVI_ANOM = add_anom(NDVI_M, NDVI_BASE, 'NDVI_anom',
                         baseline_ic=NDVI_M.filterDate(BASELINE_START, BASELINE_END))
    RAIN_ANOM = add_anom(RAIN_M, RAIN_BASE, 'RAIN_anom',
                         baseline_ic=RAIN_M.filterDate(BASELINE_START, BASELINE_END))
    LST_ANOM  = add_anom(LST_M , LST_BASE , 'LST_anom',
                         baseline_ic=LST_M.filterDate(BASELINE_START, BASELINE_END))
    print("Anomaly series created.")
else:
    # Empty placeholders so that the names exist even when anomalies are off.
    NDVI_ANOM = ee.ImageCollection([])
    RAIN_ANOM = ee.ImageCollection([])
    LST_ANOM  = ee.ImageCollection([])
    print("Anomaly series skipped.")


Anomaly series created.


In [19]:
# Visualisation parameters (value range + colour palette) for the monthly
# composites. They are unpacked into Image.visualize() by to_rgb.
# LST limits are in Kelvin, matching the scaling applied in lst_scale.
NDVI_VIZ = {'min': 0, 'max': 1, 'palette': ['#8B4513', '#9A5120', '#A9602D', '#B8703A', '#C58047', '#D49055', '#E3A063', '#E9B081', '#EEC09F', '#F3D0BD', '#F8E0DB', '#FCF0F9', '#FCF3E1', '#FDF6C8', '#FDF9AF', '#FEFC96', '#F7F580', '#E7E86C', '#D7DC58', '#C7CF45', '#B7C332', '#A7B71F', '#97AA0D', '#889E0A', '#789209', '#698607', '#5A7A06', '#4B6E05', '#3C6203', '#2D5602', '#1F4A01', '#154000', '#133800', '#123000', '#112800', '#102000', '#101800', '#101000', '#101000', '#122200']}
RAIN_VIZ = {'min': 10,   'max': 450, 'palette': [ "#FFFFFF", "#EAF7FF", "#D5EEFF", "#C0E6FF", "#ABE0FF","#96D9FF", "#81D2FF", "#6CCBFF", "#57C4FF", "#42BDFF","#2DB6FF", "#18AFFF", "#009FF7", "#008FE0", "#007FC9","#006FB2", "#005F9B", "#004F84", "#003F6D", "#002F56","#001F3F"]}
LST_VIZ  = {'min': 274, 'max': 325, 'palette': [    "#FFFFCC", "#FFF5B3", "#FFE999", "#FFDD80", "#FFD166","#FFC34D", "#FFB733", "#FFA91A", "#FF9C00", "#FF8000","#FF6600", "#FF4C00", "#FF3300", "#E02600", "#C21A00","#A40D00", "#860000", "#660066", "#4B004B", "#330033"]}

# Visualisation parameters for the anomaly collections (symmetric ranges
# centred on zero).
# NOTE(review): the NDVI anomaly palette maps negative anomalies to green and
# positive ones to red, which looks inverted for vegetation - confirm intent.
ANOM_VIZ_NDVI = {'min': -0.3, 'max': 0.3, 'palette': ['green','yellow','red']}
ANOM_VIZ_RAIN = {'min': -200, 'max': 200, 'palette': ['brown','white','blue']}
ANOM_VIZ_LST  = {'min': -5,   'max':  5,  'palette': ['blue','yellow','red']}

# Render every image of a collection with the given visualisation parameters.
def to_rgb(ic, vis):
    """Visualise each image of `ic` and clip it to REGION.

    Args:
        ic: ee.ImageCollection to render.
        vis: dict of keyword arguments forwarded to Image.visualize().

    Returns:
        The mapped collection; every frame keeps the 'system:time_start'
        of the image it was rendered from.
    """
    def _render(image):
        styled = ee.Image(image).visualize(**vis)
        clipped = styled.clip(REGION)
        return clipped.copyProperties(image, ['system:time_start'])

    return ic.map(_render)


# Rendered versions of the monthly composites; these are the frames of the
# timelapse GIFs.
NDVI_RGB = to_rgb(NDVI_M, NDVI_VIZ)
RAIN_RGB = to_rgb(RAIN_M, RAIN_VIZ)
LST_RGB  = to_rgb(LST_M,  LST_VIZ)

# Rendered anomaly frames. These names only exist when DO_ANOMALY is True, so
# later uses must be guarded by the same flag.
if DO_ANOMALY:
    NDVI_ANOM_RGB = to_rgb(NDVI_ANOM, ANOM_VIZ_NDVI)
    RAIN_ANOM_RGB = to_rgb(RAIN_ANOM, ANOM_VIZ_RAIN)
    LST_ANOM_RGB  = to_rgb(LST_ANOM , ANOM_VIZ_LST)


In [20]:
# Timestamps are fetched separately from the frames: one label per image, in
# collection order, to be stamped onto the downloaded GIF afterwards.
def labels_from_ic(ic, fmt='YYYY-MMM'):
    """Return one formatted date label per image of `ic`, as a Python list.

    Args:
        ic: ee.ImageCollection whose images carry 'system:time_start'.
        fmt: date pattern passed to ee.Date.format().

    Returns:
        list of str in collection order; an empty list when the collection
        has no images.
    """
    count = ic.size()
    # One round trip to learn the size, so an empty collection short-circuits
    # before anything else is requested.
    if int(ee.Number(count).getInfo() or 0) == 0:
        return []  # nothing to label

    def _label(image):
        image = ee.Image(image)
        stamp = image.get('system:time_start')
        # Use the stored timestamp when present, otherwise the image date.
        millis = ee.Algorithms.If(stamp, stamp, image.date().millis())
        return ee.Date(millis).format(fmt)

    images = ic.toList(count)
    return ee.List(images.map(_label)).getInfo()



# Stamp a text label (e.g. a date) onto every frame of an animated GIF.
def overlay_labels_on_gif(in_path, out_path, labels, xy=('3%','8%'), font_size=28,
                          fill=(255,255,255), stroke=(0,0,0), stroke_width=2):
    """Draw one label per frame on a GIF and save the result.

    Args:
        in_path: path of the GIF to read.
        out_path: path of the GIF to write; may be the same as `in_path`.
        labels: sequence of labels, one per frame. A shorter sequence is
            padded by repeating its last entry (or with empty strings when it
            is empty); a longer one is truncated to the frame count.
        xy: (x, y) text position. Each entry is either a pixel value or a
            string percentage of the frame width/height such as '3%'.
        font_size: size used for the TrueType font.
        fill: text colour.
        stroke: outline colour.
        stroke_width: outline width.

    Returns:
        `out_path`.
    """
    try:
        from PIL import Image, ImageDraw, ImageFont
    except ImportError:
        # Best-effort convenience for fresh notebook environments.
        import sys, subprocess
        subprocess.check_call([sys.executable, "-m", "pip", "install", "pillow"])
        from PIL import Image, ImageDraw, ImageFont

    # Font selection: try DejaVu Sans by name, then by a common Linux path,
    # and fall back to Pillow's built-in font if neither can be loaded.
    font = None
    for candidate in ("DejaVuSans.ttf",
                      "/usr/share/fonts/truetype/dejavu/DejaVuSans.ttf"):
        try:
            font = ImageFont.truetype(candidate, font_size)
            break
        except Exception:
            continue
    if font is None:
        font = ImageFont.load_default()

    def _resolve(value, total):
        """Turn a pixel value or an 'NN%' string into an integer pixel offset."""
        if isinstance(value, str) and value.endswith('%'):
            return int(float(value.strip('%')) / 100.0 * total)
        return int(value)

    labels = [str(label) for label in labels]
    frames = []

    # Read and annotate every frame while the source is open, then close it
    # before saving: callers overwrite the input file in place.
    with Image.open(in_path) as im:
        n_frames = getattr(im, "n_frames", 1)

        # Make the label count match the frame count.
        if len(labels) < n_frames:
            filler = labels[-1] if labels else ''
            labels = labels + [filler] * (n_frames - len(labels))
        else:
            labels = labels[:n_frames]

        for index in range(n_frames):
            im.seek(index)
            # Draw on an RGB copy: the output is a palette GIF without an
            # alpha channel, and Pillow's default quantiser used by
            # convert("P", palette=ADAPTIVE) rejects RGBA input.
            frame = im.convert("RGB")
            draw = ImageDraw.Draw(frame)

            width, height = frame.size
            position = (_resolve(xy[0], width), _resolve(xy[1], height))
            draw.text(position, labels[index], font=font, fill=fill,
                      stroke_width=stroke_width, stroke_fill=stroke)

            frames.append(frame.convert("P", palette=Image.ADAPTIVE))

        # Reuse the source timing so the stamped GIF plays like the original.
        duration = im.info.get("duration", 150)
        loop = im.info.get("loop", 0)

    frames[0].save(out_path, save_all=True, append_images=frames[1:],
                   duration=duration, loop=loop, disposal=2, optimize=False)
    return out_path


In [None]:
# TIMELAPSES (GIF)

import os

# Bounding box of the (simplified) study region; used as the GIF frame extent.
REGION_BOUNDS = REGION.simplify(1000).bounds(1000)
# Specify the output size through 'dimensions' only; requesting a 'scale'
# instead may trigger a user memory limit error.
gif_strctr_para = {
    'region': REGION_BOUNDS,
    'dimensions': 200,
    'framesPerSecond': 5
}

# Output paths of the three monthly timelapses.
ndvi_gif = os.path.join(OUT_DIR, 'ndvi_monthly.gif')
rain_gif = os.path.join(OUT_DIR, 'rain_monthly.gif')
lst_gif  = os.path.join(OUT_DIR, 'lst_monthly.gif')

# Compatibility wrapper: geemap.download_ee_video is tried with two argument
# orders so that differing geemap signatures are both covered.
def _download_ic(ic, out_path, args):
    """Download `ic` as an animation to `out_path` using the video `args`.

    The call is first made as (collection, args, path); if that raises
    TypeError it is repeated as (collection, path, args). Any error from the
    second attempt propagates to the caller.
    """
    primary_order = (ic, args, out_path)
    fallback_order = (ic, out_path, args)
    try:
        geemap.download_ee_video(*primary_order)
    except TypeError:
        geemap.download_ee_video(*fallback_order)


# Download the three monthly timelapses.
_download_ic(NDVI_RGB, ndvi_gif, gif_strctr_para)
_download_ic(RAIN_RGB, rain_gif, gif_strctr_para)
_download_ic(LST_RGB,  lst_gif,  gif_strctr_para)

# Download the anomaly timelapses; the *_anom_gif paths are only defined when
# DO_ANOMALY is True.
if DO_ANOMALY:
    ndvi_anom_gif = os.path.join(OUT_DIR, 'ndvi_anomaly.gif')
    rain_anom_gif = os.path.join(OUT_DIR, 'rain_anomaly.gif')
    lst_anom_gif  = os.path.join(OUT_DIR, 'lst_anomaly.gif')
    _download_ic(NDVI_ANOM_RGB, ndvi_anom_gif, gif_strctr_para)
    _download_ic(RAIN_ANOM_RGB, rain_anom_gif, gif_strctr_para)
    _download_ic(LST_ANOM_RGB,  lst_anom_gif,  gif_strctr_para)

In [None]:
import os

def safe_stamp(ic_rgb, ic_src, gif_path, label_fmt='YYYY-MMM'):
    """Stamp date labels onto a downloaded GIF in place, skipping when impossible.

    Args:
        ic_rgb: rendered collection the GIF was produced from; first source
            of labels.
        ic_src: source collection, used for labels when `ic_rgb` yields none.
        gif_path: GIF file to read and overwrite.
        label_fmt: date pattern forwarded to labels_from_ic.

    Returns:
        None. Progress and skip reasons are printed.
    """
    # Nothing to do when the download did not produce a file.
    if not os.path.exists(gif_path):
        print("Skip stamp (file missing):", gif_path)
        return

    labels = labels_from_ic(ic_rgb, fmt=label_fmt)
    if not labels:
        print("No labels from RGB IC; falling back to source IC for:", gif_path)
        labels = labels_from_ic(ic_src, fmt=label_fmt)
        if not labels:
            print("Skip stamp (no frames/labels):", gif_path)
            return

    text_style = {
        'xy': ('3%','8%'),
        'font_size': 32,
        'fill': (255,255,255),
        'stroke': (0,0,0),
        'stroke_width': 2,
    }
    # Input and output paths are the same: the GIF is overwritten.
    overlay_labels_on_gif(gif_path, gif_path, labels, **text_style)
    print("Stamped:", gif_path)

# Stamp date labels onto the three monthly GIFs (overwrites the files).
safe_stamp(NDVI_RGB, NDVI_M, ndvi_gif)
safe_stamp(RAIN_RGB, RAIN_M, rain_gif)
safe_stamp(LST_RGB,  LST_M,  lst_gif)

# Stamp the anomaly GIFs; the collections and paths used here only exist when
# DO_ANOMALY is True.
if DO_ANOMALY:
    safe_stamp(NDVI_ANOM_RGB, NDVI_ANOM, ndvi_anom_gif)
    safe_stamp(RAIN_ANOM_RGB, RAIN_ANOM, rain_anom_gif)
    safe_stamp(LST_ANOM_RGB,  LST_ANOM,  lst_anom_gif)


In [None]:
# Regional time series: one table row per monthly image.
def regional_series(ic, band, geom=REGION, scale=1000):
    """Turn an image collection into a table of regional means.

    Args:
        ic: ee.ImageCollection whose images carry 'system:time_start'.
        band: name of the band to read from the reduction result; also used
            as the output column name.
        geom: geometry the mean is computed over. The default is the value
            REGION had when this function was defined.
        scale: value passed as the `scale` argument of reduceRegion.

    Returns:
        ee.FeatureCollection of geometry-less features with a 'date'
        ('YYYY-MM') property and a `band` property holding the regional mean.
    """
    def to_feat(i):
        stamp = ee.Date(i.get('system:time_start')).format('YYYY-MM')
        stats = ee.Image(i).reduceRegion(ee.Reducer.mean(), geom, scale)
        row = {'date': stamp, band: stats.get(band)}
        return ee.Feature(None, row)

    return ee.FeatureCollection(ic.map(to_feat))

if DO_EXPORT_CSV:
    # Regional monthly series, each reduced at its dataset-specific scale.
    ts_ndvi = regional_series(NDVI_M, 'NDVI_mean', scale=SCALE_NDVI)
    ts_rain = regional_series(RAIN_M, 'precipitation_sum', scale=SCALE_RAIN)
    ts_lst  = regional_series(LST_M , 'LST_Day_1km_mean', scale=SCALE_LST)

    # Start one Drive export task per table (all into the 'gif_csv' folder),
    # in the order NDVI, rain, LST.
    export_jobs = (
        (ts_ndvi, 'EG_NDVI_monthly'),
        (ts_rain, 'EG_RAIN_monthly'),
        (ts_lst, 'EG_LST_monthly'),
    )
    for table, task_name in export_jobs:
        task = ee.batch.Export.table.toDrive(
            collection=table, description=task_name, folder='gif_csv', fileFormat='CSV'
        )
        task.start()

    print("Drive CSV exports started (NDVI, Rain, LST).")

In [None]:
# Lag check: pair each month's rainfall with the NDVI of a later month to see
# whether rain affects the NDVI values that follow.

# Add a shifted copy of every feature's date.
def shift_dates(fc, months):
    """Add a 'date_shifted' property to every feature of `fc`.

    Args:
        fc: ee.FeatureCollection whose features carry a 'date' property in
            'YYYY-MM' form.
        months: number of months 'date' is advanced by.

    Returns:
        The mapped collection; 'date' itself is left unchanged.
    """
    def shift_feat(f):
        original = ee.Date.parse('YYYY-MM', f.get('date'))
        moved = original.advance(months, 'month').format('YYYY-MM')
        return f.set('date_shifted', moved)

    return fc.map(shift_feat)

if DO_LAG and DO_EXPORT_CSV:
    # Rebuild the regional series here so this cell does not depend on the
    # export cell having been run; everything stays server-side (no getInfo).
    ts_ndvi = regional_series(NDVI_M, 'NDVI_mean', scale=SCALE_NDVI)
    ts_rain = regional_series(RAIN_M, 'precipitation_sum', scale=SCALE_RAIN)
    ts_rain_shift = shift_dates(ts_rain, LAG_MONTHS)

    # Join rainfall with NDVI: a rain row matches the NDVI row whose date
    # equals the rain date shifted forward by LAG_MONTHS.
    filter_eq = ee.Filter.equals(leftField='date_shifted', rightField='date')
    joined = ee.Join.inner().apply(ts_rain_shift, ts_ndvi, filter_eq)

    # Flatten each joined pair into one row: NDVI date, the earlier rainfall
    # and the NDVI value.
    def merge_props(j):
        left = ee.Feature(j.get('primary'))     # rainfall row
        right = ee.Feature(j.get('secondary'))  # NDVI row
        return ee.Feature(None, {
            'date': right.get('date'),
            'rain_mm': left.get('precipitation_sum'),
            'ndvi': right.get('NDVI_mean')
        })
    lag_fc = ee.FeatureCollection(joined.map(merge_props))

    # Export the joined table to the same Drive folder as the other CSV
    # exports so that all outputs end up together.
    ee.batch.Export.table.toDrive(
        collection=lag_fc, description=f'EG_RAIN_vs_NDVI_lag{LAG_MONTHS}',
        folder='gif_csv', fileFormat='CSV'
    ).start()
    print(f"Drive CSV export started for lag={LAG_MONTHS} months (RAIN vs NDVI).")
else:
    print("Lag analysis skipped or CSV export disabled.")