# Download IMERG-Final data

In [None]:
import subprocess
import dask
from dask.distributed import Client, Semaphore
import tempfile
from pathlib import Path
import configparser
import pandas as pd
import os
import xarray as xr
import rioxarray as rxr
import requests

In [None]:
# Start a local Dask cluster, asking for as many workers as os.cpu_count()
# reports, and make it the scheduler used by later dask.compute calls.
client = Client(n_workers=os.cpu_count())
# Bare expression so the notebook renders the client summary.
client

In [None]:
# Load credentials; the 'imerg' section (username / pwd) is read by the
# download step.
secrets = configparser.ConfigParser()
# ConfigParser.read() silently skips files it cannot open and returns the list
# of files it actually parsed, so an empty list means the path is wrong.
secrets_read = secrets.read('../../secrets/secrets.ini')
if not secrets_read:
    raise FileNotFoundError(
        "Could not read credentials file '../../secrets/secrets.ini'"
    )
# Bare expression so the notebook still shows which file was loaded.
secrets_read

In [None]:
# First and last day of the period to download (passed to pd.date_range).
start_date = '2017-01-01'
end_date = '2021-09-30'

# download_dir receives the zip archives, output_dir the converted GeoTIFFs.
imerg_final_dir = Path("../../data-precip-analysis/imerg_final")
download_dir = imerg_final_dir / "ss_method_zip"
output_dir = imerg_final_dir / "ss_method"

# parents=True so the cell also works when imerg_final_dir does not exist yet.
download_dir.mkdir(parents=True, exist_ok=True)
output_dir.mkdir(parents=True, exist_ok=True)

# Distributed semaphore with 8 leases; download_imerg_final holds one lease
# while it talks to the server, which caps the number of parallel transfers.
sem = Semaphore(max_leases=8)

def download_imerg_final(date, sem):
    """Download one daily IMERG Final GIS archive and store it as a COG.

    The zip archive for ``date`` is fetched with ``wget`` into
    ``download_dir``, unpacked into a temporary directory, its
    ``total.accum`` GeoTIFF is converted to a Cloud Optimized GeoTIFF with
    ``gdal_translate``, and the result is moved to
    ``output_dir / "<YYYY-MM-DD>.tif"``.

    Uses the module-level ``download_dir``, ``output_dir`` and ``secrets``
    objects. Dates whose zip archive is already present in ``download_dir``
    are skipped.

    Parameters
    ----------
    date : str or datetime-like
        Day to download; anything ``pandas.to_datetime`` accepts.
    sem : context manager
        Held while the server is contacted (HEAD request and wget), so the
        caller can bound the number of simultaneous transfers.

    Returns
    -------
    tuple
        ``(wget_result, unzip_rc, gdal_rc, mv_rc)``. ``wget_result`` is the
        ``subprocess.CompletedProcess`` of the wget call, or ``None`` if
        wget was not run; the remaining items are process return codes.
        Steps that were not executed are reported as ``None``: all four
        when the archive already exists or the HEAD request did not return
        200, and the last three when wget failed.
    """
    date = pd.to_datetime(date)
    jan_1 = pd.to_datetime(f"{date.year}-01-01")
    # Sequence field of the file name: 30 * (days since 1 January),
    # zero-padded to at least four digits.
    imerg_final_id = f"{((date - jan_1) * 30).days:04d}"

    day = date.strftime('%Y-%m-%d')
    stem = (
        f"3B-DAY-GIS.MS.MRG.3IMERG.{date.strftime('%Y%m%d')}"
        f"-S000000-E235959.{imerg_final_id}.V06B"
    )
    link = (
        "https://arthurhouhttps.pps.eosdis.nasa.gov/gpmdata/"
        f"{date.strftime('%Y/%m/%d')}/gis/{stem}.zip"
    )
    fp_zip = download_dir / f"{day}.zip"
    fp_dst = output_dir / f"{day}.tif"

    if fp_zip.exists():
        return None, None, None, None

    username = secrets['imerg']['username']
    password = secrets['imerg']['pwd']

    # wget -O creates its target file even when the transfer fails, so
    # download to a side file and only promote it to fp_zip on success.
    # Otherwise a failed download would make later runs skip this date.
    fp_part = fp_zip.with_name(fp_zip.name + ".part")
    cmd = [
        "wget",
        "-O",
        str(fp_part),
        "--user",
        username,
        "--password",
        password,
        link,
        "--no-proxy",
    ]

    result = None
    with sem:
        # Timeout so a stalled connection cannot hold a lease forever.
        response = requests.head(link, auth=(username, password), timeout=60)
        if response.status_code == 200:
            result = subprocess.run(cmd, capture_output=True)
        else:
            print(f"{day}: HEAD request returned {response.status_code}, nothing downloaded")

    if result is None or result.returncode != 0:
        if fp_part.exists():
            fp_part.unlink()
        return result, None, None, None
    fp_part.replace(fp_zip)

    with tempfile.TemporaryDirectory(dir=download_dir) as fp_temp:
        # unzip
        result2 = subprocess.run(
            ["unzip", str(fp_zip), "-d", fp_temp], capture_output=True
        )

        # convert to cog
        fp_temp_file = Path(fp_temp) / f"{stem}.total.accum.tif"
        fp_cog = f"{fp_temp_file}_cog.tif"
        result3 = subprocess.run(
            ["gdal_translate", "-of", "COG", str(fp_temp_file), fp_cog],
            capture_output=True,
        )
        print(result3)

        # mv (must happen before the temporary directory is removed)
        result4 = subprocess.run(["mv", fp_cog, str(fp_dst)], capture_output=True)

    return result, result2.returncode, result3.returncode, result4.returncode

# Build one lazy download task per day in the requested range, then evaluate
# them together so they can run in parallel.
futures = [
    dask.delayed(download_imerg_final)(day, sem)
    for day in pd.date_range(start_date, end_date)
]

results = dask.compute(*futures)
results

In [None]:
@dask.delayed
def convert_to_da(fn, dst_dir, interp_lat_lon=None):
    """Convert one daily precipitation GeoTIFF into a NetCDF file.

    The raster is opened lazily, reduced to a 2-D ``precip`` array, given a
    length-one ``time`` dimension parsed from the file stem, and written to
    ``dst_dir / "<stem>.nc"``. ``dst_dir`` is created if it does not exist.

    Parameters
    ----------
    fn : pathlib.Path or str
        Input GeoTIFF. Its stem must be parseable by ``pandas.to_datetime``
        (e.g. ``2017-01-01.tif``).
    dst_dir : pathlib.Path or str
        Directory that receives the NetCDF file.
    interp_lat_lon : mapping, optional
        Mapping with ``'lat'`` and ``'lon'`` entries. When given, the data
        is resampled onto those coordinates with nearest-neighbour
        interpolation.

    Returns
    -------
    None
        The result is the file written to disk.
    """
    fn = Path(fn)
    dst_dir = Path(dst_dir)

    da = (
        rxr.open_rasterio(fn, masked=True, chunks=dict(x=500, y=500))
        .squeeze()
        .drop_vars('band')  # DataArray.drop is deprecated in favour of drop_vars
        .astype('float64')
    )
    da.name = 'precip'
    time = pd.to_datetime(fn.stem)

    da = da.expand_dims('time')
    da = da.rename({'x': 'lon', 'y': 'lat'})
    # Coordinates are rounded to 5 decimals when they are reassigned.
    da = da.assign_coords(
        time=('time', [time]),
        lon=('lon', da.lon.values.round(5)),
        lat=('lat', da.lat.values.round(5)),
    )

    if interp_lat_lon is not None:
        da = da.interp(lat=interp_lat_lon['lat'], lon=interp_lat_lon['lon'], method='nearest')

    da.attrs['units'] = 'mm'
    da.attrs['long_name'] = 'Precipitation'
    da.attrs['standard_name'] = 'precipitation'
    da.attrs['description'] = 'Accumulated precipitation estimated by IMERG Final Run'

    # Nothing else creates the destination directory, and to_netcdf will not
    # create it; exist_ok keeps concurrent tasks from tripping over each other.
    dst_dir.mkdir(parents=True, exist_ok=True)
    dst_fp = dst_dir / f"{fn.stem}.nc"

    da.to_netcdf(dst_fp)

In [None]:
# Open the previously combined dataset; its lat/lon coordinates are used
# further down as the target grid when converting the GeoTIFFs.
existing_ds_fn = Path("../../data-cumberland/tennessee/basins/cumberland/pre_processing/nc/combined_data.nc")
existing_ds = xr.open_dataset(existing_ds_fn)
# Bare expression so the notebook renders the dataset summary.
existing_ds

In [None]:
# Source GeoTIFFs and the directory that receives the converted NetCDF files.
tif_dir = Path("../../data-precip-analysis/imerg_final/ss_method")
dst_dir = Path("../../data-precip-analysis/imerg_final/ss_method_nc")
# The output directory is not created anywhere else.
dst_dir.mkdir(parents=True, exist_ok=True)

# Target grid taken from the existing dataset; built once and shared by all
# tasks instead of being rebuilt for every file.
interp_lat_lon = dict(lat=existing_ds.lat, lon=existing_ds.lon)

# sorted() gives a reproducible task order (glob order is arbitrary).
tasks = [
    convert_to_da(fp, dst_dir, interp_lat_lon=interp_lat_lon)
    for fp in sorted(tif_dir.glob("*.tif"))
]
tasks

In [None]:
# Execute all delayed conversion tasks collected in `tasks`.
dask.compute(*tasks)