In [1]:
import copy
import numpy as np
from pystac_client import Client
from shapely.geometry import Point
from rasterio.windows import Window
from pyproj import CRS
import rioxarray
from xarray import Dataset

import a301_lib
import pystac
import pandas as pd
import xarray

in a301_lib init


Define functions to find sentinel data 

In [2]:
def get_landsat_dataset(date, lon, lat, window, bands=None):
    """
    retrieve windowed bands specified in the bands variable.
    Save the clipped geotiffs as xarray.DattArrays, returned
    in an xarray Dataset
    
    Parameters
    ----------
    
    date: str
       date in the form yyy-mm-yy
    lon: float
       longitude of point in the scene (degrees E)
    lat: 
        latitude of point in the scene (degrees N)
    window: rasterio.Window
        window for clipping the scene to a subscene
    bands: list
        list of bands in the form ['B01','B02',...]
        the default is ['B04','B05','B06']
  
    Returns
    -------
    the_dataset: xarray.Dataset
       dataset with rioxarrays of requested bands plus Fmask
    """
    if bands is None:
        bands = ['B04','B05','B06']
    #
    # set up the search -- we are looking for only 1 scene per date
    #
    the_point = Point(lon, lat)
    cmr_api_url = "https://cmr.earthdata.nasa.gov/stac/LPCLOUD"
    client = Client.open(cmr_api_url)
    
    search = client.search(
        collections=["HLSL30.v2.0"],
        intersects=the_point,
        datetime= date
    )
    items = search.get_all_items()
    print(f"found {len(items)} item")
    #
    # get the metadata and add date, cloud_cover and band_name to the new DataArrays
    #
    props = items[0].properties
    out_dict = {}
    bands.extend(['Fmask'])
    for the_band in bands:
        print(f"inside get_landsat_scene: reading {the_band}")
        href = items[0].assets[the_band].href
        print(href)
        lazy_ds = rioxarray.open_rasterio(href,mask_and_scale=True)
        #
        # now read the window
        #
        clipped_ds = lazy_ds.rio.isel_window(window)
        #
        # add some custom attributes
        #
        clipped_ds.attrs['date'] = props['datetime'] #date and time
        clipped_ds.attrs['cloud_cover'] = props['eo:cloud_cover']
        clipped_ds.attrs['band_name'] = the_band
        print(clipped_ds.attrs)
        utm_zone = clipped_ds.attrs['HORIZONTAL_CS_NAME'][-3:-1].strip()
        if lat < 0:
            is_southern=True
        else:
            is_southern=False
        clipped_ds.attrs['cartopy_epsg_code'] = find_epsg_code(utm_zone,south=is_southern)
        clipped_ds.attrs['day']=props['datetime'][:10]  #yyyy-mm-dd
        out_dict[the_band] = clipped_ds
    #
    # convert the mask to 1=no cloud over land, np.nan=otherwise
    #
    out_dict['Fmask'] = get_clear_mask(out_dict['Fmask'])
    coords = out_dict['Fmask'].coords
    attrs = out_dict['Fmask'].attrs
    dataset = Dataset(data_vars = out_dict, coords = coords, attrs = attrs )
    return dataset

def find_epsg_code(utm_zone, south=False):
    """
    https://gis.stackexchange.com/questions/365584/convert-utm-zone-into-epsg-code
    
    cartopy wants crs names as epsg codes, i.e. UTM zone 10N is EPSG:32610, 10S is EPSG:32710
    """
    crs = CRS.from_dict({'proj': 'utm', 'zone': utm_zone, 'south': south})
    epsg, code = crs.to_authority()
    cartopy_epsg_code = code
    return cartopy_epsg_code

def get_clear_mask(fmask_ds):
    """
    return a DataArray copy of fmask_ds but with the pixel values set to
    1 where there is both no cloud and pixel is land, and np.nan where there is cloud
    or the pixel is water
    
    The bit patterns from the HSL QA mask are:
     
    Bits are listed from the MSB (bit 7) to the LSB (bit 0): 
    7-6    aerosol:
           00 - climatology
           01 - low
           10 - average
           11 - high
    5      water
    4      snow/ice
    3      cloud shadow
    2      adjacent to cloud
    1      cloud
    0      cirrus cloud
    
    so a bit mask of 0b00100011 when anded with a QA value
    will return non-zero when there is water, a cloud, or a cirrus cloud

    Parameters
    ----------

    fmask_ds: xarray DataArray
       landsat or sentinel hls scene create with rioxarray.open_rasterio

    Returns
    -------

    clearmask_ds: xarray DataArray
      new array with the clear sky mask for each pixel set to 1 if clear, np.nan if water or cloud
    """
    #
    # convert float32 to unsigned 8 bit integer
    #
    bit_mask = fmask_ds.data.astype(np.uint8)
    ref_mask = np.zeros_like(bit_mask)
    #
    # don't destroy original fmask DataArray
    #
    clearmask_ds = copy.deepcopy(fmask_ds)
    #
    # work with unsigned 8 bit values instead of
    # base 10 floats
    #
    bit_mask = fmask_ds.data.astype(np.uint8)
    #
    # create a reference mask that will select
    # bits 5, 1 and 0, which we want tocheck
    #
    ref_mask = np.zeros_like(bit_mask)
    ref_mask[...] = 0b00100011  #find water (bit 5), cloud (bit 1) , cirrus (bit 0)
    #
    # if all three of those bits are 0, then bitwise_and will return 0
    # otherwise it will return either a value greater than 0
    #
    cloudy_values = np.bitwise_and(bit_mask,ref_mask)
    cloudy_values[cloudy_values>0]=1  #cloud or water
    cloudy_values[cloudy_values==0]=0 #rest of scene
    #
    # now invert this, writing np.nan where there is 
    # cloud or water.  Go back to float32 so we can use np.nan
    #
    clear_mask = cloudy_values.astype(np.float32)
    clear_mask[cloudy_values == 1]=np.nan
    clear_mask[cloudy_values == 0]=1
    clearmask_ds.data = clear_mask
    return clearmask_ds


In [3]:
ana_lon, ana_lat = -122.223611, 46.257066 #St Helens analysis region
con_lat, con_lon = 46.190630, -122.286143 #St Helens control region
ana_location = Point(ana_lon, ana_lat)
con_location = Point(con_lon, con_lat)
date_range = "2013-01-01/2024-12-31"
#
# filename to save the dataframe for future analysis
#
csv_filename = a301_lib.data_share / "ajb/landsat/StHelens_search.csv"

In [4]:
# connect to the STAC endpoint
cmr_api_url = "https://cmr.earthdata.nasa.gov/stac/LPCLOUD"
client = Client.open(cmr_api_url)

search = client.search(
    collections=["HLSL30.v2.0"],
    intersects=ana_location,
    datetime= date_range
) 

items = search.get_all_items()
print(len(items))

282


In [5]:
scene_list = []
for index, value in enumerate(items):
    props = value.properties
    the_date = pystac.utils.str_to_datetime(props['datetime'])
    scene_dict = dict(scene = index,
                      cloud_cover = props['eo:cloud_cover'],
                      datetime = the_date 
                       )
    scene_list.append(scene_dict)

scene_list[0]

{'scene': 0,
 'cloud_cover': 86,
 'datetime': datetime.datetime(2013, 4, 14, 18, 57, 28, 350000, tzinfo=tzutc())}

In [6]:
the_df = pd.DataFrame.from_records(scene_list)

def make_seasoncol(row):
    seasons = {'djf':[12,1,2],
               'mam':[3,4,5],
               'jja':[6,7,8],
               'son':[9,10,11]}
    for season,months in seasons.items():
        month = row['datetime'].month
        year = row['datetime'].year
        if month in months:
            #
            # the winter of 2013 begins in
            # december 2012.  So the year of the
            # scene and the year of the season diverge
            #
            if month == 12:
                row['season_year'] = year + 1
            else:
                row['season_year'] = year
            row['season']=season
            row['year']= year
            row['month']= month
            row['day']= row['datetime'].day
    return row

new_df = the_df.apply(make_seasoncol,axis=1)
new_df = new_df[['scene','cloud_cover','season','year','season_year','month','day']]
new_df.head()

#csv_filename = a301_lib.data_share / "ajb/sentinel/StHelens_search.csv"
new_df.to_csv(csv_filename,index=False)

In [7]:
clear_df = the_df[the_df['cloud_cover'] < 50]
len(clear_df)

116

In [8]:
season_df = new_df.groupby(['season_year','season'])
season_dict = dict(list(season_df))

In [9]:
season_dict[(2016,'jja')]['cloud_cover']
season_dict[(2016,'jja')].iloc[2]

scene            65
cloud_cover      15
season          jja
year           2016
season_year    2016
month             7
day              27
Name: 65, dtype: object

In [10]:
def find_min(a_df):
    """
    What does this function do?
    """
    min_row = a_df['cloud_cover'].argmin()
    return min_row

#
# explain this loop
#
out_list = []
for the_key, a_df in season_dict.items():
    min_row = find_min(a_df)
    min_scene = a_df.iloc[min_row]
    the_series = pd.Series(min_scene)
    out_list.append(the_series)
    
new_frame = pd.DataFrame.from_records(out_list, index='scene')

new_frame.head()
print(len(new_frame))

csv_filename = a301_lib.data_share / "ajb/landsat/ClearStHelens_search.csv"
new_frame.to_csv(csv_filename,index=False)

48


In [11]:
geotiff_dir = a301_lib.sat_data / "ajb/landsat/ndvi_geotiffs"
geotiff_dir.mkdir(exist_ok = True, parents=True)

In [12]:
import os
os.environ["GDAL_HTTP_COOKIEFILE"] = "./cookies.txt"
os.environ["GDAL_HTTP_COOKIEJAR"] = "./cookies.txt"

In [13]:
# Write analysis images
do_write=False
if do_write:
    the_window = Window(col_off=1928, row_off=2503, width=134, height=135)
    for row_num in np.arange(0,48):
        row = new_frame.iloc[row_num]
        year,month,day = row['year'],row['month'],row['day']
        the_date = f"{year:02d}-{month:02d}-{day:02d}"
        print(the_date)
        the_scene = get_landsat_dataset(the_date, ana_lon, ana_lat, the_window, bands = ['B04',"B05"]) 
        file_path = geotiff_dir / f"analysis/testlandsat_{the_date}_StHelens.nc"
        print(f"saving to {file_path}")
        the_scene.to_netcdf(file_path)

In [14]:
# Write control images
do_write=False
if do_write:
    the_window = Window(col_off=1770, row_off=2751, width=134, height=134)
    for row_num in np.arange(0,48):
        row = new_frame.iloc[row_num]
        year,month,day = row['year'],row['month'],row['day']
        the_date = f"{year:02d}-{month:02d}-{day:02d}"
        print(the_date)
        the_scene = get_landsat_dataset(the_date, con_lon, con_lat, the_window, bands = ['B04',"B05"]) 
        file_path = geotiff_dir / f"control/landsat_{the_date}_StHelens.nc"
        print(f"saving to {file_path}")
        the_scene.to_netcdf(file_path)

In [15]:
def calc_ndvi(the_ds):
    #
    # xarray was unhappy with the extra third dimension
    # for the landsat bands:  [1, nrows, ncols]
    # so squeeze it out
    #
    the_ds = the_ds.squeeze()
    fmask = the_ds['Fmask']
    NIR = the_ds['B05']*fmask.data
    RED = the_ds['B04']*fmask.data
    ndvi  = (NIR - RED)/(NIR + RED)
    #
    # Fmask doesn't find every bad pixel, so go ahead
    # and set pixels to np.nan for any ndvi not between 0-1
    #
    ndvi.data[ndvi.data < 0] = np.nan
    ndvi.data[ndvi.data > 1] = np.nan
    #
    # Make a new dataArray 
    #
    ndvi_array = xarray.DataArray(data = ndvi, dims = ["y","x"])
    #
    # you'll get nan conversion errors unless you specifiy nan as
    # your missing value
    #
    ndvi_array.rio.write_nodata(np.nan, inplace=True)
    #
    # copy the crs and affine transform from band 4
    #
    ndvi_array.rio.write_crs(RED.rio.crs, inplace=True)
    ndvi_array.rio.write_transform(RED.rio.transform(), inplace=True)
    #
    # add some attributes
    #
    ndvi_array = ndvi_array.assign_attrs({'day':the_ds.day,
                                          'band_name':'ndvi',
                                          'history':'written by write_ndvi notebook'})
    #
    # add the ndvi_array to the dataset and return
    #
    ndvi_dataset = the_ds.assign(variables = {'ndvi' : ndvi_array})
    return ndvi_dataset

def write_ndvi_array(in_dir, out_dir, write_it):
    '''
    Writes a new array to the geotiff netCDF files in "in_dir" and writes them to "out_dir"
    
    Parameters:
    -----------
    in_dir: PosixPath
        PosixPath to directory with the original netCDF for analysis
    
    out_dir: PosixPath
        PosixPath to the directoy to store the new netCDF files with NDVI array
    
    write_it: Boolean
        True to write files new files, False to not
        
    Returns:
    --------
    string message of result
    '''
    if write_it:
        in_files = list(in_dir.glob("*nc"))
        for the_file in in_files:
            the_ds = rioxarray.open_rasterio(the_file,mode = 'r',mask_and_scale = True)
            #
            # Give the file the same name, but put it in the new folder
            #
            out_file = out_dir / the_file.name
            new_ds = calc_ndvi(the_ds)
            new_ds.to_netcdf(out_file)
        return "dataset downloaded"
    else:
        return "dataset not downloaded"

In [16]:
in_dir = a301_lib.sat_data / "ajb/landsat/ndvi_geotiffs/analysis"
out_dir = a301_lib.sat_data / "ajb/landsat/ndvi_geotiffs_output/analysis"
in_dir.mkdir(exist_ok = True, parents=True)
out_dir.mkdir(exist_ok = True, parents=True)

write_ndvi_array(in_dir, out_dir, False)

'dataset not downloaded'

In [17]:
in_dir = a301_lib.sat_data / "ajb/landsat/ndvi_geotiffs/control"
out_dir = a301_lib.sat_data / "ajb/landsat/ndvi_geotiffs_output/control"
in_dir.mkdir(exist_ok = True, parents=True)
out_dir.mkdir(exist_ok = True, parents=True)

write_ndvi_array(in_dir, out_dir, False)

'dataset not downloaded'

In [18]:
STAC_URL = 'https://cmr.earthdata.nasa.gov/stac'
catalog = Client.open(f'{STAC_URL}/USGS_EROS/')
products = [c for c in catalog.get_children()]

In [19]:
for p in products: 
    print(f"{p.id}: {p.title}")


ISERV_1: International Space Station SERVIR Environmental Research and Visualization System V1
Landsat Level-1 Collection 2_Collection 2: Landsat Level-1 Collection 2
Landsat Level-2 Surface Reflectance Collection 2_Collection 2: Landsat Level-2 Surface Reflectance Collection 2
Landsat Level-2 Surface Temperature Collection 2_Collection 2: Landsat Level-2 Surface Temperature Collection 2
