In [None]:
import logging

import fsspec
import requests
import aiohttp
import asyncio

from getpass import getpass
from requests.auth import HTTPBasicAuth
from earthdata import Auth

# Cookie jar constructed with unsafe=True, kept at module level for the aiohttp experiments.
jar = aiohttp.CookieJar(unsafe=True)

# Interactive login; the resulting object is the single source of credentials for this notebook.
auth = Auth().login(strategy="interactive")

# NOTE(review): `_credentials` is a private attribute of the earthdata Auth object and is
# indexed here as (username, password) -- confirm this still holds when upgrading the library.
username, password = auth._credentials[0], auth._credentials[1]

# The same credential pair wrapped for each HTTP client used in the cells that follow.
requests_auth = HTTPBasicAuth(username, password)
aio_auth = aiohttp.BasicAuth(login=username, password=password, encoding='utf-8')

In [None]:
# Sample granule URLs used throughout the notebook, keyed by a short provider label.
# Each value is a list of URLs; other cells pick one by position, e.g. DAAC_CD_URLS["NSIDC"][1].
# NOTE(review): the hostnames suggest a mix of cloud-hosted and on-prem endpoints -- confirm
# which is which before relying on a particular entry for the signed-url experiments.
DAAC_CD_URLS = {
    "NSIDC":  ["https://data.nsidc.earthdatacloud.nasa.gov/nsidc-cumulus-prod-protected/ATLAS/ATL08/005/2018/10/14/ATL08_20181014001049_02350102_005_01.h5",
               "https://n5eil01u.ecs.nsidc.org/DP7/ATLAS/ATL06.005/2018.10.14/ATL06_20181014051246_02380105_005_01.h5"],
    "PODAAC": ["https://archive.podaac.earthdata.nasa.gov/podaac-ops-cumulus-protected/ECCO_L4_BOLUS_LLC0090GRID_MONTHLY_V4R4/OCEAN_BOLUS_VELOCITY_mon_mean_1992-03_ECCO_V4r4_native_llc0090.nc"],
    "ASF":    ["https://datapool.asf.alaska.edu/L2.2/A3/ALPSRP279121600-P2.2_UA.zip",
               "https://grfn.asf.alaska.edu/door/download/S1-GUNW-D-R-087-tops-20220607_20210606-161653-20312N_17970N-PP-22ed-v2_0_4.nc"]
}

In [None]:
# aiohttp session
# Shared by every aiohttp helper in this notebook so cookies set during one request are
# available to the next. raise_for_status=True turns 4xx/5xx answers into exceptions.
# The cookie jar created in the setup cell is passed in explicitly; previously it was built
# but never handed to any session.
# NOTE(review): the session is never closed in this notebook -- call `await aio_session.close()`
# when finished to avoid an "Unclosed client session" warning.
aio_session = aiohttp.ClientSession(raise_for_status=True,
                                    trust_env=True,
                                    cookie_jar=jar)

# This is to debug OAuth using aiohttp, will return a signed url if it's a cloud-based collection
def _is_redirect(response):
    """Return True for a 3xx response that actually carries a ``Location`` header."""
    return 300 <= response.status < 400 and "Location" in response.headers


def _is_success(response):
    """Return True for any 2xx response, including a plain 200."""
    return 200 <= response.status < 300


def _print_request_error(error):
    """Print ``error.request_info`` when the exception has one, otherwise the exception itself."""
    print(getattr(error, "request_info", error))


def _hop(method, url, headers, allow_redirects=False):
    """Issue one authenticated request on the shared session and return its context manager."""
    return aio_session.request(method,
                               url,
                               headers=headers,
                               auth=aio_auth,
                               allow_redirects=allow_redirects)


async def _preview_body(response, limit=101):
    """Read at most ``limit`` bytes of the body, so a server ignoring Range cannot flood the output."""
    return await response.content.read(limit)


async def _follow_last_hop(method, final_hop, headers):
    """Request ``final_hop`` with automatic redirects on; return the final URL, or ``final_hop`` on error."""
    try:
        async with _hop(method, final_hop, headers, allow_redirects=True) as last_hop_resp:
            print("Last hop reponse: ", await _preview_body(last_hop_resp))
            return last_hop_resp.url
    except Exception as e:
        _print_request_error(e)
        return final_hop


async def _follow_next_hop(method, granule, next_hop, headers):
    """Request ``next_hop`` (the second redirect target) and continue if it redirects again.

    Returns ``next_hop`` itself when the response is not a redirect or when the request fails.
    """
    try:
        async with _hop(method, next_hop, headers) as next_hop_resp:
            print("Next hop response headers: ", next_hop_resp.headers)
            print("Cookies after second redirect: ", aio_session.cookie_jar._cookies)
            print("---"*20)
            if _is_redirect(next_hop_resp):
                final_hop = next_hop_resp.headers["Location"]
                print("Next hop redirect: ", final_hop)
                if final_hop.startswith("/"):
                    # Relative Location: retry the original granule URL instead of resolving the
                    # path. NOTE(review): presumably this works because the session cookies are
                    # set by now -- confirm this is preferable to urljoin(next_hop, final_hop).
                    final_hop = granule
                return await _follow_last_hop(method, final_hop, headers)
            if _is_success(next_hop_resp):
                print(await _preview_body(next_hop_resp))
            elif next_hop_resp.status >= 400:
                # Only reachable when the session was created without raise_for_status.
                print ("Ooops: ", next_hop_resp)
            return next_hop
    except Exception as e:
        print(e)
        return next_hop


async def get_signed_url_aiohttp(granule, method):
    """Walk the redirect chain for ``granule`` one hop at a time, printing debug output.

    Every hop sends a ``Range: bytes=0-100`` header and the basic-auth credentials, with
    automatic redirects disabled (except on the last hop) so each ``Location`` header,
    the response headers and the session cookies can be inspected along the way.

    Args:
        granule: URL of the file to resolve.
        method: HTTP method name passed to ``aio_session.request`` (e.g. "GET" or "HEAD").

    Returns:
        The response's ``.url`` when a hop answers directly, otherwise the ``Location``
        string of the last hop that was reached. ``None`` when the first response is
        neither a 2xx nor a redirect with a ``Location`` header.

    Raises:
        Whatever ``aio_session.request`` raises for the first request; failures on later
        hops are printed and the last known URL is returned instead.
    """
    headers = {"Range": "bytes=0-100"}
    async with _hop(method, granule, headers) as session_resp:
        if _is_success(session_resp):
            print("No redirect ")
            return session_resp.url
        if not _is_redirect(session_resp):
            return None

        # This is the initial redirect
        redirect_url = session_resp.headers["Location"]
        print("Redirecting... ", redirect_url)
        print("---"*20)
        try:
            async with _hop(method, redirect_url, headers) as redirect_response:
                # This is the answer from EDL
                print("Redirect response headers: ", redirect_response.headers)
                print("Cookies after first redirect: ", aio_session.cookie_jar._cookies)
                print("---"*20)
                if not _is_redirect(redirect_response):
                    return redirect_url
                next_hop = redirect_response.headers["Location"]
                return await _follow_next_hop(method, granule, next_hop, headers)
        except Exception as e:
            _print_request_error(e)
            return redirect_url

In [None]:

# Resolve the second NSIDC sample URL with a GET and display whatever the helper returns.
signed_url_aiohttp = await get_signed_url_aiohttp(DAAC_CD_URLS["NSIDC"][1], "GET")
# Observation: HEAD requests bypass OAuth and give us the location on S3, but with an invalid signature.
signed_url_aiohttp

In [None]:
# This is what edlfs should use eventually
async def async_range_read(granule, bytes_offset):
    """Fetch a byte range of ``granule`` through the shared aiohttp session.

    Args:
        granule: URL to request.
        bytes_offset: range specification placed after ``bytes=`` in the Range header,
            e.g. ``"0-100"``.

    Returns:
        bytes: the complete response body.
    """
    headers = {"Range": f"bytes={bytes_offset}"}
    # No auth is sent here: this relies on aio_session already holding the EDL session cookie
    # from an earlier authenticated request. The same session could be copied to Dask workers.
    async with aio_session.get(granule,
                               headers=headers,
                               allow_redirects=True) as resp:
        # read() collects the whole body in one call, replacing the chunk-by-chunk
        # `buffer += data` loop that re-copied the buffer on every chunk.
        return await resp.read()

# Read the "0-100" byte range of the second ASF sample URL and display the raw bytes.
data = await async_range_read(DAAC_CD_URLS["ASF"][1], "0-100")
data

In [None]:
# Plain requests session; the cells below pass credentials explicitly on each call.
req_session = requests.Session()

# earthdata session (requests with auth redirect)
earthdata_session = auth.get_session()

# Module-level Range header read by the helper functions defined below.
# Later cells rebind this name, which changes what those helpers send.
headers = {"Range": "bytes=0-100"}

def get_signed_url_requests_simple(granule):
    """Follow a single redirect for ``granule`` using plain requests with basic auth.

    Sends a HEAD request with the module-level ``headers`` and redirects disabled. If the
    answer is a redirect, issues a second HEAD request to its ``Location``.

    Args:
        granule: URL of the file to resolve.

    Returns:
        The response of the redirected request when the first answer is a redirect,
        otherwise the first response itself.
    """
    with req_session.head(granule, auth=requests_auth, allow_redirects=False, headers=headers) as r:
        # is_redirect is only true when a Location header is present, so the lookup below is safe.
        if not r.is_redirect:
            return r
        # The original referenced an undefined `req_auth` here, which raised NameError.
        with req_session.head(r.headers["Location"], auth=requests_auth) as redirect_r:
            print(redirect_r)
            return redirect_r
            
def get_signed_url_requests_earthdata(granule):
    """Follow a single redirect for ``granule`` using the earthdata requests session.

    Issues a GET with the module-level ``headers`` and redirects disabled. When the status
    is strictly between 300 and 400, a HEAD request is sent to the ``Location`` target and
    that response is printed and returned; otherwise the first response is returned.
    """
    with earthdata_session.get(granule, allow_redirects=False, headers=headers) as first_resp:
        if not (300 < first_resp.status_code < 400):
            return first_resp
        with earthdata_session.head(first_resp.headers["Location"]) as followed_resp:
            print(followed_resp)
            return followed_resp

async def get_afs():
    """Create an asynchronous fsspec "https" filesystem, await ``set_session()`` on it, and return it."""
    async_https_fs = fsspec.filesystem("https", asynchronous=True)
    # The value returned by set_session() is not needed here; only the call matters.
    await async_https_fs.set_session()
    return async_https_fs

In [None]:
# Rebinds the module-level `headers`, which the helper functions defined earlier also read.
headers = {"Range": "bytes=0-16"}
# Without a .netrc this results in ACCESS DENIED, because requests drops the auth on the OAuth redirect.
with req_session.get(DAAC_CD_URLS["ASF"][0], auth=requests_auth, headers=headers, allow_redirects=True) as r:
    print(r.text, r)

In [None]:
# This works just fine, because the earthdata session lets the auth headers persist across the OAuth redirect.
headers = {"Range": "bytes=0-16"}
with earthdata_session.get(DAAC_CD_URLS["NSIDC"][0], headers=headers) as r:
    # URL of the request behind the returned response; the following cells reuse it as `signed_url`.
    # NOTE(review): presumably this is the post-redirect (signed) URL -- confirm.
    signed_url = r.request.url
    print(r.text)

In [None]:
headers = {"Range": "bytes=0-100"}
# Once we have the signed URL we don't need anything else: the auth is in the URL itself,
# so plain requests (no session, no credentials) is enough.
with requests.get(signed_url, allow_redirects=True, headers=headers) as r:
    print(r.text)

In [None]:
# Now with synchronous fsspec, the default way of doing things with xarray.
sync_fs = fsspec.filesystem("https",
                            asynchronous=False)
sync_fs

In [None]:
# This works: the file behind the signed URL is downloaded to "test-2.nc", and it's really fast for one file.
sync_fs.get(signed_url, "test-2.nc")

In [None]:
%%time
# Open is not as fast, xarray issues a ton of requests
import xarray as xr
file_like_object = sync_fs.open(signed_url)
# Debug logging is switched on only after the file object exists, so the open() call above
# is not logged; the dataset read below is.
logging.basicConfig(level=logging.DEBUG)
ds = xr.open_dataset(file_like_object, engine="h5netcdf", chunks="auto", cache=False)
ds

In [None]:
# Let's try async: build the asynchronous filesystem and open the signed URL with it.
async_fs = await get_afs()
file_like_object_async = await async_fs.open_async(signed_url)
file_like_object_async

In [None]:
# This also works (writes "test-async.nc"); it is not faster than the sync approach, but some benchmarking is needed.
# NOTE(review): `_get` is an underscore-prefixed method of the filesystem object -- confirm it is intended for direct use.
await async_fs._get(signed_url, "test-async.nc")

In [None]:
%%time

logging.basicConfig(level=logging.DEBUG)

import xarray as xr
# it issues like dozens of requests, tried with chunks=-1 and the same thing happens
file_like_object = sync_fs.open(signed_url)
logging.basicConfig(level=logging.DEBUG)
# logging.basicConfig(level=logging.DEBUG)
ds = xr.open_dataset(file_like_object, engine="h5netcdf", chunks=-1, cache=False)  
ds

In [None]:
# Open the signed URL through the asynchronous filesystem and check what kind of object comes back.
file_async = await async_fs.open_async(signed_url)

print(type(file_async))

# Does not work out of the box with fsspec async!
# NOTE(review): this call is expected to fail, which stops a "Run All" here -- consider
# catching and displaying the exception if the failure itself is the demonstration.
ds = xr.open_dataset(file_async, engine="h5netcdf")
ds