In [5]:
import os
import shutil
import tempfile
import time
import netrc
import requests

import numpy as np
import pandas as pd
import xarray as xr

from pathlib import Path
from datetime import datetime
from urllib.parse import urlparse
from IPython.display import clear_output

In [6]:
# Text file listing the granule URLs, plus the output and scratch directories.
LINKS_FILE = "./data/IMERGDP links.txt"
PROCESSED_DIR = Path("./data/processed/IMERGDP/Granules")
RAW_DIR = Path("./data/raw/IMERGDP")


# Empty the scratch directory (if it exists) so leftovers from an earlier run
# are not mistaken for fresh downloads. Entries that are neither a file nor a
# directory are left untouched.
for entry in (RAW_DIR.iterdir() if RAW_DIR.exists() else ()):
    if entry.is_file():
        entry.unlink()
        continue
    if entry.is_dir():
        shutil.rmtree(entry)

In [7]:
def process_imerg_file(file_path):
    """
    Process a single IMERG file and extract precipitation statistics for the area of interest.

    The file is opened with xarray, subset to a fixed latitude/longitude
    bounding box, and reduced to a one-row CSV with the columns ``date``,
    ``precipitation`` (mean over the box) and ``valid_count`` (number of cells
    that are neither NaN nor -9999). The CSV is written to ``PROCESSED_DIR``
    under the input file's name with its extension replaced by ``.csv``.

    Args:
        file_path (str): Path to the IMERG NC4 file

    Returns:
        None: Saves the results to a CSV file in the processed directory

    Raises:
        ValueError: If the ``BeginDate``/``EndDate`` attribute is present but
            not in ``YYYY-MM-DD`` form.
    """
    # Bounding box of the area of interest
    lat_min, lat_max = 30.34000, 30.56000
    lon_min, lon_max = -91.28000, -91.02000

    # The context manager closes the file even when a step below raises, so a
    # bad granule no longer leaks an open handle.
    with xr.open_dataset(file_path) as ds:
        # Subset the dataset to the bounding box
        ds_subset = ds.sel(lat=slice(lat_min, lat_max),
                           lon=slice(lon_min, lon_max))
        precip = ds_subset['precipitation']

        # Mean precipitation over the spatial subset.
        # NOTE(review): -9999 is excluded from valid_count but not from this
        # mean; presumably xarray already masks the fill value on open -- confirm.
        mean_precip = float(precip.mean(dim=['lat', 'lon']))

        # Count valid precipitation values (not NaN and not -9999)
        valid_count = int(((precip != -9999) & (~np.isnan(precip))).sum())

        # Resolve the date: global attributes first, then the time coordinate,
        # then a placeholder string.
        date_str = ds_subset.attrs.get('BeginDate') or ds_subset.attrs.get('EndDate')
        if date_str:
            formatted_date = datetime.strptime(date_str, '%Y-%m-%d').strftime('%Y-%m-%d')
        elif 'time' in ds_subset.coords:
            formatted_date = pd.Timestamp(ds_subset.time.values[0]).strftime('%Y-%m-%d')
        else:
            formatted_date = "Date not found"

    # Use formatted_date: it is defined on every branch, whereas the previous
    # `date` variable was unassigned on the placeholder branch (NameError).
    df = pd.DataFrame({
        'date': [formatted_date],
        'precipitation': [mean_precip],
        'valid_count': [valid_count]
    })

    # Create output filename based on input filename
    filename = os.path.basename(file_path)
    output_filename = os.path.splitext(filename)[0] + '.csv'
    output_path = os.path.join(PROCESSED_DIR, output_filename)

    # Save to CSV
    df.to_csv(output_path, index=False)

In [8]:
import os, tempfile, time, netrc, requests
from urllib.parse import urlparse
from pathlib import Path
from IPython.display import clear_output

# --- process_imerg_file(...) is defined in the previous cell ---

def _urs_auth_from_netrc():
    """Read Earthdata credentials from ~/.netrc"""
    try:
        n = netrc.netrc()
        host = "urs.earthdata.nasa.gov"
        auth = n.authenticators(host)
        if auth and auth[0] and auth[2]:
            from requests.auth import HTTPBasicAuth
            return HTTPBasicAuth(auth[0], auth[2])
    except FileNotFoundError:
        pass
    return None

def make_header(already, processed_in_run, total, overall):
    """Format the one-line progress summary shown above the status body.

    Args:
        already: Number of files processed before this run started.
        processed_in_run: Number of files processed during this run.
        total: Number of files required overall.
        overall: Number of files processed so far in total.

    Returns:
        str: The three counters joined with `` | ``.
    """
    segments = (
        f"Processed before start: {already}",
        f"Processed this run: {processed_in_run}",
        f"Total processed: {overall} / Required total: {total}",
    )
    return " | ".join(segments)

def _render_status(header_text, body_lines):
    """Redraw the cell output: header, a dashed rule of equal length, then the body.

    Args:
        header_text (str): Summary line printed first.
        body_lines: Iterable of lines printed below the rule, one per line.
    """
    # wait=True defers the clear until new output arrives
    clear_output(wait=True)
    rule = "-" * len(header_text)
    for text in (header_text, rule, *body_lines):
        print(text)

def _download_with_requests(url: str, out_path: Path, auth, get_header):
    """Stream ``url`` to ``out_path`` while redrawing a live progress display.

    Data is written to a sibling ``<name>.part`` file and renamed onto
    ``out_path`` only after the whole body has been received, so ``out_path``
    never holds a partial download. If anything fails or the run is
    interrupted, the ``.part`` file is removed before the error propagates.

    Args:
        url: Address of the file to fetch.
        out_path: Final destination of the downloaded file.
        auth: Value assigned to the session's ``auth`` attribute (may be None).
        get_header: Zero-argument callable returning the header text for the
            status display; called once per received chunk.

    Raises:
        requests.HTTPError: If the server answers with an error status.
        Exception: Any other error raised by ``requests`` or by file I/O.
    """
    out_tmp = out_path.with_suffix(out_path.suffix + ".part")
    try:
        with requests.Session() as s:
            s.auth = auth
            with s.get(url, stream=True, timeout=60, allow_redirects=True) as r:
                r.raise_for_status()
                # 0 means the server did not announce a length; percentages
                # are then omitted from the display.
                total = int(r.headers.get("content-length", 0))
                got = 0
                chunk = 1024 * 256
                start = time.time()
                with open(out_tmp, "wb") as f:
                    for b in r.iter_content(chunk_size=chunk):
                        if not b:
                            continue
                        f.write(b)
                        got += len(b)
                        pct = (got / total * 100) if total else 0
                        # Guard against a zero elapsed time on the first chunk
                        speed = got / max(1e-6, (time.time() - start))
                        body = [
                            f"DOWNLOADING {out_path.name}",
                            (f"Progress: {pct:6.2f}%  ({got:,}/{total:,} bytes)  |  "
                             f"{speed/1e6:,.2f} MB/s") if total else f"Progress: {got:,} bytes",
                            "PROCESSING: pending",
                            "NEXT FILE: pending",
                        ]
                        _render_status(get_header(), body)
        out_tmp.replace(out_path)
    except BaseException:
        # Also covers KeyboardInterrupt (kernel interrupt): do not leave a
        # truncated .part file behind in the download directory.
        out_tmp.unlink(missing_ok=True)
        raise

def download_and_process_imerg():
    """Download each granule listed in ``LINKS_FILE`` and reduce it to a CSV.

    For every non-blank, non-``#`` line of ``LINKS_FILE``:

    * skip it when ``PROCESSED_DIR`` already holds a CSV with the same stem;
    * otherwise download it into ``RAW_DIR``, run ``process_imerg_file`` on it
      and delete the raw file afterwards, whether or not processing succeeded.

    A failure on one file does not stop the run. Failures are collected and
    listed in the final status, because the per-file error display is
    overwritten as soon as the next file starts.

    Returns:
        None
    """
    PROCESSED_DIR.mkdir(parents=True, exist_ok=True)
    RAW_DIR.mkdir(parents=True, exist_ok=True)

    # Stems of existing CSVs identify granules that need no work
    processed_set = {p.stem for p in PROCESSED_DIR.glob("*.csv")}
    with open(LINKS_FILE, "r") as f:
        stripped = (ln.strip() for ln in f)
        urls = [ln for ln in stripped if ln and not ln.startswith("#")]

    already = len(processed_set)
    total_required = len(urls)
    processed_in_run = 0
    failures = []  # (filename, stage, error) for every file that failed

    def get_header():
        overall = len(processed_set)
        return make_header(already, processed_in_run, total_required, overall)

    # initial header
    _render_status(get_header(), ["Waiting…"])

    auth = _urs_auth_from_netrc()

    for url in urls:
        filename = Path(urlparse(url).path).name
        basename = Path(filename).stem

        if basename in processed_set:
            _render_status(get_header(), [
                f"SKIPPING {basename} (already processed)",
                "DOWNLOADING: skipped",
                "PROCESSING: skipped",
                "DONE",
                "NEXT FILE",
            ])
            time.sleep(0.01)
            continue

        raw_path = RAW_DIR / filename

        # Download with live progress
        try:
            _download_with_requests(url, raw_path, auth, get_header)
        except Exception as e:
            failures.append((filename, "DOWNLOADING", e))
            _render_status(get_header(), [
                f"DOWNLOADING {filename}: ERROR -> {e}",
                "PROCESSING: skipped",
                "NEXT FILE",
            ])
            time.sleep(0.1)
            continue

        # Process
        _render_status(get_header(), [
            f"DOWNLOADING {filename}: 100.00% (saved)",
            f"PROCESSING {filename} …",
            "NEXT FILE: pending",
        ])
        try:
            process_imerg_file(str(raw_path))
            raw_path.unlink(missing_ok=True)
            processed_set.add(basename)
            processed_in_run += 1
            _render_status(get_header(), [
                f"DOWNLOADING {filename}: 100.00% (saved)",
                f"PROCESSING {filename}: done",
                "DONE",
                "NEXT FILE",
            ])
        except Exception as e:
            raw_path.unlink(missing_ok=True)
            failures.append((filename, "PROCESSING", e))
            _render_status(get_header(), [
                f"DOWNLOADING {filename}: 100.00% (saved)",
                f"PROCESSING {filename}: ERROR -> {e}",
                "NEXT FILE",
            ])
        time.sleep(0.1)

    # Only claim success when nothing failed; otherwise list every failure so
    # it stays visible after the run ends.
    if failures:
        summary = [f"Finished with {len(failures)} failed file(s):"]
        summary.extend(
            f"  {stage} {name}: ERROR -> {err}" for name, stage, err in failures
        )
    else:
        summary = ["All files processed!"]
    _render_status(get_header(), summary)

# Run the pipeline: download and process each granule listed in LINKS_FILE
# that does not already have a CSV in PROCESSED_DIR.
download_and_process_imerg()

Processed before start: 1096 | Processed this run: 0 | Total processed: 1096 / Required total: 1096
---------------------------------------------------------------------------------------------------
All files processed!
